import { expect, tap } from '@push.rocks/tapbundle';
import * as net from 'net';
import * as crypto from 'crypto';
import { RemoteIngressHub, RemoteIngressEdge } from '../ts/index.js';

// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------

/** Resolve after `ms` milliseconds. */
function delay(ms: number): Promise<void> {
  return new Promise<void>((resolve) => setTimeout(resolve, ms));
}

/**
 * Find N free ports by binding to port 0 and collecting OS-assigned ports.
 *
 * All probe servers stay bound until every port has been collected so the OS
 * cannot hand out the same port twice; they are closed before returning (and
 * also when a bind fails part-way through).
 *
 * @param count Number of distinct ports to reserve.
 * @returns The OS-assigned port numbers, in allocation order.
 * @throws If any probe server fails to bind.
 */
async function findFreePorts(count: number): Promise<number[]> {
  const servers: net.Server[] = [];
  const ports: number[] = [];
  try {
    for (let i = 0; i < count; i++) {
      const server = net.createServer();
      await new Promise<void>((resolve, reject) => {
        // Without this a bind failure would leave the promise pending forever.
        server.once('error', reject);
        server.listen(0, '127.0.0.1', () => {
          server.off('error', reject);
          resolve();
        });
      });
      servers.push(server);
      ports.push((server.address() as net.AddressInfo).port);
    }
  } finally {
    await Promise.all(
      servers.map((s) => new Promise<void>((resolve) => s.close(() => resolve()))),
    );
  }
  return ports;
}

/** A `net.Server` that can forcibly destroy every connection it has accepted. */
type TrackingServer = net.Server & { destroyAll: () => void };

/**
 * Create a TCP server that tracks its open connections (exposed through
 * `destroyAll`) and start listening.
 *
 * @param port Port to listen on.
 * @param host Address to bind to.
 * @param onConnection Invoked for every accepted socket after tracking is set up.
 * @returns The listening server; rejects if the server emits `error`.
 */
function listenTrackingServer(
  port: number,
  host: string,
  onConnection: (socket: net.Socket) => void,
): Promise<TrackingServer> {
  return new Promise<TrackingServer>((resolve, reject) => {
    const connections = new Set<net.Socket>();
    const server = net.createServer((socket) => {
      connections.add(socket);
      socket.on('close', () => connections.delete(socket));
      // Connections are force-destroyed during tests; ignore the resulting errors.
      socket.on('error', () => {});
      onConnection(socket);
    }) as TrackingServer;
    server.destroyAll = () => {
      for (const conn of connections) conn.destroy();
      connections.clear();
    };
    server.on('error', reject);
    server.listen(port, host, () => resolve(server));
  });
}

/**
 * Skip the PROXY protocol v1 header line (everything up to the first CRLF) on
 * `socket`, then hand every subsequent payload chunk to `onPayload`.
 *
 * Bytes that arrive in the same chunk as the header are forwarded only when
 * non-empty.
 */
function onPayloadAfterProxyHeader(
  socket: net.Socket,
  onPayload: (payload: Buffer) => void,
): void {
  let proxyHeaderParsed = false;
  let pendingBuf = Buffer.alloc(0);
  socket.on('data', (data: Buffer) => {
    if (proxyHeaderParsed) {
      onPayload(data);
      return;
    }
    // The header may be split across chunks, so accumulate until CRLF shows up.
    pendingBuf = Buffer.concat([pendingBuf, data]);
    const idx = pendingBuf.indexOf('\r\n');
    if (idx === -1) return;
    proxyHeaderParsed = true;
    const remainder = pendingBuf.subarray(idx + 2);
    if (remainder.length > 0) onPayload(remainder);
  });
}

/**
 * Start a TCP echo server that tracks connections for force-close.
 * The PROXY protocol v1 header line is skipped before echoing.
 */
function startEchoServer(port: number, host: string): Promise<TrackingServer> {
  return listenTrackingServer(port, host, (socket) => {
    onPayloadAfterProxyHeader(socket, (payload) => {
      socket.write(payload);
    });
  });
}

/**
 * Start a server that sends a large response immediately on first data received.
 * Does NOT wait for end (the tunnel protocol has no half-close).
 * On receiving first data chunk after PROXY header, sends responseSize bytes then closes.
 */
function startLargeResponseServer(
  port: number,
  host: string,
  responseSize: number,
): Promise<TrackingServer> {
  return listenTrackingServer(port, host, (socket) => {
    let responseSent = false;
    onPayloadAfterProxyHeader(socket, () => {
      if (responseSent) return;
      responseSent = true;
      sendLargeResponse(socket, responseSize);
    });
  });
}

/** Chunk size used when generating the large response. */
const LARGE_RESPONSE_CHUNK_SIZE = 32 * 1024;

/**
 * Fill byte for the response chunk that starts at `offset`.
 *
 * Derived from the chunk index rather than the byte offset: offsets advance in
 * 32 KiB steps, so `offset % 256` would always be 0 and the pattern would
 * degenerate to all zeros.
 */
function largeResponseFillByte(offset: number): number {
  return Math.floor(offset / LARGE_RESPONSE_CHUNK_SIZE) % 256;
}

/** Build the exact byte sequence `sendLargeResponse` emits for `totalBytes`. */
function buildExpectedLargeResponse(totalBytes: number): Buffer {
  const expected = Buffer.alloc(totalBytes);
  for (let offset = 0; offset < totalBytes; offset += LARGE_RESPONSE_CHUNK_SIZE) {
    const end = Math.min(offset + LARGE_RESPONSE_CHUNK_SIZE, totalBytes);
    expected.fill(largeResponseFillByte(offset), offset, end);
  }
  return expected;
}

/**
 * Write `totalBytes` of deterministic pattern data to `socket` in 32 KiB chunks,
 * honouring backpressure (pauses until `drain` when `write` returns false),
 * then end the socket.
 */
function sendLargeResponse(socket: net.Socket, totalBytes: number) {
  let sent = 0;
  const writeChunk = () => {
    while (sent < totalBytes) {
      // The peer may have been force-closed mid-transfer; stop writing.
      if (socket.destroyed) return;
      const toWrite = Math.min(LARGE_RESPONSE_CHUNK_SIZE, totalBytes - sent);
      // Use a deterministic pattern for verification
      const chunk = Buffer.alloc(toWrite, largeResponseFillByte(sent));
      const canContinue = socket.write(chunk);
      sent += toWrite;
      if (!canContinue) {
        socket.once('drain', writeChunk);
        return;
      }
    }
    socket.end();
  };
  writeChunk();
}

/** Force-close a server: destroy all connections, then close. */
async function forceCloseServer(server: TrackingServer): Promise<void> {
  server.destroyAll();
  await new Promise<void>((resolve) => server.close(() => resolve()));
}

/** Handles for a running hub + edge pair plus a teardown function. */
interface TestTunnel {
  hub: RemoteIngressHub;
  edge: RemoteIngressEdge;
  edgePort: number;
  cleanup: () => Promise<void>;
}

/**
 * Start a full hub + edge tunnel.
 * Edge binds to 127.0.0.1, upstream server binds to 127.0.0.2.
 * Hub targetHost = 127.0.0.2 so hub -> upstream doesn't loop back to edge.
 *
 * @param edgePort Port the edge is allowed to listen on.
 * @param hubPort Port the hub accepts the tunnel connection on.
 * @returns The running hub and edge plus a `cleanup` that stops both.
 * @throws If the edge does not emit `tunnelConnected` within 10s, or if any
 *   start call fails (hub and edge are stopped on a best-effort basis first).
 */
async function startTunnel(edgePort: number, hubPort: number): Promise<TestTunnel> {
  const hub = new RemoteIngressHub();
  const edge = new RemoteIngressEdge();

  await hub.start({
    tunnelPort: hubPort,
    targetHost: '127.0.0.2',
  });

  try {
    await hub.updateAllowedEdges([
      { id: 'test-edge', secret: 'test-secret', listenPorts: [edgePort] },
    ]);

    let connectTimer: ReturnType<typeof setTimeout> | undefined;
    const connectedPromise = new Promise<void>((resolve, reject) => {
      connectTimer = setTimeout(
        () => reject(new Error('Edge did not connect within 10s')),
        10000,
      );
      edge.once('tunnelConnected', () => {
        clearTimeout(connectTimer);
        resolve();
      });
    });
    // The timer can fire while edge.start() is still pending; mark the
    // rejection as handled so it does not surface as an unhandled rejection.
    // The `await connectedPromise` below still observes it.
    connectedPromise.catch(() => {});

    try {
      await edge.start({
        hubHost: '127.0.0.1',
        hubPort,
        edgeId: 'test-edge',
        secret: 'test-secret',
        bindAddress: '127.0.0.1',
      });
      await connectedPromise;
    } finally {
      clearTimeout(connectTimer);
    }

    // Grace period after the tunnel reports connected, before traffic is sent.
    await delay(500);
  } catch (err) {
    // Best-effort teardown so a failed setup does not leave the hub listening.
    await edge.stop().catch(() => {});
    await hub.stop().catch(() => {});
    throw err;
  }

  return {
    hub,
    edge,
    edgePort,
    cleanup: async () => {
      await edge.stop();
      await hub.stop();
    },
  };
}

/** Parameters for a single client exchange through the tunnel. */
interface ExchangeOptions {
  port: number;
  data: Buffer;
  /** Resolve as soon as at least this many bytes have been received. */
  expectedBytes: number;
  timeoutMs: number;
  /** Whether to call `end()` on the client right after writing `data`. */
  endAfterWrite: boolean;
}

/**
 * Connect to 127.0.0.1:`port`, write `data`, and collect the response.
 *
 * Resolves with everything received once `expectedBytes` have arrived (the
 * client is then destroyed) or when the peer ends the connection, whichever
 * comes first. Rejects on socket error or after `timeoutMs`.
 */
function exchange(options: ExchangeOptions): Promise<Buffer> {
  const { port, data, expectedBytes, timeoutMs, endAfterWrite } = options;
  return new Promise<Buffer>((resolve, reject) => {
    const chunks: Buffer[] = [];
    let totalReceived = 0;
    let settled = false;

    const client = net.createConnection({ host: '127.0.0.1', port }, () => {
      client.write(data);
      if (endAfterWrite) client.end();
    });

    const timer = setTimeout(() => {
      finish(() => {
        client.destroy();
        reject(
          new Error(
            `Timeout after ${timeoutMs}ms — received ${totalReceived}/${expectedBytes} bytes`,
          ),
        );
      });
    }, timeoutMs);

    // Settle at most once, whichever of data/end/error/timeout wins.
    const finish = (action: () => void): void => {
      if (settled) return;
      settled = true;
      clearTimeout(timer);
      action();
    };

    client.on('data', (chunk: Buffer) => {
      chunks.push(chunk);
      totalReceived += chunk.length;
      if (totalReceived >= expectedBytes) {
        finish(() => {
          client.destroy();
          resolve(Buffer.concat(chunks));
        });
      }
    });
    client.on('end', () => finish(() => resolve(Buffer.concat(chunks))));
    client.on('error', (err) => finish(() => reject(err)));
  });
}

/**
 * Send data through the tunnel and collect the echoed response.
 * Ends the write side after sending and waits for `data.length` bytes back.
 */
function sendAndReceive(port: number, data: Buffer, timeoutMs = 30000): Promise<Buffer> {
  return exchange({
    port,
    data,
    expectedBytes: data.length,
    timeoutMs,
    endAfterWrite: true,
  });
}

/**
 * Connect to the tunnel, send a small request, and collect a large response.
 * Does NOT call end() — the tunnel has no half-close.
 * Instead, collects until expectedResponseSize bytes arrive.
 */
function sendAndReceiveLarge(
  port: number,
  data: Buffer,
  expectedResponseSize: number,
  timeoutMs = 60000,
): Promise<Buffer> {
  // Do NOT end the client — the server will respond immediately
  // and the tunnel CLOSE will happen when the download finishes
  return exchange({
    port,
    data,
    expectedBytes: expectedResponseSize,
    timeoutMs,
    endAfterWrite: false,
  });
}

/** Hex-encoded SHA-256 digest of `buf`. */
function sha256(buf: Buffer): string {
  return crypto.createHash('sha256').update(buf).digest('hex');
}

/**
 * Open `streamCount` concurrent connections to the echo path, each sending
 * `payloadSize` random bytes, and assert every echo matches in size and hash.
 */
async function expectConcurrentEchoStreams(
  port: number,
  streamCount: number,
  payloadSize: number,
  timeoutMs: number,
): Promise<void> {
  const results = await Promise.all(
    Array.from({ length: streamCount }, async () => {
      const data = crypto.randomBytes(payloadSize);
      const sentHash = sha256(data);
      const received = await sendAndReceive(port, data, timeoutMs);
      return received.length === payloadSize && sha256(received) === sentHash;
    }),
  );
  const failures = results.filter((ok) => !ok);
  expect(failures.length).toEqual(0);
}

// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

let tunnel: TestTunnel;
let echoServer: TrackingServer;
let hubPort: number;
let edgePort: number;

tap.test('setup: start echo server and tunnel', async () => {
  [hubPort, edgePort] = await findFreePorts(2);
  echoServer = await startEchoServer(edgePort, '127.0.0.2');
  tunnel = await startTunnel(edgePort, hubPort);
  expect(tunnel.hub.running).toBeTrue();
});

tap.test('single stream: 32MB transfer exceeding initial 4MB window', async () => {
  const size = 32 * 1024 * 1024;
  const data = crypto.randomBytes(size);
  const expectedHash = sha256(data);
  const received = await sendAndReceive(edgePort, data, 60000);
  expect(received.length).toEqual(size);
  expect(sha256(received)).toEqual(expectedHash);
});

tap.test('200 concurrent streams with 64KB each', async () => {
  await expectConcurrentEchoStreams(edgePort, 200, 64 * 1024, 30000);
});

tap.test('512 concurrent streams at minimum window boundary (16KB each)', async () => {
  await expectConcurrentEchoStreams(edgePort, 512, 16 * 1024, 60000);
});

tap.test('asymmetric transfer: 4KB request -> 4MB response', async () => {
  const responseSize = 4 * 1024 * 1024; // 4 MB
  let largeServer: TrackingServer | undefined;
  // Swap to large-response server
  await forceCloseServer(echoServer);
  try {
    largeServer = await startLargeResponseServer(edgePort, '127.0.0.2', responseSize);
    const requestData = crypto.randomBytes(4 * 1024); // 4 KB
    const received = await sendAndReceiveLarge(edgePort, requestData, responseSize, 60000);
    expect(received.length).toEqual(responseSize);
    // Verify content as well as length against the server's deterministic pattern.
    expect(sha256(received)).toEqual(sha256(buildExpectedLargeResponse(responseSize)));
  } finally {
    // Always restore echo server even on failure
    if (largeServer) await forceCloseServer(largeServer);
    echoServer = await startEchoServer(edgePort, '127.0.0.2');
  }
});

tap.test('100 streams x 1MB each (100MB total exceeding 32MB budget)', async () => {
  await expectConcurrentEchoStreams(edgePort, 100, 1 * 1024 * 1024, 120000);
});

tap.test('active stream counter tracks concurrent connections', async () => {
  const N = 50;
  // Every socket is recorded as soon as it is created so the `finally` below
  // can destroy all of them, including ones that never finished connecting.
  const sockets: net.Socket[] = [];

  const openConnection = (): Promise<net.Socket> =>
    new Promise<net.Socket>((resolve, reject) => {
      const sock = net.createConnection({ host: '127.0.0.1', port: edgePort });
      sockets.push(sock);
      const timer = setTimeout(() => reject(new Error('connect timeout')), 5000);
      sock.once('connect', () => {
        clearTimeout(timer);
        resolve(sock);
      });
      // Fails the connect fast with the real cause; once connected, reject() is
      // a no-op and this handler just keeps later errors from being uncaught.
      sock.on('error', (err) => {
        clearTimeout(timer);
        reject(err);
      });
    });

  try {
    // Open N connections and keep them alive (don't close)
    await Promise.all(Array.from({ length: N }, () => openConnection()));

    // Brief delay for stream registration to propagate
    await delay(500);

    // Verify the edge reports >= N active streams.
    // This counter is the input to compute_window_for_stream_count(),
    // so its accuracy determines whether adaptive window sizing is correct.
    const status = await tunnel.edge.getStatus();
    expect(status.activeStreams).toBeGreaterThanOrEqual(N);
  } finally {
    // Clean up: destroy all sockets (the tunnel's 300s stream timeout will handle cleanup)
    for (const sock of sockets) {
      sock.destroy();
    }
  }
});

tap.test('50 streams x 2MB each (forces multiple window refills per stream)', async () => {
  // At 50 concurrent streams: adaptive window = 32MB/50 = 655KB per stream
  // Each stream sends 2MB → needs ~3 WINDOW_UPDATE refill cycles per stream
  await expectConcurrentEchoStreams(edgePort, 50, 2 * 1024 * 1024, 120000);
});

tap.test('teardown: stop tunnel and echo server', async () => {
  try {
    await tunnel.cleanup();
  } finally {
    // Release the upstream port even if stopping the tunnel fails.
    await forceCloseServer(echoServer);
  }
});

export default tap.start();