use dashmap::DashMap; use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; use std::time::{Duration, Instant}; use tracing::info; /// An aggregated event during the deduplication window. struct AggregatedEvent { category: String, first_message: String, count: AtomicU64, first_seen: Instant, #[allow(dead_code)] last_seen: Instant, } /// Log deduplicator that batches similar events over a time window. /// /// Events are grouped by a composite key of `category:key`. Within each /// deduplication window (`flush_interval`) identical events are counted /// instead of being emitted individually. When the window expires (or the /// batch reaches `max_batch_size`) a single summary line is written via /// `tracing::info!`. pub struct LogDeduplicator { events: DashMap, flush_interval: Duration, max_batch_size: u64, #[allow(dead_code)] rapid_threshold: u64, // events/sec that triggers immediate flush } impl LogDeduplicator { pub fn new() -> Self { Self { events: DashMap::new(), flush_interval: Duration::from_secs(5), max_batch_size: 100, rapid_threshold: 50, } } /// Log an event, deduplicating by `category` + `key`. /// /// If the batch for this composite key reaches `max_batch_size` the /// accumulated events are flushed immediately. pub fn log(&self, category: &str, key: &str, message: &str) { let map_key = format!("{}:{}", category, key); let now = Instant::now(); let entry = self.events.entry(map_key).or_insert_with(|| AggregatedEvent { category: category.to_string(), first_message: message.to_string(), count: AtomicU64::new(0), first_seen: now, last_seen: now, }); let count = entry.count.fetch_add(1, Ordering::Relaxed) + 1; // Check if we should flush (batch size exceeded) if count >= self.max_batch_size { drop(entry); self.flush(); } } /// Flush all accumulated events, emitting summary log lines. pub fn flush(&self) { // Collect and remove all events self.events.retain(|_key, event| { let count = event.count.load(Ordering::Relaxed); if count > 0 { let elapsed = event.first_seen.elapsed(); if count == 1 { info!("[{}] {}", event.category, event.first_message); } else { info!( "[SUMMARY] {} {} events in {:.1}s: {}", count, event.category, elapsed.as_secs_f64(), event.first_message ); } } false // remove all entries after flushing }); } /// Start a background flush task that periodically drains accumulated /// events. The task runs until the supplied `CancellationToken` is /// cancelled, at which point it performs one final flush before exiting. pub fn start_flush_task(self: &Arc, cancel: tokio_util::sync::CancellationToken) { let dedup = Arc::clone(self); let interval = self.flush_interval; tokio::spawn(async move { loop { tokio::select! { _ = cancel.cancelled() => { dedup.flush(); break; } _ = tokio::time::sleep(interval) => { dedup.flush(); } } } }); } } impl Default for LogDeduplicator { fn default() -> Self { Self::new() } } #[cfg(test)] mod tests { use super::*; #[test] fn test_single_event_emitted_as_is() { let dedup = LogDeduplicator::new(); dedup.log("conn", "open", "connection opened from 1.2.3.4"); // One event should exist assert_eq!(dedup.events.len(), 1); let entry = dedup.events.get("conn:open").unwrap(); assert_eq!(entry.count.load(Ordering::Relaxed), 1); assert_eq!(entry.first_message, "connection opened from 1.2.3.4"); drop(entry); dedup.flush(); // After flush, map should be empty assert_eq!(dedup.events.len(), 0); } #[test] fn test_duplicate_events_aggregated() { let dedup = LogDeduplicator::new(); for _ in 0..10 { dedup.log("conn", "timeout", "connection timed out"); } assert_eq!(dedup.events.len(), 1); let entry = dedup.events.get("conn:timeout").unwrap(); assert_eq!(entry.count.load(Ordering::Relaxed), 10); drop(entry); dedup.flush(); assert_eq!(dedup.events.len(), 0); } #[test] fn test_different_keys_separate() { let dedup = LogDeduplicator::new(); dedup.log("conn", "open", "opened"); dedup.log("conn", "close", "closed"); dedup.log("tls", "handshake", "TLS handshake"); assert_eq!(dedup.events.len(), 3); dedup.flush(); assert_eq!(dedup.events.len(), 0); } #[test] fn test_flush_clears_events() { let dedup = LogDeduplicator::new(); dedup.log("a", "b", "msg1"); dedup.log("a", "b", "msg2"); dedup.flush(); assert_eq!(dedup.events.len(), 0); // Logging after flush creates a new entry dedup.log("a", "b", "msg3"); assert_eq!(dedup.events.len(), 1); let entry = dedup.events.get("a:b").unwrap(); assert_eq!(entry.count.load(Ordering::Relaxed), 1); assert_eq!(entry.first_message, "msg3"); } #[test] fn test_max_batch_triggers_flush() { let dedup = LogDeduplicator::new(); // max_batch_size defaults to 100 for i in 0..100 { dedup.log("flood", "key", &format!("event {}", i)); } // After hitting max_batch_size the events map should have been flushed assert_eq!(dedup.events.len(), 0); } #[test] fn test_default_trait() { let dedup = LogDeduplicator::default(); assert_eq!(dedup.flush_interval, Duration::from_secs(5)); assert_eq!(dedup.max_batch_size, 100); } #[tokio::test] async fn test_background_flush_task() { let dedup = Arc::new(LogDeduplicator { events: DashMap::new(), flush_interval: Duration::from_millis(50), max_batch_size: 100, rapid_threshold: 50, }); let cancel = tokio_util::sync::CancellationToken::new(); dedup.start_flush_task(cancel.clone()); // Log some events dedup.log("bg", "test", "background flush test"); assert_eq!(dedup.events.len(), 1); // Wait for the background task to flush tokio::time::sleep(Duration::from_millis(100)).await; assert_eq!(dedup.events.len(), 0); // Cancel the task cancel.cancel(); tokio::time::sleep(Duration::from_millis(20)).await; } }