import { Worker } from 'worker_threads'; import * as path from 'path'; import type { ValidationResult } from './validation.types.js'; import type { SchematronOptions } from './schematron.validator.js'; /** * Worker pool for Schematron validation * Provides non-blocking validation in worker threads */ export class SchematronWorkerPool { private workers: Worker[] = []; private availableWorkers: Worker[] = []; private taskQueue: Array<{ xmlContent: string; options: SchematronOptions; resolve: (results: ValidationResult[]) => void; reject: (error: Error) => void; }> = []; private maxWorkers: number; private schematronRules: string = ''; constructor(maxWorkers: number = 4) { this.maxWorkers = maxWorkers; } /** * Initialize worker pool */ public async initialize(schematronRules: string): Promise { this.schematronRules = schematronRules; // Create workers for (let i = 0; i < this.maxWorkers; i++) { await this.createWorker(); } } /** * Create a new worker */ private async createWorker(): Promise { const workerPath = path.join(import.meta.url, 'schematron.worker.impl.js'); const worker = new Worker(` const { parentPort } = require('worker_threads'); const SaxonJS = require('saxon-js'); let compiledStylesheet = null; parentPort.on('message', async (msg) => { try { if (msg.type === 'init') { // Compile Schematron to XSLT compiledStylesheet = await SaxonJS.compile({ stylesheetText: msg.xslt, warnings: 'silent' }); parentPort.postMessage({ type: 'ready' }); } else if (msg.type === 'validate') { if (!compiledStylesheet) { throw new Error('Worker not initialized'); } // Transform XML with compiled Schematron const result = await SaxonJS.transform({ stylesheetInternal: compiledStylesheet, sourceText: msg.xmlContent, destination: 'serialized', stylesheetParams: msg.options.parameters || {} }); parentPort.postMessage({ type: 'result', svrl: result.principalResult }); } } catch (error) { parentPort.postMessage({ type: 'error', error: error.message }); } }); `, { eval: true }); // Initialize worker with Schematron rules await new Promise((resolve, reject) => { worker.once('message', (msg) => { if (msg.type === 'ready') { resolve(); } else if (msg.type === 'error') { reject(new Error(msg.error)); } }); // Send initialization message worker.postMessage({ type: 'init', xslt: this.generateXSLTFromSchematron(this.schematronRules) }); }); this.workers.push(worker); this.availableWorkers.push(worker); } /** * Validate XML using worker pool */ public async validate( xmlContent: string, options: SchematronOptions = {} ): Promise { return new Promise((resolve, reject) => { // Add task to queue this.taskQueue.push({ xmlContent, options, resolve, reject }); this.processTasks(); }); } /** * Process queued validation tasks */ private processTasks(): void { while (this.taskQueue.length > 0 && this.availableWorkers.length > 0) { const task = this.taskQueue.shift()!; const worker = this.availableWorkers.shift()!; // Set up one-time listeners const messageHandler = (msg: any) => { if (msg.type === 'result') { // Parse SVRL and return results const results = this.parseSVRL(msg.svrl); task.resolve(results); // Return worker to pool this.availableWorkers.push(worker); worker.removeListener('message', messageHandler); // Process next task this.processTasks(); } else if (msg.type === 'error') { task.reject(new Error(msg.error)); // Return worker to pool this.availableWorkers.push(worker); worker.removeListener('message', messageHandler); // Process next task this.processTasks(); } }; worker.on('message', messageHandler); // Send validation task worker.postMessage({ type: 'validate', xmlContent: task.xmlContent, options: task.options }); } } /** * Parse SVRL output */ private parseSVRL(svrlXml: string): ValidationResult[] { const results: ValidationResult[] = []; // This would use the same parsing logic as SchematronValidator // Simplified for brevity return results; } /** * Generate XSLT from Schematron (simplified) */ private generateXSLTFromSchematron(schematron: string): string { // Simplified - would use ISO Schematron skeleton in production return ` `; } /** * Terminate all workers */ public async terminate(): Promise { await Promise.all(this.workers.map(w => w.terminate())); this.workers = []; this.availableWorkers = []; this.taskQueue = []; } /** * Get pool statistics */ public getStats(): { totalWorkers: number; availableWorkers: number; queuedTasks: number; } { return { totalWorkers: this.workers.length, availableWorkers: this.availableWorkers.length, queuedTasks: this.taskQueue.length }; } }