277 lines
8.9 KiB
TypeScript
277 lines
8.9 KiB
TypeScript
/**
|
|
* BackendExecutionEnvironment — implements IExecutionEnvironment
|
|
* by routing all filesystem and process operations through the onebox API
|
|
* to Docker exec on the target container.
|
|
*/
|
|
|
|
import * as plugins from '../plugins.js';
|
|
import * as interfaces from '../../ts_interfaces/index.js';
|
|
|
|
// Import IExecutionEnvironment type from dees-catalog
|
|
type IExecutionEnvironment = import('@design.estate/dees-catalog').IExecutionEnvironment;
|
|
type IFileEntry = import('@design.estate/dees-catalog').IFileEntry;
|
|
type IFileWatcher = import('@design.estate/dees-catalog').IFileWatcher;
|
|
type IProcessHandle = import('@design.estate/dees-catalog').IProcessHandle;
|
|
type IWorkspaceShellCommand = interfaces.requests.IWorkspaceShellCommand;
|
|
|
|
const domtools = plugins.deesElement.domtools;
|
|
|
|
interface IWorkspaceProcessState {
|
|
outputController: ReadableStreamDefaultController<string>;
|
|
resolveExit: (exitCodeArg: number) => void;
|
|
}
|
|
|
|
export class BackendExecutionEnvironment implements IExecutionEnvironment {
|
|
readonly type = 'backend' as const;
|
|
private _ready = false;
|
|
private identity: interfaces.data.IIdentity;
|
|
private processRouter = new plugins.domtools.plugins.typedrequest.TypedRouter();
|
|
private processSocket: InstanceType<typeof plugins.typedsocket.TypedSocket> | null = null;
|
|
private processSocketPromise: Promise<InstanceType<typeof plugins.typedsocket.TypedSocket>> | null = null;
|
|
private processStates = new Map<string, IWorkspaceProcessState>();
|
|
|
|
constructor(
|
|
private serviceName: string,
|
|
identity: interfaces.data.IIdentity,
|
|
) {
|
|
this.identity = identity;
|
|
this.registerProcessSocketHandlers();
|
|
}
|
|
|
|
get ready(): boolean {
|
|
return this._ready;
|
|
}
|
|
|
|
async init(): Promise<void> {
|
|
// Verify the container is accessible by checking if root exists
|
|
const result = await this.fireRequest<interfaces.requests.IReq_WorkspaceExists>(
|
|
'workspaceExists',
|
|
{ path: '/' },
|
|
);
|
|
if (!result.exists) {
|
|
throw new Error(`Cannot access container filesystem for service: ${this.serviceName}`);
|
|
}
|
|
this._ready = true;
|
|
}
|
|
|
|
async destroy(): Promise<void> {
|
|
for (const processId of Array.from(this.processStates.keys())) {
|
|
await this.killProcess(processId).catch(() => {});
|
|
}
|
|
await this.processSocket?.stop().catch(() => {});
|
|
this.processSocket = null;
|
|
this.processSocketPromise = null;
|
|
this._ready = false;
|
|
}
|
|
|
|
async readFile(path: string): Promise<string> {
|
|
const result = await this.fireRequest<interfaces.requests.IReq_WorkspaceReadFile>(
|
|
'workspaceReadFile',
|
|
{ path },
|
|
);
|
|
return result.content;
|
|
}
|
|
|
|
async writeFile(path: string, contents: string): Promise<void> {
|
|
await this.fireRequest<interfaces.requests.IReq_WorkspaceWriteFile>(
|
|
'workspaceWriteFile',
|
|
{ path, content: contents },
|
|
);
|
|
}
|
|
|
|
async readDir(path: string): Promise<IFileEntry[]> {
|
|
const result = await this.fireRequest<interfaces.requests.IReq_WorkspaceReadDir>(
|
|
'workspaceReadDir',
|
|
{ path },
|
|
);
|
|
return result.entries;
|
|
}
|
|
|
|
async mkdir(path: string): Promise<void> {
|
|
await this.fireRequest<interfaces.requests.IReq_WorkspaceMkdir>(
|
|
'workspaceMkdir',
|
|
{ path },
|
|
);
|
|
}
|
|
|
|
async rm(path: string, options?: { recursive?: boolean }): Promise<void> {
|
|
await this.fireRequest<interfaces.requests.IReq_WorkspaceRm>(
|
|
'workspaceRm',
|
|
{ path, recursive: options?.recursive },
|
|
);
|
|
}
|
|
|
|
async exists(path: string): Promise<boolean> {
|
|
const result = await this.fireRequest<interfaces.requests.IReq_WorkspaceExists>(
|
|
'workspaceExists',
|
|
{ path },
|
|
);
|
|
return result.exists;
|
|
}
|
|
|
|
watch(
|
|
_path: string,
|
|
_callback: (event: 'rename' | 'change', filename: string | null) => void,
|
|
_options?: { recursive?: boolean },
|
|
): IFileWatcher {
|
|
// Polling-based file watching — check for changes periodically
|
|
// For now, return a no-op watcher. Full implementation would poll readDir.
|
|
return { stop: () => {} };
|
|
}
|
|
|
|
async spawn(command: string, args?: string[]): Promise<IProcessHandle> {
|
|
const socket = await this.ensureProcessSocket();
|
|
const processId = crypto.randomUUID();
|
|
await socket.setTag(`workspaceProcess:${processId}`, true);
|
|
|
|
let resolveExit: (exitCodeArg: number) => void = () => {};
|
|
const exit = new Promise<number>((resolve) => {
|
|
resolveExit = resolve;
|
|
});
|
|
const output = new ReadableStream<string>({
|
|
start: (controller) => {
|
|
this.processStates.set(processId, {
|
|
outputController: controller,
|
|
resolveExit,
|
|
});
|
|
},
|
|
cancel: async () => {
|
|
await this.killProcess(processId).catch(() => {});
|
|
},
|
|
});
|
|
|
|
try {
|
|
await socket.createTypedRequest<interfaces.requests.IReq_WorkspaceStartProcess>(
|
|
'workspaceStartProcess',
|
|
).fire({
|
|
identity: this.identity,
|
|
serviceName: this.serviceName,
|
|
processId,
|
|
command,
|
|
args,
|
|
});
|
|
} catch (error) {
|
|
const processState = this.processStates.get(processId);
|
|
this.processStates.delete(processId);
|
|
await socket.removeTag(`workspaceProcess:${processId}`).catch(() => {});
|
|
try {
|
|
processState?.outputController.error(error);
|
|
} catch {
|
|
// The stream may already have been cancelled by the terminal.
|
|
}
|
|
throw error;
|
|
}
|
|
|
|
const input = new WritableStream<string>({
|
|
write: async (chunkArg) => {
|
|
await socket.createTypedRequest<interfaces.requests.IReq_WorkspaceProcessInput>(
|
|
'workspaceProcessInput',
|
|
).fire({
|
|
identity: this.identity,
|
|
processId,
|
|
input: chunkArg,
|
|
});
|
|
},
|
|
abort: async () => {
|
|
await this.killProcess(processId).catch(() => {});
|
|
},
|
|
});
|
|
|
|
return {
|
|
output,
|
|
input,
|
|
exit,
|
|
kill: () => {
|
|
void this.killProcess(processId);
|
|
},
|
|
};
|
|
}
|
|
|
|
async getShellCommand(): Promise<IWorkspaceShellCommand> {
|
|
const result = await this.fireRequest<interfaces.requests.IReq_WorkspaceGetShellCommand>(
|
|
'workspaceGetShellCommand',
|
|
{},
|
|
);
|
|
return result.shellCommand;
|
|
}
|
|
|
|
private registerProcessSocketHandlers(): void {
|
|
this.processRouter.addTypedHandler(
|
|
new plugins.domtools.plugins.typedrequest.TypedHandler<interfaces.requests.IReq_PushWorkspaceProcessOutput>(
|
|
'pushWorkspaceProcessOutput',
|
|
async (dataArg: interfaces.requests.IReq_PushWorkspaceProcessOutput['request']) => {
|
|
this.processStates.get(dataArg.processId)?.outputController.enqueue(dataArg.output);
|
|
return {};
|
|
},
|
|
),
|
|
);
|
|
|
|
this.processRouter.addTypedHandler(
|
|
new plugins.domtools.plugins.typedrequest.TypedHandler<interfaces.requests.IReq_PushWorkspaceProcessExit>(
|
|
'pushWorkspaceProcessExit',
|
|
async (dataArg: interfaces.requests.IReq_PushWorkspaceProcessExit['request']) => {
|
|
this.completeProcessState(dataArg.processId, dataArg.exitCode);
|
|
await this.processSocket?.removeTag(`workspaceProcess:${dataArg.processId}`).catch(() => {});
|
|
return {};
|
|
},
|
|
),
|
|
);
|
|
}
|
|
|
|
private async ensureProcessSocket(): Promise<InstanceType<typeof plugins.typedsocket.TypedSocket>> {
|
|
if (this.processSocket) return this.processSocket;
|
|
if (!this.processSocketPromise) {
|
|
this.processSocketPromise = plugins.typedsocket.TypedSocket.createClient(
|
|
this.processRouter,
|
|
plugins.typedsocket.TypedSocket.useWindowLocationOriginUrl(),
|
|
{ autoReconnect: true },
|
|
);
|
|
}
|
|
this.processSocket = await this.processSocketPromise;
|
|
return this.processSocket;
|
|
}
|
|
|
|
private completeProcessState(processIdArg: string, exitCodeArg: number): void {
|
|
const processState = this.processStates.get(processIdArg);
|
|
if (!processState) return;
|
|
try {
|
|
processState.outputController.close();
|
|
} catch {
|
|
// The terminal may already have cancelled the stream.
|
|
}
|
|
processState.resolveExit(exitCodeArg);
|
|
this.processStates.delete(processIdArg);
|
|
}
|
|
|
|
private async killProcess(processIdArg: string): Promise<void> {
|
|
const socket = this.processSocket;
|
|
if (!socket) return;
|
|
await socket.createTypedRequest<interfaces.requests.IReq_WorkspaceKillProcess>(
|
|
'workspaceKillProcess',
|
|
).fire({
|
|
identity: this.identity,
|
|
processId: processIdArg,
|
|
}).catch(() => {});
|
|
this.completeProcessState(processIdArg, -1);
|
|
await socket.removeTag(`workspaceProcess:${processIdArg}`).catch(() => {});
|
|
}
|
|
|
|
/**
|
|
* Helper to fire TypedRequests to the workspace API
|
|
*/
|
|
private async fireRequest<T extends { method: string; request: any; response: any }>(
|
|
method: string,
|
|
data: Omit<T['request'], 'identity' | 'serviceName'>,
|
|
): Promise<T['response']> {
|
|
const typedRequest = new domtools.plugins.typedrequest.TypedRequest<T>(
|
|
'/typedrequest',
|
|
method,
|
|
);
|
|
return await typedRequest.fire({
|
|
identity: this.identity,
|
|
serviceName: this.serviceName,
|
|
...data,
|
|
} as T['request']);
|
|
}
|
|
}
|