import { logger } from './logger.js';

/**
 * A single deduplicated event: the first occurrence's level/message plus a
 * running count and the timestamps (ms since epoch) of the first and most
 * recent occurrence.
 */
interface ILogEvent {
  level: 'info' | 'warn' | 'error' | 'debug';
  message: string;
  data?: any;
  count: number;
  firstSeen: number;
  lastSeen: number;
}

/**
 * All pending events recorded under one aggregation key, indexed by their
 * deduplication key, plus the timer that will flush them.
 */
interface IAggregatedEvent {
  key: string;
  events: Map<string, ILogEvent>;
  flushTimer?: NodeJS.Timeout;
}

/**
 * Log deduplication utility to reduce log spam for repetitive events.
 *
 * Events are collected per aggregation key and emitted as one summary line
 * through `logger.log` when the key is flushed (by timer, by size, by the
 * rapid-event check, or explicitly).
 */
export class LogDeduplicator {
  private globalFlushTimer?: NodeJS.Timeout;
  private aggregatedEvents: Map<string, IAggregatedEvent> = new Map();
  private flushInterval: number = 5000; // 5 seconds
  private maxBatchSize: number = 100; // max distinct events per key before a forced flush
  private rapidEventThreshold: number = 50; // pending count that triggers an early flush
  private lastRapidCheck: number = Date.now();

  /**
   * @param flushInterval - Per-key flush delay in milliseconds. A falsy value
   *   (undefined or 0) keeps the 5000 ms default.
   */
  constructor(flushInterval?: number) {
    if (flushInterval) {
      this.flushInterval = flushInterval;
    }

    // Global safety net: flush everything every 2x the per-key interval.
    this.globalFlushTimer = setInterval(() => {
      this.flushAll();
    }, this.flushInterval * 2);

    // Do not let the timer keep the process alive (guarded for runtimes
    // whose timer handles lack unref).
    if (this.globalFlushTimer.unref) {
      this.globalFlushTimer.unref();
    }
  }

  /**
   * Log a deduplicated event
   * @param key - Aggregation key (e.g., 'connection-rejected', 'cleanup-batch')
   * @param level - Log level
   * @param message - Log message template
   * @param data - Additional data; merged shallowly into the data already
   *   stored for a repeated event
   * @param dedupeKey - Deduplication key within the aggregation (e.g., IP
   *   address, reason); falls back to `message` when omitted or empty
   */
  public log(
    key: string,
    level: 'info' | 'warn' | 'error' | 'debug',
    message: string,
    data?: any,
    dedupeKey?: string
  ): void {
    const eventKey = dedupeKey || message;
    const now = Date.now();

    let aggregated = this.aggregatedEvents.get(key);
    if (!aggregated) {
      aggregated = { key, events: new Map(), flushTimer: undefined };
      this.aggregatedEvents.set(key, aggregated);
    }

    const existing = aggregated.events.get(eventKey);
    if (existing) {
      existing.count++;
      existing.lastSeen = now;
      if (data) {
        existing.data = { ...existing.data, ...data };
      }
    } else {
      aggregated.events.set(eventKey, {
        level,
        message,
        data,
        count: 1,
        firstSeen: now,
        lastSeen: now
      });
    }

    const totalEvents = this.sumCounts(aggregated);

    if (now - this.lastRapidCheck < 1000 && totalEvents >= this.rapidEventThreshold) {
      // Flooded: enough pending occurrences and the last rapid check was
      // less than a second ago, so flush right away.
      this.flush(key);
      this.lastRapidCheck = now;
    } else if (aggregated.events.size >= this.maxBatchSize) {
      // Too many distinct events buffered for this key.
      this.flush(key);
    } else if (!aggregated.flushTimer) {
      // First pending event since the last flush: schedule the regular flush.
      aggregated.flushTimer = setTimeout(() => {
        this.flush(key);
      }, this.flushInterval);
      if (aggregated.flushTimer.unref) {
        aggregated.flushTimer.unref();
      }
    }

    // Advance the rapid-check reference point once it is a second old.
    if (now - this.lastRapidCheck >= 1000) {
      this.lastRapidCheck = now;
    }
  }

  /**
   * Flush aggregated events for a specific key.
   *
   * Does nothing when the key is unknown or has no pending events.
   */
  public flush(key: string): void {
    const aggregated = this.aggregatedEvents.get(key);
    if (!aggregated || aggregated.events.size === 0) {
      return;
    }

    if (aggregated.flushTimer) {
      clearTimeout(aggregated.flushTimer);
      aggregated.flushTimer = undefined;
    }

    // Detach the bucket before emitting so flushed keys do not accumulate in
    // the map forever; log() recreates the bucket on the next event.
    this.aggregatedEvents.delete(key);

    // Emit aggregated log based on the key
    switch (key) {
      case 'connection-rejected':
        this.flushConnectionRejections(aggregated);
        break;
      case 'connection-cleanup':
        this.flushConnectionCleanups(aggregated);
        break;
      case 'connection-terminated':
        this.flushConnectionTerminations(aggregated);
        break;
      case 'ip-rejected':
        this.flushIPRejections(aggregated);
        break;
      default:
        this.flushGeneric(aggregated);
    }

    aggregated.events.clear();
  }

  /**
   * Flush all pending events
   */
  public flushAll(): void {
    // Iterate over a snapshot because flush() removes entries from the map.
    for (const key of Array.from(this.aggregatedEvents.keys())) {
      this.flush(key);
    }
  }

  /** Total number of occurrences across all pending events of a bucket. */
  private sumCounts(aggregated: IAggregatedEvent): number {
    let total = 0;
    for (const event of aggregated.events.values()) {
      total += event.count;
    }
    return total;
  }

  /** Milliseconds between the earliest `firstSeen` in the bucket and now. */
  private elapsedMs(aggregated: IAggregatedEvent): number {
    let earliest = Infinity;
    for (const event of aggregated.events.values()) {
      earliest = Math.min(earliest, event.firstSeen);
    }
    return Date.now() - earliest;
  }

  /** Occurrence count per `data.reason`, using `fallback` when no reason is set. */
  private tallyReasons(aggregated: IAggregatedEvent, fallback: string): Map<string, number> {
    const byReason = new Map<string, number>();
    for (const event of aggregated.events.values()) {
      const reason = event.data?.reason || fallback;
      byReason.set(reason, (byReason.get(reason) || 0) + event.count);
    }
    return byReason;
  }

  /** Render counts as "name: n, ..." sorted by descending count, optionally truncated to `limit` entries. */
  private formatCounts(counts: Map<string, number>, limit?: number): string {
    const ranked = Array.from(counts.entries()).sort((a, b) => b[1] - a[1]);
    const shown = limit === undefined ? ranked : ranked.slice(0, limit);
    return shown.map(([name, count]) => `${name}: ${count}`).join(', ');
  }

  /** Emit a 'warn' summary of rejected connections grouped by reason. */
  private flushConnectionRejections(aggregated: IAggregatedEvent): void {
    const totalCount = this.sumCounts(aggregated);
    const reasonSummary = this.formatCounts(this.tallyReasons(aggregated, 'unknown'));
    const duration = this.elapsedMs(aggregated);

    logger.log('warn', `[SUMMARY] Rejected ${totalCount} connections in ${Math.round(duration/1000)}s`, {
      reasons: reasonSummary,
      uniqueIPs: aggregated.events.size,
      component: 'connection-dedup'
    });
  }

  /** Emit an 'info' summary of cleaned-up connections with the top 5 reasons. */
  private flushConnectionCleanups(aggregated: IAggregatedEvent): void {
    const totalCount = this.sumCounts(aggregated);
    const reasonSummary = this.formatCounts(this.tallyReasons(aggregated, 'normal'), 5);

    logger.log('info', `Cleaned up ${totalCount} connections`, {
      reasons: reasonSummary,
      duration: this.elapsedMs(aggregated),
      component: 'connection-dedup'
    });
  }

  /**
   * Emit an 'info' summary of terminated connections with the top 5 reasons,
   * an IP breakdown, and the last `activeConnections` value seen in the data.
   */
  private flushConnectionTerminations(aggregated: IAggregatedEvent): void {
    const totalCount = this.sumCounts(aggregated);
    const byReason = this.tallyReasons(aggregated, 'unknown');
    const byIP = new Map<string, number>();
    let lastActiveCount = 0;

    for (const event of aggregated.events.values()) {
      const ip = event.data?.remoteIP || 'unknown';
      if (ip !== 'unknown') {
        byIP.set(ip, (byIP.get(ip) || 0) + event.count);
      }
      // Later events overwrite earlier ones, so this ends as the last value seen.
      if (event.data?.activeConnections !== undefined) {
        lastActiveCount = event.data.activeConnections;
      }
    }

    const reasonSummary = this.formatCounts(byReason, 5);

    // Show only the top IPs if there are many different ones
    let ipInfo = '';
    if (byIP.size > 3) {
      const topIPs = Array.from(byIP.entries())
        .sort((a, b) => b[1] - a[1])
        .slice(0, 3)
        .map(([ip, count]) => `${ip} (${count})`)
        .join(', ');
      ipInfo = `, from ${byIP.size} IPs (top: ${topIPs})`;
    } else if (byIP.size > 0) {
      ipInfo = `, IPs: ${Array.from(byIP.keys()).join(', ')}`;
    }

    const duration = this.elapsedMs(aggregated);

    // Special handling for localhost connections (HttpProxy)
    const localhostCount = byIP.get('::ffff:127.0.0.1') || 0;
    if (localhostCount > 0 && byIP.size === 1) {
      // All connections with a known IP are from localhost (HttpProxy)
      logger.log('info', `[SUMMARY] ${totalCount} HttpProxy connections terminated in ${Math.round(duration/1000)}s`, {
        reasons: reasonSummary,
        activeConnections: lastActiveCount,
        component: 'connection-dedup'
      });
    } else {
      logger.log('info', `[SUMMARY] ${totalCount} connections terminated in ${Math.round(duration/1000)}s`, {
        reasons: reasonSummary,
        activeConnections: lastActiveCount,
        uniqueReasons: byReason.size,
        ...(ipInfo ? { ips: ipInfo } : {}),
        component: 'connection-dedup'
      });
    }
  }

  /**
   * Emit a 'warn' summary of rejections per event key (treated as the IP),
   * listing the 10 keys with the most rejections and their reasons.
   */
  private flushIPRejections(aggregated: IAggregatedEvent): void {
    const byIP = new Map<string, { count: number; reasons: Set<string> }>();

    for (const [ip, event] of aggregated.events) {
      let ipData = byIP.get(ip);
      if (!ipData) {
        ipData = { count: 0, reasons: new Set() };
        byIP.set(ip, ipData);
      }
      ipData.count += event.count;
      if (event.data?.reason) {
        ipData.reasons.add(event.data.reason);
      }
    }

    // Log top offenders
    const topOffenders = Array.from(byIP.entries())
      .sort((a, b) => b[1].count - a[1].count)
      .slice(0, 10)
      .map(([ip, data]) => `${ip} (${data.count}x, ${Array.from(data.reasons).join('/')})`)
      .join(', ');

    const totalRejections = Array.from(byIP.values()).reduce((sum, data) => sum + data.count, 0);
    const duration = this.elapsedMs(aggregated);

    logger.log('warn', `[SUMMARY] Rejected ${totalRejections} connections from ${byIP.size} IPs in ${Math.round(duration/1000)}s`, {
      topOffenders,
      component: 'ip-dedup'
    });
  }

  /**
   * Emit a summary for any key without a dedicated handler, at the level of
   * the first pending event ('info' if none is available).
   */
  private flushGeneric(aggregated: IAggregatedEvent): void {
    const totalCount = this.sumCounts(aggregated);
    const level = aggregated.events.values().next().value?.level || 'info';

    // Special handling for IP cleanup events
    if (aggregated.key === 'ip-cleanup') {
      const totalCleaned = Array.from(aggregated.events.values()).reduce((sum, e) => {
        return sum + (e.data?.cleanedIPs || 0) + (e.data?.cleanedRateLimits || 0);
      }, 0);

      // Nothing is logged when the cleanup cycles removed no entries.
      if (totalCleaned > 0) {
        logger.log(level as any, `IP tracking cleanup: removed ${totalCleaned} entries across ${totalCount} cleanup cycles`, {
          duration: this.elapsedMs(aggregated),
          component: 'log-dedup'
        });
      }
    } else {
      logger.log(level as any, `${aggregated.key}: ${totalCount} events`, {
        uniqueEvents: aggregated.events.size,
        duration: this.elapsedMs(aggregated),
        component: 'log-dedup'
      });
    }
  }

  /**
   * Cleanup and stop deduplication: flushes everything, then cancels the
   * global interval and any remaining per-key timers.
   */
  public cleanup(): void {
    this.flushAll();

    if (this.globalFlushTimer) {
      clearInterval(this.globalFlushTimer);
      this.globalFlushTimer = undefined;
    }

    for (const aggregated of this.aggregatedEvents.values()) {
      if (aggregated.flushTimer) {
        clearTimeout(aggregated.flushTimer);
      }
    }
    this.aggregatedEvents.clear();
  }
}

// Global instance for connection-related log deduplication
export const connectionLogDeduplicator = new LogDeduplicator(5000); // 5 second batches

// Ensure logs are flushed on process exit
process.on('beforeExit', () => {
  connectionLogDeduplicator.flushAll();
});

// On SIGINT/SIGTERM: flush and stop the deduplicator, then exit with code 0.
process.on('SIGINT', () => {
  connectionLogDeduplicator.cleanup();
  process.exit(0);
});

process.on('SIGTERM', () => {
  connectionLogDeduplicator.cleanup();
  process.exit(0);
});