From eb30825f726e9701a9479f33547d2e96a58d830f Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Tue, 17 Mar 2026 19:15:43 +0000 Subject: [PATCH] feat(tests,client): add flow control and load test coverage and honor configured keepalive intervals --- changelog.md | 7 + rust/src/client.rs | 11 +- test/test.flowcontrol.node.ts | 271 ++++++++++++++++++++++++++ test/test.loadtest.node.ts | 357 ++++++++++++++++++++++++++++++++++ ts/00_commitinfo_data.ts | 2 +- 5 files changed, 645 insertions(+), 3 deletions(-) create mode 100644 test/test.flowcontrol.node.ts create mode 100644 test/test.loadtest.node.ts diff --git a/changelog.md b/changelog.md index 60b01a0..fdf0d73 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,12 @@ # Changelog +## 2026-03-17 - 1.3.0 - feat(tests,client) +add flow control and load test coverage and honor configured keepalive intervals + +- Adds end-to-end node tests for client/server flow control, keepalive exchange, connection quality telemetry, rate limiting, concurrent clients, and disconnect tracking. +- Adds load testing with throttled proxy scenarios to validate behavior under constrained bandwidth and repeated client churn. +- Updates the Rust client to pass configured keepaliveIntervalSecs into the adaptive keepalive monitor instead of always using defaults. + ## 2026-03-15 - 1.2.0 - feat(readme) document QoS, telemetry, MTU, and rate limiting capabilities in the README diff --git a/rust/src/client.rs b/rust/src/client.rs index 22c6bb2..9129bc4 100644 --- a/rust/src/client.rs +++ b/rust/src/client.rs @@ -167,8 +167,15 @@ impl VpnClient { info!("Connected to VPN, assigned IP: {}", assigned_ip); - // Create adaptive keepalive monitor - let (monitor, handle) = keepalive::create_keepalive(None); + // Create adaptive keepalive monitor (use custom interval if configured) + let ka_config = config.keepalive_interval_secs.map(|secs| { + let mut cfg = keepalive::AdaptiveKeepaliveConfig::default(); + cfg.degraded_interval = std::time::Duration::from_secs(secs); + cfg.healthy_interval = std::time::Duration::from_secs(secs * 2); + cfg.critical_interval = std::time::Duration::from_secs((secs / 3).max(1)); + cfg + }); + let (monitor, handle) = keepalive::create_keepalive(ka_config); self.quality_rx = Some(handle.quality_rx); // Spawn the keepalive monitor diff --git a/test/test.flowcontrol.node.ts b/test/test.flowcontrol.node.ts new file mode 100644 index 0000000..e7edcfa --- /dev/null +++ b/test/test.flowcontrol.node.ts @@ -0,0 +1,271 @@ +import { tap, expect } from '@git.zone/tstest/tapbundle'; +import * as net from 'net'; +import { VpnClient, VpnServer } from '../ts/index.js'; +import type { IVpnClientOptions, IVpnServerOptions, IVpnKeypair, IVpnServerConfig } from '../ts/index.js'; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +async function findFreePort(): Promise { + const server = net.createServer(); + await new Promise((resolve) => server.listen(0, '127.0.0.1', resolve)); + const port = (server.address() as net.AddressInfo).port; + await new Promise((resolve) => server.close(() => resolve())); + return port; +} + +function delay(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +async function waitFor( + fn: () => Promise, + timeoutMs: number = 10000, + pollMs: number = 500, +): Promise { + const deadline = Date.now() + timeoutMs; + while (Date.now() < deadline) { + if (await fn()) return; + await delay(pollMs); + } + throw new Error(`waitFor timed out after ${timeoutMs}ms`); +} + +// --------------------------------------------------------------------------- +// Test state +// --------------------------------------------------------------------------- + +let server: VpnServer; +let serverPort: number; +let keypair: IVpnKeypair; +let client: VpnClient; +const extraClients: VpnClient[] = []; + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +tap.test('setup: start VPN server', async () => { + serverPort = await findFreePort(); + + const options: IVpnServerOptions = { + transport: { transport: 'stdio' }, + }; + server = new VpnServer(options); + + // Phase 1: start the daemon bridge + const started = await server['bridge'].start(); + expect(started).toBeTrue(); + expect(server.running).toBeTrue(); + + // Phase 2: generate a keypair + keypair = await server.generateKeypair(); + expect(keypair.publicKey).toBeTypeofString(); + expect(keypair.privateKey).toBeTypeofString(); + + // Phase 3: start the VPN listener + const serverConfig: IVpnServerConfig = { + listenAddr: `127.0.0.1:${serverPort}`, + privateKey: keypair.privateKey, + publicKey: keypair.publicKey, + subnet: '10.8.0.0/24', + }; + await server['bridge'].sendCommand('start', { config: serverConfig }); + + // Verify server is now running + const status = await server.getStatus(); + expect(status.state).toEqual('connected'); +}); + +tap.test('single client connects and gets IP', async () => { + const options: IVpnClientOptions = { + transport: { transport: 'stdio' }, + }; + client = new VpnClient(options); + const started = await client.start(); + expect(started).toBeTrue(); + + const result = await client.connect({ + serverUrl: `ws://127.0.0.1:${serverPort}`, + serverPublicKey: keypair.publicKey, + keepaliveIntervalSecs: 3, + }); + + expect(result.assignedIp).toBeTypeofString(); + expect(result.assignedIp).toStartWith('10.8.0.'); + + // Verify client status + const clientStatus = await client.getStatus(); + expect(clientStatus.state).toEqual('connected'); + + // Verify server sees the client + await waitFor(async () => { + const clients = await server.listClients(); + return clients.length === 1; + }); + const clients = await server.listClients(); + expect(clients.length).toEqual(1); + expect(clients[0].assignedIp).toEqual(result.assignedIp); +}); + +tap.test('keepalive exchange', async () => { + // Wait for at least 2 keepalive cycles (interval=3s, so 8s should be enough) + await delay(8000); + + const clientStats = await client.getStatistics(); + expect(clientStats.keepalivesSent).toBeGreaterThanOrEqual(1); + expect(clientStats.keepalivesReceived).toBeGreaterThanOrEqual(1); + + const serverStats = await server.getStatistics(); + expect(serverStats.keepalivesReceived).toBeGreaterThanOrEqual(1); + expect(serverStats.keepalivesSent).toBeGreaterThanOrEqual(1); + + // Verify per-client keepalive tracking + const clients = await server.listClients(); + expect(clients[0].keepalivesReceived).toBeGreaterThanOrEqual(1); +}); + +tap.test('connection quality telemetry', async () => { + const quality = await client.getConnectionQuality(); + + expect(quality.srttMs).toBeGreaterThanOrEqual(0); + expect(quality.jitterMs).toBeTypeofNumber(); + expect(quality.minRttMs).toBeGreaterThanOrEqual(0); + expect(quality.maxRttMs).toBeGreaterThanOrEqual(0); + expect(quality.lossRatio).toBeTypeofNumber(); + expect(['healthy', 'degraded', 'critical']).toContain(quality.linkHealth); +}); + +tap.test('rate limiting: set and verify', async () => { + const clients = await server.listClients(); + const clientId = clients[0].clientId; + + // Set a tight rate limit + await server.setClientRateLimit(clientId, 100, 100); + + // Verify via telemetry + const telemetry = await server.getClientTelemetry(clientId); + expect(telemetry.rateLimitBytesPerSec).toEqual(100); + expect(telemetry.burstBytes).toEqual(100); + expect(telemetry.clientId).toEqual(clientId); +}); + +tap.test('rate limiting: removal', async () => { + const clients = await server.listClients(); + const clientId = clients[0].clientId; + + await server.removeClientRateLimit(clientId); + + // Verify telemetry no longer shows rate limit + const telemetry = await server.getClientTelemetry(clientId); + expect(telemetry.rateLimitBytesPerSec).toBeNullOrUndefined(); + expect(telemetry.burstBytes).toBeNullOrUndefined(); + + // Connection still healthy + const status = await client.getStatus(); + expect(status.state).toEqual('connected'); +}); + +tap.test('5 concurrent clients', async () => { + const assignedIps = new Set(); + + // Get the first client's IP + const existingClients = await server.listClients(); + assignedIps.add(existingClients[0].assignedIp); + + for (let i = 0; i < 5; i++) { + const c = new VpnClient({ transport: { transport: 'stdio' } }); + await c.start(); + const result = await c.connect({ + serverUrl: `ws://127.0.0.1:${serverPort}`, + serverPublicKey: keypair.publicKey, + keepaliveIntervalSecs: 3, + }); + expect(result.assignedIp).toStartWith('10.8.0.'); + assignedIps.add(result.assignedIp); + extraClients.push(c); + } + + // All IPs should be unique (6 total: original + 5 new) + expect(assignedIps.size).toEqual(6); + + // Server should see 6 clients + await waitFor(async () => { + const clients = await server.listClients(); + return clients.length === 6; + }); + const allClients = await server.listClients(); + expect(allClients.length).toEqual(6); +}); + +tap.test('client disconnect tracking', async () => { + // Disconnect 3 of the 5 extra clients + for (let i = 0; i < 3; i++) { + const c = extraClients[i]; + await c.disconnect(); + c.stop(); + } + + // Wait for server to detect disconnections + await waitFor(async () => { + const clients = await server.listClients(); + return clients.length === 3; + }, 15000); + + const clients = await server.listClients(); + expect(clients.length).toEqual(3); + + const stats = await server.getStatistics(); + expect(stats.totalConnections).toBeGreaterThanOrEqual(6); +}); + +tap.test('server-side client disconnection', async () => { + const clients = await server.listClients(); + // Pick one of the remaining extra clients (not the original) + const targetClient = clients.find((c) => { + // Find a client that belongs to extraClients[3] or extraClients[4] + return c.clientId !== clients[0].clientId; + }); + expect(targetClient).toBeTruthy(); + + await server.disconnectClient(targetClient!.clientId); + + // Wait for server to update + await waitFor(async () => { + const remaining = await server.listClients(); + return remaining.length === 2; + }); + + const remaining = await server.listClients(); + expect(remaining.length).toEqual(2); +}); + +tap.test('teardown: stop all', async () => { + // Stop the original client + await client.disconnect(); + client.stop(); + + // Stop remaining extra clients + for (const c of extraClients) { + if (c.running) { + try { + await c.disconnect(); + } catch { + // May already be disconnected + } + c.stop(); + } + } + + await delay(500); + + // Stop the server + await server.stopServer(); + server.stop(); + await delay(500); + + expect(server.running).toBeFalse(); +}); + +export default tap.start(); diff --git a/test/test.loadtest.node.ts b/test/test.loadtest.node.ts new file mode 100644 index 0000000..c6d20c8 --- /dev/null +++ b/test/test.loadtest.node.ts @@ -0,0 +1,357 @@ +import { tap, expect } from '@git.zone/tstest/tapbundle'; +import * as net from 'net'; +import * as stream from 'stream'; +import { VpnClient, VpnServer } from '../ts/index.js'; +import type { IVpnKeypair, IVpnServerConfig } from '../ts/index.js'; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +async function findFreePort(): Promise { + const server = net.createServer(); + await new Promise((resolve) => server.listen(0, '127.0.0.1', resolve)); + const port = (server.address() as net.AddressInfo).port; + await new Promise((resolve) => server.close(() => resolve())); + return port; +} + +function delay(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +async function waitFor( + fn: () => Promise, + timeoutMs: number = 10000, + pollMs: number = 500, +): Promise { + const deadline = Date.now() + timeoutMs; + while (Date.now() < deadline) { + if (await fn()) return; + await delay(pollMs); + } + throw new Error(`waitFor timed out after ${timeoutMs}ms`); +} + +// --------------------------------------------------------------------------- +// ThrottleProxy (adapted from remoteingress) +// --------------------------------------------------------------------------- + +class ThrottleTransform extends stream.Transform { + private bytesPerSec: number; + private bucket: number; + private lastRefill: number; + private destroyed_: boolean = false; + + constructor(bytesPerSecond: number) { + super(); + this.bytesPerSec = bytesPerSecond; + this.bucket = bytesPerSecond; + this.lastRefill = Date.now(); + } + + _transform(chunk: Buffer, _encoding: BufferEncoding, callback: stream.TransformCallback) { + if (this.destroyed_) return; + + const now = Date.now(); + const elapsed = (now - this.lastRefill) / 1000; + this.bucket = Math.min(this.bytesPerSec, this.bucket + elapsed * this.bytesPerSec); + this.lastRefill = now; + + if (chunk.length <= this.bucket) { + this.bucket -= chunk.length; + callback(null, chunk); + } else { + const deficit = chunk.length - this.bucket; + this.bucket = 0; + const delayMs = Math.min((deficit / this.bytesPerSec) * 1000, 1000); + setTimeout(() => { + if (this.destroyed_) { callback(); return; } + this.lastRefill = Date.now(); + this.bucket = 0; + callback(null, chunk); + }, delayMs); + } + } + + _destroy(err: Error | null, callback: (error: Error | null) => void) { + this.destroyed_ = true; + callback(err); + } +} + +interface ThrottleProxy { + server: net.Server; + close: () => Promise; +} + +async function startThrottleProxy( + listenPort: number, + targetHost: string, + targetPort: number, + bytesPerSecond: number, +): Promise { + const connections = new Set(); + const server = net.createServer((clientSock) => { + connections.add(clientSock); + const upstream = net.createConnection({ host: targetHost, port: targetPort }); + connections.add(upstream); + + const throttleUp = new ThrottleTransform(bytesPerSecond); + const throttleDown = new ThrottleTransform(bytesPerSecond); + + clientSock.pipe(throttleUp).pipe(upstream); + upstream.pipe(throttleDown).pipe(clientSock); + + let cleaned = false; + const cleanup = () => { + if (cleaned) return; + cleaned = true; + throttleUp.destroy(); + throttleDown.destroy(); + clientSock.destroy(); + upstream.destroy(); + connections.delete(clientSock); + connections.delete(upstream); + }; + clientSock.on('error', () => cleanup()); + upstream.on('error', () => cleanup()); + throttleUp.on('error', () => cleanup()); + throttleDown.on('error', () => cleanup()); + clientSock.on('close', () => cleanup()); + upstream.on('close', () => cleanup()); + }); + + await new Promise((resolve) => server.listen(listenPort, '127.0.0.1', resolve)); + return { + server, + close: async () => { + for (const c of connections) c.destroy(); + connections.clear(); + await new Promise((resolve) => server.close(() => resolve())); + }, + }; +} + +// --------------------------------------------------------------------------- +// Test state +// --------------------------------------------------------------------------- + +let server: VpnServer; +let serverPort: number; +let proxyPort: number; +let keypair: IVpnKeypair; +let throttle: ThrottleProxy; +const allClients: VpnClient[] = []; + +async function createConnectedClient(port: number): Promise { + const c = new VpnClient({ transport: { transport: 'stdio' } }); + await c.start(); + await c.connect({ + serverUrl: `ws://127.0.0.1:${port}`, + serverPublicKey: keypair.publicKey, + keepaliveIntervalSecs: 3, + }); + allClients.push(c); + return c; +} + +async function stopClient(c: VpnClient): Promise { + if (c.running) { + try { await c.disconnect(); } catch { /* already disconnected */ } + c.stop(); + } +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +tap.test('setup: start throttled VPN tunnel (1 MB/s)', async () => { + serverPort = await findFreePort(); + proxyPort = await findFreePort(); + + // Start VPN server + server = new VpnServer({ transport: { transport: 'stdio' } }); + const started = await server['bridge'].start(); + expect(started).toBeTrue(); + + keypair = await server.generateKeypair(); + const serverConfig: IVpnServerConfig = { + listenAddr: `127.0.0.1:${serverPort}`, + privateKey: keypair.privateKey, + publicKey: keypair.publicKey, + subnet: '10.8.0.0/24', + }; + await server['bridge'].sendCommand('start', { config: serverConfig }); + + const status = await server.getStatus(); + expect(status.state).toEqual('connected'); + + // Start throttle proxy: 1 MB/s + throttle = await startThrottleProxy(proxyPort, '127.0.0.1', serverPort, 1024 * 1024); +}); + +tap.test('throttled connection: handshake succeeds through throttle', async () => { + const client = await createConnectedClient(proxyPort); + + const status = await client.getStatus(); + expect(status.state).toEqual('connected'); + expect(status.assignedIp).toStartWith('10.8.0.'); + + await waitFor(async () => { + const clients = await server.listClients(); + return clients.length === 1; + }); +}); + +tap.test('sustained keepalive under throttle', async () => { + // Wait for at least 2 keepalive cycles (3s interval) + await delay(8000); + + const client = allClients[0]; + const stats = await client.getStatistics(); + expect(stats.keepalivesSent).toBeGreaterThanOrEqual(1); + expect(stats.keepalivesReceived).toBeGreaterThanOrEqual(1); + + // Throttle adds latency — RTT should be measurable + const quality = await client.getConnectionQuality(); + expect(quality.srttMs).toBeGreaterThanOrEqual(0); + expect(quality.jitterMs).toBeTypeofNumber(); +}); + +tap.test('3 concurrent throttled clients', async () => { + for (let i = 0; i < 3; i++) { + await createConnectedClient(proxyPort); + } + + // All 4 clients should be visible + await waitFor(async () => { + const clients = await server.listClients(); + return clients.length === 4; + }); + + const clients = await server.listClients(); + expect(clients.length).toEqual(4); + + // Verify all IPs are unique + const ips = new Set(clients.map((c) => c.assignedIp)); + expect(ips.size).toEqual(4); +}); + +tap.test('rate limiting combined with network throttle', async () => { + const clients = await server.listClients(); + const targetId = clients[0].clientId; + + // Set rate limit on first client + await server.setClientRateLimit(targetId, 500, 500); + const telemetry = await server.getClientTelemetry(targetId); + expect(telemetry.rateLimitBytesPerSec).toEqual(500); + expect(telemetry.burstBytes).toEqual(500); + + // Verify another client has no rate limit + const otherTelemetry = await server.getClientTelemetry(clients[1].clientId); + expect(otherTelemetry.rateLimitBytesPerSec).toBeNullOrUndefined(); + + // Clean up the rate limit + await server.removeClientRateLimit(targetId); +}); + +tap.test('burst waves: 3 waves of 3 clients', async () => { + const initialCount = (await server.listClients()).length; + + for (let wave = 0; wave < 3; wave++) { + const waveClients: VpnClient[] = []; + + // Connect 3 clients + for (let i = 0; i < 3; i++) { + const c = await createConnectedClient(proxyPort); + waveClients.push(c); + } + + // Verify all connected + await waitFor(async () => { + const all = await server.listClients(); + return all.length === initialCount + 3; + }); + + // Disconnect all wave clients + for (const c of waveClients) { + await stopClient(c); + } + + // Wait for server to detect disconnections + await waitFor(async () => { + const all = await server.listClients(); + return all.length === initialCount; + }, 15000); + + await delay(500); + } + + // Verify total connections accumulated + const stats = await server.getStatistics(); + expect(stats.totalConnections).toBeGreaterThanOrEqual(9 + initialCount); + + // Original clients still connected + const remaining = await server.listClients(); + expect(remaining.length).toEqual(initialCount); +}); + +tap.test('aggressive throttle: 10 KB/s', async () => { + // Close current throttle proxy and start an aggressive one + await throttle.close(); + const aggressivePort = await findFreePort(); + throttle = await startThrottleProxy(aggressivePort, '127.0.0.1', serverPort, 10 * 1024); + + // Connect a client through the aggressive throttle + const client = await createConnectedClient(aggressivePort); + const status = await client.getStatus(); + expect(status.state).toEqual('connected'); + + // Wait for keepalive exchange (might take longer due to throttle) + await delay(10000); + + const stats = await client.getStatistics(); + expect(stats.keepalivesSent).toBeGreaterThanOrEqual(1); + expect(stats.keepalivesReceived).toBeGreaterThanOrEqual(1); +}); + +tap.test('post-load health: direct connection still works', async () => { + // Server should still be healthy after all load tests + const serverStatus = await server.getStatus(); + expect(serverStatus.state).toEqual('connected'); + + // Connect one more client directly (no throttle) + const directClient = await createConnectedClient(serverPort); + const status = await directClient.getStatus(); + expect(status.state).toEqual('connected'); + + await delay(5000); + + const stats = await directClient.getStatistics(); + expect(stats.keepalivesSent).toBeGreaterThanOrEqual(1); +}); + +tap.test('teardown: stop all', async () => { + // Stop all clients + for (const c of allClients) { + await stopClient(c); + } + + await delay(500); + + // Close throttle proxy + if (throttle) { + await throttle.close(); + } + + // Stop server + await server.stopServer(); + server.stop(); + await delay(500); + + expect(server.running).toBeFalse(); +}); + +export default tap.start(); diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 7aa0685..4d78117 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartvpn', - version: '1.2.0', + version: '1.3.0', description: 'A VPN solution with TypeScript control plane and Rust data plane daemon' }