Add FFI for outbound tests
This commit is contained in:
@@ -32,3 +32,5 @@ auto-reload = ["leaf/auto-reload"]
|
||||
[dependencies]
|
||||
leaf = { path = "../leaf", default-features = false, optional = true }
|
||||
tokio = { version = "1", features = ["rt"] }
|
||||
anyhow = "1.0"
|
||||
futures = "0.3"
|
||||
|
||||
@@ -168,6 +168,90 @@ pub unsafe extern "C" fn leaf_test_config(config_path: *const c_char) -> i32 {
|
||||
}
|
||||
}
|
||||
|
||||
/// Tests all outbounds connectivity and latency.
|
||||
///
|
||||
/// @param config The content of the config file.
|
||||
/// @param concurrency The maximum number of concurrent tests.
|
||||
/// @param timeout_sec The timeout in seconds for each test.
|
||||
/// @param context User-provided context pointer to be passed back to the callback.
|
||||
/// @param callback The callback function to receive results.
|
||||
/// Arguments: tag (string), tcp_latency (ms, -1 if failed), udp_latency (ms, -1 if failed), context.
|
||||
/// @return Returns ERR_OK on success.
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn leaf_test_outbounds(
|
||||
config: *const c_char,
|
||||
concurrency: u32,
|
||||
timeout_sec: u32,
|
||||
context: *mut std::ffi::c_void,
|
||||
callback: extern "C" fn(*const c_char, i32, i32, *mut std::ffi::c_void),
|
||||
) -> i32 {
|
||||
if let Ok(config_str) = unsafe { CStr::from_ptr(config).to_str() } {
|
||||
// Send context safely to the other thread?
|
||||
// raw pointers are not Send.
|
||||
// But we are blocking on rt.block_on, so we are staying in this function?
|
||||
// No, rt.block_on blocks the current thread until the future completes.
|
||||
// The callback is called from within the future.
|
||||
// Since we block, the context pointer is valid for the duration.
|
||||
// However, the future is executed on the runtime.
|
||||
// We need to wrap the pointer in a Send wrapper if the runtime is multi-threaded.
|
||||
// But here we create a new Runtime `Runtime::new()`, which is multi-threaded by default?
|
||||
// Or we can use `current_thread` runtime.
|
||||
// leaf::util::test_outbounds is async.
|
||||
|
||||
// Let's use a wrapper struct to make the pointer Send/Sync since we know we are waiting for it.
|
||||
struct SendPtr(*mut std::ffi::c_void);
|
||||
unsafe impl Send for SendPtr {}
|
||||
unsafe impl Sync for SendPtr {}
|
||||
let ctx = SendPtr(context);
|
||||
|
||||
let config = match leaf::config::from_string(config_str) {
|
||||
Ok(c) => c,
|
||||
Err(e) => return to_errno(leaf::Error::Config(anyhow::anyhow!(e))),
|
||||
};
|
||||
|
||||
let rt = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
rt.block_on(async move {
|
||||
use futures::StreamExt;
|
||||
let timeout = if timeout_sec > 0 {
|
||||
Some(std::time::Duration::from_secs(timeout_sec as u64))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
if let Ok(mut stream) =
|
||||
leaf::util::stream_outbounds_tests(&config, timeout, concurrency as usize).await
|
||||
{
|
||||
while let Some((tag, (tcp_res, udp_res))) = stream.next().await {
|
||||
let tag_cstring = std::ffi::CString::new(tag.clone()).unwrap();
|
||||
let tcp_latency = match tcp_res {
|
||||
Ok(d) => d.as_millis() as i32,
|
||||
Err(e) => {
|
||||
println!("TCP test failed for {}: {:?}", tag, e);
|
||||
-1
|
||||
}
|
||||
};
|
||||
let udp_latency = match udp_res {
|
||||
Ok(d) => d.as_millis() as i32,
|
||||
Err(e) => {
|
||||
println!("UDP test failed for {}: {:?}", tag, e);
|
||||
-1
|
||||
}
|
||||
};
|
||||
callback(tag_cstring.as_ptr(), tcp_latency, udp_latency, ctx.0);
|
||||
}
|
||||
} else {
|
||||
println!("Failed to start stream_outbounds_tests");
|
||||
}
|
||||
});
|
||||
ERR_OK
|
||||
} else {
|
||||
ERR_CONFIG_PATH
|
||||
}
|
||||
}
|
||||
|
||||
/// Runs a health check for an outbound.
|
||||
///
|
||||
/// This performs an active health check by sending a PING to healthcheck.leaf
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use std::collections::HashMap;
|
||||
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
@@ -173,6 +174,87 @@ pub async fn test_outbound(
|
||||
Ok((tcp_res, udp_res))
|
||||
}
|
||||
|
||||
pub async fn test_outbounds(
|
||||
config: &Config,
|
||||
to: Option<Duration>,
|
||||
concurrency: usize,
|
||||
) -> Result<HashMap<String, (Result<Duration>, Result<Duration>)>> {
|
||||
let to = to.unwrap_or(Duration::from_secs(4));
|
||||
let dns_client = Arc::new(RwLock::new(DnsClient::new(&config.dns)?));
|
||||
let outbound_manager = OutboundManager::new(&config.outbounds, dns_client.clone())?;
|
||||
|
||||
let mut tasks = Vec::new();
|
||||
for handler in outbound_manager.handlers() {
|
||||
let tag = handler.tag().clone();
|
||||
let handler = handler.clone();
|
||||
let dns_client = dns_client.clone();
|
||||
tasks.push(async move {
|
||||
let (tcp_res, udp_res) = futures::future::join(
|
||||
timeout(to, test_tcp_outbound(dns_client.clone(), handler.clone())),
|
||||
timeout(to, test_udp_outbound(dns_client, handler)),
|
||||
)
|
||||
.await;
|
||||
let tcp_res = match tcp_res {
|
||||
Ok(res) => res,
|
||||
Err(_) => Err(anyhow!("timeout")),
|
||||
};
|
||||
let udp_res = match udp_res {
|
||||
Ok(res) => res,
|
||||
Err(_) => Err(anyhow!("timeout")),
|
||||
};
|
||||
(tag, (tcp_res, udp_res))
|
||||
});
|
||||
}
|
||||
|
||||
use futures::StreamExt;
|
||||
let results = futures::stream::iter(tasks)
|
||||
.buffer_unordered(concurrency)
|
||||
.collect::<Vec<_>>()
|
||||
.await;
|
||||
|
||||
let mut map = HashMap::new();
|
||||
for (tag, res) in results {
|
||||
map.insert(tag, res);
|
||||
}
|
||||
Ok(map)
|
||||
}
|
||||
|
||||
pub async fn stream_outbounds_tests(
|
||||
config: &Config,
|
||||
to: Option<Duration>,
|
||||
concurrency: usize,
|
||||
) -> Result<impl futures::Stream<Item = (String, (Result<Duration>, Result<Duration>))>> {
|
||||
let to = to.unwrap_or(Duration::from_secs(4));
|
||||
let dns_client = Arc::new(RwLock::new(DnsClient::new(&config.dns)?));
|
||||
let outbound_manager = OutboundManager::new(&config.outbounds, dns_client.clone())?;
|
||||
|
||||
let mut tasks = Vec::new();
|
||||
for handler in outbound_manager.handlers() {
|
||||
let tag = handler.tag().clone();
|
||||
let handler = handler.clone();
|
||||
let dns_client = dns_client.clone();
|
||||
tasks.push(async move {
|
||||
let (tcp_res, udp_res) = futures::future::join(
|
||||
timeout(to, test_tcp_outbound(dns_client.clone(), handler.clone())),
|
||||
timeout(to, test_udp_outbound(dns_client, handler)),
|
||||
)
|
||||
.await;
|
||||
let tcp_res = match tcp_res {
|
||||
Ok(res) => res,
|
||||
Err(_) => Err(anyhow!("timeout")),
|
||||
};
|
||||
let udp_res = match udp_res {
|
||||
Ok(res) => res,
|
||||
Err(_) => Err(anyhow!("timeout")),
|
||||
};
|
||||
(tag, (tcp_res, udp_res))
|
||||
});
|
||||
}
|
||||
|
||||
use futures::StreamExt;
|
||||
Ok(futures::stream::iter(tasks).buffer_unordered(concurrency))
|
||||
}
|
||||
|
||||
pub async fn health_check_outbound(
|
||||
tag: &str,
|
||||
config: &Config,
|
||||
|
||||
Reference in New Issue
Block a user