// 280 lines · 8.3 KiB · TypeScript
import { logger } from './logger.js';
/**
 * One deduplicated log record: every occurrence that shares a dedupe key
 * within an aggregation group is collapsed into a single ILogEvent.
 */
interface ILogEvent {
  /** Severity associated with the event. */
  level: 'info' | 'warn' | 'error' | 'debug';
  /** Message text supplied by the caller. */
  message: string;
  /** Optional caller-supplied payload; duplicates merge their payload into it. */
  data?: any;
  /** Number of occurrences collapsed into this record (starts at 1). */
  count: number;
  /** `Date.now()` timestamp of the first occurrence. */
  firstSeen: number;
  /** `Date.now()` timestamp of the most recent occurrence. */
  lastSeen: number;
}
/**
 * A group of deduplicated events that are flushed together as one summary line.
 */
interface IAggregatedEvent {
  /** Aggregation key that identifies the group (e.g. 'connection-rejected'). */
  key: string;
  /** Pending records of the group, indexed by their dedupe key. */
  events: Map<string, ILogEvent>;
  /** Pending delayed-flush timer of the group; absent while no flush is scheduled. */
  flushTimer?: NodeJS.Timeout;
}
/**
 * Log deduplication utility that reduces log spam for repetitive events.
 *
 * Events are grouped under an aggregation key; within a group, events that
 * share a dedupe key collapse into one record with a running count. A group is
 * emitted as a single summary line when any of the following happens:
 *  - it holds `maxBatchSize` distinct dedupe keys,
 *  - its own timer (armed by the first event after a flush) fires after
 *    `flushInterval` milliseconds,
 *  - the global safety-net timer fires (every 2x `flushInterval`),
 *  - `flush`, `flushAll` or `cleanup` is called explicitly.
 *
 * Timers are unref'd when the runtime offers `unref`, so they do not keep the
 * process alive on their own.
 */
export class LogDeduplicator {
  /** Ranking used to pick the most severe level found in a group. */
  private static readonly LEVEL_SEVERITY: Record<ILogEvent['level'], number> = {
    debug: 0,
    info: 1,
    warn: 2,
    error: 3
  };

  /**
   * Payload fields that are summed when a group is flushed. They have to
   * accumulate across duplicates instead of being overwritten by the latest one.
   */
  private static readonly ADDITIVE_DATA_FIELDS: readonly string[] = ['cleanedIPs', 'cleanedRateLimits'];

  private globalFlushTimer?: NodeJS.Timeout;
  private readonly aggregatedEvents: Map<string, IAggregatedEvent> = new Map();
  private flushInterval: number = 5000; // 5 seconds
  private maxBatchSize: number = 100;

  /**
   * @param flushInterval - Delay in milliseconds before a group is flushed.
   *   Missing, non-finite, zero or negative values keep the 5000 ms default
   *   (a negative delay would otherwise degrade into a ~1 ms timer).
   */
  constructor(flushInterval?: number) {
    if (flushInterval !== undefined && Number.isFinite(flushInterval) && flushInterval > 0) {
      this.flushInterval = flushInterval;
    }

    // Global periodic flush: guarantees that logs are emitted regularly.
    const timer = setInterval(() => {
      this.flushAll();
    }, this.flushInterval * 2); // Flush everything every 2x the normal interval
    this.globalFlushTimer = timer;

    if (timer.unref) {
      timer.unref();
    }
  }

  /**
   * Log a deduplicated event
   * @param key - Aggregation key (e.g., 'connection-rejected', 'cleanup-batch')
   * @param level - Log level
   * @param message - Log message template
   * @param data - Additional data; merged into the data of earlier duplicates
   * @param dedupeKey - Deduplication key within the aggregation (e.g., IP address, reason);
   *   falls back to `message` when omitted or empty
   */
  public log(
    key: string,
    level: 'info' | 'warn' | 'error' | 'debug',
    message: string,
    data?: any,
    dedupeKey?: string
  ): void {
    const eventKey = dedupeKey || message;
    const now = Date.now();

    let aggregated = this.aggregatedEvents.get(key);
    if (!aggregated) {
      aggregated = { key, events: new Map() };
      this.aggregatedEvents.set(key, aggregated);
    }

    const existing = aggregated.events.get(eventKey);
    if (existing) {
      existing.count++;
      existing.lastSeen = now;
      // Keep the most severe level seen for this record.
      if (LogDeduplicator.LEVEL_SEVERITY[level] > LogDeduplicator.LEVEL_SEVERITY[existing.level]) {
        existing.level = level;
      }
      if (data) {
        existing.data = LogDeduplicator.mergeData(existing.data, data);
      }
    } else {
      aggregated.events.set(eventKey, {
        level,
        message,
        data,
        count: 1,
        firstSeen: now,
        lastSeen: now
      });
    }

    if (aggregated.events.size >= this.maxBatchSize) {
      // Too many distinct records buffered: emit right away.
      this.flush(key);
    } else if (!aggregated.flushTimer) {
      // First event since the last flush: schedule the delayed flush.
      const timer = setTimeout(() => {
        this.flush(key);
      }, this.flushInterval);
      aggregated.flushTimer = timer;

      if (timer.unref) {
        timer.unref();
      }
    }
  }

  /**
   * Flush aggregated events for a specific key.
   *
   * Does nothing when the key has no pending events. The group is always
   * emptied and removed afterwards, even when the logger throws, so a failing
   * log sink cannot make the buffer grow or re-throw on every later `log` call,
   * and ad-hoc keys do not accumulate in memory.
   */
  public flush(key: string): void {
    const aggregated = this.aggregatedEvents.get(key);
    if (!aggregated || aggregated.events.size === 0) {
      return;
    }

    if (aggregated.flushTimer) {
      clearTimeout(aggregated.flushTimer);
      aggregated.flushTimer = undefined;
    }

    try {
      // Emit aggregated log based on the key
      switch (key) {
        case 'connection-rejected':
          this.flushConnectionRejections(aggregated);
          break;
        case 'connection-cleanup':
          this.flushConnectionCleanups(aggregated);
          break;
        case 'ip-rejected':
          this.flushIPRejections(aggregated);
          break;
        default:
          this.flushGeneric(aggregated);
      }
    } finally {
      aggregated.events.clear();
      if (this.aggregatedEvents.get(key) === aggregated) {
        this.aggregatedEvents.delete(key);
      }
    }
  }

  /**
   * Flush all pending events
   */
  public flushAll(): void {
    // Iterate over a snapshot: flush() removes groups from the map.
    for (const key of Array.from(this.aggregatedEvents.keys())) {
      this.flush(key);
    }
  }

  /** Shallow-merges `incoming` over `previous`, summing the additive counter fields. */
  private static mergeData(previous: any, incoming: any): any {
    const merged = { ...previous, ...incoming };
    for (const field of LogDeduplicator.ADDITIVE_DATA_FIELDS) {
      const before = previous?.[field];
      const added = incoming?.[field];
      if (typeof before === 'number' && typeof added === 'number') {
        merged[field] = before + added;
      }
    }
    return merged;
  }

  /** Sum of the occurrence counts of all records in the group. */
  private static totalCount(aggregated: IAggregatedEvent): number {
    let total = 0;
    for (const event of aggregated.events.values()) {
      total += event.count;
    }
    return total;
  }

  /** Milliseconds elapsed since the earliest record of the group (0 for an empty group). */
  private static elapsedMs(aggregated: IAggregatedEvent): number {
    let earliest = Infinity;
    for (const event of aggregated.events.values()) {
      if (event.firstSeen < earliest) {
        earliest = event.firstSeen;
      }
    }
    return Number.isFinite(earliest) ? Date.now() - earliest : 0;
  }

  /** Most severe level among the records of the group; 'info' when there is none. */
  private static mostSevereLevel(aggregated: IAggregatedEvent): ILogEvent['level'] {
    let chosen: ILogEvent['level'] | undefined;
    let chosenRank = -Infinity;
    for (const event of aggregated.events.values()) {
      const rank = LogDeduplicator.LEVEL_SEVERITY[event.level] ?? -1;
      if (chosen === undefined || rank > chosenRank) {
        chosen = event.level;
        chosenRank = rank;
      }
    }
    return chosen || 'info';
  }

  /**
   * Builds a "reason: count" list ordered by descending count.
   * @param fallbackReason - Label used for records without `data.reason`
   * @param limit - Maximum number of reasons to include (all when omitted)
   */
  private static summarizeReasons(
    aggregated: IAggregatedEvent,
    fallbackReason: string,
    limit?: number
  ): string {
    const byReason = new Map<string, number>();
    for (const event of aggregated.events.values()) {
      const reason = event.data?.reason || fallbackReason;
      byReason.set(reason, (byReason.get(reason) || 0) + event.count);
    }

    const ranked = Array.from(byReason.entries()).sort((a, b) => b[1] - a[1]);
    const selected = limit === undefined ? ranked : ranked.slice(0, limit);
    return selected.map(([reason, count]) => `${reason}: ${count}`).join(', ');
  }

  /** Emits one 'warn' summary for the 'connection-rejected' group, broken down by reason. */
  private flushConnectionRejections(aggregated: IAggregatedEvent): void {
    logger.log('warn', `Rejected ${LogDeduplicator.totalCount(aggregated)} connections`, {
      reasons: LogDeduplicator.summarizeReasons(aggregated, 'unknown'),
      uniqueIPs: aggregated.events.size,
      duration: LogDeduplicator.elapsedMs(aggregated),
      component: 'connection-dedup'
    });
  }

  /** Emits one 'info' summary for the 'connection-cleanup' group with its top 5 reasons. */
  private flushConnectionCleanups(aggregated: IAggregatedEvent): void {
    logger.log('info', `Cleaned up ${LogDeduplicator.totalCount(aggregated)} connections`, {
      reasons: LogDeduplicator.summarizeReasons(aggregated, 'normal', 5), // Top 5 reasons
      duration: LogDeduplicator.elapsedMs(aggregated),
      component: 'connection-dedup'
    });
  }

  /**
   * Emits one 'warn' summary for the 'ip-rejected' group. The dedupe key of
   * each record is reported as the IP; the 10 highest counts are listed.
   */
  private flushIPRejections(aggregated: IAggregatedEvent): void {
    const offenders = Array.from(aggregated.events, ([ip, event]) => ({
      ip,
      count: event.count,
      reason: event.data?.reason
    }));

    // Log top offenders
    const topOffenders = offenders
      .sort((a, b) => b.count - a.count)
      .slice(0, 10)
      .map(({ ip, count, reason }) => `${ip} (${count}x, ${reason ? String(reason) : ''})`)
      .join(', ');

    const totalRejections = LogDeduplicator.totalCount(aggregated);

    logger.log('warn', `Rejected ${totalRejections} connections from ${aggregated.events.size} IPs`, {
      topOffenders,
      duration: LogDeduplicator.elapsedMs(aggregated),
      component: 'ip-dedup'
    });
  }

  /**
   * Emits the summary for every group without a dedicated formatter, at the
   * most severe level found in the group. The 'ip-cleanup' group reports the
   * number of removed entries and stays silent when nothing was removed.
   */
  private flushGeneric(aggregated: IAggregatedEvent): void {
    const totalCount = LogDeduplicator.totalCount(aggregated);
    const level = LogDeduplicator.mostSevereLevel(aggregated);

    // Special handling for IP cleanup events
    if (aggregated.key === 'ip-cleanup') {
      let totalCleaned = 0;
      for (const event of aggregated.events.values()) {
        totalCleaned += (event.data?.cleanedIPs || 0) + (event.data?.cleanedRateLimits || 0);
      }

      if (totalCleaned > 0) {
        // `as any` retained: the logger's level type is declared outside this file.
        logger.log(level as any, `IP tracking cleanup: removed ${totalCleaned} entries across ${totalCount} cleanup cycles`, {
          duration: LogDeduplicator.elapsedMs(aggregated),
          component: 'log-dedup'
        });
      }
      return;
    }

    // `as any` retained: the logger's level type is declared outside this file.
    logger.log(level as any, `${aggregated.key}: ${totalCount} events`, {
      uniqueEvents: aggregated.events.size,
      duration: LogDeduplicator.elapsedMs(aggregated),
      component: 'log-dedup'
    });
  }

  /**
   * Cleanup and stop deduplication: flushes everything that is pending, then
   * stops the global timer and all per-group timers. Timers are stopped even
   * when a flush throws.
   */
  public cleanup(): void {
    try {
      this.flushAll();
    } finally {
      if (this.globalFlushTimer) {
        clearInterval(this.globalFlushTimer);
        this.globalFlushTimer = undefined;
      }

      for (const aggregated of this.aggregatedEvents.values()) {
        if (aggregated.flushTimer) {
          clearTimeout(aggregated.flushTimer);
        }
      }
      this.aggregatedEvents.clear();
    }
  }
}
/**
 * Global instance for connection-related log deduplication (5 second batches).
 */
export const connectionLogDeduplicator = new LogDeduplicator(5000);

// Ensure pending logs are flushed when the process is about to exit.
process.on('beforeExit', () => {
  connectionLogDeduplicator.flushAll();
});

// On SIGINT / SIGTERM: flush pending logs, stop the timers, then exit with status 0.
// NOTE(review): these listeners call process.exit(0) themselves — confirm no
// other shutdown hook of the application is expected to run after them.
for (const signal of ['SIGINT', 'SIGTERM'] as const) {
  process.on(signal, () => {
    connectionLogDeduplicator.cleanup();
    process.exit(0);
  });
}