From 67e9ed2f36c27e411c616acba0b5cb61e0aa3c91 Mon Sep 17 00:00:00 2001 From: eric Date: Mon, 23 Feb 2026 16:03:43 +0800 Subject: [PATCH] Add FFI for outbound tests --- leaf-ffi/Cargo.toml | 2 ++ leaf-ffi/src/lib.rs | 84 +++++++++++++++++++++++++++++++++++++++++++++ leaf/src/util.rs | 82 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 168 insertions(+) diff --git a/leaf-ffi/Cargo.toml b/leaf-ffi/Cargo.toml index 5f39679..6937029 100644 --- a/leaf-ffi/Cargo.toml +++ b/leaf-ffi/Cargo.toml @@ -32,3 +32,5 @@ auto-reload = ["leaf/auto-reload"] [dependencies] leaf = { path = "../leaf", default-features = false, optional = true } tokio = { version = "1", features = ["rt"] } +anyhow = "1.0" +futures = "0.3" diff --git a/leaf-ffi/src/lib.rs b/leaf-ffi/src/lib.rs index 7864d24..77927e0 100644 --- a/leaf-ffi/src/lib.rs +++ b/leaf-ffi/src/lib.rs @@ -168,6 +168,90 @@ pub unsafe extern "C" fn leaf_test_config(config_path: *const c_char) -> i32 { } } +/// Tests all outbounds connectivity and latency. +/// +/// @param config The content of the config file. +/// @param concurrency The maximum number of concurrent tests. +/// @param timeout_sec The timeout in seconds for each test. +/// @param context User-provided context pointer to be passed back to the callback. +/// @param callback The callback function to receive results. +/// Arguments: tag (string), tcp_latency (ms, -1 if failed), udp_latency (ms, -1 if failed), context. +/// @return Returns ERR_OK on success. +#[no_mangle] +pub unsafe extern "C" fn leaf_test_outbounds( + config: *const c_char, + concurrency: u32, + timeout_sec: u32, + context: *mut std::ffi::c_void, + callback: extern "C" fn(*const c_char, i32, i32, *mut std::ffi::c_void), +) -> i32 { + if let Ok(config_str) = unsafe { CStr::from_ptr(config).to_str() } { + // Send context safely to the other thread? + // raw pointers are not Send. + // But we are blocking on rt.block_on, so we are staying in this function? + // No, rt.block_on blocks the current thread until the future completes. + // The callback is called from within the future. + // Since we block, the context pointer is valid for the duration. + // However, the future is executed on the runtime. + // We need to wrap the pointer in a Send wrapper if the runtime is multi-threaded. + // But here we create a new Runtime `Runtime::new()`, which is multi-threaded by default? + // Or we can use `current_thread` runtime. + // leaf::util::test_outbounds is async. + + // Let's use a wrapper struct to make the pointer Send/Sync since we know we are waiting for it. + struct SendPtr(*mut std::ffi::c_void); + unsafe impl Send for SendPtr {} + unsafe impl Sync for SendPtr {} + let ctx = SendPtr(context); + + let config = match leaf::config::from_string(config_str) { + Ok(c) => c, + Err(e) => return to_errno(leaf::Error::Config(anyhow::anyhow!(e))), + }; + + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + rt.block_on(async move { + use futures::StreamExt; + let timeout = if timeout_sec > 0 { + Some(std::time::Duration::from_secs(timeout_sec as u64)) + } else { + None + }; + if let Ok(mut stream) = + leaf::util::stream_outbounds_tests(&config, timeout, concurrency as usize).await + { + while let Some((tag, (tcp_res, udp_res))) = stream.next().await { + let tag_cstring = std::ffi::CString::new(tag.clone()).unwrap(); + let tcp_latency = match tcp_res { + Ok(d) => d.as_millis() as i32, + Err(e) => { + println!("TCP test failed for {}: {:?}", tag, e); + -1 + } + }; + let udp_latency = match udp_res { + Ok(d) => d.as_millis() as i32, + Err(e) => { + println!("UDP test failed for {}: {:?}", tag, e); + -1 + } + }; + callback(tag_cstring.as_ptr(), tcp_latency, udp_latency, ctx.0); + } + } else { + println!("Failed to start stream_outbounds_tests"); + } + }); + ERR_OK + } else { + ERR_CONFIG_PATH + } +} + /// Runs a health check for an outbound. /// /// This performs an active health check by sending a PING to healthcheck.leaf diff --git a/leaf/src/util.rs b/leaf/src/util.rs index 71d6f32..3e270e2 100644 --- a/leaf/src/util.rs +++ b/leaf/src/util.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::str::FromStr; use std::sync::Arc; @@ -173,6 +174,87 @@ pub async fn test_outbound( Ok((tcp_res, udp_res)) } +pub async fn test_outbounds( + config: &Config, + to: Option, + concurrency: usize, +) -> Result, Result)>> { + let to = to.unwrap_or(Duration::from_secs(4)); + let dns_client = Arc::new(RwLock::new(DnsClient::new(&config.dns)?)); + let outbound_manager = OutboundManager::new(&config.outbounds, dns_client.clone())?; + + let mut tasks = Vec::new(); + for handler in outbound_manager.handlers() { + let tag = handler.tag().clone(); + let handler = handler.clone(); + let dns_client = dns_client.clone(); + tasks.push(async move { + let (tcp_res, udp_res) = futures::future::join( + timeout(to, test_tcp_outbound(dns_client.clone(), handler.clone())), + timeout(to, test_udp_outbound(dns_client, handler)), + ) + .await; + let tcp_res = match tcp_res { + Ok(res) => res, + Err(_) => Err(anyhow!("timeout")), + }; + let udp_res = match udp_res { + Ok(res) => res, + Err(_) => Err(anyhow!("timeout")), + }; + (tag, (tcp_res, udp_res)) + }); + } + + use futures::StreamExt; + let results = futures::stream::iter(tasks) + .buffer_unordered(concurrency) + .collect::>() + .await; + + let mut map = HashMap::new(); + for (tag, res) in results { + map.insert(tag, res); + } + Ok(map) +} + +pub async fn stream_outbounds_tests( + config: &Config, + to: Option, + concurrency: usize, +) -> Result, Result))>> { + let to = to.unwrap_or(Duration::from_secs(4)); + let dns_client = Arc::new(RwLock::new(DnsClient::new(&config.dns)?)); + let outbound_manager = OutboundManager::new(&config.outbounds, dns_client.clone())?; + + let mut tasks = Vec::new(); + for handler in outbound_manager.handlers() { + let tag = handler.tag().clone(); + let handler = handler.clone(); + let dns_client = dns_client.clone(); + tasks.push(async move { + let (tcp_res, udp_res) = futures::future::join( + timeout(to, test_tcp_outbound(dns_client.clone(), handler.clone())), + timeout(to, test_udp_outbound(dns_client, handler)), + ) + .await; + let tcp_res = match tcp_res { + Ok(res) => res, + Err(_) => Err(anyhow!("timeout")), + }; + let udp_res = match udp_res { + Ok(res) => res, + Err(_) => Err(anyhow!("timeout")), + }; + (tag, (tcp_res, udp_res)) + }); + } + + use futures::StreamExt; + Ok(futures::stream::iter(tasks).buffer_unordered(concurrency)) +} + pub async fn health_check_outbound( tag: &str, config: &Config,