/**
 * @file test.perf-07.concurrent-processing.ts
 * @description Performance tests for concurrent processing capabilities
 */

import { tap } from '@git.zone/tstest/tapbundle';
import * as plugins from '../../plugins.js';
import { EInvoice } from '../../../ts/index.js';
import { CorpusLoader } from '../../suite/corpus.loader.js';
import { PerformanceTracker } from '../../suite/performance.tracker.js';
import * as os from 'os';

const corpusLoader = new CorpusLoader();
const performanceTracker = new PerformanceTracker('PERF-07: Concurrent Processing');

tap.test('PERF-07: Concurrent Processing - should handle concurrent operations efficiently', async (t) => {
  // Test 1: Concurrent format detection
  const concurrentDetection = await performanceTracker.measureAsync(
    'concurrent-format-detection',
    async () => {
      const einvoice = new EInvoice();
      const results = {
        concurrencyLevels: [],
        optimalConcurrency: 0,
        maxThroughput: 0
      };

      // Create test data with different formats
      const testData = [
        ...Array(25).fill(null).map((_, i) => ({ id: `ubl-${i}`, content: `UBL-${i}` })),
        ...Array(25).fill(null).map((_, i) => ({ id: `cii-${i}`, content: `CII-${i}` })),
        ...Array(25).fill(null).map((_, i) => ({ id: `unknown-${i}`, content: `UNKNOWN-${i}` }))
      ];

      // Test different concurrency levels
      const levels = [1, 2, 4, 8, 16, 32, 64];

      for (const concurrency of levels) {
        const startTime = Date.now();
        let completed = 0;
        let correct = 0;

        // Process in batches
        const batchSize = concurrency;
        const batches = [];
        for (let i = 0; i < testData.length; i += batchSize) {
          batches.push(testData.slice(i, i + batchSize));
        }

        for (const batch of batches) {
          const promises = batch.map(async (item) => {
            const format = await einvoice.detectFormat(item.content);
            completed++;

            // Verify correctness
            if ((item.id.startsWith('ubl') && format === 'ubl') ||
                (item.id.startsWith('cii') && format === 'cii') ||
                (item.id.startsWith('unknown') && format === 'unknown')) {
              correct++;
            }

            return format;
          });

          await Promise.all(promises);
        }

        const duration = Date.now() - startTime;
        const throughput = (completed / (duration / 1000));

        const result = {
          concurrency,
          duration,
          completed,
          correct,
          accuracy: ((correct / completed) * 100).toFixed(2),
          throughput: throughput.toFixed(2),
          avgLatency: (duration / completed).toFixed(2)
        };

        results.concurrencyLevels.push(result);

        if (throughput > results.maxThroughput) {
          results.maxThroughput = throughput;
          results.optimalConcurrency = concurrency;
        }
      }

      return results;
    }
  );
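  // Note on the validation benchmark below: it reuses the optimal concurrency level found
  // by Test 1 and compares wall-clock time against process.cpuUsage() to derive a rough
  // "CPU efficiency" figure (CPU time as a percentage of elapsed time). Values well below
  // 100% on a multi-core machine usually point to I/O-bound or serialized work; values
  // approaching (core count x 100)% suggest the CPU is saturated.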
  // Test 2: Concurrent validation
  const concurrentValidation = await performanceTracker.measureAsync(
    'concurrent-validation',
    async () => {
      const einvoice = new EInvoice();
      const results = {
        scenarios: [],
        resourceContention: null
      };

      // Create test invoices with varying complexity
      const createInvoice = (id: number, complexity: 'simple' | 'medium' | 'complex') => {
        const itemCount = complexity === 'simple' ? 5 : complexity === 'medium' ? 20 : 50;
        const invoice = {
          format: 'ubl' as const,
          data: {
            documentType: 'INVOICE',
            invoiceNumber: `CONC-VAL-${complexity}-${id}`,
            issueDate: '2024-02-20',
            seller: { name: `Seller ${id}`, address: 'Address', country: 'US', taxId: `US${id}` },
            buyer: { name: `Buyer ${id}`, address: 'Address', country: 'US', taxId: `US${id + 1000}` },
            items: Array.from({ length: itemCount }, (_, i) => ({
              description: `Item ${i + 1} for invoice ${id}`,
              quantity: Math.random() * 10,
              unitPrice: Math.random() * 100,
              vatRate: [5, 10, 15, 20][Math.floor(Math.random() * 4)],
              lineTotal: 0
            })),
            totals: { netAmount: 0, vatAmount: 0, grossAmount: 0 }
          }
        };

        // Calculate totals
        invoice.data.items.forEach(item => {
          item.lineTotal = item.quantity * item.unitPrice;
          invoice.data.totals.netAmount += item.lineTotal;
          invoice.data.totals.vatAmount += item.lineTotal * (item.vatRate / 100);
        });
        invoice.data.totals.grossAmount = invoice.data.totals.netAmount + invoice.data.totals.vatAmount;

        return invoice;
      };

      // Test scenarios
      const scenarios = [
        { name: 'All simple', distribution: { simple: 30, medium: 0, complex: 0 } },
        { name: 'Mixed load', distribution: { simple: 10, medium: 15, complex: 5 } },
        { name: 'All complex', distribution: { simple: 0, medium: 0, complex: 30 } }
      ];

      for (const scenario of scenarios) {
        const invoices = [];
        let id = 0;

        // Create invoices according to distribution
        for (const [complexity, count] of Object.entries(scenario.distribution)) {
          for (let i = 0; i < count; i++) {
            invoices.push(createInvoice(id++, complexity as any));
          }
        }

        // Test with optimal concurrency from previous test
        const concurrency = concurrentDetection.result.optimalConcurrency || 8;
        const startTime = Date.now();
        const startCPU = process.cpuUsage();

        // Process concurrently (named separately so the outer `results` object is not shadowed)
        const validationResults = [];
        for (let i = 0; i < invoices.length; i += concurrency) {
          const batch = invoices.slice(i, i + concurrency);
          const batchResults = await Promise.all(
            batch.map(async (invoice) => {
              const start = Date.now();
              const result = await einvoice.validateInvoice(invoice);
              return {
                duration: Date.now() - start,
                valid: result.isValid,
                errors: result.errors?.length || 0
              };
            })
          );
          validationResults.push(...batchResults);
        }

        const totalDuration = Date.now() - startTime;
        const cpuUsage = process.cpuUsage(startCPU);

        // Analyze results
        const validCount = validationResults.filter(r => r.valid).length;
        const avgDuration = validationResults.reduce((sum, r) => sum + r.duration, 0) / validationResults.length;
        const maxDuration = Math.max(...validationResults.map(r => r.duration));

        results.scenarios.push({
          name: scenario.name,
          invoiceCount: invoices.length,
          concurrency,
          totalDuration,
          throughput: (invoices.length / (totalDuration / 1000)).toFixed(2),
          validCount,
          validationRate: ((validCount / invoices.length) * 100).toFixed(2),
          avgLatency: avgDuration.toFixed(2),
          maxLatency: maxDuration,
          cpuTime: ((cpuUsage.user + cpuUsage.system) / 1000).toFixed(2),
          cpuEfficiency: (((cpuUsage.user + cpuUsage.system) / 1000) / totalDuration * 100).toFixed(2)
        });
      }

      // Test resource contention
      const contentionTest = async () => {
        const invoice = createInvoice(9999, 'medium');
        const concurrencyLevels = [1, 10, 50, 100];
        const contentionResults = [];

        for (const level of concurrencyLevels) {
          const start = Date.now();
          const promises = Array(level).fill(null).map(() =>
            einvoice.validateInvoice(invoice)
          );
          await Promise.all(promises);
          const duration = Date.now() - start;

          contentionResults.push({
            concurrency: level,
            totalTime: duration,
            avgTime: (duration / level).toFixed(2),
            throughput: (level / (duration / 1000)).toFixed(2)
          });
        }

        return contentionResults;
      };

      results.resourceContention = await contentionTest();

      return results;
    }
  );
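  // Note on the file-processing benchmark below: instead of fixed batches it keeps a
  // bounded pool of in-flight promises (a Set) and uses Promise.race to refill the pool
  // as soon as any task settles. Unlike the batch approach above, a single slow file does
  // not hold the remaining slots idle, so the measured throughput reflects the concurrency
  // limit rather than the slowest item in each batch.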
  // Test 3: Concurrent file processing
  const concurrentFileProcessing = await performanceTracker.measureAsync(
    'concurrent-file-processing',
    async () => {
      const files = await corpusLoader.getFilesByPattern('**/*.xml');
      const einvoice = new EInvoice();
      const results = {
        fileCount: 0,
        processedCount: 0,
        concurrencyTests: [],
        errorRates: new Map()
      };

      // Sample files
      const sampleFiles = files.slice(0, 50);
      results.fileCount = sampleFiles.length;

      // Test different concurrency strategies
      const strategies = [
        { name: 'Sequential', concurrency: 1 },
        { name: 'Conservative', concurrency: 4 },
        { name: 'Moderate', concurrency: 8 },
        { name: 'Aggressive', concurrency: 16 },
        { name: 'Max', concurrency: os.cpus().length * 2 }
      ];

      for (const strategy of strategies) {
        const startTime = Date.now();
        const startMemory = process.memoryUsage();
        let processed = 0;
        let errors = 0;

        // Process files with specified concurrency
        const queue = [...sampleFiles];
        const activePromises = new Set();

        while (queue.length > 0 || activePromises.size > 0) {
          // Start new tasks up to concurrency limit
          while (activePromises.size < strategy.concurrency && queue.length > 0) {
            const file = queue.shift()!;
            const promise = (async () => {
              try {
                const content = await plugins.fs.readFile(file, 'utf-8');
                const format = await einvoice.detectFormat(content);

                if (format && format !== 'unknown') {
                  const invoice = await einvoice.parseInvoice(content, format);
                  await einvoice.validateInvoice(invoice);
                  processed++;
                }
              } catch (error) {
                errors++;
              }
            })();

            activePromises.add(promise);
            promise.finally(() => activePromises.delete(promise));
          }

          // Wait for at least one to complete
          if (activePromises.size > 0) {
            await Promise.race(activePromises);
          }
        }

        const duration = Date.now() - startTime;
        const endMemory = process.memoryUsage();

        results.concurrencyTests.push({
          strategy: strategy.name,
          concurrency: strategy.concurrency,
          duration,
          processed,
          errors,
          throughput: (processed / (duration / 1000)).toFixed(2),
          avgFileTime: (duration / sampleFiles.length).toFixed(2),
          memoryIncrease: ((endMemory.heapUsed - startMemory.heapUsed) / 1024 / 1024).toFixed(2),
          errorRate: ((errors / sampleFiles.length) * 100).toFixed(2)
        });

        results.errorRates.set(strategy.concurrency, errors);
        results.processedCount = Math.max(results.processedCount, processed);
      }

      return results;
    }
  );
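  // Note on the mixed-workload benchmark below: it interleaves four operation types
  // (detect, parse, validate, convert) and then shuffles the list with a standard
  // Fisher-Yates pass, so each batch sees a realistic mix rather than a fixed
  // round-robin rotation. Per-operation stats collected during the run make it possible
  // to see whether one operation type degrades disproportionately as concurrency rises.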
  // Test 4: Mixed operation concurrency
  const mixedOperationConcurrency = await performanceTracker.measureAsync(
    'mixed-operation-concurrency',
    async () => {
      const einvoice = new EInvoice();
      const results = {
        operations: [],
        contentionAnalysis: null
      };

      // Define mixed operations
      const operations = [
        {
          name: 'detect',
          fn: async (id: number) => {
            const xml = `MIXED-${id}`;
            return await einvoice.detectFormat(xml);
          }
        },
        {
          name: 'parse',
          fn: async (id: number) => {
            const xml = `PARSE-${id}2024-01-01`;
            return await einvoice.parseInvoice(xml, 'ubl');
          }
        },
        {
          name: 'validate',
          fn: async (id: number) => {
            const invoice = {
              format: 'ubl' as const,
              data: {
                documentType: 'INVOICE',
                invoiceNumber: `VAL-${id}`,
                issueDate: '2024-02-20',
                seller: { name: 'Seller', address: 'Address', country: 'US', taxId: 'US123' },
                buyer: { name: 'Buyer', address: 'Address', country: 'US', taxId: 'US456' },
                items: [{ description: 'Item', quantity: 1, unitPrice: 100, vatRate: 10, lineTotal: 100 }],
                totals: { netAmount: 100, vatAmount: 10, grossAmount: 110 }
              }
            };
            return await einvoice.validateInvoice(invoice);
          }
        },
        {
          name: 'convert',
          fn: async (id: number) => {
            const invoice = {
              format: 'ubl' as const,
              data: {
                documentType: 'INVOICE',
                invoiceNumber: `CONV-${id}`,
                issueDate: '2024-02-20',
                seller: { name: 'Seller', address: 'Address', country: 'US', taxId: 'US123' },
                buyer: { name: 'Buyer', address: 'Address', country: 'US', taxId: 'US456' },
                items: [{ description: 'Item', quantity: 1, unitPrice: 100, vatRate: 10, lineTotal: 100 }],
                totals: { netAmount: 100, vatAmount: 10, grossAmount: 110 }
              }
            };
            return await einvoice.convertFormat(invoice, 'cii');
          }
        }
      ];

      // Test mixed workload
      const totalOperations = 200;
      const operationMix = Array.from({ length: totalOperations }, (_, i) => ({
        operation: operations[i % operations.length],
        id: i
      }));

      // Shuffle to simulate real-world mix
      for (let i = operationMix.length - 1; i > 0; i--) {
        const j = Math.floor(Math.random() * (i + 1));
        [operationMix[i], operationMix[j]] = [operationMix[j], operationMix[i]];
      }

      // Test with different concurrency levels
      const concurrencyLevels = [1, 5, 10, 20];

      for (const concurrency of concurrencyLevels) {
        const startTime = Date.now();
        const operationStats = new Map(operations.map(op => [op.name, { count: 0, totalTime: 0, errors: 0 }]));

        // Process operations
        for (let i = 0; i < operationMix.length; i += concurrency) {
          const batch = operationMix.slice(i, i + concurrency);

          await Promise.all(batch.map(async ({ operation, id }) => {
            const opStart = Date.now();
            try {
              await operation.fn(id);
              operationStats.get(operation.name)!.count++;
            } catch {
              operationStats.get(operation.name)!.errors++;
            }
            operationStats.get(operation.name)!.totalTime += Date.now() - opStart;
          }));
        }

        const totalDuration = Date.now() - startTime;

        results.operations.push({
          concurrency,
          totalDuration,
          throughput: (totalOperations / (totalDuration / 1000)).toFixed(2),
          operationBreakdown: Array.from(operationStats.entries()).map(([name, stats]) => ({
            operation: name,
            count: stats.count,
            avgTime: stats.count > 0 ? (stats.totalTime / stats.count).toFixed(2) : 'N/A',
            errorRate: ((stats.errors / (stats.count + stats.errors)) * 100).toFixed(2)
          }))
        });
      }

      // Analyze operation contention
      const contentionTest = async () => {
        const promises = [];
        const contentionResults = [];

        // Run all operations concurrently
        for (let i = 0; i < 10; i++) {
          for (const op of operations) {
            promises.push(
              (async () => {
                const start = Date.now();
                await op.fn(1000 + i);
                return { operation: op.name, duration: Date.now() - start };
              })()
            );
          }
        }

        const opResults = await Promise.all(promises);

        // Group by operation
        const grouped = opResults.reduce((acc, r) => {
          if (!acc[r.operation]) acc[r.operation] = [];
          acc[r.operation].push(r.duration);
          return acc;
        }, {} as Record<string, number[]>);

        for (const [op, durations] of Object.entries(grouped)) {
          const avg = durations.reduce((a, b) => a + b, 0) / durations.length;
          const min = Math.min(...durations);
          const max = Math.max(...durations);

          contentionResults.push({
            operation: op,
            avgDuration: avg.toFixed(2),
            minDuration: min,
            maxDuration: max,
            variance: ((max - min) / avg * 100).toFixed(2)
          });
        }

        return contentionResults;
      };

      results.contentionAnalysis = await contentionTest();

      return results;
    }
  );
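  // Note on the corpus run below: it reuses the bounded-pool pattern from Test 3 but
  // samples throughput once per second via setInterval while the queue drains, so the
  // final report can show how throughput evolved over the run (the last five samples are
  // reported). Completion is detected with a short setTimeout poll rather than
  // Promise.race, which trades a little latency for simpler bookkeeping.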
  // Test 5: Concurrent corpus processing
  const concurrentCorpusProcessing = await performanceTracker.measureAsync(
    'concurrent-corpus-processing',
    async () => {
      const files = await corpusLoader.getFilesByPattern('**/*.xml');
      const einvoice = new EInvoice();
      const results = {
        totalFiles: files.length,
        processedFiles: 0,
        formatDistribution: new Map(),
        performanceMetrics: {
          startTime: Date.now(),
          endTime: 0,
          peakConcurrency: 0,
          avgResponseTime: 0,
          throughputOverTime: []
        }
      };

      // Process entire corpus with optimal concurrency
      const optimalConcurrency = concurrentDetection.result.optimalConcurrency || 16;
      const queue = [...files];
      const activeOperations = new Map();
      const responseTimes = [];

      // Track throughput over time
      const throughputInterval = setInterval(() => {
        const elapsed = (Date.now() - results.performanceMetrics.startTime) / 1000;
        const current = results.processedFiles;
        results.performanceMetrics.throughputOverTime.push({
          time: elapsed,
          throughput: current / elapsed
        });
      }, 1000);

      while (queue.length > 0 || activeOperations.size > 0) {
        // Start new operations
        while (activeOperations.size < optimalConcurrency && queue.length > 0) {
          const file = queue.shift()!;
          const operationId = `op-${Date.now()}-${Math.random()}`;

          activeOperations.set(operationId, { start: Date.now() });

          (async () => {
            try {
              const content = await plugins.fs.readFile(file, 'utf-8');
              const format = await einvoice.detectFormat(content);

              if (format && format !== 'unknown') {
                activeOperations.get(operationId)!.format = format;
                results.formatDistribution.set(format,
                  (results.formatDistribution.get(format) || 0) + 1
                );

                const invoice = await einvoice.parseInvoice(content, format);
                await einvoice.validateInvoice(invoice);
                results.processedFiles++;
              }

              const duration = Date.now() - activeOperations.get(operationId)!.start;
              responseTimes.push(duration);
            } catch (error) {
              // Skip failed files
            } finally {
              activeOperations.delete(operationId);
            }
          })();

          if (activeOperations.size > results.performanceMetrics.peakConcurrency) {
            results.performanceMetrics.peakConcurrency = activeOperations.size;
          }
        }

        // Wait for some to complete
        if (activeOperations.size > 0) {
          await new Promise(resolve => setTimeout(resolve, 10));
        }
      }

      clearInterval(throughputInterval);
      results.performanceMetrics.endTime = Date.now();

      // Calculate final metrics
      const totalDuration = results.performanceMetrics.endTime - results.performanceMetrics.startTime;
      results.performanceMetrics.avgResponseTime = responseTimes.length > 0
        ? responseTimes.reduce((a, b) => a + b, 0) / responseTimes.length
        : 0;

      return {
        totalFiles: results.totalFiles,
        processedFiles: results.processedFiles,
        successRate: ((results.processedFiles / results.totalFiles) * 100).toFixed(2),
        totalDuration: totalDuration,
        overallThroughput: (results.processedFiles / (totalDuration / 1000)).toFixed(2),
        avgResponseTime: results.performanceMetrics.avgResponseTime.toFixed(2),
        peakConcurrency: results.performanceMetrics.peakConcurrency,
        formatDistribution: Array.from(results.formatDistribution.entries()),
        throughputProgression: results.performanceMetrics.throughputOverTime.slice(-5)
      };
    }
  );
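  // The summary below prints one block per test. Two derived figures are worth spelling
  // out: "CPU efficiency" is total CPU time (user + system) divided by wall-clock time,
  // and "variance" in the contention analysis is (max - min) / avg as a percentage, i.e.
  // a rough spread indicator rather than a statistical variance.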
  // Summary
  t.comment('\n=== PERF-07: Concurrent Processing Test Summary ===');

  t.comment('\nConcurrent Format Detection:');
  t.comment(' Concurrency | Duration | Throughput | Accuracy | Avg Latency');
  t.comment(' ------------|----------|------------|----------|------------');
  concurrentDetection.result.concurrencyLevels.forEach(level => {
    t.comment(` ${String(level.concurrency).padEnd(11)} | ${String(level.duration + 'ms').padEnd(8)} | ${level.throughput.padEnd(10)}/s | ${level.accuracy.padEnd(8)}% | ${level.avgLatency}ms`);
  });
  t.comment(` Optimal concurrency: ${concurrentDetection.result.optimalConcurrency} (${concurrentDetection.result.maxThroughput.toFixed(2)} ops/sec)`);

  t.comment('\nConcurrent Validation Scenarios:');
  concurrentValidation.result.scenarios.forEach(scenario => {
    t.comment(` ${scenario.name}:`);
    t.comment(`   - Invoices: ${scenario.invoiceCount}, Concurrency: ${scenario.concurrency}`);
    t.comment(`   - Duration: ${scenario.totalDuration}ms, Throughput: ${scenario.throughput}/sec`);
    t.comment(`   - Validation rate: ${scenario.validationRate}%`);
    t.comment(`   - Avg latency: ${scenario.avgLatency}ms, Max: ${scenario.maxLatency}ms`);
    t.comment(`   - CPU efficiency: ${scenario.cpuEfficiency}%`);
  });

  t.comment('\nConcurrent File Processing:');
  t.comment(' Strategy    | Concur. | Duration | Processed | Throughput | Errors | Memory');
  t.comment(' ------------|---------|----------|-----------|------------|--------|-------');
  concurrentFileProcessing.result.concurrencyTests.forEach(test => {
    t.comment(` ${test.strategy.padEnd(11)} | ${String(test.concurrency).padEnd(7)} | ${String(test.duration + 'ms').padEnd(8)} | ${String(test.processed).padEnd(9)} | ${test.throughput.padEnd(10)}/s | ${test.errorRate.padEnd(6)}% | ${test.memoryIncrease}MB`);
  });

  t.comment('\nMixed Operation Concurrency:');
  mixedOperationConcurrency.result.operations.forEach(test => {
    t.comment(` Concurrency ${test.concurrency}: ${test.throughput} ops/sec`);
    test.operationBreakdown.forEach(op => {
      t.comment(`   - ${op.operation}: ${op.count} ops, avg ${op.avgTime}ms, ${op.errorRate}% errors`);
    });
  });

  t.comment('\nOperation Contention Analysis:');
  mixedOperationConcurrency.result.contentionAnalysis.forEach(op => {
    t.comment(` ${op.operation}: avg ${op.avgDuration}ms (${op.minDuration}-${op.maxDuration}ms), variance ${op.variance}%`);
  });

  t.comment('\nCorpus Concurrent Processing:');
  t.comment(` Total files: ${concurrentCorpusProcessing.result.totalFiles}`);
  t.comment(` Processed: ${concurrentCorpusProcessing.result.processedFiles}`);
  t.comment(` Success rate: ${concurrentCorpusProcessing.result.successRate}%`);
  t.comment(` Duration: ${(concurrentCorpusProcessing.result.totalDuration / 1000).toFixed(2)}s`);
  t.comment(` Throughput: ${concurrentCorpusProcessing.result.overallThroughput} files/sec`);
  t.comment(` Avg response time: ${concurrentCorpusProcessing.result.avgResponseTime}ms`);
  t.comment(` Peak concurrency: ${concurrentCorpusProcessing.result.peakConcurrency}`);

  // Performance targets check
  t.comment('\n=== Performance Targets Check ===');
  const targetConcurrency = 100; // Target: >100 concurrent ops/sec
  const achievedThroughput = parseFloat(concurrentDetection.result.maxThroughput.toFixed(2));
  t.comment(`Concurrent throughput: ${achievedThroughput} ops/sec ${achievedThroughput > targetConcurrency ? '✅' : '⚠️'} (target: >${targetConcurrency}/sec)`);
  t.comment(`Optimal concurrency: ${concurrentDetection.result.optimalConcurrency} concurrent operations`);

  // Overall performance summary
  t.comment('\n=== Overall Performance Summary ===');
  performanceTracker.logSummary();

  t.end();
});

tap.start();