2025-05-25 19:45:37 +00:00
/ * *
* @file test . perf - 09 . streaming . ts
* @description Performance tests for streaming operations
* /
import { tap } from '@git.zone/tstest/tapbundle' ;
import * as plugins from '../../plugins.js' ;
import { EInvoice } from '../../../ts/index.js' ;
import { CorpusLoader } from '../../suite/corpus.loader.js' ;
import { PerformanceTracker } from '../../suite/performance.tracker.js' ;
2025-05-29 13:35:36 +00:00
import { FormatDetector } from '../../../ts/formats/utils/format.detector.js' ;
2025-05-25 19:45:37 +00:00
import { Readable , Writable , Transform } from 'stream' ;
const performanceTracker = new PerformanceTracker ( 'PERF-09: Streaming Performance' ) ;
tap . test ( 'PERF-09: Streaming Performance - should handle streaming operations efficiently' , async ( t ) = > {
// Test 1: Streaming XML parsing
const streamingXMLParsing = await performanceTracker . measureAsync (
'streaming-xml-parsing' ,
async ( ) = > {
const results = {
tests : [ ] ,
memoryEfficiency : null
} ;
// Create test XML streams of different sizes
const createXMLStream = ( itemCount : number ) : Readable = > {
let currentItem = 0 ;
let headerSent = false ;
let itemsSent = false ;
return new Readable ( {
read() {
if ( ! headerSent ) {
this . push ( ` <?xml version="1.0" encoding="UTF-8"?>
< Invoice xmlns = "urn:oasis:names:specification:ubl:schema:xsd:Invoice-2" >
< ID > STREAM - $ { itemCount } < / ID >
< IssueDate > 2024 - 03 - 01 < / IssueDate >
< AccountingSupplierParty >
< Party >
< PartyName > < Name > Streaming Supplier < / Name > < / PartyName >
< / Party >
< / AccountingSupplierParty >
< AccountingCustomerParty >
< Party >
< PartyName > < Name > Streaming Customer < / Name > < / PartyName >
< / Party >
< / AccountingCustomerParty >
< InvoiceLine > ` );
headerSent = true ;
} else if ( currentItem < itemCount ) {
// Send items in chunks
const chunkSize = Math . min ( 10 , itemCount - currentItem ) ;
let chunk = '' ;
for ( let i = 0 ; i < chunkSize ; i ++ ) {
chunk += `
< InvoiceLine >
< ID > $ { currentItem + i + 1 } < / ID >
< InvoicedQuantity > 1 < / InvoicedQuantity >
< LineExtensionAmount > 100.00 < / LineExtensionAmount >
< Item >
< Description > Streaming Item $ { currentItem + i + 1 } < / Description >
< / Item >
< / InvoiceLine > ` ;
}
this . push ( chunk ) ;
currentItem += chunkSize ;
// Simulate streaming delay
setTimeout ( ( ) = > this . read ( ) , 1 ) ;
} else if ( ! itemsSent ) {
this . push ( `
< / InvoiceLine >
< / Invoice > ` );
itemsSent = true ;
} else {
this . push ( null ) ; // End stream
}
}
} ) ;
} ;
// Test different stream sizes
const streamSizes = [
{ items : 10 , name : 'Small stream' } ,
{ items : 100 , name : 'Medium stream' } ,
{ items : 1000 , name : 'Large stream' } ,
{ items : 5000 , name : 'Very large stream' }
] ;
for ( const size of streamSizes ) {
const startTime = Date . now ( ) ;
const startMemory = process . memoryUsage ( ) ;
const memorySnapshots = [ ] ;
// Create monitoring interval
const monitorInterval = setInterval ( ( ) = > {
memorySnapshots . push ( process . memoryUsage ( ) . heapUsed / 1024 / 1024 ) ;
} , 100 ) ;
try {
// Simulate streaming parsing
const stream = createXMLStream ( size . items ) ;
const chunks = [ ] ;
let totalBytes = 0 ;
await new Promise ( ( resolve , reject ) = > {
stream . on ( 'data' , ( chunk ) = > {
chunks . push ( chunk ) ;
totalBytes += chunk . length ;
} ) ;
stream . on ( 'end' , async ( ) = > {
clearInterval ( monitorInterval ) ;
// Parse accumulated XML
const xml = chunks . join ( '' ) ;
2025-05-29 13:35:36 +00:00
const format = FormatDetector . detectFormat ( xml ) ;
const invoice = await EInvoice . fromXml ( xml ) ;
2025-05-25 19:45:37 +00:00
const endTime = Date . now ( ) ;
const endMemory = process . memoryUsage ( ) ;
results . tests . push ( {
size : size.name ,
items : size.items ,
totalBytes : ( totalBytes / 1024 ) . toFixed ( 2 ) ,
duration : endTime - startTime ,
memoryUsed : ( ( endMemory . heapUsed - startMemory . heapUsed ) / 1024 / 1024 ) . toFixed ( 2 ) ,
peakMemory : Math.max ( . . . memorySnapshots ) . toFixed ( 2 ) ,
avgMemory : ( memorySnapshots . reduce ( ( a , b ) = > a + b , 0 ) / memorySnapshots . length ) . toFixed ( 2 ) ,
throughput : ( ( totalBytes / 1024 ) / ( ( endTime - startTime ) / 1000 ) ) . toFixed ( 2 ) ,
2025-05-29 13:35:36 +00:00
itemsProcessed : size.items
2025-05-25 19:45:37 +00:00
} ) ;
resolve ( null ) ;
} ) ;
stream . on ( 'error' , reject ) ;
} ) ;
} catch ( error ) {
clearInterval ( monitorInterval ) ;
results . tests . push ( {
size : size.name ,
error : error.message
} ) ;
}
}
// Analyze memory efficiency
if ( results . tests . length >= 2 ) {
const small = results . tests [ 0 ] ;
const large = results . tests [ results . tests . length - 1 ] ;
if ( ! small . error && ! large . error ) {
results . memoryEfficiency = {
smallStreamMemory : small.memoryUsed ,
largeStreamMemory : large.memoryUsed ,
memoryScaling : ( parseFloat ( large . memoryUsed ) / parseFloat ( small . memoryUsed ) ) . toFixed ( 2 ) ,
itemScaling : large.items / small . items ,
efficient : parseFloat ( large . memoryUsed ) < parseFloat ( small . memoryUsed ) * ( large . items / small . items )
} ;
}
}
return results ;
}
) ;
// Test 2: Stream transformation pipeline
const streamTransformation = await performanceTracker . measureAsync (
'stream-transformation-pipeline' ,
async ( ) = > {
const results = {
pipelines : [ ] ,
transformationStats : null
} ;
// Create transformation streams
class FormatDetectionStream extends Transform {
2025-05-29 13:35:36 +00:00
constructor ( ) {
2025-05-25 19:45:37 +00:00
super ( { objectMode : true } ) ;
}
async _transform ( chunk : any , encoding : string , callback : Function ) {
try {
2025-05-29 13:35:36 +00:00
const format = FormatDetector . detectFormat ( chunk . content ) ;
2025-05-25 19:45:37 +00:00
this . push ( { . . . chunk , format } ) ;
callback ( ) ;
} catch ( error ) {
callback ( error ) ;
}
}
}
class ValidationStream extends Transform {
2025-05-29 13:35:36 +00:00
constructor ( ) {
2025-05-25 19:45:37 +00:00
super ( { objectMode : true } ) ;
}
async _transform ( chunk : any , encoding : string , callback : Function ) {
try {
if ( chunk . format && chunk . format !== 'unknown' ) {
2025-05-29 13:35:36 +00:00
const invoice = await EInvoice . fromXml ( chunk . content ) ;
const validation = await invoice . validate ( ) ;
this . push ( { . . . chunk , valid : validation.valid , errors : validation.errors?.length || 0 } ) ;
2025-05-25 19:45:37 +00:00
} else {
this . push ( { . . . chunk , valid : false , errors : - 1 } ) ;
}
callback ( ) ;
} catch ( error ) {
callback ( error ) ;
}
}
}
// Test different pipeline configurations
const pipelineConfigs = [
{
name : 'Simple pipeline' ,
batchSize : 10 ,
stages : [ 'detect' , 'validate' ]
} ,
{
name : 'Parallel pipeline' ,
batchSize : 50 ,
stages : [ 'detect' , 'validate' ] ,
parallel : true
} ,
{
name : 'Complex pipeline' ,
batchSize : 100 ,
stages : [ 'detect' , 'parse' , 'validate' , 'convert' ]
}
] ;
// Create test data
const testInvoices = Array . from ( { length : 100 } , ( _ , i ) = > ( {
id : i ,
content : ` <?xml version="1.0"?>
< Invoice xmlns = "urn:oasis:names:specification:ubl:schema:xsd:Invoice-2" >
< ID > PIPELINE - $ { i } < / ID >
< IssueDate > 2024 - 03 - 01 < / IssueDate >
< AccountingSupplierParty > < Party > < PartyName > < Name > Supplier $ { i } < / Name > < / PartyName > < / Party > < / AccountingSupplierParty >
< AccountingCustomerParty > < Party > < PartyName > < Name > Customer $ { i } < / Name > < / PartyName > < / Party > < / AccountingCustomerParty >
< InvoiceLine >
< ID > 1 < / ID >
< InvoicedQuantity > 1 < / InvoicedQuantity >
< LineExtensionAmount > $ { 100 + i } < / LineExtensionAmount >
< / InvoiceLine >
< / Invoice > `
} ) ) ;
for ( const config of pipelineConfigs ) {
const startTime = Date . now ( ) ;
const processedItems = [ ] ;
try {
// Create pipeline
const inputStream = new Readable ( {
objectMode : true ,
read() {
const item = testInvoices . shift ( ) ;
if ( item ) {
this . push ( item ) ;
} else {
this . push ( null ) ;
}
}
} ) ;
const outputStream = new Writable ( {
objectMode : true ,
write ( chunk , encoding , callback ) {
processedItems . push ( chunk ) ;
callback ( ) ;
}
} ) ;
// Build pipeline
let pipeline = inputStream ;
if ( config . stages . includes ( 'detect' ) ) {
2025-05-29 13:35:36 +00:00
pipeline = pipeline . pipe ( new FormatDetectionStream ( ) ) ;
2025-05-25 19:45:37 +00:00
}
if ( config . stages . includes ( 'validate' ) ) {
2025-05-29 13:35:36 +00:00
pipeline = pipeline . pipe ( new ValidationStream ( ) ) ;
2025-05-25 19:45:37 +00:00
}
// Process
await new Promise ( ( resolve , reject ) = > {
pipeline . pipe ( outputStream )
. on ( 'finish' , resolve )
. on ( 'error' , reject ) ;
} ) ;
const endTime = Date . now ( ) ;
const duration = endTime - startTime ;
results . pipelines . push ( {
name : config.name ,
batchSize : config.batchSize ,
stages : config.stages.length ,
itemsProcessed : processedItems.length ,
duration ,
throughput : ( processedItems . length / ( duration / 1000 ) ) . toFixed ( 2 ) ,
avgLatency : ( duration / processedItems . length ) . toFixed ( 2 ) ,
validItems : processedItems.filter ( i = > i . valid ) . length ,
errorItems : processedItems.filter ( i = > ! i . valid ) . length
} ) ;
} catch ( error ) {
results . pipelines . push ( {
name : config.name ,
error : error.message
} ) ;
}
}
// Analyze transformation efficiency
if ( results . pipelines . length > 0 ) {
const validPipelines = results . pipelines . filter ( p = > ! p . error ) ;
if ( validPipelines . length > 0 ) {
const avgThroughput = validPipelines . reduce ( ( sum , p ) = > sum + parseFloat ( p . throughput ) , 0 ) / validPipelines . length ;
const bestPipeline = validPipelines . reduce ( ( best , p ) = >
parseFloat ( p . throughput ) > parseFloat ( best . throughput ) ? p : best
) ;
results . transformationStats = {
avgThroughput : avgThroughput.toFixed ( 2 ) ,
bestPipeline : bestPipeline.name ,
bestThroughput : bestPipeline.throughput
} ;
}
}
return results ;
}
) ;
// Test 3: Backpressure handling
const backpressureHandling = await performanceTracker . measureAsync (
'backpressure-handling' ,
async ( ) = > {
const results = {
scenarios : [ ] ,
backpressureStats : null
} ;
// Test scenarios with different processing speeds
const scenarios = [
{
name : 'Fast producer, slow consumer' ,
producerDelay : 1 ,
consumerDelay : 10 ,
bufferSize : 100
} ,
{
name : 'Slow producer, fast consumer' ,
producerDelay : 10 ,
consumerDelay : 1 ,
bufferSize : 100
} ,
{
name : 'Balanced pipeline' ,
producerDelay : 5 ,
consumerDelay : 5 ,
bufferSize : 100
} ,
{
name : 'High volume burst' ,
producerDelay : 0 ,
consumerDelay : 5 ,
bufferSize : 1000
}
] ;
for ( const scenario of scenarios ) {
const startTime = Date . now ( ) ;
const metrics = {
produced : 0 ,
consumed : 0 ,
buffered : 0 ,
maxBuffered : 0 ,
backpressureEvents : 0
} ;
try {
// Create producer stream
const producer = new Readable ( {
objectMode : true ,
highWaterMark : scenario.bufferSize ,
read() {
if ( metrics . produced < 100 ) {
setTimeout ( ( ) = > {
this . push ( {
id : metrics.produced ++ ,
content : ` <?xml version="1.0"?><Invoice xmlns="urn:oasis:names:specification:ubl:schema:xsd:Invoice-2"><ID>BP- ${ metrics . produced } </ID></Invoice> `
} ) ;
metrics . buffered = metrics . produced - metrics . consumed ;
if ( metrics . buffered > metrics . maxBuffered ) {
metrics . maxBuffered = metrics . buffered ;
}
} , scenario . producerDelay ) ;
} else {
this . push ( null ) ;
}
}
} ) ;
// Create consumer stream with processing
const consumer = new Writable ( {
objectMode : true ,
highWaterMark : scenario.bufferSize ,
async write ( chunk , encoding , callback ) {
// Simulate processing
await new Promise ( resolve = > setTimeout ( resolve , scenario . consumerDelay ) ) ;
// Process invoice
2025-05-29 13:35:36 +00:00
const format = FormatDetector . detectFormat ( chunk . content ) ;
2025-05-25 19:45:37 +00:00
metrics . consumed ++ ;
metrics . buffered = metrics . produced - metrics . consumed ;
callback ( ) ;
}
} ) ;
// Monitor backpressure
producer . on ( 'pause' , ( ) = > metrics . backpressureEvents ++ ) ;
// Process
await new Promise ( ( resolve , reject ) = > {
producer . pipe ( consumer )
. on ( 'finish' , resolve )
. on ( 'error' , reject ) ;
} ) ;
const endTime = Date . now ( ) ;
const duration = endTime - startTime ;
results . scenarios . push ( {
name : scenario.name ,
duration ,
produced : metrics.produced ,
consumed : metrics.consumed ,
maxBuffered : metrics.maxBuffered ,
backpressureEvents : metrics.backpressureEvents ,
throughput : ( metrics . consumed / ( duration / 1000 ) ) . toFixed ( 2 ) ,
efficiency : ( ( metrics . consumed / metrics . produced ) * 100 ) . toFixed ( 2 ) ,
avgBufferUtilization : ( ( metrics . maxBuffered / scenario . bufferSize ) * 100 ) . toFixed ( 2 )
} ) ;
} catch ( error ) {
results . scenarios . push ( {
name : scenario.name ,
error : error.message
} ) ;
}
}
// Analyze backpressure handling
const validScenarios = results . scenarios . filter ( s = > ! s . error ) ;
if ( validScenarios . length > 0 ) {
results . backpressureStats = {
avgBackpressureEvents : ( validScenarios . reduce ( ( sum , s ) = > sum + s . backpressureEvents , 0 ) / validScenarios . length ) . toFixed ( 2 ) ,
maxBufferUtilization : Math.max ( . . . validScenarios . map ( s = > parseFloat ( s . avgBufferUtilization ) ) ) . toFixed ( 2 ) ,
recommendation : validScenarios.some ( s = > s . backpressureEvents > 10 ) ?
'Consider increasing buffer sizes or optimizing processing speed' :
'Backpressure handling is adequate'
} ;
}
return results ;
}
) ;
// Test 4: Corpus streaming analysis
const corpusStreaming = await performanceTracker . measureAsync (
'corpus-streaming-analysis' ,
async ( ) = > {
2025-05-29 13:35:36 +00:00
const files = await CorpusLoader . loadPattern ( '**/*.xml' ) ;
2025-05-25 19:45:37 +00:00
const results = {
streamableFiles : 0 ,
nonStreamableFiles : 0 ,
processingStats : {
streamed : [ ] ,
traditional : [ ]
} ,
comparison : null
} ;
// Process sample files both ways
const sampleFiles = files . slice ( 0 , 20 ) ;
for ( const file of sampleFiles ) {
try {
2025-05-29 13:35:36 +00:00
const stats = await plugins . fs . stat ( file . path ) ;
2025-05-25 19:45:37 +00:00
const fileSize = stats . size ;
// Traditional processing
const traditionalStart = Date . now ( ) ;
2025-05-29 13:35:36 +00:00
const content = await plugins . fs . readFile ( file . path , 'utf-8' ) ;
const format = FormatDetector . detectFormat ( content ) ;
2025-05-25 19:45:37 +00:00
if ( format && format !== 'unknown' ) {
2025-05-29 13:35:36 +00:00
const invoice = await EInvoice . fromXml ( content ) ;
await invoice . validate ( ) ;
2025-05-25 19:45:37 +00:00
}
const traditionalEnd = Date . now ( ) ;
results . processingStats . traditional . push ( {
size : fileSize ,
time : traditionalEnd - traditionalStart
} ) ;
// Simulated streaming (chunked reading)
const streamingStart = Date . now ( ) ;
const chunkSize = 64 * 1024 ; // 64KB chunks
const chunks = [ ] ;
// Read in chunks
2025-05-29 13:35:36 +00:00
const fd = await plugins . fs . open ( file . path , 'r' ) ;
2025-05-25 19:45:37 +00:00
const buffer = Buffer . alloc ( chunkSize ) ;
let position = 0 ;
while ( true ) {
2025-05-29 13:35:36 +00:00
const result = await fd . read ( buffer , 0 , chunkSize , position ) ;
const bytesRead = result . bytesRead ;
2025-05-25 19:45:37 +00:00
if ( bytesRead === 0 ) break ;
2025-05-29 13:35:36 +00:00
chunks . push ( Buffer . from ( buffer . slice ( 0 , bytesRead ) ) . toString ( 'utf-8' ) ) ;
2025-05-25 19:45:37 +00:00
position += bytesRead ;
}
await fd . close ( ) ;
// Process accumulated content
const streamedContent = chunks . join ( '' ) ;
2025-05-29 13:35:36 +00:00
const streamedFormat = FormatDetector . detectFormat ( streamedContent ) ;
2025-05-25 19:45:37 +00:00
if ( streamedFormat && streamedFormat !== 'unknown' ) {
2025-05-29 13:35:36 +00:00
const invoice = await EInvoice . fromXml ( streamedContent ) ;
await invoice . validate ( ) ;
2025-05-25 19:45:37 +00:00
}
const streamingEnd = Date . now ( ) ;
results . processingStats . streamed . push ( {
size : fileSize ,
time : streamingEnd - streamingStart ,
chunks : chunks.length
} ) ;
// Determine if file benefits from streaming
if ( fileSize > 100 * 1024 ) { // Files > 100KB
results . streamableFiles ++ ;
} else {
results . nonStreamableFiles ++ ;
}
} catch ( error ) {
// Skip files that can't be processed
}
}
// Compare approaches
if ( results . processingStats . traditional . length > 0 && results . processingStats . streamed . length > 0 ) {
const avgTraditional = results . processingStats . traditional . reduce ( ( sum , s ) = > sum + s . time , 0 ) /
results . processingStats . traditional . length ;
const avgStreamed = results . processingStats . streamed . reduce ( ( sum , s ) = > sum + s . time , 0 ) /
results . processingStats . streamed . length ;
const largeFiles = results . processingStats . traditional . filter ( s = > s . size > 100 * 1024 ) ;
const avgTraditionalLarge = largeFiles . length > 0 ?
largeFiles . reduce ( ( sum , s ) = > sum + s . time , 0 ) / largeFiles.length : 0 ;
const largeStreamed = results . processingStats . streamed . filter ( s = > s . size > 100 * 1024 ) ;
const avgStreamedLarge = largeStreamed . length > 0 ?
largeStreamed . reduce ( ( sum , s ) = > sum + s . time , 0 ) / largeStreamed.length : 0 ;
results . comparison = {
avgTraditionalTime : avgTraditional.toFixed ( 2 ) ,
avgStreamedTime : avgStreamed.toFixed ( 2 ) ,
overheadPercent : ( ( avgStreamed - avgTraditional ) / avgTraditional * 100 ) . toFixed ( 2 ) ,
largeFileImprovement : avgTraditionalLarge > 0 && avgStreamedLarge > 0 ?
( ( avgTraditionalLarge - avgStreamedLarge ) / avgTraditionalLarge * 100 ) . toFixed ( 2 ) : 'N/A' ,
recommendation : avgStreamed < avgTraditional * 1.1 ?
'Streaming provides benefits for this workload' :
'Traditional processing is more efficient for this workload'
} ;
}
return results ;
}
) ;
// Test 5: Real-time streaming performance
const realtimeStreaming = await performanceTracker . measureAsync (
'realtime-streaming' ,
async ( ) = > {
const results = {
latencyTests : [ ] ,
jitterAnalysis : null
} ;
// Test real-time processing with different arrival rates
const arrivalRates = [
{ name : 'Low rate' , invoicesPerSecond : 10 } ,
{ name : 'Medium rate' , invoicesPerSecond : 50 } ,
{ name : 'High rate' , invoicesPerSecond : 100 } ,
{ name : 'Burst rate' , invoicesPerSecond : 200 }
] ;
for ( const rate of arrivalRates ) {
const testDuration = 5000 ; // 5 seconds
const interval = 1000 / rate . invoicesPerSecond ;
const latencies = [ ] ;
let processed = 0 ;
let dropped = 0 ;
const startTime = Date . now ( ) ;
// Create processing queue
const queue = [ ] ;
let processing = false ;
const processNext = async ( ) = > {
if ( processing || queue . length === 0 ) return ;
processing = true ;
const item = queue . shift ( ) ;
try {
const processStart = Date . now ( ) ;
2025-05-29 13:35:36 +00:00
const format = FormatDetector . detectFormat ( item . content ) ;
const invoice = await EInvoice . fromXml ( item . content ) ;
await invoice . validate ( ) ;
2025-05-25 19:45:37 +00:00
const latency = Date . now ( ) - item . arrivalTime ;
latencies . push ( latency ) ;
processed ++ ;
} catch ( error ) {
dropped ++ ;
}
processing = false ;
if ( queue . length > 0 ) {
setImmediate ( processNext ) ;
}
} ;
// Generate invoices at specified rate
const generator = setInterval ( ( ) = > {
const invoice = {
arrivalTime : Date.now ( ) ,
content : ` <?xml version="1.0"?><Invoice xmlns="urn:oasis:names:specification:ubl:schema:xsd:Invoice-2"><ID>RT- ${ Date . now ( ) } </ID><IssueDate>2024-03-01</IssueDate></Invoice> `
} ;
// Apply backpressure - drop if queue is too large
if ( queue . length < 100 ) {
queue . push ( invoice ) ;
processNext ( ) ;
} else {
dropped ++ ;
}
} , interval ) ;
// Run test
await new Promise ( resolve = > setTimeout ( resolve , testDuration ) ) ;
clearInterval ( generator ) ;
// Process remaining items
while ( queue . length > 0 ) {
await new Promise ( resolve = > setTimeout ( resolve , 10 ) ) ;
}
// Calculate statistics
if ( latencies . length > 0 ) {
latencies . sort ( ( a , b ) = > a - b ) ;
const avgLatency = latencies . reduce ( ( a , b ) = > a + b , 0 ) / latencies . length ;
const p50 = latencies [ Math . floor ( latencies . length * 0.5 ) ] ;
const p95 = latencies [ Math . floor ( latencies . length * 0.95 ) ] ;
const p99 = latencies [ Math . floor ( latencies . length * 0.99 ) ] ;
// Calculate jitter
const jitters = [ ] ;
for ( let i = 1 ; i < latencies . length ; i ++ ) {
jitters . push ( Math . abs ( latencies [ i ] - latencies [ i - 1 ] ) ) ;
}
const avgJitter = jitters . length > 0 ?
jitters . reduce ( ( a , b ) = > a + b , 0 ) / jitters.length : 0 ;
results . latencyTests . push ( {
rate : rate.name ,
targetRate : rate.invoicesPerSecond ,
processed ,
dropped ,
actualRate : ( processed / ( testDuration / 1000 ) ) . toFixed ( 2 ) ,
avgLatency : avgLatency.toFixed ( 2 ) ,
p50Latency : p50 ,
p95Latency : p95 ,
p99Latency : p99 ,
avgJitter : avgJitter.toFixed ( 2 ) ,
dropRate : ( ( dropped / ( processed + dropped ) ) * 100 ) . toFixed ( 2 )
} ) ;
}
}
// Analyze jitter and stability
if ( results . latencyTests . length > 0 ) {
const avgJitters = results . latencyTests . map ( t = > parseFloat ( t . avgJitter ) ) ;
const avgDropRates = results . latencyTests . map ( t = > parseFloat ( t . dropRate ) ) ;
results . jitterAnalysis = {
avgJitter : ( avgJitters . reduce ( ( a , b ) = > a + b , 0 ) / avgJitters . length ) . toFixed ( 2 ) ,
maxJitter : Math.max ( . . . avgJitters ) . toFixed ( 2 ) ,
avgDropRate : ( avgDropRates . reduce ( ( a , b ) = > a + b , 0 ) / avgDropRates . length ) . toFixed ( 2 ) ,
stable : Math.max ( . . . avgJitters ) < 50 && Math . max ( . . . avgDropRates ) < 5 ,
recommendation : Math.max ( . . . avgDropRates ) > 10 ?
'System cannot handle high arrival rates - consider scaling or optimization' :
'System handles real-time streaming adequately'
} ;
}
return results ;
}
) ;
// Summary
2025-05-29 13:35:36 +00:00
console . log ( '\n=== PERF-09: Streaming Performance Test Summary ===' ) ;
2025-05-25 19:45:37 +00:00
2025-05-29 13:35:36 +00:00
console . log ( '\nStreaming XML Parsing:' ) ;
console . log ( ' Stream Size | Items | Data | Duration | Memory | Peak | Throughput' ) ;
console . log ( ' ------------|-------|---------|----------|--------|--------|----------' ) ;
streamingXMLParsing . tests . forEach ( ( test : any ) = > {
2025-05-25 19:45:37 +00:00
if ( ! test . error ) {
2025-05-29 13:35:36 +00:00
console . log ( ` ${ test . size . padEnd ( 11 ) } | ${ String ( test . items ) . padEnd ( 5 ) } | ${ test . totalBytes . padEnd ( 7 ) } KB | ${ String ( test . duration + 'ms' ) . padEnd ( 8 ) } | ${ test . memoryUsed . padEnd ( 6 ) } MB | ${ test . peakMemory . padEnd ( 6 ) } MB | ${ test . throughput } KB/s ` ) ;
2025-05-25 19:45:37 +00:00
}
} ) ;
2025-05-29 13:35:36 +00:00
if ( streamingXMLParsing . memoryEfficiency ) {
console . log ( ` Memory efficiency: ${ streamingXMLParsing . memoryEfficiency . efficient ? 'GOOD ✅' : 'POOR ⚠️' } ` ) ;
console . log ( ` Scaling: ${ streamingXMLParsing . memoryEfficiency . memoryScaling } x memory for ${ streamingXMLParsing . memoryEfficiency . itemScaling } x items ` ) ;
2025-05-25 19:45:37 +00:00
}
2025-05-29 13:35:36 +00:00
console . log ( '\nStream Transformation Pipeline:' ) ;
streamTransformation . pipelines . forEach ( ( pipeline : any ) = > {
2025-05-25 19:45:37 +00:00
if ( ! pipeline . error ) {
2025-05-29 13:35:36 +00:00
console . log ( ` ${ pipeline . name } : ` ) ;
console . log ( ` - Stages: ${ pipeline . stages } , Items: ${ pipeline . itemsProcessed } ` ) ;
console . log ( ` - Duration: ${ pipeline . duration } ms, Throughput: ${ pipeline . throughput } /s ` ) ;
console . log ( ` - Valid: ${ pipeline . validItems } , Errors: ${ pipeline . errorItems } ` ) ;
2025-05-25 19:45:37 +00:00
}
} ) ;
2025-05-29 13:35:36 +00:00
if ( streamTransformation . transformationStats ) {
console . log ( ` Best pipeline: ${ streamTransformation . transformationStats . bestPipeline } ( ${ streamTransformation . transformationStats . bestThroughput } /s) ` ) ;
2025-05-25 19:45:37 +00:00
}
2025-05-29 13:35:36 +00:00
console . log ( '\nBackpressure Handling:' ) ;
console . log ( ' Scenario | Duration | Produced | Consumed | Max Buffer | BP Events | Efficiency' ) ;
console . log ( ' ----------------------------|----------|----------|----------|------------|-----------|----------' ) ;
backpressureHandling . scenarios . forEach ( ( scenario : any ) = > {
2025-05-25 19:45:37 +00:00
if ( ! scenario . error ) {
2025-05-29 13:35:36 +00:00
console . log ( ` ${ scenario . name . padEnd ( 27 ) } | ${ String ( scenario . duration + 'ms' ) . padEnd ( 8 ) } | ${ String ( scenario . produced ) . padEnd ( 8 ) } | ${ String ( scenario . consumed ) . padEnd ( 8 ) } | ${ String ( scenario . maxBuffered ) . padEnd ( 10 ) } | ${ String ( scenario . backpressureEvents ) . padEnd ( 9 ) } | ${ scenario . efficiency } % ` ) ;
2025-05-25 19:45:37 +00:00
}
} ) ;
2025-05-29 13:35:36 +00:00
if ( backpressureHandling . backpressureStats ) {
console . log ( ` ${ backpressureHandling . backpressureStats . recommendation } ` ) ;
2025-05-25 19:45:37 +00:00
}
2025-05-29 13:35:36 +00:00
console . log ( '\nCorpus Streaming Analysis:' ) ;
console . log ( ` Streamable files: ${ corpusStreaming . streamableFiles } ` ) ;
console . log ( ` Non-streamable files: ${ corpusStreaming . nonStreamableFiles } ` ) ;
if ( corpusStreaming . comparison ) {
console . log ( ` Traditional avg: ${ corpusStreaming . comparison . avgTraditionalTime } ms ` ) ;
console . log ( ` Streamed avg: ${ corpusStreaming . comparison . avgStreamedTime } ms ` ) ;
console . log ( ` Overhead: ${ corpusStreaming . comparison . overheadPercent } % ` ) ;
console . log ( ` Large file improvement: ${ corpusStreaming . comparison . largeFileImprovement } % ` ) ;
console . log ( ` ${ corpusStreaming . comparison . recommendation } ` ) ;
2025-05-25 19:45:37 +00:00
}
2025-05-29 13:35:36 +00:00
console . log ( '\nReal-time Streaming:' ) ;
console . log ( ' Rate | Target | Actual | Processed | Dropped | Avg Latency | P95 | Jitter' ) ;
console . log ( ' ------------|--------|--------|-----------|---------|-------------|--------|-------' ) ;
realtimeStreaming . latencyTests . forEach ( ( test : any ) = > {
console . log ( ` ${ test . rate . padEnd ( 11 ) } | ${ String ( test . targetRate ) . padEnd ( 6 ) } | ${ test . actualRate . padEnd ( 6 ) } | ${ String ( test . processed ) . padEnd ( 9 ) } | ${ test . dropRate . padEnd ( 7 ) } % | ${ test . avgLatency . padEnd ( 11 ) } ms | ${ String ( test . p95Latency ) . padEnd ( 6 ) } ms | ${ test . avgJitter } ms ` ) ;
2025-05-25 19:45:37 +00:00
} ) ;
2025-05-29 13:35:36 +00:00
if ( realtimeStreaming . jitterAnalysis ) {
console . log ( ` System stability: ${ realtimeStreaming . jitterAnalysis . stable ? 'STABLE ✅' : 'UNSTABLE ⚠️' } ` ) ;
console . log ( ` ${ realtimeStreaming . jitterAnalysis . recommendation } ` ) ;
2025-05-25 19:45:37 +00:00
}
// Performance targets check
2025-05-29 13:35:36 +00:00
console . log ( '\n=== Performance Targets Check ===' ) ;
const streamingEfficient = streamingXMLParsing . memoryEfficiency ? . efficient || false ;
const realtimeStable = realtimeStreaming . jitterAnalysis ? . stable || false ;
2025-05-25 19:45:37 +00:00
2025-05-29 13:35:36 +00:00
console . log ( ` Streaming memory efficiency: ${ streamingEfficient ? 'EFFICIENT ✅' : 'INEFFICIENT ⚠️' } ` ) ;
console . log ( ` Real-time stability: ${ realtimeStable ? 'STABLE ✅' : 'UNSTABLE ⚠️' } ` ) ;
2025-05-25 19:45:37 +00:00
// Overall performance summary
2025-05-29 13:35:36 +00:00
console . log ( '\n=== Overall Performance Summary ===' ) ;
console . log ( performanceTracker . getSummary ( ) ) ;
2025-05-25 19:45:37 +00:00
} ) ;
tap . start ( ) ;