import { tap, expect } from '@git.zone/tstest/tapbundle'; import { SmartProxy } from '../ts/index.js'; import * as http from 'http'; import WebSocket, { WebSocketServer } from 'ws'; import { findFreePorts, assertPortsFree } from './helpers/port-allocator.js'; /** * Helper: create a WebSocket client that connects through the proxy. * Registers the message handler BEFORE awaiting open to avoid race conditions. */ function connectWs( url: string, headers: Record = {}, opts: WebSocket.ClientOptions = {}, ): { ws: WebSocket; messages: string[]; opened: Promise } { const messages: string[] = []; const ws = new WebSocket(url, { headers, ...opts }); // Register message handler immediately — before open fires ws.on('message', (data) => { messages.push(data.toString()); }); const opened = new Promise((resolve, reject) => { const timeout = setTimeout(() => reject(new Error('WebSocket open timeout')), 5000); ws.on('open', () => { clearTimeout(timeout); resolve(); }); ws.on('error', (err) => { clearTimeout(timeout); reject(err); }); }); return { ws, messages, opened }; } /** Wait until `predicate` returns true, with a hard timeout. */ function waitFor(predicate: () => boolean, timeoutMs = 5000): Promise { return new Promise((resolve, reject) => { const deadline = setTimeout(() => reject(new Error('waitFor timeout')), timeoutMs); const check = () => { if (predicate()) { clearTimeout(deadline); resolve(); } else setTimeout(check, 30); }; check(); }); } /** Graceful close helper */ function closeWs(ws: WebSocket): Promise { return new Promise((resolve) => { if (ws.readyState === WebSocket.CLOSED) return resolve(); ws.on('close', () => resolve()); ws.close(); setTimeout(resolve, 2000); // fallback }); } // ─── Test 1: Basic WebSocket upgrade and bidirectional messaging ─── tap.test('should proxy WebSocket connections with bidirectional messaging', async () => { const [PROXY_PORT, BACKEND_PORT] = await findFreePorts(2); // Backend: echoes messages with prefix, sends greeting on connect const backendServer = http.createServer(); const wss = new WebSocketServer({ server: backendServer }); const backendMessages: string[] = []; wss.on('connection', (ws) => { ws.on('message', (data) => { const msg = data.toString(); backendMessages.push(msg); ws.send(`echo: ${msg}`); }); ws.send('hello from backend'); }); await new Promise((resolve) => { backendServer.listen(BACKEND_PORT, '127.0.0.1', () => resolve()); }); const proxy = new SmartProxy({ routes: [{ name: 'ws-test-route', match: { ports: PROXY_PORT }, action: { type: 'forward', targets: [{ host: '127.0.0.1', port: BACKEND_PORT }], websocket: { enabled: true }, }, }], }); await proxy.start(); // Connect client — message handler registered before open const { ws, messages, opened } = connectWs( `ws://127.0.0.1:${PROXY_PORT}/`, { Host: 'test.local' }, ); await opened; // Wait for the backend greeting await waitFor(() => messages.length >= 1); expect(messages[0]).toEqual('hello from backend'); // Send 3 messages, expect 3 echoes ws.send('ping 1'); ws.send('ping 2'); ws.send('ping 3'); await waitFor(() => messages.length >= 4); expect(messages).toContain('echo: ping 1'); expect(messages).toContain('echo: ping 2'); expect(messages).toContain('echo: ping 3'); expect(backendMessages).toInclude('ping 1'); expect(backendMessages).toInclude('ping 2'); expect(backendMessages).toInclude('ping 3'); await closeWs(ws); await proxy.stop(); await new Promise((resolve) => backendServer.close(() => resolve())); await new Promise((r) => setTimeout(r, 500)); await assertPortsFree([PROXY_PORT, BACKEND_PORT]); }); // ─── Test 2: Multiple concurrent WebSocket connections ─── tap.test('should handle multiple concurrent WebSocket connections', async () => { const [PROXY_PORT, BACKEND_PORT] = await findFreePorts(2); const backendServer = http.createServer(); const wss = new WebSocketServer({ server: backendServer }); let connectionCount = 0; wss.on('connection', (ws) => { const id = ++connectionCount; ws.on('message', (data) => { ws.send(`conn${id}: ${data.toString()}`); }); }); await new Promise((resolve) => { backendServer.listen(BACKEND_PORT, '127.0.0.1', () => resolve()); }); const proxy = new SmartProxy({ routes: [{ name: 'ws-multi-route', match: { ports: PROXY_PORT }, action: { type: 'forward', targets: [{ host: '127.0.0.1', port: BACKEND_PORT }], websocket: { enabled: true }, }, }], }); await proxy.start(); const NUM_CLIENTS = 5; const clients: { ws: WebSocket; messages: string[] }[] = []; for (let i = 0; i < NUM_CLIENTS; i++) { const c = connectWs( `ws://127.0.0.1:${PROXY_PORT}/`, { Host: 'test.local' }, ); await c.opened; clients.push(c); } // Each client sends a unique message for (let i = 0; i < NUM_CLIENTS; i++) { clients[i].ws.send(`hello from client ${i}`); } // Wait for all replies await waitFor(() => clients.every((c) => c.messages.length >= 1)); for (let i = 0; i < NUM_CLIENTS; i++) { expect(clients[i].messages.length).toBeGreaterThanOrEqual(1); expect(clients[i].messages[0]).toInclude(`hello from client ${i}`); } expect(connectionCount).toEqual(NUM_CLIENTS); for (const c of clients) await closeWs(c.ws); await proxy.stop(); await new Promise((resolve) => backendServer.close(() => resolve())); await new Promise((r) => setTimeout(r, 500)); await assertPortsFree([PROXY_PORT, BACKEND_PORT]); }); // ─── Test 3: WebSocket with binary data ─── tap.test('should proxy binary WebSocket frames', async () => { const [PROXY_PORT, BACKEND_PORT] = await findFreePorts(2); const backendServer = http.createServer(); const wss = new WebSocketServer({ server: backendServer }); wss.on('connection', (ws) => { ws.on('message', (data) => { ws.send(data, { binary: true }); }); }); await new Promise((resolve) => { backendServer.listen(BACKEND_PORT, '127.0.0.1', () => resolve()); }); const proxy = new SmartProxy({ routes: [{ name: 'ws-binary-route', match: { ports: PROXY_PORT }, action: { type: 'forward', targets: [{ host: '127.0.0.1', port: BACKEND_PORT }], websocket: { enabled: true }, }, }], }); await proxy.start(); const receivedBuffers: Buffer[] = []; const ws = new WebSocket(`ws://127.0.0.1:${PROXY_PORT}/`, { headers: { Host: 'test.local' }, }); ws.on('message', (data) => { receivedBuffers.push(Buffer.from(data as ArrayBuffer)); }); await new Promise((resolve, reject) => { const timeout = setTimeout(() => reject(new Error('timeout')), 5000); ws.on('open', () => { clearTimeout(timeout); resolve(); }); ws.on('error', (err) => { clearTimeout(timeout); reject(err); }); }); // Send a 256-byte buffer with known content const sentBuffer = Buffer.alloc(256); for (let i = 0; i < 256; i++) sentBuffer[i] = i; ws.send(sentBuffer); await waitFor(() => receivedBuffers.length >= 1); expect(receivedBuffers[0].length).toEqual(256); expect(Buffer.compare(receivedBuffers[0], sentBuffer)).toEqual(0); await closeWs(ws); await proxy.stop(); await new Promise((resolve) => backendServer.close(() => resolve())); await new Promise((r) => setTimeout(r, 500)); await assertPortsFree([PROXY_PORT, BACKEND_PORT]); }); // ─── Test 4: WebSocket path and query string preserved ─── tap.test('should preserve path and query string through proxy', async () => { const [PROXY_PORT, BACKEND_PORT] = await findFreePorts(2); const backendServer = http.createServer(); const wss = new WebSocketServer({ server: backendServer }); let receivedUrl = ''; wss.on('connection', (ws, req) => { receivedUrl = req.url || ''; ws.send(`url: ${receivedUrl}`); }); await new Promise((resolve) => { backendServer.listen(BACKEND_PORT, '127.0.0.1', () => resolve()); }); const proxy = new SmartProxy({ routes: [{ name: 'ws-path-route', match: { ports: PROXY_PORT }, action: { type: 'forward', targets: [{ host: '127.0.0.1', port: BACKEND_PORT }], websocket: { enabled: true }, }, }], }); await proxy.start(); const { ws, messages, opened } = connectWs( `ws://127.0.0.1:${PROXY_PORT}/chat/room1?token=abc123`, { Host: 'test.local' }, ); await opened; await waitFor(() => messages.length >= 1); expect(receivedUrl).toEqual('/chat/room1?token=abc123'); expect(messages[0]).toEqual('url: /chat/room1?token=abc123'); await closeWs(ws); await proxy.stop(); await new Promise((resolve) => backendServer.close(() => resolve())); await new Promise((r) => setTimeout(r, 500)); await assertPortsFree([PROXY_PORT, BACKEND_PORT]); }); // ─── Test 5: Clean close propagation ─── tap.test('should handle clean WebSocket close from client', async () => { const [PROXY_PORT, BACKEND_PORT] = await findFreePorts(2); const backendServer = http.createServer(); const wss = new WebSocketServer({ server: backendServer }); let backendGotClose = false; let backendCloseCode = 0; wss.on('connection', (ws) => { ws.on('close', (code) => { backendGotClose = true; backendCloseCode = code; }); ws.on('message', (data) => { ws.send(data); }); }); await new Promise((resolve) => { backendServer.listen(BACKEND_PORT, '127.0.0.1', () => resolve()); }); const proxy = new SmartProxy({ routes: [{ name: 'ws-close-route', match: { ports: PROXY_PORT }, action: { type: 'forward', targets: [{ host: '127.0.0.1', port: BACKEND_PORT }], websocket: { enabled: true }, }, }], }); await proxy.start(); const { ws, messages, opened } = connectWs( `ws://127.0.0.1:${PROXY_PORT}/`, { Host: 'test.local' }, ); await opened; // Confirm connection works with a round-trip ws.send('test'); await waitFor(() => messages.length >= 1); // Close with code 1000 let clientCloseCode = 0; const closed = new Promise((resolve) => { ws.on('close', (code) => { clientCloseCode = code; resolve(); }); setTimeout(resolve, 3000); }); ws.close(1000, 'done'); await closed; // Wait for backend to register await waitFor(() => backendGotClose, 3000); expect(backendGotClose).toBeTrue(); expect(clientCloseCode).toEqual(1000); await proxy.stop(); await new Promise((resolve) => backendServer.close(() => resolve())); await new Promise((r) => setTimeout(r, 500)); await assertPortsFree([PROXY_PORT, BACKEND_PORT]); }); // ─── Test 6: Large messages ─── tap.test('should handle large WebSocket messages', async () => { const [PROXY_PORT, BACKEND_PORT] = await findFreePorts(2); const backendServer = http.createServer(); const wss = new WebSocketServer({ server: backendServer, maxPayload: 5 * 1024 * 1024 }); wss.on('connection', (ws) => { ws.on('message', (data) => { const buf = Buffer.from(data as ArrayBuffer); ws.send(`received ${buf.length} bytes`); }); }); await new Promise((resolve) => { backendServer.listen(BACKEND_PORT, '127.0.0.1', () => resolve()); }); const proxy = new SmartProxy({ routes: [{ name: 'ws-large-route', match: { ports: PROXY_PORT }, action: { type: 'forward', targets: [{ host: '127.0.0.1', port: BACKEND_PORT }], websocket: { enabled: true }, }, }], }); await proxy.start(); const { ws, messages, opened } = connectWs( `ws://127.0.0.1:${PROXY_PORT}/`, { Host: 'test.local' }, { maxPayload: 5 * 1024 * 1024 }, ); await opened; // Send a 1MB message const largePayload = Buffer.alloc(1024 * 1024, 0x42); ws.send(largePayload); await waitFor(() => messages.length >= 1); expect(messages[0]).toEqual(`received ${1024 * 1024} bytes`); await closeWs(ws); await proxy.stop(); await new Promise((resolve) => backendServer.close(() => resolve())); await new Promise((r) => setTimeout(r, 500)); await assertPortsFree([PROXY_PORT, BACKEND_PORT]); }); export default tap.start();