330 lines
9.3 KiB
TypeScript
330 lines
9.3 KiB
TypeScript
|
import * as plugins from '../plugins.js';
|
||
|
import { Cloudly } from '../classes.cloudly.js';
|
||
|
import { TaskExecution } from './classes.taskexecution.js';
|
||
|
import { createPredefinedTasks } from './predefinedtasks.js';
|
||
|
import { logger } from '../logger.js';
|
||
|
|
||
|
export interface ITaskInfo {
|
||
|
name: string;
|
||
|
description: string;
|
||
|
category: 'maintenance' | 'deployment' | 'backup' | 'monitoring' | 'cleanup' | 'system' | 'security';
|
||
|
schedule?: string; // Cron expression if scheduled
|
||
|
lastRun?: number;
|
||
|
enabled: boolean;
|
||
|
}
|
||
|
|
||
|
export class CloudlyTaskManager {
|
||
|
public typedrouter = new plugins.typedrequest.TypedRouter();
|
||
|
public cloudlyRef: Cloudly;
|
||
|
|
||
|
// TaskBuffer integration
|
||
|
private taskBufferManager = new plugins.taskbuffer.TaskManager();
|
||
|
private taskRegistry = new Map<string, plugins.taskbuffer.Task>();
|
||
|
private taskInfo = new Map<string, ITaskInfo>();
|
||
|
private currentExecutions = new Map<string, TaskExecution>();
|
||
|
|
||
|
// Database connection helper
|
||
|
get db() {
|
||
|
return this.cloudlyRef.mongodbConnector.smartdataDb;
|
||
|
}
|
||
|
|
||
|
// Set up TaskExecution document manager
|
||
|
public CTaskExecution = plugins.smartdata.setDefaultManagerForDoc(this, TaskExecution);
|
||
|
|
||
|
constructor(cloudlyRefArg: Cloudly) {
|
||
|
this.cloudlyRef = cloudlyRefArg;
|
||
|
|
||
|
// Add router to main router
|
||
|
this.cloudlyRef.typedrouter.addTypedRouter(this.typedrouter);
|
||
|
|
||
|
// Set up API endpoints
|
||
|
this.setupApiEndpoints();
|
||
|
|
||
|
// Register predefined tasks
|
||
|
createPredefinedTasks(this);
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Register a task with the manager
|
||
|
*/
|
||
|
public registerTask(
|
||
|
name: string,
|
||
|
task: plugins.taskbuffer.Task,
|
||
|
info: Omit<ITaskInfo, 'name' | 'lastRun'>
|
||
|
) {
|
||
|
this.taskRegistry.set(name, task);
|
||
|
this.taskInfo.set(name, {
|
||
|
name,
|
||
|
...info,
|
||
|
lastRun: undefined,
|
||
|
});
|
||
|
|
||
|
// Schedule if cron expression provided
|
||
|
if (info.schedule && info.enabled) {
|
||
|
this.scheduleTask(name, info.schedule);
|
||
|
}
|
||
|
|
||
|
logger.log('info', `Registered task: ${name}`);
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Execute a task with tracking
|
||
|
*/
|
||
|
public async executeTask(
|
||
|
taskName: string,
|
||
|
triggeredBy: 'schedule' | 'manual' | 'system',
|
||
|
userId?: string
|
||
|
): Promise<TaskExecution> {
|
||
|
const task = this.taskRegistry.get(taskName);
|
||
|
const info = this.taskInfo.get(taskName);
|
||
|
|
||
|
if (!task) {
|
||
|
throw new Error(`Task ${taskName} not found`);
|
||
|
}
|
||
|
|
||
|
if (!info?.enabled && triggeredBy === 'schedule') {
|
||
|
logger.log('warn', `Skipping disabled scheduled task: ${taskName}`);
|
||
|
return null;
|
||
|
}
|
||
|
|
||
|
// Create execution record
|
||
|
const execution = await TaskExecution.createTaskExecution(taskName, triggeredBy, userId);
|
||
|
|
||
|
if (info?.description) {
|
||
|
execution.data.taskDescription = info.description;
|
||
|
}
|
||
|
if (info?.category) {
|
||
|
execution.data.category = info.category;
|
||
|
}
|
||
|
await execution.save();
|
||
|
|
||
|
// Store current execution for task to access
|
||
|
this.currentExecutions.set(taskName, execution);
|
||
|
|
||
|
try {
|
||
|
await execution.addLog(`Starting task: ${taskName}`, 'info');
|
||
|
|
||
|
// Execute the task
|
||
|
const result = await task.trigger();
|
||
|
|
||
|
// Task completed successfully
|
||
|
await execution.complete(result);
|
||
|
await execution.addLog(`Task completed successfully`, 'success');
|
||
|
|
||
|
// Update last run time
|
||
|
if (info) {
|
||
|
info.lastRun = Date.now();
|
||
|
}
|
||
|
|
||
|
} catch (error) {
|
||
|
// Task failed
|
||
|
await execution.fail(error);
|
||
|
await execution.addLog(`Task failed: ${error.message}`, 'error');
|
||
|
logger.log('error', `Task ${taskName} failed: ${error.message}`);
|
||
|
} finally {
|
||
|
// Clean up current execution
|
||
|
this.currentExecutions.delete(taskName);
|
||
|
}
|
||
|
|
||
|
return execution;
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Get current execution for a task (used by tasks to log)
|
||
|
*/
|
||
|
public getCurrentExecution(taskName: string): TaskExecution | undefined {
|
||
|
return this.currentExecutions.get(taskName);
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Schedule a task with cron expression
|
||
|
*/
|
||
|
public scheduleTask(taskName: string, cronExpression: string) {
|
||
|
const task = this.taskRegistry.get(taskName);
|
||
|
if (!task) {
|
||
|
throw new Error(`Task ${taskName} not found`);
|
||
|
}
|
||
|
|
||
|
// Wrap task execution with tracking
|
||
|
const wrappedTask = new plugins.taskbuffer.Task({
|
||
|
name: `${taskName}-scheduled`,
|
||
|
taskFunction: async () => {
|
||
|
await this.executeTask(taskName, 'schedule');
|
||
|
},
|
||
|
});
|
||
|
|
||
|
this.taskBufferManager.addAndScheduleTask(wrappedTask, cronExpression);
|
||
|
logger.log('info', `Scheduled task ${taskName} with cron: ${cronExpression}`);
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Cancel a running task
|
||
|
*/
|
||
|
public async cancelTask(executionId: string): Promise<boolean> {
|
||
|
const execution = await TaskExecution.getTaskExecutionById(executionId);
|
||
|
if (!execution || execution.data.status !== 'running') {
|
||
|
return false;
|
||
|
}
|
||
|
|
||
|
await execution.cancel();
|
||
|
await execution.addLog('Task cancelled by user', 'warning');
|
||
|
|
||
|
// TODO: Implement actual task cancellation in taskbuffer
|
||
|
|
||
|
return true;
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Get all registered tasks
|
||
|
*/
|
||
|
public getAllTasks(): ITaskInfo[] {
|
||
|
return Array.from(this.taskInfo.values());
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Enable or disable a task
|
||
|
*/
|
||
|
public async setTaskEnabled(taskName: string, enabled: boolean) {
|
||
|
const info = this.taskInfo.get(taskName);
|
||
|
if (!info) {
|
||
|
throw new Error(`Task ${taskName} not found`);
|
||
|
}
|
||
|
|
||
|
info.enabled = enabled;
|
||
|
|
||
|
if (!enabled) {
|
||
|
// TODO: Remove from scheduler if disabled
|
||
|
logger.log('info', `Disabled task: ${taskName}`);
|
||
|
} else if (info.schedule) {
|
||
|
// Reschedule if enabled with schedule
|
||
|
this.scheduleTask(taskName, info.schedule);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Set up API endpoints
|
||
|
*/
|
||
|
private setupApiEndpoints() {
|
||
|
// Get all tasks
|
||
|
this.typedrouter.addTypedHandler(
|
||
|
new plugins.typedrequest.TypedHandler<plugins.servezoneInterfaces.requests.task.IRequest_Any_Cloudly_GetTasks>(
|
||
|
'getTasks',
|
||
|
async (reqArg) => {
|
||
|
await plugins.smartguard.passGuardsOrReject(reqArg, [
|
||
|
this.cloudlyRef.authManager.validIdentityGuard,
|
||
|
]);
|
||
|
|
||
|
const tasks = this.getAllTasks();
|
||
|
|
||
|
return {
|
||
|
tasks,
|
||
|
};
|
||
|
}
|
||
|
)
|
||
|
);
|
||
|
|
||
|
// Get task executions
|
||
|
this.typedrouter.addTypedHandler(
|
||
|
new plugins.typedrequest.TypedHandler<plugins.servezoneInterfaces.requests.task.IRequest_Any_Cloudly_GetTaskExecutions>(
|
||
|
'getTaskExecutions',
|
||
|
async (reqArg) => {
|
||
|
await plugins.smartguard.passGuardsOrReject(reqArg, [
|
||
|
this.cloudlyRef.authManager.validIdentityGuard,
|
||
|
]);
|
||
|
|
||
|
const executions = await TaskExecution.getTaskExecutions(reqArg.filter);
|
||
|
|
||
|
return {
|
||
|
executions: await Promise.all(
|
||
|
executions.map(e => e.createSavableObject())
|
||
|
),
|
||
|
};
|
||
|
}
|
||
|
)
|
||
|
);
|
||
|
|
||
|
// Get task execution by ID
|
||
|
this.typedrouter.addTypedHandler(
|
||
|
new plugins.typedrequest.TypedHandler<plugins.servezoneInterfaces.requests.task.IRequest_Any_Cloudly_GetTaskExecutionById>(
|
||
|
'getTaskExecutionById',
|
||
|
async (reqArg) => {
|
||
|
await plugins.smartguard.passGuardsOrReject(reqArg, [
|
||
|
this.cloudlyRef.authManager.validIdentityGuard,
|
||
|
]);
|
||
|
|
||
|
const execution = await TaskExecution.getTaskExecutionById(reqArg.executionId);
|
||
|
|
||
|
if (!execution) {
|
||
|
throw new Error('Task execution not found');
|
||
|
}
|
||
|
|
||
|
return {
|
||
|
execution: await execution.createSavableObject(),
|
||
|
};
|
||
|
}
|
||
|
)
|
||
|
);
|
||
|
|
||
|
// Trigger task manually
|
||
|
this.typedrouter.addTypedHandler(
|
||
|
new plugins.typedrequest.TypedHandler<plugins.servezoneInterfaces.requests.task.IRequest_Any_Cloudly_TriggerTask>(
|
||
|
'triggerTask',
|
||
|
async (reqArg) => {
|
||
|
await plugins.smartguard.passGuardsOrReject(reqArg, [
|
||
|
this.cloudlyRef.authManager.validIdentityGuard,
|
||
|
]);
|
||
|
|
||
|
const execution = await this.executeTask(
|
||
|
reqArg.taskName,
|
||
|
'manual',
|
||
|
reqArg.userId
|
||
|
);
|
||
|
|
||
|
return {
|
||
|
execution: await execution.createSavableObject(),
|
||
|
};
|
||
|
}
|
||
|
)
|
||
|
);
|
||
|
|
||
|
// Cancel task
|
||
|
this.typedrouter.addTypedHandler(
|
||
|
new plugins.typedrequest.TypedHandler<plugins.servezoneInterfaces.requests.task.IRequest_Any_Cloudly_CancelTask>(
|
||
|
'cancelTask',
|
||
|
async (reqArg) => {
|
||
|
await plugins.smartguard.passGuardsOrReject(reqArg, [
|
||
|
this.cloudlyRef.authManager.validIdentityGuard,
|
||
|
]);
|
||
|
|
||
|
const success = await this.cancelTask(reqArg.executionId);
|
||
|
|
||
|
return {
|
||
|
success,
|
||
|
};
|
||
|
}
|
||
|
)
|
||
|
);
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Initialize the task manager
|
||
|
*/
|
||
|
public async init() {
|
||
|
logger.log('info', 'Task Manager initialized');
|
||
|
|
||
|
// Clean up old executions on startup
|
||
|
const deletedCount = await TaskExecution.cleanupOldExecutions(30);
|
||
|
if (deletedCount > 0) {
|
||
|
logger.log('info', `Cleaned up ${deletedCount} old task executions`);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Stop the task manager
|
||
|
*/
|
||
|
public async stop() {
|
||
|
// Stop all scheduled tasks
|
||
|
await this.taskBufferManager.stop();
|
||
|
logger.log('info', 'Task Manager stopped');
|
||
|
}
|
||
|
}
|