// einvoice/test/suite/einvoice_error-handling/test.err-06.concurrent-errors.ts
// (repository viewer listing: 571 lines, 18 KiB, TypeScript; last changed 2025-05-25 19:45:37 +00:00)
import { expect, tap } from '@git.zone/tstest/tapbundle';
import * as einvoice from '../../../ts/index.js';
import * as plugins from '../../plugins.js';
import { PerformanceTracker } from '../../helpers/performance.tracker.js';
import { CorpusLoader } from '../../helpers/corpus.loader.js';
tap.test('ERR-06: Concurrent Operation Errors - Handle race conditions and concurrency issues', async (t) => {
const performanceTracker = new PerformanceTracker('ERR-06');
await t.test('Race condition detection', async () => {
performanceTracker.startOperation('race-conditions');
/**
 * Counter used to demonstrate lost updates when several async callers
 * interleave a read-modify-write sequence, and how a simple flag-based
 * lock serialises them.
 */
class SharedResource {
  private value = 0;
  private accessCount = 0;
  private conflicts = 0;
  private lock = false;

  /**
   * Increments the counter without any mutual exclusion.
   *
   * The value is read, the method suspends for a random delay of up to
   * 10 ms, and then `read + 1` is written back. If another caller wrote in
   * the meantime, that write is overwritten and a conflict is recorded.
   */
  async unsafeIncrement(): Promise<void> {
    this.accessCount++;
    const current = this.value;
    // Simulate async work during which other callers may run.
    await new Promise<void>(resolve => setTimeout(resolve, Math.random() * 10));
    // A changed value means somebody else wrote while we were suspended.
    if (this.value !== current) {
      this.conflicts++;
    }
    this.value = current + 1;
  }

  /**
   * Increments the counter while holding the boolean lock, polling every
   * 1 ms until the lock is free. The lock is always released, even if the
   * increment throws.
   */
  async safeIncrement(): Promise<void> {
    while (this.lock) {
      await new Promise<void>(resolve => setTimeout(resolve, 1));
    }
    // No await between the check above and this assignment, so the
    // check-and-set cannot be interleaved with another caller.
    this.lock = true;
    try {
      await this.unsafeIncrement();
    } finally {
      this.lock = false;
    }
  }

  /**
   * Returns a snapshot of the counters.
   *
   * @returns `conflictRate` is `conflicts / accessCount`, or `0` when the
   * resource has never been accessed (instead of `NaN` from `0 / 0`).
   */
  getStats() {
    return {
      value: this.value,
      accessCount: this.accessCount,
      conflicts: this.conflicts,
      conflictRate: this.accessCount === 0 ? 0 : this.conflicts / this.accessCount
    };
  }
}
// Unguarded scenario: launch ten increments at once and report how many
// updates were lost.
const racyCounter = new SharedResource();
await Promise.all(Array.from({ length: 10 }, () => racyCounter.unsafeIncrement()));
const racyReport = racyCounter.getStats();
const racyPercent = (racyReport.conflictRate * 100).toFixed(1);
console.log('Unsafe concurrent access:');
console.log(` Final value: ${racyReport.value} (expected: 10)`);
console.log(` Conflicts detected: ${racyReport.conflicts}`);
console.log(` Conflict rate: ${racyPercent}%`);

// Guarded scenario: the same ten increments, each taken under the lock,
// must all be reflected in the final value.
const guardedCounter = new SharedResource();
await Promise.all(Array.from({ length: 10 }, () => guardedCounter.safeIncrement()));
const guardedReport = guardedCounter.getStats();
console.log('\nSafe concurrent access:');
console.log(` Final value: ${guardedReport.value} (expected: 10)`);
console.log(` Conflicts detected: ${guardedReport.conflicts}`);
expect(guardedReport.value).toEqual(10);
performanceTracker.endOperation('race-conditions');
});
await t.test('Deadlock prevention', async () => {
performanceTracker.startOperation('deadlock-prevention');
/**
 * Minimal in-memory lock table with polling acquisition, timeouts and
 * wait-for-graph deadlock detection.
 *
 * `locks` maps a resource name to its current holder; `waitingFor` maps an
 * owner to the resources it is currently blocked on.
 */
class LockManager {
  private locks = new Map<string, { owner: string; acquired: number }>();
  private waitingFor = new Map<string, string[]>();

  /**
   * Acquires `resource` for `owner`, polling every 10 ms while it is held.
   *
   * @param resource Name of the resource to lock.
   * @param owner Identifier of the requester.
   * @param timeout Maximum time to wait in milliseconds.
   * @returns `true` once the lock has been taken.
   * @throws Error with "Deadlock detected" when waiting would close a cycle
   * in the wait-for graph (including re-acquiring a lock already held by
   * `owner`), or "Lock acquisition timeout" when `timeout` elapses.
   */
  async acquireLock(resource: string, owner: string, timeout = 5000): Promise<boolean> {
    const startTime = Date.now();
    // Whether this call has recorded its wait in `waitingFor`; recorded at
    // most once so the list does not grow with every poll.
    let registered = false;
    try {
      while (this.locks.has(resource)) {
        // Check for deadlock
        if (this.detectDeadlock(owner, resource)) {
          throw new Error(`Deadlock detected: ${owner} waiting for ${resource}`);
        }
        // Check timeout
        if (Date.now() - startTime > timeout) {
          throw new Error(`Lock acquisition timeout: ${resource}`);
        }
        if (!registered) {
          this.startWaiting(owner, resource);
          registered = true;
        }
        await new Promise<void>(resolve => setTimeout(resolve, 10));
      }
      // Acquire lock
      this.locks.set(resource, { owner, acquired: Date.now() });
      return true;
    } finally {
      // Drop the wait record on success AND on timeout/deadlock, so a failed
      // request does not leave a stale edge in the wait-for graph.
      if (registered) {
        this.stopWaiting(owner, resource);
      }
    }
  }

  /**
   * Releases `resource` if, and only if, it is currently held by `owner`;
   * otherwise does nothing.
   */
  releaseLock(resource: string, owner: string): void {
    const lock = this.locks.get(resource);
    if (lock && lock.owner === owner) {
      this.locks.delete(resource);
    }
  }

  /** Records that `owner` is blocked on `resource`. */
  private startWaiting(owner: string, resource: string): void {
    const waits = this.waitingFor.get(owner);
    if (waits) {
      waits.push(resource);
    } else {
      this.waitingFor.set(owner, [resource]);
    }
  }

  /** Removes one record of `owner` being blocked on `resource`. */
  private stopWaiting(owner: string, resource: string): void {
    const waits = this.waitingFor.get(owner);
    if (!waits) {
      return;
    }
    const index = waits.indexOf(resource);
    if (index !== -1) {
      waits.splice(index, 1);
    }
    if (waits.length === 0) {
      this.waitingFor.delete(owner);
    }
  }

  /**
   * Reports whether `owner` waiting for `resource` would create a cycle.
   *
   * Walks the wait-for graph starting at `resource`: resource -> its holder
   * -> the resources that holder is waiting for -> their holders, and so on.
   * Reaching a resource held by `owner` means `owner` would (transitively)
   * be waiting on itself.
   */
  private detectDeadlock(owner: string, resource: string): boolean {
    const visited = new Set<string>();
    const pending = [resource];
    while (pending.length > 0) {
      const current = pending.pop()!;
      if (visited.has(current)) {
        continue;
      }
      visited.add(current);
      const held = this.locks.get(current);
      if (!held) {
        continue; // free resource: nobody to wait on along this path
      }
      if (held.owner === owner) {
        return true; // Circular dependency detected
      }
      const blockedOn = this.waitingFor.get(held.owner);
      if (blockedOn) {
        pending.push(...blockedOn);
      }
    }
    return false;
  }
}
const lockManager = new LockManager();

// Test successful lock acquisition: an uncontended lock is taken and released.
try {
  await lockManager.acquireLock('resource1', 'process1');
  console.log('✓ Lock acquired successfully');
  lockManager.releaseLock('resource1', 'process1');
} catch (error) {
  // Catch variables are `unknown`; narrow before reading `.message`.
  const reason = error instanceof Error ? error.message : String(error);
  console.log(`✗ Lock acquisition failed: ${reason}`);
}

// Test timeout: process2 keeps resource2, so process3's 100 ms request must
// fail. The message is captured and asserted AFTER the try block so that a
// request which unexpectedly succeeds fails the test instead of passing
// silently.
let timeoutMessage = '';
try {
  await lockManager.acquireLock('resource2', 'process2');
  // Don't release, cause timeout for next acquirer
  await lockManager.acquireLock('resource2', 'process3', 100);
} catch (error) {
  timeoutMessage = error instanceof Error ? error.message : String(error);
} finally {
  // releaseLock ignores non-owners, so both calls are safe whichever
  // process ended up holding the lock.
  lockManager.releaseLock('resource2', 'process2');
  lockManager.releaseLock('resource2', 'process3');
}
expect(timeoutMessage).toMatch(/timeout/i);
console.log(`✓ Lock timeout detected: ${timeoutMessage}`);
performanceTracker.endOperation('deadlock-prevention');
});
// Exercises overlapping writes and read/write access against a single temp
// file and reports how many operations failed.
await t.test('Concurrent file access errors', async () => {
  performanceTracker.startOperation('file-access-conflicts');
  const tempDir = '.nogit/concurrent-test';
  await plugins.fs.ensureDir(tempDir);
  const testFile = plugins.path.join(tempDir, 'concurrent.xml');

  type WriteOutcome = { writer: number; error?: unknown };
  type AccessOutcome = { type: 'read' | 'write'; success?: boolean; content?: unknown; error?: unknown };

  try {
    // Test concurrent writes
    // Every writer resolves to an outcome object. Previously a successful
    // write resolved to the write call's own result, which is presumably
    // `undefined` (as with Node's promise-based writeFile), and reading
    // `.error` from that would throw a TypeError.
    const writers: Array<Promise<WriteOutcome>> = [];
    for (let i = 0; i < 5; i++) {
      writers.push(
        plugins.fs.writeFile(
          testFile,
          `<invoice id="${i}">\n <amount>100</amount>\n</invoice>`
        )
          .then((): WriteOutcome => ({ writer: i }))
          .catch((err: unknown): WriteOutcome => ({ writer: i, error: err }))
      );
    }
    const writeResults = await Promise.all(writers);
    const writeErrors = writeResults.filter(r => r.error !== undefined);
    console.log(`Concurrent writes: ${writers.length} attempts, ${writeErrors.length} errors`);

    // Test concurrent read/write
    const readWriteOps: Array<Promise<AccessOutcome>> = [];
    // Writer
    readWriteOps.push(
      plugins.fs.writeFile(testFile, '<invoice>Updated</invoice>')
        .then((): AccessOutcome => ({ type: 'write', success: true }))
        .catch((err: unknown): AccessOutcome => ({ type: 'write', error: err }))
    );
    // Multiple readers
    for (let i = 0; i < 3; i++) {
      readWriteOps.push(
        plugins.fs.readFile(testFile, 'utf8')
          .then((content: unknown): AccessOutcome => ({ type: 'read', success: true, content }))
          .catch((err: unknown): AccessOutcome => ({ type: 'read', error: err }))
      );
    }
    const readWriteResults = await Promise.all(readWriteOps);
    const successfulReads = readWriteResults.filter(r => r.type === 'read' && r.success);
    console.log(`Concurrent read/write: ${successfulReads.length} successful reads`);
  } finally {
    // Cleanup, also when something above throws, so the temp directory does
    // not leak into later runs.
    await plugins.fs.remove(tempDir);
  }
  performanceTracker.endOperation('file-access-conflicts');
});
await t.test('Thread pool exhaustion', async () => {
performanceTracker.startOperation('thread-pool-exhaustion');
/**
 * Bounded async task runner: at most `maxThreads` tasks run at once, up to
 * `maxThreads * 2` further tasks wait in a FIFO queue, and anything beyond
 * that is rejected.
 */
class ThreadPool {
  private active = 0;
  private queue: Array<() => Promise<void>> = [];
  private results = { completed: 0, rejected: 0, queued: 0 };

  constructor(private maxThreads: number) {}

  /**
   * Runs `task` immediately if a slot is free, otherwise queues it.
   *
   * @returns A promise that settles with the task's own result or error.
   * @throws Error('Thread pool exhausted - queue is full') when all slots
   * are busy and the queue already holds `maxThreads * 2` tasks.
   */
  async execute<T>(task: () => Promise<T>): Promise<T> {
    if (this.active >= this.maxThreads) {
      if (this.queue.length >= this.maxThreads * 2) {
        this.results.rejected++;
        throw new Error('Thread pool exhausted - queue is full');
      }
      // Queue the task; the wrapper forwards its outcome to the caller.
      return new Promise<T>((resolve, reject) => {
        this.results.queued++;
        this.queue.push(async () => {
          try {
            const result = await task();
            // Count only successful runs, matching the direct path below
            // (a failing queued task used to be counted as completed).
            this.results.completed++;
            resolve(result);
          } catch (error) {
            reject(error);
          }
        });
      });
    }
    this.active++;
    try {
      const result = await task();
      this.results.completed++;
      return result;
    } finally {
      this.active--;
      // Fire-and-forget: queued wrappers never reject, so nothing to handle.
      void this.processQueue();
    }
  }

  /**
   * Starts the next queued task if there is one and a slot is free, then
   * chains to the following one when it finishes.
   */
  private async processQueue(): Promise<void> {
    if (this.queue.length === 0 || this.active >= this.maxThreads) {
      return;
    }
    const next = this.queue.shift()!;
    this.active++;
    try {
      await next();
    } finally {
      this.active--;
      void this.processQueue();
    }
  }

  /**
   * Returns a snapshot of the pool state. `queued` is the current queue
   * length; `results` is a copy of the cumulative counters, so callers
   * cannot mutate the pool's bookkeeping.
   */
  getStats() {
    return {
      active: this.active,
      queued: this.queue.length,
      results: { ...this.results }
    };
  }
}
const threadPool = new ThreadPool(3);
// Each entry settles to the task's string result, or to `{ error }` when the
// pool refused or the task failed.
const tasks: Array<Promise<string | { error: string }>> = [];
// Submit many tasks
for (let i = 0; i < 10; i++) {
  tasks.push(
    threadPool.execute(async () => {
      await new Promise<void>(resolve => setTimeout(resolve, 50));
      return `Task ${i} completed`;
    }).catch((err: unknown) => ({ error: err instanceof Error ? err.message : String(err) }))
  );
}
console.log('Thread pool stats during execution:', threadPool.getStats());
const results = await Promise.all(tasks);
// Narrow on the result type instead of reading `.error` off a string.
const errors = results.filter(r => typeof r !== 'string');
console.log('Thread pool final stats:', threadPool.getStats());
console.log(`Errors: ${errors.length}`);
// All ten tasks are submitted synchronously: 3 run, 6 fit in the queue
// (maxThreads * 2), so exactly one submission must be rejected.
expect(errors.length).toEqual(1);
performanceTracker.endOperation('thread-pool-exhaustion');
});
// Parses and validates the same XML document from several EInvoice instances
// at once, then reports per-validator timing and whether all validators
// reached the same verdict.
await t.test('Concurrent validation conflicts', async () => {
  performanceTracker.startOperation('validation-conflicts');
  const corpusLoader = new CorpusLoader();
  const xmlFiles = await corpusLoader.getFiles(/\.xml$/);
  // Test concurrent validation of same document; fall back to a tiny inline
  // document when the corpus yields no XML files.
  const testXml = xmlFiles.length > 0
    ? await plugins.fs.readFile(xmlFiles[0].path, 'utf8')
    : '<invoice><id>TEST-001</id></invoice>';
  const concurrentValidations = [];
  const validationCount = 5;
  for (let i = 0; i < validationCount; i++) {
    concurrentValidations.push(
      (async () => {
        const startTime = performance.now();
        const invoice = new einvoice.EInvoice();
        try {
          await invoice.fromXmlString(testXml);
          if (invoice.validate) {
            const result = await invoice.validate();
            return {
              validator: i,
              success: true,
              duration: performance.now() - startTime,
              valid: result.valid
            };
          } else {
            // No validate method available: parsed fine, verdict unknown.
            return {
              validator: i,
              success: true,
              duration: performance.now() - startTime,
              valid: null
            };
          }
        } catch (error) {
          // Catch variables are `unknown`; narrow before reading `.message`
          // so non-Error throws are reported as text instead of `undefined`.
          return {
            validator: i,
            success: false,
            duration: performance.now() - startTime,
            error: error instanceof Error ? error.message : String(error)
          };
        }
      })()
    );
  }
  const validationResults = await Promise.all(concurrentValidations);
  console.log(`\nConcurrent validation results (${validationCount} validators):`);
  validationResults.forEach(result => {
    if (result.success) {
      console.log(` Validator ${result.validator}: Success (${result.duration.toFixed(1)}ms)`);
    } else {
      console.log(` Validator ${result.validator}: Failed - ${result.error}`);
    }
  });
  // Check for consistency: only runs that produced a verdict are compared.
  const validResults = validationResults.filter(r => r.success && r.valid !== null);
  if (validResults.length > 1) {
    const allSame = validResults.every(r => r.valid === validResults[0].valid);
    console.log(`Validation consistency: ${allSame ? '✓ All consistent' : '✗ Inconsistent results'}`);
  }
  performanceTracker.endOperation('validation-conflicts');
});
await t.test('Semaphore implementation', async () => {
performanceTracker.startOperation('semaphore');
/**
 * Counting semaphore for async code: up to `maxPermits` holders at a time,
 * further callers wait in FIFO order.
 */
class Semaphore {
  private permits: number;
  private waitQueue: Array<() => void> = [];

  constructor(private maxPermits: number) {
    this.permits = maxPermits;
  }

  /**
   * Takes a permit, waiting until one is handed over by `release()` when
   * none is available.
   */
  async acquire(): Promise<void> {
    if (this.permits > 0) {
      this.permits--;
      return;
    }
    // Wait for permit
    return new Promise<void>(resolve => {
      this.waitQueue.push(resolve);
    });
  }

  /**
   * Gives a permit back. If callers are waiting, the permit is handed
   * straight to the oldest one (the available count is left unchanged);
   * otherwise the available count grows, but never beyond `maxPermits`, so
   * an unmatched `release()` cannot widen the semaphore.
   */
  release(): void {
    const waiting = this.waitQueue.shift();
    if (waiting) {
      waiting();
      return;
    }
    if (this.permits < this.maxPermits) {
      this.permits++;
    }
  }

  /**
   * Runs `operation` while holding a permit and always releases it
   * afterwards, whether the operation resolves or rejects.
   */
  async withPermit<T>(operation: () => Promise<T>): Promise<T> {
    await this.acquire();
    try {
      return await operation();
    } finally {
      this.release();
    }
  }

  /** Number of permits that can be taken without waiting. */
  getAvailablePermits(): number {
    return this.permits;
  }

  /** Number of callers currently waiting for a permit. */
  getWaitingCount(): number {
    return this.waitQueue.length;
  }
}
// Run five 50 ms operations through a two-permit semaphore and log the
// permit/waiter counts each operation observes when it starts.
const semaphore = new Semaphore(2);
console.log('\nTesting semaphore with 2 permits:');
const runGuarded = (index: number) =>
  semaphore.withPermit(async () => {
    const available = semaphore.getAvailablePermits();
    const waiting = semaphore.getWaitingCount();
    console.log(` Operation ${index} started (available: ${available}, waiting: ${waiting})`);
    await new Promise(resolve => setTimeout(resolve, 50));
    console.log(` Operation ${index} completed`);
    return index;
  });
const operations = [0, 1, 2, 3, 4].map(runGuarded);
await Promise.all(operations);
console.log(`Final state - Available permits: ${semaphore.getAvailablePermits()}`);
performanceTracker.endOperation('semaphore');
});
await t.test('Concurrent modification detection', async () => {
performanceTracker.startOperation('modification-detection');
/**
 * Document guarded by optimistic locking: every successful modification
 * bumps `version`, and a modification is only applied if the caller's
 * expected version is still current at the moment of writing.
 */
class VersionedDocument {
  private version = 0;
  private content: any = {};
  private modificationLog: Array<{ version: number; timestamp: number; changes: string }> = [];

  /** Current version; starts at 0 and increases by 1 per applied change. */
  getVersion(): number {
    return this.version;
  }

  /**
   * Merges `changes` into the document if it is still at `expectedVersion`.
   *
   * The version is checked twice: once up front to fail fast, and again
   * after the simulated 10 ms of processing. The second check is what makes
   * detection work when several `modify()` calls overlap; with only the
   * first check, all of them would pass while the version was still
   * unchanged and then all apply their changes.
   *
   * @throws Error("Concurrent modification detected: ...") when the
   * document's version differs from `expectedVersion` at either check.
   */
  async modify(changes: any, expectedVersion: number): Promise<void> {
    this.assertVersion(expectedVersion);
    // Simulate processing time
    await new Promise<void>(resolve => setTimeout(resolve, 10));
    // Another modify() may have committed while this one was suspended.
    this.assertVersion(expectedVersion);
    // Apply changes; no await from here on, so check + write are atomic.
    Object.assign(this.content, changes);
    this.version++;
    this.modificationLog.push({
      version: this.version,
      timestamp: Date.now(),
      changes: JSON.stringify(changes)
    });
  }

  /** Returns a shallow copy of the document content. */
  getContent(): any {
    return { ...this.content };
  }

  /** Returns a copy of the list of applied modifications, oldest first. */
  getModificationLog() {
    return [...this.modificationLog];
  }

  /** Throws the concurrent-modification error unless `expectedVersion` is current. */
  private assertVersion(expectedVersion: number): void {
    if (this.version !== expectedVersion) {
      throw new Error(
        `Concurrent modification detected: expected version ${expectedVersion}, current version ${this.version}`
      );
    }
  }
}
const document = new VersionedDocument();
// Concurrent modifications with version checking: three users start at
// staggered delays, each reading the version just before submitting its
// change.
const modifications = [
  { user: 'A', changes: { field1: 'valueA' }, delay: 0 },
  { user: 'B', changes: { field2: 'valueB' }, delay: 5 },
  { user: 'C', changes: { field3: 'valueC' }, delay: 10 }
];
const results = await Promise.all(
  modifications.map(async (mod) => {
    await new Promise<void>(resolve => setTimeout(resolve, mod.delay));
    const version = document.getVersion();
    try {
      await document.modify(mod.changes, version);
      return { user: mod.user, success: true, version };
    } catch (error) {
      // Catch variables are `unknown`; narrow before reading `.message`.
      const reason = error instanceof Error ? error.message : String(error);
      return { user: mod.user, success: false, error: reason };
    }
  })
);
console.log('\nConcurrent modification results:');
results.forEach(result => {
  if (result.success) {
    console.log(` User ${result.user}: Success (from version ${result.version})`);
  } else {
    console.log(` User ${result.user}: Failed - ${result.error}`);
  }
});
console.log(`Final document version: ${document.getVersion()}`);
console.log(`Final content:`, document.getContent());
performanceTracker.endOperation('modification-detection');
});
// Print the timings collected by the tracker for every sub-test above.
console.log('\n' + performanceTracker.getSummary());
// Close with a checklist of recommended practices, one line per item.
const bestPractices = [
  '1. Use proper locking mechanisms (mutex, semaphore) for shared resources',
  '2. Implement deadlock detection and prevention strategies',
  '3. Use optimistic locking with version numbers for documents',
  '4. Set reasonable timeouts for lock acquisition',
  '5. Implement thread pool limits to prevent resource exhaustion',
  '6. Use atomic operations where possible',
  '7. Log all concurrent access attempts for debugging'
];
console.log('\nConcurrent Operation Error Handling Best Practices:');
for (const practice of bestPractices) {
  console.log(practice);
}
});
tap.start();