import { expect, tap } from '@push.rocks/tapbundle';
import * as net from 'net';
import * as stream from 'stream';
import * as crypto from 'crypto';

import { RemoteIngressHub, RemoteIngressEdge } from '../ts/index.js';

// ---------------------------------------------------------------------------
// Helpers (self-contained — same patterns as test.flowcontrol.node.ts)
// ---------------------------------------------------------------------------

/**
 * Asks the OS for `count` currently-unused TCP ports on 127.0.0.1.
 *
 * All probe servers are kept open until every port has been allocated, so the
 * returned ports are distinct from one another. The servers are closed before
 * returning, which means another process could in principle grab a port
 * before the caller binds it.
 *
 * @param count Number of ports to allocate.
 * @returns The allocated port numbers, in allocation order.
 */
async function findFreePorts(count: number): Promise<number[]> {
  const servers: net.Server[] = [];
  const ports: number[] = [];
  for (let i = 0; i < count; i++) {
    const server = net.createServer();
    // Port 0 lets the OS pick any free port.
    await new Promise<void>((resolve) => server.listen(0, '127.0.0.1', resolve));
    ports.push((server.address() as net.AddressInfo).port);
    servers.push(server);
  }
  await Promise.all(
    servers.map((s) => new Promise<void>((resolve) => s.close(() => resolve()))),
  );
  return ports;
}

/** A `net.Server` that can forcibly destroy every connection it has accepted. */
type TrackingServer = net.Server & { destroyAll: () => void };

/**
 * Starts a TCP echo server that discards everything up to and including the
 * first `\r\n` on each connection and echoes all bytes after it.
 *
 * NOTE(review): the discarded first line is presumably the PROXY protocol
 * header injected by the hub — confirm against the hub implementation.
 *
 * @param port Port to listen on.
 * @param host Address to bind to.
 * @returns The listening server; rejects if the server emits `'error'`.
 */
function startEchoServer(port: number, host: string): Promise<TrackingServer> {
  return new Promise<TrackingServer>((resolve, reject) => {
    const connections = new Set<net.Socket>();
    const server = net.createServer((socket) => {
      connections.add(socket);
      socket.on('close', () => connections.delete(socket));

      let proxyHeaderParsed = false;
      let pendingBuf = Buffer.alloc(0);

      socket.on('data', (data: Buffer) => {
        if (!proxyHeaderParsed) {
          // Accumulate until the header terminator has been seen; the header
          // may arrive split across several chunks.
          pendingBuf = Buffer.concat([pendingBuf, data]);
          const idx = pendingBuf.indexOf('\r\n');
          if (idx !== -1) {
            proxyHeaderParsed = true;
            // Anything after the terminator is payload and must be echoed.
            const remainder = pendingBuf.subarray(idx + 2);
            if (remainder.length > 0) socket.write(remainder);
          }
          return;
        }
        socket.write(data);
      });
      // Connection-level errors are irrelevant to the echo behaviour.
      socket.on('error', () => {});
    }) as TrackingServer;

    server.destroyAll = () => {
      for (const conn of connections) conn.destroy();
      connections.clear();
    };

    server.on('error', reject);
    server.listen(port, host, () => resolve(server));
  });
}

/**
 * Connects to 127.0.0.1:`port`, writes `data`, half-closes the socket and
 * collects the response.
 *
 * Resolves as soon as at least `data.length` bytes have arrived, or when the
 * peer ends the stream (possibly with fewer bytes — callers check the length).
 *
 * @param port Port to connect to.
 * @param data Payload to send.
 * @param timeoutMs Overall deadline in milliseconds.
 * @returns All bytes received.
 * @throws Rejects on socket error or when the deadline expires.
 */
function sendAndReceive(port: number, data: Buffer, timeoutMs = 30000): Promise<Buffer> {
  return new Promise<Buffer>((resolve, reject) => {
    const chunks: Buffer[] = [];
    let totalReceived = 0;
    const expectedLength = data.length;
    // Guards against settling twice (data/end/error/timeout can all race).
    let settled = false;

    const client = net.createConnection({ host: '127.0.0.1', port }, () => {
      client.write(data);
      client.end();
    });

    const timer = setTimeout(() => {
      if (!settled) {
        settled = true;
        client.destroy();
        reject(
          new Error(`Timeout after ${timeoutMs}ms — received ${totalReceived}/${expectedLength} bytes`),
        );
      }
    }, timeoutMs);

    client.on('data', (chunk: Buffer) => {
      chunks.push(chunk);
      totalReceived += chunk.length;
      if (totalReceived >= expectedLength && !settled) {
        settled = true;
        clearTimeout(timer);
        client.destroy();
        resolve(Buffer.concat(chunks));
      }
    });

    client.on('end', () => {
      if (!settled) {
        settled = true;
        clearTimeout(timer);
        resolve(Buffer.concat(chunks));
      }
    });

    client.on('error', (err) => {
      if (!settled) {
        settled = true;
        clearTimeout(timer);
        reject(err);
      }
    });
  });
}

/** Returns the hex-encoded SHA-256 digest of `buf`. */
function sha256(buf: Buffer): string {
  return crypto.createHash('sha256').update(buf).digest('hex');
}

// ---------------------------------------------------------------------------
// Throttle Proxy: rate-limits TCP traffic between edge and hub
// ---------------------------------------------------------------------------

/**
 * Transform stream that rate-limits throughput with a token bucket.
 *
 * The bucket holds at most one second's worth of bytes. A chunk that fits in
 * the current budget passes immediately; otherwise the whole chunk is held
 * back (never split) for `deficit / bytesPerSec` seconds, capped at 1000 ms.
 */
class ThrottleTransform extends stream.Transform {
  private bytesPerSec: number;
  private bucket: number;
  private lastRefill: number;
  private destroyed_: boolean = false;

  /** @param bytesPerSecond Target throughput; also the bucket capacity. */
  constructor(bytesPerSecond: number) {
    super();
    this.bytesPerSec = bytesPerSecond;
    this.bucket = bytesPerSecond;
    this.lastRefill = Date.now();
  }

  _transform(chunk: Buffer, _encoding: BufferEncoding, callback: stream.TransformCallback) {
    if (this.destroyed_) return;

    // Refill proportionally to the time elapsed since the last refill.
    const now = Date.now();
    const elapsed = (now - this.lastRefill) / 1000;
    this.bucket = Math.min(this.bytesPerSec, this.bucket + elapsed * this.bytesPerSec);
    this.lastRefill = now;

    if (chunk.length <= this.bucket) {
      this.bucket -= chunk.length;
      callback(null, chunk);
    } else {
      // Not enough budget — delay the entire chunk (don't split)
      const deficit = chunk.length - this.bucket;
      this.bucket = 0;
      const delayMs = Math.min((deficit / this.bytesPerSec) * 1000, 1000);
      setTimeout(() => {
        if (this.destroyed_) {
          // Stream was torn down while waiting: drop the chunk.
          callback();
          return;
        }
        // The wait paid for this chunk; start the next interval empty.
        this.lastRefill = Date.now();
        this.bucket = 0;
        callback(null, chunk);
      }, delayMs);
    }
  }

  _destroy(err: Error | null, callback: (error: Error | null) => void) {
    this.destroyed_ = true;
    callback(err);
  }
}

/** Handle for a running throttle proxy. */
interface ThrottleProxy {
  server: net.Server;
  /** Destroys all tracked sockets and stops the listener. */
  close: () => Promise<void>;
}

/**
 * Starts a TCP proxy on 127.0.0.1:`listenPort` that forwards every connection
 * to `targetHost:targetPort`, throttling each direction independently to
 * `bytesPerSecond`.
 *
 * @param listenPort Port the proxy listens on.
 * @param targetHost Upstream host.
 * @param targetPort Upstream port.
 * @param bytesPerSecond Per-direction, per-connection rate limit.
 * @returns A handle to stop the proxy; rejects if the listener fails to bind.
 */
async function startThrottleProxy(
  listenPort: number,
  targetHost: string,
  targetPort: number,
  bytesPerSecond: number,
): Promise<ThrottleProxy> {
  const connections = new Set<net.Socket>();

  const server = net.createServer((clientSock) => {
    connections.add(clientSock);
    const upstream = net.createConnection({ host: targetHost, port: targetPort });
    connections.add(upstream);

    const throttleUp = new ThrottleTransform(bytesPerSecond);
    const throttleDown = new ThrottleTransform(bytesPerSecond);

    clientSock.pipe(throttleUp).pipe(upstream);
    upstream.pipe(throttleDown).pipe(clientSock);

    // Tear down both legs exactly once, whichever side fails or closes first.
    let cleaned = false;
    const cleanup = (source: string, err?: Error) => {
      if (cleaned) return;
      cleaned = true;
      if (err) {
        console.error(`[ThrottleProxy] cleanup triggered by ${source}: ${err.message}`);
      } else {
        console.error(`[ThrottleProxy] cleanup triggered by ${source} (no error)`);
      }
      console.error(`[ThrottleProxy] stack:`, new Error().stack);
      throttleUp.destroy();
      throttleDown.destroy();
      clientSock.destroy();
      upstream.destroy();
      connections.delete(clientSock);
      connections.delete(upstream);
    };

    clientSock.on('error', (e) => cleanup('clientSock.error', e));
    upstream.on('error', (e) => cleanup('upstream.error', e));
    throttleUp.on('error', (e) => cleanup('throttleUp.error', e));
    throttleDown.on('error', (e) => cleanup('throttleDown.error', e));
    clientSock.on('close', () => cleanup('clientSock.close'));
    upstream.on('close', () => cleanup('upstream.close'));
  });

  // Surface bind failures (e.g. EADDRINUSE) instead of waiting forever for a
  // 'listening' event that never comes.
  await new Promise<void>((resolve, reject) => {
    server.once('error', reject);
    server.listen(listenPort, '127.0.0.1', () => {
      server.off('error', reject);
      resolve();
    });
  });

  return {
    server,
    close: async () => {
      for (const c of connections) c.destroy();
      connections.clear();
      await new Promise<void>((resolve) => server.close(() => resolve()));
    },
  };
}

// ---------------------------------------------------------------------------
// Test state
// ---------------------------------------------------------------------------

let hub: RemoteIngressHub;
let edge: RemoteIngressEdge;
let echoServer: TrackingServer;
let throttle: ThrottleProxy;
let hubPort: number;
let proxyPort: number;
let edgePort: number;

// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

tap.test('setup: start throttled tunnel (100 Mbit/s)', async () => {
  [hubPort, proxyPort, edgePort] = await findFreePorts(3);

  echoServer = await startEchoServer(edgePort, '127.0.0.2');

  // Throttle proxy: edge → proxy → hub at 100 Mbit/s (12.5 MB/s)
  throttle = await startThrottleProxy(proxyPort, '127.0.0.1', hubPort, 12.5 * 1024 * 1024);

  hub = new RemoteIngressHub();
  edge = new RemoteIngressEdge();

  await hub.start({ tunnelPort: hubPort, targetHost: '127.0.0.2' });
  await hub.updateAllowedEdges([
    { id: 'test-edge', secret: 'test-secret', listenPorts: [edgePort] },
  ]);

  const connectedPromise = new Promise<void>((resolve, reject) => {
    const timeout = setTimeout(() => reject(new Error('Edge did not connect within 10s')), 10000);
    edge.once('tunnelConnected', () => {
      clearTimeout(timeout);
      resolve();
    });
  });

  // Edge connects through throttle proxy
  await edge.start({
    hubHost: '127.0.0.1',
    hubPort: proxyPort,
    edgeId: 'test-edge',
    secret: 'test-secret',
    bindAddress: '127.0.0.1',
  });

  await connectedPromise;
  await new Promise<void>((resolve) => setTimeout(resolve, 500));

  const status = await edge.getStatus();
  expect(status.connected).toBeTrue();
});

tap.test('throttled: 5 streams x 20MB each through 100Mbit tunnel', async () => {
  const streamCount = 5;
  const payloadSize = 20 * 1024 * 1024; // 20MB per stream = 100MB total round-trip

  const payloads = Array.from({ length: streamCount }, () => crypto.randomBytes(payloadSize));
  const promises = payloads.map((data) => {
    const hash = sha256(data);
    return sendAndReceive(edgePort, data, 300000).then((received) => ({
      sent: hash,
      received: sha256(received),
      sizeOk: received.length === payloadSize,
    }));
  });

  const results = await Promise.all(promises);
  const failures = results.filter((r) => !r.sizeOk || r.sent !== r.received);
  expect(failures.length).toEqual(0);

  const status = await edge.getStatus();
  expect(status.connected).toBeTrue();
});

tap.test('throttled: slow consumer with 20MB does not kill other streams', async () => {
  // Open a connection that creates download-direction backpressure:
  // send 20MB but DON'T read the response — client TCP receive buffer fills
  const slowSock = net.createConnection({ host: '127.0.0.1', port: edgePort });
  // This test asserts on the *other* streams and on the tunnel staying up; a
  // reset of the stalled socket must not crash the process via an unhandled
  // 'error' event.
  slowSock.on('error', () => {});

  try {
    await new Promise<void>((resolve, reject) => {
      slowSock.once('connect', resolve);
      // Fail fast instead of hanging if the connection cannot be established.
      slowSock.once('error', reject);
    });

    const slowData = crypto.randomBytes(20 * 1024 * 1024);
    slowSock.write(slowData);
    slowSock.end();
    // Don't read — backpressure builds on the download path

    // Wait for backpressure to develop
    await new Promise<void>((r) => setTimeout(r, 2000));

    // Meanwhile, 5 normal echo streams with 20MB each must complete
    const payload = crypto.randomBytes(20 * 1024 * 1024);
    const hash = sha256(payload);
    const promises = Array.from({ length: 5 }, () =>
      sendAndReceive(edgePort, payload, 300000).then((r) => ({
        hash: sha256(r),
        sizeOk: r.length === payload.length,
      })),
    );

    const results = await Promise.all(promises);
    const failures = results.filter((r) => !r.sizeOk || r.hash !== hash);
    expect(failures.length).toEqual(0);

    // Tunnel still alive
    const status = await edge.getStatus();
    expect(status.connected).toBeTrue();
  } finally {
    // Always release the stalled socket, even when an assertion fails, so it
    // cannot keep later tests or teardown waiting.
    slowSock.destroy();
  }
});

tap.test('throttled: rapid churn — 3 x 20MB long + 50 x 1MB short streams', async () => {
  // 3 long streams (20MB each) running alongside 50 short streams (1MB each)
  const longPayload = crypto.randomBytes(20 * 1024 * 1024);
  const longHash = sha256(longPayload);
  const longPromises = Array.from({ length: 3 }, () =>
    sendAndReceive(edgePort, longPayload, 300000).then((r) => ({
      hash: sha256(r),
      sizeOk: r.length === longPayload.length,
    })),
  );

  const shortPayload = crypto.randomBytes(1024 * 1024);
  const shortHash = sha256(shortPayload);
  const shortPromises = Array.from({ length: 50 }, () =>
    sendAndReceive(edgePort, shortPayload, 300000).then((r) => ({
      hash: sha256(r),
      sizeOk: r.length === shortPayload.length,
    })),
  );

  const [longResults, shortResults] = await Promise.all([
    Promise.all(longPromises),
    Promise.all(shortPromises),
  ]);

  const longFails = longResults.filter((r) => !r.sizeOk || r.hash !== longHash);
  const shortFails = shortResults.filter((r) => !r.sizeOk || r.hash !== shortHash);

  expect(longFails.length).toEqual(0);
  expect(shortFails.length).toEqual(0);

  const status = await edge.getStatus();
  expect(status.connected).toBeTrue();
});

tap.test('throttled: 3 burst waves of 5 streams x 20MB each', async () => {
  for (let wave = 0; wave < 3; wave++) {
    const streamCount = 5;
    const payloadSize = 20 * 1024 * 1024; // 20MB per stream = 100MB per wave

    const promises = Array.from({ length: streamCount }, () => {
      const data = crypto.randomBytes(payloadSize);
      return sendAndReceive(edgePort, data, 300000).then((r) => r.length === payloadSize);
    });

    const results = await Promise.all(promises);
    const ok = results.filter(Boolean).length;
    expect(ok).toEqual(streamCount);

    // Brief pause between waves
    await new Promise<void>((r) => setTimeout(r, 500));

    const status = await edge.getStatus();
    expect(status.connected).toBeTrue();
  }
});

tap.test('throttled: tunnel still works after all load tests', async () => {
  const data = crypto.randomBytes(1024);
  const hash = sha256(data);
  const received = await sendAndReceive(edgePort, data, 30000);
  expect(sha256(received)).toEqual(hash);

  const status = await edge.getStatus();
  expect(status.connected).toBeTrue();
});

tap.test('teardown: stop tunnel', async () => {
  await edge.stop();
  await hub.stop();
  if (throttle) await throttle.close();
  if (echoServer) {
    // server.close() only calls back once every accepted connection has
    // ended; drop any stragglers first so teardown cannot hang.
    echoServer.destroyAll();
    await new Promise<void>((resolve) => echoServer.close(() => resolve()));
  }
});

export default tap.start();