This commit is contained in:
eric
2023-03-18 14:49:14 +08:00
parent aaaddbd204
commit f531fc2217
5 changed files with 61 additions and 18 deletions

View File

@@ -26,7 +26,7 @@ use super::router::Router;
fn log_request(
sess: &Session,
outbound_tag: &str,
outbound_tag_color: colored::Color,
outbound_tag_color: &colored::Color,
handshake_time: Option<u128>,
) {
let hs = handshake_time.map_or("failed".to_string(), |hs| format!("{}ms", hs));
@@ -40,7 +40,7 @@ fn log_request(
"[{}] [{}] [{}] [{}] {}",
&sess.inbound_tag,
sess.network.to_string().color(network_color),
outbound_tag.color(outbound_tag_color),
outbound_tag.color(*outbound_tag_color),
hs,
&sess.destination,
);

View File

@@ -109,7 +109,7 @@ pub trait Tag {
}
pub trait Color {
fn color(&self) -> colored::Color;
fn color(&self) -> &colored::Color;
}
#[derive(Debug)]
@@ -340,7 +340,7 @@ pub enum DialOrder {
}
// A single TCP dial.
async fn tcp_dial_task(dial_addr: SocketAddr) -> io::Result<(AnyStream, SocketAddr)> {
async fn tcp_dial_task(dial_addr: SocketAddr) -> io::Result<DialResult> {
let socket = match dial_addr {
SocketAddr::V4(..) => TcpSocket::new_v4()?,
SocketAddr::V6(..) => TcpSocket::new_v6()?,
@@ -368,7 +368,10 @@ async fn tcp_dial_task(dial_addr: SocketAddr) -> io::Result<(AnyStream, SocketAd
&dial_addr,
elapsed.as_millis()
);
Ok((Box::new(stream), dial_addr))
Ok(DialResult {
stream: Box::new(stream),
addr: dial_addr,
})
}
pub async fn connect_stream_outbound(
@@ -426,6 +429,11 @@ pub async fn connect_datagram_outbound(
}
}
struct DialResult {
stream: AnyStream,
addr: SocketAddr,
}
// Dials a TCP stream.
pub async fn new_tcp_stream(
dns_client: SyncDnsClient,
@@ -461,10 +469,12 @@ pub async fn new_tcp_stream(
if !tasks.is_empty() {
match select_ok(tasks.into_iter()).await {
Ok(v) => {
#[rustfmt::skip]
dns_client.read().await.optimize_cache(address.to_owned(), v.0.1.ip()).await;
#[rustfmt::skip]
return Ok(v.0.0);
dns_client
.read()
.await
.optimize_cache(address.to_owned(), v.0.addr.ip())
.await;
return Ok(v.0.stream);
}
Err(e) => {
last_err = Some(io::Error::new(

View File

@@ -32,13 +32,13 @@ impl OutboundHandler for Handler {
fn stream(&self) -> io::Result<&AnyOutboundStreamHandler> {
self.stream_handler
.as_ref()
.ok_or(io::Error::new(io::ErrorKind::Other, "no tcp handler"))
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, "no tcp handler"))
}
fn datagram(&self) -> io::Result<&AnyOutboundDatagramHandler> {
self.datagram_handler
.as_ref()
.ok_or(io::Error::new(io::ErrorKind::Other, "no udp handler"))
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, "no udp handler"))
}
}
@@ -49,8 +49,8 @@ impl Tag for Handler {
}
impl Color for Handler {
fn color(&self) -> colored::Color {
(&self.color).to_owned()
fn color(&self) -> &colored::Color {
&self.color
}
}

View File

@@ -5,6 +5,11 @@ use futures::future::select_ok;
use crate::{app::SyncDnsClient, proxy::*, session::Session};
struct HandleResult {
idx: usize,
dgram: AnyOutboundDatagram,
}
pub struct Handler {
pub actors: Vec<AnyOutboundHandler>,
pub delay_base: u32,
@@ -36,13 +41,25 @@ impl OutboundDatagramHandler for Handler {
.await;
}
let transport =
crate::proxy::connect_datagram_outbound(sess, self.dns_client.clone(), a).await?;
a.datagram()?.handle(sess, transport).await
crate::proxy::connect_datagram_outbound(sess, self.dns_client.clone(), a)
.await?;
a.datagram()?
.handle(sess, transport)
.await
.map(|dgram| HandleResult { idx: i, dgram })
};
tasks.push(Box::pin(t));
}
match select_ok(tasks.into_iter()).await {
Ok(v) => Ok(v.0),
Ok(v) => {
debug!(
"tryall handles [{}:{}] to [{}]",
sess.network,
sess.destination,
self.actors[v.0.idx].tag()
);
Ok(v.0.dgram)
}
Err(e) => Err(io::Error::new(
io::ErrorKind::Other,
format!("all outbound attempts failed, last error: {}", e),

View File

@@ -5,6 +5,11 @@ use futures::future::select_ok;
use crate::{app::SyncDnsClient, proxy::*, session::Session};
struct HandleResult {
idx: usize,
stream: AnyStream,
}
pub struct Handler {
pub actors: Vec<AnyOutboundHandler>,
pub delay_base: u32,
@@ -33,12 +38,23 @@ impl OutboundStreamHandler for Handler {
}
let stream =
crate::proxy::connect_stream_outbound(sess, self.dns_client.clone(), a).await?;
a.stream()?.handle(sess, stream).await
a.stream()?
.handle(sess, stream)
.await
.map(|stream| HandleResult { idx: i, stream })
};
tasks.push(Box::pin(t));
}
match select_ok(tasks.into_iter()).await {
Ok(v) => Ok(v.0),
Ok(v) => {
debug!(
"tryall handles [{}:{}] to [{}]",
sess.network,
sess.destination,
self.actors[v.0.idx].tag()
);
Ok(v.0.stream)
}
Err(e) => Err(io::Error::new(
io::ErrorKind::Other,
format!("all outbound attempts failed, last error: {}", e),