diff --git a/changelog.md b/changelog.md index 5144116..bdbee88 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,12 @@ # Changelog +## 2026-03-17 - 4.8.12 - fix(tunnel) +prevent tunnel backpressure buffering from exhausting memory and cancel stream handlers before TLS shutdown + +- stop self-waking and writing new frames while a flush is pending to avoid unbounded TLS session buffer growth under load +- reorder edge and hub shutdown cleanup so stream cancellation happens before TLS close_notify, preventing handlers from blocking on dead channels +- add load tests covering sustained large transfers, burst traffic, and rapid stream churn to verify tunnel stability + ## 2026-03-17 - 4.8.11 - fix(remoteingress-core) stop data frame send loops promptly when stream cancellation is triggered diff --git a/rust/crates/remoteingress-core/src/edge.rs b/rust/crates/remoteingress-core/src/edge.rs index 9aee291..4c30979 100644 --- a/rust/crates/remoteingress-core/src/edge.rs +++ b/rust/crates/remoteingress-core/src/edge.rs @@ -587,21 +587,23 @@ async fn connect_to_hub_and_run( } }; - // Graceful TLS shutdown: send close_notify so the hub sees a clean disconnect - // instead of "peer closed connection without sending TLS close_notify". - let mut tls_stream = tunnel_io.into_inner(); - let _ = tokio::time::timeout( - Duration::from_secs(2), - tls_stream.shutdown(), - ).await; - - // Cleanup + // Cancel stream tokens FIRST so stream handlers exit immediately. + // If we TLS-shutdown first, stream handlers are stuck sending to dead channels + // for up to 2 seconds while the shutdown times out on a dead connection. connection_token.cancel(); stun_handle.abort(); for (_, h) in port_listeners.drain() { h.abort(); } + // Graceful TLS shutdown: send close_notify so the hub sees a clean disconnect. + // Stream handlers are already cancelled, so no new data is being produced. + let mut tls_stream = tunnel_io.into_inner(); + let _ = tokio::time::timeout( + Duration::from_secs(2), + tls_stream.shutdown(), + ).await; + result } diff --git a/rust/crates/remoteingress-core/src/hub.rs b/rust/crates/remoteingress-core/src/hub.rs index dfe7185..7e9abd9 100644 --- a/rust/crates/remoteingress-core/src/hub.rs +++ b/rust/crates/remoteingress-core/src/hub.rs @@ -844,17 +844,19 @@ async fn handle_edge_connection( } } - // Graceful TLS shutdown: send close_notify so the edge sees a clean disconnect - // instead of "peer closed connection without sending TLS close_notify". + // Cancel stream tokens FIRST so stream handlers exit immediately. + // If we TLS-shutdown first, stream handlers are stuck sending to dead channels + // for up to 2 seconds while the shutdown times out on a dead connection. + edge_token.cancel(); + config_handle.abort(); + + // Graceful TLS shutdown: send close_notify so the edge sees a clean disconnect. + // Stream handlers are already cancelled, so no new data is being produced. let mut tls_stream = tunnel_io.into_inner(); let _ = tokio::time::timeout( Duration::from_secs(2), tls_stream.shutdown(), ).await; - - // Cleanup: cancel edge token to propagate to all child tasks - edge_token.cancel(); - config_handle.abort(); { let mut edges = connected.lock().await; edges.remove(&edge_id); diff --git a/rust/crates/remoteingress-protocol/src/lib.rs b/rust/crates/remoteingress-protocol/src/lib.rs index b4d14dc..a5c8595 100644 --- a/rust/crates/remoteingress-protocol/src/lib.rs +++ b/rust/crates/remoteingress-protocol/src/lib.rs @@ -312,11 +312,12 @@ impl TunnelIo { cancel_token: &tokio_util::sync::CancellationToken, ) -> Poll { // 1. WRITE: drain ctrl queue first, then data queue. - // TLS poll_write writes plaintext to session buffer (always Ready). - // Batch up to 16 frames per poll cycle. + // Only write when flush is complete — otherwise the TLS session buffer + // grows without bound (poll_write always returns Ready, buffering plaintext + // in the TLS session even when TCP can't keep up). // Safe: `self.write` and `self.stream` are disjoint fields. let mut writes = 0; - while self.write.has_work() && writes < 16 { + while self.write.has_work() && writes < 16 && !self.write.flush_needed { let from_ctrl = !self.write.ctrl_queue.is_empty(); let frame = if from_ctrl { self.write.ctrl_queue.front().unwrap() @@ -424,10 +425,12 @@ impl TunnelIo { return Poll::Ready(TunnelEvent::Cancelled); } - // 6. SELF-WAKE: only when we have frames AND flush is done. + // 6. SELF-WAKE: only when flush is complete AND we have work. // If flush is pending, the TCP write-readiness waker will notify us. - // If we got new channel frames, wake to write them. - if got_new || (!self.write.flush_needed && self.write.has_work()) { + // CRITICAL: do NOT self-wake when flush_needed — this causes unbounded + // TLS session buffer growth (poll_write always accepts plaintext, but TCP + // can't drain it fast enough → OOM → process killed → ECONNRESET). + if !self.write.flush_needed && (got_new || self.write.has_work()) { cx.waker().wake_by_ref(); } diff --git a/test/test.loadtest.node.ts b/test/test.loadtest.node.ts new file mode 100644 index 0000000..450f1d6 --- /dev/null +++ b/test/test.loadtest.node.ts @@ -0,0 +1,393 @@ +import { expect, tap } from '@push.rocks/tapbundle'; +import * as net from 'net'; +import * as stream from 'stream'; +import * as crypto from 'crypto'; +import { RemoteIngressHub, RemoteIngressEdge } from '../ts/index.js'; + +// --------------------------------------------------------------------------- +// Helpers (self-contained — same patterns as test.flowcontrol.node.ts) +// --------------------------------------------------------------------------- + +async function findFreePorts(count: number): Promise { + const servers: net.Server[] = []; + const ports: number[] = []; + for (let i = 0; i < count; i++) { + const server = net.createServer(); + await new Promise((resolve) => server.listen(0, '127.0.0.1', resolve)); + ports.push((server.address() as net.AddressInfo).port); + servers.push(server); + } + await Promise.all(servers.map((s) => new Promise((resolve) => s.close(() => resolve())))); + return ports; +} + +type TrackingServer = net.Server & { destroyAll: () => void }; + +function startEchoServer(port: number, host: string): Promise { + return new Promise((resolve, reject) => { + const connections = new Set(); + const server = net.createServer((socket) => { + connections.add(socket); + socket.on('close', () => connections.delete(socket)); + let proxyHeaderParsed = false; + let pendingBuf = Buffer.alloc(0); + socket.on('data', (data: Buffer) => { + if (!proxyHeaderParsed) { + pendingBuf = Buffer.concat([pendingBuf, data]); + const idx = pendingBuf.indexOf('\r\n'); + if (idx !== -1) { + proxyHeaderParsed = true; + const remainder = pendingBuf.subarray(idx + 2); + if (remainder.length > 0) socket.write(remainder); + } + return; + } + socket.write(data); + }); + socket.on('error', () => {}); + }) as TrackingServer; + server.destroyAll = () => { + for (const conn of connections) conn.destroy(); + connections.clear(); + }; + server.on('error', reject); + server.listen(port, host, () => resolve(server)); + }); +} + +function sendAndReceive(port: number, data: Buffer, timeoutMs = 30000): Promise { + return new Promise((resolve, reject) => { + const chunks: Buffer[] = []; + let totalReceived = 0; + const expectedLength = data.length; + let settled = false; + + const client = net.createConnection({ host: '127.0.0.1', port }, () => { + client.write(data); + client.end(); + }); + + const timer = setTimeout(() => { + if (!settled) { + settled = true; + client.destroy(); + reject(new Error(`Timeout after ${timeoutMs}ms — received ${totalReceived}/${expectedLength} bytes`)); + } + }, timeoutMs); + + client.on('data', (chunk: Buffer) => { + chunks.push(chunk); + totalReceived += chunk.length; + if (totalReceived >= expectedLength && !settled) { + settled = true; + clearTimeout(timer); + client.destroy(); + resolve(Buffer.concat(chunks)); + } + }); + + client.on('end', () => { + if (!settled) { + settled = true; + clearTimeout(timer); + resolve(Buffer.concat(chunks)); + } + }); + + client.on('error', (err) => { + if (!settled) { + settled = true; + clearTimeout(timer); + reject(err); + } + }); + }); +} + +function sha256(buf: Buffer): string { + return crypto.createHash('sha256').update(buf).digest('hex'); +} + +// --------------------------------------------------------------------------- +// Throttle Proxy: rate-limits TCP traffic between edge and hub +// --------------------------------------------------------------------------- + +class ThrottleTransform extends stream.Transform { + private bytesPerSec: number; + private bucket: number; + private lastRefill: number; + private destroyed_: boolean = false; + + constructor(bytesPerSecond: number) { + super(); + this.bytesPerSec = bytesPerSecond; + this.bucket = bytesPerSecond; + this.lastRefill = Date.now(); + } + + _transform(chunk: Buffer, _encoding: BufferEncoding, callback: stream.TransformCallback) { + if (this.destroyed_) return; + + const now = Date.now(); + const elapsed = (now - this.lastRefill) / 1000; + this.bucket = Math.min(this.bytesPerSec, this.bucket + elapsed * this.bytesPerSec); + this.lastRefill = now; + + if (chunk.length <= this.bucket) { + this.bucket -= chunk.length; + callback(null, chunk); + } else { + // Not enough budget — delay the entire chunk (don't split) + const deficit = chunk.length - this.bucket; + this.bucket = 0; + const delayMs = Math.min((deficit / this.bytesPerSec) * 1000, 1000); + setTimeout(() => { + if (this.destroyed_) return; + this.lastRefill = Date.now(); + this.bucket = 0; + callback(null, chunk); + }, delayMs); + } + } + + _destroy(err: Error | null, callback: (error: Error | null) => void) { + this.destroyed_ = true; + callback(err); + } +} + +interface ThrottleProxy { + server: net.Server; + close: () => Promise; +} + +async function startThrottleProxy( + listenPort: number, + targetHost: string, + targetPort: number, + bytesPerSecond: number, +): Promise { + const connections = new Set(); + const server = net.createServer((clientSock) => { + connections.add(clientSock); + const upstream = net.createConnection({ host: targetHost, port: targetPort }); + connections.add(upstream); + + const throttleUp = new ThrottleTransform(bytesPerSecond); + const throttleDown = new ThrottleTransform(bytesPerSecond); + + clientSock.pipe(throttleUp).pipe(upstream); + upstream.pipe(throttleDown).pipe(clientSock); + + const cleanup = () => { + throttleUp.destroy(); + throttleDown.destroy(); + clientSock.destroy(); + upstream.destroy(); + connections.delete(clientSock); + connections.delete(upstream); + }; + clientSock.on('error', cleanup); + upstream.on('error', cleanup); + throttleUp.on('error', cleanup); + throttleDown.on('error', cleanup); + clientSock.on('close', cleanup); + upstream.on('close', cleanup); + }); + + await new Promise((resolve) => server.listen(listenPort, '127.0.0.1', resolve)); + return { + server, + close: async () => { + for (const c of connections) c.destroy(); + connections.clear(); + await new Promise((resolve) => server.close(() => resolve())); + }, + }; +} + +// --------------------------------------------------------------------------- +// Test state +// --------------------------------------------------------------------------- + +let hub: RemoteIngressHub; +let edge: RemoteIngressEdge; +let echoServer: TrackingServer; +let throttle: ThrottleProxy; +let hubPort: number; +let proxyPort: number; +let edgePort: number; + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +tap.test('setup: start throttled tunnel (20 Mbit/s)', async () => { + [hubPort, proxyPort, edgePort] = await findFreePorts(3); + + echoServer = await startEchoServer(edgePort, '127.0.0.2'); + + // Throttle proxy: edge → proxy → hub at 20 Mbit/s (2.5 MB/s) + throttle = await startThrottleProxy(proxyPort, '127.0.0.1', hubPort, 2.5 * 1024 * 1024); + + hub = new RemoteIngressHub(); + edge = new RemoteIngressEdge(); + + await hub.start({ tunnelPort: hubPort, targetHost: '127.0.0.2' }); + await hub.updateAllowedEdges([ + { id: 'test-edge', secret: 'test-secret', listenPorts: [edgePort] }, + ]); + + const connectedPromise = new Promise((resolve, reject) => { + const timeout = setTimeout(() => reject(new Error('Edge did not connect within 10s')), 10000); + edge.once('tunnelConnected', () => { + clearTimeout(timeout); + resolve(); + }); + }); + + // Edge connects to proxy, not hub directly + await edge.start({ + hubHost: '127.0.0.1', + hubPort: proxyPort, + edgeId: 'test-edge', + secret: 'test-secret', + bindAddress: '127.0.0.1', + }); + + await connectedPromise; + await new Promise((resolve) => setTimeout(resolve, 500)); + + const status = await edge.getStatus(); + expect(status.connected).toBeTrue(); +}); + +tap.test('throttled: 10 streams x 50MB each through 10MB/s tunnel', async () => { + const streamCount = 10; + const payloadSize = 50 * 1024 * 1024; // 50MB per stream = 500MB total round-trip + + const promises = Array.from({ length: streamCount }, () => { + const data = crypto.randomBytes(payloadSize); + const hash = sha256(data); + return sendAndReceive(edgePort, data, 300000).then((received) => ({ + sent: hash, + received: sha256(received), + sizeOk: received.length === payloadSize, + })); + }); + + const results = await Promise.all(promises); + const failures = results.filter((r) => !r.sizeOk || r.sent !== r.received); + expect(failures.length).toEqual(0); + + const status = await edge.getStatus(); + expect(status.connected).toBeTrue(); +}); + +tap.test('throttled: slow consumer with 50MB does not kill other streams', async () => { + // Open a connection that creates massive download-direction backpressure: + // send 50MB but DON'T read the response — client TCP receive buffer fills + const slowSock = net.createConnection({ host: '127.0.0.1', port: edgePort }); + await new Promise((resolve) => slowSock.on('connect', resolve)); + const slowData = crypto.randomBytes(50 * 1024 * 1024); + slowSock.write(slowData); + slowSock.end(); + // Don't read — backpressure builds on the download path + + // Wait for backpressure to develop + await new Promise((r) => setTimeout(r, 3000)); + + // Meanwhile, 10 normal echo streams with 50MB each must complete + const payload = crypto.randomBytes(50 * 1024 * 1024); + const hash = sha256(payload); + const promises = Array.from({ length: 10 }, () => + sendAndReceive(edgePort, payload, 300000).then((r) => ({ + hash: sha256(r), + sizeOk: r.length === payload.length, + })) + ); + const results = await Promise.all(promises); + const failures = results.filter((r) => !r.sizeOk || r.hash !== hash); + expect(failures.length).toEqual(0); + + // Tunnel still alive + const status = await edge.getStatus(); + expect(status.connected).toBeTrue(); + + slowSock.destroy(); +}); + +tap.test('throttled: rapid churn — 5 x 50MB long + 200 x 1MB short streams', async () => { + // 5 long streams (50MB each) running alongside 200 short streams (1MB each) + const longPayload = crypto.randomBytes(50 * 1024 * 1024); + const longHash = sha256(longPayload); + const longPromises = Array.from({ length: 5 }, () => + sendAndReceive(edgePort, longPayload, 300000).then((r) => ({ + hash: sha256(r), + sizeOk: r.length === longPayload.length, + })) + ); + + const shortPayload = crypto.randomBytes(1024 * 1024); + const shortHash = sha256(shortPayload); + const shortPromises = Array.from({ length: 200 }, () => + sendAndReceive(edgePort, shortPayload, 300000).then((r) => ({ + hash: sha256(r), + sizeOk: r.length === shortPayload.length, + })) + ); + + const [longResults, shortResults] = await Promise.all([ + Promise.all(longPromises), + Promise.all(shortPromises), + ]); + + const longFails = longResults.filter((r) => !r.sizeOk || r.hash !== longHash); + const shortFails = shortResults.filter((r) => !r.sizeOk || r.hash !== shortHash); + expect(longFails.length).toEqual(0); + expect(shortFails.length).toEqual(0); + + const status = await edge.getStatus(); + expect(status.connected).toBeTrue(); +}); + +tap.test('throttled: 5 burst waves of 20 streams x 50MB each', async () => { + for (let wave = 0; wave < 5; wave++) { + const streamCount = 20; + const payloadSize = 50 * 1024 * 1024; // 50MB per stream = 1GB per wave + + const promises = Array.from({ length: streamCount }, () => { + const data = crypto.randomBytes(payloadSize); + return sendAndReceive(edgePort, data, 300000).then((r) => r.length === payloadSize); + }); + + const results = await Promise.all(promises); + const ok = results.filter(Boolean).length; + expect(ok).toEqual(streamCount); + + // Brief pause between waves + await new Promise((r) => setTimeout(r, 500)); + + const status = await edge.getStatus(); + expect(status.connected).toBeTrue(); + } +}); + +tap.test('throttled: tunnel still works after all load tests', async () => { + const data = crypto.randomBytes(1024); + const hash = sha256(data); + const received = await sendAndReceive(edgePort, data, 30000); + expect(sha256(received)).toEqual(hash); + + const status = await edge.getStatus(); + expect(status.connected).toBeTrue(); +}); + +tap.test('teardown: stop tunnel', async () => { + await edge.stop(); + await hub.stop(); + if (throttle) await throttle.close(); + await new Promise((resolve) => echoServer.close(() => resolve())); +}); + +export default tap.start(); diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 8307d0d..2fe93d8 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@serve.zone/remoteingress', - version: '4.8.11', + version: '4.8.12', description: 'Edge ingress tunnel for DcRouter - accepts incoming TCP connections at network edge and tunnels them to DcRouter SmartProxy preserving client IP via PROXY protocol v1.' }