feat(platform-services): Add platform service log streaming, improve health checks and provisioning robustness
This commit is contained in:
@@ -83,6 +83,12 @@ export class OneboxHttpServer {
|
||||
return this.handleLogStreamUpgrade(req, serviceName);
|
||||
}
|
||||
|
||||
// Platform service log streaming WebSocket
|
||||
if (path.startsWith('/api/platform-services/') && path.endsWith('/logs/stream') && req.headers.get('upgrade') === 'websocket') {
|
||||
const platformType = path.split('/')[3];
|
||||
return this.handlePlatformLogStreamUpgrade(req, platformType);
|
||||
}
|
||||
|
||||
// Network access logs WebSocket
|
||||
if (path === '/api/network/logs/stream' && req.headers.get('upgrade') === 'websocket') {
|
||||
return this.handleNetworkLogStreamUpgrade(req, new URL(req.url));
|
||||
@@ -1060,6 +1066,123 @@ export class OneboxHttpServer {
|
||||
return response;
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle WebSocket upgrade for platform service log streaming
|
||||
*/
|
||||
private handlePlatformLogStreamUpgrade(req: Request, platformType: string): Response {
|
||||
const { socket, response } = Deno.upgradeWebSocket(req);
|
||||
|
||||
socket.onopen = async () => {
|
||||
logger.info(`Platform log stream WebSocket connected for: ${platformType}`);
|
||||
|
||||
try {
|
||||
// Get the platform service from database
|
||||
const platformService = this.oneboxRef.database.getPlatformServiceByType(platformType as any);
|
||||
if (!platformService) {
|
||||
socket.send(JSON.stringify({ error: 'Platform service not found' }));
|
||||
socket.close();
|
||||
return;
|
||||
}
|
||||
|
||||
if (!platformService.containerId) {
|
||||
socket.send(JSON.stringify({ error: 'Platform service has no container' }));
|
||||
socket.close();
|
||||
return;
|
||||
}
|
||||
|
||||
// Get the container
|
||||
logger.info(`Looking up container for platform service ${platformType}, containerID: ${platformService.containerId}`);
|
||||
const container = await this.oneboxRef.docker.getContainerById(platformService.containerId);
|
||||
|
||||
if (!container) {
|
||||
logger.error(`Container not found for platform service ${platformType}, containerID: ${platformService.containerId}`);
|
||||
socket.send(JSON.stringify({ error: 'Container not found' }));
|
||||
socket.close();
|
||||
return;
|
||||
}
|
||||
|
||||
// Start streaming logs
|
||||
const logStream = await container.streamLogs({
|
||||
stdout: true,
|
||||
stderr: true,
|
||||
timestamps: true,
|
||||
tail: 100, // Start with last 100 lines
|
||||
});
|
||||
|
||||
// Send initial connection message
|
||||
socket.send(JSON.stringify({
|
||||
type: 'connected',
|
||||
serviceName: platformType,
|
||||
}));
|
||||
|
||||
// Demultiplex and pipe log data to WebSocket
|
||||
// Docker streams use 8-byte headers: [STREAM_TYPE, 0, 0, 0, SIZE_BYTE1, SIZE_BYTE2, SIZE_BYTE3, SIZE_BYTE4]
|
||||
let buffer = new Uint8Array(0);
|
||||
|
||||
logStream.on('data', (chunk: Uint8Array) => {
|
||||
if (socket.readyState !== WebSocket.OPEN) return;
|
||||
|
||||
// Append new data to buffer
|
||||
const newBuffer = new Uint8Array(buffer.length + chunk.length);
|
||||
newBuffer.set(buffer);
|
||||
newBuffer.set(chunk, buffer.length);
|
||||
buffer = newBuffer;
|
||||
|
||||
// Process complete frames
|
||||
while (buffer.length >= 8) {
|
||||
// Read frame size from header (bytes 4-7, big-endian)
|
||||
const frameSize = (buffer[4] << 24) | (buffer[5] << 16) | (buffer[6] << 8) | buffer[7];
|
||||
|
||||
// Check if we have the complete frame
|
||||
if (buffer.length < 8 + frameSize) {
|
||||
break; // Wait for more data
|
||||
}
|
||||
|
||||
// Extract the frame data (skip 8-byte header)
|
||||
const frameData = buffer.slice(8, 8 + frameSize);
|
||||
|
||||
// Send the clean log line
|
||||
socket.send(new TextDecoder().decode(frameData));
|
||||
|
||||
// Remove processed frame from buffer
|
||||
buffer = buffer.slice(8 + frameSize);
|
||||
}
|
||||
});
|
||||
|
||||
logStream.on('error', (error: Error) => {
|
||||
logger.error(`Platform log stream error for ${platformType}: ${getErrorMessage(error)}`);
|
||||
if (socket.readyState === WebSocket.OPEN) {
|
||||
socket.send(JSON.stringify({ error: getErrorMessage(error) }));
|
||||
}
|
||||
});
|
||||
|
||||
logStream.on('end', () => {
|
||||
logger.info(`Platform log stream ended for ${platformType}`);
|
||||
socket.close();
|
||||
});
|
||||
|
||||
// Clean up on close
|
||||
socket.onclose = () => {
|
||||
logger.info(`Platform log stream WebSocket closed for ${platformType}`);
|
||||
logStream.destroy();
|
||||
};
|
||||
|
||||
} catch (error) {
|
||||
logger.error(`Failed to start platform log stream for ${platformType}: ${getErrorMessage(error)}`);
|
||||
if (socket.readyState === WebSocket.OPEN) {
|
||||
socket.send(JSON.stringify({ error: getErrorMessage(error) }));
|
||||
socket.close();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
socket.onerror = (error) => {
|
||||
logger.error(`Platform log stream WebSocket error: ${error}`);
|
||||
};
|
||||
|
||||
return response;
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle WebSocket upgrade for network access log streaming
|
||||
*/
|
||||
|
||||
Reference in New Issue
Block a user