fix(task): Implement core Task execution flow, buffering and lifecycle; update README with generics and buffer docs

This commit is contained in:
2025-08-26 20:42:52 +00:00
parent d54012379c
commit b1725cbdf9
4 changed files with 328 additions and 1 deletions

View File

@@ -1,5 +1,16 @@
# Changelog
## 2025-08-26 - 3.1.10 - fix(task)
Implement core Task execution flow, buffering and lifecycle; update README with generics and buffer docs
- Implement Task.runTask including preTask/afterTask chaining, touched-task cycle prevention and error handling.
- Add Task helpers: extractTask, isTask, isTaskTouched and emptyTaskFunction (resolved promise).
- Introduce task lifecycle coordination: finished promise, resolveFinished, and blockingTasks to await dependent tasks.
- Support taskSetup/setupValue, execDelay handling, and wait-for-blocking-tasks before execution.
- Wire up trigger() to choose buffered vs unbuffered execution (triggerBuffered / triggerUnBuffered) and integrate BufferRunner.
- Improve logging and safer promise handling (caught errors are logged).
- Update README with extended TypeScript generics examples and expanded buffer behavior and strategies documentation.
## 2025-08-26 - 3.1.9 - fix(tests)
Update CI workflows, fix tests and refresh README/package metadata

316
readme.md
View File

@@ -72,6 +72,322 @@ const myTask = new Task({
const result = await myTask.trigger();
```
## TypeScript Generics Support 🔬
TaskBuffer leverages TypeScript's powerful generics system for complete type safety across your task chains and workflows.
### Generic Task Functions
Tasks support generic type parameters for both input and output types:
```typescript
import { Task, ITaskFunction } from '@push.rocks/taskbuffer';
// Define typed interfaces
interface UserData {
id: string;
name: string;
email: string;
}
interface ProcessedUser {
userId: string;
displayName: string;
normalized: boolean;
}
// Create strongly typed tasks
const processUserTask = new Task<UserData, ProcessedUser>({
name: 'ProcessUser',
taskFunction: async (user: UserData): Promise<ProcessedUser> => {
return {
userId: user.id,
displayName: user.name.toUpperCase(),
normalized: true
};
}
});
// Type safety enforced at compile time
const result: ProcessedUser = await processUserTask.trigger({
id: '123',
name: 'John Doe',
email: 'john@example.com'
});
```
### Generic Setup Values
Tasks can accept setup values through generics, perfect for configuration:
```typescript
interface TaskConfig {
apiEndpoint: string;
retryCount: number;
timeout: number;
}
const configuredTask = new Task<TaskConfig>({
name: 'ConfiguredTask',
taskSetup: async () => ({
apiEndpoint: 'https://api.example.com',
retryCount: 3,
timeout: 5000
}),
taskFunction: async (data: any, setupValue: TaskConfig) => {
// setupValue is fully typed!
for (let i = 0; i < setupValue.retryCount; i++) {
try {
return await fetchWithTimeout(
setupValue.apiEndpoint,
setupValue.timeout
);
} catch (error) {
if (i === setupValue.retryCount - 1) throw error;
}
}
}
});
```
### Type-Safe Task Chains
Chain tasks with preserved type flow:
```typescript
// Each task knows its input and output types
const fetchTask = new Task<void, UserData[]>({
name: 'FetchUsers',
taskFunction: async (): Promise<UserData[]> => {
return await api.getUsers();
}
});
const filterTask = new Task<UserData[], UserData[]>({
name: 'FilterActive',
taskFunction: async (users: UserData[]): Promise<UserData[]> => {
return users.filter(user => user.isActive);
}
});
const mapTask = new Task<UserData[], ProcessedUser[]>({
name: 'MapToProcessed',
taskFunction: async (users: UserData[]): Promise<ProcessedUser[]> => {
return users.map(transformUser);
}
});
// Type safety flows through the chain
const chain = new Taskchain({
name: 'UserPipeline',
taskArray: [fetchTask, filterTask, mapTask]
});
const finalResult: ProcessedUser[] = await chain.trigger();
```
## Buffer Behavior Deep Dive 🌊
The buffer system in TaskBuffer provides intelligent control over concurrent executions, preventing system overload while maximizing throughput.
### How Buffering Works
When a task is buffered, TaskBuffer manages a queue of executions:
```typescript
const bufferedTask = new Task({
name: 'BufferedOperation',
taskFunction: async (data) => {
console.log(`Processing: ${data}`);
await simulateWork();
return `Processed: ${data}`;
},
buffered: true,
bufferMax: 3 // Maximum 3 concurrent executions
});
// Trigger 10 executions rapidly
for (let i = 0; i < 10; i++) {
bufferedTask.trigger(`Item ${i}`);
}
// What happens:
// 1. First 3 tasks start immediately
// 2. Items 4-10 are queued
// 3. As each task completes, next queued item starts
// 4. Never more than 3 tasks running simultaneously
```
### Buffer Truncation Behavior
When buffer limit is reached, new calls are intelligently managed:
```typescript
const truncatingTask = new Task({
name: 'TruncatingBuffer',
taskFunction: async (data) => {
await processData(data);
},
buffered: true,
bufferMax: 5 // Maximum 5 in buffer
});
// Rapid fire 100 calls
for (let i = 0; i < 100; i++) {
truncatingTask.trigger(`Data ${i}`);
}
// Buffer behavior:
// - First 5 calls: Added to buffer and start processing
// - Calls 6-100: Each overwrites the 5th buffer slot
// - Result: Only processes items 0,1,2,3, and 99 (last one)
// - This prevents memory overflow in high-frequency scenarios
```
### Advanced Buffer Strategies
#### 1. **Sliding Window Buffer**
Perfect for real-time data processing where only recent items matter:
```typescript
const slidingWindowTask = new Task({
name: 'SlidingWindow',
taskFunction: async (data) => {
return await analyzeRecentData(data);
},
buffered: true,
bufferMax: 10, // Keep last 10 items
execDelay: 100 // Process every 100ms
});
// In a real-time stream scenario
dataStream.on('data', (chunk) => {
slidingWindowTask.trigger(chunk);
// Older items automatically dropped when buffer full
});
```
#### 2. **Throttled Buffer**
Combine buffering with execution delays for rate limiting:
```typescript
const apiRateLimiter = new Task({
name: 'RateLimitedAPI',
taskFunction: async (request) => {
return await api.call(request);
},
buffered: true,
bufferMax: 10, // Max 10 queued requests
execDelay: 1000 // 1 second between executions
});
// Requests are queued and executed at 1/second
// Prevents API rate limit violations
```
#### 3. **Priority Buffer** (Custom Implementation)
Implement priority queuing with buffer management:
```typescript
class PriorityBufferedTask extends Task {
private priorityQueue: Array<{data: any, priority: number}> = [];
constructor(options) {
super({
...options,
taskFunction: async (item) => {
// Process based on priority
return await this.processByPriority(item);
}
});
}
triggerWithPriority(data: any, priority: number) {
if (this.priorityQueue.length >= this.bufferMax) {
// Remove lowest priority item if buffer full
this.priorityQueue.sort((a, b) => b.priority - a.priority);
this.priorityQueue.pop();
}
this.priorityQueue.push({data, priority});
this.priorityQueue.sort((a, b) => b.priority - a.priority);
return this.trigger(this.priorityQueue.shift());
}
}
```
### Buffer Monitoring
Track buffer utilization and performance:
```typescript
const monitoredTask = new Task({
name: 'MonitoredBuffer',
taskFunction: async (data) => {
const startTime = Date.now();
const result = await processData(data);
console.log(`Processing time: ${Date.now() - startTime}ms`);
console.log(`Buffer utilization: ${monitoredTask.bufferRunner.bufferCounter}/${monitoredTask.bufferMax}`);
return result;
},
buffered: true,
bufferMax: 20
});
// Monitor buffer saturation
setInterval(() => {
const utilization = (monitoredTask.bufferRunner.bufferCounter / monitoredTask.bufferMax) * 100;
if (utilization > 80) {
console.warn(`Buffer near capacity: ${utilization.toFixed(1)}%`);
}
}, 1000);
```
### Buffer Best Practices
1. **Choose appropriate buffer sizes**:
- I/O operations: 5-10 concurrent
- CPU-intensive: Number of cores
- API calls: Based on rate limits
2. **Handle buffer overflow gracefully**:
```typescript
const task = new Task({
taskFunction: async (data) => {
try {
return await process(data);
} catch (error) {
if (error.code === 'BUFFER_OVERFLOW') {
// Implement backoff strategy
await delay(1000);
return task.trigger(data);
}
throw error;
}
},
buffered: true,
bufferMax: 10
});
```
3. **Monitor and adjust dynamically**:
```typescript
// Adjust buffer size based on system load
const adaptiveTask = new Task({
name: 'AdaptiveBuffer',
taskFunction: async (data) => {
const cpuLoad = await getSystemLoad();
if (cpuLoad > 0.8) {
adaptiveTask.bufferMax = Math.max(2, adaptiveTask.bufferMax - 1);
} else if (cpuLoad < 0.5) {
adaptiveTask.bufferMax = Math.min(20, adaptiveTask.bufferMax + 1);
}
return await process(data);
},
buffered: true,
bufferMax: 10
});
```
### Buffered Execution (Rate Limiting)
Perfect for API calls or database operations that need throttling:

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@push.rocks/taskbuffer',
version: '3.1.9',
version: '3.1.10',
description: 'A flexible task management library supporting TypeScript, allowing for task buffering, scheduling, and execution with dependency management.'
}