Files
onebox/ts_web/environments/backend-environment.ts
T

277 lines
8.9 KiB
TypeScript

/**
* BackendExecutionEnvironment — implements IExecutionEnvironment
* by routing all filesystem and process operations through the onebox API
* to Docker exec on the target container.
*/
import * as plugins from '../plugins.js';
import * as interfaces from '../../ts_interfaces/index.js';
// Import IExecutionEnvironment type from dees-catalog
type IExecutionEnvironment = import('@design.estate/dees-catalog').IExecutionEnvironment;
type IFileEntry = import('@design.estate/dees-catalog').IFileEntry;
type IFileWatcher = import('@design.estate/dees-catalog').IFileWatcher;
type IProcessHandle = import('@design.estate/dees-catalog').IProcessHandle;
type IWorkspaceShellCommand = interfaces.requests.IWorkspaceShellCommand;
const domtools = plugins.deesElement.domtools;
interface IWorkspaceProcessState {
outputController: ReadableStreamDefaultController<string>;
resolveExit: (exitCodeArg: number) => void;
}
export class BackendExecutionEnvironment implements IExecutionEnvironment {
readonly type = 'backend' as const;
private _ready = false;
private identity: interfaces.data.IIdentity;
private processRouter = new plugins.domtools.plugins.typedrequest.TypedRouter();
private processSocket: InstanceType<typeof plugins.typedsocket.TypedSocket> | null = null;
private processSocketPromise: Promise<InstanceType<typeof plugins.typedsocket.TypedSocket>> | null = null;
private processStates = new Map<string, IWorkspaceProcessState>();
constructor(
private serviceName: string,
identity: interfaces.data.IIdentity,
) {
this.identity = identity;
this.registerProcessSocketHandlers();
}
get ready(): boolean {
return this._ready;
}
async init(): Promise<void> {
// Verify the container is accessible by checking if root exists
const result = await this.fireRequest<interfaces.requests.IReq_WorkspaceExists>(
'workspaceExists',
{ path: '/' },
);
if (!result.exists) {
throw new Error(`Cannot access container filesystem for service: ${this.serviceName}`);
}
this._ready = true;
}
async destroy(): Promise<void> {
for (const processId of Array.from(this.processStates.keys())) {
await this.killProcess(processId).catch(() => {});
}
await this.processSocket?.stop().catch(() => {});
this.processSocket = null;
this.processSocketPromise = null;
this._ready = false;
}
async readFile(path: string): Promise<string> {
const result = await this.fireRequest<interfaces.requests.IReq_WorkspaceReadFile>(
'workspaceReadFile',
{ path },
);
return result.content;
}
async writeFile(path: string, contents: string): Promise<void> {
await this.fireRequest<interfaces.requests.IReq_WorkspaceWriteFile>(
'workspaceWriteFile',
{ path, content: contents },
);
}
async readDir(path: string): Promise<IFileEntry[]> {
const result = await this.fireRequest<interfaces.requests.IReq_WorkspaceReadDir>(
'workspaceReadDir',
{ path },
);
return result.entries;
}
async mkdir(path: string): Promise<void> {
await this.fireRequest<interfaces.requests.IReq_WorkspaceMkdir>(
'workspaceMkdir',
{ path },
);
}
async rm(path: string, options?: { recursive?: boolean }): Promise<void> {
await this.fireRequest<interfaces.requests.IReq_WorkspaceRm>(
'workspaceRm',
{ path, recursive: options?.recursive },
);
}
async exists(path: string): Promise<boolean> {
const result = await this.fireRequest<interfaces.requests.IReq_WorkspaceExists>(
'workspaceExists',
{ path },
);
return result.exists;
}
watch(
_path: string,
_callback: (event: 'rename' | 'change', filename: string | null) => void,
_options?: { recursive?: boolean },
): IFileWatcher {
// Polling-based file watching — check for changes periodically
// For now, return a no-op watcher. Full implementation would poll readDir.
return { stop: () => {} };
}
async spawn(command: string, args?: string[]): Promise<IProcessHandle> {
const socket = await this.ensureProcessSocket();
const processId = crypto.randomUUID();
await socket.setTag(`workspaceProcess:${processId}`, true);
let resolveExit: (exitCodeArg: number) => void = () => {};
const exit = new Promise<number>((resolve) => {
resolveExit = resolve;
});
const output = new ReadableStream<string>({
start: (controller) => {
this.processStates.set(processId, {
outputController: controller,
resolveExit,
});
},
cancel: async () => {
await this.killProcess(processId).catch(() => {});
},
});
try {
await socket.createTypedRequest<interfaces.requests.IReq_WorkspaceStartProcess>(
'workspaceStartProcess',
).fire({
identity: this.identity,
serviceName: this.serviceName,
processId,
command,
args,
});
} catch (error) {
const processState = this.processStates.get(processId);
this.processStates.delete(processId);
await socket.removeTag(`workspaceProcess:${processId}`).catch(() => {});
try {
processState?.outputController.error(error);
} catch {
// The stream may already have been cancelled by the terminal.
}
throw error;
}
const input = new WritableStream<string>({
write: async (chunkArg) => {
await socket.createTypedRequest<interfaces.requests.IReq_WorkspaceProcessInput>(
'workspaceProcessInput',
).fire({
identity: this.identity,
processId,
input: chunkArg,
});
},
abort: async () => {
await this.killProcess(processId).catch(() => {});
},
});
return {
output,
input,
exit,
kill: () => {
void this.killProcess(processId);
},
};
}
async getShellCommand(): Promise<IWorkspaceShellCommand> {
const result = await this.fireRequest<interfaces.requests.IReq_WorkspaceGetShellCommand>(
'workspaceGetShellCommand',
{},
);
return result.shellCommand;
}
private registerProcessSocketHandlers(): void {
this.processRouter.addTypedHandler(
new plugins.domtools.plugins.typedrequest.TypedHandler<interfaces.requests.IReq_PushWorkspaceProcessOutput>(
'pushWorkspaceProcessOutput',
async (dataArg: interfaces.requests.IReq_PushWorkspaceProcessOutput['request']) => {
this.processStates.get(dataArg.processId)?.outputController.enqueue(dataArg.output);
return {};
},
),
);
this.processRouter.addTypedHandler(
new plugins.domtools.plugins.typedrequest.TypedHandler<interfaces.requests.IReq_PushWorkspaceProcessExit>(
'pushWorkspaceProcessExit',
async (dataArg: interfaces.requests.IReq_PushWorkspaceProcessExit['request']) => {
this.completeProcessState(dataArg.processId, dataArg.exitCode);
await this.processSocket?.removeTag(`workspaceProcess:${dataArg.processId}`).catch(() => {});
return {};
},
),
);
}
private async ensureProcessSocket(): Promise<InstanceType<typeof plugins.typedsocket.TypedSocket>> {
if (this.processSocket) return this.processSocket;
if (!this.processSocketPromise) {
this.processSocketPromise = plugins.typedsocket.TypedSocket.createClient(
this.processRouter,
plugins.typedsocket.TypedSocket.useWindowLocationOriginUrl(),
{ autoReconnect: true },
);
}
this.processSocket = await this.processSocketPromise;
return this.processSocket;
}
private completeProcessState(processIdArg: string, exitCodeArg: number): void {
const processState = this.processStates.get(processIdArg);
if (!processState) return;
try {
processState.outputController.close();
} catch {
// The terminal may already have cancelled the stream.
}
processState.resolveExit(exitCodeArg);
this.processStates.delete(processIdArg);
}
private async killProcess(processIdArg: string): Promise<void> {
const socket = this.processSocket;
if (!socket) return;
await socket.createTypedRequest<interfaces.requests.IReq_WorkspaceKillProcess>(
'workspaceKillProcess',
).fire({
identity: this.identity,
processId: processIdArg,
}).catch(() => {});
this.completeProcessState(processIdArg, -1);
await socket.removeTag(`workspaceProcess:${processIdArg}`).catch(() => {});
}
/**
* Helper to fire TypedRequests to the workspace API
*/
private async fireRequest<T extends { method: string; request: any; response: any }>(
method: string,
data: Omit<T['request'], 'identity' | 'serviceName'>,
): Promise<T['response']> {
const typedRequest = new domtools.plugins.typedrequest.TypedRequest<T>(
'/typedrequest',
method,
);
return await typedRequest.fire({
identity: this.identity,
serviceName: this.serviceName,
...data,
} as T['request']);
}
}