/** * Caddy Log Receiver for Onebox * * TCP server that receives access logs from Caddy and broadcasts them to WebSocket clients. * Supports per-client filtering by domain and adaptive sampling at high volume. */ import { logger } from '../logging.ts'; import { getErrorMessage } from '../utils/error.ts'; /** * Filter applied to a WebSocket client's log stream */ export interface ILogFilter { domain?: string; service?: string; sampleRate?: number; // 0.01 to 1.0, default 1.0 } /** * Caddy access log entry structure (from Caddy JSON format) */ export interface ICaddyAccessLog { ts: number; level?: string; logger?: string; msg?: string; request: { remote_ip: string; remote_port?: string; client_ip?: string; proto: string; method: string; host: string; uri: string; headers?: Record; tls?: { resumed: boolean; version: number; cipher_suite: number; proto: string; server_name: string; }; }; bytes_read?: number; user_id?: string; duration: number; size: number; status: number; resp_headers?: Record; } /** * WebSocket client with filter */ interface ILogClient { id: string; ws: WebSocket; filter: ILogFilter; } /** * CaddyLogReceiver - TCP server for Caddy access logs */ export class CaddyLogReceiver { private server: Deno.TcpListener | null = null; private clients: Map = new Map(); private port: number; private running = false; private connections: Set = new Set(); // Adaptive sampling state private logCountWindow: number[] = []; // timestamps of recent logs private windowSize = 1000; // track last 1 second private currentSampleRate = 1.0; private logCounter = 0; // Ring buffer for recent logs (for late-joining clients) private recentLogs: ICaddyAccessLog[] = []; private maxRecentLogs = 100; constructor(port = 9999) { this.port = port; } /** * Start the TCP server */ async start(): Promise { if (this.running) { logger.warn('CaddyLogReceiver is already running'); return; } try { this.server = Deno.listen({ port: this.port, transport: 'tcp' }); this.running = true; logger.success(`CaddyLogReceiver started on TCP port ${this.port}`); // Start accepting connections in background this.acceptConnections(); } catch (error) { logger.error(`Failed to start CaddyLogReceiver: ${getErrorMessage(error)}`); throw error; } } /** * Accept incoming TCP connections from Caddy */ private async acceptConnections(): Promise { if (!this.server) return; try { for await (const conn of this.server) { this.connections.add(conn); this.handleConnection(conn); } } catch (error) { if (this.running) { logger.error(`CaddyLogReceiver accept error: ${getErrorMessage(error)}`); } } } /** * Handle a single TCP connection from Caddy */ private async handleConnection(conn: Deno.TcpConn): Promise { const remoteAddr = conn.remoteAddr as Deno.NetAddr; logger.debug(`CaddyLogReceiver: Connection from ${remoteAddr.hostname}:${remoteAddr.port}`); const reader = conn.readable.getReader(); const decoder = new TextDecoder(); let buffer = ''; try { while (true) { const { done, value } = await reader.read(); if (done) break; buffer += decoder.decode(value, { stream: true }); // Process complete lines (Caddy sends newline-delimited JSON) const lines = buffer.split('\n'); buffer = lines.pop() || ''; // Keep incomplete line in buffer for (const line of lines) { if (line.trim()) { this.processLogLine(line); } } } } catch (error) { if (this.running) { logger.debug(`CaddyLogReceiver connection closed: ${getErrorMessage(error)}`); } } finally { this.connections.delete(conn); try { conn.close(); } catch { // Already closed } } } /** * Process a single log line from Caddy */ private processLogLine(line: string): void { try { const log = JSON.parse(line) as ICaddyAccessLog; // Only process access logs (check for http.log.access or just access, or any log with request/status) const isAccessLog = log.logger === 'http.log.access' || log.logger === 'access' || (log.request && typeof log.status === 'number'); if (!isAccessLog) { logger.debug(`CaddyLogReceiver: Skipping non-access log: ${log.logger || 'unknown'}`); return; } // Update adaptive sampling this.updateSampling(); // Apply global sampling (skip if randomly filtered out) if (this.currentSampleRate < 1.0 && Math.random() > this.currentSampleRate) { return; } logger.debug(`CaddyLogReceiver: Access log received - ${log.request?.method} ${log.request?.host}${log.request?.uri} (status: ${log.status})`); // Add to recent logs buffer this.recentLogs.push(log); if (this.recentLogs.length > this.maxRecentLogs) { this.recentLogs.shift(); } // Broadcast to WebSocket clients (log how many clients) logger.debug(`CaddyLogReceiver: Broadcasting to ${this.clients.size} clients`); this.broadcast(log); } catch (error) { logger.debug(`Failed to parse Caddy log line: ${getErrorMessage(error)}`); } } /** * Update adaptive sampling rate based on log volume */ private updateSampling(): void { const now = Date.now(); // Add current timestamp this.logCountWindow.push(now); // Remove timestamps older than 1 second const cutoff = now - this.windowSize; while (this.logCountWindow.length > 0 && this.logCountWindow[0] < cutoff) { this.logCountWindow.shift(); } // Calculate logs per second const logsPerSecond = this.logCountWindow.length; // Adjust sample rate based on volume if (logsPerSecond > 1000) { this.currentSampleRate = 0.01; // 1% } else if (logsPerSecond > 500) { this.currentSampleRate = 0.1; // 10% } else if (logsPerSecond > 100) { this.currentSampleRate = 0.5; // 50% } else { this.currentSampleRate = 1.0; // 100% } } /** * Broadcast a log entry to all connected WebSocket clients */ private broadcast(log: ICaddyAccessLog): void { const message = JSON.stringify({ type: 'access_log', data: { ts: log.ts, request: { remote_ip: log.request.remote_ip, method: log.request.method, host: log.request.host, uri: log.request.uri, proto: log.request.proto, }, status: log.status, duration: log.duration, size: log.size, }, timestamp: Date.now(), }); for (const [clientId, client] of this.clients) { try { // Apply client-specific filter if (!this.matchesFilter(log, client.filter)) { continue; } // Apply client-specific sample rate if (client.filter.sampleRate && client.filter.sampleRate < 1.0) { if (Math.random() > client.filter.sampleRate) { continue; } } if (client.ws.readyState === WebSocket.OPEN) { client.ws.send(message); } else { // Remove dead clients this.clients.delete(clientId); } } catch { this.clients.delete(clientId); } } } /** * Check if a log entry matches a client's filter */ private matchesFilter(log: ICaddyAccessLog, filter: ILogFilter): boolean { // Domain filter if (filter.domain) { const logHost = log.request.host.toLowerCase(); const filterDomain = filter.domain.toLowerCase(); // Match exact domain or subdomain if (logHost !== filterDomain && !logHost.endsWith(`.${filterDomain}`)) { return false; } } return true; } /** * Add a WebSocket client to receive logs */ addClient(clientId: string, ws: WebSocket, filter: ILogFilter = {}): void { this.clients.set(clientId, { id: clientId, ws, filter }); logger.debug(`CaddyLogReceiver: Added client ${clientId} (${this.clients.size} total)`); // Send recent logs to new client for (const log of this.recentLogs) { if (this.matchesFilter(log, filter)) { try { ws.send( JSON.stringify({ type: 'access_log', data: { ts: log.ts, request: { remote_ip: log.request.remote_ip, method: log.request.method, host: log.request.host, uri: log.request.uri, proto: log.request.proto, }, status: log.status, duration: log.duration, size: log.size, }, timestamp: Date.now(), }), ); } catch { // Client disconnected } } } } /** * Remove a WebSocket client */ removeClient(clientId: string): void { if (this.clients.delete(clientId)) { logger.debug(`CaddyLogReceiver: Removed client ${clientId} (${this.clients.size} remaining)`); } } /** * Update a client's filter */ updateClientFilter(clientId: string, filter: ILogFilter): void { const client = this.clients.get(clientId); if (client) { client.filter = filter; logger.debug(`CaddyLogReceiver: Updated filter for client ${clientId}`); } } /** * Stop the TCP server */ async stop(): Promise { if (!this.running) { return; } this.running = false; // Close all connections for (const conn of this.connections) { try { conn.close(); } catch { // Already closed } } this.connections.clear(); // Close server if (this.server) { try { this.server.close(); } catch { // Already closed } this.server = null; } // Clear clients this.clients.clear(); logger.info('CaddyLogReceiver stopped'); } /** * Get current stats */ getStats(): { running: boolean; port: number; clients: number; connections: number; sampleRate: number; recentLogsCount: number; } { return { running: this.running, port: this.port, clients: this.clients.size, connections: this.connections.size, sampleRate: this.currentSampleRate, recentLogsCount: this.recentLogs.length, }; } }