feat(appstore,workspace): add App Store upgrade progress tracking and interactive workspace processes
This commit is contained in:
+15
-25
@@ -47,6 +47,10 @@ export interface IMigrationResult {
|
||||
warnings: string[];
|
||||
}
|
||||
|
||||
export interface IAppStoreUpgradeApplyOptions {
|
||||
onProgress?: (progressArg: { step: string; message: string }) => void | Promise<void>;
|
||||
}
|
||||
|
||||
export class AppStoreManager {
|
||||
private appStoreCache: IAppStoreIndex | null = null;
|
||||
private appStoreResolver: plugins.servezoneAppstore.AppStoreResolver;
|
||||
@@ -297,16 +301,13 @@ export class AppStoreManager {
|
||||
serviceNameArg: string,
|
||||
migrationResultArg: IMigrationResult,
|
||||
newVersionArg: string,
|
||||
optionsArg: IAppStoreUpgradeApplyOptions = {},
|
||||
): Promise<IService> {
|
||||
const service = this.oneboxRef.database.getServiceByName(serviceNameArg);
|
||||
if (!service) {
|
||||
throw new Error(`Service not found: ${serviceNameArg}`);
|
||||
}
|
||||
|
||||
if (service.containerID && service.status === 'running') {
|
||||
await this.oneboxRef.services.stopService(serviceNameArg);
|
||||
}
|
||||
|
||||
const updates: Partial<IService> = {
|
||||
appTemplateVersion: newVersionArg,
|
||||
};
|
||||
@@ -336,29 +337,18 @@ export class AppStoreManager {
|
||||
updates.envVars = mergedEnvVars;
|
||||
}
|
||||
|
||||
this.oneboxRef.database.updateService(service.id!, updates);
|
||||
|
||||
const newImage = migrationResultArg.image || service.image;
|
||||
if (migrationResultArg.image && migrationResultArg.image !== service.image) {
|
||||
await this.oneboxRef.docker.pullImage(newImage);
|
||||
}
|
||||
|
||||
const updatedService = this.oneboxRef.database.getServiceByName(serviceNameArg)!;
|
||||
if (service.containerID) {
|
||||
try {
|
||||
await this.oneboxRef.docker.removeContainer(service.containerID, true);
|
||||
} catch {
|
||||
// Container might already be gone.
|
||||
}
|
||||
}
|
||||
|
||||
const containerID = await this.oneboxRef.docker.createContainer(updatedService);
|
||||
this.oneboxRef.database.updateService(service.id!, { containerID, status: 'starting' });
|
||||
await this.oneboxRef.docker.startContainer(containerID);
|
||||
this.oneboxRef.database.updateService(service.id!, { status: 'running' });
|
||||
const updatedService = await this.oneboxRef.services.updateService(
|
||||
serviceNameArg,
|
||||
updates,
|
||||
{
|
||||
onProgress: async (progressArg) => {
|
||||
await optionsArg.onProgress?.(progressArg);
|
||||
},
|
||||
},
|
||||
);
|
||||
|
||||
logger.success(`Service '${serviceNameArg}' upgraded to App Store version ${newVersionArg}`);
|
||||
return this.oneboxRef.database.getServiceByName(serviceNameArg)!;
|
||||
return updatedService;
|
||||
}
|
||||
|
||||
public normalizeVolumes(volumesArg: IAppStoreVersionConfig['volumes'] = []): IServiceVolume[] {
|
||||
|
||||
+45
-21
@@ -14,6 +14,12 @@ type TExpandedPublishedPort = Required<Pick<
|
||||
'targetPort' | 'publishedPort' | 'protocol' | 'hostIp'
|
||||
>>;
|
||||
|
||||
export interface IInteractiveContainerExec {
|
||||
stream: plugins.nodeStream.Duplex;
|
||||
close: () => Promise<void>;
|
||||
inspect: () => Promise<{ ExitCode?: number | null; Running?: boolean }>;
|
||||
}
|
||||
|
||||
export class OneboxDockerManager {
|
||||
private dockerClient: InstanceType<typeof plugins.docker.Docker> | null = null;
|
||||
private networkName = 'onebox-network';
|
||||
@@ -1128,32 +1134,37 @@ export class OneboxDockerManager {
|
||||
/**
|
||||
* Execute a command in a running container
|
||||
*/
|
||||
private async resolveContainer(containerID: string): Promise<any> {
|
||||
let container: any = null;
|
||||
try {
|
||||
container = await this.dockerClient!.getContainerById(containerID);
|
||||
} catch {
|
||||
// Not a direct container ID — try Swarm service lookup.
|
||||
}
|
||||
|
||||
if (!container) {
|
||||
const serviceContainerId = await this.getContainerIdForService(containerID);
|
||||
if (serviceContainerId) {
|
||||
try {
|
||||
container = await this.dockerClient!.getContainerById(serviceContainerId);
|
||||
} catch {
|
||||
// Service container also not found.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!container) {
|
||||
throw new Error(`Container not found: ${containerID}`);
|
||||
}
|
||||
return container;
|
||||
}
|
||||
|
||||
async execInContainer(
|
||||
containerID: string,
|
||||
cmd: string[]
|
||||
): Promise<{ stdout: string; stderr: string; exitCode: number }> {
|
||||
try {
|
||||
let container: any = null;
|
||||
try {
|
||||
container = await this.dockerClient!.getContainerById(containerID);
|
||||
} catch {
|
||||
// Not a direct container ID — try Swarm service lookup
|
||||
}
|
||||
|
||||
if (!container) {
|
||||
const serviceContainerId = await this.getContainerIdForService(containerID);
|
||||
if (serviceContainerId) {
|
||||
try {
|
||||
container = await this.dockerClient!.getContainerById(serviceContainerId);
|
||||
} catch {
|
||||
// Service container also not found
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!container) {
|
||||
throw new Error(`Container not found: ${containerID}`);
|
||||
}
|
||||
const container = await this.resolveContainer(containerID);
|
||||
|
||||
const { stream, inspect } = await container.exec(cmd, {
|
||||
attachStdout: true,
|
||||
@@ -1190,6 +1201,19 @@ export class OneboxDockerManager {
|
||||
}
|
||||
}
|
||||
|
||||
async startInteractiveExecInContainer(
|
||||
containerID: string,
|
||||
cmd: string[],
|
||||
): Promise<IInteractiveContainerExec> {
|
||||
const container = await this.resolveContainer(containerID);
|
||||
return await container.exec(cmd, {
|
||||
tty: true,
|
||||
attachStdin: true,
|
||||
attachStdout: true,
|
||||
attachStderr: true,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a platform service container (MongoDB, MinIO, etc.)
|
||||
* Platform containers are long-running infrastructure services
|
||||
|
||||
+48
-1
@@ -11,6 +11,26 @@ import { OneboxDatabase } from './database.ts';
|
||||
import { OneboxDockerManager } from './docker.ts';
|
||||
import type { PlatformServicesManager } from './platform-services/index.ts';
|
||||
|
||||
export type TServiceUpdateProgressStep =
|
||||
| 'stopping'
|
||||
| 'pulling-image'
|
||||
| 'updating-record'
|
||||
| 'removing-container'
|
||||
| 'creating-container'
|
||||
| 'starting'
|
||||
| 'restoring-route'
|
||||
| 'syncing-gateway'
|
||||
| 'complete';
|
||||
|
||||
export interface IServiceUpdateProgress {
|
||||
step: TServiceUpdateProgressStep;
|
||||
message: string;
|
||||
}
|
||||
|
||||
export interface IServiceUpdateOptions {
|
||||
onProgress?: (progressArg: IServiceUpdateProgress) => void | Promise<void>;
|
||||
}
|
||||
|
||||
export class OneboxServicesManager {
|
||||
private oneboxRef: any; // Will be Onebox instance
|
||||
private database: OneboxDatabase;
|
||||
@@ -583,9 +603,15 @@ export class OneboxServicesManager {
|
||||
envVars?: Record<string, string>;
|
||||
volumes?: IService['volumes'];
|
||||
publishedPorts?: IService['publishedPorts'];
|
||||
}
|
||||
imageDigest?: string;
|
||||
appTemplateVersion?: string;
|
||||
},
|
||||
optionsArg: IServiceUpdateOptions = {},
|
||||
): Promise<IService> {
|
||||
try {
|
||||
const emitProgress = async (step: TServiceUpdateProgressStep, message: string) => {
|
||||
await optionsArg.onProgress?.({ step, message });
|
||||
};
|
||||
const service = this.database.getServiceByName(name);
|
||||
if (!service) {
|
||||
throw new Error(`Service not found: ${name}`);
|
||||
@@ -599,6 +625,7 @@ export class OneboxServicesManager {
|
||||
// Stop the container if running
|
||||
if (wasRunning && oldContainerID) {
|
||||
logger.info(`Stopping service ${name} for updates...`);
|
||||
await emitProgress('stopping', `Stopping ${name} before updating its container`);
|
||||
try {
|
||||
await this.docker.stopContainer(oldContainerID);
|
||||
} catch (error) {
|
||||
@@ -609,10 +636,12 @@ export class OneboxServicesManager {
|
||||
// Pull new image if changed
|
||||
if (updates.image && updates.image !== service.image) {
|
||||
logger.info(`Pulling new image: ${updates.image}`);
|
||||
await emitProgress('pulling-image', `Pulling image ${updates.image}`);
|
||||
await this.docker.pullImage(updates.image, updates.registry || service.registry);
|
||||
}
|
||||
|
||||
// Update service in database
|
||||
await emitProgress('updating-record', `Updating service record for ${name}`);
|
||||
const updateData: any = {
|
||||
updatedAt: Date.now(),
|
||||
};
|
||||
@@ -623,6 +652,8 @@ export class OneboxServicesManager {
|
||||
if (updates.envVars !== undefined) updateData.envVars = updates.envVars;
|
||||
if (updates.volumes !== undefined) updateData.volumes = updates.volumes;
|
||||
if (updates.publishedPorts !== undefined) updateData.publishedPorts = updates.publishedPorts;
|
||||
if (updates.imageDigest !== undefined) updateData.imageDigest = updates.imageDigest;
|
||||
if (updates.appTemplateVersion !== undefined) updateData.appTemplateVersion = updates.appTemplateVersion;
|
||||
|
||||
this.database.updateService(service.id!, updateData);
|
||||
|
||||
@@ -631,6 +662,7 @@ export class OneboxServicesManager {
|
||||
|
||||
// Remove old container
|
||||
if (oldContainerID) {
|
||||
await emitProgress('removing-container', `Removing old container for ${name}`);
|
||||
try {
|
||||
await this.docker.removeContainer(oldContainerID, true);
|
||||
logger.info(`Removed old container for ${name}`);
|
||||
@@ -641,6 +673,7 @@ export class OneboxServicesManager {
|
||||
|
||||
// Create new container with updated config
|
||||
logger.info(`Creating new container for ${name}...`);
|
||||
await emitProgress('creating-container', `Creating replacement container for ${name}`);
|
||||
const containerID = await this.docker.createContainer(updatedService);
|
||||
this.database.updateService(service.id!, { containerID });
|
||||
|
||||
@@ -674,6 +707,7 @@ export class OneboxServicesManager {
|
||||
// Restart the container if it was running
|
||||
if (wasRunning) {
|
||||
logger.info(`Starting updated service ${name}...`);
|
||||
await emitProgress('starting', `Starting updated service ${name}`);
|
||||
this.database.updateService(service.id!, { status: 'starting' });
|
||||
await this.docker.startContainer(containerID);
|
||||
this.database.updateService(service.id!, { status: 'running' });
|
||||
@@ -685,8 +719,21 @@ export class OneboxServicesManager {
|
||||
|
||||
const refreshedService = this.database.getServiceByName(name)!;
|
||||
if (refreshedService.domain && refreshedService.status === 'running') {
|
||||
await emitProgress('restoring-route', `Restoring route ${refreshedService.domain} -> ${refreshedService.port}`);
|
||||
try {
|
||||
await this.oneboxRef.reverseProxy.addRoute(
|
||||
refreshedService.id!,
|
||||
refreshedService.domain,
|
||||
refreshedService.port,
|
||||
);
|
||||
} catch (error) {
|
||||
logger.warn(`Failed to restore reverse proxy route for ${refreshedService.domain}: ${getErrorMessage(error)}`);
|
||||
throw error;
|
||||
}
|
||||
await emitProgress('syncing-gateway', `Syncing external gateway route for ${refreshedService.domain}`);
|
||||
await this.syncExternalGatewayRoute(refreshedService);
|
||||
}
|
||||
await emitProgress('complete', `Service ${name} update completed`);
|
||||
await this.broadcastServiceUpdate(name, 'updated');
|
||||
return refreshedService;
|
||||
} catch (error) {
|
||||
|
||||
@@ -3,15 +3,205 @@ import { logger } from '../../logging.ts';
|
||||
import type { OpsServer } from '../classes.opsserver.ts';
|
||||
import * as interfaces from '../../../ts_interfaces/index.ts';
|
||||
import { requireAdminIdentity } from '../helpers/guards.ts';
|
||||
import { getErrorMessage } from '../../utils/error.ts';
|
||||
|
||||
type IAppStoreUpgradeOperation = interfaces.requests.IAppStoreUpgradeOperation;
|
||||
type TAppStoreUpgradeStep = interfaces.requests.TAppStoreUpgradeStep;
|
||||
|
||||
export class AppStoreHandler {
|
||||
public typedrouter = new plugins.typedrequest.TypedRouter();
|
||||
private upgradeOperations = new Map<string, IAppStoreUpgradeOperation>();
|
||||
|
||||
constructor(private opsServerRef: OpsServer) {
|
||||
this.opsServerRef.typedrouter.addTypedRouter(this.typedrouter);
|
||||
this.registerHandlers();
|
||||
}
|
||||
|
||||
private getUpgradeOperations(): IAppStoreUpgradeOperation[] {
|
||||
return Array.from(this.upgradeOperations.values())
|
||||
.sort((a, b) => b.startedAt - a.startedAt)
|
||||
.slice(0, 25);
|
||||
}
|
||||
|
||||
private getRunningUpgrade(serviceNameArg: string): IAppStoreUpgradeOperation | null {
|
||||
for (const operation of this.upgradeOperations.values()) {
|
||||
if (operation.serviceName === serviceNameArg && operation.status === 'running') {
|
||||
return operation;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private async createUpgradeOperation(
|
||||
serviceNameArg: string,
|
||||
targetVersionArg: string,
|
||||
): Promise<IAppStoreUpgradeOperation> {
|
||||
const existingRunning = this.getRunningUpgrade(serviceNameArg);
|
||||
if (existingRunning) {
|
||||
throw new plugins.typedrequest.TypedResponseError(
|
||||
`An upgrade is already running for ${serviceNameArg}`,
|
||||
);
|
||||
}
|
||||
|
||||
const existingService = this.opsServerRef.oneboxRef.database.getServiceByName(serviceNameArg);
|
||||
if (!existingService) {
|
||||
throw new plugins.typedrequest.TypedResponseError(`Service not found: ${serviceNameArg}`);
|
||||
}
|
||||
if (!existingService.appTemplateId) {
|
||||
throw new plugins.typedrequest.TypedResponseError('Service was not deployed from an app template');
|
||||
}
|
||||
if (!existingService.appTemplateVersion) {
|
||||
throw new plugins.typedrequest.TypedResponseError('Service has no tracked template version');
|
||||
}
|
||||
|
||||
const now = Date.now();
|
||||
const operation: IAppStoreUpgradeOperation = {
|
||||
id: crypto.randomUUID(),
|
||||
serviceName: existingService.name,
|
||||
appTemplateId: existingService.appTemplateId,
|
||||
fromVersion: existingService.appTemplateVersion,
|
||||
targetVersion: targetVersionArg,
|
||||
status: 'running',
|
||||
step: 'queued',
|
||||
progressLines: [`Queued upgrade ${existingService.appTemplateVersion} -> ${targetVersionArg}`],
|
||||
warnings: [],
|
||||
startedAt: now,
|
||||
updatedAt: now,
|
||||
};
|
||||
|
||||
this.upgradeOperations.set(operation.id, operation);
|
||||
await this.pushUpgradeProgress(operation);
|
||||
return operation;
|
||||
}
|
||||
|
||||
private async updateUpgradeOperation(
|
||||
operationIdArg: string,
|
||||
stepArg: TAppStoreUpgradeStep,
|
||||
messageArg: string,
|
||||
updatesArg: Partial<IAppStoreUpgradeOperation> = {},
|
||||
): Promise<IAppStoreUpgradeOperation> {
|
||||
const existing = this.upgradeOperations.get(operationIdArg);
|
||||
if (!existing) {
|
||||
throw new Error(`Upgrade operation not found: ${operationIdArg}`);
|
||||
}
|
||||
|
||||
const nextOperation: IAppStoreUpgradeOperation = {
|
||||
...existing,
|
||||
...updatesArg,
|
||||
step: stepArg,
|
||||
updatedAt: Date.now(),
|
||||
progressLines: [...existing.progressLines, messageArg].slice(-200),
|
||||
};
|
||||
this.upgradeOperations.set(operationIdArg, nextOperation);
|
||||
await this.pushUpgradeProgress(nextOperation);
|
||||
return nextOperation;
|
||||
}
|
||||
|
||||
private async pushUpgradeProgress(operationArg: IAppStoreUpgradeOperation): Promise<void> {
|
||||
await this.opsServerRef.pushDashboardEvent('pushAppStoreUpgradeProgress', {
|
||||
operation: operationArg,
|
||||
});
|
||||
}
|
||||
|
||||
private async performUpgrade(operationIdArg: string): Promise<interfaces.data.IService> {
|
||||
let operation = this.upgradeOperations.get(operationIdArg);
|
||||
if (!operation) {
|
||||
throw new Error(`Upgrade operation not found: ${operationIdArg}`);
|
||||
}
|
||||
|
||||
try {
|
||||
operation = await this.updateUpgradeOperation(
|
||||
operation.id,
|
||||
'validating',
|
||||
`Validating ${operation.serviceName} for App Store upgrade`,
|
||||
);
|
||||
|
||||
const existingService = this.opsServerRef.oneboxRef.database.getServiceByName(operation.serviceName);
|
||||
if (!existingService) {
|
||||
throw new Error(`Service not found: ${operation.serviceName}`);
|
||||
}
|
||||
if (!existingService.appTemplateId || !existingService.appTemplateVersion) {
|
||||
throw new Error('Service is missing App Store template metadata');
|
||||
}
|
||||
|
||||
logger.info(
|
||||
`Upgrading service '${operation.serviceName}' from v${operation.fromVersion} to v${operation.targetVersion}`,
|
||||
);
|
||||
|
||||
await this.updateUpgradeOperation(
|
||||
operation.id,
|
||||
'migration',
|
||||
`Resolving migration for ${operation.appTemplateId} ${operation.fromVersion} -> ${operation.targetVersion}`,
|
||||
);
|
||||
|
||||
const migrationResult = await this.opsServerRef.oneboxRef.appStore.executeMigration(
|
||||
existingService,
|
||||
operation.fromVersion,
|
||||
operation.targetVersion,
|
||||
);
|
||||
|
||||
if (!migrationResult.success) {
|
||||
throw new Error(`Migration failed: ${migrationResult.warnings.join('; ')}`);
|
||||
}
|
||||
|
||||
if (migrationResult.warnings.length > 0) {
|
||||
operation = await this.updateUpgradeOperation(
|
||||
operation.id,
|
||||
'migration',
|
||||
`Migration completed with ${migrationResult.warnings.length} warning(s)`,
|
||||
{ warnings: migrationResult.warnings },
|
||||
);
|
||||
}
|
||||
|
||||
await this.updateUpgradeOperation(
|
||||
operation.id,
|
||||
'applying',
|
||||
`Applying upgrade to ${operation.serviceName}`,
|
||||
);
|
||||
|
||||
const updatedService = await this.opsServerRef.oneboxRef.appStore.applyUpgrade(
|
||||
operation.serviceName,
|
||||
migrationResult,
|
||||
operation.targetVersion,
|
||||
{
|
||||
onProgress: async (progressArg) => {
|
||||
await this.updateUpgradeOperation(
|
||||
operation!.id,
|
||||
progressArg.step as TAppStoreUpgradeStep,
|
||||
progressArg.message,
|
||||
);
|
||||
},
|
||||
},
|
||||
);
|
||||
|
||||
await this.updateUpgradeOperation(
|
||||
operation.id,
|
||||
'complete',
|
||||
`Upgrade completed for ${operation.serviceName}`,
|
||||
{
|
||||
status: 'success',
|
||||
completedAt: Date.now(),
|
||||
service: updatedService,
|
||||
warnings: migrationResult.warnings,
|
||||
},
|
||||
);
|
||||
|
||||
return updatedService;
|
||||
} catch (error) {
|
||||
await this.updateUpgradeOperation(
|
||||
operation.id,
|
||||
'failed',
|
||||
`Upgrade failed: ${getErrorMessage(error)}`,
|
||||
{
|
||||
status: 'failed',
|
||||
completedAt: Date.now(),
|
||||
error: getErrorMessage(error),
|
||||
},
|
||||
);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
private registerHandlers(): void {
|
||||
this.typedrouter.addTypedHandler(
|
||||
new plugins.typedrequest.TypedHandler<interfaces.requests.IReq_GetAppStoreTemplates>(
|
||||
@@ -66,44 +256,38 @@ export class AppStoreHandler {
|
||||
'upgradeAppStoreService',
|
||||
async (dataArg) => {
|
||||
await requireAdminIdentity(this.opsServerRef.adminHandler, dataArg);
|
||||
|
||||
const existingService = this.opsServerRef.oneboxRef.database.getServiceByName(dataArg.serviceName);
|
||||
if (!existingService) {
|
||||
throw new plugins.typedrequest.TypedResponseError(`Service not found: ${dataArg.serviceName}`);
|
||||
}
|
||||
if (!existingService.appTemplateId) {
|
||||
throw new plugins.typedrequest.TypedResponseError('Service was not deployed from an app template');
|
||||
}
|
||||
if (!existingService.appTemplateVersion) {
|
||||
throw new plugins.typedrequest.TypedResponseError('Service has no tracked template version');
|
||||
}
|
||||
|
||||
logger.info(`Upgrading service '${dataArg.serviceName}' from v${existingService.appTemplateVersion} to v${dataArg.targetVersion}`);
|
||||
|
||||
const migrationResult = await this.opsServerRef.oneboxRef.appStore.executeMigration(
|
||||
existingService,
|
||||
existingService.appTemplateVersion,
|
||||
dataArg.targetVersion,
|
||||
);
|
||||
|
||||
if (!migrationResult.success) {
|
||||
throw new plugins.typedrequest.TypedResponseError(
|
||||
`Migration failed: ${migrationResult.warnings.join('; ')}`,
|
||||
);
|
||||
}
|
||||
|
||||
const updatedService = await this.opsServerRef.oneboxRef.appStore.applyUpgrade(
|
||||
dataArg.serviceName,
|
||||
migrationResult,
|
||||
dataArg.targetVersion,
|
||||
);
|
||||
const operation = await this.createUpgradeOperation(dataArg.serviceName, dataArg.targetVersion);
|
||||
const updatedService = await this.performUpgrade(operation.id);
|
||||
const completedOperation = this.upgradeOperations.get(operation.id)!;
|
||||
|
||||
return {
|
||||
service: updatedService,
|
||||
warnings: migrationResult.warnings,
|
||||
warnings: completedOperation.warnings,
|
||||
};
|
||||
},
|
||||
),
|
||||
);
|
||||
|
||||
this.typedrouter.addTypedHandler(
|
||||
new plugins.typedrequest.TypedHandler<interfaces.requests.IReq_StartAppStoreServiceUpgrade>(
|
||||
'startAppStoreServiceUpgrade',
|
||||
async (dataArg) => {
|
||||
await requireAdminIdentity(this.opsServerRef.adminHandler, dataArg);
|
||||
const operation = await this.createUpgradeOperation(dataArg.serviceName, dataArg.targetVersion);
|
||||
void this.performUpgrade(operation.id).catch(() => {});
|
||||
return { operation };
|
||||
},
|
||||
),
|
||||
);
|
||||
|
||||
this.typedrouter.addTypedHandler(
|
||||
new plugins.typedrequest.TypedHandler<interfaces.requests.IReq_GetAppStoreUpgradeOperations>(
|
||||
'getAppStoreUpgradeOperations',
|
||||
async (dataArg) => {
|
||||
await requireAdminIdentity(this.opsServerRef.adminHandler, dataArg);
|
||||
return { operations: this.getUpgradeOperations() };
|
||||
},
|
||||
),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,8 +5,21 @@ import * as interfaces from '../../../ts_interfaces/index.ts';
|
||||
import { requireAdminIdentity } from '../helpers/guards.ts';
|
||||
import { getErrorMessage } from '../../utils/error.ts';
|
||||
|
||||
interface IWorkspaceProcessSession {
|
||||
processId: string;
|
||||
serviceName: string;
|
||||
userId: string;
|
||||
stream: plugins.nodeStream.Duplex;
|
||||
close: () => Promise<void>;
|
||||
inspect: () => Promise<{ ExitCode?: number | null; Running?: boolean }>;
|
||||
finalized: boolean;
|
||||
}
|
||||
|
||||
const getWorkspaceProcessTag = (processIdArg: string) => `workspaceProcess:${processIdArg}`;
|
||||
|
||||
export class WorkspaceHandler {
|
||||
public typedrouter = new plugins.typedrequest.TypedRouter();
|
||||
private workspaceProcesses = new Map<string, IWorkspaceProcessSession>();
|
||||
|
||||
constructor(private opsServerRef: OpsServer) {
|
||||
this.opsServerRef.typedrouter.addTypedRouter(this.typedrouter);
|
||||
@@ -24,6 +37,111 @@ export class WorkspaceHandler {
|
||||
return service.containerID;
|
||||
}
|
||||
|
||||
private validateProcessId(processIdArg: string): void {
|
||||
if (!/^[a-zA-Z0-9_-]{8,80}$/.test(processIdArg)) {
|
||||
throw new plugins.typedrequest.TypedResponseError('Invalid workspace process id');
|
||||
}
|
||||
}
|
||||
|
||||
private async getShellCommandForContainer(
|
||||
containerIdArg: string,
|
||||
): Promise<interfaces.requests.IWorkspaceShellCommand> {
|
||||
const candidates: interfaces.requests.IWorkspaceShellCommand[] = [
|
||||
{ command: '/bin/bash', args: ['-il'], label: 'bash', prompt: '# ' },
|
||||
{ command: 'bash', args: ['-il'], label: 'bash', prompt: '# ' },
|
||||
{ command: '/bin/sh', args: ['-i'], label: 'sh', prompt: '# ' },
|
||||
{ command: 'sh', args: ['-i'], label: 'sh', prompt: '# ' },
|
||||
{ command: '/bin/ash', args: ['-i'], label: 'ash', prompt: '# ' },
|
||||
{ command: 'ash', args: ['-i'], label: 'ash', prompt: '# ' },
|
||||
{ command: '/usr/bin/zsh', args: ['-il'], label: 'zsh', prompt: '# ' },
|
||||
{ command: 'zsh', args: ['-il'], label: 'zsh', prompt: '# ' },
|
||||
];
|
||||
|
||||
for (const candidate of candidates) {
|
||||
const result = await this.opsServerRef.oneboxRef.docker.execInContainer(
|
||||
containerIdArg,
|
||||
[candidate.command, '-c', 'printf onebox-shell'],
|
||||
);
|
||||
if (result.exitCode === 0 && result.stdout.includes('onebox-shell')) {
|
||||
return candidate;
|
||||
}
|
||||
}
|
||||
|
||||
throw new plugins.typedrequest.TypedResponseError(
|
||||
'No supported interactive shell found in the target container',
|
||||
);
|
||||
}
|
||||
|
||||
private async getProcessSession(
|
||||
dataArg: { identity: interfaces.data.IIdentity; processId: string },
|
||||
): Promise<IWorkspaceProcessSession> {
|
||||
const identity = await requireAdminIdentity(this.opsServerRef.adminHandler, dataArg);
|
||||
this.validateProcessId(dataArg.processId);
|
||||
const session = this.workspaceProcesses.get(dataArg.processId);
|
||||
if (!session) {
|
||||
throw new plugins.typedrequest.TypedResponseError(`Workspace process not found: ${dataArg.processId}`);
|
||||
}
|
||||
if (session.userId !== identity.userId) {
|
||||
throw new plugins.typedrequest.TypedResponseError('Workspace process belongs to another session');
|
||||
}
|
||||
return session;
|
||||
}
|
||||
|
||||
private async pushWorkspaceProcessOutput(processIdArg: string, outputArg: string): Promise<void> {
|
||||
const typedsocket = (this.opsServerRef.server as any)?.typedserver?.typedsocket;
|
||||
if (!typedsocket) return;
|
||||
|
||||
const connections = await typedsocket.findAllTargetConnectionsByTag(getWorkspaceProcessTag(processIdArg));
|
||||
await Promise.allSettled(
|
||||
connections.map((connection: any) => typedsocket
|
||||
.createTypedRequest(
|
||||
'pushWorkspaceProcessOutput',
|
||||
connection,
|
||||
)
|
||||
.fire({ processId: processIdArg, output: outputArg })),
|
||||
);
|
||||
}
|
||||
|
||||
private async pushWorkspaceProcessExit(processIdArg: string, exitCodeArg: number): Promise<void> {
|
||||
const typedsocket = (this.opsServerRef.server as any)?.typedserver?.typedsocket;
|
||||
if (!typedsocket) return;
|
||||
|
||||
const connections = await typedsocket.findAllTargetConnectionsByTag(getWorkspaceProcessTag(processIdArg));
|
||||
await Promise.allSettled(
|
||||
connections.map((connection: any) => typedsocket
|
||||
.createTypedRequest(
|
||||
'pushWorkspaceProcessExit',
|
||||
connection,
|
||||
)
|
||||
.fire({ processId: processIdArg, exitCode: exitCodeArg })),
|
||||
);
|
||||
}
|
||||
|
||||
private async finalizeWorkspaceProcess(processIdArg: string, fallbackExitCodeArg = -1): Promise<void> {
|
||||
const session = this.workspaceProcesses.get(processIdArg);
|
||||
if (!session || session.finalized) return;
|
||||
session.finalized = true;
|
||||
|
||||
let exitCode = fallbackExitCodeArg;
|
||||
try {
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
const inspectResult = await session.inspect();
|
||||
if (typeof inspectResult.ExitCode === 'number') {
|
||||
exitCode = inspectResult.ExitCode;
|
||||
}
|
||||
} catch (error) {
|
||||
logger.debug(`Failed to inspect workspace process ${processIdArg}: ${getErrorMessage(error)}`);
|
||||
}
|
||||
|
||||
this.workspaceProcesses.delete(processIdArg);
|
||||
await this.pushWorkspaceProcessExit(processIdArg, exitCode);
|
||||
try {
|
||||
await session.close();
|
||||
} catch {
|
||||
// The hijacked connection may already be closed by Docker.
|
||||
}
|
||||
}
|
||||
|
||||
private registerHandlers(): void {
|
||||
// Read file from container
|
||||
this.typedrouter.addTypedHandler(
|
||||
@@ -176,6 +294,108 @@ export class WorkspaceHandler {
|
||||
),
|
||||
);
|
||||
|
||||
this.typedrouter.addTypedHandler(
|
||||
new plugins.typedrequest.TypedHandler<interfaces.requests.IReq_WorkspaceGetShellCommand>(
|
||||
'workspaceGetShellCommand',
|
||||
async (dataArg) => {
|
||||
await requireAdminIdentity(this.opsServerRef.adminHandler, dataArg);
|
||||
const containerId = await this.resolveContainerId(dataArg.serviceName);
|
||||
const shellCommand = await this.getShellCommandForContainer(containerId);
|
||||
return { shellCommand };
|
||||
},
|
||||
),
|
||||
);
|
||||
|
||||
this.typedrouter.addTypedHandler(
|
||||
new plugins.typedrequest.TypedHandler<interfaces.requests.IReq_WorkspaceStartProcess>(
|
||||
'workspaceStartProcess',
|
||||
async (dataArg) => {
|
||||
const identity = await requireAdminIdentity(this.opsServerRef.adminHandler, dataArg);
|
||||
this.validateProcessId(dataArg.processId);
|
||||
if (this.workspaceProcesses.has(dataArg.processId)) {
|
||||
throw new plugins.typedrequest.TypedResponseError(`Workspace process already exists: ${dataArg.processId}`);
|
||||
}
|
||||
|
||||
const containerId = await this.resolveContainerId(dataArg.serviceName);
|
||||
const command = dataArg.args ? [dataArg.command, ...dataArg.args] : [dataArg.command];
|
||||
const interactiveExec = await this.opsServerRef.oneboxRef.docker.startInteractiveExecInContainer(
|
||||
containerId,
|
||||
command,
|
||||
);
|
||||
|
||||
const session: IWorkspaceProcessSession = {
|
||||
processId: dataArg.processId,
|
||||
serviceName: dataArg.serviceName,
|
||||
userId: identity.userId,
|
||||
stream: interactiveExec.stream,
|
||||
close: interactiveExec.close,
|
||||
inspect: interactiveExec.inspect,
|
||||
finalized: false,
|
||||
};
|
||||
this.workspaceProcesses.set(dataArg.processId, session);
|
||||
|
||||
interactiveExec.stream.on('data', (chunk: Uint8Array | string) => {
|
||||
const output = typeof chunk === 'string' ? chunk : new TextDecoder().decode(chunk);
|
||||
void this.pushWorkspaceProcessOutput(dataArg.processId, output);
|
||||
});
|
||||
interactiveExec.stream.on('error', (error: Error) => {
|
||||
void this.pushWorkspaceProcessOutput(
|
||||
dataArg.processId,
|
||||
`\r\n[workspace process error: ${getErrorMessage(error)}]\r\n`,
|
||||
);
|
||||
void this.finalizeWorkspaceProcess(dataArg.processId, -1);
|
||||
});
|
||||
interactiveExec.stream.on('end', () => {
|
||||
void this.finalizeWorkspaceProcess(dataArg.processId, -1);
|
||||
});
|
||||
interactiveExec.stream.on('close', () => {
|
||||
void this.finalizeWorkspaceProcess(dataArg.processId, -1);
|
||||
});
|
||||
|
||||
return { processId: dataArg.processId };
|
||||
},
|
||||
),
|
||||
);
|
||||
|
||||
this.typedrouter.addTypedHandler(
|
||||
new plugins.typedrequest.TypedHandler<interfaces.requests.IReq_WorkspaceProcessInput>(
|
||||
'workspaceProcessInput',
|
||||
async (dataArg) => {
|
||||
const session = await this.getProcessSession(dataArg);
|
||||
if (session.finalized || session.stream.writableEnded) {
|
||||
return {};
|
||||
}
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
session.stream.write(dataArg.input, (error?: Error | null) => {
|
||||
if (error) {
|
||||
reject(error);
|
||||
} else {
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
});
|
||||
return {};
|
||||
},
|
||||
),
|
||||
);
|
||||
|
||||
this.typedrouter.addTypedHandler(
|
||||
new plugins.typedrequest.TypedHandler<interfaces.requests.IReq_WorkspaceKillProcess>(
|
||||
'workspaceKillProcess',
|
||||
async (dataArg) => {
|
||||
const session = await this.getProcessSession(dataArg);
|
||||
session.stream.destroy();
|
||||
try {
|
||||
await session.close();
|
||||
} catch {
|
||||
// The stream may already be closed.
|
||||
}
|
||||
await this.finalizeWorkspaceProcess(dataArg.processId, -1);
|
||||
return {};
|
||||
},
|
||||
),
|
||||
);
|
||||
|
||||
logger.info('Workspace handler registered');
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user