update
This commit is contained in:
417
ts/classes/caddy-log-receiver.ts
Normal file
417
ts/classes/caddy-log-receiver.ts
Normal file
@@ -0,0 +1,417 @@
|
||||
/**
|
||||
* Caddy Log Receiver for Onebox
|
||||
*
|
||||
* TCP server that receives access logs from Caddy and broadcasts them to WebSocket clients.
|
||||
* Supports per-client filtering by domain and adaptive sampling at high volume.
|
||||
*/
|
||||
|
||||
import { logger } from '../logging.ts';
|
||||
import { getErrorMessage } from '../utils/error.ts';
|
||||
|
||||
/**
|
||||
* Filter applied to a WebSocket client's log stream
|
||||
*/
|
||||
export interface ILogFilter {
|
||||
domain?: string;
|
||||
service?: string;
|
||||
sampleRate?: number; // 0.01 to 1.0, default 1.0
|
||||
}
|
||||
|
||||
/**
|
||||
* Caddy access log entry structure (from Caddy JSON format)
|
||||
*/
|
||||
export interface ICaddyAccessLog {
|
||||
ts: number;
|
||||
level?: string;
|
||||
logger?: string;
|
||||
msg?: string;
|
||||
request: {
|
||||
remote_ip: string;
|
||||
remote_port?: string;
|
||||
client_ip?: string;
|
||||
proto: string;
|
||||
method: string;
|
||||
host: string;
|
||||
uri: string;
|
||||
headers?: Record<string, string[]>;
|
||||
tls?: {
|
||||
resumed: boolean;
|
||||
version: number;
|
||||
cipher_suite: number;
|
||||
proto: string;
|
||||
server_name: string;
|
||||
};
|
||||
};
|
||||
bytes_read?: number;
|
||||
user_id?: string;
|
||||
duration: number;
|
||||
size: number;
|
||||
status: number;
|
||||
resp_headers?: Record<string, string[]>;
|
||||
}
|
||||
|
||||
/**
|
||||
* WebSocket client with filter
|
||||
*/
|
||||
interface ILogClient {
|
||||
id: string;
|
||||
ws: WebSocket;
|
||||
filter: ILogFilter;
|
||||
}
|
||||
|
||||
/**
|
||||
* CaddyLogReceiver - TCP server for Caddy access logs
|
||||
*/
|
||||
export class CaddyLogReceiver {
|
||||
private server: Deno.TcpListener | null = null;
|
||||
private clients: Map<string, ILogClient> = new Map();
|
||||
private port: number;
|
||||
private running = false;
|
||||
private connections: Set<Deno.TcpConn> = new Set();
|
||||
|
||||
// Adaptive sampling state
|
||||
private logCountWindow: number[] = []; // timestamps of recent logs
|
||||
private windowSize = 1000; // track last 1 second
|
||||
private currentSampleRate = 1.0;
|
||||
private logCounter = 0;
|
||||
|
||||
// Ring buffer for recent logs (for late-joining clients)
|
||||
private recentLogs: ICaddyAccessLog[] = [];
|
||||
private maxRecentLogs = 100;
|
||||
|
||||
constructor(port = 9999) {
|
||||
this.port = port;
|
||||
}
|
||||
|
||||
/**
|
||||
* Start the TCP server
|
||||
*/
|
||||
async start(): Promise<void> {
|
||||
if (this.running) {
|
||||
logger.warn('CaddyLogReceiver is already running');
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
this.server = Deno.listen({ port: this.port, transport: 'tcp' });
|
||||
this.running = true;
|
||||
logger.success(`CaddyLogReceiver started on TCP port ${this.port}`);
|
||||
|
||||
// Start accepting connections in background
|
||||
this.acceptConnections();
|
||||
} catch (error) {
|
||||
logger.error(`Failed to start CaddyLogReceiver: ${getErrorMessage(error)}`);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Accept incoming TCP connections from Caddy
|
||||
*/
|
||||
private async acceptConnections(): Promise<void> {
|
||||
if (!this.server) return;
|
||||
|
||||
try {
|
||||
for await (const conn of this.server) {
|
||||
this.connections.add(conn);
|
||||
this.handleConnection(conn);
|
||||
}
|
||||
} catch (error) {
|
||||
if (this.running) {
|
||||
logger.error(`CaddyLogReceiver accept error: ${getErrorMessage(error)}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle a single TCP connection from Caddy
|
||||
*/
|
||||
private async handleConnection(conn: Deno.TcpConn): Promise<void> {
|
||||
const remoteAddr = conn.remoteAddr as Deno.NetAddr;
|
||||
logger.debug(`CaddyLogReceiver: Connection from ${remoteAddr.hostname}:${remoteAddr.port}`);
|
||||
|
||||
const reader = conn.readable.getReader();
|
||||
const decoder = new TextDecoder();
|
||||
let buffer = '';
|
||||
|
||||
try {
|
||||
while (true) {
|
||||
const { done, value } = await reader.read();
|
||||
if (done) break;
|
||||
|
||||
buffer += decoder.decode(value, { stream: true });
|
||||
|
||||
// Process complete lines (Caddy sends newline-delimited JSON)
|
||||
const lines = buffer.split('\n');
|
||||
buffer = lines.pop() || ''; // Keep incomplete line in buffer
|
||||
|
||||
for (const line of lines) {
|
||||
if (line.trim()) {
|
||||
this.processLogLine(line);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
if (this.running) {
|
||||
logger.debug(`CaddyLogReceiver connection closed: ${getErrorMessage(error)}`);
|
||||
}
|
||||
} finally {
|
||||
this.connections.delete(conn);
|
||||
try {
|
||||
conn.close();
|
||||
} catch {
|
||||
// Already closed
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Process a single log line from Caddy
|
||||
*/
|
||||
private processLogLine(line: string): void {
|
||||
try {
|
||||
const log = JSON.parse(line) as ICaddyAccessLog;
|
||||
|
||||
// Only process access logs (check for http.log.access or just access, or any log with request/status)
|
||||
const isAccessLog = log.logger === 'http.log.access' ||
|
||||
log.logger === 'access' ||
|
||||
(log.request && typeof log.status === 'number');
|
||||
if (!isAccessLog) {
|
||||
logger.debug(`CaddyLogReceiver: Skipping non-access log: ${log.logger || 'unknown'}`);
|
||||
return;
|
||||
}
|
||||
|
||||
// Update adaptive sampling
|
||||
this.updateSampling();
|
||||
|
||||
// Apply global sampling (skip if randomly filtered out)
|
||||
if (this.currentSampleRate < 1.0 && Math.random() > this.currentSampleRate) {
|
||||
return;
|
||||
}
|
||||
|
||||
logger.debug(`CaddyLogReceiver: Access log received - ${log.request?.method} ${log.request?.host}${log.request?.uri} (status: ${log.status})`);
|
||||
|
||||
// Add to recent logs buffer
|
||||
this.recentLogs.push(log);
|
||||
if (this.recentLogs.length > this.maxRecentLogs) {
|
||||
this.recentLogs.shift();
|
||||
}
|
||||
|
||||
// Broadcast to WebSocket clients (log how many clients)
|
||||
logger.debug(`CaddyLogReceiver: Broadcasting to ${this.clients.size} clients`);
|
||||
this.broadcast(log);
|
||||
} catch (error) {
|
||||
logger.debug(`Failed to parse Caddy log line: ${getErrorMessage(error)}`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Update adaptive sampling rate based on log volume
|
||||
*/
|
||||
private updateSampling(): void {
|
||||
const now = Date.now();
|
||||
|
||||
// Add current timestamp
|
||||
this.logCountWindow.push(now);
|
||||
|
||||
// Remove timestamps older than 1 second
|
||||
const cutoff = now - this.windowSize;
|
||||
while (this.logCountWindow.length > 0 && this.logCountWindow[0] < cutoff) {
|
||||
this.logCountWindow.shift();
|
||||
}
|
||||
|
||||
// Calculate logs per second
|
||||
const logsPerSecond = this.logCountWindow.length;
|
||||
|
||||
// Adjust sample rate based on volume
|
||||
if (logsPerSecond > 1000) {
|
||||
this.currentSampleRate = 0.01; // 1%
|
||||
} else if (logsPerSecond > 500) {
|
||||
this.currentSampleRate = 0.1; // 10%
|
||||
} else if (logsPerSecond > 100) {
|
||||
this.currentSampleRate = 0.5; // 50%
|
||||
} else {
|
||||
this.currentSampleRate = 1.0; // 100%
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Broadcast a log entry to all connected WebSocket clients
|
||||
*/
|
||||
private broadcast(log: ICaddyAccessLog): void {
|
||||
const message = JSON.stringify({
|
||||
type: 'access_log',
|
||||
data: {
|
||||
ts: log.ts,
|
||||
request: {
|
||||
remote_ip: log.request.remote_ip,
|
||||
method: log.request.method,
|
||||
host: log.request.host,
|
||||
uri: log.request.uri,
|
||||
proto: log.request.proto,
|
||||
},
|
||||
status: log.status,
|
||||
duration: log.duration,
|
||||
size: log.size,
|
||||
},
|
||||
timestamp: Date.now(),
|
||||
});
|
||||
|
||||
for (const [clientId, client] of this.clients) {
|
||||
try {
|
||||
// Apply client-specific filter
|
||||
if (!this.matchesFilter(log, client.filter)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Apply client-specific sample rate
|
||||
if (client.filter.sampleRate && client.filter.sampleRate < 1.0) {
|
||||
if (Math.random() > client.filter.sampleRate) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (client.ws.readyState === WebSocket.OPEN) {
|
||||
client.ws.send(message);
|
||||
} else {
|
||||
// Remove dead clients
|
||||
this.clients.delete(clientId);
|
||||
}
|
||||
} catch {
|
||||
this.clients.delete(clientId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a log entry matches a client's filter
|
||||
*/
|
||||
private matchesFilter(log: ICaddyAccessLog, filter: ILogFilter): boolean {
|
||||
// Domain filter
|
||||
if (filter.domain) {
|
||||
const logHost = log.request.host.toLowerCase();
|
||||
const filterDomain = filter.domain.toLowerCase();
|
||||
|
||||
// Match exact domain or subdomain
|
||||
if (logHost !== filterDomain && !logHost.endsWith(`.${filterDomain}`)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a WebSocket client to receive logs
|
||||
*/
|
||||
addClient(clientId: string, ws: WebSocket, filter: ILogFilter = {}): void {
|
||||
this.clients.set(clientId, { id: clientId, ws, filter });
|
||||
logger.debug(`CaddyLogReceiver: Added client ${clientId} (${this.clients.size} total)`);
|
||||
|
||||
// Send recent logs to new client
|
||||
for (const log of this.recentLogs) {
|
||||
if (this.matchesFilter(log, filter)) {
|
||||
try {
|
||||
ws.send(
|
||||
JSON.stringify({
|
||||
type: 'access_log',
|
||||
data: {
|
||||
ts: log.ts,
|
||||
request: {
|
||||
remote_ip: log.request.remote_ip,
|
||||
method: log.request.method,
|
||||
host: log.request.host,
|
||||
uri: log.request.uri,
|
||||
proto: log.request.proto,
|
||||
},
|
||||
status: log.status,
|
||||
duration: log.duration,
|
||||
size: log.size,
|
||||
},
|
||||
timestamp: Date.now(),
|
||||
}),
|
||||
);
|
||||
} catch {
|
||||
// Client disconnected
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove a WebSocket client
|
||||
*/
|
||||
removeClient(clientId: string): void {
|
||||
if (this.clients.delete(clientId)) {
|
||||
logger.debug(`CaddyLogReceiver: Removed client ${clientId} (${this.clients.size} remaining)`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Update a client's filter
|
||||
*/
|
||||
updateClientFilter(clientId: string, filter: ILogFilter): void {
|
||||
const client = this.clients.get(clientId);
|
||||
if (client) {
|
||||
client.filter = filter;
|
||||
logger.debug(`CaddyLogReceiver: Updated filter for client ${clientId}`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop the TCP server
|
||||
*/
|
||||
async stop(): Promise<void> {
|
||||
if (!this.running) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.running = false;
|
||||
|
||||
// Close all connections
|
||||
for (const conn of this.connections) {
|
||||
try {
|
||||
conn.close();
|
||||
} catch {
|
||||
// Already closed
|
||||
}
|
||||
}
|
||||
this.connections.clear();
|
||||
|
||||
// Close server
|
||||
if (this.server) {
|
||||
try {
|
||||
this.server.close();
|
||||
} catch {
|
||||
// Already closed
|
||||
}
|
||||
this.server = null;
|
||||
}
|
||||
|
||||
// Clear clients
|
||||
this.clients.clear();
|
||||
|
||||
logger.info('CaddyLogReceiver stopped');
|
||||
}
|
||||
|
||||
/**
|
||||
* Get current stats
|
||||
*/
|
||||
getStats(): {
|
||||
running: boolean;
|
||||
port: number;
|
||||
clients: number;
|
||||
connections: number;
|
||||
sampleRate: number;
|
||||
recentLogsCount: number;
|
||||
} {
|
||||
return {
|
||||
running: this.running,
|
||||
port: this.port,
|
||||
clients: this.clients.size,
|
||||
connections: this.connections.size,
|
||||
sampleRate: this.currentSampleRate,
|
||||
recentLogsCount: this.recentLogs.length,
|
||||
};
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user