418 lines
11 KiB
TypeScript
418 lines
11 KiB
TypeScript
|
|
/**
 * Caddy Log Receiver for Onebox
 *
 * TCP server that receives access logs from Caddy and broadcasts them to WebSocket clients.
 * Supports per-client filtering by domain and adaptive sampling at high volume.
 */
|
||
|
|
|
||
|
|
import { logger } from '../logging.ts';
|
||
|
|
import { getErrorMessage } from '../utils/error.ts';
|
||
|
|
|
||
|
|
/**
 * Filter applied to a WebSocket client's log stream.
 *
 * All fields are optional; an empty filter lets every log entry through.
 */
export interface ILogFilter {
  /** Only forward entries whose request host equals this domain or is a subdomain of it. */
  domain?: string;
  /** Service name. NOTE(review): declared here but not evaluated by any code in this file. */
  service?: string;
  /** Fraction of matching entries to forward: 0.01 to 1.0, default 1.0. */
  sampleRate?: number;
}
|
||
|
|
|
||
|
|
/**
 * Caddy access log entry structure (from Caddy JSON format).
 *
 * Field names mirror the keys of the JSON document Caddy emits, which is why
 * they are snake_case rather than camelCase.
 */
export interface ICaddyAccessLog {
  /** Entry timestamp as emitted by Caddy (presumably Unix epoch seconds — confirm against the encoder config). */
  ts: number;
  level?: string;
  /** Name of the Caddy logger that produced the entry, e.g. `http.log.access`. */
  logger?: string;
  msg?: string;
  /** The HTTP request the entry describes. */
  request: {
    remote_ip: string;
    remote_port?: string;
    client_ip?: string;
    proto: string;
    method: string;
    /** Host the request was addressed to; this is the value domain filters are matched against. */
    host: string;
    uri: string;
    headers?: Record<string, string[]>;
    /** TLS connection details, when present in the entry. */
    tls?: {
      resumed: boolean;
      version: number;
      cipher_suite: number;
      proto: string;
      server_name: string;
    };
  };
  bytes_read?: number;
  user_id?: string;
  /** Time taken to handle the request (presumably seconds — confirm against Caddy's log format). */
  duration: number;
  /** Response size (presumably bytes — confirm against Caddy's log format). */
  size: number;
  /** HTTP response status code. */
  status: number;
  resp_headers?: Record<string, string[]>;
}
|
||
|
|
|
||
|
|
/**
 * WebSocket client with filter.
 *
 * One record per registered log subscriber.
 */
interface ILogClient {
  /** Identifier the client was registered under. */
  id: string;
  /** Socket that matching log entries are sent to. */
  ws: WebSocket;
  /** Filter deciding which entries this client receives. */
  filter: ILogFilter;
}
|
||
|
|
|
||
|
|
/**
 * CaddyLogReceiver - TCP server for Caddy access logs.
 *
 * Listens on a TCP port, reads newline-delimited JSON log entries from every
 * connection, keeps the most recent accepted entries in memory and forwards
 * each accepted entry to the registered WebSocket clients, honouring each
 * client's domain filter and sample rate. As the number of entries seen in
 * the last second grows, a global sample rate is lowered so that only a
 * fraction of entries is stored and forwarded.
 */
export class CaddyLogReceiver {
  /** Matches a trailing `:port` on a request host value. */
  private static readonly PORT_SUFFIX = /:\d+$/;
  /** Lowest per-client sample rate honoured (documented range is 0.01 to 1.0). */
  private static readonly MIN_CLIENT_SAMPLE_RATE = 0.01;

  private server: Deno.TcpListener | null = null;
  private clients: Map<string, ILogClient> = new Map();
  private port: number;
  private running = false;
  private connections: Set<Deno.TcpConn> = new Set();

  // Adaptive sampling state
  private logCountWindow: number[] = []; // arrival times (ms) of recent access logs
  private windowSize = 1000; // sliding window length in ms (1 second)
  private currentSampleRate = 1.0;
  private logCounter = 0; // not read or updated by any method of this class

  // Most recent accepted logs, replayed to late-joining clients
  private recentLogs: ICaddyAccessLog[] = [];
  private maxRecentLogs = 100;

  /**
   * @param port TCP port to listen on once `start()` is called (default 9999).
   */
  constructor(port = 9999) {
    this.port = port;
  }

  /**
   * Start the TCP server.
   *
   * Does nothing (beyond a warning) when the receiver is already running.
   *
   * @throws Re-throws whatever `Deno.listen` throws, after logging it.
   */
  async start(): Promise<void> {
    if (this.running) {
      logger.warn('CaddyLogReceiver is already running');
      return;
    }

    try {
      this.server = Deno.listen({ port: this.port, transport: 'tcp' });
      this.running = true;
      logger.success(`CaddyLogReceiver started on TCP port ${this.port}`);

      // Accept connections in the background; the loop handles its own errors.
      void this.acceptConnections();
    } catch (error) {
      logger.error(`Failed to start CaddyLogReceiver: ${getErrorMessage(error)}`);
      throw error;
    }
  }

  /**
   * Accept incoming TCP connections from Caddy until the listener is closed.
   */
  private async acceptConnections(): Promise<void> {
    if (!this.server) return;

    try {
      for await (const conn of this.server) {
        this.connections.add(conn);
        // Each connection is served independently; handleConnection never rejects.
        void this.handleConnection(conn);
      }
    } catch (error) {
      // Errors raised while shutting down are expected and stay silent.
      if (this.running) {
        logger.error(`CaddyLogReceiver accept error: ${getErrorMessage(error)}`);
      }
    }
  }

  /**
   * Handle a single TCP connection from Caddy.
   *
   * Reads the stream chunk by chunk, splits it on newlines and hands every
   * non-blank line to `processLogLine`. The connection is always removed from
   * the tracking set and closed when reading ends, however it ends.
   */
  private async handleConnection(conn: Deno.TcpConn): Promise<void> {
    const decoder = new TextDecoder();
    let pending = '';

    try {
      const remoteAddr = conn.remoteAddr as Deno.NetAddr;
      logger.debug(`CaddyLogReceiver: Connection from ${remoteAddr.hostname}:${remoteAddr.port}`);

      const reader = conn.readable.getReader();

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        // `stream: true` keeps multi-byte characters split across chunks intact.
        pending += decoder.decode(value, { stream: true });

        // Caddy sends newline-delimited JSON; the last piece may be incomplete.
        const lines = pending.split('\n');
        pending = lines.pop() ?? '';

        for (const line of lines) {
          if (line.trim()) {
            this.processLogLine(line);
          }
        }
      }

      // Flush the decoder and handle a final line that had no trailing newline.
      pending += decoder.decode();
      if (pending.trim()) {
        this.processLogLine(pending);
      }
      // NOTE(review): `pending` grows without bound if a peer never sends a newline.
    } catch (error) {
      if (this.running) {
        logger.debug(`CaddyLogReceiver connection closed: ${getErrorMessage(error)}`);
      }
    } finally {
      this.connections.delete(conn);
      try {
        conn.close();
      } catch {
        // Already closed
      }
    }
  }

  /**
   * Decide whether a parsed JSON value is an access log entry that is safe to
   * store and forward.
   *
   * The value must be an object carrying a `request` object with a string
   * `host`, and must either come from the `http.log.access` / `access` logger
   * or carry a numeric `status`.
   */
  private isAccessLogEntry(value: unknown): value is ICaddyAccessLog {
    if (typeof value !== 'object' || value === null) return false;

    const entry = value as { logger?: unknown; status?: unknown; request?: unknown };
    const request = entry.request;
    if (typeof request !== 'object' || request === null) return false;
    if (typeof (request as { host?: unknown }).host !== 'string') return false;

    return (
      entry.logger === 'http.log.access' ||
      entry.logger === 'access' ||
      typeof entry.status === 'number'
    );
  }

  /**
   * Process a single log line from Caddy.
   *
   * Lines that are not valid JSON or not access log entries are skipped with
   * a debug message. Accepted entries update the adaptive sampling state, may
   * be dropped by the global sample rate, and are otherwise stored in the
   * recent-logs buffer and broadcast.
   */
  private processLogLine(line: string): void {
    let parsed: unknown;
    try {
      parsed = JSON.parse(line);
    } catch (error) {
      logger.debug(`Failed to parse Caddy log line: ${getErrorMessage(error)}`);
      return;
    }

    if (!this.isAccessLogEntry(parsed)) {
      const name =
        typeof parsed === 'object' && parsed !== null
          ? (parsed as { logger?: unknown }).logger
          : undefined;
      const label = typeof name === 'string' && name ? name : 'unknown';
      logger.debug(`CaddyLogReceiver: Skipping non-access log: ${label}`);
      return;
    }
    const log = parsed;

    // Update adaptive sampling
    this.updateSampling();

    // Apply global sampling (skip if randomly filtered out)
    if (this.currentSampleRate < 1.0 && Math.random() > this.currentSampleRate) {
      return;
    }

    logger.debug(
      `CaddyLogReceiver: Access log received - ${log.request.method} ${log.request.host}${log.request.uri} (status: ${log.status})`,
    );

    // Add to recent logs buffer, discarding the oldest entry once full
    this.recentLogs.push(log);
    if (this.recentLogs.length > this.maxRecentLogs) {
      this.recentLogs.shift();
    }

    logger.debug(`CaddyLogReceiver: Broadcasting to ${this.clients.size} clients`);
    this.broadcast(log);
  }

  /**
   * Update adaptive sampling rate based on log volume.
   *
   * Records the current time, forgets arrivals older than the window and
   * derives the global sample rate from the number that remain:
   * more than 1000 -> 1%, more than 500 -> 10%, more than 100 -> 50%,
   * otherwise 100%.
   */
  private updateSampling(): void {
    const now = Date.now();
    this.logCountWindow.push(now);

    // Drop the leading run of expired timestamps in one operation.
    const cutoff = now - this.windowSize;
    const firstFresh = this.logCountWindow.findIndex((stamp) => stamp >= cutoff);
    const expired = firstFresh === -1 ? this.logCountWindow.length : firstFresh;
    if (expired > 0) {
      this.logCountWindow.splice(0, expired);
    }

    const logsPerSecond = this.logCountWindow.length;

    if (logsPerSecond > 1000) {
      this.currentSampleRate = 0.01; // 1%
    } else if (logsPerSecond > 500) {
      this.currentSampleRate = 0.1; // 10%
    } else if (logsPerSecond > 100) {
      this.currentSampleRate = 0.5; // 50%
    } else {
      this.currentSampleRate = 1.0; // 100%
    }
  }

  /**
   * Build the JSON message sent to WebSocket clients for one log entry.
   *
   * Only a subset of the entry is forwarded; `timestamp` is the time the
   * message was built, not the time of the request.
   */
  private serializeLog(log: ICaddyAccessLog): string {
    return JSON.stringify({
      type: 'access_log',
      data: {
        ts: log.ts,
        request: {
          remote_ip: log.request.remote_ip,
          method: log.request.method,
          host: log.request.host,
          uri: log.request.uri,
          proto: log.request.proto,
        },
        status: log.status,
        duration: log.duration,
        size: log.size,
      },
      timestamp: Date.now(),
    });
  }

  /**
   * Resolve the sample rate to apply for a client filter.
   *
   * A missing or non-finite rate means "no sampling" (1.0); any other value is
   * clamped into the supported 0.01 to 1.0 range.
   */
  private clientSampleRate(filter: ILogFilter): number {
    const rate = filter.sampleRate;
    if (typeof rate !== 'number' || !Number.isFinite(rate)) {
      return 1.0;
    }
    return Math.min(1.0, Math.max(CaddyLogReceiver.MIN_CLIENT_SAMPLE_RATE, rate));
  }

  /**
   * Broadcast a log entry to all connected WebSocket clients.
   *
   * A client is skipped when the entry does not match its filter or is
   * sampled out. A client whose socket is not open at send time, or whose
   * handling throws, is unregistered.
   */
  private broadcast(log: ICaddyAccessLog): void {
    const message = this.serializeLog(log);

    for (const [clientId, client] of this.clients) {
      try {
        // Apply client-specific filter
        if (!this.matchesFilter(log, client.filter)) {
          continue;
        }

        // Apply client-specific sample rate
        const rate = this.clientSampleRate(client.filter);
        if (rate < 1.0 && Math.random() > rate) {
          continue;
        }

        if (client.ws.readyState === WebSocket.OPEN) {
          client.ws.send(message);
        } else {
          // Remove dead clients
          this.clients.delete(clientId);
        }
      } catch (error) {
        logger.debug(`CaddyLogReceiver: Dropping client ${clientId}: ${getErrorMessage(error)}`);
        this.clients.delete(clientId);
      }
    }
  }

  /**
   * Check if a log entry matches a client's filter.
   *
   * Only `filter.domain` is evaluated: the request host must equal the domain
   * or be a subdomain of it, compared case-insensitively. The host is tried
   * both as logged and with a trailing `:port` removed, so a filter without a
   * port also matches hosts logged with one. `filter.service` is not
   * evaluated here.
   */
  private matchesFilter(log: ICaddyAccessLog, filter: ILogFilter): boolean {
    if (!filter.domain) {
      return true;
    }

    const domain = filter.domain.toLowerCase();
    const host = log.request.host.toLowerCase();
    const hostname = host.replace(CaddyLogReceiver.PORT_SUFFIX, '');
    const candidates = hostname === host ? [host] : [host, hostname];

    return candidates.some(
      (candidate) => candidate === domain || candidate.endsWith(`.${domain}`),
    );
  }

  /**
   * Add a WebSocket client to receive logs.
   *
   * Registers the client (replacing any client with the same id) and replays
   * the buffered recent logs that match its filter. The replay does not apply
   * the client's sample rate and stops at the first failed send.
   *
   * @param clientId Identifier used for later removal or filter updates.
   * @param ws Socket to send log messages to.
   * @param filter Filter for this client; defaults to no filtering.
   */
  addClient(clientId: string, ws: WebSocket, filter: ILogFilter = {}): void {
    this.clients.set(clientId, { id: clientId, ws, filter });
    logger.debug(`CaddyLogReceiver: Added client ${clientId} (${this.clients.size} total)`);

    // Send recent logs to new client
    for (const log of this.recentLogs) {
      if (!this.matchesFilter(log, filter)) {
        continue;
      }
      try {
        ws.send(this.serializeLog(log));
      } catch (error) {
        // Client disconnected (or not ready); further sends would fail the same way.
        logger.debug(
          `CaddyLogReceiver: Replay to client ${clientId} failed: ${getErrorMessage(error)}`,
        );
        break;
      }
    }
  }

  /**
   * Remove a WebSocket client. Unknown ids are ignored.
   */
  removeClient(clientId: string): void {
    if (this.clients.delete(clientId)) {
      logger.debug(`CaddyLogReceiver: Removed client ${clientId} (${this.clients.size} remaining)`);
    }
  }

  /**
   * Update a client's filter. Unknown ids are ignored.
   */
  updateClientFilter(clientId: string, filter: ILogFilter): void {
    const client = this.clients.get(clientId);
    if (client) {
      client.filter = filter;
      logger.debug(`CaddyLogReceiver: Updated filter for client ${clientId}`);
    }
  }

  /**
   * Stop the TCP server.
   *
   * Closes every tracked connection and the listener and forgets all
   * registered clients. Does nothing when the receiver is not running.
   */
  async stop(): Promise<void> {
    if (!this.running) {
      return;
    }

    // Cleared first so the accept/read loops treat the resulting errors as expected.
    this.running = false;

    // Close all connections
    for (const conn of this.connections) {
      try {
        conn.close();
      } catch {
        // Already closed
      }
    }
    this.connections.clear();

    // Close server
    if (this.server) {
      try {
        this.server.close();
      } catch {
        // Already closed
      }
      this.server = null;
    }

    // Clear clients
    this.clients.clear();

    logger.info('CaddyLogReceiver stopped');
  }

  /**
   * Get current stats.
   *
   * @returns A snapshot of the running flag, configured port, number of
   * registered clients, number of open TCP connections, current global sample
   * rate and number of buffered recent logs.
   */
  getStats(): {
    running: boolean;
    port: number;
    clients: number;
    connections: number;
    sampleRate: number;
    recentLogsCount: number;
  } {
    return {
      running: this.running,
      port: this.port,
      clients: this.clients.size,
      connections: this.connections.size,
      sampleRate: this.currentSampleRate,
      recentLogsCount: this.recentLogs.length,
    };
  }
}
|