/**
* @file test.perf-09.streaming.ts
* @description Performance tests for streaming operations
*/
import { tap } from '@git.zone/tstest/tapbundle';
import * as plugins from '../../plugins.js';
import { EInvoice } from '../../../ts/index.js';
import { CorpusLoader } from '../../suite/corpus.loader.js';
import { PerformanceTracker } from '../../suite/performance.tracker.js';
import { FormatDetector } from '../../../ts/formats/utils/format.detector.js';
import { Readable, Writable, Transform } from 'stream';
const performanceTracker = new PerformanceTracker('PERF-09: Streaming Performance');
tap.test('PERF-09: Streaming Performance - should handle streaming operations efficiently', async (t) => {
// Test 1: Streaming XML parsing
const streamingXMLParsing = await performanceTracker.measureAsync(
'streaming-xml-parsing',
async () => {
const results = {
tests: [],
memoryEfficiency: null
};
// Create test XML streams of different sizes
const createXMLStream = (itemCount: number): Readable => {
let currentItem = 0;
let headerSent = false;
let itemsSent = false;
return new Readable({
read() {
if (!headerSent) {
this.push(`
STREAM-${itemCount}
2024-03-01
Streaming Supplier
Streaming Customer
`);
headerSent = true;
} else if (currentItem < itemCount) {
// Send items in chunks
const chunkSize = Math.min(10, itemCount - currentItem);
let chunk = '';
for (let i = 0; i < chunkSize; i++) {
chunk += `
${currentItem + i + 1}
1
100.00
-
Streaming Item ${currentItem + i + 1}
`;
}
this.push(chunk);
currentItem += chunkSize;
// Simulate streaming delay
setTimeout(() => this.read(), 1);
} else if (!itemsSent) {
this.push(`
`);
itemsSent = true;
} else {
this.push(null); // End stream
}
}
});
};
// Test different stream sizes
const streamSizes = [
{ items: 10, name: 'Small stream' },
{ items: 100, name: 'Medium stream' },
{ items: 1000, name: 'Large stream' },
{ items: 5000, name: 'Very large stream' }
];
for (const size of streamSizes) {
const startTime = Date.now();
const startMemory = process.memoryUsage();
const memorySnapshots = [];
// Create monitoring interval
const monitorInterval = setInterval(() => {
memorySnapshots.push(process.memoryUsage().heapUsed / 1024 / 1024);
}, 100);
try {
// Simulate streaming parsing
const stream = createXMLStream(size.items);
const chunks = [];
let totalBytes = 0;
await new Promise((resolve, reject) => {
stream.on('data', (chunk) => {
chunks.push(chunk);
totalBytes += chunk.length;
});
stream.on('end', async () => {
clearInterval(monitorInterval);
// Parse accumulated XML
const xml = chunks.join('');
const format = FormatDetector.detectFormat(xml);
const invoice = await EInvoice.fromXml(xml);
const endTime = Date.now();
const endMemory = process.memoryUsage();
results.tests.push({
size: size.name,
items: size.items,
totalBytes: (totalBytes / 1024).toFixed(2),
duration: endTime - startTime,
memoryUsed: ((endMemory.heapUsed - startMemory.heapUsed) / 1024 / 1024).toFixed(2),
peakMemory: Math.max(...memorySnapshots).toFixed(2),
avgMemory: (memorySnapshots.reduce((a, b) => a + b, 0) / memorySnapshots.length).toFixed(2),
throughput: ((totalBytes / 1024) / ((endTime - startTime) / 1000)).toFixed(2),
itemsProcessed: size.items
});
resolve(null);
});
stream.on('error', reject);
});
} catch (error) {
clearInterval(monitorInterval);
results.tests.push({
size: size.name,
error: error.message
});
}
}
// Analyze memory efficiency
if (results.tests.length >= 2) {
const small = results.tests[0];
const large = results.tests[results.tests.length - 1];
if (!small.error && !large.error) {
results.memoryEfficiency = {
smallStreamMemory: small.memoryUsed,
largeStreamMemory: large.memoryUsed,
memoryScaling: (parseFloat(large.memoryUsed) / parseFloat(small.memoryUsed)).toFixed(2),
itemScaling: large.items / small.items,
efficient: parseFloat(large.memoryUsed) < parseFloat(small.memoryUsed) * (large.items / small.items)
};
}
}
return results;
}
);
// Test 2: Stream transformation pipeline
const streamTransformation = await performanceTracker.measureAsync(
'stream-transformation-pipeline',
async () => {
const results = {
pipelines: [],
transformationStats: null
};
// Create transformation streams
class FormatDetectionStream extends Transform {
constructor() {
super({ objectMode: true });
}
async _transform(chunk: any, encoding: string, callback: Function) {
try {
const format = FormatDetector.detectFormat(chunk.content);
this.push({ ...chunk, format });
callback();
} catch (error) {
callback(error);
}
}
}
class ValidationStream extends Transform {
constructor() {
super({ objectMode: true });
}
async _transform(chunk: any, encoding: string, callback: Function) {
try {
if (chunk.format && chunk.format !== 'unknown') {
const invoice = await EInvoice.fromXml(chunk.content);
const validation = await invoice.validate();
this.push({ ...chunk, valid: validation.valid, errors: validation.errors?.length || 0 });
} else {
this.push({ ...chunk, valid: false, errors: -1 });
}
callback();
} catch (error) {
callback(error);
}
}
}
// Test different pipeline configurations
const pipelineConfigs = [
{
name: 'Simple pipeline',
batchSize: 10,
stages: ['detect', 'validate']
},
{
name: 'Parallel pipeline',
batchSize: 50,
stages: ['detect', 'validate'],
parallel: true
},
{
name: 'Complex pipeline',
batchSize: 100,
stages: ['detect', 'parse', 'validate', 'convert']
}
];
// Create test data
const testInvoices = Array.from({ length: 100 }, (_, i) => ({
id: i,
content: `
PIPELINE-${i}
2024-03-01
Supplier ${i}
Customer ${i}
1
1
${100 + i}
`
}));
for (const config of pipelineConfigs) {
const startTime = Date.now();
const processedItems = [];
try {
// Create pipeline
const inputStream = new Readable({
objectMode: true,
read() {
const item = testInvoices.shift();
if (item) {
this.push(item);
} else {
this.push(null);
}
}
});
const outputStream = new Writable({
objectMode: true,
write(chunk, encoding, callback) {
processedItems.push(chunk);
callback();
}
});
// Build pipeline
let pipeline = inputStream;
if (config.stages.includes('detect')) {
pipeline = pipeline.pipe(new FormatDetectionStream());
}
if (config.stages.includes('validate')) {
pipeline = pipeline.pipe(new ValidationStream());
}
// Process
await new Promise((resolve, reject) => {
pipeline.pipe(outputStream)
.on('finish', resolve)
.on('error', reject);
});
const endTime = Date.now();
const duration = endTime - startTime;
results.pipelines.push({
name: config.name,
batchSize: config.batchSize,
stages: config.stages.length,
itemsProcessed: processedItems.length,
duration,
throughput: (processedItems.length / (duration / 1000)).toFixed(2),
avgLatency: (duration / processedItems.length).toFixed(2),
validItems: processedItems.filter(i => i.valid).length,
errorItems: processedItems.filter(i => !i.valid).length
});
} catch (error) {
results.pipelines.push({
name: config.name,
error: error.message
});
}
}
// Analyze transformation efficiency
if (results.pipelines.length > 0) {
const validPipelines = results.pipelines.filter(p => !p.error);
if (validPipelines.length > 0) {
const avgThroughput = validPipelines.reduce((sum, p) => sum + parseFloat(p.throughput), 0) / validPipelines.length;
const bestPipeline = validPipelines.reduce((best, p) =>
parseFloat(p.throughput) > parseFloat(best.throughput) ? p : best
);
results.transformationStats = {
avgThroughput: avgThroughput.toFixed(2),
bestPipeline: bestPipeline.name,
bestThroughput: bestPipeline.throughput
};
}
}
return results;
}
);
// Test 3: Backpressure handling
const backpressureHandling = await performanceTracker.measureAsync(
'backpressure-handling',
async () => {
const results = {
scenarios: [],
backpressureStats: null
};
// Test scenarios with different processing speeds
const scenarios = [
{
name: 'Fast producer, slow consumer',
producerDelay: 1,
consumerDelay: 10,
bufferSize: 100
},
{
name: 'Slow producer, fast consumer',
producerDelay: 10,
consumerDelay: 1,
bufferSize: 100
},
{
name: 'Balanced pipeline',
producerDelay: 5,
consumerDelay: 5,
bufferSize: 100
},
{
name: 'High volume burst',
producerDelay: 0,
consumerDelay: 5,
bufferSize: 1000
}
];
for (const scenario of scenarios) {
const startTime = Date.now();
const metrics = {
produced: 0,
consumed: 0,
buffered: 0,
maxBuffered: 0,
backpressureEvents: 0
};
try {
// Create producer stream
const producer = new Readable({
objectMode: true,
highWaterMark: scenario.bufferSize,
read() {
if (metrics.produced < 100) {
setTimeout(() => {
this.push({
id: metrics.produced++,
content: `BP-${metrics.produced}`
});
metrics.buffered = metrics.produced - metrics.consumed;
if (metrics.buffered > metrics.maxBuffered) {
metrics.maxBuffered = metrics.buffered;
}
}, scenario.producerDelay);
} else {
this.push(null);
}
}
});
// Create consumer stream with processing
const consumer = new Writable({
objectMode: true,
highWaterMark: scenario.bufferSize,
async write(chunk, encoding, callback) {
// Simulate processing
await new Promise(resolve => setTimeout(resolve, scenario.consumerDelay));
// Process invoice
const format = FormatDetector.detectFormat(chunk.content);
metrics.consumed++;
metrics.buffered = metrics.produced - metrics.consumed;
callback();
}
});
// Monitor backpressure
producer.on('pause', () => metrics.backpressureEvents++);
// Process
await new Promise((resolve, reject) => {
producer.pipe(consumer)
.on('finish', resolve)
.on('error', reject);
});
const endTime = Date.now();
const duration = endTime - startTime;
results.scenarios.push({
name: scenario.name,
duration,
produced: metrics.produced,
consumed: metrics.consumed,
maxBuffered: metrics.maxBuffered,
backpressureEvents: metrics.backpressureEvents,
throughput: (metrics.consumed / (duration / 1000)).toFixed(2),
efficiency: ((metrics.consumed / metrics.produced) * 100).toFixed(2),
avgBufferUtilization: ((metrics.maxBuffered / scenario.bufferSize) * 100).toFixed(2)
});
} catch (error) {
results.scenarios.push({
name: scenario.name,
error: error.message
});
}
}
// Analyze backpressure handling
const validScenarios = results.scenarios.filter(s => !s.error);
if (validScenarios.length > 0) {
results.backpressureStats = {
avgBackpressureEvents: (validScenarios.reduce((sum, s) => sum + s.backpressureEvents, 0) / validScenarios.length).toFixed(2),
maxBufferUtilization: Math.max(...validScenarios.map(s => parseFloat(s.avgBufferUtilization))).toFixed(2),
recommendation: validScenarios.some(s => s.backpressureEvents > 10) ?
'Consider increasing buffer sizes or optimizing processing speed' :
'Backpressure handling is adequate'
};
}
return results;
}
);
// Test 4: Corpus streaming analysis
const corpusStreaming = await performanceTracker.measureAsync(
'corpus-streaming-analysis',
async () => {
const files = await CorpusLoader.loadPattern('**/*.xml');
const results = {
streamableFiles: 0,
nonStreamableFiles: 0,
processingStats: {
streamed: [],
traditional: []
},
comparison: null
};
// Process sample files both ways
const sampleFiles = files.slice(0, 20);
for (const file of sampleFiles) {
try {
const stats = await plugins.fs.stat(file.path);
const fileSize = stats.size;
// Traditional processing
const traditionalStart = Date.now();
const content = await plugins.fs.readFile(file.path, 'utf-8');
const format = FormatDetector.detectFormat(content);
if (format && format !== 'unknown') {
const invoice = await EInvoice.fromXml(content);
await invoice.validate();
}
const traditionalEnd = Date.now();
results.processingStats.traditional.push({
size: fileSize,
time: traditionalEnd - traditionalStart
});
// Simulated streaming (chunked reading)
const streamingStart = Date.now();
const chunkSize = 64 * 1024; // 64KB chunks
const chunks = [];
// Read in chunks
const fd = await plugins.fs.open(file.path, 'r');
const buffer = Buffer.alloc(chunkSize);
let position = 0;
while (true) {
const result = await fd.read(buffer, 0, chunkSize, position);
const bytesRead = result.bytesRead;
if (bytesRead === 0) break;
chunks.push(Buffer.from(buffer.slice(0, bytesRead)).toString('utf-8'));
position += bytesRead;
}
await fd.close();
// Process accumulated content
const streamedContent = chunks.join('');
const streamedFormat = FormatDetector.detectFormat(streamedContent);
if (streamedFormat && streamedFormat !== 'unknown') {
const invoice = await EInvoice.fromXml(streamedContent);
await invoice.validate();
}
const streamingEnd = Date.now();
results.processingStats.streamed.push({
size: fileSize,
time: streamingEnd - streamingStart,
chunks: chunks.length
});
// Determine if file benefits from streaming
if (fileSize > 100 * 1024) { // Files > 100KB
results.streamableFiles++;
} else {
results.nonStreamableFiles++;
}
} catch (error) {
// Skip files that can't be processed
}
}
// Compare approaches
if (results.processingStats.traditional.length > 0 && results.processingStats.streamed.length > 0) {
const avgTraditional = results.processingStats.traditional.reduce((sum, s) => sum + s.time, 0) /
results.processingStats.traditional.length;
const avgStreamed = results.processingStats.streamed.reduce((sum, s) => sum + s.time, 0) /
results.processingStats.streamed.length;
const largeFiles = results.processingStats.traditional.filter(s => s.size > 100 * 1024);
const avgTraditionalLarge = largeFiles.length > 0 ?
largeFiles.reduce((sum, s) => sum + s.time, 0) / largeFiles.length : 0;
const largeStreamed = results.processingStats.streamed.filter(s => s.size > 100 * 1024);
const avgStreamedLarge = largeStreamed.length > 0 ?
largeStreamed.reduce((sum, s) => sum + s.time, 0) / largeStreamed.length : 0;
results.comparison = {
avgTraditionalTime: avgTraditional.toFixed(2),
avgStreamedTime: avgStreamed.toFixed(2),
overheadPercent: ((avgStreamed - avgTraditional) / avgTraditional * 100).toFixed(2),
largeFileImprovement: avgTraditionalLarge > 0 && avgStreamedLarge > 0 ?
((avgTraditionalLarge - avgStreamedLarge) / avgTraditionalLarge * 100).toFixed(2) : 'N/A',
recommendation: avgStreamed < avgTraditional * 1.1 ?
'Streaming provides benefits for this workload' :
'Traditional processing is more efficient for this workload'
};
}
return results;
}
);
// Test 5: Real-time streaming performance
const realtimeStreaming = await performanceTracker.measureAsync(
'realtime-streaming',
async () => {
const results = {
latencyTests: [],
jitterAnalysis: null
};
// Test real-time processing with different arrival rates
const arrivalRates = [
{ name: 'Low rate', invoicesPerSecond: 10 },
{ name: 'Medium rate', invoicesPerSecond: 50 },
{ name: 'High rate', invoicesPerSecond: 100 },
{ name: 'Burst rate', invoicesPerSecond: 200 }
];
for (const rate of arrivalRates) {
const testDuration = 5000; // 5 seconds
const interval = 1000 / rate.invoicesPerSecond;
const latencies = [];
let processed = 0;
let dropped = 0;
const startTime = Date.now();
// Create processing queue
const queue = [];
let processing = false;
const processNext = async () => {
if (processing || queue.length === 0) return;
processing = true;
const item = queue.shift();
try {
const processStart = Date.now();
const format = FormatDetector.detectFormat(item.content);
const invoice = await EInvoice.fromXml(item.content);
await invoice.validate();
const latency = Date.now() - item.arrivalTime;
latencies.push(latency);
processed++;
} catch (error) {
dropped++;
}
processing = false;
if (queue.length > 0) {
setImmediate(processNext);
}
};
// Generate invoices at specified rate
const generator = setInterval(() => {
const invoice = {
arrivalTime: Date.now(),
content: `RT-${Date.now()}2024-03-01`
};
// Apply backpressure - drop if queue is too large
if (queue.length < 100) {
queue.push(invoice);
processNext();
} else {
dropped++;
}
}, interval);
// Run test
await new Promise(resolve => setTimeout(resolve, testDuration));
clearInterval(generator);
// Process remaining items
while (queue.length > 0) {
await new Promise(resolve => setTimeout(resolve, 10));
}
// Calculate statistics
if (latencies.length > 0) {
latencies.sort((a, b) => a - b);
const avgLatency = latencies.reduce((a, b) => a + b, 0) / latencies.length;
const p50 = latencies[Math.floor(latencies.length * 0.5)];
const p95 = latencies[Math.floor(latencies.length * 0.95)];
const p99 = latencies[Math.floor(latencies.length * 0.99)];
// Calculate jitter
const jitters = [];
for (let i = 1; i < latencies.length; i++) {
jitters.push(Math.abs(latencies[i] - latencies[i - 1]));
}
const avgJitter = jitters.length > 0 ?
jitters.reduce((a, b) => a + b, 0) / jitters.length : 0;
results.latencyTests.push({
rate: rate.name,
targetRate: rate.invoicesPerSecond,
processed,
dropped,
actualRate: (processed / (testDuration / 1000)).toFixed(2),
avgLatency: avgLatency.toFixed(2),
p50Latency: p50,
p95Latency: p95,
p99Latency: p99,
avgJitter: avgJitter.toFixed(2),
dropRate: ((dropped / (processed + dropped)) * 100).toFixed(2)
});
}
}
// Analyze jitter and stability
if (results.latencyTests.length > 0) {
const avgJitters = results.latencyTests.map(t => parseFloat(t.avgJitter));
const avgDropRates = results.latencyTests.map(t => parseFloat(t.dropRate));
results.jitterAnalysis = {
avgJitter: (avgJitters.reduce((a, b) => a + b, 0) / avgJitters.length).toFixed(2),
maxJitter: Math.max(...avgJitters).toFixed(2),
avgDropRate: (avgDropRates.reduce((a, b) => a + b, 0) / avgDropRates.length).toFixed(2),
stable: Math.max(...avgJitters) < 50 && Math.max(...avgDropRates) < 5,
recommendation: Math.max(...avgDropRates) > 10 ?
'System cannot handle high arrival rates - consider scaling or optimization' :
'System handles real-time streaming adequately'
};
}
return results;
}
);
// Summary
console.log('\n=== PERF-09: Streaming Performance Test Summary ===');
console.log('\nStreaming XML Parsing:');
console.log(' Stream Size | Items | Data | Duration | Memory | Peak | Throughput');
console.log(' ------------|-------|---------|----------|--------|--------|----------');
streamingXMLParsing.tests.forEach((test: any) => {
if (!test.error) {
console.log(` ${test.size.padEnd(11)} | ${String(test.items).padEnd(5)} | ${test.totalBytes.padEnd(7)}KB | ${String(test.duration + 'ms').padEnd(8)} | ${test.memoryUsed.padEnd(6)}MB | ${test.peakMemory.padEnd(6)}MB | ${test.throughput}KB/s`);
}
});
if (streamingXMLParsing.memoryEfficiency) {
console.log(` Memory efficiency: ${streamingXMLParsing.memoryEfficiency.efficient ? 'GOOD ✅' : 'POOR ⚠️'}`);
console.log(` Scaling: ${streamingXMLParsing.memoryEfficiency.memoryScaling}x memory for ${streamingXMLParsing.memoryEfficiency.itemScaling}x items`);
}
console.log('\nStream Transformation Pipeline:');
streamTransformation.pipelines.forEach((pipeline: any) => {
if (!pipeline.error) {
console.log(` ${pipeline.name}:`);
console.log(` - Stages: ${pipeline.stages}, Items: ${pipeline.itemsProcessed}`);
console.log(` - Duration: ${pipeline.duration}ms, Throughput: ${pipeline.throughput}/s`);
console.log(` - Valid: ${pipeline.validItems}, Errors: ${pipeline.errorItems}`);
}
});
if (streamTransformation.transformationStats) {
console.log(` Best pipeline: ${streamTransformation.transformationStats.bestPipeline} (${streamTransformation.transformationStats.bestThroughput}/s)`);
}
console.log('\nBackpressure Handling:');
console.log(' Scenario | Duration | Produced | Consumed | Max Buffer | BP Events | Efficiency');
console.log(' ----------------------------|----------|----------|----------|------------|-----------|----------');
backpressureHandling.scenarios.forEach((scenario: any) => {
if (!scenario.error) {
console.log(` ${scenario.name.padEnd(27)} | ${String(scenario.duration + 'ms').padEnd(8)} | ${String(scenario.produced).padEnd(8)} | ${String(scenario.consumed).padEnd(8)} | ${String(scenario.maxBuffered).padEnd(10)} | ${String(scenario.backpressureEvents).padEnd(9)} | ${scenario.efficiency}%`);
}
});
if (backpressureHandling.backpressureStats) {
console.log(` ${backpressureHandling.backpressureStats.recommendation}`);
}
console.log('\nCorpus Streaming Analysis:');
console.log(` Streamable files: ${corpusStreaming.streamableFiles}`);
console.log(` Non-streamable files: ${corpusStreaming.nonStreamableFiles}`);
if (corpusStreaming.comparison) {
console.log(` Traditional avg: ${corpusStreaming.comparison.avgTraditionalTime}ms`);
console.log(` Streamed avg: ${corpusStreaming.comparison.avgStreamedTime}ms`);
console.log(` Overhead: ${corpusStreaming.comparison.overheadPercent}%`);
console.log(` Large file improvement: ${corpusStreaming.comparison.largeFileImprovement}%`);
console.log(` ${corpusStreaming.comparison.recommendation}`);
}
console.log('\nReal-time Streaming:');
console.log(' Rate | Target | Actual | Processed | Dropped | Avg Latency | P95 | Jitter');
console.log(' ------------|--------|--------|-----------|---------|-------------|--------|-------');
realtimeStreaming.latencyTests.forEach((test: any) => {
console.log(` ${test.rate.padEnd(11)} | ${String(test.targetRate).padEnd(6)} | ${test.actualRate.padEnd(6)} | ${String(test.processed).padEnd(9)} | ${test.dropRate.padEnd(7)}% | ${test.avgLatency.padEnd(11)}ms | ${String(test.p95Latency).padEnd(6)}ms | ${test.avgJitter}ms`);
});
if (realtimeStreaming.jitterAnalysis) {
console.log(` System stability: ${realtimeStreaming.jitterAnalysis.stable ? 'STABLE ✅' : 'UNSTABLE ⚠️'}`);
console.log(` ${realtimeStreaming.jitterAnalysis.recommendation}`);
}
// Performance targets check
console.log('\n=== Performance Targets Check ===');
const streamingEfficient = streamingXMLParsing.memoryEfficiency?.efficient || false;
const realtimeStable = realtimeStreaming.jitterAnalysis?.stable || false;
console.log(`Streaming memory efficiency: ${streamingEfficient ? 'EFFICIENT ✅' : 'INEFFICIENT ⚠️'}`);
console.log(`Real-time stability: ${realtimeStable ? 'STABLE ✅' : 'UNSTABLE ⚠️'}`);
// Overall performance summary
console.log('\n=== Overall Performance Summary ===');
console.log(performanceTracker.getSummary());
});
tap.start();