update
This commit is contained in:
813
test/suite/einvoice_performance/test.perf-09.streaming.ts
Normal file
813
test/suite/einvoice_performance/test.perf-09.streaming.ts
Normal file
@ -0,0 +1,813 @@
|
||||
/**
|
||||
* @file test.perf-09.streaming.ts
|
||||
* @description Performance tests for streaming operations
|
||||
*/
|
||||
|
||||
import { tap } from '@git.zone/tstest/tapbundle';
|
||||
import * as plugins from '../../plugins.js';
|
||||
import { EInvoice } from '../../../ts/index.js';
|
||||
import { CorpusLoader } from '../../suite/corpus.loader.js';
|
||||
import { PerformanceTracker } from '../../suite/performance.tracker.js';
|
||||
import { Readable, Writable, Transform } from 'stream';
|
||||
|
||||
// Shared fixtures for this suite: corpus file access and the metric tracker
// that every measureAsync() call below records into.
const corpusLoader = new CorpusLoader();
const performanceTracker = new PerformanceTracker('PERF-09: Streaming Performance');
// PERF-09 exercises five streaming scenarios (streaming parse, transform
// pipeline, backpressure, corpus comparison, real-time arrival rates) and
// logs a combined summary at the end.
tap.test('PERF-09: Streaming Performance - should handle streaming operations efficiently', async (t) => {
  // Test 1: Streaming XML parsing
  const streamingXMLParsing = await performanceTracker.measureAsync(
    'streaming-xml-parsing',
    async () => {
      const einvoice = new EInvoice();
      // Per-stream-size measurements plus a memory-scaling verdict filled in below.
      const results = {
        tests: [],
        memoryEfficiency: null
      };

      // Create test XML streams of different sizes
const createXMLStream = (itemCount: number): Readable => {
|
||||
let currentItem = 0;
|
||||
let headerSent = false;
|
||||
let itemsSent = false;
|
||||
|
||||
return new Readable({
|
||||
read() {
|
||||
if (!headerSent) {
|
||||
this.push(`<?xml version="1.0" encoding="UTF-8"?>
|
||||
<Invoice xmlns="urn:oasis:names:specification:ubl:schema:xsd:Invoice-2">
|
||||
<ID>STREAM-${itemCount}</ID>
|
||||
<IssueDate>2024-03-01</IssueDate>
|
||||
<AccountingSupplierParty>
|
||||
<Party>
|
||||
<PartyName><Name>Streaming Supplier</Name></PartyName>
|
||||
</Party>
|
||||
</AccountingSupplierParty>
|
||||
<AccountingCustomerParty>
|
||||
<Party>
|
||||
<PartyName><Name>Streaming Customer</Name></PartyName>
|
||||
</Party>
|
||||
</AccountingCustomerParty>
|
||||
<InvoiceLine>`);
|
||||
headerSent = true;
|
||||
} else if (currentItem < itemCount) {
|
||||
// Send items in chunks
|
||||
const chunkSize = Math.min(10, itemCount - currentItem);
|
||||
let chunk = '';
|
||||
|
||||
for (let i = 0; i < chunkSize; i++) {
|
||||
chunk += `
|
||||
<InvoiceLine>
|
||||
<ID>${currentItem + i + 1}</ID>
|
||||
<InvoicedQuantity>1</InvoicedQuantity>
|
||||
<LineExtensionAmount>100.00</LineExtensionAmount>
|
||||
<Item>
|
||||
<Description>Streaming Item ${currentItem + i + 1}</Description>
|
||||
</Item>
|
||||
</InvoiceLine>`;
|
||||
}
|
||||
|
||||
this.push(chunk);
|
||||
currentItem += chunkSize;
|
||||
|
||||
// Simulate streaming delay
|
||||
setTimeout(() => this.read(), 1);
|
||||
} else if (!itemsSent) {
|
||||
this.push(`
|
||||
</InvoiceLine>
|
||||
</Invoice>`);
|
||||
itemsSent = true;
|
||||
} else {
|
||||
this.push(null); // End stream
|
||||
}
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
      // Test different stream sizes
      const streamSizes = [
        { items: 10, name: 'Small stream' },
        { items: 100, name: 'Medium stream' },
        { items: 1000, name: 'Large stream' },
        { items: 5000, name: 'Very large stream' }
      ];

      for (const size of streamSizes) {
        const startTime = Date.now();
        const startMemory = process.memoryUsage();
        const memorySnapshots = [];

        // Sample heap usage (MB) every 100ms while the stream is consumed.
        const monitorInterval = setInterval(() => {
          memorySnapshots.push(process.memoryUsage().heapUsed / 1024 / 1024);
        }, 100);

        try {
          // Simulate streaming parsing
          const stream = createXMLStream(size.items);
          const chunks = [];
          let totalBytes = 0;

          await new Promise((resolve, reject) => {
            stream.on('data', (chunk) => {
              chunks.push(chunk);
              totalBytes += chunk.length;
            });

            // NOTE(review): async 'end' handler — a rejection thrown inside it
            // is NOT routed to reject(); it would surface as an unhandled
            // rejection instead of failing this Promise. Confirm acceptable.
            stream.on('end', async () => {
              clearInterval(monitorInterval);

              // Parse accumulated XML
              const xml = chunks.join('');
              const format = await einvoice.detectFormat(xml);
              const invoice = await einvoice.parseInvoice(xml, format || 'ubl');

              const endTime = Date.now();
              const endMemory = process.memoryUsage();

              results.tests.push({
                size: size.name,
                items: size.items,
                totalBytes: (totalBytes / 1024).toFixed(2),
                duration: endTime - startTime,
                memoryUsed: ((endMemory.heapUsed - startMemory.heapUsed) / 1024 / 1024).toFixed(2),
                peakMemory: Math.max(...memorySnapshots).toFixed(2),
                avgMemory: (memorySnapshots.reduce((a, b) => a + b, 0) / memorySnapshots.length).toFixed(2),
                throughput: ((totalBytes / 1024) / ((endTime - startTime) / 1000)).toFixed(2),
                itemsProcessed: invoice.data.items?.length || 0
              });

              resolve(null);
            });

            stream.on('error', reject);
          });

        } catch (error) {
          // Stop the sampler even on failure so the interval cannot leak.
          clearInterval(monitorInterval);
          results.tests.push({
            size: size.name,
            error: error.message
          });
        }
      }

      // Analyze memory efficiency: compare smallest vs largest stream run.
      if (results.tests.length >= 2) {
        const small = results.tests[0];
        const large = results.tests[results.tests.length - 1];

        if (!small.error && !large.error) {
          results.memoryEfficiency = {
            smallStreamMemory: small.memoryUsed,
            largeStreamMemory: large.memoryUsed,
            memoryScaling: (parseFloat(large.memoryUsed) / parseFloat(small.memoryUsed)).toFixed(2),
            itemScaling: large.items / small.items,
            // "Efficient" = memory grows sub-linearly with item count.
            efficient: parseFloat(large.memoryUsed) < parseFloat(small.memoryUsed) * (large.items / small.items)
          };
        }
      }

      return results;
    }
  );
  // Test 2: Stream transformation pipeline
  const streamTransformation = await performanceTracker.measureAsync(
    'stream-transformation-pipeline',
    async () => {
      const einvoice = new EInvoice();
      // Per-pipeline results plus an overall best-throughput summary.
      const results = {
        pipelines: [],
        transformationStats: null
      };

      // Create transformation streams
class FormatDetectionStream extends Transform {
|
||||
constructor(private einvoice: EInvoice) {
|
||||
super({ objectMode: true });
|
||||
}
|
||||
|
||||
async _transform(chunk: any, encoding: string, callback: Function) {
|
||||
try {
|
||||
const format = await this.einvoice.detectFormat(chunk.content);
|
||||
this.push({ ...chunk, format });
|
||||
callback();
|
||||
} catch (error) {
|
||||
callback(error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class ValidationStream extends Transform {
|
||||
constructor(private einvoice: EInvoice) {
|
||||
super({ objectMode: true });
|
||||
}
|
||||
|
||||
async _transform(chunk: any, encoding: string, callback: Function) {
|
||||
try {
|
||||
if (chunk.format && chunk.format !== 'unknown') {
|
||||
const invoice = await this.einvoice.parseInvoice(chunk.content, chunk.format);
|
||||
const validation = await this.einvoice.validateInvoice(invoice);
|
||||
this.push({ ...chunk, valid: validation.isValid, errors: validation.errors?.length || 0 });
|
||||
} else {
|
||||
this.push({ ...chunk, valid: false, errors: -1 });
|
||||
}
|
||||
callback();
|
||||
} catch (error) {
|
||||
callback(error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Test different pipeline configurations
|
||||
const pipelineConfigs = [
|
||||
{
|
||||
name: 'Simple pipeline',
|
||||
batchSize: 10,
|
||||
stages: ['detect', 'validate']
|
||||
},
|
||||
{
|
||||
name: 'Parallel pipeline',
|
||||
batchSize: 50,
|
||||
stages: ['detect', 'validate'],
|
||||
parallel: true
|
||||
},
|
||||
{
|
||||
name: 'Complex pipeline',
|
||||
batchSize: 100,
|
||||
stages: ['detect', 'parse', 'validate', 'convert']
|
||||
}
|
||||
];
|
||||
|
||||
// Create test data
|
||||
const testInvoices = Array.from({ length: 100 }, (_, i) => ({
|
||||
id: i,
|
||||
content: `<?xml version="1.0"?>
|
||||
<Invoice xmlns="urn:oasis:names:specification:ubl:schema:xsd:Invoice-2">
|
||||
<ID>PIPELINE-${i}</ID>
|
||||
<IssueDate>2024-03-01</IssueDate>
|
||||
<AccountingSupplierParty><Party><PartyName><Name>Supplier ${i}</Name></PartyName></Party></AccountingSupplierParty>
|
||||
<AccountingCustomerParty><Party><PartyName><Name>Customer ${i}</Name></PartyName></Party></AccountingCustomerParty>
|
||||
<InvoiceLine>
|
||||
<ID>1</ID>
|
||||
<InvoicedQuantity>1</InvoicedQuantity>
|
||||
<LineExtensionAmount>${100 + i}</LineExtensionAmount>
|
||||
</InvoiceLine>
|
||||
</Invoice>`
|
||||
}));
|
||||
|
||||
for (const config of pipelineConfigs) {
|
||||
const startTime = Date.now();
|
||||
const processedItems = [];
|
||||
|
||||
try {
|
||||
// Create pipeline
|
||||
const inputStream = new Readable({
|
||||
objectMode: true,
|
||||
read() {
|
||||
const item = testInvoices.shift();
|
||||
if (item) {
|
||||
this.push(item);
|
||||
} else {
|
||||
this.push(null);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
const outputStream = new Writable({
|
||||
objectMode: true,
|
||||
write(chunk, encoding, callback) {
|
||||
processedItems.push(chunk);
|
||||
callback();
|
||||
}
|
||||
});
|
||||
|
||||
// Build pipeline
|
||||
let pipeline = inputStream;
|
||||
|
||||
if (config.stages.includes('detect')) {
|
||||
pipeline = pipeline.pipe(new FormatDetectionStream(einvoice));
|
||||
}
|
||||
|
||||
if (config.stages.includes('validate')) {
|
||||
pipeline = pipeline.pipe(new ValidationStream(einvoice));
|
||||
}
|
||||
|
||||
// Process
|
||||
await new Promise((resolve, reject) => {
|
||||
pipeline.pipe(outputStream)
|
||||
.on('finish', resolve)
|
||||
.on('error', reject);
|
||||
});
|
||||
|
||||
const endTime = Date.now();
|
||||
const duration = endTime - startTime;
|
||||
|
||||
results.pipelines.push({
|
||||
name: config.name,
|
||||
batchSize: config.batchSize,
|
||||
stages: config.stages.length,
|
||||
itemsProcessed: processedItems.length,
|
||||
duration,
|
||||
throughput: (processedItems.length / (duration / 1000)).toFixed(2),
|
||||
avgLatency: (duration / processedItems.length).toFixed(2),
|
||||
validItems: processedItems.filter(i => i.valid).length,
|
||||
errorItems: processedItems.filter(i => !i.valid).length
|
||||
});
|
||||
|
||||
} catch (error) {
|
||||
results.pipelines.push({
|
||||
name: config.name,
|
||||
error: error.message
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Analyze transformation efficiency
|
||||
if (results.pipelines.length > 0) {
|
||||
const validPipelines = results.pipelines.filter(p => !p.error);
|
||||
if (validPipelines.length > 0) {
|
||||
const avgThroughput = validPipelines.reduce((sum, p) => sum + parseFloat(p.throughput), 0) / validPipelines.length;
|
||||
const bestPipeline = validPipelines.reduce((best, p) =>
|
||||
parseFloat(p.throughput) > parseFloat(best.throughput) ? p : best
|
||||
);
|
||||
|
||||
results.transformationStats = {
|
||||
avgThroughput: avgThroughput.toFixed(2),
|
||||
bestPipeline: bestPipeline.name,
|
||||
bestThroughput: bestPipeline.throughput
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
);
|
||||
|
||||
  // Test 3: Backpressure handling
  const backpressureHandling = await performanceTracker.measureAsync(
    'backpressure-handling',
    async () => {
      const einvoice = new EInvoice();
      const results = {
        scenarios: [],
        backpressureStats: null
      };

      // Test scenarios with different processing speeds (delays in ms/item).
      const scenarios = [
        {
          name: 'Fast producer, slow consumer',
          producerDelay: 1,
          consumerDelay: 10,
          bufferSize: 100
        },
        {
          name: 'Slow producer, fast consumer',
          producerDelay: 10,
          consumerDelay: 1,
          bufferSize: 100
        },
        {
          name: 'Balanced pipeline',
          producerDelay: 5,
          consumerDelay: 5,
          bufferSize: 100
        },
        {
          name: 'High volume burst',
          producerDelay: 0,
          consumerDelay: 5,
          bufferSize: 1000
        }
      ];

      for (const scenario of scenarios) {
        const startTime = Date.now();
        const metrics = {
          produced: 0,
          consumed: 0,
          buffered: 0,
          maxBuffered: 0,
          backpressureEvents: 0
        };

        try {
          // Create producer stream: 100 small invoices, one per producerDelay ms.
          const producer = new Readable({
            objectMode: true,
            highWaterMark: scenario.bufferSize,
            read() {
              if (metrics.produced < 100) {
                // NOTE(review): the push happens asynchronously inside
                // setTimeout, after read() has returned; overlapping read()
                // calls could schedule multiple pending pushes — confirm the
                // produced count stays exact under load.
                setTimeout(() => {
                  this.push({
                    id: metrics.produced++,
                    content: `<?xml version="1.0"?><Invoice xmlns="urn:oasis:names:specification:ubl:schema:xsd:Invoice-2"><ID>BP-${metrics.produced}</ID></Invoice>`
                  });

                  metrics.buffered = metrics.produced - metrics.consumed;
                  if (metrics.buffered > metrics.maxBuffered) {
                    metrics.maxBuffered = metrics.buffered;
                  }
                }, scenario.producerDelay);
              } else {
                this.push(null);
              }
            }
          });

          // Create consumer stream: simulated latency, then format detection.
          const consumer = new Writable({
            objectMode: true,
            highWaterMark: scenario.bufferSize,
            async write(chunk, encoding, callback) {
              // Simulate processing
              await new Promise(resolve => setTimeout(resolve, scenario.consumerDelay));

              // Process invoice
              const format = await einvoice.detectFormat(chunk.content);

              metrics.consumed++;
              metrics.buffered = metrics.produced - metrics.consumed;
              callback();
            }
          });

          // Monitor backpressure: pipe() pauses the source when the
          // destination buffer fills, which emits 'pause' on the producer.
          producer.on('pause', () => metrics.backpressureEvents++);

          // Process
          await new Promise((resolve, reject) => {
            producer.pipe(consumer)
              .on('finish', resolve)
              .on('error', reject);
          });

          const endTime = Date.now();
          const duration = endTime - startTime;

          results.scenarios.push({
            name: scenario.name,
            duration,
            produced: metrics.produced,
            consumed: metrics.consumed,
            maxBuffered: metrics.maxBuffered,
            backpressureEvents: metrics.backpressureEvents,
            throughput: (metrics.consumed / (duration / 1000)).toFixed(2),
            efficiency: ((metrics.consumed / metrics.produced) * 100).toFixed(2),
            avgBufferUtilization: ((metrics.maxBuffered / scenario.bufferSize) * 100).toFixed(2)
          });

        } catch (error) {
          results.scenarios.push({
            name: scenario.name,
            error: error.message
          });
        }
      }

      // Analyze backpressure handling across all non-failed scenarios.
      const validScenarios = results.scenarios.filter(s => !s.error);
      if (validScenarios.length > 0) {
        results.backpressureStats = {
          avgBackpressureEvents: (validScenarios.reduce((sum, s) => sum + s.backpressureEvents, 0) / validScenarios.length).toFixed(2),
          maxBufferUtilization: Math.max(...validScenarios.map(s => parseFloat(s.avgBufferUtilization))).toFixed(2),
          recommendation: validScenarios.some(s => s.backpressureEvents > 10) ?
            'Consider increasing buffer sizes or optimizing processing speed' :
            'Backpressure handling is adequate'
        };
      }

      return results;
    }
  );
  // Test 4: Corpus streaming analysis — compares whole-file reads against
  // chunked reads over up to 20 corpus XML files.
  const corpusStreaming = await performanceTracker.measureAsync(
    'corpus-streaming-analysis',
    async () => {
      const files = await corpusLoader.getFilesByPattern('**/*.xml');
      const einvoice = new EInvoice();
      const results = {
        streamableFiles: 0,
        nonStreamableFiles: 0,
        processingStats: {
          streamed: [],
          traditional: []
        },
        comparison: null
      };

      // Process sample files both ways
      const sampleFiles = files.slice(0, 20);

      for (const file of sampleFiles) {
        try {
          const stats = await plugins.fs.stat(file);
          const fileSize = stats.size;

          // Traditional processing: one whole-file read, then detect/parse/validate.
          const traditionalStart = Date.now();
          const content = await plugins.fs.readFile(file, 'utf-8');
          const format = await einvoice.detectFormat(content);
          if (format && format !== 'unknown') {
            const invoice = await einvoice.parseInvoice(content, format);
            await einvoice.validateInvoice(invoice);
          }
          const traditionalEnd = Date.now();

          results.processingStats.traditional.push({
            size: fileSize,
            time: traditionalEnd - traditionalStart
          });

          // Simulated streaming (chunked reading)
          const streamingStart = Date.now();
          const chunkSize = 64 * 1024; // 64KB chunks
          const chunks = [];

          // Read in chunks
          const fd = await plugins.fs.open(file, 'r');
          const buffer = Buffer.alloc(chunkSize);
          let position = 0;

          while (true) {
            const { bytesRead } = await fd.read(buffer, 0, chunkSize, position);
            if (bytesRead === 0) break;

            // NOTE(review): converting each chunk to a string independently
            // assumes a 64KB boundary never splits a multi-byte UTF-8
            // sequence — confirm corpus files are ASCII-safe or switch to a
            // StringDecoder.
            chunks.push(buffer.slice(0, bytesRead).toString('utf-8'));
            position += bytesRead;
          }

          await fd.close();

          // Process accumulated content
          const streamedContent = chunks.join('');
          const streamedFormat = await einvoice.detectFormat(streamedContent);
          if (streamedFormat && streamedFormat !== 'unknown') {
            const invoice = await einvoice.parseInvoice(streamedContent, streamedFormat);
            await einvoice.validateInvoice(invoice);
          }
          const streamingEnd = Date.now();

          results.processingStats.streamed.push({
            size: fileSize,
            time: streamingEnd - streamingStart,
            chunks: chunks.length
          });

          // Determine if file benefits from streaming
          if (fileSize > 100 * 1024) { // Files > 100KB
            results.streamableFiles++;
          } else {
            results.nonStreamableFiles++;
          }

        } catch (error) {
          // Skip files that can't be processed (deliberate best-effort pass).
        }
      }

      // Compare approaches: averages overall, and separately for large files.
      if (results.processingStats.traditional.length > 0 && results.processingStats.streamed.length > 0) {
        const avgTraditional = results.processingStats.traditional.reduce((sum, s) => sum + s.time, 0) /
          results.processingStats.traditional.length;
        const avgStreamed = results.processingStats.streamed.reduce((sum, s) => sum + s.time, 0) /
          results.processingStats.streamed.length;

        const largeFiles = results.processingStats.traditional.filter(s => s.size > 100 * 1024);
        const avgTraditionalLarge = largeFiles.length > 0 ?
          largeFiles.reduce((sum, s) => sum + s.time, 0) / largeFiles.length : 0;

        const largeStreamed = results.processingStats.streamed.filter(s => s.size > 100 * 1024);
        const avgStreamedLarge = largeStreamed.length > 0 ?
          largeStreamed.reduce((sum, s) => sum + s.time, 0) / largeStreamed.length : 0;

        results.comparison = {
          avgTraditionalTime: avgTraditional.toFixed(2),
          avgStreamedTime: avgStreamed.toFixed(2),
          overheadPercent: ((avgStreamed - avgTraditional) / avgTraditional * 100).toFixed(2),
          largeFileImprovement: avgTraditionalLarge > 0 && avgStreamedLarge > 0 ?
            ((avgTraditionalLarge - avgStreamedLarge) / avgTraditionalLarge * 100).toFixed(2) : 'N/A',
          recommendation: avgStreamed < avgTraditional * 1.1 ?
            'Streaming provides benefits for this workload' :
            'Traditional processing is more efficient for this workload'
        };
      }

      return results;
    }
  );
  // Test 5: Real-time streaming performance — generates invoices at fixed
  // arrival rates, drains them through a single-worker queue, and records
  // latency percentiles, jitter, and drop rate per rate.
  const realtimeStreaming = await performanceTracker.measureAsync(
    'realtime-streaming',
    async () => {
      const einvoice = new EInvoice();
      const results = {
        latencyTests: [],
        jitterAnalysis: null
      };

      // Test real-time processing with different arrival rates
      const arrivalRates = [
        { name: 'Low rate', invoicesPerSecond: 10 },
        { name: 'Medium rate', invoicesPerSecond: 50 },
        { name: 'High rate', invoicesPerSecond: 100 },
        { name: 'Burst rate', invoicesPerSecond: 200 }
      ];

      for (const rate of arrivalRates) {
        const testDuration = 5000; // 5 seconds
        const interval = 1000 / rate.invoicesPerSecond;
        const latencies = [];
        let processed = 0;
        let dropped = 0;

        const startTime = Date.now();

        // Single-worker FIFO queue; processNext drains one item at a time and
        // re-schedules itself via setImmediate while items remain.
        const queue = [];
        let processing = false;

        const processNext = async () => {
          if (processing || queue.length === 0) return;

          processing = true;
          const item = queue.shift();

          try {
            const processStart = Date.now();
            const format = await einvoice.detectFormat(item.content);
            const invoice = await einvoice.parseInvoice(item.content, format || 'ubl');
            await einvoice.validateInvoice(invoice);

            // Latency measured from arrival, so it includes queueing time.
            const latency = Date.now() - item.arrivalTime;
            latencies.push(latency);
            processed++;
          } catch (error) {
            dropped++;
          }

          processing = false;
          if (queue.length > 0) {
            setImmediate(processNext);
          }
        };

        // Generate invoices at specified rate
        const generator = setInterval(() => {
          const invoice = {
            arrivalTime: Date.now(),
            content: `<?xml version="1.0"?><Invoice xmlns="urn:oasis:names:specification:ubl:schema:xsd:Invoice-2"><ID>RT-${Date.now()}</ID><IssueDate>2024-03-01</IssueDate></Invoice>`
          };

          // Apply backpressure - drop if queue is too large
          if (queue.length < 100) {
            queue.push(invoice);
            processNext();
          } else {
            dropped++;
          }
        }, interval);

        // Run test
        await new Promise(resolve => setTimeout(resolve, testDuration));
        clearInterval(generator);

        // Drain whatever is still queued.
        // NOTE(review): this loop only polls and never re-triggers
        // processNext; it would spin forever if the setImmediate chain ever
        // stalled with items still queued — confirm that cannot happen.
        while (queue.length > 0) {
          await new Promise(resolve => setTimeout(resolve, 10));
        }

        // Calculate statistics
        if (latencies.length > 0) {
          latencies.sort((a, b) => a - b);
          const avgLatency = latencies.reduce((a, b) => a + b, 0) / latencies.length;
          const p50 = latencies[Math.floor(latencies.length * 0.5)];
          const p95 = latencies[Math.floor(latencies.length * 0.95)];
          const p99 = latencies[Math.floor(latencies.length * 0.99)];

          // Calculate jitter: mean absolute difference between consecutive
          // entries of the (sorted) latency list.
          const jitters = [];
          for (let i = 1; i < latencies.length; i++) {
            jitters.push(Math.abs(latencies[i] - latencies[i - 1]));
          }
          const avgJitter = jitters.length > 0 ?
            jitters.reduce((a, b) => a + b, 0) / jitters.length : 0;

          results.latencyTests.push({
            rate: rate.name,
            targetRate: rate.invoicesPerSecond,
            processed,
            dropped,
            actualRate: (processed / (testDuration / 1000)).toFixed(2),
            avgLatency: avgLatency.toFixed(2),
            p50Latency: p50,
            p95Latency: p95,
            p99Latency: p99,
            avgJitter: avgJitter.toFixed(2),
            dropRate: ((dropped / (processed + dropped)) * 100).toFixed(2)
          });
        }
      }

      // Analyze jitter and stability across all tested rates.
      if (results.latencyTests.length > 0) {
        const avgJitters = results.latencyTests.map(t => parseFloat(t.avgJitter));
        const avgDropRates = results.latencyTests.map(t => parseFloat(t.dropRate));

        results.jitterAnalysis = {
          avgJitter: (avgJitters.reduce((a, b) => a + b, 0) / avgJitters.length).toFixed(2),
          maxJitter: Math.max(...avgJitters).toFixed(2),
          avgDropRate: (avgDropRates.reduce((a, b) => a + b, 0) / avgDropRates.length).toFixed(2),
          stable: Math.max(...avgJitters) < 50 && Math.max(...avgDropRates) < 5,
          recommendation: Math.max(...avgDropRates) > 10 ?
            'System cannot handle high arrival rates - consider scaling or optimization' :
            'System handles real-time streaming adequately'
        };
      }

      return results;
    }
  );
  // Summary: render each test's results as tables/notes via t.comment.
  t.comment('\n=== PERF-09: Streaming Performance Test Summary ===');

  t.comment('\nStreaming XML Parsing:');
  t.comment(' Stream Size | Items | Data | Duration | Memory | Peak | Throughput');
  t.comment(' ------------|-------|---------|----------|--------|--------|----------');
  streamingXMLParsing.result.tests.forEach(test => {
    if (!test.error) {
      t.comment(` ${test.size.padEnd(11)} | ${String(test.items).padEnd(5)} | ${test.totalBytes.padEnd(7)}KB | ${String(test.duration + 'ms').padEnd(8)} | ${test.memoryUsed.padEnd(6)}MB | ${test.peakMemory.padEnd(6)}MB | ${test.throughput}KB/s`);
    }
  });
  if (streamingXMLParsing.result.memoryEfficiency) {
    t.comment(` Memory efficiency: ${streamingXMLParsing.result.memoryEfficiency.efficient ? 'GOOD ✅' : 'POOR ⚠️'}`);
    t.comment(` Scaling: ${streamingXMLParsing.result.memoryEfficiency.memoryScaling}x memory for ${streamingXMLParsing.result.memoryEfficiency.itemScaling}x items`);
  }

  t.comment('\nStream Transformation Pipeline:');
  streamTransformation.result.pipelines.forEach(pipeline => {
    if (!pipeline.error) {
      t.comment(` ${pipeline.name}:`);
      t.comment(` - Stages: ${pipeline.stages}, Items: ${pipeline.itemsProcessed}`);
      t.comment(` - Duration: ${pipeline.duration}ms, Throughput: ${pipeline.throughput}/s`);
      t.comment(` - Valid: ${pipeline.validItems}, Errors: ${pipeline.errorItems}`);
    }
  });
  if (streamTransformation.result.transformationStats) {
    t.comment(` Best pipeline: ${streamTransformation.result.transformationStats.bestPipeline} (${streamTransformation.result.transformationStats.bestThroughput}/s)`);
  }

  t.comment('\nBackpressure Handling:');
  t.comment(' Scenario | Duration | Produced | Consumed | Max Buffer | BP Events | Efficiency');
  t.comment(' ----------------------------|----------|----------|----------|------------|-----------|----------');
  backpressureHandling.result.scenarios.forEach(scenario => {
    if (!scenario.error) {
      t.comment(` ${scenario.name.padEnd(27)} | ${String(scenario.duration + 'ms').padEnd(8)} | ${String(scenario.produced).padEnd(8)} | ${String(scenario.consumed).padEnd(8)} | ${String(scenario.maxBuffered).padEnd(10)} | ${String(scenario.backpressureEvents).padEnd(9)} | ${scenario.efficiency}%`);
    }
  });
  if (backpressureHandling.result.backpressureStats) {
    t.comment(` ${backpressureHandling.result.backpressureStats.recommendation}`);
  }

  t.comment('\nCorpus Streaming Analysis:');
  t.comment(` Streamable files: ${corpusStreaming.result.streamableFiles}`);
  t.comment(` Non-streamable files: ${corpusStreaming.result.nonStreamableFiles}`);
  if (corpusStreaming.result.comparison) {
    t.comment(` Traditional avg: ${corpusStreaming.result.comparison.avgTraditionalTime}ms`);
    t.comment(` Streamed avg: ${corpusStreaming.result.comparison.avgStreamedTime}ms`);
    t.comment(` Overhead: ${corpusStreaming.result.comparison.overheadPercent}%`);
    t.comment(` Large file improvement: ${corpusStreaming.result.comparison.largeFileImprovement}%`);
    t.comment(` ${corpusStreaming.result.comparison.recommendation}`);
  }

  t.comment('\nReal-time Streaming:');
  t.comment(' Rate | Target | Actual | Processed | Dropped | Avg Latency | P95 | Jitter');
  t.comment(' ------------|--------|--------|-----------|---------|-------------|--------|-------');
  realtimeStreaming.result.latencyTests.forEach(test => {
    t.comment(` ${test.rate.padEnd(11)} | ${String(test.targetRate).padEnd(6)} | ${test.actualRate.padEnd(6)} | ${String(test.processed).padEnd(9)} | ${test.dropRate.padEnd(7)}% | ${test.avgLatency.padEnd(11)}ms | ${String(test.p95Latency).padEnd(6)}ms | ${test.avgJitter}ms`);
  });
  if (realtimeStreaming.result.jitterAnalysis) {
    t.comment(` System stability: ${realtimeStreaming.result.jitterAnalysis.stable ? 'STABLE ✅' : 'UNSTABLE ⚠️'}`);
    t.comment(` ${realtimeStreaming.result.jitterAnalysis.recommendation}`);
  }

  // Performance targets check: boil the run down to two pass/fail flags.
  t.comment('\n=== Performance Targets Check ===');
  const streamingEfficient = streamingXMLParsing.result.memoryEfficiency?.efficient || false;
  const realtimeStable = realtimeStreaming.result.jitterAnalysis?.stable || false;

  t.comment(`Streaming memory efficiency: ${streamingEfficient ? 'EFFICIENT ✅' : 'INEFFICIENT ⚠️'}`);
  t.comment(`Real-time stability: ${realtimeStable ? 'STABLE ✅' : 'UNSTABLE ⚠️'}`);

  // Overall performance summary
  t.comment('\n=== Overall Performance Summary ===');
  performanceTracker.logSummary();

  t.end();
});

// Kick off the tap runner for this file.
tap.start();
Reference in New Issue
Block a user