feat(edge,protocol,test): add configurable edge bind address and expand flow-control test coverage
This commit is contained in:
@@ -32,6 +32,10 @@ pub struct EdgeConfig {
|
||||
pub hub_port: u16,
|
||||
pub edge_id: String,
|
||||
pub secret: String,
|
||||
/// Optional bind address for TCP listeners (defaults to "0.0.0.0").
|
||||
/// Useful for testing on localhost where edge and upstream share the same machine.
|
||||
#[serde(default)]
|
||||
pub bind_address: Option<String>,
|
||||
}
|
||||
|
||||
/// Handshake config received from hub after authentication.
|
||||
@@ -416,6 +420,7 @@ async fn connect_to_hub_and_run(
|
||||
|
||||
// Start TCP listeners for initial ports (hot-reloadable)
|
||||
let mut port_listeners: HashMap<u16, JoinHandle<()>> = HashMap::new();
|
||||
let bind_address = config.bind_address.as_deref().unwrap_or("0.0.0.0");
|
||||
apply_port_config(
|
||||
&handshake.listen_ports,
|
||||
&mut port_listeners,
|
||||
@@ -426,6 +431,7 @@ async fn connect_to_hub_and_run(
|
||||
next_stream_id,
|
||||
&config.edge_id,
|
||||
connection_token,
|
||||
bind_address,
|
||||
);
|
||||
|
||||
// Heartbeat: liveness timeout detects silent hub failures
|
||||
@@ -492,6 +498,7 @@ async fn connect_to_hub_and_run(
|
||||
next_stream_id,
|
||||
&config.edge_id,
|
||||
connection_token,
|
||||
bind_address,
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -557,6 +564,7 @@ fn apply_port_config(
|
||||
next_stream_id: &Arc<AtomicU32>,
|
||||
edge_id: &str,
|
||||
connection_token: &CancellationToken,
|
||||
bind_address: &str,
|
||||
) {
|
||||
let new_set: std::collections::HashSet<u16> = new_ports.iter().copied().collect();
|
||||
let old_set: std::collections::HashSet<u16> = port_listeners.keys().copied().collect();
|
||||
@@ -579,8 +587,9 @@ fn apply_port_config(
|
||||
let edge_id = edge_id.to_string();
|
||||
let port_token = connection_token.child_token();
|
||||
|
||||
let bind_addr = bind_address.to_string();
|
||||
let handle = tokio::spawn(async move {
|
||||
let listener = match TcpListener::bind(("0.0.0.0", port)).await {
|
||||
let listener = match TcpListener::bind((bind_addr.as_str(), port)).await {
|
||||
Ok(l) => l,
|
||||
Err(e) => {
|
||||
log::error!("Failed to bind port {}: {}", port, e);
|
||||
@@ -840,6 +849,7 @@ mod tests {
|
||||
hub_port: 9999,
|
||||
edge_id: "e1".to_string(),
|
||||
secret: "sec".to_string(),
|
||||
bind_address: None,
|
||||
};
|
||||
let json = serde_json::to_string(&config).unwrap();
|
||||
let back: EdgeConfig = serde_json::from_str(&json).unwrap();
|
||||
@@ -955,6 +965,7 @@ mod tests {
|
||||
hub_port: 8443,
|
||||
edge_id: "test-edge".to_string(),
|
||||
secret: "test-secret".to_string(),
|
||||
bind_address: None,
|
||||
});
|
||||
let status = edge.get_status().await;
|
||||
assert!(!status.running);
|
||||
@@ -971,6 +982,7 @@ mod tests {
|
||||
hub_port: 8443,
|
||||
edge_id: "e".to_string(),
|
||||
secret: "s".to_string(),
|
||||
bind_address: None,
|
||||
});
|
||||
let rx1 = edge.take_event_rx().await;
|
||||
assert!(rx1.is_some());
|
||||
@@ -985,6 +997,7 @@ mod tests {
|
||||
hub_port: 8443,
|
||||
edge_id: "e".to_string(),
|
||||
secret: "s".to_string(),
|
||||
bind_address: None,
|
||||
});
|
||||
edge.stop().await; // should not panic
|
||||
let status = edge.get_status().await;
|
||||
|
||||
@@ -5,3 +5,6 @@ edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
tokio = { version = "1", features = ["io-util"] }
|
||||
|
||||
[dev-dependencies]
|
||||
tokio = { version = "1", features = ["io-util", "macros", "rt"] }
|
||||
|
||||
@@ -345,4 +345,134 @@ mod tests {
|
||||
assert_eq!(&pong[0..4], &0u32.to_be_bytes());
|
||||
assert_eq!(pong.len(), FRAME_HEADER_SIZE);
|
||||
}
|
||||
|
||||
// --- compute_window_for_stream_count tests ---
|
||||
|
||||
#[test]
|
||||
fn test_adaptive_window_zero_streams() {
|
||||
// 0 streams treated as 1: 32MB/1 = 32MB → clamped to 4MB max
|
||||
assert_eq!(compute_window_for_stream_count(0), INITIAL_STREAM_WINDOW);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_adaptive_window_one_stream() {
|
||||
// 32MB/1 = 32MB → clamped to 4MB max
|
||||
assert_eq!(compute_window_for_stream_count(1), INITIAL_STREAM_WINDOW);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_adaptive_window_at_max_boundary() {
|
||||
// 32MB/8 = 4MB = exactly INITIAL_STREAM_WINDOW
|
||||
assert_eq!(compute_window_for_stream_count(8), INITIAL_STREAM_WINDOW);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_adaptive_window_just_below_max() {
|
||||
// 32MB/9 = 3,728,270 — first value below INITIAL_STREAM_WINDOW
|
||||
let w = compute_window_for_stream_count(9);
|
||||
assert!(w < INITIAL_STREAM_WINDOW);
|
||||
assert_eq!(w, (32 * 1024 * 1024u64 / 9) as u32);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_adaptive_window_16_streams() {
|
||||
// 32MB/16 = 2MB
|
||||
assert_eq!(compute_window_for_stream_count(16), 2 * 1024 * 1024);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_adaptive_window_100_streams() {
|
||||
// 32MB/100 = 335,544 bytes (~327KB)
|
||||
let w = compute_window_for_stream_count(100);
|
||||
assert_eq!(w, (32 * 1024 * 1024u64 / 100) as u32);
|
||||
assert!(w > 64 * 1024); // above floor
|
||||
assert!(w < INITIAL_STREAM_WINDOW as u32); // below ceiling
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_adaptive_window_200_streams() {
|
||||
// 32MB/200 = 167,772 bytes (~163KB), above 64KB floor
|
||||
let w = compute_window_for_stream_count(200);
|
||||
assert_eq!(w, (32 * 1024 * 1024u64 / 200) as u32);
|
||||
assert!(w > 64 * 1024);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_adaptive_window_500_streams() {
|
||||
// 32MB/500 = 67,108 bytes (~65.5KB), just above 64KB floor
|
||||
let w = compute_window_for_stream_count(500);
|
||||
assert_eq!(w, (32 * 1024 * 1024u64 / 500) as u32);
|
||||
assert!(w > 64 * 1024);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_adaptive_window_at_min_boundary() {
|
||||
// 32MB/512 = 65,536 = exactly 64KB floor
|
||||
assert_eq!(compute_window_for_stream_count(512), 64 * 1024);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_adaptive_window_below_min_clamped() {
|
||||
// 32MB/513 = 65,408 → clamped up to 64KB
|
||||
assert_eq!(compute_window_for_stream_count(513), 64 * 1024);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_adaptive_window_1000_streams() {
|
||||
// 32MB/1000 = 33,554 → clamped to 64KB
|
||||
assert_eq!(compute_window_for_stream_count(1000), 64 * 1024);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_adaptive_window_max_u32() {
|
||||
// Extreme: u32::MAX streams → tiny value → clamped to 64KB
|
||||
assert_eq!(compute_window_for_stream_count(u32::MAX), 64 * 1024);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_adaptive_window_monotonically_decreasing() {
|
||||
// Window should decrease (or stay same) as stream count increases
|
||||
let mut prev = compute_window_for_stream_count(1);
|
||||
for n in [2, 5, 10, 50, 100, 200, 500, 512, 1000] {
|
||||
let w = compute_window_for_stream_count(n);
|
||||
assert!(w <= prev, "window increased from {} to {} at n={}", prev, w, n);
|
||||
prev = w;
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_adaptive_window_total_budget_bounded() {
|
||||
// active × per_stream_window should never exceed 32MB (+ clamp overhead for high N)
|
||||
for n in [1, 10, 50, 100, 200, 500] {
|
||||
let w = compute_window_for_stream_count(n);
|
||||
let total = w as u64 * n as u64;
|
||||
assert!(total <= 32 * 1024 * 1024, "total {}MB exceeds budget at n={}", total / (1024*1024), n);
|
||||
}
|
||||
}
|
||||
|
||||
// --- encode/decode window_update roundtrip ---
|
||||
|
||||
#[test]
|
||||
fn test_window_update_roundtrip() {
|
||||
for &increment in &[0u32, 1, 64 * 1024, INITIAL_STREAM_WINDOW, MAX_WINDOW_SIZE, u32::MAX] {
|
||||
let frame = encode_window_update(42, FRAME_WINDOW_UPDATE, increment);
|
||||
assert_eq!(frame[4], FRAME_WINDOW_UPDATE);
|
||||
let decoded = decode_window_update(&frame[FRAME_HEADER_SIZE..]);
|
||||
assert_eq!(decoded, Some(increment));
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_window_update_back_roundtrip() {
|
||||
let frame = encode_window_update(7, FRAME_WINDOW_UPDATE_BACK, 1234567);
|
||||
assert_eq!(frame[4], FRAME_WINDOW_UPDATE_BACK);
|
||||
assert_eq!(decode_window_update(&frame[FRAME_HEADER_SIZE..]), Some(1234567));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_decode_window_update_malformed() {
|
||||
assert_eq!(decode_window_update(&[]), None);
|
||||
assert_eq!(decode_window_update(&[0, 0, 0]), None);
|
||||
assert_eq!(decode_window_update(&[0, 0, 0, 0, 0]), None);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user