From 6abfd2ff2a7a9c6f042aa876bdd9c58b55fc42a2 Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Thu, 19 Mar 2026 10:44:22 +0000 Subject: [PATCH] feat(core,edge,hub,transport): add QUIC tunnel transport support with optional edge transport selection --- changelog.md | 8 + rust/Cargo.lock | 555 +++++++++++++++++ rust/crates/remoteingress-core/Cargo.toml | 1 + rust/crates/remoteingress-core/src/edge.rs | 561 +++++++++++++++++- rust/crates/remoteingress-core/src/hub.rs | 442 +++++++++++++- rust/crates/remoteingress-core/src/lib.rs | 1 + .../remoteingress-core/src/transport/mod.rs | 22 + .../remoteingress-core/src/transport/quic.rs | 191 ++++++ test/test.quic.node.ts | 283 +++++++++ ts/00_commitinfo_data.ts | 2 +- ts/classes.remoteingressedge.ts | 4 + 11 files changed, 2051 insertions(+), 19 deletions(-) create mode 100644 rust/crates/remoteingress-core/src/transport/mod.rs create mode 100644 rust/crates/remoteingress-core/src/transport/quic.rs create mode 100644 test/test.quic.node.ts diff --git a/changelog.md b/changelog.md index e85163f..9cec4c5 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,13 @@ # Changelog +## 2026-03-19 - 4.10.0 - feat(core,edge,hub,transport) +add QUIC tunnel transport support with optional edge transport selection + +- adds a shared transport module with QUIC configuration helpers, control message framing, and PROXY header handling +- enables the hub to accept QUIC connections on the tunnel port alongside existing TCP/TLS support +- adds edge transportMode configuration with quic and quicWithFallback options and propagates it through restarts +- includes end-to-end QUIC transport tests covering large payloads and concurrent streams + ## 2026-03-18 - 4.9.1 - fix(readme) document QoS tiers, heartbeat frames, and adaptive flow control in the protocol overview diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 9f7dfcb..aed8671 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -95,6 +95,12 @@ version = "2.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "843867be96c8daad0d758b57df9392b6d8d271134fce549de6ce169ff98a92af" +[[package]] +name = "bumpalo" +version = "3.20.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d20789868f4b01b2f2caec9f5c4e0213b41e3e5702a50157d699ae31ced2fcb" + [[package]] name = "bytes" version = "1.11.1" @@ -113,12 +119,24 @@ dependencies = [ "shlex", ] +[[package]] +name = "cesu8" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d43a04d8753f35258c91f8ec639f792891f748a1edbd759cf1dcea3382ad83c" + [[package]] name = "cfg-if" version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" +[[package]] +name = "cfg_aliases" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" + [[package]] name = "clap" version = "4.5.58" @@ -174,6 +192,32 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" +[[package]] +name = "combine" +version = "4.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd" +dependencies = [ + "bytes", + "memchr", +] + +[[package]] +name = "core-foundation" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2a6cd9ae233e7f62ba4e9353e81a88df7fc8a5987b8d445b4d90c879bd156f6" +dependencies = [ + "core-foundation-sys", + "libc", +] + +[[package]] +name = "core-foundation-sys" +version = "0.8.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" + [[package]] name = "deranged" version = "0.5.6" @@ -222,6 +266,18 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "fastbloom" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e7f34442dbe69c60fe8eaf58a8cafff81a1f278816d8ab4db255b3bef4ac3c4" +dependencies = [ + "getrandom 0.3.4", + "libm", + "rand", + "siphasher", +] + [[package]] name = "find-msvc-tools" version = "0.1.9" @@ -253,8 +309,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff2abc00be7fca6ebc474524697ae276ad847ad0a6b3faa4bcb027e9a4614ad0" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi", + "wasm-bindgen", ] [[package]] @@ -264,9 +322,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "899def5c37c4fd7b2664648c28120ecec138e4d395b459e5ca34f9cce2dd77fd" dependencies = [ "cfg-if", + "js-sys", "libc", "r-efi", "wasip2", + "wasm-bindgen", ] [[package]] @@ -311,6 +371,28 @@ dependencies = [ "syn", ] +[[package]] +name = "jni" +version = "0.21.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a87aa2bb7d2af34197c04845522473242e1aa17c12f4935d5856491a7fb8c97" +dependencies = [ + "cesu8", + "cfg-if", + "combine", + "jni-sys", + "log", + "thiserror 1.0.69", + "walkdir", + "windows-sys 0.45.0", +] + +[[package]] +name = "jni-sys" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8eaf4bc02d17cbdd7ff4c7438cafcdf7fb9a4613313ad11b4f8fefe7d3fa0130" + [[package]] name = "jobserver" version = "0.1.34" @@ -321,12 +403,28 @@ dependencies = [ "libc", ] +[[package]] +name = "js-sys" +version = "0.3.91" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b49715b7073f385ba4bc528e5747d02e66cb39c6146efb66b781f131f0fb399c" +dependencies = [ + "once_cell", + "wasm-bindgen", +] + [[package]] name = "libc" version = "0.2.182" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6800badb6cb2082ffd7b6a67e6125bb39f18782f793520caee8cb8846be06112" +[[package]] +name = "libm" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6d2cec3eae94f9f509c767b45932f1ada8350c4bdb85af2fcab4a3c14807981" + [[package]] name = "libmimalloc-sys" version = "0.1.44" @@ -352,6 +450,12 @@ version = "0.4.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" +[[package]] +name = "lru-slab" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" + [[package]] name = "memchr" version = "2.8.0" @@ -396,6 +500,12 @@ version = "1.70.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" +[[package]] +name = "openssl-probe" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe" + [[package]] name = "parking_lot" version = "0.12.5" @@ -456,6 +566,15 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" +[[package]] +name = "ppv-lite86" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9" +dependencies = [ + "zerocopy", +] + [[package]] name = "proc-macro2" version = "1.0.106" @@ -465,6 +584,63 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "quinn" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e20a958963c291dc322d98411f541009df2ced7b5a4f2bd52337638cfccf20" +dependencies = [ + "bytes", + "cfg_aliases", + "pin-project-lite", + "quinn-proto", + "quinn-udp", + "rustc-hash", + "rustls", + "socket2 0.6.2", + "thiserror 2.0.18", + "tokio", + "tracing", + "web-time", +] + +[[package]] +name = "quinn-proto" +version = "0.11.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "434b42fec591c96ef50e21e886936e66d3cc3f737104fdb9b737c40ffb94c098" +dependencies = [ + "bytes", + "fastbloom", + "getrandom 0.3.4", + "lru-slab", + "rand", + "ring", + "rustc-hash", + "rustls", + "rustls-pki-types", + "rustls-platform-verifier", + "slab", + "thiserror 2.0.18", + "tinyvec", + "tracing", + "web-time", +] + +[[package]] +name = "quinn-udp" +version = "0.5.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "addec6a0dcad8a8d96a771f815f0eaf55f9d1805756410b39f5fa81332574cbd" +dependencies = [ + "cfg_aliases", + "libc", + "once_cell", + "socket2 0.6.2", + "tracing", + "windows-sys 0.60.2", +] + [[package]] name = "quote" version = "1.0.44" @@ -480,6 +656,35 @@ version = "5.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" +[[package]] +name = "rand" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1" +dependencies = [ + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76afc826de14238e6e8c374ddcc1fa19e374fd8dd986b0d2af0d02377261d83c" +dependencies = [ + "getrandom 0.3.4", +] + [[package]] name = "rcgen" version = "0.13.2" @@ -553,6 +758,7 @@ version = "2.0.0" dependencies = [ "bytes", "log", + "quinn", "rcgen", "remoteingress-protocol", "rustls", @@ -589,6 +795,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rustc-hash" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" + [[package]] name = "rustls" version = "0.23.36" @@ -605,6 +817,18 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls-native-certs" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "612460d5f7bea540c490b2b6395d8e34a953e52b491accd6c86c8164c5932a63" +dependencies = [ + "openssl-probe", + "rustls-pki-types", + "schannel", + "security-framework", +] + [[package]] name = "rustls-pemfile" version = "2.2.0" @@ -620,9 +844,37 @@ version = "1.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be040f8b0a225e40375822a563fa9524378b9d63112f53e19ffff34df5d33fdd" dependencies = [ + "web-time", "zeroize", ] +[[package]] +name = "rustls-platform-verifier" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d99feebc72bae7ab76ba994bb5e121b8d83d910ca40b36e0921f53becc41784" +dependencies = [ + "core-foundation", + "core-foundation-sys", + "jni", + "log", + "once_cell", + "rustls", + "rustls-native-certs", + "rustls-platform-verifier-android", + "rustls-webpki", + "security-framework", + "security-framework-sys", + "webpki-root-certs", + "windows-sys 0.61.2", +] + +[[package]] +name = "rustls-platform-verifier-android" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f87165f0995f63a9fbeea62b64d10b4d9d8e78ec6d7d51fb2125fda7bb36788f" + [[package]] name = "rustls-webpki" version = "0.103.9" @@ -635,12 +887,59 @@ dependencies = [ "untrusted", ] +[[package]] +name = "rustversion" +version = "1.0.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" + +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + +[[package]] +name = "schannel" +version = "0.1.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91c1b7e4904c873ef0710c1f407dde2e6287de2bebc1bbbf7d430bb7cbffd939" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "scopeguard" version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "security-framework" +version = "3.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7f4bc775c73d9a02cde8bf7b2ec4c9d12743edf609006c7facc23998404cd1d" +dependencies = [ + "bitflags", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ce2691df843ecc5d231c0b14ece2acc3efb62c0a398c7e1d875f3983ce020e3" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "serde" version = "1.0.228" @@ -700,6 +999,18 @@ dependencies = [ "libc", ] +[[package]] +name = "siphasher" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2aa850e253778c88a04c3d7323b043aeda9d3e30d5971937c1855769763678e" + +[[package]] +name = "slab" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c790de23124f9ab44544d7ac05d60440adc586479ce501c1d6d7da3cd8c9cf5" + [[package]] name = "smallvec" version = "1.15.1" @@ -749,6 +1060,46 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "thiserror" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" +dependencies = [ + "thiserror-impl 1.0.69", +] + +[[package]] +name = "thiserror" +version = "2.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4288b5bcbc7920c07a1149a35cf9590a2aa808e0bc1eafaade0b80947865fbc4" +dependencies = [ + "thiserror-impl 2.0.18", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "thiserror-impl" +version = "2.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebc4ee7f67670e9b64d05fa4253e753e016c6c95ff35b89b7941d6b856dec1d5" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "time" version = "0.3.47" @@ -768,6 +1119,21 @@ version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7694e1cfe791f8d31026952abf09c69ca6f6fa4e1a1229e18988f06a04a12dca" +[[package]] +name = "tinyvec" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e61e67053d25a4e82c844e8424039d9745781b3fc4f32b8d55ed50f5f667ef3" +dependencies = [ + "tinyvec_macros", +] + +[[package]] +name = "tinyvec_macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" + [[package]] name = "tokio" version = "1.49.0" @@ -819,6 +1185,26 @@ dependencies = [ "tokio", ] +[[package]] +name = "tracing" +version = "0.1.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100" +dependencies = [ + "log", + "pin-project-lite", + "tracing-core", +] + +[[package]] +name = "tracing-core" +version = "0.1.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a" +dependencies = [ + "once_cell", +] + [[package]] name = "unicode-ident" version = "1.0.24" @@ -837,6 +1223,16 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" +[[package]] +name = "walkdir" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" +dependencies = [ + "same-file", + "winapi-util", +] + [[package]] name = "wasi" version = "0.11.1+wasi-snapshot-preview1" @@ -852,12 +1248,94 @@ dependencies = [ "wit-bindgen", ] +[[package]] +name = "wasm-bindgen" +version = "0.2.114" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6532f9a5c1ece3798cb1c2cfdba640b9b3ba884f5db45973a6f442510a87d38e" +dependencies = [ + "cfg-if", + "once_cell", + "rustversion", + "wasm-bindgen-macro", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.114" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18a2d50fcf105fb33bb15f00e7a77b772945a2ee45dcf454961fd843e74c18e6" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.114" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03ce4caeaac547cdf713d280eda22a730824dd11e6b8c3ca9e42247b25c631e3" +dependencies = [ + "bumpalo", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.114" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75a326b8c223ee17883a4251907455a2431acc2791c98c26279376490c378c16" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + +[[package]] +name = "webpki-root-certs" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "804f18a4ac2676ffb4e8b5b5fa9ae38af06df08162314f96a68d2a363e21a8ca" +dependencies = [ + "rustls-pki-types", +] + +[[package]] +name = "winapi-util" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "windows-link" version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" +[[package]] +name = "windows-sys" +version = "0.45.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0" +dependencies = [ + "windows-targets 0.42.2", +] + [[package]] name = "windows-sys" version = "0.52.0" @@ -885,6 +1363,21 @@ dependencies = [ "windows-link", ] +[[package]] +name = "windows-targets" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071" +dependencies = [ + "windows_aarch64_gnullvm 0.42.2", + "windows_aarch64_msvc 0.42.2", + "windows_i686_gnu 0.42.2", + "windows_i686_msvc 0.42.2", + "windows_x86_64_gnu 0.42.2", + "windows_x86_64_gnullvm 0.42.2", + "windows_x86_64_msvc 0.42.2", +] + [[package]] name = "windows-targets" version = "0.52.6" @@ -918,6 +1411,12 @@ dependencies = [ "windows_x86_64_msvc 0.53.1", ] +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8" + [[package]] name = "windows_aarch64_gnullvm" version = "0.52.6" @@ -930,6 +1429,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a9d8416fa8b42f5c947f8482c43e7d89e73a173cead56d044f6a56104a6d1b53" +[[package]] +name = "windows_aarch64_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43" + [[package]] name = "windows_aarch64_msvc" version = "0.52.6" @@ -942,6 +1447,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9d782e804c2f632e395708e99a94275910eb9100b2114651e04744e9b125006" +[[package]] +name = "windows_i686_gnu" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f" + [[package]] name = "windows_i686_gnu" version = "0.52.6" @@ -966,6 +1477,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fa7359d10048f68ab8b09fa71c3daccfb0e9b559aed648a8f95469c27057180c" +[[package]] +name = "windows_i686_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060" + [[package]] name = "windows_i686_msvc" version = "0.52.6" @@ -978,6 +1495,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e7ac75179f18232fe9c285163565a57ef8d3c89254a30685b57d83a38d326c2" +[[package]] +name = "windows_x86_64_gnu" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36" + [[package]] name = "windows_x86_64_gnu" version = "0.52.6" @@ -990,6 +1513,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c3842cdd74a865a8066ab39c8a7a473c0778a3f29370b5fd6b4b9aa7df4a499" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3" + [[package]] name = "windows_x86_64_gnullvm" version = "0.52.6" @@ -1002,6 +1531,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ffa179e2d07eee8ad8f57493436566c7cc30ac536a3379fdf008f47f6bb7ae1" +[[package]] +name = "windows_x86_64_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0" + [[package]] name = "windows_x86_64_msvc" version = "0.52.6" @@ -1029,6 +1564,26 @@ dependencies = [ "time", ] +[[package]] +name = "zerocopy" +version = "0.8.42" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2578b716f8a7a858b7f02d5bd870c14bf4ddbbcf3a4c05414ba6503640505e3" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.8.42" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e6cc098ea4d3bd6246687de65af3f920c430e236bee1e3bf2e441463f08a02f" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "zeroize" version = "1.8.2" diff --git a/rust/crates/remoteingress-core/Cargo.toml b/rust/crates/remoteingress-core/Cargo.toml index 5c92934..2291c18 100644 --- a/rust/crates/remoteingress-core/Cargo.toml +++ b/rust/crates/remoteingress-core/Cargo.toml @@ -16,3 +16,4 @@ log = "0.4" rustls-pemfile = "2" tokio-util = "0.7" socket2 = "0.5" +quinn = "0.11" diff --git a/rust/crates/remoteingress-core/src/edge.rs b/rust/crates/remoteingress-core/src/edge.rs index a498de6..521d427 100644 --- a/rust/crates/remoteingress-core/src/edge.rs +++ b/rust/crates/remoteingress-core/src/edge.rs @@ -13,6 +13,8 @@ use serde::{Deserialize, Serialize}; use bytes::Bytes; use remoteingress_protocol::*; +use crate::transport::TransportMode; +use crate::transport::quic as quic_transport; type EdgeTlsStream = tokio_rustls::client::TlsStream; @@ -47,6 +49,9 @@ pub struct EdgeConfig { /// Useful for testing on localhost where edge and upstream share the same machine. #[serde(default)] pub bind_address: Option, + /// Transport mode for the tunnel connection (defaults to TcpTls). + #[serde(default)] + pub transport_mode: Option, } /// Handshake config received from hub after authentication. @@ -210,6 +215,8 @@ async fn edge_main_loop( let mut backoff_ms: u64 = 1000; let max_backoff_ms: u64 = 30000; + let transport_mode = config.transport_mode.unwrap_or(TransportMode::TcpTls); + // Build TLS config ONCE outside the reconnect loop — preserves session // cache across reconnections for TLS session resumption (saves 1 RTT). let tls_config = rustls::ClientConfig::builder() @@ -218,24 +225,77 @@ async fn edge_main_loop( .with_no_client_auth(); let connector = TlsConnector::from(Arc::new(tls_config)); + // Build QUIC client config ONCE (shares session cache across reconnections). + let quic_client_config = quic_transport::build_quic_client_config(); + let quic_endpoint = if matches!(transport_mode, TransportMode::Quic | TransportMode::QuicWithFallback) { + match quinn::Endpoint::client("0.0.0.0:0".parse().unwrap()) { + Ok(mut ep) => { + ep.set_default_client_config(quic_client_config); + Some(ep) + } + Err(e) => { + log::error!("Failed to create QUIC endpoint: {}", e); + None + } + } + } else { + None + }; + loop { // Create a per-connection child token let connection_token = cancel_token.child_token(); - // Try to connect to hub - let result = connect_to_hub_and_run( - &config, - &connected, - &public_ip, - &active_streams, - &next_stream_id, - &event_tx, - &listen_ports, - &mut shutdown_rx, - &connection_token, - &connector, - ) - .await; + // Try to connect to hub using the configured transport + let result = match transport_mode { + TransportMode::TcpTls => { + connect_to_hub_and_run( + &config, &connected, &public_ip, &active_streams, &next_stream_id, + &event_tx, &listen_ports, &mut shutdown_rx, &connection_token, &connector, + ).await + } + TransportMode::Quic => { + if let Some(ep) = &quic_endpoint { + connect_to_hub_and_run_quic( + &config, &connected, &public_ip, &active_streams, &next_stream_id, + &event_tx, &listen_ports, &mut shutdown_rx, &connection_token, ep, + ).await + } else { + EdgeLoopResult::Reconnect("quic_endpoint_unavailable".to_string()) + } + } + TransportMode::QuicWithFallback => { + if let Some(ep) = &quic_endpoint { + // Try QUIC first with a 5s timeout + let quic_result = tokio::time::timeout( + Duration::from_secs(5), + connect_to_hub_quic_handshake(&config, ep, &connection_token), + ).await; + match quic_result { + Ok(Ok(quic_conn)) => { + connect_to_hub_and_run_quic_with_connection( + &config, &connected, &public_ip, &active_streams, &next_stream_id, + &event_tx, &listen_ports, &mut shutdown_rx, &connection_token, + quic_conn, + ).await + } + _ => { + log::info!("QUIC connect failed or timed out, falling back to TCP+TLS"); + connect_to_hub_and_run( + &config, &connected, &public_ip, &active_streams, &next_stream_id, + &event_tx, &listen_ports, &mut shutdown_rx, &connection_token, &connector, + ).await + } + } + } else { + // No QUIC endpoint, fall back to TCP+TLS + connect_to_hub_and_run( + &config, &connected, &public_ip, &active_streams, &next_stream_id, + &event_tx, &listen_ports, &mut shutdown_rx, &connection_token, &connector, + ).await + } + } + }; // Cancel connection token to kill all orphaned tasks from this cycle connection_token.cancel(); @@ -942,6 +1002,437 @@ async fn handle_client_connection( let _ = edge_id; // used for logging context } +// ===== QUIC transport functions ===== + +/// Perform QUIC handshake only (used by QuicWithFallback to test connectivity). +async fn connect_to_hub_quic_handshake( + config: &EdgeConfig, + endpoint: &quinn::Endpoint, + _connection_token: &CancellationToken, +) -> Result> { + let addr = format!("{}:{}", config.hub_host, config.hub_port); + let server_addr: std::net::SocketAddr = tokio::net::lookup_host(&addr) + .await? + .next() + .ok_or("DNS resolution failed")?; + // QUIC/TLS SNI requires a hostname, not an IP address. + // If hub_host is an IP, use the same fallback as the TCP+TLS path. + let server_name = match rustls::pki_types::ServerName::try_from(config.hub_host.as_str()) { + Ok(rustls::pki_types::ServerName::DnsName(_)) => config.hub_host.clone(), + _ => "remoteingress-hub".to_string(), + }; + let connection = endpoint.connect(server_addr, &server_name)?.await?; + Ok(connection) +} + +/// QUIC edge: connect to hub, authenticate, and run the stream multiplexer. +async fn connect_to_hub_and_run_quic( + config: &EdgeConfig, + connected: &Arc>, + public_ip: &Arc>>, + active_streams: &Arc, + next_stream_id: &Arc, + event_tx: &mpsc::Sender, + listen_ports: &Arc>>, + shutdown_rx: &mut mpsc::Receiver<()>, + connection_token: &CancellationToken, + endpoint: &quinn::Endpoint, +) -> EdgeLoopResult { + // Establish QUIC connection + let quic_conn = match connect_to_hub_quic_handshake(config, endpoint, connection_token).await { + Ok(c) => c, + Err(e) => { + log::error!("QUIC connect failed: {}", e); + return EdgeLoopResult::Reconnect(format!("quic_connect_failed: {}", e)); + } + }; + + connect_to_hub_and_run_quic_with_connection( + config, connected, public_ip, active_streams, next_stream_id, + event_tx, listen_ports, shutdown_rx, connection_token, quic_conn, + ).await +} + +/// QUIC edge: run with an already-established QUIC connection. +async fn connect_to_hub_and_run_quic_with_connection( + config: &EdgeConfig, + connected: &Arc>, + public_ip: &Arc>>, + active_streams: &Arc, + next_stream_id: &Arc, + event_tx: &mpsc::Sender, + listen_ports: &Arc>>, + shutdown_rx: &mut mpsc::Receiver<()>, + connection_token: &CancellationToken, + quic_conn: quinn::Connection, +) -> EdgeLoopResult { + log::info!("QUIC connection established to {}", quic_conn.remote_address()); + + // Open control stream (first bidirectional stream) + let (mut ctrl_send, mut ctrl_recv) = match quic_conn.open_bi().await { + Ok(s) => s, + Err(e) => { + log::error!("Failed to open QUIC control stream: {}", e); + return EdgeLoopResult::Reconnect(format!("quic_ctrl_open_failed: {}", e)); + } + }; + + // Auth handshake on control stream (same protocol as TCP+TLS) + let auth_line = format!("EDGE {} {}\n", config.edge_id, config.secret); + if let Err(e) = ctrl_send.write_all(auth_line.as_bytes()).await { + return EdgeLoopResult::Reconnect(format!("quic_auth_write_failed: {}", e)); + } + + // Read handshake response (newline-delimited JSON) + let mut handshake_bytes = Vec::with_capacity(512); + let mut byte = [0u8; 1]; + loop { + match ctrl_recv.read_exact(&mut byte).await { + Ok(()) => { + handshake_bytes.push(byte[0]); + if byte[0] == b'\n' { break; } + if handshake_bytes.len() > 8192 { + return EdgeLoopResult::Reconnect("quic_handshake_too_long".to_string()); + } + } + Err(e) => { + log::error!("QUIC handshake read failed: {}", e); + return EdgeLoopResult::Reconnect(format!("quic_handshake_read_failed: {}", e)); + } + } + } + + let handshake_line = String::from_utf8_lossy(&handshake_bytes); + let handshake: HandshakeConfig = match serde_json::from_str(handshake_line.trim()) { + Ok(h) => h, + Err(e) => { + log::error!("Invalid QUIC handshake response: {}", e); + return EdgeLoopResult::Reconnect(format!("quic_handshake_invalid: {}", e)); + } + }; + + log::info!( + "QUIC handshake from hub: ports {:?}, stun_interval {}s", + handshake.listen_ports, + handshake.stun_interval_secs + ); + + *connected.write().await = true; + let _ = event_tx.try_send(EdgeEvent::TunnelConnected); + log::info!("Connected to hub via QUIC at {}", quic_conn.remote_address()); + + *listen_ports.write().await = handshake.listen_ports.clone(); + let _ = event_tx.try_send(EdgeEvent::PortsAssigned { + listen_ports: handshake.listen_ports.clone(), + }); + + // Start STUN discovery + let stun_interval = handshake.stun_interval_secs; + let public_ip_clone = public_ip.clone(); + let event_tx_clone = event_tx.clone(); + let stun_token = connection_token.clone(); + let stun_handle = tokio::spawn(async move { + loop { + tokio::select! { + ip_result = crate::stun::discover_public_ip() => { + if let Some(ip) = ip_result { + let mut pip = public_ip_clone.write().await; + let changed = pip.as_ref() != Some(&ip); + *pip = Some(ip.clone()); + if changed { + let _ = event_tx_clone.try_send(EdgeEvent::PublicIpDiscovered { ip }); + } + } + } + _ = stun_token.cancelled() => break, + } + tokio::select! { + _ = tokio::time::sleep(Duration::from_secs(stun_interval)) => {} + _ = stun_token.cancelled() => break, + } + } + }); + + // Start TCP listeners for the assigned ports. + // For QUIC, each client connection opens a new QUIC bidirectional stream. + let mut port_listeners: HashMap> = HashMap::new(); + let bind_address = config.bind_address.as_deref().unwrap_or("0.0.0.0"); + apply_port_config_quic( + &handshake.listen_ports, + &mut port_listeners, + &quic_conn, + active_streams, + next_stream_id, + &config.edge_id, + connection_token, + bind_address, + ); + + // Monitor control stream for config updates, and connection health. + // Also handle shutdown signals. + let result = 'quic_loop: loop { + tokio::select! { + // Read control messages from hub + ctrl_msg = quic_transport::read_ctrl_message(&mut ctrl_recv) => { + match ctrl_msg { + Ok(Some((msg_type, payload))) => { + match msg_type { + quic_transport::CTRL_CONFIG => { + if let Ok(update) = serde_json::from_slice::(&payload) { + log::info!("QUIC config update from hub: ports {:?}", update.listen_ports); + *listen_ports.write().await = update.listen_ports.clone(); + let _ = event_tx.try_send(EdgeEvent::PortsUpdated { + listen_ports: update.listen_ports.clone(), + }); + apply_port_config_quic( + &update.listen_ports, + &mut port_listeners, + &quic_conn, + active_streams, + next_stream_id, + &config.edge_id, + connection_token, + bind_address, + ); + } + } + quic_transport::CTRL_PING => { + // Respond with PONG on control stream + if let Err(e) = quic_transport::write_ctrl_message( + &mut ctrl_send, quic_transport::CTRL_PONG, &[], + ).await { + log::error!("Failed to send QUIC PONG: {}", e); + break 'quic_loop EdgeLoopResult::Reconnect( + format!("quic_pong_failed: {}", e), + ); + } + } + _ => { + log::warn!("Unknown QUIC control message type: {}", msg_type); + } + } + } + Ok(None) => { + log::info!("Hub closed QUIC control stream (EOF)"); + break 'quic_loop EdgeLoopResult::Reconnect("quic_ctrl_eof".to_string()); + } + Err(e) => { + log::error!("QUIC control stream read error: {}", e); + break 'quic_loop EdgeLoopResult::Reconnect( + format!("quic_ctrl_error: {}", e), + ); + } + } + } + // QUIC connection closed + reason = quic_conn.closed() => { + log::info!("QUIC connection closed: {}", reason); + break 'quic_loop EdgeLoopResult::Reconnect(format!("quic_closed: {}", reason)); + } + // Shutdown signal + _ = connection_token.cancelled() => { + if shutdown_rx.try_recv().is_ok() { + break 'quic_loop EdgeLoopResult::Shutdown; + } + break 'quic_loop EdgeLoopResult::Shutdown; + } + } + }; + + // Cleanup + connection_token.cancel(); + stun_handle.abort(); + for (_, h) in port_listeners.drain() { + h.abort(); + } + + // Graceful QUIC close + quic_conn.close(quinn::VarInt::from_u32(0), b"shutdown"); + + result +} + +/// Apply port config for QUIC transport: spawn TCP listeners that open QUIC streams. +fn apply_port_config_quic( + new_ports: &[u16], + port_listeners: &mut HashMap>, + quic_conn: &quinn::Connection, + active_streams: &Arc, + next_stream_id: &Arc, + edge_id: &str, + connection_token: &CancellationToken, + bind_address: &str, +) { + let new_set: std::collections::HashSet = new_ports.iter().copied().collect(); + let old_set: std::collections::HashSet = port_listeners.keys().copied().collect(); + + // Remove ports no longer needed + for &port in old_set.difference(&new_set) { + if let Some(handle) = port_listeners.remove(&port) { + log::info!("Stopping QUIC listener on port {}", port); + handle.abort(); + } + } + + // Add new ports + for &port in new_set.difference(&old_set) { + let quic_conn = quic_conn.clone(); + let active_streams = active_streams.clone(); + let next_stream_id = next_stream_id.clone(); + let _edge_id = edge_id.to_string(); + let port_token = connection_token.child_token(); + let bind_addr = bind_address.to_string(); + + let handle = tokio::spawn(async move { + let listener = match TcpListener::bind((bind_addr.as_str(), port)).await { + Ok(l) => l, + Err(e) => { + log::error!("Failed to bind port {} (QUIC): {}", port, e); + return; + } + }; + log::info!("Listening on port {} (QUIC transport)", port); + + loop { + tokio::select! { + accept_result = listener.accept() => { + match accept_result { + Ok((client_stream, client_addr)) => { + let _ = client_stream.set_nodelay(true); + let ka = socket2::TcpKeepalive::new() + .with_time(Duration::from_secs(60)); + #[cfg(target_os = "linux")] + let ka = ka.with_interval(Duration::from_secs(60)); + let _ = socket2::SockRef::from(&client_stream).set_tcp_keepalive(&ka); + + let stream_id = next_stream_id.fetch_add(1, Ordering::Relaxed); + let quic_conn = quic_conn.clone(); + let active_streams = active_streams.clone(); + let client_token = port_token.child_token(); + + active_streams.fetch_add(1, Ordering::Relaxed); + + tokio::spawn(async move { + handle_client_connection_quic( + client_stream, + client_addr, + stream_id, + port, + quic_conn, + client_token, + ).await; + // Saturating decrement + loop { + let current = active_streams.load(Ordering::Relaxed); + if current == 0 { break; } + if active_streams.compare_exchange_weak( + current, current - 1, + Ordering::Relaxed, Ordering::Relaxed, + ).is_ok() { + break; + } + } + }); + } + Err(e) => { + log::error!("Accept error on port {} (QUIC): {}", port, e); + } + } + } + _ = port_token.cancelled() => { + log::info!("Port {} QUIC listener cancelled", port); + break; + } + } + } + }); + port_listeners.insert(port, handle); + } +} + +/// Handle a single client connection via QUIC transport. +/// Opens a new QUIC bidirectional stream, sends the PROXY header, +/// then bidirectionally copies data between the client TCP socket and the QUIC stream. +async fn handle_client_connection_quic( + client_stream: TcpStream, + client_addr: std::net::SocketAddr, + stream_id: u32, + dest_port: u16, + quic_conn: quinn::Connection, + client_token: CancellationToken, +) { + let client_ip = client_addr.ip().to_string(); + let client_port = client_addr.port(); + let edge_ip = "0.0.0.0"; + + // Open a new QUIC bidirectional stream for this client connection + let (mut quic_send, mut quic_recv) = match quic_conn.open_bi().await { + Ok(s) => s, + Err(e) => { + log::error!("Stream {} failed to open QUIC bi stream: {}", stream_id, e); + return; + } + }; + + // Send PROXY header as first bytes on the stream + let proxy_header = build_proxy_v1_header(&client_ip, edge_ip, client_port, dest_port); + if let Err(e) = quic_transport::write_proxy_header(&mut quic_send, &proxy_header).await { + log::error!("Stream {} failed to write PROXY header: {}", stream_id, e); + return; + } + + let (mut client_read, mut client_write) = client_stream.into_split(); + + // Task: QUIC -> client (download direction) + let dl_token = client_token.clone(); + let mut dl_task = tokio::spawn(async move { + let mut buf = vec![0u8; 32768]; + loop { + tokio::select! { + read_result = quic_recv.read(&mut buf) => { + match read_result { + Ok(Some(n)) => { + if client_write.write_all(&buf[..n]).await.is_err() { + break; + } + } + Ok(None) => break, // QUIC stream finished + Err(_) => break, + } + } + _ = dl_token.cancelled() => break, + } + } + let _ = client_write.shutdown().await; + }); + + // Task: client -> QUIC (upload direction) + let mut buf = vec![0u8; 32768]; + loop { + tokio::select! { + read_result = client_read.read(&mut buf) => { + match read_result { + Ok(0) => break, // client EOF + Ok(n) => { + if quic_send.write_all(&buf[..n]).await.is_err() { + break; + } + } + Err(_) => break, + } + } + _ = client_token.cancelled() => break, + } + } + + // Wait for download task to finish before closing the QUIC stream + let _ = tokio::time::timeout(Duration::from_secs(300), &mut dl_task).await; + + // Gracefully close the QUIC send stream + let _ = quic_send.finish(); + dl_task.abort(); +} + #[cfg(test)] mod tests { use super::*; @@ -971,6 +1462,7 @@ mod tests { edge_id: "e1".to_string(), secret: "sec".to_string(), bind_address: None, + transport_mode: None, }; let json = serde_json::to_string(&config).unwrap(); let back: EdgeConfig = serde_json::from_str(&json).unwrap(); @@ -988,6 +1480,44 @@ mod tests { assert_eq!(hc.stun_interval_secs, 120); } + #[test] + fn test_edge_config_transport_mode_deserialize() { + let json = r#"{ + "hubHost": "hub.test", + "hubPort": 8443, + "edgeId": "e1", + "secret": "s", + "transportMode": "quic" + }"#; + let config: EdgeConfig = serde_json::from_str(json).unwrap(); + assert_eq!(config.transport_mode, Some(TransportMode::Quic)); + } + + #[test] + fn test_edge_config_transport_mode_default() { + let json = r#"{ + "hubHost": "hub.test", + "hubPort": 8443, + "edgeId": "e1", + "secret": "s" + }"#; + let config: EdgeConfig = serde_json::from_str(json).unwrap(); + assert_eq!(config.transport_mode, None); + } + + #[test] + fn test_edge_config_transport_mode_quic_with_fallback() { + let json = r#"{ + "hubHost": "hub.test", + "hubPort": 8443, + "edgeId": "e1", + "secret": "s", + "transportMode": "quicWithFallback" + }"#; + let config: EdgeConfig = serde_json::from_str(json).unwrap(); + assert_eq!(config.transport_mode, Some(TransportMode::QuicWithFallback)); + } + #[test] fn test_handshake_config_default_stun_interval() { let json = r#"{"listenPorts": [443]}"#; @@ -1088,6 +1618,7 @@ mod tests { edge_id: "test-edge".to_string(), secret: "test-secret".to_string(), bind_address: None, + transport_mode: None, }); let status = edge.get_status().await; assert!(!status.running); @@ -1105,6 +1636,7 @@ mod tests { edge_id: "e".to_string(), secret: "s".to_string(), bind_address: None, + transport_mode: None, }); let rx1 = edge.take_event_rx().await; assert!(rx1.is_some()); @@ -1120,6 +1652,7 @@ mod tests { edge_id: "e".to_string(), secret: "s".to_string(), bind_address: None, + transport_mode: None, }); edge.stop().await; // should not panic let status = edge.get_status().await; diff --git a/rust/crates/remoteingress-core/src/hub.rs b/rust/crates/remoteingress-core/src/hub.rs index 91b3c68..7706c36 100644 --- a/rust/crates/remoteingress-core/src/hub.rs +++ b/rust/crates/remoteingress-core/src/hub.rs @@ -12,6 +12,7 @@ use serde::{Deserialize, Serialize}; use bytes::Bytes; use remoteingress_protocol::*; +use crate::transport::quic as quic_transport; type HubTlsStream = tokio_rustls::server::TlsStream; @@ -216,14 +217,35 @@ impl TunnelHub { } } - /// Start the hub — listen for TLS connections from edges. + /// Start the hub — listen for TLS connections (TCP) and QUIC connections (UDP) from edges. pub async fn start(&self) -> Result<(), Box> { let config = self.config.read().await.clone(); let tls_config = build_tls_config(&config)?; - let acceptor = TlsAcceptor::from(Arc::new(tls_config)); + let acceptor = TlsAcceptor::from(Arc::new(tls_config.clone())); let listener = TcpListener::bind(("0.0.0.0", config.tunnel_port)).await?; - log::info!("Hub listening on port {}", config.tunnel_port); + log::info!("Hub listening on TCP port {}", config.tunnel_port); + + // Start QUIC endpoint on the same port (UDP) + let quic_endpoint = match quic_transport::build_quic_server_config(tls_config) { + Ok(quic_server_config) => { + let bind_addr: std::net::SocketAddr = ([0, 0, 0, 0], config.tunnel_port).into(); + match quinn::Endpoint::server(quic_server_config, bind_addr) { + Ok(ep) => { + log::info!("Hub listening on QUIC/UDP port {}", config.tunnel_port); + Some(ep) + } + Err(e) => { + log::warn!("Failed to start QUIC endpoint: {} (QUIC disabled)", e); + None + } + } + } + Err(e) => { + log::warn!("Failed to build QUIC server config: {} (QUIC disabled)", e); + None + } + }; let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1); *self.shutdown_tx.lock().await = Some(shutdown_tx); @@ -236,12 +258,62 @@ impl TunnelHub { let hub_token = self.cancel_token.clone(); tokio::spawn(async move { + // Spawn QUIC acceptor as a separate task + let quic_handle = if let Some(quic_ep) = quic_endpoint { + let allowed_q = allowed.clone(); + let connected_q = connected.clone(); + let event_tx_q = event_tx.clone(); + let target_q = target_host.clone(); + let hub_token_q = hub_token.clone(); + Some(tokio::spawn(async move { + loop { + tokio::select! { + incoming = quic_ep.accept() => { + match incoming { + Some(incoming) => { + let allowed = allowed_q.clone(); + let connected = connected_q.clone(); + let event_tx = event_tx_q.clone(); + let target = target_q.clone(); + let edge_token = hub_token_q.child_token(); + let peer_addr = incoming.remote_address().ip().to_string(); + tokio::spawn(async move { + // Accept the QUIC connection + let quic_conn = match incoming.await { + Ok(c) => c, + Err(e) => { + log::error!("QUIC connection error: {}", e); + return; + } + }; + if let Err(e) = handle_edge_connection_quic( + quic_conn, allowed, connected, event_tx, target, edge_token, peer_addr, + ).await { + log::error!("QUIC edge connection error: {}", e); + } + }); + } + None => { + log::info!("QUIC endpoint closed"); + break; + } + } + } + _ = hub_token_q.cancelled() => break, + } + } + })) + } else { + None + }; + + // TCP+TLS acceptor loop loop { tokio::select! { result = listener.accept() => { match result { Ok((stream, addr)) => { - log::info!("Edge connection from {}", addr); + log::info!("Edge connection from {} (TCP+TLS)", addr); let acceptor = acceptor.clone(); let allowed = allowed.clone(); let connected = connected.clone(); @@ -272,6 +344,11 @@ impl TunnelHub { } } } + + // Abort QUIC acceptor if running + if let Some(h) = quic_handle { + h.abort(); + } }); Ok(()) @@ -956,6 +1033,363 @@ fn constant_time_eq(a: &[u8], b: &[u8]) -> bool { diff == 0 } +// ===== QUIC transport functions for hub ===== + +/// Handle an edge connection arriving via QUIC. +/// The first bidirectional stream is the control stream (auth + config). +/// Subsequent bidirectional streams are tunneled client connections. +async fn handle_edge_connection_quic( + quic_conn: quinn::Connection, + allowed: Arc>>, + connected: Arc>>, + event_tx: mpsc::Sender, + target_host: String, + edge_token: CancellationToken, + peer_addr: String, +) -> Result<(), Box> { + log::info!("QUIC edge connection from {}", peer_addr); + + // Accept the control stream (first bidirectional stream from edge) + let (mut ctrl_send, mut ctrl_recv) = match quic_conn.accept_bi().await { + Ok(s) => s, + Err(e) => return Err(format!("QUIC control stream accept failed: {}", e).into()), + }; + + // Read auth line from control stream + let mut auth_buf = Vec::with_capacity(512); + loop { + let mut byte = [0u8; 1]; + match ctrl_recv.read_exact(&mut byte).await { + Ok(()) => { + if byte[0] == b'\n' { break; } + auth_buf.push(byte[0]); + if auth_buf.len() > 4096 { + return Err("QUIC auth line too long".into()); + } + } + Err(e) => return Err(format!("QUIC auth read failed: {}", e).into()), + } + } + let auth_line = String::from_utf8(auth_buf) + .map_err(|_| "QUIC auth line not valid UTF-8")?; + let auth_line = auth_line.trim(); + + let parts: Vec<&str> = auth_line.splitn(3, ' ').collect(); + if parts.len() != 3 || parts[0] != "EDGE" { + return Err("invalid QUIC auth line".into()); + } + + let edge_id = parts[1].to_string(); + let secret = parts[2]; + + // Verify credentials + let (listen_ports, stun_interval_secs) = { + let edges = allowed.read().await; + match edges.get(&edge_id) { + Some(edge) => { + if !constant_time_eq(secret.as_bytes(), edge.secret.as_bytes()) { + return Err(format!("invalid secret for edge {}", edge_id).into()); + } + (edge.listen_ports.clone(), edge.stun_interval_secs.unwrap_or(300)) + } + None => return Err(format!("unknown edge {}", edge_id).into()), + } + }; + + log::info!("QUIC edge {} authenticated from {}", edge_id, peer_addr); + let _ = event_tx.try_send(HubEvent::EdgeConnected { + edge_id: edge_id.clone(), + peer_addr: peer_addr.clone(), + }); + + // Send handshake response on control stream + let handshake = HandshakeResponse { + listen_ports: listen_ports.clone(), + stun_interval_secs, + }; + let mut handshake_json = serde_json::to_string(&handshake)?; + handshake_json.push('\n'); + ctrl_send.write_all(handshake_json.as_bytes()).await + .map_err(|e| format!("QUIC handshake write failed: {}", e))?; + + // Track this edge + let edge_stream_count = Arc::new(AtomicU32::new(0)); + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + + let (config_tx, mut config_rx) = mpsc::channel::(16); + + { + let mut edges = connected.lock().await; + if let Some(old) = edges.remove(&edge_id) { + log::info!("QUIC edge {} reconnected, cancelling old connection", edge_id); + old.cancel_token.cancel(); + } + edges.insert( + edge_id.clone(), + ConnectedEdgeInfo { + connected_at: now, + peer_addr, + edge_stream_count: edge_stream_count.clone(), + config_tx, + cancel_token: edge_token.clone(), + }, + ); + } + + let stream_semaphore = Arc::new(Semaphore::new(MAX_STREAMS_PER_EDGE)); + + // Spawn task to accept data streams (tunneled client connections) + let data_stream_conn = quic_conn.clone(); + let data_target = target_host.clone(); + let data_edge_id = edge_id.clone(); + let data_event_tx = event_tx.clone(); + let data_semaphore = stream_semaphore.clone(); + let data_stream_count = edge_stream_count.clone(); + let data_token = edge_token.clone(); + let data_handle = tokio::spawn(async move { + let mut stream_id_counter: u32 = 0; + loop { + tokio::select! { + bi_result = data_stream_conn.accept_bi() => { + match bi_result { + Ok((quic_send, quic_recv)) => { + // Check stream limit + let permit = match data_semaphore.clone().try_acquire_owned() { + Ok(p) => p, + Err(_) => { + log::warn!("QUIC edge {} exceeded max streams, rejecting", data_edge_id); + // Drop the streams to reject + drop(quic_send); + drop(quic_recv); + continue; + } + }; + + stream_id_counter += 1; + let stream_id = stream_id_counter; + let target = data_target.clone(); + let edge_id = data_edge_id.clone(); + let event_tx = data_event_tx.clone(); + let stream_count = data_stream_count.clone(); + let stream_token = data_token.child_token(); + + let _ = event_tx.try_send(HubEvent::StreamOpened { + edge_id: edge_id.clone(), + stream_id, + }); + + stream_count.fetch_add(1, Ordering::Relaxed); + tokio::spawn(async move { + let _permit = permit; + handle_quic_stream( + quic_send, quic_recv, stream_id, + &target, &edge_id, stream_token, + ).await; + stream_count.fetch_sub(1, Ordering::Relaxed); + let _ = event_tx.try_send(HubEvent::StreamClosed { + edge_id, + stream_id, + }); + }); + } + Err(e) => { + log::info!("QUIC edge {} accept_bi ended: {}", data_edge_id, e); + break; + } + } + } + _ = data_token.cancelled() => break, + } + } + }); + + // Control stream loop: forward config updates and handle PONG + let disconnect_reason; + loop { + tokio::select! { + // Send config updates from hub to edge + update = config_rx.recv() => { + match update { + Some(update) => { + if let Ok(payload) = serde_json::to_vec(&update) { + if let Err(e) = quic_transport::write_ctrl_message( + &mut ctrl_send, quic_transport::CTRL_CONFIG, &payload, + ).await { + log::error!("QUIC config send to edge {} failed: {}", edge_id, e); + disconnect_reason = format!("quic_config_send_failed: {}", e); + break; + } + log::info!("Sent QUIC config update to edge {}: ports {:?}", edge_id, update.listen_ports); + } + } + None => { + disconnect_reason = "config_channel_closed".to_string(); + break; + } + } + } + // Read control messages from edge (mainly PONG responses) + ctrl_msg = quic_transport::read_ctrl_message(&mut ctrl_recv) => { + match ctrl_msg { + Ok(Some((msg_type, _payload))) => { + match msg_type { + quic_transport::CTRL_PONG => { + log::debug!("Received QUIC PONG from edge {}", edge_id); + } + _ => { + log::warn!("Unexpected QUIC control message type {} from edge {}", msg_type, edge_id); + } + } + } + Ok(None) => { + log::info!("QUIC edge {} control stream EOF", edge_id); + disconnect_reason = "quic_ctrl_eof".to_string(); + break; + } + Err(e) => { + log::error!("QUIC edge {} control stream error: {}", edge_id, e); + disconnect_reason = format!("quic_ctrl_error: {}", e); + break; + } + } + } + // QUIC connection closed + reason = quic_conn.closed() => { + log::info!("QUIC connection to edge {} closed: {}", edge_id, reason); + disconnect_reason = format!("quic_closed: {}", reason); + break; + } + // Hub-initiated cancellation + _ = edge_token.cancelled() => { + log::info!("QUIC edge {} cancelled by hub", edge_id); + disconnect_reason = "cancelled_by_hub".to_string(); + break; + } + } + } + + // Cleanup + edge_token.cancel(); + data_handle.abort(); + quic_conn.close(quinn::VarInt::from_u32(0), b"hub_shutdown"); + + { + let mut edges = connected.lock().await; + edges.remove(&edge_id); + } + let _ = event_tx.try_send(HubEvent::EdgeDisconnected { + edge_id, + reason: disconnect_reason, + }); + + Ok(()) +} + +/// Handle a single tunneled client connection arriving via a QUIC bidirectional stream. +/// Reads the PROXY header, connects to SmartProxy, and pipes data bidirectionally. +async fn handle_quic_stream( + mut quic_send: quinn::SendStream, + mut quic_recv: quinn::RecvStream, + stream_id: u32, + target_host: &str, + _edge_id: &str, + stream_token: CancellationToken, +) { + // Read PROXY header from the beginning of the stream + let proxy_header = match quic_transport::read_proxy_header(&mut quic_recv).await { + Ok(h) => h, + Err(e) => { + log::error!("QUIC stream {} failed to read PROXY header: {}", stream_id, e); + return; + } + }; + + let dest_port = parse_dest_port_from_proxy(&proxy_header).unwrap_or(443); + + // Connect to SmartProxy + let mut upstream = match tokio::time::timeout( + Duration::from_secs(10), + TcpStream::connect((target_host, dest_port)), + ).await { + Ok(Ok(s)) => s, + Ok(Err(e)) => { + log::error!("QUIC stream {} connect to {}:{} failed: {}", stream_id, target_host, dest_port, e); + return; + } + Err(_) => { + log::error!("QUIC stream {} connect to {}:{} timed out", stream_id, target_host, dest_port); + return; + } + }; + + let _ = upstream.set_nodelay(true); + // Send PROXY header to SmartProxy + if let Err(e) = upstream.write_all(proxy_header.as_bytes()).await { + log::error!("QUIC stream {} failed to write PROXY header to upstream: {}", stream_id, e); + return; + } + + let (mut up_read, mut up_write) = upstream.into_split(); + + // Task: QUIC -> upstream (edge data to SmartProxy) + let writer_token = stream_token.clone(); + let writer_task = tokio::spawn(async move { + let mut buf = vec![0u8; 32768]; + loop { + tokio::select! { + read_result = quic_recv.read(&mut buf) => { + match read_result { + Ok(Some(n)) => { + let write_result = tokio::select! { + r = tokio::time::timeout( + Duration::from_secs(60), + up_write.write_all(&buf[..n]), + ) => r, + _ = writer_token.cancelled() => break, + }; + match write_result { + Ok(Ok(())) => {} + Ok(Err(_)) => break, + Err(_) => break, + } + } + Ok(None) => break, // QUIC stream finished + Err(_) => break, + } + } + _ = writer_token.cancelled() => break, + } + } + let _ = up_write.shutdown().await; + }); + + // Task: upstream -> QUIC (SmartProxy data to edge) + let mut buf = vec![0u8; 32768]; + loop { + tokio::select! { + read_result = up_read.read(&mut buf) => { + match read_result { + Ok(0) => break, + Ok(n) => { + if quic_send.write_all(&buf[..n]).await.is_err() { + break; + } + } + Err(_) => break, + } + } + _ = stream_token.cancelled() => break, + } + } + + // Gracefully close the QUIC send stream + let _ = quic_send.finish(); + writer_task.abort(); +} + #[cfg(test)] mod tests { use super::*; diff --git a/rust/crates/remoteingress-core/src/lib.rs b/rust/crates/remoteingress-core/src/lib.rs index 9213086..5ebeb53 100644 --- a/rust/crates/remoteingress-core/src/lib.rs +++ b/rust/crates/remoteingress-core/src/lib.rs @@ -1,5 +1,6 @@ pub mod hub; pub mod edge; pub mod stun; +pub mod transport; pub use remoteingress_protocol as protocol; diff --git a/rust/crates/remoteingress-core/src/transport/mod.rs b/rust/crates/remoteingress-core/src/transport/mod.rs new file mode 100644 index 0000000..0b2c46a --- /dev/null +++ b/rust/crates/remoteingress-core/src/transport/mod.rs @@ -0,0 +1,22 @@ +pub mod quic; + +use serde::{Deserialize, Serialize}; + +/// Transport mode for the tunnel connection between edge and hub. +/// +/// - `TcpTls`: TCP + TLS with frame-based multiplexing via TunnelIo (default). +/// - `Quic`: QUIC with native stream multiplexing (one QUIC stream per tunneled connection). +/// - `QuicWithFallback`: Try QUIC first, fall back to TCP+TLS if UDP is blocked. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub enum TransportMode { + TcpTls, + Quic, + QuicWithFallback, +} + +impl Default for TransportMode { + fn default() -> Self { + TransportMode::TcpTls + } +} diff --git a/rust/crates/remoteingress-core/src/transport/quic.rs b/rust/crates/remoteingress-core/src/transport/quic.rs new file mode 100644 index 0000000..0cf46ac --- /dev/null +++ b/rust/crates/remoteingress-core/src/transport/quic.rs @@ -0,0 +1,191 @@ +use std::sync::Arc; + +/// QUIC control stream message types (reuses frame type constants for consistency). +pub const CTRL_CONFIG: u8 = 0x06; +pub const CTRL_PING: u8 = 0x07; +pub const CTRL_PONG: u8 = 0x08; + +/// Header size for control stream messages: [type:1][length:4] = 5 bytes. +pub const CTRL_HEADER_SIZE: usize = 5; + +/// Build a quinn ClientConfig that skips server certificate verification +/// (auth is via shared secret, same as the TCP+TLS path). +pub fn build_quic_client_config() -> quinn::ClientConfig { + let mut tls_config = rustls::ClientConfig::builder() + .dangerous() + .with_custom_certificate_verifier(Arc::new(NoCertVerifier)) + .with_no_client_auth(); + + // QUIC mandates ALPN negotiation (RFC 9001 §8.1). + // Must match the server's ALPN protocol. + tls_config.alpn_protocols = vec![b"remoteingress".to_vec()]; + + let quic_config = quinn::crypto::rustls::QuicClientConfig::try_from(tls_config) + .expect("failed to build QUIC client config from rustls config"); + + let mut transport = quinn::TransportConfig::default(); + transport.keep_alive_interval(Some(std::time::Duration::from_secs(15))); + transport.max_idle_timeout(Some( + quinn::IdleTimeout::try_from(std::time::Duration::from_secs(45)).unwrap(), + )); + // Match MAX_STREAMS_PER_EDGE (1024) from hub.rs. + // Default is 100 which is too low for high-concurrency tunneling. + transport.max_concurrent_bidi_streams(1024u32.into()); + + let mut client_config = quinn::ClientConfig::new(Arc::new(quic_config)); + client_config.transport_config(Arc::new(transport)); + client_config +} + +/// Build a quinn ServerConfig from the same TLS server config used for TCP+TLS. +pub fn build_quic_server_config( + tls_server_config: rustls::ServerConfig, +) -> Result> { + let quic_config = quinn::crypto::rustls::QuicServerConfig::try_from(tls_server_config)?; + + let mut transport = quinn::TransportConfig::default(); + transport.keep_alive_interval(Some(std::time::Duration::from_secs(15))); + transport.max_idle_timeout(Some( + quinn::IdleTimeout::try_from(std::time::Duration::from_secs(45)).unwrap(), + )); + transport.max_concurrent_bidi_streams(1024u32.into()); + + let mut server_config = quinn::ServerConfig::with_crypto(Arc::new(quic_config)); + server_config.transport_config(Arc::new(transport)); + Ok(server_config) +} + +/// Write a control message to a QUIC send stream. +/// Format: [type:1][length:4][payload:N] +pub async fn write_ctrl_message( + send: &mut quinn::SendStream, + msg_type: u8, + payload: &[u8], +) -> Result<(), std::io::Error> { + let len = payload.len() as u32; + let mut header = [0u8; CTRL_HEADER_SIZE]; + header[0] = msg_type; + header[1..5].copy_from_slice(&len.to_be_bytes()); + send.write_all(&header).await?; + if !payload.is_empty() { + send.write_all(payload).await?; + } + Ok(()) +} + +/// Read a control message from a QUIC recv stream. +/// Returns (msg_type, payload). Returns None on EOF. +pub async fn read_ctrl_message( + recv: &mut quinn::RecvStream, +) -> Result)>, std::io::Error> { + let mut header = [0u8; CTRL_HEADER_SIZE]; + match recv.read_exact(&mut header).await { + Ok(()) => {} + Err(e) => { + if let quinn::ReadExactError::FinishedEarly(_) = e { + return Ok(None); + } + return Err(std::io::Error::new(std::io::ErrorKind::Other, e)); + } + } + let msg_type = header[0]; + let len = u32::from_be_bytes([header[1], header[2], header[3], header[4]]) as usize; + let mut payload = vec![0u8; len]; + if len > 0 { + recv.read_exact(&mut payload).await.map_err(|e| { + std::io::Error::new(std::io::ErrorKind::Other, e) + })?; + } + Ok(Some((msg_type, payload))) +} + +/// Write the PROXY v1 header as the first bytes on a QUIC data stream. +/// The header is length-prefixed so the receiver knows where it ends and data begins. +/// Format: [header_len:4][proxy_header:N] +pub async fn write_proxy_header( + send: &mut quinn::SendStream, + proxy_header: &str, +) -> Result<(), std::io::Error> { + let header_bytes = proxy_header.as_bytes(); + let len = header_bytes.len() as u32; + send.write_all(&len.to_be_bytes()).await?; + send.write_all(header_bytes).await?; + Ok(()) +} + +/// Read the PROXY v1 header from the first bytes of a QUIC data stream. +/// Returns the header string. +pub async fn read_proxy_header( + recv: &mut quinn::RecvStream, +) -> Result { + let mut len_buf = [0u8; 4]; + recv.read_exact(&mut len_buf).await.map_err(|e| { + std::io::Error::new(std::io::ErrorKind::Other, e) + })?; + let len = u32::from_be_bytes(len_buf) as usize; + if len > 8192 { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "proxy header too long", + )); + } + let mut header = vec![0u8; len]; + recv.read_exact(&mut header).await.map_err(|e| { + std::io::Error::new(std::io::ErrorKind::Other, e) + })?; + String::from_utf8(header).map_err(|_| { + std::io::Error::new(std::io::ErrorKind::InvalidData, "proxy header not UTF-8") + }) +} + +/// TLS certificate verifier that accepts any certificate (auth is via shared secret). +/// Same as the one in edge.rs but placed here so the QUIC module is self-contained. +#[derive(Debug)] +struct NoCertVerifier; + +impl rustls::client::danger::ServerCertVerifier for NoCertVerifier { + fn verify_server_cert( + &self, + _end_entity: &rustls::pki_types::CertificateDer<'_>, + _intermediates: &[rustls::pki_types::CertificateDer<'_>], + _server_name: &rustls::pki_types::ServerName<'_>, + _ocsp_response: &[u8], + _now: rustls::pki_types::UnixTime, + ) -> Result { + Ok(rustls::client::danger::ServerCertVerified::assertion()) + } + + fn verify_tls12_signature( + &self, + _message: &[u8], + _cert: &rustls::pki_types::CertificateDer<'_>, + _dss: &rustls::DigitallySignedStruct, + ) -> Result { + Ok(rustls::client::danger::HandshakeSignatureValid::assertion()) + } + + fn verify_tls13_signature( + &self, + _message: &[u8], + _cert: &rustls::pki_types::CertificateDer<'_>, + _dss: &rustls::DigitallySignedStruct, + ) -> Result { + Ok(rustls::client::danger::HandshakeSignatureValid::assertion()) + } + + fn supported_verify_schemes(&self) -> Vec { + vec![ + rustls::SignatureScheme::RSA_PKCS1_SHA256, + rustls::SignatureScheme::RSA_PKCS1_SHA384, + rustls::SignatureScheme::RSA_PKCS1_SHA512, + rustls::SignatureScheme::ECDSA_NISTP256_SHA256, + rustls::SignatureScheme::ECDSA_NISTP384_SHA384, + rustls::SignatureScheme::ECDSA_NISTP521_SHA512, + rustls::SignatureScheme::RSA_PSS_SHA256, + rustls::SignatureScheme::RSA_PSS_SHA384, + rustls::SignatureScheme::RSA_PSS_SHA512, + rustls::SignatureScheme::ED25519, + rustls::SignatureScheme::ED448, + ] + } +} diff --git a/test/test.quic.node.ts b/test/test.quic.node.ts new file mode 100644 index 0000000..260f1b0 --- /dev/null +++ b/test/test.quic.node.ts @@ -0,0 +1,283 @@ +import { expect, tap } from '@push.rocks/tapbundle'; +import * as net from 'net'; +import * as crypto from 'crypto'; +import { RemoteIngressHub, RemoteIngressEdge } from '../ts/index.js'; + +// --------------------------------------------------------------------------- +// Helpers (same patterns as test.flowcontrol.node.ts) +// --------------------------------------------------------------------------- + +async function findFreePorts(count: number): Promise { + const servers: net.Server[] = []; + const ports: number[] = []; + for (let i = 0; i < count; i++) { + const server = net.createServer(); + await new Promise((resolve) => server.listen(0, '127.0.0.1', resolve)); + ports.push((server.address() as net.AddressInfo).port); + servers.push(server); + } + await Promise.all(servers.map((s) => new Promise((resolve) => s.close(() => resolve())))); + return ports; +} + +type TrackingServer = net.Server & { destroyAll: () => void }; + +function startEchoServer(port: number, host: string): Promise { + return new Promise((resolve, reject) => { + const connections = new Set(); + const server = net.createServer((socket) => { + connections.add(socket); + socket.on('close', () => connections.delete(socket)); + let proxyHeaderParsed = false; + let pendingBuf = Buffer.alloc(0); + socket.on('data', (data: Buffer) => { + if (!proxyHeaderParsed) { + pendingBuf = Buffer.concat([pendingBuf, data]); + const idx = pendingBuf.indexOf('\r\n'); + if (idx !== -1) { + proxyHeaderParsed = true; + const remainder = pendingBuf.subarray(idx + 2); + if (remainder.length > 0) socket.write(remainder); + } + return; + } + socket.write(data); + }); + socket.on('error', () => {}); + }) as TrackingServer; + server.destroyAll = () => { + for (const conn of connections) conn.destroy(); + connections.clear(); + }; + server.on('error', reject); + server.listen(port, host, () => resolve(server)); + }); +} + +async function forceCloseServer(server: TrackingServer): Promise { + server.destroyAll(); + await new Promise((resolve) => server.close(() => resolve())); +} + +interface TestTunnel { + hub: RemoteIngressHub; + edge: RemoteIngressEdge; + edgePort: number; + cleanup: () => Promise; +} + +/** + * Start a full hub + edge tunnel using QUIC transport. + * Edge binds to 127.0.0.1, upstream server binds to 127.0.0.2. + */ +async function startQuicTunnel(edgePort: number, hubPort: number): Promise { + const hub = new RemoteIngressHub(); + const edge = new RemoteIngressEdge(); + + await hub.start({ + tunnelPort: hubPort, + targetHost: '127.0.0.2', + }); + + await hub.updateAllowedEdges([ + { id: 'test-edge', secret: 'test-secret', listenPorts: [edgePort] }, + ]); + + const connectedPromise = new Promise((resolve, reject) => { + const timeout = setTimeout(() => reject(new Error('QUIC edge did not connect within 10s')), 10000); + edge.once('tunnelConnected', () => { + clearTimeout(timeout); + resolve(); + }); + }); + + await edge.start({ + hubHost: '127.0.0.1', + hubPort, + edgeId: 'test-edge', + secret: 'test-secret', + bindAddress: '127.0.0.1', + transportMode: 'quic', + }); + + await connectedPromise; + await new Promise((resolve) => setTimeout(resolve, 500)); + + return { + hub, + edge, + edgePort, + cleanup: async () => { + await edge.stop(); + await hub.stop(); + }, + }; +} + +function sendAndReceive(port: number, data: Buffer, timeoutMs = 30000): Promise { + return new Promise((resolve, reject) => { + const chunks: Buffer[] = []; + let totalReceived = 0; + const expectedLength = data.length; + let settled = false; + + const client = net.createConnection({ host: '127.0.0.1', port }, () => { + client.write(data); + client.end(); + }); + + const timer = setTimeout(() => { + if (!settled) { + settled = true; + client.destroy(); + reject(new Error(`Timeout after ${timeoutMs}ms — received ${totalReceived}/${expectedLength} bytes`)); + } + }, timeoutMs); + + client.on('data', (chunk: Buffer) => { + chunks.push(chunk); + totalReceived += chunk.length; + if (totalReceived >= expectedLength && !settled) { + settled = true; + clearTimeout(timer); + client.destroy(); + resolve(Buffer.concat(chunks)); + } + }); + + client.on('end', () => { + if (!settled) { + settled = true; + clearTimeout(timer); + resolve(Buffer.concat(chunks)); + } + }); + + client.on('error', (err) => { + if (!settled) { + settled = true; + clearTimeout(timer); + reject(err); + } + }); + }); +} + +function sha256(buf: Buffer): string { + return crypto.createHash('sha256').update(buf).digest('hex'); +} + +// --------------------------------------------------------------------------- +// QUIC Transport E2E Tests +// --------------------------------------------------------------------------- + +let tunnel: TestTunnel; +let echoServer: TrackingServer; +let hubPort: number; +let edgePort: number; + +tap.test('QUIC setup: start echo server and QUIC tunnel', async () => { + [hubPort, edgePort] = await findFreePorts(2); + + echoServer = await startEchoServer(edgePort, '127.0.0.2'); + tunnel = await startQuicTunnel(edgePort, hubPort); + + expect(tunnel.hub.running).toBeTrue(); + const status = await tunnel.edge.getStatus(); + expect(status.connected).toBeTrue(); +}); + +tap.test('QUIC: single stream echo — 1KB', async () => { + const data = crypto.randomBytes(1024); + const hash = sha256(data); + const received = await sendAndReceive(edgePort, data, 10000); + expect(received.length).toEqual(1024); + expect(sha256(received)).toEqual(hash); +}); + +tap.test('QUIC: single stream echo — 1MB', async () => { + const size = 1024 * 1024; + const data = crypto.randomBytes(size); + const hash = sha256(data); + const received = await sendAndReceive(edgePort, data, 30000); + expect(received.length).toEqual(size); + expect(sha256(received)).toEqual(hash); +}); + +tap.test('QUIC: single stream echo — 16MB', async () => { + const size = 16 * 1024 * 1024; + const data = crypto.randomBytes(size); + const hash = sha256(data); + const received = await sendAndReceive(edgePort, data, 60000); + expect(received.length).toEqual(size); + expect(sha256(received)).toEqual(hash); +}); + +tap.test('QUIC: 10 concurrent streams x 1MB each', async () => { + const streamCount = 10; + const payloadSize = 1024 * 1024; + + const promises = Array.from({ length: streamCount }, () => { + const data = crypto.randomBytes(payloadSize); + const hash = sha256(data); + return sendAndReceive(edgePort, data, 30000).then((received) => ({ + sent: hash, + received: sha256(received), + sizeOk: received.length === payloadSize, + })); + }); + + const results = await Promise.all(promises); + const failures = results.filter((r) => !r.sizeOk || r.sent !== r.received); + expect(failures.length).toEqual(0); +}); + +tap.test('QUIC: 50 concurrent streams x 64KB each', async () => { + const streamCount = 50; + const payloadSize = 64 * 1024; + + const promises = Array.from({ length: streamCount }, () => { + const data = crypto.randomBytes(payloadSize); + const hash = sha256(data); + return sendAndReceive(edgePort, data, 30000).then((received) => ({ + sent: hash, + received: sha256(received), + sizeOk: received.length === payloadSize, + })); + }); + + const results = await Promise.all(promises); + const failures = results.filter((r) => !r.sizeOk || r.sent !== r.received); + expect(failures.length).toEqual(0); +}); + +tap.test('QUIC: 200 concurrent streams x 16KB each', async () => { + const streamCount = 200; + const payloadSize = 16 * 1024; + + const promises = Array.from({ length: streamCount }, () => { + const data = crypto.randomBytes(payloadSize); + const hash = sha256(data); + return sendAndReceive(edgePort, data, 60000).then((received) => ({ + sent: hash, + received: sha256(received), + sizeOk: received.length === payloadSize, + })); + }); + + const results = await Promise.all(promises); + const failures = results.filter((r) => !r.sizeOk || r.sent !== r.received); + expect(failures.length).toEqual(0); +}); + +tap.test('QUIC: tunnel still connected after all tests', async () => { + const status = await tunnel.edge.getStatus(); + expect(status.connected).toBeTrue(); +}); + +tap.test('QUIC teardown: stop tunnel and echo server', async () => { + await tunnel.cleanup(); + await forceCloseServer(echoServer); +}); + +export default tap.start(); diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 67bfdc7..0ed6bc5 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@serve.zone/remoteingress', - version: '4.9.1', + version: '4.10.0', description: 'Edge ingress tunnel for DcRouter - accepts incoming TCP connections at network edge and tunnels them to DcRouter SmartProxy preserving client IP via PROXY protocol v1.' } diff --git a/ts/classes.remoteingressedge.ts b/ts/classes.remoteingressedge.ts index 53023ca..0cbfd4d 100644 --- a/ts/classes.remoteingressedge.ts +++ b/ts/classes.remoteingressedge.ts @@ -15,6 +15,7 @@ type TEdgeCommands = { edgeId: string; secret: string; bindAddress?: string; + transportMode?: 'tcpTls' | 'quic' | 'quicWithFallback'; }; result: { started: boolean }; }; @@ -40,6 +41,7 @@ export interface IEdgeConfig { edgeId: string; secret: string; bindAddress?: string; + transportMode?: 'tcpTls' | 'quic' | 'quicWithFallback'; } const MAX_RESTART_ATTEMPTS = 10; @@ -137,6 +139,7 @@ export class RemoteIngressEdge extends EventEmitter { edgeId: edgeConfig.edgeId, secret: edgeConfig.secret, ...(edgeConfig.bindAddress ? { bindAddress: edgeConfig.bindAddress } : {}), + ...(edgeConfig.transportMode ? { transportMode: edgeConfig.transportMode } : {}), }); this.started = true; @@ -233,6 +236,7 @@ export class RemoteIngressEdge extends EventEmitter { edgeId: this.savedConfig.edgeId, secret: this.savedConfig.secret, ...(this.savedConfig.bindAddress ? { bindAddress: this.savedConfig.bindAddress } : {}), + ...(this.savedConfig.transportMode ? { transportMode: this.savedConfig.transportMode } : {}), }); this.started = true;