/**
 * @file test.perf-09.streaming.ts
 * @description Performance tests for streaming operations
 */

import { tap } from '@git.zone/tstest/tapbundle';
import * as plugins from '../../plugins.js';
import { EInvoice } from '../../../ts/index.js';
import { CorpusLoader } from '../../suite/corpus.loader.js';
import { PerformanceTracker } from '../../suite/performance.tracker.js';
import { FormatDetector } from '../../../ts/formats/utils/format.detector.js';
import { Readable, Writable, Transform } from 'stream';

const performanceTracker = new PerformanceTracker('PERF-09: Streaming Performance');

tap.test('PERF-09: Streaming Performance - should handle streaming operations efficiently', async (t) => {
  // Test 1: Streaming XML parsing
  const streamingXMLParsing = await performanceTracker.measureAsync(
    'streaming-xml-parsing',
    async () => {
      const results = {
        tests: [],
        memoryEfficiency: null
      };

      // Create test XML streams of different sizes
      const createXMLStream = (itemCount: number): Readable => {
        let currentItem = 0;
        let headerSent = false;
        let itemsSent = false;

        return new Readable({
          read() {
            if (!headerSent) {
              this.push(`<?xml version="1.0" encoding="UTF-8"?>
<Invoice xmlns="urn:oasis:names:specification:ubl:schema:xsd:Invoice-2">
  <ID>STREAM-${itemCount}</ID>
  <IssueDate>2024-03-01</IssueDate>
  <AccountingSupplierParty>
    <Party>
      <PartyName><Name>Streaming Supplier</Name></PartyName>
    </Party>
  </AccountingSupplierParty>
  <AccountingCustomerParty>
    <Party>
      <PartyName><Name>Streaming Customer</Name></PartyName>
    </Party>
  </AccountingCustomerParty>`);
              headerSent = true;
            } else if (currentItem < itemCount) {
              // Send items in chunks
              const chunkSize = Math.min(10, itemCount - currentItem);
              let chunk = '';

              for (let i = 0; i < chunkSize; i++) {
                chunk += `
  <InvoiceLine>
    <ID>${currentItem + i + 1}</ID>
    <InvoicedQuantity>1</InvoicedQuantity>
    <LineExtensionAmount>100.00</LineExtensionAmount>
    <Item>
      <Description>Streaming Item ${currentItem + i + 1}</Description>
    </Item>
  </InvoiceLine>`;
              }

              this.push(chunk);
              currentItem += chunkSize;

              // Simulate streaming delay
              setTimeout(() => this.read(), 1);
            } else if (!itemsSent) {
              this.push(`
</Invoice>`);
              itemsSent = true;
            } else {
              this.push(null); // End stream
            }
          }
        });
      };
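
      // The generated stream emits the invoice header first, then line items in
      // chunks of up to 10, and finally the closing tag. The consumer below still
      // accumulates all chunks before parsing, so this test measures chunked
      // delivery and memory behavior rather than true incremental XML parsing.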

      // Test different stream sizes
      const streamSizes = [
        { items: 10, name: 'Small stream' },
        { items: 100, name: 'Medium stream' },
        { items: 1000, name: 'Large stream' },
        { items: 5000, name: 'Very large stream' }
      ];

      for (const size of streamSizes) {
        const startTime = Date.now();
        const startMemory = process.memoryUsage();
        // Seed with the starting heap usage so very fast runs still yield peak/avg figures
        const memorySnapshots = [startMemory.heapUsed / 1024 / 1024];

        // Create monitoring interval
        const monitorInterval = setInterval(() => {
          memorySnapshots.push(process.memoryUsage().heapUsed / 1024 / 1024);
        }, 100);

        try {
          // Simulate streaming parsing
          const stream = createXMLStream(size.items);
          const chunks = [];
          let totalBytes = 0;

          await new Promise((resolve, reject) => {
            stream.on('data', (chunk) => {
              chunks.push(chunk);
              totalBytes += chunk.length;
            });

            stream.on('end', async () => {
              clearInterval(monitorInterval);

              // Parse accumulated XML
              const xml = chunks.join('');
              const format = FormatDetector.detectFormat(xml);
              const invoice = await EInvoice.fromXml(xml);

              const endTime = Date.now();
              const endMemory = process.memoryUsage();

              results.tests.push({
                size: size.name,
                items: size.items,
                totalBytes: (totalBytes / 1024).toFixed(2),
                duration: endTime - startTime,
                memoryUsed: ((endMemory.heapUsed - startMemory.heapUsed) / 1024 / 1024).toFixed(2),
                peakMemory: Math.max(...memorySnapshots).toFixed(2),
                avgMemory: (memorySnapshots.reduce((a, b) => a + b, 0) / memorySnapshots.length).toFixed(2),
                throughput: ((totalBytes / 1024) / ((endTime - startTime) / 1000)).toFixed(2),
                itemsProcessed: size.items
              });

              resolve(null);
            });

            stream.on('error', reject);
          });

        } catch (error) {
          clearInterval(monitorInterval);
          results.tests.push({
            size: size.name,
            error: error.message
          });
        }
      }

      // Analyze memory efficiency
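      // A run is considered memory-efficient when memory use grows more slowly
      // than the item count, i.e. the large stream needs less than (items ratio) x
      // the small stream's memory.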
      if (results.tests.length >= 2) {
        const small = results.tests[0];
        const large = results.tests[results.tests.length - 1];

        if (!small.error && !large.error) {
          results.memoryEfficiency = {
            smallStreamMemory: small.memoryUsed,
            largeStreamMemory: large.memoryUsed,
            memoryScaling: (parseFloat(large.memoryUsed) / parseFloat(small.memoryUsed)).toFixed(2),
            itemScaling: large.items / small.items,
            efficient: parseFloat(large.memoryUsed) < parseFloat(small.memoryUsed) * (large.items / small.items)
          };
        }
      }

      return results;
    }
  );

  // Test 2: Stream transformation pipeline
  const streamTransformation = await performanceTracker.measureAsync(
    'stream-transformation-pipeline',
    async () => {
      const results = {
        pipelines: [],
        transformationStats: null
      };

      // Create transformation streams
      class FormatDetectionStream extends Transform {
        constructor() {
          super({ objectMode: true });
        }

        async _transform(chunk: any, encoding: string, callback: Function) {
          try {
            const format = FormatDetector.detectFormat(chunk.content);
            this.push({ ...chunk, format });
            callback();
          } catch (error) {
            callback(error);
          }
        }
      }

      class ValidationStream extends Transform {
        constructor() {
          super({ objectMode: true });
        }

        async _transform(chunk: any, encoding: string, callback: Function) {
          try {
            if (chunk.format && chunk.format !== 'unknown') {
              const invoice = await EInvoice.fromXml(chunk.content);
              const validation = await invoice.validate();
              this.push({ ...chunk, valid: validation.valid, errors: validation.errors?.length || 0 });
            } else {
              this.push({ ...chunk, valid: false, errors: -1 });
            }
            callback();
          } catch (error) {
            callback(error);
          }
        }
      }
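
      // Both Transform stages run in object mode and pass enriched objects downstream.
      // Note that only the 'detect' and 'validate' stages are wired up when the
      // pipeline is built below; the 'parse' and 'convert' stages named in the
      // complex configuration have no corresponding Transform and are skipped.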

      // Test different pipeline configurations
      const pipelineConfigs = [
        {
          name: 'Simple pipeline',
          batchSize: 10,
          stages: ['detect', 'validate']
        },
        {
          name: 'Parallel pipeline',
          batchSize: 50,
          stages: ['detect', 'validate'],
          parallel: true
        },
        {
          name: 'Complex pipeline',
          batchSize: 100,
          stages: ['detect', 'parse', 'validate', 'convert']
        }
      ];

      // Create test data
      const testInvoices = Array.from({ length: 100 }, (_, i) => ({
        id: i,
        content: `<?xml version="1.0"?>
<Invoice xmlns="urn:oasis:names:specification:ubl:schema:xsd:Invoice-2">
  <ID>PIPELINE-${i}</ID>
  <IssueDate>2024-03-01</IssueDate>
  <AccountingSupplierParty><Party><PartyName><Name>Supplier ${i}</Name></PartyName></Party></AccountingSupplierParty>
  <AccountingCustomerParty><Party><PartyName><Name>Customer ${i}</Name></PartyName></Party></AccountingCustomerParty>
  <InvoiceLine>
    <ID>1</ID>
    <InvoicedQuantity>1</InvoicedQuantity>
    <LineExtensionAmount>${100 + i}</LineExtensionAmount>
  </InvoiceLine>
</Invoice>`
      }));
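
      // Each pipeline run below consumes its own copy of this array so that
      // successive configurations are not starved of input.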
      for (const config of pipelineConfigs) {
        const startTime = Date.now();
        const processedItems = [];
        const queue = [...testInvoices];

        try {
          // Create pipeline
          const inputStream = new Readable({
            objectMode: true,
            read() {
              const item = queue.shift();
              if (item) {
                this.push(item);
              } else {
                this.push(null);
              }
            }
          });

          const outputStream = new Writable({
            objectMode: true,
            write(chunk, encoding, callback) {
              processedItems.push(chunk);
              callback();
            }
          });

          // Build pipeline
          let pipeline = inputStream;

          if (config.stages.includes('detect')) {
            pipeline = pipeline.pipe(new FormatDetectionStream());
          }

          if (config.stages.includes('validate')) {
            pipeline = pipeline.pipe(new ValidationStream());
          }

          // Process
          await new Promise((resolve, reject) => {
            pipeline.pipe(outputStream)
              .on('finish', resolve)
              .on('error', reject);
          });

          const endTime = Date.now();
          const duration = endTime - startTime;

          results.pipelines.push({
            name: config.name,
            batchSize: config.batchSize,
            stages: config.stages.length,
            itemsProcessed: processedItems.length,
            duration,
            throughput: (processedItems.length / (duration / 1000)).toFixed(2),
            avgLatency: (duration / processedItems.length).toFixed(2),
            validItems: processedItems.filter(i => i.valid).length,
            errorItems: processedItems.filter(i => !i.valid).length
          });

        } catch (error) {
          results.pipelines.push({
            name: config.name,
            error: error.message
          });
        }
      }

      // Analyze transformation efficiency
      if (results.pipelines.length > 0) {
        const validPipelines = results.pipelines.filter(p => !p.error);
        if (validPipelines.length > 0) {
          const avgThroughput = validPipelines.reduce((sum, p) => sum + parseFloat(p.throughput), 0) / validPipelines.length;
          const bestPipeline = validPipelines.reduce((best, p) =>
            parseFloat(p.throughput) > parseFloat(best.throughput) ? p : best
          );

          results.transformationStats = {
            avgThroughput: avgThroughput.toFixed(2),
            bestPipeline: bestPipeline.name,
            bestThroughput: bestPipeline.throughput
          };
        }
      }

      return results;
    }
  );

  // Test 3: Backpressure handling
  const backpressureHandling = await performanceTracker.measureAsync(
    'backpressure-handling',
    async () => {
      const results = {
        scenarios: [],
        backpressureStats: null
      };

      // Test scenarios with different processing speeds
      const scenarios = [
        {
          name: 'Fast producer, slow consumer',
          producerDelay: 1,
          consumerDelay: 10,
          bufferSize: 100
        },
        {
          name: 'Slow producer, fast consumer',
          producerDelay: 10,
          consumerDelay: 1,
          bufferSize: 100
        },
        {
          name: 'Balanced pipeline',
          producerDelay: 5,
          consumerDelay: 5,
          bufferSize: 100
        },
        {
          name: 'High volume burst',
          producerDelay: 0,
          consumerDelay: 5,
          bufferSize: 1000
        }
      ];

      for (const scenario of scenarios) {
        const startTime = Date.now();
        const metrics = {
          produced: 0,
          consumed: 0,
          buffered: 0,
          maxBuffered: 0,
          backpressureEvents: 0
        };

        try {
          // Create producer stream
          const producer = new Readable({
            objectMode: true,
            highWaterMark: scenario.bufferSize,
            read() {
              if (metrics.produced < 100) {
                setTimeout(() => {
                  this.push({
                    id: metrics.produced++,
                    content: `<?xml version="1.0"?><Invoice xmlns="urn:oasis:names:specification:ubl:schema:xsd:Invoice-2"><ID>BP-${metrics.produced}</ID></Invoice>`
                  });

                  metrics.buffered = metrics.produced - metrics.consumed;
                  if (metrics.buffered > metrics.maxBuffered) {
                    metrics.maxBuffered = metrics.buffered;
                  }
                }, scenario.producerDelay);
              } else {
                this.push(null);
              }
            }
          });

          // Create consumer stream with processing
          const consumer = new Writable({
            objectMode: true,
            highWaterMark: scenario.bufferSize,
            async write(chunk, encoding, callback) {
              // Simulate processing
              await new Promise(resolve => setTimeout(resolve, scenario.consumerDelay));

              // Process invoice
              const format = FormatDetector.detectFormat(chunk.content);

              metrics.consumed++;
              metrics.buffered = metrics.produced - metrics.consumed;
              callback();
            }
          });
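
          // pipe() pauses the producer whenever the consumer's internal buffer
          // reaches its highWaterMark, so each 'pause' event counts as one
          // backpressure episode.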
          // Monitor backpressure
          producer.on('pause', () => metrics.backpressureEvents++);

          // Process
          await new Promise((resolve, reject) => {
            producer.pipe(consumer)
              .on('finish', resolve)
              .on('error', reject);
          });

          const endTime = Date.now();
          const duration = endTime - startTime;

          results.scenarios.push({
            name: scenario.name,
            duration,
            produced: metrics.produced,
            consumed: metrics.consumed,
            maxBuffered: metrics.maxBuffered,
            backpressureEvents: metrics.backpressureEvents,
            throughput: (metrics.consumed / (duration / 1000)).toFixed(2),
            efficiency: ((metrics.consumed / metrics.produced) * 100).toFixed(2),
            avgBufferUtilization: ((metrics.maxBuffered / scenario.bufferSize) * 100).toFixed(2)
          });

        } catch (error) {
          results.scenarios.push({
            name: scenario.name,
            error: error.message
          });
        }
      }

      // Analyze backpressure handling
      const validScenarios = results.scenarios.filter(s => !s.error);
      if (validScenarios.length > 0) {
        results.backpressureStats = {
          avgBackpressureEvents: (validScenarios.reduce((sum, s) => sum + s.backpressureEvents, 0) / validScenarios.length).toFixed(2),
          maxBufferUtilization: Math.max(...validScenarios.map(s => parseFloat(s.avgBufferUtilization))).toFixed(2),
          recommendation: validScenarios.some(s => s.backpressureEvents > 10) ?
            'Consider increasing buffer sizes or optimizing processing speed' :
            'Backpressure handling is adequate'
        };
      }

      return results;
    }
  );

  // Test 4: Corpus streaming analysis
  const corpusStreaming = await performanceTracker.measureAsync(
    'corpus-streaming-analysis',
    async () => {
      const files = await CorpusLoader.loadPattern('**/*.xml');
      const results = {
        streamableFiles: 0,
        nonStreamableFiles: 0,
        processingStats: {
          streamed: [],
          traditional: []
        },
        comparison: null
      };

      // Process sample files both ways
      const sampleFiles = files.slice(0, 20);

      for (const file of sampleFiles) {
        try {
          const stats = await plugins.fs.stat(file.path);
          const fileSize = stats.size;

          // Traditional processing
          const traditionalStart = Date.now();
          const content = await plugins.fs.readFile(file.path, 'utf-8');
          const format = FormatDetector.detectFormat(content);
          if (format && format !== 'unknown') {
            const invoice = await EInvoice.fromXml(content);
            await invoice.validate();
          }
          const traditionalEnd = Date.now();

          results.processingStats.traditional.push({
            size: fileSize,
            time: traditionalEnd - traditionalStart
          });

          // Simulated streaming (chunked reading)
          const streamingStart = Date.now();
          const chunkSize = 64 * 1024; // 64KB chunks
          const chunks = [];

          // Read in chunks
          const fd = await plugins.fs.open(file.path, 'r');
          const buffer = Buffer.alloc(chunkSize);
          let position = 0;

          while (true) {
            const result = await fd.read(buffer, 0, chunkSize, position);
            const bytesRead = result.bytesRead;
            if (bytesRead === 0) break;

            chunks.push(Buffer.from(buffer.slice(0, bytesRead)).toString('utf-8'));
            position += bytesRead;
          }

          await fd.close();
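
          // The chunks are still joined and parsed as a whole below, so this path
          // measures chunked file I/O overhead rather than incremental XML parsing.
          // Note: decoding each 64KB chunk separately can split multi-byte UTF-8
          // characters at chunk boundaries for non-ASCII content.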
          // Process accumulated content
          const streamedContent = chunks.join('');
          const streamedFormat = FormatDetector.detectFormat(streamedContent);
          if (streamedFormat && streamedFormat !== 'unknown') {
            const invoice = await EInvoice.fromXml(streamedContent);
            await invoice.validate();
          }
          const streamingEnd = Date.now();

          results.processingStats.streamed.push({
            size: fileSize,
            time: streamingEnd - streamingStart,
            chunks: chunks.length
          });

          // Determine if file benefits from streaming
          if (fileSize > 100 * 1024) { // Files > 100KB
            results.streamableFiles++;
          } else {
            results.nonStreamableFiles++;
          }

        } catch (error) {
          // Skip files that can't be processed
        }
      }

      // Compare approaches
      if (results.processingStats.traditional.length > 0 && results.processingStats.streamed.length > 0) {
        const avgTraditional = results.processingStats.traditional.reduce((sum, s) => sum + s.time, 0) /
          results.processingStats.traditional.length;
        const avgStreamed = results.processingStats.streamed.reduce((sum, s) => sum + s.time, 0) /
          results.processingStats.streamed.length;

        const largeFiles = results.processingStats.traditional.filter(s => s.size > 100 * 1024);
        const avgTraditionalLarge = largeFiles.length > 0 ?
          largeFiles.reduce((sum, s) => sum + s.time, 0) / largeFiles.length : 0;

        const largeStreamed = results.processingStats.streamed.filter(s => s.size > 100 * 1024);
        const avgStreamedLarge = largeStreamed.length > 0 ?
          largeStreamed.reduce((sum, s) => sum + s.time, 0) / largeStreamed.length : 0;

        results.comparison = {
          avgTraditionalTime: avgTraditional.toFixed(2),
          avgStreamedTime: avgStreamed.toFixed(2),
          overheadPercent: ((avgStreamed - avgTraditional) / avgTraditional * 100).toFixed(2),
          largeFileImprovement: avgTraditionalLarge > 0 && avgStreamedLarge > 0 ?
            ((avgTraditionalLarge - avgStreamedLarge) / avgTraditionalLarge * 100).toFixed(2) : 'N/A',
          recommendation: avgStreamed < avgTraditional * 1.1 ?
            'Streaming provides benefits for this workload' :
            'Traditional processing is more efficient for this workload'
        };
      }

      return results;
    }
  );

  // Test 5: Real-time streaming performance
  const realtimeStreaming = await performanceTracker.measureAsync(
    'realtime-streaming',
    async () => {
      const results = {
        latencyTests: [],
        jitterAnalysis: null
      };

      // Test real-time processing with different arrival rates
      const arrivalRates = [
        { name: 'Low rate', invoicesPerSecond: 10 },
        { name: 'Medium rate', invoicesPerSecond: 50 },
        { name: 'High rate', invoicesPerSecond: 100 },
        { name: 'Burst rate', invoicesPerSecond: 200 }
      ];
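
      // For each rate, invoices arrive on a timer and are queued in memory
      // (capped at 100 entries); a single async worker drains the queue, and
      // latency is measured from arrival to completed validation.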
      for (const rate of arrivalRates) {
        const testDuration = 5000; // 5 seconds
        const interval = 1000 / rate.invoicesPerSecond;
        const latencies = [];
        let processed = 0;
        let dropped = 0;

        const startTime = Date.now();

        // Create processing queue
        const queue = [];
        let processing = false;

        const processNext = async () => {
          if (processing || queue.length === 0) return;

          processing = true;
          const item = queue.shift();

          try {
            const processStart = Date.now();
            const format = FormatDetector.detectFormat(item.content);
            const invoice = await EInvoice.fromXml(item.content);
            await invoice.validate();

            const latency = Date.now() - item.arrivalTime;
            latencies.push(latency);
            processed++;
          } catch (error) {
            dropped++;
          }

          processing = false;
          if (queue.length > 0) {
            setImmediate(processNext);
          }
        };

        // Generate invoices at specified rate
        const generator = setInterval(() => {
          const invoice = {
            arrivalTime: Date.now(),
            content: `<?xml version="1.0"?><Invoice xmlns="urn:oasis:names:specification:ubl:schema:xsd:Invoice-2"><ID>RT-${Date.now()}</ID><IssueDate>2024-03-01</IssueDate></Invoice>`
          };

          // Apply backpressure - drop if queue is too large
          if (queue.length < 100) {
            queue.push(invoice);
            processNext();
          } else {
            dropped++;
          }
        }, interval);
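
        // Invoices arriving while the queue already holds 100 entries are counted
        // as dropped; the per-rate drop rate reported below is derived from this.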

        // Run test
        await new Promise(resolve => setTimeout(resolve, testDuration));
        clearInterval(generator);

        // Process remaining items
        while (queue.length > 0) {
          await new Promise(resolve => setTimeout(resolve, 10));
        }

        // Calculate statistics
        if (latencies.length > 0) {
          const sorted = [...latencies].sort((a, b) => a - b);
          const avgLatency = latencies.reduce((a, b) => a + b, 0) / latencies.length;
          const p50 = sorted[Math.floor(sorted.length * 0.5)];
          const p95 = sorted[Math.floor(sorted.length * 0.95)];
          const p99 = sorted[Math.floor(sorted.length * 0.99)];

          // Calculate jitter as the average variation between consecutive latencies
          // in arrival order (the percentiles above use a sorted copy instead)
          const jitters = [];
          for (let i = 1; i < latencies.length; i++) {
            jitters.push(Math.abs(latencies[i] - latencies[i - 1]));
          }
          const avgJitter = jitters.length > 0 ?
            jitters.reduce((a, b) => a + b, 0) / jitters.length : 0;

          results.latencyTests.push({
            rate: rate.name,
            targetRate: rate.invoicesPerSecond,
            processed,
            dropped,
            actualRate: (processed / (testDuration / 1000)).toFixed(2),
            avgLatency: avgLatency.toFixed(2),
            p50Latency: p50,
            p95Latency: p95,
            p99Latency: p99,
            avgJitter: avgJitter.toFixed(2),
            dropRate: ((dropped / (processed + dropped)) * 100).toFixed(2)
          });
        }
      }

      // Analyze jitter and stability
      if (results.latencyTests.length > 0) {
        const avgJitters = results.latencyTests.map(t => parseFloat(t.avgJitter));
        const avgDropRates = results.latencyTests.map(t => parseFloat(t.dropRate));

        results.jitterAnalysis = {
          avgJitter: (avgJitters.reduce((a, b) => a + b, 0) / avgJitters.length).toFixed(2),
          maxJitter: Math.max(...avgJitters).toFixed(2),
          avgDropRate: (avgDropRates.reduce((a, b) => a + b, 0) / avgDropRates.length).toFixed(2),
          stable: Math.max(...avgJitters) < 50 && Math.max(...avgDropRates) < 5,
          recommendation: Math.max(...avgDropRates) > 10 ?
            'System cannot handle high arrival rates - consider scaling or optimization' :
            'System handles real-time streaming adequately'
        };
      }

      return results;
    }
  );

  // Summary
  console.log('\n=== PERF-09: Streaming Performance Test Summary ===');

  console.log('\nStreaming XML Parsing:');
  console.log('  Stream Size | Items | Data | Duration | Memory | Peak | Throughput');
  console.log('  ------------|-------|---------|----------|--------|--------|----------');
  streamingXMLParsing.tests.forEach((test: any) => {
    if (!test.error) {
      console.log(`  ${test.size.padEnd(11)} | ${String(test.items).padEnd(5)} | ${test.totalBytes.padEnd(7)}KB | ${String(test.duration + 'ms').padEnd(8)} | ${test.memoryUsed.padEnd(6)}MB | ${test.peakMemory.padEnd(6)}MB | ${test.throughput}KB/s`);
    }
  });
  if (streamingXMLParsing.memoryEfficiency) {
    console.log(`  Memory efficiency: ${streamingXMLParsing.memoryEfficiency.efficient ? 'GOOD ✅' : 'POOR ⚠️'}`);
    console.log(`  Scaling: ${streamingXMLParsing.memoryEfficiency.memoryScaling}x memory for ${streamingXMLParsing.memoryEfficiency.itemScaling}x items`);
  }

  console.log('\nStream Transformation Pipeline:');
  streamTransformation.pipelines.forEach((pipeline: any) => {
    if (!pipeline.error) {
      console.log(`  ${pipeline.name}:`);
      console.log(`    - Stages: ${pipeline.stages}, Items: ${pipeline.itemsProcessed}`);
      console.log(`    - Duration: ${pipeline.duration}ms, Throughput: ${pipeline.throughput}/s`);
      console.log(`    - Valid: ${pipeline.validItems}, Errors: ${pipeline.errorItems}`);
    }
  });
  if (streamTransformation.transformationStats) {
    console.log(`  Best pipeline: ${streamTransformation.transformationStats.bestPipeline} (${streamTransformation.transformationStats.bestThroughput}/s)`);
  }

  console.log('\nBackpressure Handling:');
  console.log('  Scenario | Duration | Produced | Consumed | Max Buffer | BP Events | Efficiency');
  console.log('  ----------------------------|----------|----------|----------|------------|-----------|----------');
  backpressureHandling.scenarios.forEach((scenario: any) => {
    if (!scenario.error) {
      console.log(`  ${scenario.name.padEnd(27)} | ${String(scenario.duration + 'ms').padEnd(8)} | ${String(scenario.produced).padEnd(8)} | ${String(scenario.consumed).padEnd(8)} | ${String(scenario.maxBuffered).padEnd(10)} | ${String(scenario.backpressureEvents).padEnd(9)} | ${scenario.efficiency}%`);
    }
  });
  if (backpressureHandling.backpressureStats) {
    console.log(`  ${backpressureHandling.backpressureStats.recommendation}`);
  }

  console.log('\nCorpus Streaming Analysis:');
  console.log(`  Streamable files: ${corpusStreaming.streamableFiles}`);
  console.log(`  Non-streamable files: ${corpusStreaming.nonStreamableFiles}`);
  if (corpusStreaming.comparison) {
    console.log(`  Traditional avg: ${corpusStreaming.comparison.avgTraditionalTime}ms`);
    console.log(`  Streamed avg: ${corpusStreaming.comparison.avgStreamedTime}ms`);
    console.log(`  Overhead: ${corpusStreaming.comparison.overheadPercent}%`);
    console.log(`  Large file improvement: ${corpusStreaming.comparison.largeFileImprovement}%`);
    console.log(`  ${corpusStreaming.comparison.recommendation}`);
  }

  console.log('\nReal-time Streaming:');
  console.log('  Rate | Target | Actual | Processed | Dropped | Avg Latency | P95 | Jitter');
  console.log('  ------------|--------|--------|-----------|---------|-------------|--------|-------');
  realtimeStreaming.latencyTests.forEach((test: any) => {
    console.log(`  ${test.rate.padEnd(11)} | ${String(test.targetRate).padEnd(6)} | ${test.actualRate.padEnd(6)} | ${String(test.processed).padEnd(9)} | ${test.dropRate.padEnd(7)}% | ${test.avgLatency.padEnd(11)}ms | ${String(test.p95Latency).padEnd(6)}ms | ${test.avgJitter}ms`);
  });
  if (realtimeStreaming.jitterAnalysis) {
    console.log(`  System stability: ${realtimeStreaming.jitterAnalysis.stable ? 'STABLE ✅' : 'UNSTABLE ⚠️'}`);
    console.log(`  ${realtimeStreaming.jitterAnalysis.recommendation}`);
  }

  // Performance targets check
  console.log('\n=== Performance Targets Check ===');
  const streamingEfficient = streamingXMLParsing.memoryEfficiency?.efficient || false;
  const realtimeStable = realtimeStreaming.jitterAnalysis?.stable || false;

  console.log(`Streaming memory efficiency: ${streamingEfficient ? 'EFFICIENT ✅' : 'INEFFICIENT ⚠️'}`);
  console.log(`Real-time stability: ${realtimeStable ? 'STABLE ✅' : 'UNSTABLE ⚠️'}`);

  // Overall performance summary
  console.log('\n=== Overall Performance Summary ===');
  console.log(performanceTracker.getSummary());
});

tap.start();