feat(edge,hub): add hub-controlled nftables firewall configuration for remote ingress edges
This commit is contained in:
@@ -316,6 +316,12 @@ async fn handle_request(
|
||||
serde_json::json!({ "listenPorts": listen_ports }),
|
||||
);
|
||||
}
|
||||
EdgeEvent::FirewallConfigUpdated { firewall_config } => {
|
||||
send_event(
|
||||
"firewallConfigUpdated",
|
||||
serde_json::json!({ "firewallConfig": firewall_config }),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
@@ -67,6 +67,8 @@ struct HandshakeConfig {
|
||||
listen_ports_udp: Vec<u16>,
|
||||
#[serde(default = "default_stun_interval")]
|
||||
stun_interval_secs: u64,
|
||||
#[serde(default)]
|
||||
firewall_config: Option<serde_json::Value>,
|
||||
}
|
||||
|
||||
fn default_stun_interval() -> u64 {
|
||||
@@ -80,6 +82,8 @@ struct ConfigUpdate {
|
||||
listen_ports: Vec<u16>,
|
||||
#[serde(default)]
|
||||
listen_ports_udp: Vec<u16>,
|
||||
#[serde(default)]
|
||||
firewall_config: Option<serde_json::Value>,
|
||||
}
|
||||
|
||||
/// Events emitted by the edge.
|
||||
@@ -96,6 +100,8 @@ pub enum EdgeEvent {
|
||||
PortsAssigned { listen_ports: Vec<u16> },
|
||||
#[serde(rename_all = "camelCase")]
|
||||
PortsUpdated { listen_ports: Vec<u16> },
|
||||
#[serde(rename_all = "camelCase")]
|
||||
FirewallConfigUpdated { firewall_config: serde_json::Value },
|
||||
}
|
||||
|
||||
/// Edge status response.
|
||||
@@ -439,6 +445,11 @@ async fn handle_edge_frame(
|
||||
connection_token,
|
||||
bind_address,
|
||||
);
|
||||
if let Some(fw_config) = update.firewall_config {
|
||||
let _ = event_tx.try_send(EdgeEvent::FirewallConfigUpdated {
|
||||
firewall_config: fw_config,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
FRAME_PING => {
|
||||
@@ -569,6 +580,13 @@ async fn connect_to_hub_and_run(
|
||||
listen_ports: handshake.listen_ports.clone(),
|
||||
});
|
||||
|
||||
// Emit firewall config if present in handshake
|
||||
if let Some(fw_config) = handshake.firewall_config {
|
||||
let _ = event_tx.try_send(EdgeEvent::FirewallConfigUpdated {
|
||||
firewall_config: fw_config,
|
||||
});
|
||||
}
|
||||
|
||||
// Start STUN discovery
|
||||
let stun_interval = handshake.stun_interval_secs;
|
||||
let public_ip_clone = public_ip.clone();
|
||||
@@ -1309,6 +1327,13 @@ async fn connect_to_hub_and_run_quic_with_connection(
|
||||
listen_ports: handshake.listen_ports.clone(),
|
||||
});
|
||||
|
||||
// Emit firewall config if present in handshake
|
||||
if let Some(fw_config) = handshake.firewall_config {
|
||||
let _ = event_tx.try_send(EdgeEvent::FirewallConfigUpdated {
|
||||
firewall_config: fw_config,
|
||||
});
|
||||
}
|
||||
|
||||
// Start STUN discovery
|
||||
let stun_interval = handshake.stun_interval_secs;
|
||||
let public_ip_clone = public_ip.clone();
|
||||
|
||||
@@ -80,6 +80,8 @@ pub struct AllowedEdge {
|
||||
#[serde(default)]
|
||||
pub listen_ports_udp: Vec<u16>,
|
||||
pub stun_interval_secs: Option<u64>,
|
||||
#[serde(default)]
|
||||
pub firewall_config: Option<serde_json::Value>,
|
||||
}
|
||||
|
||||
/// Handshake response sent to edge after authentication.
|
||||
@@ -90,6 +92,8 @@ struct HandshakeResponse {
|
||||
#[serde(default)]
|
||||
listen_ports_udp: Vec<u16>,
|
||||
stun_interval_secs: u64,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
firewall_config: Option<serde_json::Value>,
|
||||
}
|
||||
|
||||
/// Configuration update pushed to a connected edge at runtime.
|
||||
@@ -99,6 +103,8 @@ pub struct EdgeConfigUpdate {
|
||||
pub listen_ports: Vec<u16>,
|
||||
#[serde(default)]
|
||||
pub listen_ports_udp: Vec<u16>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub firewall_config: Option<serde_json::Value>,
|
||||
}
|
||||
|
||||
/// Runtime status of a connected edge.
|
||||
@@ -192,14 +198,17 @@ impl TunnelHub {
|
||||
for edge in &edges {
|
||||
if let Some(info) = connected.get(&edge.id) {
|
||||
// Check if ports changed compared to old config
|
||||
let ports_changed = match map.get(&edge.id) {
|
||||
Some(old) => old.listen_ports != edge.listen_ports || old.listen_ports_udp != edge.listen_ports_udp,
|
||||
let config_changed = match map.get(&edge.id) {
|
||||
Some(old) => old.listen_ports != edge.listen_ports
|
||||
|| old.listen_ports_udp != edge.listen_ports_udp
|
||||
|| old.firewall_config != edge.firewall_config,
|
||||
None => true, // newly allowed edge that's already connected
|
||||
};
|
||||
if ports_changed {
|
||||
if config_changed {
|
||||
let update = EdgeConfigUpdate {
|
||||
listen_ports: edge.listen_ports.clone(),
|
||||
listen_ports_udp: edge.listen_ports_udp.clone(),
|
||||
firewall_config: edge.firewall_config.clone(),
|
||||
};
|
||||
let _ = info.config_tx.try_send(update);
|
||||
}
|
||||
@@ -861,14 +870,14 @@ async fn handle_edge_connection(
|
||||
let secret = parts[2];
|
||||
|
||||
// Verify credentials and extract edge config
|
||||
let (listen_ports, listen_ports_udp, stun_interval_secs) = {
|
||||
let (listen_ports, listen_ports_udp, stun_interval_secs, firewall_config) = {
|
||||
let edges = allowed.read().await;
|
||||
match edges.get(&edge_id) {
|
||||
Some(edge) => {
|
||||
if !constant_time_eq(secret.as_bytes(), edge.secret.as_bytes()) {
|
||||
return Err(format!("invalid secret for edge {}", edge_id).into());
|
||||
}
|
||||
(edge.listen_ports.clone(), edge.listen_ports_udp.clone(), edge.stun_interval_secs.unwrap_or(300))
|
||||
(edge.listen_ports.clone(), edge.listen_ports_udp.clone(), edge.stun_interval_secs.unwrap_or(300), edge.firewall_config.clone())
|
||||
}
|
||||
None => {
|
||||
return Err(format!("unknown edge {}", edge_id).into());
|
||||
@@ -887,6 +896,7 @@ async fn handle_edge_connection(
|
||||
listen_ports: listen_ports.clone(),
|
||||
listen_ports_udp: listen_ports_udp.clone(),
|
||||
stun_interval_secs,
|
||||
firewall_config,
|
||||
};
|
||||
let mut handshake_json = serde_json::to_string(&handshake)?;
|
||||
handshake_json.push('\n');
|
||||
@@ -1228,14 +1238,14 @@ async fn handle_edge_connection_quic(
|
||||
let secret = parts[2];
|
||||
|
||||
// Verify credentials
|
||||
let (listen_ports, listen_ports_udp, stun_interval_secs) = {
|
||||
let (listen_ports, listen_ports_udp, stun_interval_secs, firewall_config) = {
|
||||
let edges = allowed.read().await;
|
||||
match edges.get(&edge_id) {
|
||||
Some(edge) => {
|
||||
if !constant_time_eq(secret.as_bytes(), edge.secret.as_bytes()) {
|
||||
return Err(format!("invalid secret for edge {}", edge_id).into());
|
||||
}
|
||||
(edge.listen_ports.clone(), edge.listen_ports_udp.clone(), edge.stun_interval_secs.unwrap_or(300))
|
||||
(edge.listen_ports.clone(), edge.listen_ports_udp.clone(), edge.stun_interval_secs.unwrap_or(300), edge.firewall_config.clone())
|
||||
}
|
||||
None => return Err(format!("unknown edge {}", edge_id).into()),
|
||||
}
|
||||
@@ -1252,6 +1262,7 @@ async fn handle_edge_connection_quic(
|
||||
listen_ports: listen_ports.clone(),
|
||||
listen_ports_udp: listen_ports_udp.clone(),
|
||||
stun_interval_secs,
|
||||
firewall_config,
|
||||
};
|
||||
let mut handshake_json = serde_json::to_string(&handshake)?;
|
||||
handshake_json.push('\n');
|
||||
@@ -1787,6 +1798,7 @@ mod tests {
|
||||
listen_ports: vec![443, 8080],
|
||||
listen_ports_udp: vec![],
|
||||
stun_interval_secs: 300,
|
||||
firewall_config: None,
|
||||
};
|
||||
let json = serde_json::to_value(&resp).unwrap();
|
||||
assert_eq!(json["listenPorts"], serde_json::json!([443, 8080]));
|
||||
@@ -1801,6 +1813,7 @@ mod tests {
|
||||
let update = EdgeConfigUpdate {
|
||||
listen_ports: vec![80, 443],
|
||||
listen_ports_udp: vec![53],
|
||||
firewall_config: None,
|
||||
};
|
||||
let json = serde_json::to_value(&update).unwrap();
|
||||
assert_eq!(json["listenPorts"], serde_json::json!([80, 443]));
|
||||
|
||||
Reference in New Issue
Block a user