Files
onebox/ts/opsserver/handlers/platform.handler.ts

330 lines
13 KiB
TypeScript

import * as plugins from '../../plugins.ts';
import { logger } from '../../logging.ts';
import type { OpsServer } from '../classes.opsserver.ts';
import * as interfaces from '../../../ts_interfaces/index.ts';
import { requireValidIdentity } from '../helpers/guards.ts';
export class PlatformHandler {
public typedrouter = new plugins.typedrequest.TypedRouter();
private activeLogStreams = new Map<string, boolean>();
constructor(private opsServerRef: OpsServer) {
this.opsServerRef.typedrouter.addTypedRouter(this.typedrouter);
this.registerHandlers();
this.startLogStreaming();
}
/**
* Start streaming logs from all running containers (platform + user services)
* and push new entries to connected dashboard clients via TypedSocket
*/
private async startLogStreaming(): Promise<void> {
const checkAndStream = async () => {
// Stream platform service containers
const platformServices = this.opsServerRef.oneboxRef.database.getAllPlatformServices();
for (const service of platformServices) {
if (service.status !== 'running' || !service.containerId) continue;
const key = `platform:${service.type}`;
if (this.activeLogStreams.has(key)) continue;
this.activeLogStreams.set(key, true);
logger.info(`Starting log stream for platform service: ${service.type}`);
try {
await this.opsServerRef.oneboxRef.docker.streamContainerLogs(
service.containerId,
(line: string, isError: boolean) => {
this.pushPlatformLogToClients(service.type as interfaces.data.TPlatformServiceType, line, isError);
}
);
} catch (err) {
logger.warn(`Log stream failed for ${service.type}: ${(err as Error).message}`);
this.activeLogStreams.delete(key);
}
}
// Stream user service containers
const userServices = this.opsServerRef.oneboxRef.services.listServices();
for (const service of userServices) {
if (service.status !== 'running' || !service.containerID) continue;
const key = `service:${service.name}`;
if (this.activeLogStreams.has(key)) continue;
this.activeLogStreams.set(key, true);
logger.info(`Starting log stream for user service: ${service.name}`);
try {
await this.opsServerRef.oneboxRef.docker.streamContainerLogs(
service.containerID,
(line: string, isError: boolean) => {
this.pushServiceLogToClients(service.name, line, isError);
}
);
} catch (err) {
logger.warn(`Log stream failed for ${service.name}: ${(err as Error).message}`);
this.activeLogStreams.delete(key);
}
}
};
// Initial check after a short delay (let services start first)
setTimeout(() => checkAndStream(), 5000);
// Re-check periodically for newly started services
setInterval(() => checkAndStream(), 15000);
}
private parseLogLine(line: string, isError: boolean): { timestamp: string; level: string; message: string } {
const tsMatch = line.match(/^(\d{4}-\d{2}-\d{2}T[\d:.]+Z?)\s+(.*)/);
const timestamp = tsMatch ? tsMatch[1] : new Date().toISOString();
const message = tsMatch ? tsMatch[2] : line;
const msgLower = message.toLowerCase();
const level = isError || msgLower.includes('error') || msgLower.includes('fatal')
? 'error'
: msgLower.includes('warn')
? 'warn'
: 'info';
return { timestamp, level, message };
}
private pushPlatformLogToClients(
serviceType: interfaces.data.TPlatformServiceType,
line: string,
isError: boolean,
): void {
const typedsocket = (this.opsServerRef.server as any)?.typedserver?.typedsocket;
if (!typedsocket) return;
const entry = this.parseLogLine(line, isError);
typedsocket.findAllTargetConnectionsByTag('role', 'ops_dashboard')
.then((connections: any[]) => {
for (const conn of connections) {
typedsocket.createTypedRequest<interfaces.requests.IReq_PushPlatformServiceLog>(
'pushPlatformServiceLog',
conn,
).fire({ serviceType, entry }).catch(() => {});
}
})
.catch(() => {});
}
private pushServiceLogToClients(
serviceName: string,
line: string,
isError: boolean,
): void {
const typedsocket = (this.opsServerRef.server as any)?.typedserver?.typedsocket;
if (!typedsocket) return;
const entry = this.parseLogLine(line, isError);
typedsocket.findAllTargetConnectionsByTag('role', 'ops_dashboard')
.then((connections: any[]) => {
for (const conn of connections) {
typedsocket.createTypedRequest<interfaces.requests.IReq_PushServiceLog>(
'pushServiceLog',
conn,
).fire({ serviceName, entry }).catch(() => {});
}
})
.catch(() => {});
}
private registerHandlers(): void {
// Get all platform services
this.typedrouter.addTypedHandler(
new plugins.typedrequest.TypedHandler<interfaces.requests.IReq_GetPlatformServices>(
'getPlatformServices',
async (dataArg) => {
await requireValidIdentity(this.opsServerRef.adminHandler, dataArg);
const platformServices = this.opsServerRef.oneboxRef.platformServices.getAllPlatformServices();
const providers = this.opsServerRef.oneboxRef.platformServices.getAllProviders();
const result = providers.map((provider: any) => {
const service = platformServices.find((s: any) => s.type === provider.type);
const isCore = 'isCore' in provider && (provider as any).isCore === true;
let status: string = service?.status || 'not-deployed';
if (provider.type === 'caddy') {
const proxyStatus = this.opsServerRef.oneboxRef.reverseProxy.getStatus() as any;
status = (proxyStatus.running ?? proxyStatus.http?.running) ? 'running' : 'stopped';
}
return {
type: provider.type,
displayName: provider.displayName,
resourceTypes: provider.resourceTypes,
status: status as interfaces.data.TPlatformServiceStatus,
containerId: service?.containerId,
isCore,
createdAt: service?.createdAt,
updatedAt: service?.updatedAt,
};
});
return { platformServices: result as interfaces.data.IPlatformService[] };
},
),
);
// Get specific platform service
this.typedrouter.addTypedHandler(
new plugins.typedrequest.TypedHandler<interfaces.requests.IReq_GetPlatformService>(
'getPlatformService',
async (dataArg) => {
await requireValidIdentity(this.opsServerRef.adminHandler, dataArg);
const provider = this.opsServerRef.oneboxRef.platformServices.getProvider(dataArg.serviceType);
if (!provider) {
throw new plugins.typedrequest.TypedResponseError(`Unknown platform service type: ${dataArg.serviceType}`);
}
const service = this.opsServerRef.oneboxRef.database.getPlatformServiceByType(dataArg.serviceType);
const isCore = 'isCore' in provider && (provider as any).isCore === true;
let rawStatus: string = service?.status || 'not-deployed';
if (dataArg.serviceType === 'caddy') {
const proxyStatus = this.opsServerRef.oneboxRef.reverseProxy.getStatus() as any;
rawStatus = (proxyStatus.running ?? proxyStatus.http?.running) ? 'running' : 'stopped';
}
return {
platformService: {
type: provider.type,
displayName: provider.displayName,
resourceTypes: provider.resourceTypes,
status: rawStatus as interfaces.data.TPlatformServiceStatus,
containerId: service?.containerId,
isCore,
createdAt: service?.createdAt,
updatedAt: service?.updatedAt,
} as interfaces.data.IPlatformService,
};
},
),
);
// Start platform service
this.typedrouter.addTypedHandler(
new plugins.typedrequest.TypedHandler<interfaces.requests.IReq_StartPlatformService>(
'startPlatformService',
async (dataArg) => {
await requireValidIdentity(this.opsServerRef.adminHandler, dataArg);
const provider = this.opsServerRef.oneboxRef.platformServices.getProvider(dataArg.serviceType);
if (!provider) {
throw new plugins.typedrequest.TypedResponseError(`Unknown platform service type: ${dataArg.serviceType}`);
}
logger.info(`Starting platform service: ${dataArg.serviceType}`);
const service = await this.opsServerRef.oneboxRef.platformServices.ensureRunning(dataArg.serviceType);
return {
platformService: {
type: service.type,
displayName: provider.displayName,
resourceTypes: provider.resourceTypes,
status: service.status,
containerId: service.containerId,
},
};
},
),
);
// Stop platform service
this.typedrouter.addTypedHandler(
new plugins.typedrequest.TypedHandler<interfaces.requests.IReq_StopPlatformService>(
'stopPlatformService',
async (dataArg) => {
await requireValidIdentity(this.opsServerRef.adminHandler, dataArg);
const provider = this.opsServerRef.oneboxRef.platformServices.getProvider(dataArg.serviceType);
if (!provider) {
throw new plugins.typedrequest.TypedResponseError(`Unknown platform service type: ${dataArg.serviceType}`);
}
const isCore = 'isCore' in provider && (provider as any).isCore === true;
if (isCore) {
throw new plugins.typedrequest.TypedResponseError(
`${provider.displayName} is a core service and cannot be stopped`,
);
}
logger.info(`Stopping platform service: ${dataArg.serviceType}`);
await this.opsServerRef.oneboxRef.platformServices.stopPlatformService(dataArg.serviceType);
return {
platformService: {
type: dataArg.serviceType,
displayName: provider.displayName,
resourceTypes: provider.resourceTypes,
status: 'stopped' as const,
},
};
},
),
);
// Get platform service stats
this.typedrouter.addTypedHandler(
new plugins.typedrequest.TypedHandler<interfaces.requests.IReq_GetPlatformServiceStats>(
'getPlatformServiceStats',
async (dataArg) => {
await requireValidIdentity(this.opsServerRef.adminHandler, dataArg);
const service = this.opsServerRef.oneboxRef.database.getPlatformServiceByType(dataArg.serviceType);
if (!service || !service.containerId) {
throw new plugins.typedrequest.TypedResponseError('Platform service has no container');
}
const stats = await this.opsServerRef.oneboxRef.docker.getContainerStats(service.containerId);
if (!stats) {
throw new plugins.typedrequest.TypedResponseError('Could not retrieve container stats');
}
return { stats };
},
),
);
// Get platform service logs
this.typedrouter.addTypedHandler(
new plugins.typedrequest.TypedHandler<interfaces.requests.IReq_GetPlatformServiceLogs>(
'getPlatformServiceLogs',
async (dataArg) => {
await requireValidIdentity(this.opsServerRef.adminHandler, dataArg);
const service = this.opsServerRef.oneboxRef.database.getPlatformServiceByType(dataArg.serviceType);
if (!service || !service.containerId) {
throw new plugins.typedrequest.TypedResponseError('Platform service has no container');
}
const tail = dataArg.tail || 100;
const rawLogs = await this.opsServerRef.oneboxRef.docker.getContainerLogs(service.containerId, tail);
// Parse raw log output into structured entries
const logLines = (rawLogs.stdout + rawLogs.stderr)
.split('\n')
.filter((line: string) => line.trim());
const logs = logLines.map((line: string, index: number) => {
// Try to parse Docker timestamp from beginning of line
const tsMatch = line.match(/^(\d{4}-\d{2}-\d{2}T[\d:.]+Z?)\s+(.*)/);
const timestamp = tsMatch ? new Date(tsMatch[1]).getTime() : Date.now();
const message = tsMatch ? tsMatch[2] : line;
const msgLower = message.toLowerCase();
const isError = msgLower.includes('error') || msgLower.includes('fatal');
const isWarn = msgLower.includes('warn');
return {
id: index,
serviceId: 0,
timestamp,
message,
level: (isError ? 'error' : isWarn ? 'warn' : 'info') as 'info' | 'warn' | 'error' | 'debug',
source: 'stdout' as const,
};
});
return { logs };
},
),
);
}
}