import { expect, tap } from '@push.rocks/tapbundle'; import * as net from 'net'; import * as crypto from 'crypto'; import { RemoteIngressHub, RemoteIngressEdge } from '../ts/index.js'; // --------------------------------------------------------------------------- // Helpers (same patterns as test.flowcontrol.node.ts) // --------------------------------------------------------------------------- async function findFreePorts(count: number): Promise { const servers: net.Server[] = []; const ports: number[] = []; for (let i = 0; i < count; i++) { const server = net.createServer(); await new Promise((resolve) => server.listen(0, '127.0.0.1', resolve)); ports.push((server.address() as net.AddressInfo).port); servers.push(server); } await Promise.all(servers.map((s) => new Promise((resolve) => s.close(() => resolve())))); return ports; } type TrackingServer = net.Server & { destroyAll: () => void }; function startEchoServer(port: number, host: string): Promise { return new Promise((resolve, reject) => { const connections = new Set(); const server = net.createServer((socket) => { connections.add(socket); socket.on('close', () => connections.delete(socket)); let proxyHeaderParsed = false; let pendingBuf = Buffer.alloc(0); socket.on('data', (data: Buffer) => { if (!proxyHeaderParsed) { pendingBuf = Buffer.concat([pendingBuf, data]); const idx = pendingBuf.indexOf('\r\n'); if (idx !== -1) { proxyHeaderParsed = true; const remainder = pendingBuf.subarray(idx + 2); if (remainder.length > 0) socket.write(remainder); } return; } socket.write(data); }); socket.on('error', () => {}); }) as TrackingServer; server.destroyAll = () => { for (const conn of connections) conn.destroy(); connections.clear(); }; server.on('error', reject); server.listen(port, host, () => resolve(server)); }); } async function forceCloseServer(server: TrackingServer): Promise { server.destroyAll(); await new Promise((resolve) => server.close(() => resolve())); } interface TestTunnel { hub: RemoteIngressHub; edge: RemoteIngressEdge; edgePort: number; cleanup: () => Promise; } /** * Start a full hub + edge tunnel using QUIC transport. * Edge binds to 127.0.0.1, upstream server binds to 127.0.0.2. */ async function startQuicTunnel(edgePort: number, hubPort: number): Promise { const hub = new RemoteIngressHub(); const edge = new RemoteIngressEdge(); await hub.start({ tunnelPort: hubPort, targetHost: '127.0.0.2', }); await hub.updateAllowedEdges([ { id: 'test-edge', secret: 'test-secret', listenPorts: [edgePort] }, ]); const connectedPromise = new Promise((resolve, reject) => { const timeout = setTimeout(() => reject(new Error('QUIC edge did not connect within 10s')), 10000); edge.once('tunnelConnected', () => { clearTimeout(timeout); resolve(); }); }); await edge.start({ hubHost: '127.0.0.1', hubPort, edgeId: 'test-edge', secret: 'test-secret', bindAddress: '127.0.0.1', transportMode: 'quic', }); await connectedPromise; await new Promise((resolve) => setTimeout(resolve, 500)); return { hub, edge, edgePort, cleanup: async () => { await edge.stop(); await hub.stop(); }, }; } function sendAndReceive(port: number, data: Buffer, timeoutMs = 30000): Promise { return new Promise((resolve, reject) => { const chunks: Buffer[] = []; let totalReceived = 0; const expectedLength = data.length; let settled = false; const client = net.createConnection({ host: '127.0.0.1', port }, () => { client.write(data); client.end(); }); const timer = setTimeout(() => { if (!settled) { settled = true; client.destroy(); reject(new Error(`Timeout after ${timeoutMs}ms — received ${totalReceived}/${expectedLength} bytes`)); } }, timeoutMs); client.on('data', (chunk: Buffer) => { chunks.push(chunk); totalReceived += chunk.length; if (totalReceived >= expectedLength && !settled) { settled = true; clearTimeout(timer); client.destroy(); resolve(Buffer.concat(chunks)); } }); client.on('end', () => { if (!settled) { settled = true; clearTimeout(timer); resolve(Buffer.concat(chunks)); } }); client.on('error', (err) => { if (!settled) { settled = true; clearTimeout(timer); reject(err); } }); }); } function sha256(buf: Buffer): string { return crypto.createHash('sha256').update(buf).digest('hex'); } // --------------------------------------------------------------------------- // QUIC Transport E2E Tests // --------------------------------------------------------------------------- let tunnel: TestTunnel; let echoServer: TrackingServer; let hubPort: number; let edgePort: number; tap.test('QUIC setup: start TCP echo server and QUIC tunnel', async () => { [hubPort, edgePort] = await findFreePorts(2); echoServer = await startEchoServer(edgePort, '127.0.0.2'); tunnel = await startQuicTunnel(edgePort, hubPort); expect(tunnel.hub.running).toBeTrue(); const status = await tunnel.edge.getStatus(); expect(status.connected).toBeTrue(); }); tap.test('QUIC: single TCP stream echo — 1KB', async () => { const data = crypto.randomBytes(1024); const hash = sha256(data); const received = await sendAndReceive(edgePort, data, 10000); expect(received.length).toEqual(1024); expect(sha256(received)).toEqual(hash); }); tap.test('QUIC: single TCP stream echo — 1MB', async () => { const size = 1024 * 1024; const data = crypto.randomBytes(size); const hash = sha256(data); const received = await sendAndReceive(edgePort, data, 30000); expect(received.length).toEqual(size); expect(sha256(received)).toEqual(hash); }); tap.test('QUIC: single TCP stream echo — 16MB', async () => { const size = 16 * 1024 * 1024; const data = crypto.randomBytes(size); const hash = sha256(data); const received = await sendAndReceive(edgePort, data, 60000); expect(received.length).toEqual(size); expect(sha256(received)).toEqual(hash); }); tap.test('QUIC: 10 concurrent TCP streams x 1MB each', async () => { const streamCount = 10; const payloadSize = 1024 * 1024; const promises = Array.from({ length: streamCount }, () => { const data = crypto.randomBytes(payloadSize); const hash = sha256(data); return sendAndReceive(edgePort, data, 30000).then((received) => ({ sent: hash, received: sha256(received), sizeOk: received.length === payloadSize, })); }); const results = await Promise.all(promises); const failures = results.filter((r) => !r.sizeOk || r.sent !== r.received); expect(failures.length).toEqual(0); }); tap.test('QUIC: 50 concurrent TCP streams x 64KB each', async () => { const streamCount = 50; const payloadSize = 64 * 1024; const promises = Array.from({ length: streamCount }, () => { const data = crypto.randomBytes(payloadSize); const hash = sha256(data); return sendAndReceive(edgePort, data, 30000).then((received) => ({ sent: hash, received: sha256(received), sizeOk: received.length === payloadSize, })); }); const results = await Promise.all(promises); const failures = results.filter((r) => !r.sizeOk || r.sent !== r.received); expect(failures.length).toEqual(0); }); tap.test('QUIC: 200 concurrent TCP streams x 16KB each', async () => { const streamCount = 200; const payloadSize = 16 * 1024; const promises = Array.from({ length: streamCount }, () => { const data = crypto.randomBytes(payloadSize); const hash = sha256(data); return sendAndReceive(edgePort, data, 60000).then((received) => ({ sent: hash, received: sha256(received), sizeOk: received.length === payloadSize, })); }); const results = await Promise.all(promises); const failures = results.filter((r) => !r.sizeOk || r.sent !== r.received); expect(failures.length).toEqual(0); }); tap.test('QUIC: TCP tunnel still connected after all tests', async () => { const status = await tunnel.edge.getStatus(); expect(status.connected).toBeTrue(); }); tap.test('QUIC teardown: stop TCP tunnel and echo server', async () => { await tunnel.cleanup(); await forceCloseServer(echoServer); }); export default tap.start();