Compare commits

..

6 Commits

Author SHA1 Message Date
5304bbb486 v4.15.3
Some checks failed
Default (tags) / security (push) Failing after 1s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-27 11:34:31 +00:00
ac993dd5a3 fix(core): harden UDP session handling, QUIC control message validation, and bridge process cleanup 2026-03-27 11:34:31 +00:00
0b2a83ddb6 v4.15.2
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-26 17:06:28 +00:00
3c5ea6bdc5 fix(readme): adjust tunnel diagram alignment in the README 2026-03-26 17:06:28 +00:00
3dea43400b v4.15.1
Some checks failed
Default (tags) / security (push) Failing after 1s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-26 17:05:38 +00:00
8fa3d414dd fix(readme): clarify unified runtime configuration and firewall update behavior 2026-03-26 17:05:38 +00:00
10 changed files with 156 additions and 62 deletions

View File

@@ -1,5 +1,24 @@
# Changelog # Changelog
## 2026-03-27 - 4.15.3 - fix(core)
harden UDP session handling, QUIC control message validation, and bridge process cleanup
- cap UDP session creation and drop excess datagrams with warnings to prevent unbounded session growth
- periodically prune closed datagram sessions on the hub and reject oversized QUIC control messages to avoid resource exhaustion
- clean up spawned edge and hub bridge processes on startup failure, remove listeners on stop, and avoid restarting after shutdown during backoff
## 2026-03-26 - 4.15.2 - fix(readme)
adjust tunnel diagram alignment in the README
- Improves formatting consistency in the Hub/Edge topology diagram.
## 2026-03-26 - 4.15.1 - fix(readme)
clarify unified runtime configuration and firewall update behavior
- Updates the architecture and feature descriptions to reflect that ports, firewall rules, and rate limits are pushed together in a single config update
- Clarifies that firewall configuration is delivered via FRAME_CONFIG on handshake and subsequent updates, with atomic full-rule replacement at the edge
- Simplifies and reorganizes README wording around edge and hub responsibilities without changing implementation behavior
## 2026-03-26 - 4.15.0 - feat(edge,hub) ## 2026-03-26 - 4.15.0 - feat(edge,hub)
add hub-controlled nftables firewall configuration for remote ingress edges add hub-controlled nftables firewall configuration for remote ingress edges

View File

@@ -1,6 +1,6 @@
{ {
"name": "@serve.zone/remoteingress", "name": "@serve.zone/remoteingress",
"version": "4.15.0", "version": "4.15.3",
"private": false, "private": false,
"description": "Edge ingress tunnel for DcRouter - tunnels TCP and UDP traffic from the network edge to SmartProxy over TLS or QUIC, preserving client IP via PROXY protocol.", "description": "Edge ingress tunnel for DcRouter - tunnels TCP and UDP traffic from the network edge to SmartProxy over TLS or QUIC, preserving client IP via PROXY protocol.",
"main": "dist_ts/index.js", "main": "dist_ts/index.js",

View File

@@ -17,28 +17,25 @@ pnpm install @serve.zone/remoteingress
`@serve.zone/remoteingress` uses a **Hub/Edge** topology with a high-performance Rust core and a TypeScript API surface: `@serve.zone/remoteingress` uses a **Hub/Edge** topology with a high-performance Rust core and a TypeScript API surface:
``` ```
TLS or QUIC Tunnel TLS or QUIC Tunnel
┌─────────────────────┐ ◄══════════════════════════► ┌─────────────────────┐ ┌─────────────────────┐ ◄══════════════════════════► ┌─────────────────────┐
│ Network Edge │ TCP+TLS: frame mux │ Private Cluster │ │ Network Edge │ TCP+TLS: frame mux │ Private Cluster │
│ │ QUIC: native streams │ │ │ │ QUIC: native streams │ │
│ RemoteIngressEdge │ UDP: QUIC datagrams │ RemoteIngressHub │ │ RemoteIngressEdge │ UDP: QUIC datagrams │ RemoteIngressHub │
│ │ │ │ │ │ │ │
Accepts TCP & UDP │ Forwards to TCP/UDP listeners│ ◄─── FRAME_CONFIG pushes ─── • Port assignments
on hub-assigned │ │ SmartProxy on • nftables firewall│ ports + firewall rules │ • Firewall config
ports │ local ports • Rate limiting at any time │ • Rate limit rules
│ │ └─────────────────────┘ └─────────────────────┘
│ 🔥 nftables rules ◄── firewall config pushed ── Configures edge
│ applied locally │ via FRAME_CONFIG firewalls remotely │ │ TCP + UDP from end users
└─────────────────────┘ └─────────────────────┘ Internet DcRouter / SmartProxy
▲ │
│ TCP + UDP from end users ▼
Internet DcRouter / SmartProxy
``` ```
| Component | Role | | Component | Role |
|-----------|------| |-----------|------|
| **RemoteIngressEdge** | Deployed at the network edge (VPS, cloud instance). Runs as root. Listens on TCP and UDP ports assigned by the hub, accepts connections/datagrams, and tunnels them to the hub. Applies nftables firewall rules pushed by the hub for IP blocking and rate limiting. Ports and firewall config are hot-reloadable at runtime. | | **RemoteIngressEdge** | Deployed at the network edge (VPS, cloud instance). Runs as root. Listens on hub-assigned TCP/UDP ports, tunnels traffic to the hub, and applies hub-pushed nftables rules (IP blocking, rate limiting). All config is hot-reloadable at runtime. |
| **RemoteIngressHub** | Deployed alongside DcRouter/SmartProxy in a private cluster. Accepts edge connections, demuxes streams/datagrams, and forwards each to SmartProxy with PROXY protocol headers so the real client IP is preserved. Pushes firewall configuration to edges. | | **RemoteIngressHub** | Deployed alongside DcRouter/SmartProxy in a private cluster. Accepts edge connections, demuxes streams/datagrams, and forwards each to SmartProxy with PROXY protocol headers so the real client IP is preserved. Pushes all edge config (ports, firewall) via a single API. |
| **Rust Binary** (`remoteingress-bin`) | The performance-critical networking core. Managed via `@push.rocks/smartrust` RustBridge IPC — you never interact with it directly. Cross-compiled for `linux/amd64` and `linux/arm64`. | | **Rust Binary** (`remoteingress-bin`) | The performance-critical networking core. Managed via `@push.rocks/smartrust` RustBridge IPC — you never interact with it directly. Cross-compiled for `linux/amd64` and `linux/arm64`. |
### ⚡ Key Features ### ⚡ Key Features
@@ -46,14 +43,14 @@ pnpm install @serve.zone/remoteingress
- **Dual transport** — choose between TCP+TLS (frame-multiplexed) or QUIC (native stream multiplexing, zero head-of-line blocking) - **Dual transport** — choose between TCP+TLS (frame-multiplexed) or QUIC (native stream multiplexing, zero head-of-line blocking)
- **TCP + UDP tunneling** — tunnel any TCP connection or UDP datagram through the same edge/hub pair - **TCP + UDP tunneling** — tunnel any TCP connection or UDP datagram through the same edge/hub pair
- **PROXY protocol v1 & v2** — SmartProxy sees the real client IP for both TCP (v1 text) and UDP (v2 binary) - **PROXY protocol v1 & v2** — SmartProxy sees the real client IP for both TCP (v1 text) and UDP (v2 binary)
- **Hub-controlled firewall** — push nftables rules (IP blocking, rate limiting, custom rules) from the hub to edges via `@push.rocks/smartnftables` - **Hub-controlled firewall** — push nftables rules (IP blocking, rate limiting, custom firewall rules) to edges as part of the same config update that assigns ports — powered by `@push.rocks/smartnftables`
- **Multiplexed streams** — thousands of concurrent TCP connections over a single tunnel - **Multiplexed streams** — thousands of concurrent TCP connections over a single tunnel
- **QUIC datagrams** — UDP traffic forwarded via QUIC unreliable datagrams for lowest possible latency - **QUIC datagrams** — UDP traffic forwarded via QUIC unreliable datagrams for lowest possible latency
- **Shared-secret authentication** — edges must present valid credentials to connect - **Shared-secret authentication** — edges must present valid credentials to connect
- **Connection tokens** — encode all connection details into a single opaque base64url string - **Connection tokens** — encode all connection details into a single opaque base64url string
- **STUN-based public IP discovery** — edges automatically discover their public IP via Cloudflare STUN - **STUN-based public IP discovery** — edges automatically discover their public IP via Cloudflare STUN
- **Auto-reconnect** with exponential backoff if the tunnel drops - **Auto-reconnect** with exponential backoff if the tunnel drops
- **Dynamic port configuration** — the hub assigns TCP and UDP listen ports per edge, hot-reloadable at runtime - **Dynamic runtime configuration** — the hub pushes ports, firewall rules, and rate limits to edges at any time via a single `updateAllowedEdges()` call
- **Event-driven** — both Hub and Edge extend `EventEmitter` for real-time monitoring - **Event-driven** — both Hub and Edge extend `EventEmitter` for real-time monitoring
- **3-tier QoS** — control frames, normal data, and sustained (elephant flow) traffic each get their own priority queue - **3-tier QoS** — control frames, normal data, and sustained (elephant flow) traffic each get their own priority queue
- **Adaptive flow control** — per-stream windows scale with active stream count to prevent memory overuse - **Adaptive flow control** — per-stream windows scale with active stream count to prevent memory overuse
@@ -205,18 +202,13 @@ const data = decodeConnectionToken(token);
Tokens are base64url-encoded — safe for environment variables, CLI arguments, and config files. Tokens are base64url-encoded — safe for environment variables, CLI arguments, and config files.
## 🔥 Hub-Controlled Firewall ## 🔥 Firewall Config
Edges run as root and use `@push.rocks/smartnftables` to apply nftables rules pushed from the hub. This gives you centralized control over network-level security at every edge node. The `firewallConfig` field in `updateAllowedEdges()` works exactly like `listenPorts` — it travels in the same `FRAME_CONFIG` frame, is delivered on initial handshake and on every subsequent update, and is applied atomically at the edge using `@push.rocks/smartnftables`. Each update fully replaces the previous ruleset.
### How It Works Since edges run as root, the rules are applied directly to the Linux kernel via nftables. If the edge isn't root or nftables is unavailable, it logs a warning and continues — the tunnel works fine, just without kernel-level firewall rules.
1. The hub includes `firewallConfig` when calling `updateAllowedEdges()` ### Config Structure
2. The config flows through the Rust binary as an opaque JSON blob via `FRAME_CONFIG`
3. The edge TypeScript layer receives it and applies the rules using `SmartNftables`
4. On each config update, all previous rules are replaced atomically (full replacement, not incremental)
### Firewall Config Structure
```typescript ```typescript
interface IFirewallConfig { interface IFirewallConfig {
@@ -272,10 +264,6 @@ await hub.updateAllowedEdges([
]); ]);
``` ```
### Graceful Degradation
If the edge isn't running as root or nftables is unavailable, the SmartNftables initialization logs a warning and continues operating normally — the tunnel works fine, just without kernel-level firewall rules.
## API Reference ## API Reference
### `RemoteIngressHub` ### `RemoteIngressHub`

View File

@@ -954,7 +954,10 @@ fn apply_udp_port_config(
} else { } else {
// New session — allocate stream_id and send UDP_OPEN // New session — allocate stream_id and send UDP_OPEN
let sid = next_stream_id.fetch_add(1, Ordering::Relaxed); let sid = next_stream_id.fetch_add(1, Ordering::Relaxed);
sessions.insert(key, sid); if sessions.insert(key, sid).is_none() {
log::warn!("UDP session limit reached, dropping datagram from {}", client_addr);
continue;
}
let client_ip = client_addr.ip().to_string(); let client_ip = client_addr.ip().to_string();
let client_port = client_addr.port(); let client_port = client_addr.port();
@@ -1681,7 +1684,10 @@ fn apply_udp_port_config_quic(
} else { } else {
// New session — send PROXY v2 header via control-style datagram // New session — send PROXY v2 header via control-style datagram
let sid = next_stream_id.fetch_add(1, Ordering::Relaxed); let sid = next_stream_id.fetch_add(1, Ordering::Relaxed);
sessions.insert(key, sid); if sessions.insert(key, sid).is_none() {
log::warn!("QUIC UDP session limit reached, dropping datagram from {}", client_addr);
continue;
}
let client_ip = client_addr.ip().to_string(); let client_ip = client_addr.ip().to_string();
let client_port = client_addr.port(); let client_port = client_addr.port();

View File

@@ -1374,8 +1374,15 @@ async fn handle_edge_connection_quic(
let dgram_edge_id = edge_id.clone(); let dgram_edge_id = edge_id.clone();
let dgram_token = edge_token.clone(); let dgram_token = edge_token.clone();
let dgram_handle = tokio::spawn(async move { let dgram_handle = tokio::spawn(async move {
let mut cleanup_interval = tokio::time::interval(Duration::from_secs(30));
cleanup_interval.tick().await; // consume initial tick
loop { loop {
tokio::select! { tokio::select! {
// Periodic sweep: prune sessions whose task has exited (receiver dropped)
_ = cleanup_interval.tick() => {
let mut s = dgram_sessions.lock().await;
s.retain(|_id, tx| !tx.is_closed());
}
datagram = dgram_conn.read_datagram() => { datagram = dgram_conn.read_datagram() => {
match datagram { match datagram {
Ok(data) => { Ok(data) => {

View File

@@ -76,6 +76,11 @@ pub async fn write_ctrl_message(
Ok(()) Ok(())
} }
/// Maximum size for a QUIC control message payload (64 KB).
/// Control messages (CONFIG, PING, PONG) are small; this guards against
/// a malicious peer sending a crafted length field to trigger OOM.
const MAX_CTRL_MESSAGE_SIZE: usize = 65536;
/// Read a control message from a QUIC recv stream. /// Read a control message from a QUIC recv stream.
/// Returns (msg_type, payload). Returns None on EOF. /// Returns (msg_type, payload). Returns None on EOF.
pub async fn read_ctrl_message( pub async fn read_ctrl_message(
@@ -93,6 +98,12 @@ pub async fn read_ctrl_message(
} }
let msg_type = header[0]; let msg_type = header[0];
let len = u32::from_be_bytes([header[1], header[2], header[3], header[4]]) as usize; let len = u32::from_be_bytes([header[1], header[2], header[3], header[4]]) as usize;
if len > MAX_CTRL_MESSAGE_SIZE {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("control message too large: {} bytes (max {})", len, MAX_CTRL_MESSAGE_SIZE),
));
}
let mut payload = vec![0u8; len]; let mut payload = vec![0u8; len];
if len > 0 { if len > 0 {
recv.read_exact(&mut payload).await.map_err(|e| { recv.read_exact(&mut payload).await.map_err(|e| {

View File

@@ -17,7 +17,7 @@ pub struct UdpSession {
pub last_activity: Instant, pub last_activity: Instant,
} }
/// Manages UDP sessions with idle timeout expiry. /// Manages UDP sessions with idle timeout expiry and a maximum session count.
pub struct UdpSessionManager { pub struct UdpSessionManager {
/// Forward map: session key → session data. /// Forward map: session key → session data.
sessions: HashMap<UdpSessionKey, UdpSession>, sessions: HashMap<UdpSessionKey, UdpSession>,
@@ -25,14 +25,21 @@ pub struct UdpSessionManager {
by_stream_id: HashMap<u32, UdpSessionKey>, by_stream_id: HashMap<u32, UdpSessionKey>,
/// Idle timeout duration. /// Idle timeout duration.
idle_timeout: std::time::Duration, idle_timeout: std::time::Duration,
/// Maximum number of concurrent sessions (prevents unbounded growth from floods).
max_sessions: usize,
} }
impl UdpSessionManager { impl UdpSessionManager {
pub fn new(idle_timeout: std::time::Duration) -> Self { pub fn new(idle_timeout: std::time::Duration) -> Self {
Self::with_max_sessions(idle_timeout, 65536)
}
pub fn with_max_sessions(idle_timeout: std::time::Duration, max_sessions: usize) -> Self {
Self { Self {
sessions: HashMap::new(), sessions: HashMap::new(),
by_stream_id: HashMap::new(), by_stream_id: HashMap::new(),
idle_timeout, idle_timeout,
max_sessions,
} }
} }
@@ -57,8 +64,12 @@ impl UdpSessionManager {
Some(session) Some(session)
} }
/// Insert a new session. Returns a mutable reference to it. /// Insert a new session. Returns `None` if the session limit has been reached.
pub fn insert(&mut self, key: UdpSessionKey, stream_id: u32) -> &mut UdpSession { pub fn insert(&mut self, key: UdpSessionKey, stream_id: u32) -> Option<&mut UdpSession> {
// Allow re-insertion of existing keys (update), but reject truly new sessions at capacity
if !self.sessions.contains_key(&key) && self.sessions.len() >= self.max_sessions {
return None;
}
let session = UdpSession { let session = UdpSession {
stream_id, stream_id,
client_addr: key.client_addr, client_addr: key.client_addr,
@@ -66,7 +77,7 @@ impl UdpSessionManager {
last_activity: Instant::now(), last_activity: Instant::now(),
}; };
self.by_stream_id.insert(stream_id, key); self.by_stream_id.insert(stream_id, key);
self.sessions.entry(key).or_insert(session) Some(self.sessions.entry(key).or_insert(session))
} }
/// Remove a session by stream_id. /// Remove a session by stream_id.
@@ -118,7 +129,7 @@ mod tests {
fn test_insert_and_lookup() { fn test_insert_and_lookup() {
let mut mgr = UdpSessionManager::new(Duration::from_secs(60)); let mut mgr = UdpSessionManager::new(Duration::from_secs(60));
let key = UdpSessionKey { client_addr: addr(5000), dest_port: 53 }; let key = UdpSessionKey { client_addr: addr(5000), dest_port: 53 };
mgr.insert(key, 1); assert!(mgr.insert(key, 1).is_some());
assert_eq!(mgr.len(), 1); assert_eq!(mgr.len(), 1);
assert!(mgr.get_mut(&key).is_some()); assert!(mgr.get_mut(&key).is_some());
@@ -129,7 +140,7 @@ mod tests {
fn test_client_addr_for_stream() { fn test_client_addr_for_stream() {
let mut mgr = UdpSessionManager::new(Duration::from_secs(60)); let mut mgr = UdpSessionManager::new(Duration::from_secs(60));
let key = UdpSessionKey { client_addr: addr(5000), dest_port: 53 }; let key = UdpSessionKey { client_addr: addr(5000), dest_port: 53 };
mgr.insert(key, 42); assert!(mgr.insert(key, 42).is_some());
assert_eq!(mgr.client_addr_for_stream(42), Some(addr(5000))); assert_eq!(mgr.client_addr_for_stream(42), Some(addr(5000)));
assert_eq!(mgr.client_addr_for_stream(99), None); assert_eq!(mgr.client_addr_for_stream(99), None);
@@ -139,7 +150,7 @@ mod tests {
fn test_remove_by_stream_id() { fn test_remove_by_stream_id() {
let mut mgr = UdpSessionManager::new(Duration::from_secs(60)); let mut mgr = UdpSessionManager::new(Duration::from_secs(60));
let key = UdpSessionKey { client_addr: addr(5000), dest_port: 53 }; let key = UdpSessionKey { client_addr: addr(5000), dest_port: 53 };
mgr.insert(key, 1); assert!(mgr.insert(key, 1).is_some());
let removed = mgr.remove_by_stream_id(1); let removed = mgr.remove_by_stream_id(1);
assert!(removed.is_some()); assert!(removed.is_some());
@@ -159,8 +170,8 @@ mod tests {
let mut mgr = UdpSessionManager::new(Duration::from_millis(50)); let mut mgr = UdpSessionManager::new(Duration::from_millis(50));
let key1 = UdpSessionKey { client_addr: addr(5000), dest_port: 53 }; let key1 = UdpSessionKey { client_addr: addr(5000), dest_port: 53 };
let key2 = UdpSessionKey { client_addr: addr(5001), dest_port: 53 }; let key2 = UdpSessionKey { client_addr: addr(5001), dest_port: 53 };
mgr.insert(key1, 1); assert!(mgr.insert(key1, 1).is_some());
mgr.insert(key2, 2); assert!(mgr.insert(key2, 2).is_some());
// Nothing expired yet // Nothing expired yet
assert!(mgr.expire_idle().is_empty()); assert!(mgr.expire_idle().is_empty());
@@ -178,7 +189,7 @@ mod tests {
async fn test_activity_prevents_expiry() { async fn test_activity_prevents_expiry() {
let mut mgr = UdpSessionManager::new(Duration::from_millis(100)); let mut mgr = UdpSessionManager::new(Duration::from_millis(100));
let key = UdpSessionKey { client_addr: addr(5000), dest_port: 53 }; let key = UdpSessionKey { client_addr: addr(5000), dest_port: 53 };
mgr.insert(key, 1); assert!(mgr.insert(key, 1).is_some());
// Touch session at 50ms (before 100ms timeout) // Touch session at 50ms (before 100ms timeout)
tokio::time::sleep(Duration::from_millis(50)).await; tokio::time::sleep(Duration::from_millis(50)).await;
@@ -200,11 +211,35 @@ mod tests {
let mut mgr = UdpSessionManager::new(Duration::from_secs(60)); let mut mgr = UdpSessionManager::new(Duration::from_secs(60));
let key1 = UdpSessionKey { client_addr: addr(5000), dest_port: 53 }; let key1 = UdpSessionKey { client_addr: addr(5000), dest_port: 53 };
let key2 = UdpSessionKey { client_addr: addr(5000), dest_port: 443 }; let key2 = UdpSessionKey { client_addr: addr(5000), dest_port: 443 };
mgr.insert(key1, 1); assert!(mgr.insert(key1, 1).is_some());
mgr.insert(key2, 2); assert!(mgr.insert(key2, 2).is_some());
assert_eq!(mgr.len(), 2); assert_eq!(mgr.len(), 2);
assert_eq!(mgr.get_mut(&key1).unwrap().stream_id, 1); assert_eq!(mgr.get_mut(&key1).unwrap().stream_id, 1);
assert_eq!(mgr.get_mut(&key2).unwrap().stream_id, 2); assert_eq!(mgr.get_mut(&key2).unwrap().stream_id, 2);
} }
#[test]
fn test_max_sessions_limit() {
let mut mgr = UdpSessionManager::with_max_sessions(Duration::from_secs(60), 2);
let key1 = UdpSessionKey { client_addr: addr(5000), dest_port: 53 };
let key2 = UdpSessionKey { client_addr: addr(5001), dest_port: 53 };
let key3 = UdpSessionKey { client_addr: addr(5002), dest_port: 53 };
assert!(mgr.insert(key1, 1).is_some());
assert!(mgr.insert(key2, 2).is_some());
// Third insert should be rejected (at capacity)
assert!(mgr.insert(key3, 3).is_none());
assert_eq!(mgr.len(), 2);
// Re-inserting an existing key should succeed (update, not new)
assert!(mgr.insert(key1, 1).is_some());
assert_eq!(mgr.len(), 2);
// After removing one, a new insert should succeed
mgr.remove_by_stream_id(1);
assert_eq!(mgr.len(), 1);
assert!(mgr.insert(key3, 3).is_some());
assert_eq!(mgr.len(), 2);
}
} }

View File

@@ -3,6 +3,6 @@
*/ */
export const commitinfo = { export const commitinfo = {
name: '@serve.zone/remoteingress', name: '@serve.zone/remoteingress',
version: '4.15.0', version: '4.15.3',
description: 'Edge ingress tunnel for DcRouter - tunnels TCP and UDP traffic from the network edge to SmartProxy over TLS or QUIC, preserving client IP via PROXY protocol.' description: 'Edge ingress tunnel for DcRouter - tunnels TCP and UDP traffic from the network edge to SmartProxy over TLS or QUIC, preserving client IP via PROXY protocol.'
} }

View File

@@ -222,14 +222,21 @@ export class RemoteIngressEdge extends EventEmitter {
this.bridge.removeListener('exit', this.handleCrashRecovery); this.bridge.removeListener('exit', this.handleCrashRecovery);
this.bridge.on('exit', this.handleCrashRecovery); this.bridge.on('exit', this.handleCrashRecovery);
await this.bridge.sendCommand('startEdge', { try {
hubHost: edgeConfig.hubHost, await this.bridge.sendCommand('startEdge', {
hubPort: edgeConfig.hubPort ?? 8443, hubHost: edgeConfig.hubHost,
edgeId: edgeConfig.edgeId, hubPort: edgeConfig.hubPort ?? 8443,
secret: edgeConfig.secret, edgeId: edgeConfig.edgeId,
...(edgeConfig.bindAddress ? { bindAddress: edgeConfig.bindAddress } : {}), secret: edgeConfig.secret,
...(edgeConfig.transportMode ? { transportMode: edgeConfig.transportMode } : {}), ...(edgeConfig.bindAddress ? { bindAddress: edgeConfig.bindAddress } : {}),
}); ...(edgeConfig.transportMode ? { transportMode: edgeConfig.transportMode } : {}),
});
} catch (err) {
// Clean up the spawned process to avoid orphaning it
this.bridge.removeListener('exit', this.handleCrashRecovery);
this.bridge.kill();
throw err;
}
this.started = true; this.started = true;
this.restartAttempts = 0; this.restartAttempts = 0;
@@ -282,6 +289,9 @@ export class RemoteIngressEdge extends EventEmitter {
this.started = false; this.started = false;
} }
this.savedConfig = null; this.savedConfig = null;
// Remove all listeners to prevent memory buildup
this.bridge.removeAllListeners();
this.removeAllListeners();
} }
/** /**
@@ -326,6 +336,10 @@ export class RemoteIngressEdge extends EventEmitter {
} }
await new Promise(resolve => setTimeout(resolve, this.restartBackoffMs)); await new Promise(resolve => setTimeout(resolve, this.restartBackoffMs));
// Re-check after backoff — stop() may have been called during the wait
if (this.stopping || !this.savedConfig) {
return;
}
this.restartBackoffMs = Math.min(this.restartBackoffMs * 2, MAX_RESTART_BACKOFF_MS); this.restartBackoffMs = Math.min(this.restartBackoffMs * 2, MAX_RESTART_BACKOFF_MS);
this.restartAttempts++; this.restartAttempts++;

View File

@@ -156,13 +156,20 @@ export class RemoteIngressHub extends EventEmitter {
this.bridge.removeListener('exit', this.handleCrashRecovery); this.bridge.removeListener('exit', this.handleCrashRecovery);
this.bridge.on('exit', this.handleCrashRecovery); this.bridge.on('exit', this.handleCrashRecovery);
await this.bridge.sendCommand('startHub', { try {
tunnelPort: config.tunnelPort ?? 8443, await this.bridge.sendCommand('startHub', {
targetHost: config.targetHost ?? '127.0.0.1', tunnelPort: config.tunnelPort ?? 8443,
...(config.tls?.certPem && config.tls?.keyPem targetHost: config.targetHost ?? '127.0.0.1',
? { tlsCertPem: config.tls.certPem, tlsKeyPem: config.tls.keyPem } ...(config.tls?.certPem && config.tls?.keyPem
: {}), ? { tlsCertPem: config.tls.certPem, tlsKeyPem: config.tls.keyPem }
}); : {}),
});
} catch (err) {
// Clean up the spawned process to avoid orphaning it
this.bridge.removeListener('exit', this.handleCrashRecovery);
this.bridge.kill();
throw err;
}
this.started = true; this.started = true;
this.restartAttempts = 0; this.restartAttempts = 0;
@@ -186,6 +193,9 @@ export class RemoteIngressHub extends EventEmitter {
} }
this.savedConfig = null; this.savedConfig = null;
this.savedEdges = []; this.savedEdges = [];
// Remove all listeners to prevent memory buildup
this.bridge.removeAllListeners();
this.removeAllListeners();
} }
/** /**
@@ -232,6 +242,10 @@ export class RemoteIngressHub extends EventEmitter {
} }
await new Promise(resolve => setTimeout(resolve, this.restartBackoffMs)); await new Promise(resolve => setTimeout(resolve, this.restartBackoffMs));
// Re-check after backoff — stop() may have been called during the wait
if (this.stopping || !this.savedConfig) {
return;
}
this.restartBackoffMs = Math.min(this.restartBackoffMs * 2, MAX_RESTART_BACKOFF_MS); this.restartBackoffMs = Math.min(this.restartBackoffMs * 2, MAX_RESTART_BACKOFF_MS);
this.restartAttempts++; this.restartAttempts++;