f40ef6b7c0
Align Cloudly with the current typedserver, smartconfig, smartstate, and Docker tooling releases so builds and Docker output stay compatible with the upgraded stack.
354 lines
10 KiB
TypeScript
354 lines
10 KiB
TypeScript
import * as plugins from '../plugins.js';
|
|
import { Cloudly } from '../classes.cloudly.js';
|
|
import { TaskExecution } from './classes.taskexecution.js';
|
|
import { createPredefinedTasks } from './predefinedtasks.js';
|
|
import { logger } from '../logger.js';
|
|
|
|
/**
 * Descriptive metadata kept for every task registered with the task manager.
 */
export interface ITaskInfo {
  /** Name the task is registered and looked up under. */
  name: string;
  /** Human-readable description of the task. */
  description: string;
  /** Category label for the task. */
  category: 'maintenance' | 'deployment' | 'backup' | 'monitoring' | 'cleanup' | 'system' | 'security';
  /** Cron expression if scheduled. */
  schedule?: string;
  /** Timestamp of the most recent run; unset until the task has run. */
  lastRun?: number;
  /** Whether the task is currently enabled. */
  enabled: boolean;
}
|
|
|
|
/**
 * Keeps the registry of Cloudly tasks, runs them with a persisted
 * `TaskExecution` record per run, schedules them via taskbuffer, and exposes
 * typed request handlers for listing, triggering and cancelling tasks.
 */
export class CloudlyTaskManager {
  /** Router holding this manager's handlers; attached to the main Cloudly router in the constructor. */
  public typedrouter = new plugins.typedrequest.TypedRouter();
  /** Owning Cloudly instance. */
  public cloudlyRef: Cloudly;

  // TaskBuffer integration
  private taskBufferManager = new plugins.taskbuffer.TaskManager();
  /** Registered task objects by task name. */
  private taskRegistry = new Map<string, plugins.taskbuffer.Task>();
  /** Task metadata by task name. */
  private taskInfo = new Map<string, ITaskInfo>();
  /** Most recently started in-flight execution per task name. */
  private currentExecutions = new Map<string, TaskExecution>();
  /** Ids of in-flight executions for which cancellation has been requested. */
  private cancellationRequests = new Set<string>();
  /** Task names that already have a cron wrapper registered with taskbuffer. */
  private scheduledTaskNames = new Set<string>();
  /** Ids of executions currently running inside this manager instance. */
  private runningExecutionIds = new Set<string>();

  /**
   * Database connection helper: the smartdata database of the Cloudly
   * MongoDB connector.
   */
  get db() {
    return this.cloudlyRef.mongodbConnector.smartdataDb;
  }

  // Set up TaskExecution document manager
  public CTaskExecution = plugins.smartdata.setDefaultManagerForDoc(this, TaskExecution);

  /**
   * Wires the manager into Cloudly: attaches its router to the main router,
   * registers the API handlers and then registers the predefined tasks.
   *
   * @param cloudlyRefArg the owning Cloudly instance
   */
  constructor(cloudlyRefArg: Cloudly) {
    this.cloudlyRef = cloudlyRefArg;

    // Add router to main router
    this.cloudlyRef.typedrouter.addTypedRouter(this.typedrouter);

    // Set up API endpoints
    this.setupApiEndpoints();

    // Register predefined tasks
    createPredefinedTasks(this);
  }

  /**
   * Register a task with the manager.
   *
   * Stores the task and its metadata under `name` (replacing any previous
   * entry) and schedules it when a cron expression is given and the task is
   * enabled.
   *
   * @param name name to register the task under
   * @param task the taskbuffer task to run
   * @param info task metadata; `name` and `lastRun` are filled in here
   */
  public registerTask(
    name: string,
    task: plugins.taskbuffer.Task,
    info: Omit<ITaskInfo, 'name' | 'lastRun'>
  ) {
    this.taskRegistry.set(name, task);
    this.taskInfo.set(name, {
      name,
      ...info,
      lastRun: undefined,
    });

    // Schedule if cron expression provided
    if (info.schedule && info.enabled) {
      this.scheduleTask(name, info.schedule);
    }

    logger.log('info', `Registered task: ${name}`);
  }

  /**
   * Execute a task with tracking.
   *
   * Creates and saves a `TaskExecution` record, triggers the task and records
   * the outcome (completed, failed or cancelled) on that record. Errors thrown
   * by the task itself are recorded and logged, not rethrown.
   *
   * @param taskName name of a registered task
   * @param triggeredBy what initiated this run
   * @param userId optional id of the user who triggered the run
   * @returns the execution record, or `null` when a scheduled run was skipped
   *          because the task is disabled
   * @throws Error when no task is registered under `taskName`
   */
  public async executeTask(
    taskName: string,
    triggeredBy: 'schedule' | 'manual' | 'system',
    userId?: string
  ): Promise<TaskExecution | null> {
    const task = this.taskRegistry.get(taskName);
    const info = this.taskInfo.get(taskName);

    if (!task) {
      throw new Error(`Task ${taskName} not found`);
    }

    if (!info?.enabled && triggeredBy === 'schedule') {
      logger.log('warn', `Skipping disabled scheduled task: ${taskName}`);
      return null;
    }

    // Create execution record
    const execution = await TaskExecution.createTaskExecution(taskName, triggeredBy, userId);

    if (info?.description) {
      execution.data.taskDescription = info.description;
    }
    if (info?.category) {
      execution.data.category = info.category;
    }
    await execution.save();

    // Store current execution for task to access
    this.currentExecutions.set(taskName, execution);
    this.runningExecutionIds.add(execution.id);

    // Cancelled either on the record itself or through a pending cancelTask() request.
    const wasCancelled = (): boolean =>
      execution.data.status === 'cancelled' || this.cancellationRequests.has(execution.id);

    try {
      await execution.addLog(`Starting task: ${taskName}`, 'info');

      // Execute the task
      const result = await task.trigger();

      if (wasCancelled()) {
        // If a cancellation was requested during execution, don't mark as completed
        await execution.addLog('Task cancelled during execution', 'warning');
      } else {
        // Task completed successfully
        await execution.complete(result);
        await execution.addLog(`Task completed successfully`, 'success');
      }

      // Update last run time
      if (info) {
        info.lastRun = Date.now();
      }
    } catch (error) {
      if (wasCancelled()) {
        // If already cancelled, don't mark as failed
        await execution.addLog('Task was cancelled', 'warning');
      } else {
        // Task failed; non-Error throwables are stringified so the log never reads "undefined"
        const message = error instanceof Error ? error.message : String(error);
        await execution.fail(error as any);
        await execution.addLog(`Task failed: ${message}`, 'error');
        logger.log('error', `Task ${taskName} failed: ${message}`);
      }
    } finally {
      // Clean up current execution, but only if the slot still belongs to this
      // run; an overlapping run of the same task may have replaced it.
      if (this.currentExecutions.get(taskName) === execution) {
        this.currentExecutions.delete(taskName);
      }
      this.runningExecutionIds.delete(execution.id);
      this.cancellationRequests.delete(execution.id);
    }

    return execution;
  }

  /**
   * Get current execution for a task (used by tasks to log).
   *
   * @param taskName name of the task
   * @returns the in-flight execution stored for that task name, if any
   */
  public getCurrentExecution(taskName: string): TaskExecution | undefined {
    return this.currentExecutions.get(taskName);
  }

  /**
   * Schedule a task with cron expression.
   *
   * Registers a wrapper task named `<taskName>-scheduled` that runs the task
   * through `executeTask` with trigger `'schedule'`. A task name is scheduled
   * at most once; further calls for an already scheduled name are skipped so
   * the task cannot fire several times per cron tick.
   *
   * @param taskName name of a registered task
   * @param cronExpression cron expression handed to taskbuffer
   * @throws Error when no task is registered under `taskName`
   */
  public scheduleTask(taskName: string, cronExpression: string) {
    if (!this.taskRegistry.has(taskName)) {
      throw new Error(`Task ${taskName} not found`);
    }

    if (this.scheduledTaskNames.has(taskName)) {
      logger.log('warn', `Task ${taskName} is already scheduled; skipping duplicate schedule`);
      return;
    }

    // Wrap task execution with tracking
    const wrappedTask = new plugins.taskbuffer.Task({
      name: `${taskName}-scheduled`,
      taskFunction: async () => {
        await this.executeTask(taskName, 'schedule');
      },
    });

    this.taskBufferManager.addAndScheduleTask(wrappedTask, cronExpression);
    this.scheduledTaskNames.add(taskName);
    logger.log('info', `Scheduled task ${taskName} with cron: ${cronExpression}`);
  }

  /**
   * Cancel a running task.
   *
   * Marks the stored execution as cancelled. When the execution is running in
   * this manager, the cancellation request is flagged before the record is
   * persisted, so a task that finishes in the meantime is not marked as
   * completed afterwards.
   *
   * @param executionId id of the execution to cancel
   * @returns `false` when the execution does not exist or is not in status
   *          `'running'`, otherwise `true`
   */
  public async cancelTask(executionId: string): Promise<boolean> {
    const execution = await TaskExecution.getTaskExecutionById(executionId);
    if (!execution || execution.data.status !== 'running') {
      return false;
    }

    // mark cancellation request so running task functions can react cooperatively;
    // only for executions running here, since executeTask() is what clears the flag
    if (this.runningExecutionIds.has(execution.id)) {
      this.cancellationRequests.add(execution.id);
    }

    try {
      await execution.cancel();
    } catch (error) {
      // The record was not cancelled, so the in-memory request must not linger.
      this.cancellationRequests.delete(execution.id);
      throw error;
    }
    await execution.addLog('Task cancelled by user', 'warning');

    return true;
  }

  /**
   * Check if cancellation is requested for an execution.
   *
   * @param executionId id of the execution to check
   * @returns `true` while a cancellation request is pending for that id
   */
  public isCancellationRequested(executionId: string): boolean {
    return this.cancellationRequests.has(executionId);
  }

  /**
   * Get all registered tasks.
   *
   * @returns the metadata of every registered task
   */
  public getAllTasks(): ITaskInfo[] {
    return Array.from(this.taskInfo.values());
  }

  /**
   * Enable or disable a task.
   *
   * Disabling leaves an existing cron registration in place; `executeTask`
   * skips scheduled runs of disabled tasks. Enabling schedules the task only
   * if it has a cron expression and is not scheduled yet.
   *
   * @param taskName name of a registered task
   * @param enabled the new enabled state
   * @throws Error when no task is registered under `taskName`
   */
  public async setTaskEnabled(taskName: string, enabled: boolean) {
    const info = this.taskInfo.get(taskName);
    if (!info) {
      throw new Error(`Task ${taskName} not found`);
    }

    info.enabled = enabled;

    if (!enabled) {
      // TODO: Remove from scheduler if disabled
      logger.log('info', `Disabled task: ${taskName}`);
    } else if (info.schedule && !this.scheduledTaskNames.has(taskName)) {
      // Schedule on enable unless a cron registration already exists
      this.scheduleTask(taskName, info.schedule);
    }
  }

  /**
   * Set up API endpoints.
   *
   * Every handler first passes the request through the auth manager's
   * `validIdentityGuard`.
   */
  private setupApiEndpoints() {
    // Get all tasks
    this.typedrouter.addTypedHandler(
      new plugins.typedrequest.TypedHandler<plugins.servezoneInterfaces.requests.task.IRequest_Any_Cloudly_GetTasks>(
        'getTasks',
        async (reqArg) => {
          await plugins.smartguard.passGuardsOrReject(reqArg, [
            this.cloudlyRef.authManager.validIdentityGuard,
          ]);

          const tasks = this.getAllTasks();

          return {
            tasks,
          };
        }
      )
    );

    // Get task executions
    this.typedrouter.addTypedHandler(
      new plugins.typedrequest.TypedHandler<plugins.servezoneInterfaces.requests.task.IRequest_Any_Cloudly_GetTaskExecutions>(
        'getTaskExecutions',
        async (reqArg) => {
          await plugins.smartguard.passGuardsOrReject(reqArg, [
            this.cloudlyRef.authManager.validIdentityGuard,
          ]);

          const executions = await TaskExecution.getTaskExecutions(reqArg.filter);

          return {
            executions: await Promise.all(
              executions.map(e => e.createSavableObject())
            ),
          };
        }
      )
    );

    // Get task execution by ID
    this.typedrouter.addTypedHandler(
      new plugins.typedrequest.TypedHandler<plugins.servezoneInterfaces.requests.task.IRequest_Any_Cloudly_GetTaskExecutionById>(
        'getTaskExecutionById',
        async (reqArg) => {
          await plugins.smartguard.passGuardsOrReject(reqArg, [
            this.cloudlyRef.authManager.validIdentityGuard,
          ]);

          const execution = await TaskExecution.getTaskExecutionById(reqArg.executionId);

          if (!execution) {
            throw new Error('Task execution not found');
          }

          return {
            execution: await execution.createSavableObject(),
          };
        }
      )
    );

    // Trigger task manually
    this.typedrouter.addTypedHandler(
      new plugins.typedrequest.TypedHandler<plugins.servezoneInterfaces.requests.task.IRequest_Any_Cloudly_TriggerTask>(
        'triggerTask',
        async (reqArg) => {
          await plugins.smartguard.passGuardsOrReject(reqArg, [
            this.cloudlyRef.authManager.validIdentityGuard,
          ]);

          // NOTE(review): userId comes from the request payload rather than from
          // the validated identity; confirm that this is intended.
          const execution = await this.executeTask(
            reqArg.taskName,
            'manual',
            reqArg.userId
          );
          if (!execution) {
            throw new Error(`Task ${reqArg.taskName} did not start`);
          }

          return {
            execution: await execution.createSavableObject(),
          };
        }
      )
    );

    // Cancel task
    this.typedrouter.addTypedHandler(
      new plugins.typedrequest.TypedHandler<plugins.servezoneInterfaces.requests.task.IRequest_Any_Cloudly_CancelTask>(
        'cancelTask',
        async (reqArg) => {
          await plugins.smartguard.passGuardsOrReject(reqArg, [
            this.cloudlyRef.authManager.validIdentityGuard,
          ]);

          const success = await this.cancelTask(reqArg.executionId);

          return {
            success,
          };
        }
      )
    );
  }

  /**
   * Initialize the task manager: cleans up old execution records and starts
   * the taskbuffer manager.
   */
  public async init() {
    logger.log('info', 'Task Manager initialized');

    // Clean up old executions on startup
    // NOTE(review): 30 is presumably a retention period in days; confirm
    // against TaskExecution.cleanupOldExecutions.
    const deletedCount = await TaskExecution.cleanupOldExecutions(30);
    if (deletedCount > 0) {
      logger.log('info', `Cleaned up ${deletedCount} old task executions`);
    }
    await this.taskBufferManager.start();
  }

  /**
   * Stop the task manager.
   */
  public async stop() {
    // Stop all scheduled tasks
    await this.taskBufferManager.stop();
    logger.log('info', 'Task Manager stopped');
  }
}
|