@push.rocks/taskbuffer 🚀
Modern TypeScript task orchestration with smart buffering, scheduling, and real-time progress tracking
🌟 Features
- 🎯 Type-Safe Task Management - Full TypeScript support with generics and type inference
- 📊 Real-Time Progress Tracking - Step-based progress with percentage weights
- ⚡ Smart Buffering - Intelligent request debouncing and batching
- ⏰ Cron Scheduling - Schedule tasks with cron expressions
- 🔄 Task Chains & Parallel Execution - Sequential and parallel task orchestration
- 🎨 Web Component Dashboard - Real-time visualization of task execution
- 📈 Comprehensive Metadata - Track execution history, duration, and status
- 🔒 Thread-Safe Operations - Concurrency control and execution limits
- 🎭 Event-Driven Architecture - Observable task lifecycle events
📦 Installation
npm install @push.rocks/taskbuffer
# or
pnpm add @push.rocks/taskbuffer
# or
yarn add @push.rocks/taskbuffer
🚀 Quick Start
Basic Task Creation
import { Task, TaskManager } from '@push.rocks/taskbuffer';
// Create a simple task
const dataProcessor = new Task({
name: 'ProcessData',
taskFunction: async (data) => {
console.log(`Processing: ${data}`);
// Your async logic here
return `Processed: ${data}`;
}
});
// Execute the task
const result = await dataProcessor.trigger('my-data');
console.log(result); // "Processed: my-data"
Tasks with Progress Tracking 📊
const deploymentTask = new Task({
name: 'DeployApplication',
steps: [
{ name: 'build', description: 'Building application', percentage: 30 },
{ name: 'test', description: 'Running tests', percentage: 20 },
{ name: 'deploy', description: 'Deploying to server', percentage: 40 },
{ name: 'verify', description: 'Verifying deployment', percentage: 10 }
] as const, // Use 'as const' for type inference
taskFunction: async function() {
// TypeScript knows these step names!
this.notifyStep('build');
await buildApplication();
this.notifyStep('test');
await runTests();
this.notifyStep('deploy');
await deployToServer();
this.notifyStep('verify');
await verifyDeployment();
return 'Deployment successful!';
}
});
// Monitor progress
console.log(deploymentTask.getProgress()); // 0-100
console.log(deploymentTask.getStepsMetadata()); // Detailed step info
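To make the percentage weights concrete, here is a minimal sketch of how a progress value could be derived from a step list. It is not taken from the library: StepWeight and progressThrough are hypothetical names, and the sketch assumes a step contributes its full weight once it has finished.

```typescript
// Hypothetical helper, not part of @push.rocks/taskbuffer.
interface StepWeight {
  name: string;
  percentage: number;
}

// Sums the weights of all steps up to and including `lastFinished`.
// Assumption: a step counts fully toward progress once it has finished.
function progressThrough(steps: readonly StepWeight[], lastFinished: string): number {
  let total = 0;
  for (const step of steps) {
    total += step.percentage;
    if (step.name === lastFinished) {
      return total;
    }
  }
  throw new Error(`Unknown step: ${lastFinished}`);
}

const deploySteps: StepWeight[] = [
  { name: 'build', percentage: 30 },
  { name: 'test', percentage: 20 },
  { name: 'deploy', percentage: 40 },
  { name: 'verify', percentage: 10 },
];

console.log(progressThrough(deploySteps, 'test')); // 50
console.log(progressThrough(deploySteps, 'verify')); // 100
```

Keeping the weights summing to 100 means the last step brings the value to exactly 100.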
🎯 Core Concepts
Task Buffering - Intelligent Request Management
TaskBuffer's buffering system prevents overwhelming your system with rapid-fire requests:
const apiTask = new Task({
name: 'APIRequest',
taskFunction: async (endpoint) => {
return await fetch(endpoint).then(r => r.json());
},
buffered: true,
bufferMax: 5, // Maximum 5 concurrent executions
execDelay: 100 // Minimum 100ms between executions
});
// Rapid fire 100 calls - only 5 will execute concurrently
for (let i = 0; i < 100; i++) {
apiTask.trigger(`/api/data/${i}`);
}
Buffer Behavior:
- The first bufferMax calls execute immediately
- Additional calls are queued
- When the buffer is full, new calls overwrite the last queued item
- Perfect for real-time data streams where only recent data matters
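The rules above can be modelled with a small synchronous sketch. This is an illustration of the described behavior, not the library's implementation: LatestWinsBuffer, maxRunning, and maxQueued are hypothetical names, and the separate queue capacity is an assumption, since the text does not say how many calls may wait.

```typescript
// Hypothetical model of the buffer rules; not code from @push.rocks/taskbuffer.
class LatestWinsBuffer<T> {
  private running = 0;
  private queued: T[] = [];
  private startedItems: T[] = [];

  constructor(private maxRunning: number, private maxQueued: number) {}

  // Starts the item if a slot is free, otherwise queues it.
  // When the queue is full, the newest item replaces the last queued one.
  submit(item: T): void {
    if (this.running < this.maxRunning) {
      this.running++;
      this.startedItems.push(item);
    } else if (this.queued.length < this.maxQueued) {
      this.queued.push(item);
    } else {
      this.queued[this.queued.length - 1] = item;
    }
  }

  // Marks one running item as finished and starts the oldest queued item, if any.
  finishOne(): void {
    if (this.running === 0) return;
    this.running--;
    const next = this.queued.shift();
    if (next !== undefined) {
      this.running++;
      this.startedItems.push(next);
    }
  }

  get started(): T[] { return [...this.startedItems]; }
  get pending(): T[] { return [...this.queued]; }
}

const buffer = new LatestWinsBuffer<number>(2, 2);
for (let i = 1; i <= 6; i++) buffer.submit(i);

console.log(buffer.started); // [ 1, 2 ]
console.log(buffer.pending); // [ 3, 6 ]  (4 and 5 were overwritten)
```

In this model, calls 4 and 5 are dropped in favor of call 6, which matches the "only recent data matters" use case.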
Task Chains - Sequential Workflows
Build complex workflows with automatic data flow:
import { Task, Taskchain } from '@push.rocks/taskbuffer';
const fetchTask = new Task({
name: 'FetchData',
taskFunction: async () => {
const response = await fetch('/api/data');
return response.json();
}
});
const transformTask = new Task({
name: 'TransformData',
taskFunction: async (data) => {
return data.map(item => ({
...item,
transformed: true,
timestamp: Date.now()
}));
}
});
const saveTask = new Task({
name: 'SaveData',
taskFunction: async (transformedData) => {
await database.save(transformedData);
return transformedData.length;
}
});
// Create and execute chain
const dataChain = new Taskchain({
name: 'DataPipeline',
tasks: [fetchTask, transformTask, saveTask]
});
const savedCount = await dataChain.trigger();
console.log(`Saved ${savedCount} items`);
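The data flow in a chain can be pictured as each stage receiving the previous stage's return value. The following is a minimal sketch of that idea in plain TypeScript; Stage and runInSequence are hypothetical names and do not describe how Taskchain is implemented internally.

```typescript
// Hypothetical illustration of sequential data flow; not the Taskchain source.
type Stage = (input: unknown) => Promise<unknown>;

// Runs the stages one after another, feeding each result into the next stage.
async function runInSequence(stages: Stage[], initial?: unknown): Promise<unknown> {
  let value = initial;
  for (const stage of stages) {
    value = await stage(value);
  }
  return value;
}

const pipeline: Stage[] = [
  async () => [1, 2, 3],                                    // "fetch"
  async (items) => (items as number[]).map((n) => n * 2),   // "transform"
  async (items) => (items as number[]).length,              // "save", returns a count
];

runInSequence(pipeline).then((count) => console.log(count)); // 3
```

Because each stage awaits the previous one, a rejection in any stage stops the sequence and rejects the overall promise in this sketch.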
Parallel Execution - Concurrent Processing
Execute multiple tasks simultaneously:
import { TaskParallel } from '@push.rocks/taskbuffer';
const parallel = new TaskParallel({
name: 'ParallelProcessor',
tasks: [
emailTask,
smsTask,
pushNotificationTask,
webhookTask
]
});
// All tasks execute concurrently
const results = await parallel.trigger(notificationData);
// results = [emailResult, smsResult, pushResult, webhookResult]
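The result ordering shown above is the same guarantee that Promise.all gives: results follow the order of the input list, not the order of completion. A minimal sketch under that assumption (Job, runConcurrently, and delayed are hypothetical names, and the text does not state whether TaskParallel uses Promise.all internally):

```typescript
// Hypothetical illustration of concurrent fan-out; not the TaskParallel source.
type Job<I, O> = (input: I) => Promise<O>;

// Starts every job with the same input and waits for all of them.
async function runConcurrently<I, O>(jobs: Job<I, O>[], input: I): Promise<O[]> {
  return Promise.all(jobs.map((job) => job(input)));
}

// Builds a job that resolves after `ms` milliseconds.
const delayed = (label: string, ms: number): Job<string, string> =>
  (msg) => new Promise<string>((resolve) => setTimeout(() => resolve(`${label}:${msg}`), ms));

runConcurrently([delayed('email', 30), delayed('sms', 10)], 'hi')
  .then((results) => console.log(results)); // [ 'email:hi', 'sms:hi' ]
```

Note that Promise.all rejects as soon as one job fails; if partial results matter, Promise.allSettled is the usual alternative.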
TaskManager - Centralized Orchestration
Manage all your tasks from a single point:
const taskManager = new TaskManager();
// Add tasks
taskManager.addTask(dataProcessor);
taskManager.addTask(deploymentTask);
// Schedule tasks with cron
taskManager.addAndScheduleTask(backupTask, '0 2 * * *'); // Daily at 2 AM
taskManager.addAndScheduleTask(healthCheck, '*/5 * * * *'); // Every 5 minutes
// Get task metadata
const metadata = taskManager.getTaskMetadata('DeployApplication');
console.log(metadata);
// {
// name: 'DeployApplication',
// status: 'idle' | 'running' | 'completed' | 'failed',
// steps: [...],
// currentProgress: 75,
// runCount: 12,
// lastRun: Date,
// buffered: false,
// bufferMax: undefined,
// version: '1.0.0',
// timeout: 30000
// }
// Get all scheduled tasks
const scheduled = taskManager.getScheduledTasks();
scheduled.forEach(task => {
console.log(`${task.name}: Next run at ${task.nextRun}`);
});
// Execute and remove pattern
const report = await taskManager.addExecuteRemoveTask(temporaryTask, {
trackProgress: true
});
console.log(report);
// {
// taskName: 'TempTask',
// startTime: Date,
// endTime: Date,
// duration: 1523,
// steps: [...],
// stepsCompleted: ['step1', 'step2'],
// progress: 100,
// result: any,
// error?: Error
// }
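For readers wondering how a report with startTime, endTime, and duration might be assembled, here is a rough sketch. ExecutionSummary and runAndSummarize are hypothetical names; the sketch covers only the timing and result fields and assumes errors are captured in the report rather than rethrown, which the text above does not confirm.

```typescript
// Hypothetical sketch of building an execution report; not the library's code.
interface ExecutionSummary<R> {
  taskName: string;
  startTime: Date;
  endTime: Date;
  duration: number; // milliseconds
  result?: R;
  error?: Error;
}

// Runs `fn`, measures wall-clock time, and records either the result or the error.
async function runAndSummarize<R>(
  taskName: string,
  fn: () => Promise<R>,
): Promise<ExecutionSummary<R>> {
  const startTime = new Date();
  let result: R | undefined;
  let error: Error | undefined;
  try {
    result = await fn();
  } catch (err) {
    error = err instanceof Error ? err : new Error(String(err));
  }
  const endTime = new Date();
  return {
    taskName,
    startTime,
    endTime,
    duration: endTime.getTime() - startTime.getTime(),
    result,
    error,
  };
}

runAndSummarize('TempTask', async () => 42).then((summary) => {
  console.log(summary.taskName, summary.result, summary.duration >= 0);
});
```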
🎨 Web Component Dashboard
Visualize your tasks in real-time with the included web component:
<!DOCTYPE html>
<html>
<head>
<script type="module">
import { TaskManager } from '@push.rocks/taskbuffer';
import '@push.rocks/taskbuffer/dist_ts_web/taskbuffer-dashboard.js';
const taskManager = new TaskManager();
// Attach to dashboard
const dashboard = document.querySelector('taskbuffer-dashboard');
dashboard.taskManager = taskManager;
dashboard.refreshInterval = 500; // Update every 500ms
</script>
</head>
<body>
<taskbuffer-dashboard></taskbuffer-dashboard>
</body>
</html>
The dashboard provides:
- 📊 Real-time progress bars with step indicators
- 📈 Task execution history
- ⏰ Scheduled task information
- 🎯 Interactive task controls
- 🌓 Light/dark theme support
🧩 Advanced Patterns
Dynamic Task Routing
Route tasks based on conditions:
const routerTask = new Task({
name: 'Router',
taskFunction: async (request) => {
if (request.priority === 'high') {
return await expressProcessor.trigger(request);
} else if (request.size > 1000000) {
return await bulkProcessor.trigger(request);
} else {
return await standardProcessor.trigger(request);
}
}
});
Task Pools
Create reusable task pools for load distribution:
class TaskPool {
private tasks: Task[] = [];
private currentIndex = 0;
constructor(poolSize: number, taskConfig: any) {
for (let i = 0; i < poolSize; i++) {
this.tasks.push(new Task({
...taskConfig,
name: `${taskConfig.name}_${i}`
}));
}
}
async execute(data: any) {
const task = this.tasks[this.currentIndex];
this.currentIndex = (this.currentIndex + 1) % this.tasks.length;
return await task.trigger(data);
}
}
const processorPool = new TaskPool(5, {
name: 'DataProcessor',
taskFunction: async (data) => processData(data)
});
Error Recovery & Retry
Implement robust error handling:
const resilientTask = new Task({
name: 'ResilientTask',
taskFunction: async (data) => {
// trigger() accepts a single input, so the retry counter lives inside the function
const maxRetries = 3;
for (let attempt = 0; ; attempt++) {
try {
return await riskyOperation(data);
} catch (error) {
if (attempt >= maxRetries) throw error;
console.log(`Retry ${attempt + 1}/${maxRetries}`);
// Exponential backoff: 1s, 2s, 4s
await new Promise(r => setTimeout(r, 1000 * Math.pow(2, attempt)));
}
}
}
});
Task Composition
Compose complex behaviors from simple tasks:
const compositeTask = new Task({
name: 'CompositeOperation',
steps: [
{ name: 'validate', description: 'Validating input', percentage: 20 },
{ name: 'process', description: 'Processing data', percentage: 60 },
{ name: 'notify', description: 'Sending notifications', percentage: 20 }
] as const,
taskFunction: async function(data) {
this.notifyStep('validate');
const validated = await validationTask.trigger(data);
this.notifyStep('process');
const processed = await processingTask.trigger(validated);
this.notifyStep('notify');
await notificationTask.trigger(processed);
return processed;
}
});
🔧 Configuration
Task Options
interface TaskOptions<T = undefined, TSteps = []> {
name?: string; // Task identifier
taskFunction: Function; // Async function to execute
buffered?: boolean; // Enable buffering
bufferMax?: number; // Max concurrent executions
execDelay?: number; // Min delay between executions
timeout?: number; // Task timeout in ms
version?: string; // Task version
steps?: TSteps; // Progress steps configuration
taskSetup?: Function; // One-time setup function
beforeTask?: Function; // Runs before each execution
afterTask?: Function; // Runs after each execution
}
TaskManager Options
const taskManager = new TaskManager({
maxConcurrentTasks: 10, // Global concurrency limit
defaultTimeout: 30000, // Default task timeout
logLevel: 'info' // Logging verbosity
});
📊 Monitoring & Observability
Task Events
task.on('started', () => console.log('Task started'));
task.on('completed', (result) => console.log('Task completed:', result));
task.on('failed', (error) => console.error('Task failed:', error));
task.on('stepChange', (step) => console.log('Step:', step.name));
Execution Metrics
const metrics = task.getMetrics();
console.log({
totalRuns: metrics.runCount,
averageDuration: metrics.avgDuration,
successRate: metrics.successRate,
lastError: metrics.lastError
});
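As an illustration of what these fields mean, the sketch below derives them from a list of recorded runs. RunRecord, RunStats, and summarizeRuns are hypothetical names, and expressing successRate as a fraction between 0 and 1 is an assumption; the library may report it differently.

```typescript
// Hypothetical derivation of run metrics; not the library's implementation.
interface RunRecord {
  durationMs: number;
  error?: Error;
}

interface RunStats {
  runCount: number;
  avgDuration: number;
  successRate: number; // assumed to be a fraction between 0 and 1
  lastError?: Error;
}

// Aggregates recorded runs into count, mean duration, success fraction, and latest error.
function summarizeRuns(runs: RunRecord[]): RunStats {
  if (runs.length === 0) {
    return { runCount: 0, avgDuration: 0, successRate: 0 };
  }
  const totalMs = runs.reduce((sum, run) => sum + run.durationMs, 0);
  const failures = runs.filter((run) => run.error !== undefined);
  return {
    runCount: runs.length,
    avgDuration: totalMs / runs.length,
    successRate: (runs.length - failures.length) / runs.length,
    lastError: failures[failures.length - 1]?.error,
  };
}

const stats = summarizeRuns([
  { durationMs: 100 },
  { durationMs: 300, error: new Error('timeout') },
  { durationMs: 200 },
  { durationMs: 200 },
]);

console.log(stats.avgDuration, stats.successRate); // 200 0.75
```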
🧪 Testing
import { expect, tap } from '@git.zone/tstest';
import { Task } from '@push.rocks/taskbuffer';
tap.test('Task should execute with progress tracking', async () => {
const task = new Task({
name: 'TestTask',
steps: [
{ name: 'step1', description: 'Step 1', percentage: 50 },
{ name: 'step2', description: 'Step 2', percentage: 50 }
] as const,
taskFunction: async function() {
this.notifyStep('step1');
await new Promise(r => setTimeout(r, 100));
this.notifyStep('step2');
return 'done';
}
});
const result = await task.trigger();
expect(result).toEqual('done');
expect(task.getProgress()).toEqual(100);
});
🌐 Real-World Examples
API Rate Limiter
const apiLimiter = new Task({
name: 'APIRateLimiter',
buffered: true,
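bufferMax: 10, // Maximum 10 concurrent executions
execDelay: 100, // At least 100ms between executions (about 10 per second)
taskFunction: async ({ endpoint, data }) => {
// trigger() takes a single input, so pass endpoint and payload as one object
return await fetch(endpoint, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(data)
});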
}
});
Database Migration Pipeline
const migrationChain = new Taskchain({
name: 'DatabaseMigration',
tasks: [
backupDatabaseTask,
validateSchemaTask,
runMigrationsTask,
verifyIntegrityTask,
updateIndexesTask
]
});
// Execute with rollback on failure
try {
await migrationChain.trigger();
console.log('Migration successful!');
} catch (error) {
await rollbackTask.trigger();
throw error;
}
Distributed Job Queue
const jobQueue = new TaskManager();
// Worker tasks
const imageProcessor = new Task({
name: 'ImageProcessor',
buffered: true,
bufferMax: 5,
steps: [
{ name: 'download', description: 'Downloading', percentage: 20 },
{ name: 'resize', description: 'Resizing', percentage: 40 },
{ name: 'optimize', description: 'Optimizing', percentage: 30 },
{ name: 'upload', description: 'Uploading', percentage: 10 }
] as const,
taskFunction: async function(job) {
this.notifyStep('download');
const image = await downloadImage(job.url);
this.notifyStep('resize');
const resized = await resizeImage(image, job.dimensions);
this.notifyStep('optimize');
const optimized = await optimizeImage(resized);
this.notifyStep('upload');
return await uploadToCDN(optimized);
}
});
jobQueue.addTask(imageProcessor);
// Process incoming jobs
messageQueue.on('job', async (job) => {
const result = await jobQueue.getTaskByName('ImageProcessor').trigger(job);
await messageQueue.ack(job.id, result);
});
🚀 Performance Tips
- Use Buffering Wisely - Enable buffering for I/O-bound tasks
- Set Appropriate Delays - Use execDelay to stay within API rate limits
- Leverage Task Pools - Distribute load across multiple task instances
- Monitor Progress - Use step tracking for long-running operations
- Clean Up - Use addExecuteRemoveTask for one-time operations
🔍 Debugging
Enable detailed logging:
import { logger } from '@push.rocks/smartlog';
logger.enableConsole();
logger.level = 'debug';
// Tasks will now output detailed execution logs
📚 API Reference
Core Classes
- Task<T, TSteps> - Basic task unit with optional step tracking
- TaskManager - Central orchestrator for task management
- Taskchain - Sequential task executor
- TaskParallel - Concurrent task executor
- TaskOnce - Single-execution task
- TaskLoop - Repeating task with conditions
Key Methods
Task Methods
- trigger(input?: T): Promise<any> - Execute the task
- notifyStep(stepName: StepNames<TSteps>): void - Update current step
- getProgress(): number - Get progress percentage (0-100)
- getStepsMetadata(): ITaskStep[] - Get detailed step information
- getMetadata(): ITaskMetadata - Get complete task metadata
TaskManager Methods
- addTask(task: Task): void - Register a task
- getTaskByName(name: string): Task | undefined - Retrieve task by name
- addAndScheduleTask(task: Task, cronExpression: string): void - Schedule task
- descheduleTaskByName(name: string): void - Remove scheduling
- getTaskMetadata(name: string): ITaskMetadata | null - Get task metadata
- getAllTasksMetadata(): ITaskMetadata[] - Get all tasks metadata
- getScheduledTasks(): IScheduledTaskInfo[] - List scheduled tasks
- addExecuteRemoveTask(task, options?): Promise<ITaskExecutionReport> - Execute once
License and Legal Information
This repository contains open-source code that is licensed under the MIT License. A copy of the MIT License can be found in the license file within this repository.
Please note: The MIT License does not grant permission to use the trade names, trademarks, service marks, or product names of the project, except as required for reasonable and customary use in describing the origin of the work and reproducing the content of the NOTICE file.
Trademarks
This project is owned and maintained by Task Venture Capital GmbH. The names and logos associated with Task Venture Capital GmbH and any related products or services are trademarks of Task Venture Capital GmbH and are not included within the scope of the MIT license granted herein. Use of these trademarks must comply with Task Venture Capital GmbH's Trademark Guidelines, and any usage must be approved in writing by Task Venture Capital GmbH.
Company Information
Task Venture Capital GmbH
Registered at District court Bremen HRB 35230 HB, Germany
For any legal inquiries or if you require further information, please contact us via email at hello@task.vc.
By using this repository, you acknowledge that you have read this section, agree to comply with its terms, and understand that the licensing of the code does not imply endorsement by Task Venture Capital GmbH of any derivative works.