@push.rocks/taskbuffer 🚀

A powerful, flexible, and TypeScript-first task management library for orchestrating asynchronous operations with style. From simple task execution to complex distributed workflows, taskbuffer has got you covered.

Install 📦

npm install @push.rocks/taskbuffer --save

Or with pnpm (recommended):

pnpm add @push.rocks/taskbuffer

Why taskbuffer? 🤔

In the modern JavaScript ecosystem, managing asynchronous tasks efficiently is crucial. Whether you're building a data pipeline, managing API rate limits, or orchestrating complex workflows, @push.rocks/taskbuffer provides the tools you need:

  • 🎯 TypeScript-first: Built with TypeScript for TypeScript - enjoy complete type safety and excellent IDE support
  • Flexible execution: From simple tasks to complex parallel workflows with dependencies
  • 🔄 Smart buffering: Control concurrent executions with intelligent buffer management
  • Built-in scheduling: Cron-based task scheduling without additional dependencies
  • 🎭 Multiple paradigms: Support for debounced, throttled, and one-time execution patterns
  • 🔌 Extensible: Clean architecture that's easy to extend and customize
  • 🏃 Zero dependencies on external schedulers: Everything you need is included

Core Concepts 🎓

Task

The fundamental unit of work. A task wraps an asynchronous function and provides powerful execution control.

Taskchain

Sequential task execution - tasks run one after another, with results passed along the chain.

Taskparallel

Parallel task execution - multiple tasks run simultaneously for maximum performance.

TaskManager

Centralized task scheduling and management using cron expressions.

TaskDebounced

Debounced task execution - prevents rapid repeated executions, only running after a quiet period.

TaskOnce

Singleton task execution - ensures a task runs exactly once, perfect for initialization routines.

Quick Start 🏁

Basic Task Execution

import { Task } from '@push.rocks/taskbuffer';

// Create a simple task
const myTask = new Task({
  name: 'DataProcessor',
  taskFunction: async () => {
    const data = await fetchData();
    return processData(data);
  },
});

// Execute the task
const result = await myTask.trigger();

TypeScript Generics Support 🔬

TaskBuffer leverages TypeScript's powerful generics system for complete type safety across your task chains and workflows.

Generic Task Functions

Tasks support generic type parameters for both input and output types:

import { Task, ITaskFunction } from '@push.rocks/taskbuffer';

// Define typed interfaces
interface UserData {
  id: string;
  name: string;
  email: string;
}

interface ProcessedUser {
  userId: string;
  displayName: string;
  normalized: boolean;
}

// Create strongly typed tasks
const processUserTask = new Task<UserData, ProcessedUser>({
  name: 'ProcessUser',
  taskFunction: async (user: UserData): Promise<ProcessedUser> => {
    return {
      userId: user.id,
      displayName: user.name.toUpperCase(),
      normalized: true
    };
  }
});

// Type safety enforced at compile time
const result: ProcessedUser = await processUserTask.trigger({
  id: '123',
  name: 'John Doe',
  email: 'john@example.com'
});

Generic Setup Values

Tasks can accept setup values through generics, perfect for configuration:

interface TaskConfig {
  apiEndpoint: string;
  retryCount: number;
  timeout: number;
}

const configuredTask = new Task<TaskConfig>({
  name: 'ConfiguredTask',
  taskSetup: async () => ({
    apiEndpoint: 'https://api.example.com',
    retryCount: 3,
    timeout: 5000
  }),
  taskFunction: async (data: any, setupValue: TaskConfig) => {
    // setupValue is fully typed!
    for (let i = 0; i < setupValue.retryCount; i++) {
      try {
        return await fetchWithTimeout(
          setupValue.apiEndpoint,
          setupValue.timeout
        );
      } catch (error) {
        if (i === setupValue.retryCount - 1) throw error;
      }
    }
  }
});

Type-Safe Task Chains

Chain tasks with preserved type flow:

// Each task knows its input and output types
const fetchTask = new Task<void, UserData[]>({
  name: 'FetchUsers',
  taskFunction: async (): Promise<UserData[]> => {
    return await api.getUsers();
  }
});

const filterTask = new Task<UserData[], UserData[]>({
  name: 'FilterActive',
  taskFunction: async (users: UserData[]): Promise<UserData[]> => {
    // Assumes UserData also declares `isActive: boolean` (not in the interface shown earlier)
    return users.filter(user => user.isActive);
  }
});

const mapTask = new Task<UserData[], ProcessedUser[]>({
  name: 'MapToProcessed',
  taskFunction: async (users: UserData[]): Promise<ProcessedUser[]> => {
    return users.map(transformUser);
  }
});

// Type safety flows through the chain
const chain = new Taskchain({
  name: 'UserPipeline',
  taskArray: [fetchTask, filterTask, mapTask]
});

const finalResult: ProcessedUser[] = await chain.trigger();
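
To illustrate how input and output types can flow from one step to the next, here is a minimal library-free sketch. `Step`, `pipe3`, `RawUser`, and the sample steps are hypothetical names used only for illustration; they are not part of the taskbuffer API, and Taskchain's own typing may differ.

```typescript
// Hypothetical helper type, not part of @push.rocks/taskbuffer.
type Step<TIn, TOut> = (input: TIn) => Promise<TOut>;

// Compose three steps; the compiler checks that each output matches the next input.
function pipe3<A, B, C, D>(
  first: Step<A, B>,
  second: Step<B, C>,
  third: Step<C, D>,
): Step<A, D> {
  return async (input: A) => third(await second(await first(input)));
}

interface RawUser {
  id: string;
  name: string;
  active: boolean;
}

const fetchUsers: Step<void, RawUser[]> = async () => [
  { id: '1', name: 'Ada', active: true },
  { id: '2', name: 'Bob', active: false },
];
const keepActive: Step<RawUser[], RawUser[]> = async (users) =>
  users.filter((user) => user.active);
const toDisplayNames: Step<RawUser[], string[]> = async (users) =>
  users.map((user) => user.name.toUpperCase());

// Inferred type: Step<void, string[]>
const userPipeline = pipe3(fetchUsers, keepActive, toDisplayNames);
```

Swapping `keepActive` and `toDisplayNames` in the `pipe3` call would fail to compile, because `string[]` is not assignable to `RawUser[]`.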

Buffer Behavior Deep Dive 🌊

The buffer system in TaskBuffer provides intelligent control over concurrent executions, preventing system overload while maximizing throughput.

How Buffering Works

When a task is buffered, TaskBuffer manages a queue of executions:

const bufferedTask = new Task({
  name: 'BufferedOperation',
  taskFunction: async (data) => {
    console.log(`Processing: ${data}`);
    await simulateWork();
    return `Processed: ${data}`;
  },
  buffered: true,
  bufferMax: 3  // Maximum 3 concurrent executions
});

// Trigger 10 executions rapidly
for (let i = 0; i < 10; i++) {
  bufferedTask.trigger(`Item ${i}`);
}

// What happens:
// 1. The first 3 calls (Item 0 to Item 2) start immediately
// 2. The remaining 7 calls (Item 3 to Item 9) are queued
// 3. As each task completes, next queued item starts
// 4. Never more than 3 tasks running simultaneously
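
The "never more than N running" idea can be sketched without the library. The helper below is a hypothetical illustration (`runWithLimit` is not a taskbuffer function), and the library's internal buffer implementation may work differently.

```typescript
// Hypothetical helper, not part of @push.rocks/taskbuffer.
// Runs the given jobs with at most `limit` of them in flight at any time.
async function runWithLimit<T>(
  jobs: Array<() => Promise<T>>,
  limit: number,
): Promise<T[]> {
  const results: T[] = new Array(jobs.length);
  let nextIndex = 0;

  // Each worker repeatedly claims the next unstarted job until none are left.
  const worker = async (): Promise<void> => {
    while (nextIndex < jobs.length) {
      const index = nextIndex++;
      results[index] = await jobs[index]();
    }
  };

  const workerCount = Math.min(limit, jobs.length);
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}
```

Results are stored by job index, so the returned array follows the input order even though jobs finish at different times.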

Buffer Truncation Behavior

When the buffer limit is reached, new calls overwrite the last buffer slot instead of growing the queue:

const truncatingTask = new Task({
  name: 'TruncatingBuffer',
  taskFunction: async (data) => {
    await processData(data);
  },
  buffered: true,
  bufferMax: 5  // Maximum 5 in buffer
});

// Rapid fire 100 calls
for (let i = 0; i < 100; i++) {
  truncatingTask.trigger(`Data ${i}`);
}

// Buffer behavior:
// - First 5 calls: Added to buffer and start processing
// - Calls 6-100: Each overwrites the 5th buffer slot
// - Result: Only processes items 0,1,2,3, and 99 (last one)
// - This prevents memory overflow in high-frequency scenarios
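
The overwrite behavior described in the comments above can be modeled in a few lines. This is a simplified sketch that assumes all 100 calls arrive before any item is processed; `TruncatingBuffer` is a hypothetical class, not the library's actual implementation.

```typescript
// Hypothetical model of the overwrite-last-slot behavior described above.
class TruncatingBuffer<T> {
  private readonly slots: T[] = [];
  private readonly max: number;

  constructor(max: number) {
    this.max = max;
  }

  push(item: T): void {
    if (this.slots.length < this.max) {
      this.slots.push(item); // a free slot is available
    } else {
      this.slots[this.max - 1] = item; // buffer full: replace the last slot
    }
  }

  // Remove and return everything currently buffered.
  drain(): T[] {
    return this.slots.splice(0, this.slots.length);
  }
}

const buffer = new TruncatingBuffer<string>(5);
for (let i = 0; i < 100; i++) {
  buffer.push(`Data ${i}`);
}
const kept = buffer.drain();
// kept is ['Data 0', 'Data 1', 'Data 2', 'Data 3', 'Data 99']
```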

Advanced Buffer Strategies

1. Sliding Window Buffer

Perfect for real-time data processing where only recent items matter:

const slidingWindowTask = new Task({
  name: 'SlidingWindow',
  taskFunction: async (data) => {
    return await analyzeRecentData(data);
  },
  buffered: true,
  bufferMax: 10,  // Keep last 10 items
  execDelay: 100  // Process every 100ms
});

// In a real-time stream scenario
dataStream.on('data', (chunk) => {
  slidingWindowTask.trigger(chunk);
  // Older items automatically dropped when buffer full
});

2. Throttled Buffer

Combine buffering with execution delays for rate limiting:

const apiRateLimiter = new Task({
  name: 'RateLimitedAPI',
  taskFunction: async (request) => {
    return await api.call(request);
  },
  buffered: true,
  bufferMax: 10,     // Max 10 queued requests
  execDelay: 1000    // 1 second between executions
});

// Requests are queued and executed at 1/second
// Prevents API rate limit violations

3. Priority Buffer (Custom Implementation)

Implement priority queuing with buffer management:

class PriorityBufferedTask extends Task {
  private priorityQueue: Array<{data: any, priority: number}> = [];
  
  constructor(options) {
    super({
      ...options,
      taskFunction: async (item) => {
        // Process based on priority
        // (processByPriority is a placeholder you would implement yourself)
        return await this.processByPriority(item);
      }
    });
  }
  
  triggerWithPriority(data: any, priority: number) {
    if (this.priorityQueue.length >= this.bufferMax) {
      // Remove lowest priority item if buffer full
      this.priorityQueue.sort((a, b) => b.priority - a.priority);
      this.priorityQueue.pop();
    }
    this.priorityQueue.push({data, priority});
    this.priorityQueue.sort((a, b) => b.priority - a.priority);
    return this.trigger(this.priorityQueue.shift());
  }
}
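
The queueing part of this idea can also be written as a standalone structure, independent of Task. The sketch below is a hypothetical bounded priority queue (`BoundedPriorityQueue` and `PrioritizedItem` are illustrative names, not taskbuffer exports) that drops the lowest-priority entry once capacity is exceeded.

```typescript
// Hypothetical standalone queue, not part of @push.rocks/taskbuffer.
interface PrioritizedItem<T> {
  data: T;
  priority: number; // higher number = more urgent
}

class BoundedPriorityQueue<T> {
  private items: PrioritizedItem<T>[] = [];
  private readonly capacity: number;

  constructor(capacity: number) {
    this.capacity = capacity;
  }

  // Insert an item; if the queue is over capacity, drop the lowest-priority entry.
  enqueue(data: T, priority: number): void {
    this.items.push({ data, priority });
    this.items.sort((a, b) => b.priority - a.priority); // highest priority first
    if (this.items.length > this.capacity) {
      this.items.pop(); // the last element has the lowest priority
    }
  }

  // Remove and return the highest-priority item, or undefined when empty.
  dequeue(): T | undefined {
    return this.items.shift()?.data;
  }

  get size(): number {
    return this.items.length;
  }
}
```

Sorting on every insert keeps the code short; for large queues a binary heap would be the usual replacement.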

Buffer Monitoring

Track buffer utilization and performance:

const monitoredTask = new Task({
  name: 'MonitoredBuffer',
  taskFunction: async (data) => {
    const startTime = Date.now();
    const result = await processData(data);
    console.log(`Processing time: ${Date.now() - startTime}ms`);
    console.log(`Buffer utilization: ${monitoredTask.bufferRunner.bufferCounter}/${monitoredTask.bufferMax}`);
    return result;
  },
  buffered: true,
  bufferMax: 20
});

// Monitor buffer saturation
setInterval(() => {
  const utilization = (monitoredTask.bufferRunner.bufferCounter / monitoredTask.bufferMax) * 100;
  if (utilization > 80) {
    console.warn(`Buffer near capacity: ${utilization.toFixed(1)}%`);
  }
}, 1000);

Buffer Best Practices

  1. Choose appropriate buffer sizes:

    • I/O operations: 5-10 concurrent
    • CPU-intensive: Number of cores
    • API calls: Based on rate limits
  2. Handle buffer overflow gracefully:

    const task = new Task({
      taskFunction: async (data) => {
        try {
          return await process(data);
        } catch (error) {
          if (error.code === 'BUFFER_OVERFLOW') {
            // Implement backoff strategy
            await delay(1000);
            return task.trigger(data);
          }
          throw error;
        }
      },
      buffered: true,
      bufferMax: 10
    });
    
  3. Monitor and adjust dynamically:

    // Adjust buffer size based on system load
    const adaptiveTask = new Task({
      name: 'AdaptiveBuffer',
      taskFunction: async (data) => {
        const cpuLoad = await getSystemLoad();
        if (cpuLoad > 0.8) {
          adaptiveTask.bufferMax = Math.max(2, adaptiveTask.bufferMax - 1);
        } else if (cpuLoad < 0.5) {
          adaptiveTask.bufferMax = Math.min(20, adaptiveTask.bufferMax + 1);
        }
        return await process(data);
      },
      buffered: true,
      bufferMax: 10
    });
    

Buffered Execution (Rate Limiting)

Perfect for API calls or database operations that need throttling:

const apiTask = new Task({
  name: 'APICall',
  taskFunction: async (endpoint: string) => {
    return await fetch(endpoint);
  },
  buffered: true,
  bufferMax: 3, // Maximum 3 concurrent executions
  execDelay: 1000, // Wait 1 second between executions
});

// These will be automatically throttled
for (let i = 0; i < 10; i++) {
  apiTask.trigger(`/api/data/${i}`);
}

Task Chains - Sequential Workflows

Build complex workflows where each step depends on the previous:

import { Task, Taskchain } from '@push.rocks/taskbuffer';

const fetchTask = new Task({
  name: 'FetchData',
  taskFunction: async () => {
    const response = await fetch('/api/data');
    return response.json();
  },
});

const transformTask = new Task({
  name: 'TransformData',
  taskFunction: async (data) => {
    return data.map((item) => ({
      ...item,
      processed: true,
      timestamp: Date.now(),
    }));
  },
});

const saveTask = new Task({
  name: 'SaveData',
  taskFunction: async (transformedData) => {
    await database.bulkInsert(transformedData);
    return { saved: transformedData.length };
  },
});

const workflow = new Taskchain({
  name: 'DataPipeline',
  taskArray: [fetchTask, transformTask, saveTask],
});

// Execute the entire chain
const result = await workflow.trigger();
console.log(`Processed ${result.saved} items`);

Parallel Execution - Maximum Performance

Execute multiple independent tasks simultaneously:

import { Task, Taskparallel } from '@push.rocks/taskbuffer';

const tasks = ['user', 'posts', 'comments'].map(
  (resource) =>
    new Task({
      name: `Fetch${resource}`,
      taskFunction: async () => {
        const data = await fetch(`/api/${resource}`);
        return data.json();
      },
    }),
);

const parallelFetch = new Taskparallel({
  taskArray: tasks,
});

// All tasks execute simultaneously
const [users, posts, comments] = await parallelFetch.trigger();
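
Conceptually this resembles `Promise.all`, where results come back in the order of the input array rather than in completion order. The sketch below uses a hypothetical `fakeFetch` stand-in instead of a real network call; whether Taskparallel guarantees the same ordering is an assumption.

```typescript
// Hypothetical stand-in for a network call; resolves after `ms` milliseconds.
function fakeFetch(resource: string, ms: number): Promise<string> {
  return new Promise((resolve) => setTimeout(() => resolve(`${resource}-data`), ms));
}

// Start all three requests at once and wait for every one to finish.
async function fetchAllResources(): Promise<string[]> {
  return Promise.all([
    fakeFetch('user', 30), // slowest
    fakeFetch('posts', 10), // fastest
    fakeFetch('comments', 20),
  ]);
}
```

The total wait is roughly that of the slowest request (about 30 ms here), not the sum of all three.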

Scheduled Tasks with TaskManager

Run tasks on a schedule using cron expressions:

import { Task, TaskManager } from '@push.rocks/taskbuffer';

const backupTask = new Task({
  name: 'DatabaseBackup',
  taskFunction: async () => {
    await performBackup();
    console.log(`Backup completed at ${new Date().toISOString()}`);
  },
});

const manager = new TaskManager();

// Add and schedule tasks
// (healthCheck is assumed to be another Task defined elsewhere)
manager.addAndScheduleTask(backupTask, '0 0 * * *'); // Daily at midnight
manager.addAndScheduleTask(healthCheck, '*/5 * * * *'); // Every 5 minutes

// Start the scheduler
manager.start();

// Later... stop if needed
manager.stop();
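
For readers less familiar with cron syntax, the two expressions above use the standard five-field form: minute, hour, day of month, month, day of week. The helper below is a hypothetical reading aid (`splitCron` is not a taskbuffer function) and only covers that five-field form; the library may accept other variants.

```typescript
// Hypothetical helper for reading five-field cron expressions.
interface CronFields {
  minute: string;
  hour: string;
  dayOfMonth: string;
  month: string;
  dayOfWeek: string;
}

function splitCron(expression: string): CronFields {
  const parts = expression.trim().split(/\s+/);
  if (parts.length !== 5) {
    throw new Error(`Expected 5 cron fields, got ${parts.length}`);
  }
  const [minute, hour, dayOfMonth, month, dayOfWeek] = parts;
  return { minute, hour, dayOfMonth, month, dayOfWeek };
}

// '0 0 * * *'   -> minute '0', hour '0', every day: daily at midnight
// '*/5 * * * *' -> minute '*/5', every hour: every 5 minutes
```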

Debounced Tasks - Smart Throttling

Prevent task spam with intelligent debouncing:

import { TaskDebounced } from '@push.rocks/taskbuffer';

const saveTask = new TaskDebounced({
  name: 'AutoSave',
  taskFunction: async (content: string) => {
    await saveToDatabase(content);
    console.log('Content saved');
  },
  debounceTimeInMillis: 2000, // Wait 2 seconds of inactivity
});

// Rapid calls will be debounced
input.addEventListener('input', (e) => {
  saveTask.trigger(e.target.value);
});
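
The debouncing idea itself fits in a few lines. The following is a generic sketch with a hypothetical `debounce` helper; it is not how TaskDebounced is implemented, only an illustration of the "wait for a quiet period" behavior.

```typescript
// Hypothetical debounce helper, not the TaskDebounced implementation.
function debounce<T>(
  fn: (value: T) => void,
  waitMs: number,
): (value: T) => void {
  let timer: ReturnType<typeof setTimeout> | undefined;
  return (value: T) => {
    if (timer !== undefined) {
      clearTimeout(timer); // a newer call cancels the pending one
    }
    timer = setTimeout(() => {
      timer = undefined;
      fn(value); // runs only after `waitMs` without further calls
    }, waitMs);
  };
}
```

With this helper, three rapid calls result in a single invocation that receives the value of the last call.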

One-Time Tasks - Initialize Once

Ensure initialization code runs exactly once:

import { TaskOnce } from '@push.rocks/taskbuffer';

const initTask = new TaskOnce({
  name: 'SystemInitialization',
  taskFunction: async () => {
    await database.connect();
    await cache.initialize();
    await loadConfiguration();
    console.log('System initialized');
  },
});

// Safe to call multiple times - only runs once
await initTask.trigger();
await initTask.trigger(); // This won't run again
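
A run-once guard can be sketched by caching the first promise. The `once` helper below is hypothetical and is not the TaskOnce implementation; in particular, returning the cached result on later calls is a design choice of this sketch, not documented library behavior.

```typescript
// Hypothetical helper, not the TaskOnce implementation.
function once<T>(fn: () => Promise<T>): () => Promise<T> {
  let cached: Promise<T> | undefined;
  return () => {
    if (cached === undefined) {
      cached = fn(); // the first call starts the work and stores its promise
    }
    return cached; // later calls reuse the same promise
  };
}
```

Because the promise is stored before it settles, concurrent callers share one execution rather than racing to start a second one.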

Advanced Features 🔥

Task Dependencies with Pre/Post Hooks

Create sophisticated task relationships:

const validationTask = new Task({
  name: 'ValidateInput',
  taskFunction: async (data) => {
    if (!isValid(data)) {
      throw new Error('Validation failed');
    }
    return data;
  },
});

const mainTask = new Task({
  name: 'ProcessData',
  taskFunction: async (data) => {
    return await complexProcessing(data);
  },
  preTask: validationTask, // Runs before main task
  afterTask: cleanupTask, // Runs after main task (cleanupTask is assumed to be defined elsewhere)
});

Task Runners - Distributed Execution

The TaskRunner system enables distributed task execution across multiple workers:

import { TaskRunner } from '@push.rocks/taskbuffer';

const runner = new TaskRunner({
  name: 'WorkerNode1',
  maxConcurrentTasks: 5,
});

// Register tasks this runner can handle
runner.registerTask(dataProcessingTask);
runner.registerTask(imageResizeTask);

// Start processing
runner.start();

Buffer Management Strategies

Fine-tune concurrent execution behavior:

const task = new Task({
  name: 'ResourceIntensive',
  taskFunction: async () => {
    /* ... */
  },
  buffered: true,
  bufferMax: 5, // Max 5 concurrent
  execDelay: 100, // 100ms between starts
  timeout: 30000, // 30 second timeout
});

Cycle Detection and Prevention

TaskBuffer automatically detects and prevents circular dependencies:

const taskA = new Task({
  name: 'TaskA',
  taskFunction: async () => {
    /* ... */
  },
});

const taskB = new Task({
  name: 'TaskB',
  taskFunction: async () => {
    /* ... */
  },
  preTask: taskA, // TaskB waits for TaskA
});

// Assumes preTask can be assigned after construction.
// Closing the loop here creates the circular dependency that gets detected.
taskA.preTask = taskB;
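
How the library detects cycles internally is not described here, but the general technique is simple when each task has at most one preTask: follow the links and watch for a repeated node. The sketch below uses a hypothetical `TaskNode` shape rather than real Task instances.

```typescript
// Hypothetical node shape: each task may name one task that must run before it.
interface TaskNode {
  name: string;
  preTask?: TaskNode;
}

// Follow the preTask links from `start`; a repeated node means a cycle.
function hasPreTaskCycle(start: TaskNode): boolean {
  const visited = new Set<TaskNode>();
  let current: TaskNode | undefined = start;
  while (current !== undefined) {
    if (visited.has(current)) {
      return true;
    }
    visited.add(current);
    current = current.preTask;
  }
  return false;
}
```

A check like this would run before execution starts, so a circular setup fails fast instead of looping forever.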

Dynamic Task Creation

Create tasks on-the-fly based on runtime conditions:

const dynamicWorkflow = async (config: Config) => {
  const tasks = config.steps.map(
    (step) =>
      new Task({
        name: step.name,
        taskFunction: async (input) => {
          return await processStep(step, input);
        },
      }),
  );

  const chain = new Taskchain({
    name: 'DynamicWorkflow',
    taskArray: tasks,
  });

  return await chain.trigger();
};

API Reference 📚

Task Options

| Option | Type | Description |
| --- | --- | --- |
| name | string | Unique identifier for the task |
| taskFunction | Function | Async function to execute |
| buffered | boolean | Enable buffer management |
| bufferMax | number | Maximum concurrent executions |
| execDelay | number | Delay between executions (ms) |
| timeout | number | Task timeout (ms) |
| preTask | Task | Task to run before |
| afterTask | Task | Task to run after |

TaskManager Methods

| Method | Description |
| --- | --- |
| addAndScheduleTask(task, cronExpression) | Add and schedule a task |
| removeTask(taskName) | Remove a scheduled task |
| start() | Start the scheduler |
| stop() | Stop the scheduler |
| getStats() | Get execution statistics |

Taskchain Methods

| Method | Description |
| --- | --- |
| addTask(task) | Add task to chain |
| removeTask(taskName) | Remove task from chain |
| trigger(initialValue) | Execute the chain |
| reset() | Reset chain state |

Performance Tips 🏎️

  1. Use buffering for I/O operations: Prevents overwhelming external services
  2. Leverage parallel execution: When tasks are independent, run them simultaneously
  3. Implement proper error handling: Use try-catch in task functions
  4. Monitor task execution: Use the built-in stats and logging
  5. Set appropriate timeouts: Prevent hanging tasks from blocking your system

Error Handling 🛡️

const robustTask = new Task({
  name: 'RobustOperation',
  taskFunction: async (input) => {
    try {
      return await riskyOperation(input);
    } catch (error) {
      // Log error
      console.error(`Task failed: ${error.message}`);

      // Optionally retry
      if (error.retryable) {
        return await riskyOperation(input);
      }

      // Or return default value
      return defaultValue;
    }
  },
  timeout: 5000, // Fail if takes longer than 5 seconds
});
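
The timeout idea can be illustrated independently of the library. The `withTimeout` helper below is a hypothetical sketch, not the library's timeout mechanism; note that it only stops waiting and does not cancel the underlying work.

```typescript
// Hypothetical helper, not the library's timeout implementation.
// Rejects if `work` has not settled within `ms` milliseconds.
function withTimeout<T>(work: Promise<T>, ms: number): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(
      () => reject(new Error(`Timed out after ${ms}ms`)),
      ms,
    );
    work.then(
      (value) => {
        clearTimeout(timer); // finished in time: cancel the timeout
        resolve(value);
      },
      (error) => {
        clearTimeout(timer); // failed in time: pass the original error on
        reject(error);
      },
    );
  });
}
```

Clearing the timer in both branches keeps a finished operation from leaving a pending timer behind.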

Real-World Examples 🌍

API Rate Limiting

const apiClient = new Task({
  name: 'RateLimitedAPI',
  taskFunction: async (endpoint: string) => {
    return await fetch(`https://api.example.com${endpoint}`);
  },
  buffered: true,
  bufferMax: 10, // Up to 10 buffered requests
  execDelay: 100, // 100 ms between executions, roughly 10 req/s
});

Database Migration Pipeline

const migrationChain = new Taskchain({
  name: 'DatabaseMigration',
  taskArray: [
    backupTask,
    schemaUpdateTask,
    dataTransformTask,
    validationTask,
    cleanupTask,
  ],
});

Microservice Health Monitoring

const healthMonitor = new TaskManager();

services.forEach((service) => {
  const healthCheck = new Task({
    name: `HealthCheck:${service.name}`,
    taskFunction: async () => {
      const healthy = await checkHealth(service.url);
      if (!healthy) {
        await alertOps(service);
      }
    },
  });

  healthMonitor.addAndScheduleTask(healthCheck, '*/1 * * * *'); // Every minute
});

Testing 🧪

import { expect, tap } from '@git.zone/tstest';
import { Task } from '@push.rocks/taskbuffer';

tap.test('should execute task successfully', async () => {
  const myTask = new Task({
    name: 'Echo',
    taskFunction: async () => 'done',
  });
  const result = await myTask.trigger();
  expect(result).toEqual('done');
});

tap.start();

Contributing 🤝

We welcome contributions! Please see our Contributing Guide for details.

Support 💬

This repository contains open-source code that is licensed under the MIT License. A copy of the MIT License can be found in the license file within this repository.

Please note: The MIT License does not grant permission to use the trade names, trademarks, service marks, or product names of the project, except as required for reasonable and customary use in describing the origin of the work and reproducing the content of the NOTICE file.

Trademarks

This project is owned and maintained by Task Venture Capital GmbH. The names and logos associated with Task Venture Capital GmbH and any related products or services are trademarks of Task Venture Capital GmbH and are not included within the scope of the MIT license granted herein. Use of these trademarks must comply with Task Venture Capital GmbH's Trademark Guidelines, and any usage must be approved in writing by Task Venture Capital GmbH.

Company Information

Task Venture Capital GmbH
Registered at District court Bremen HRB 35230 HB, Germany

For any legal inquiries or if you require further information, please contact us via email at hello@task.vc.

By using this repository, you acknowledge that you have read this section, agree to comply with its terms, and understand that the licensing of the code does not imply endorsement by Task Venture Capital GmbH of any derivative works.
