Files
onebox/ts/classes/caddy-log-receiver.ts
2025-11-26 12:16:50 +00:00

418 lines
11 KiB
TypeScript

/**
* Caddy Log Receiver for Onebox
*
* TCP server that receives access logs from Caddy and broadcasts them to WebSocket clients.
* Supports per-client filtering by domain and adaptive sampling at high volume.
*/
import { logger } from '../logging.ts';
import { getErrorMessage } from '../utils/error.ts';
/**
* Filter applied to a WebSocket client's log stream.
*
* Every field is optional; a client registered with an empty filter receives
* every log that survives the receiver's global sampling.
*/
export interface ILogFilter {
// Domain to match (case-insensitively) against the log's request host; the domain itself and its subdomains match.
domain?: string;
// Service name. NOTE(review): not consulted by the filter matching visible in this file — confirm whether it is used elsewhere.
service?: string;
// Probability with which a matching log is forwarded to this client.
sampleRate?: number; // 0.01 to 1.0, default 1.0
}
/**
* Caddy access log entry structure (from Caddy JSON format).
*
* Entries are produced by `JSON.parse` on a line received over TCP, so this
* interface describes the expected shape rather than a validated one.
* Only `ts`, `status`, `duration`, `size`, and a subset of `request`
* (`remote_ip`, `method`, `host`, `uri`, `proto`) are forwarded to WebSocket clients.
*/
export interface ICaddyAccessLog {
// Entry timestamp as emitted by Caddy — presumably Unix epoch seconds; TODO confirm against the Caddy encoder configuration.
ts: number;
level?: string;
// Logger name; the receiver recognises 'http.log.access' and 'access' as access-log loggers.
logger?: string;
msg?: string;
// Details of the HTTP request this entry describes.
request: {
remote_ip: string;
remote_port?: string;
client_ip?: string;
proto: string;
method: string;
// Host the request was addressed to; this is the value per-client domain filters are compared with.
host: string;
uri: string;
headers?: Record<string, string[]>;
// TLS connection details; absent when the field is not present in the entry.
tls?: {
resumed: boolean;
version: number;
cipher_suite: number;
proto: string;
server_name: string;
};
};
bytes_read?: number;
user_id?: string;
// Request handling time — presumably seconds; TODO confirm.
duration: number;
// Response size — presumably bytes; TODO confirm.
size: number;
// HTTP response status code; a numeric `status` alongside `request` also marks an entry as an access log.
status: number;
resp_headers?: Record<string, string[]>;
}
/**
* WebSocket client with filter.
*
* Internal record kept by the receiver for each registered log consumer.
*/
interface ILogClient {
// Caller-supplied identifier; also the key under which the record is stored.
id: string;
// Socket that serialized access-log messages are sent to.
ws: WebSocket;
// Filter evaluated against each log before it is sent to this client.
filter: ILogFilter;
}
/**
 * CaddyLogReceiver - TCP server for Caddy access logs.
 *
 * Accepts TCP connections that carry newline-delimited JSON log entries,
 * keeps a bounded buffer of the most recent access logs, and forwards each
 * accepted entry to the registered WebSocket clients. Forwarding is subject
 * to a global adaptive sample rate (driven by log volume) and to each
 * client's own filter and sample rate.
 */
export class CaddyLogReceiver {
  /** Longest unterminated line (in characters) buffered per connection before it is discarded. */
  private static readonly MAX_LINE_LENGTH = 1024 * 1024;
  /** Matches a trailing `:port` on a host value such as `example.com:8443`. */
  private static readonly HOST_PORT_SUFFIX = /:\d+$/;

  private server: Deno.TcpListener | null = null;
  private clients: Map<string, ILogClient> = new Map();
  private port: number;
  private running = false;
  private connections: Set<Deno.TcpConn> = new Set();
  // Adaptive sampling state
  private logCountWindow: number[] = []; // timestamps (ms) of recent logs
  private windowSize = 1000; // sliding window length in ms (1 second)
  private currentSampleRate = 1.0;
  private logCounter = 0; // NOTE(review): never read or written inside this class
  // Bounded buffer of recent logs (replayed to late-joining clients)
  private recentLogs: ICaddyAccessLog[] = [];
  private maxRecentLogs = 100;

  /**
   * @param port TCP port to listen on once `start()` is called.
   */
  constructor(port = 9999) {
    this.port = port;
  }

  /**
   * Start the TCP server and begin accepting connections in the background.
   *
   * Calling it while already running only logs a warning.
   *
   * @throws Re-throws whatever `Deno.listen` throws after logging it.
   */
  async start(): Promise<void> {
    if (this.running) {
      logger.warn('CaddyLogReceiver is already running');
      return;
    }
    try {
      this.server = Deno.listen({ port: this.port, transport: 'tcp' });
      this.running = true;
      logger.success(`CaddyLogReceiver started on TCP port ${this.port}`);
      // Intentionally not awaited: the accept loop runs for the server's lifetime.
      void this.acceptConnections();
    } catch (error) {
      logger.error(`Failed to start CaddyLogReceiver: ${getErrorMessage(error)}`);
      throw error;
    }
  }

  /**
   * Accept incoming TCP connections from Caddy until the listener is closed.
   */
  private async acceptConnections(): Promise<void> {
    const listener = this.server;
    if (!listener) return;
    try {
      for await (const conn of listener) {
        this.connections.add(conn);
        // Each connection is served concurrently; handleConnection handles its own errors.
        void this.handleConnection(conn);
      }
    } catch (error) {
      // An error after stop() is just the listener being closed.
      if (this.running) {
        logger.error(`CaddyLogReceiver accept error: ${getErrorMessage(error)}`);
      }
    }
  }

  /**
   * Handle a single TCP connection from Caddy.
   *
   * Reads the stream, splits it on newlines, and hands every non-blank line to
   * `processLogLine`. A final line that is not newline-terminated is processed
   * when the stream ends. A line that exceeds `MAX_LINE_LENGTH` without a
   * newline is discarded through its terminating newline so that a peer cannot
   * grow the buffer without bound.
   */
  private async handleConnection(conn: Deno.TcpConn): Promise<void> {
    const decoder = new TextDecoder();
    let buffer = '';
    let discarding = false; // true while skipping the remainder of an oversized line
    try {
      const remoteAddr = conn.remoteAddr as Deno.NetAddr;
      logger.debug(`CaddyLogReceiver: Connection from ${remoteAddr.hostname}:${remoteAddr.port}`);
      const reader = conn.readable.getReader();
      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        let text = decoder.decode(value, { stream: true });
        if (discarding) {
          const newlineIndex = text.indexOf('\n');
          if (newlineIndex === -1) continue; // still inside the oversized line
          text = text.slice(newlineIndex + 1);
          discarding = false;
        }
        buffer += text;
        // Process complete lines (Caddy sends newline-delimited JSON)
        const lines = buffer.split('\n');
        buffer = lines.pop() ?? ''; // keep the incomplete tail for the next chunk
        for (const line of lines) {
          if (line.trim()) {
            this.processLogLine(line);
          }
        }
        if (buffer.length > CaddyLogReceiver.MAX_LINE_LENGTH) {
          logger.debug(
            `CaddyLogReceiver: Discarding oversized log line (${buffer.length} characters buffered)`,
          );
          buffer = '';
          discarding = true;
        }
      }
      // Flush bytes still held by the streaming decoder and process a last unterminated line.
      buffer += decoder.decode();
      if (buffer.trim()) {
        this.processLogLine(buffer);
      }
    } catch (error) {
      if (this.running) {
        logger.debug(`CaddyLogReceiver connection closed: ${getErrorMessage(error)}`);
      }
    } finally {
      this.connections.delete(conn);
      try {
        conn.close();
      } catch {
        // Already closed
      }
    }
  }

  /**
   * Decide whether a parsed JSON value is an access log that is safe to handle.
   *
   * The value must be an object with an object `request` whose `host` is a
   * string (the fields later code dereferences), and must either carry an
   * access-log logger name or a numeric `status`.
   */
  private static isAccessLog(value: unknown): value is ICaddyAccessLog {
    if (typeof value !== 'object' || value === null) return false;
    const entry = value as Record<string, unknown>;
    const request = entry.request;
    if (typeof request !== 'object' || request === null) return false;
    if (typeof (request as Record<string, unknown>).host !== 'string') return false;
    return entry.logger === 'http.log.access' ||
      entry.logger === 'access' ||
      typeof entry.status === 'number';
  }

  /**
   * Best-effort logger name of a parsed value, for diagnostics only.
   */
  private static loggerNameOf(value: unknown): string {
    if (typeof value === 'object' && value !== null) {
      const name = (value as Record<string, unknown>).logger;
      if (typeof name === 'string' && name) return name;
    }
    return 'unknown';
  }

  /**
   * Process a single log line from Caddy.
   *
   * Invalid JSON, non-access logs and malformed entries are skipped with a
   * debug message. Accepted entries update the volume window, pass through
   * global sampling, are stored in the recent-log buffer and are broadcast.
   */
  private processLogLine(line: string): void {
    let parsed: unknown;
    try {
      parsed = JSON.parse(line);
    } catch (error) {
      logger.debug(`Failed to parse Caddy log line: ${getErrorMessage(error)}`);
      return;
    }
    // Validate before buffering: a malformed entry in recentLogs would break later replays.
    if (!CaddyLogReceiver.isAccessLog(parsed)) {
      logger.debug(
        `CaddyLogReceiver: Skipping non-access or malformed log: ${CaddyLogReceiver.loggerNameOf(parsed)}`,
      );
      return;
    }
    const log = parsed;
    try {
      // Update adaptive sampling (counts every access log, including ones sampled out below)
      this.updateSampling();
      // Apply global sampling (skip if randomly filtered out)
      if (this.currentSampleRate < 1.0 && Math.random() > this.currentSampleRate) {
        return;
      }
      logger.debug(
        `CaddyLogReceiver: Access log received - ${log.request.method} ${log.request.host}${log.request.uri} (status: ${log.status})`,
      );
      // Add to recent logs buffer, evicting the oldest entry when full
      this.recentLogs.push(log);
      if (this.recentLogs.length > this.maxRecentLogs) {
        this.recentLogs.shift();
      }
      logger.debug(`CaddyLogReceiver: Broadcasting to ${this.clients.size} clients`);
      this.broadcast(log);
    } catch (error) {
      logger.debug(`Failed to process Caddy log line: ${getErrorMessage(error)}`);
    }
  }

  /**
   * Update the adaptive sampling rate based on log volume in the last window.
   */
  private updateSampling(): void {
    const now = Date.now();
    this.logCountWindow.push(now);
    // Drop timestamps that fell out of the window in a single splice.
    const cutoff = now - this.windowSize;
    let expired = 0;
    while (expired < this.logCountWindow.length && this.logCountWindow[expired] < cutoff) {
      expired++;
    }
    if (expired > 0) {
      this.logCountWindow.splice(0, expired);
    }
    // The window is one second long, so its length is the logs-per-second figure.
    const logsPerSecond = this.logCountWindow.length;
    if (logsPerSecond > 1000) {
      this.currentSampleRate = 0.01; // 1%
    } else if (logsPerSecond > 500) {
      this.currentSampleRate = 0.1; // 10%
    } else if (logsPerSecond > 100) {
      this.currentSampleRate = 0.5; // 50%
    } else {
      this.currentSampleRate = 1.0; // 100%
    }
  }

  /**
   * Serialize a log entry into the `access_log` message sent to clients.
   *
   * Only a subset of the entry is forwarded; headers and TLS details are left out.
   */
  private serializeLog(log: ICaddyAccessLog): string {
    return JSON.stringify({
      type: 'access_log',
      data: {
        ts: log.ts,
        request: {
          remote_ip: log.request.remote_ip,
          method: log.request.method,
          host: log.request.host,
          uri: log.request.uri,
          proto: log.request.proto,
        },
        status: log.status,
        duration: log.duration,
        size: log.size,
      },
      timestamp: Date.now(),
    });
  }

  /**
   * Broadcast a log entry to all connected WebSocket clients.
   *
   * Clients whose socket is closing or closed are removed. Clients whose
   * socket is not open yet are kept but skipped. A client whose `send`
   * throws is removed.
   */
  private broadcast(log: ICaddyAccessLog): void {
    if (this.clients.size === 0) return;
    const message = this.serializeLog(log);
    // Deleting entries from a Map while iterating over it is well-defined.
    for (const [clientId, client] of this.clients) {
      try {
        const state = client.ws.readyState;
        if (state === WebSocket.CLOSING || state === WebSocket.CLOSED) {
          // Remove dead clients
          this.clients.delete(clientId);
          continue;
        }
        if (state !== WebSocket.OPEN) {
          // Handshake still in progress: nothing can be sent yet, but the client is not dead.
          continue;
        }
        // Apply client-specific filter
        if (!this.matchesFilter(log, client.filter)) {
          continue;
        }
        // Apply client-specific sample rate (an unset or zero rate means no sampling)
        const rate = client.filter.sampleRate;
        if (rate && rate < 1.0 && Math.random() > rate) {
          continue;
        }
        client.ws.send(message);
      } catch {
        this.clients.delete(clientId);
      }
    }
  }

  /**
   * Check if a log entry matches a client's filter.
   *
   * Only `filter.domain` is evaluated. The comparison is case-insensitive and
   * accepts the domain itself or any subdomain. The log's host is tried both
   * verbatim and with a trailing `:port` removed, so `app.example.com:8443`
   * matches the filter `example.com`.
   */
  private matchesFilter(log: ICaddyAccessLog, filter: ILogFilter): boolean {
    if (!filter.domain) {
      return true;
    }
    const filterDomain = filter.domain.toLowerCase();
    const rawHost = log.request.host.toLowerCase();
    const hostWithoutPort = rawHost.replace(CaddyLogReceiver.HOST_PORT_SUFFIX, '');
    return [rawHost, hostWithoutPort].some(
      (host) => host === filterDomain || host.endsWith(`.${filterDomain}`),
    );
  }

  /**
   * Add a WebSocket client to receive logs.
   *
   * Registers (or replaces) the client under `clientId`, then replays the
   * buffered recent logs that match `filter`. Replay is best-effort and stops
   * at the first failed send.
   *
   * @param clientId Key identifying the client.
   * @param ws Socket the messages are sent to.
   * @param filter Optional per-client filter; defaults to no filtering.
   */
  addClient(clientId: string, ws: WebSocket, filter: ILogFilter = {}): void {
    this.clients.set(clientId, { id: clientId, ws, filter });
    logger.debug(`CaddyLogReceiver: Added client ${clientId} (${this.clients.size} total)`);
    // Send recent logs to new client
    for (const log of this.recentLogs) {
      try {
        if (this.matchesFilter(log, filter)) {
          ws.send(this.serializeLog(log));
        }
      } catch {
        // Client disconnected or not open yet; further sends would fail the same way.
        break;
      }
    }
  }

  /**
   * Remove a WebSocket client; unknown ids are ignored.
   */
  removeClient(clientId: string): void {
    if (this.clients.delete(clientId)) {
      logger.debug(`CaddyLogReceiver: Removed client ${clientId} (${this.clients.size} remaining)`);
    }
  }

  /**
   * Replace a client's filter; unknown ids are ignored.
   */
  updateClientFilter(clientId: string, filter: ILogFilter): void {
    const client = this.clients.get(clientId);
    if (client) {
      client.filter = filter;
      logger.debug(`CaddyLogReceiver: Updated filter for client ${clientId}`);
    }
  }

  /**
   * Stop the TCP server.
   *
   * Closes every open connection and the listener, then forgets all
   * registered clients (their sockets are not closed here). Does nothing when
   * the receiver is not running.
   */
  async stop(): Promise<void> {
    if (!this.running) {
      return;
    }
    // Cleared first so the accept and read loops treat the resulting errors as shutdown.
    this.running = false;
    // Close all connections
    for (const conn of this.connections) {
      try {
        conn.close();
      } catch {
        // Already closed
      }
    }
    this.connections.clear();
    // Close server
    if (this.server) {
      try {
        this.server.close();
      } catch {
        // Already closed
      }
      this.server = null;
    }
    // Clear clients
    this.clients.clear();
    logger.info('CaddyLogReceiver stopped');
  }

  /**
   * Get current stats: a snapshot of the receiver's state and counters.
   */
  getStats(): {
    running: boolean;
    port: number;
    clients: number;
    connections: number;
    sampleRate: number;
    recentLogsCount: number;
  } {
    return {
      running: this.running,
      port: this.port,
      clients: this.clients.size,
      connections: this.connections.size,
      sampleRate: this.currentSampleRate,
      recentLogsCount: this.recentLogs.length,
    };
  }
}