dispatcher: fix EOF propagation

This commit is contained in:
eric
2022-04-09 01:04:50 +08:00
parent c3afb38611
commit 1d5b5d619d
5 changed files with 589 additions and 278 deletions

View File

@@ -3,15 +3,13 @@ use std::io::{self, ErrorKind};
use std::sync::Arc;
use std::time::Duration;
use futures::future::{self, Either};
use log::*;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio::sync::RwLock;
use tokio::time::timeout;
use crate::{
app::SyncDnsClient,
common::sniff,
common::{self, sniff},
option,
proxy::{OutboundDatagram, ProxyStream, TcpOutboundHandler, UdpOutboundHandler},
session::{Network, Session, SocksAddr},
@@ -173,248 +171,40 @@ impl Dispatcher {
}
};
match TcpOutboundHandler::handle(h.as_ref(), sess, stream).await {
Ok(rhs) => {
Ok(mut rhs) => {
let elapsed = tokio::time::Instant::now().duration_since(handshake_start);
log_request(sess, h.tag(), h.color(), Some(elapsed.as_millis()));
let (lr, mut lw) = tokio::io::split(lhs);
let (rr, mut rw) = tokio::io::split(rhs);
let mut lr = BufReader::with_capacity(*option::LINK_BUFFER_SIZE * 1024, lr);
let mut rr = BufReader::with_capacity(*option::LINK_BUFFER_SIZE * 1024, rr);
let l2r = Box::pin(tokio::io::copy_buf(&mut lr, &mut rw));
let r2l = Box::pin(tokio::io::copy_buf(&mut rr, &mut lw));
// TODO Propagate EOF signal.
// Drives both uplink and downlink to completion, i.e. read till EOF.
match future::select(l2r, r2l).await {
// Uplink task returns first, with the result of the completed uplink
// task and the uncompleted downlink task.
Either::Left((up_res, new_r2l)) => {
// Logs the uplink result, either successful with bytes transfered
// or an error.
match up_res {
Ok(up_n) => {
debug!(
"tcp uplink {} -> {} done, {} bytes transfered [{}]",
&sess.source,
&sess.destination,
up_n,
&h.tag(),
);
}
Err(up_e) => {
// FIXME Perhaps we should terminate the pipe immediately.
debug!(
"tcp uplink {} -> {} error: {} [{}]",
&sess.source,
&sess.destination,
up_e,
&h.tag()
);
}
}
// Puts a timeout limit on the uncompleted downlink task, because uplink
// has been completed, and we don't like half-closed connections, the other
// half must complete before timeout.
let timed_r2l =
timeout(Duration::from_secs(*option::TCP_DOWNLINK_TIMEOUT), new_r2l);
trace!(
"applied {}s downlink timeout to {} <- {}",
*option::TCP_DOWNLINK_TIMEOUT,
match common::io::copy_buf_bidirectional_with_timeout(
&mut lhs,
&mut rhs,
*option::LINK_BUFFER_SIZE * 1024,
Duration::from_secs(*option::TCP_UPLINK_TIMEOUT),
Duration::from_secs(*option::TCP_DOWNLINK_TIMEOUT),
)
.await
{
Ok((up_count, down_count)) => {
debug!(
"tcp link {} <-> {} done, ({}, {}) bytes transfered [{}]",
&sess.source,
&sess.destination
&sess.destination,
up_count,
down_count,
&h.tag(),
);
// Because uplink has been completed, no furture data from the inbound
// connection, we would like to close the write side of the outbound
// connection, so that notifies the close of the pipeline.
//
// TODO Perhaps we should not send FIN in order to compatible with some
// of the improperly implemented server programs, e.g. a server closes
// the write side after reading EOF on read side.
// let rw_shutdown = rw.shutdown();
// Drives both the above tasks to completion simultaneously and get the
// results.
// let (shutdown_res, timed_r2l_res) =
// future::join(rw_shutdown, timed_r2l).await;
let timed_r2l_res = timed_r2l.await;
// Logs the shutdown result.
// if let Err(e) = shutdown_res {
// debug!(
// "tcp uplink {} -> {} error: {} [{}]",
// &sess.source,
// &sess.destination,
// e,
// &h.tag()
// );
// }
// Logs the downlink result.
match timed_r2l_res {
Ok(down_res) => match down_res {
Ok(down_n) => {
debug!(
"tcp downlink {} <- {} done, {} bytes transfered [{}]",
&sess.source,
&sess.destination,
down_n,
&h.tag(),
);
}
Err(down_e) => {
debug!(
"tcp downlink {} <- {} error: {} [{}]",
&sess.source,
&sess.destination,
down_e,
&h.tag()
);
}
},
Err(timeout_e) => {
debug!(
"tcp downlink {} <- {} timeout: {} [{}]",
&sess.source,
&sess.destination,
timeout_e,
&h.tag()
);
}
}
// Finally shuts down the inbound connection.
// if let Err(e) = lw.shutdown().await {
// debug!(
// "tcp downlink {} <- {} error: {} [{}]",
// &sess.source,
// &sess.destination,
// e,
// &h.tag()
// );
// }
}
// In case downlink returns first, the process is similar to the other
// side described above, with the roles of uplink and downlink interchanged.
Either::Right((down_res, new_l2r)) => {
match down_res {
Ok(down_n) => {
debug!(
"tcp downlink {} <- {} done, {} bytes transfered [{}]",
&sess.source,
&sess.destination,
down_n,
&h.tag(),
);
}
Err(down_e) => {
debug!(
"tcp downlink {} <- {} error: {} [{}]",
&sess.source,
&sess.destination,
down_e,
&h.tag()
);
}
}
let timed_l2r =
timeout(Duration::from_secs(*option::TCP_UPLINK_TIMEOUT), new_l2r);
trace!(
"applied {}s uplink timeout to {} -> {}",
*option::TCP_UPLINK_TIMEOUT,
Err(e) => {
debug!(
"tcp link {} <-> {} error: {} [{}]",
&sess.source,
&sess.destination
&sess.destination,
e,
&h.tag()
);
// let (shutdown_res, timed_l2r_res) =
// future::join(lw.shutdown(), timed_l2r).await;
let timed_l2r_res = timed_l2r.await;
// if let Err(e) = shutdown_res {
// debug!(
// "tcp downlink {} <- {} error: {} [{}]",
// &sess.source,
// &sess.destination,
// e,
// &h.tag()
// );
// }
match timed_l2r_res {
Ok(up_res) => match up_res {
Ok(up_n) => {
debug!(
"tcp uplink {} -> {} done, {} bytes transfered [{}]",
&sess.source,
&sess.destination,
up_n,
&h.tag(),
);
}
Err(up_e) => {
debug!(
"tcp uplink {} -> {} error: {} [{}]",
&sess.source,
&sess.destination,
up_e,
&h.tag()
);
}
},
Err(timeout_e) => {
debug!(
"tcp uplink {} -> {} timeout: {} [{}]",
&sess.source,
&sess.destination,
timeout_e,
&h.tag()
);
}
}
// if let Err(e) = rw.shutdown().await {
// debug!(
// "tcp uplink {} -> {} error: {} [{}]",
// &sess.source,
// &sess.destination,
// e,
// &h.tag()
// );
// }
}
}
if let Err(e) = rw.shutdown().await {
debug!(
"tcp uplink {} -> {} error: {} [{}]",
&sess.source,
&sess.destination,
e,
&h.tag()
);
}
if let Err(e) = lw.shutdown().await {
debug!(
"tcp downlink {} <- {} error: {} [{}]",
&sess.source,
&sess.destination,
e,
&h.tag()
);
}
}
Err(e) => {
debug!(

281
leaf/src/common/io.rs Normal file
View File

@@ -0,0 +1,281 @@
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use futures::ready;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
#[derive(Debug)]
pub struct CopyBuffer {
read_done: bool,
need_flush: bool,
pos: usize,
cap: usize,
amt: u64,
buf: Box<[u8]>,
}
impl CopyBuffer {
pub fn new() -> Self {
Self {
read_done: false,
need_flush: false,
pos: 0,
cap: 0,
amt: 0,
buf: vec![0; 2 * 1024].into_boxed_slice(),
}
}
pub fn new_with_capacity(size: usize) -> Self {
Self {
read_done: false,
need_flush: false,
pos: 0,
cap: 0,
amt: 0,
buf: vec![0; size].into_boxed_slice(),
}
}
pub fn amount_transfered(&self) -> u64 {
self.amt
}
pub fn poll_copy<R, W>(
&mut self,
cx: &mut Context<'_>,
mut reader: Pin<&mut R>,
mut writer: Pin<&mut W>,
) -> Poll<io::Result<u64>>
where
R: AsyncRead + ?Sized,
W: AsyncWrite + ?Sized,
{
loop {
// If our buffer is empty, then we need to read some data to
// continue.
if self.pos == self.cap && !self.read_done {
let me = &mut *self;
let mut buf = ReadBuf::new(&mut me.buf);
match reader.as_mut().poll_read(cx, &mut buf) {
Poll::Ready(Ok(_)) => (),
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
Poll::Pending => {
// Try flushing when the reader has no progress to avoid deadlock
// when the reader depends on buffered writer.
if self.need_flush {
ready!(writer.as_mut().poll_flush(cx))?;
self.need_flush = false;
}
return Poll::Pending;
}
}
let n = buf.filled().len();
if n == 0 {
self.read_done = true;
} else {
self.pos = 0;
self.cap = n;
}
}
// If our buffer has some data, let's write it out!
while self.pos < self.cap {
let me = &mut *self;
let i = ready!(writer.as_mut().poll_write(cx, &me.buf[me.pos..me.cap]))?;
if i == 0 {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::WriteZero,
"write zero byte into writer",
)));
} else {
self.pos += i;
self.amt += i as u64;
self.need_flush = true;
}
}
// If pos larger than cap, this loop will never stop.
// In particular, user's wrong poll_write implementation returning
// incorrect written length may lead to thread blocking.
debug_assert!(
self.pos <= self.cap,
"writer returned length larger than input slice"
);
// If we've written all the data and we've seen EOF, flush out the
// data and finish the transfer.
if self.pos == self.cap && self.read_done {
ready!(writer.as_mut().poll_flush(cx))?;
return Poll::Ready(Ok(self.amt));
}
}
}
}
enum TransferState {
Running(CopyBuffer),
ShuttingDown(u64),
Done,
}
struct CopyBidirectional<'a, A: ?Sized, B: ?Sized> {
a: &'a mut A,
b: &'a mut B,
a_to_b: TransferState,
b_to_a: TransferState,
a_to_b_count: u64,
b_to_a_count: u64,
a_to_b_delay: Option<Pin<Box<tokio::time::Sleep>>>,
b_to_a_delay: Option<Pin<Box<tokio::time::Sleep>>>,
uplink_timeout_duration: Duration,
downlink_timeout_duration: Duration,
}
impl<'a, A, B> Future for CopyBidirectional<'a, A, B>
where
A: AsyncRead + AsyncWrite + Unpin + ?Sized,
B: AsyncRead + AsyncWrite + Unpin + ?Sized,
{
type Output = io::Result<(u64, u64)>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Unpack self into mut refs to each field to avoid borrow check issues.
let CopyBidirectional {
a,
b,
a_to_b,
b_to_a,
a_to_b_count,
b_to_a_count,
a_to_b_delay,
b_to_a_delay,
uplink_timeout_duration,
downlink_timeout_duration,
} = &mut *self;
let mut a = Pin::new(a);
let mut b = Pin::new(b);
loop {
match a_to_b {
TransferState::Running(buf) => {
let res = buf.poll_copy(cx, a.as_mut(), b.as_mut());
match res {
Poll::Ready(Ok(count)) => {
*a_to_b = TransferState::ShuttingDown(count);
continue;
}
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
Poll::Pending => {
if let Some(delay) = a_to_b_delay {
match delay.as_mut().poll(cx) {
Poll::Ready(()) => {
*a_to_b =
TransferState::ShuttingDown(buf.amount_transfered());
continue;
}
Poll::Pending => (),
}
}
}
}
}
TransferState::ShuttingDown(count) => {
let res = b.as_mut().poll_shutdown(cx);
match res {
Poll::Ready(Ok(())) => {
*a_to_b_count += *count;
*a_to_b = TransferState::Done;
b_to_a_delay
.replace(Box::pin(tokio::time::sleep(*downlink_timeout_duration)));
continue;
}
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
Poll::Pending => (),
}
}
TransferState::Done => (),
}
match b_to_a {
TransferState::Running(buf) => {
let res = buf.poll_copy(cx, b.as_mut(), a.as_mut());
match res {
Poll::Ready(Ok(count)) => {
*b_to_a = TransferState::ShuttingDown(count);
continue;
}
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
Poll::Pending => {
if let Some(delay) = b_to_a_delay {
match delay.as_mut().poll(cx) {
Poll::Ready(()) => {
*b_to_a =
TransferState::ShuttingDown(buf.amount_transfered());
continue;
}
Poll::Pending => (),
}
}
}
}
}
TransferState::ShuttingDown(count) => {
let res = a.as_mut().poll_shutdown(cx);
match res {
Poll::Ready(Ok(())) => {
*b_to_a_count += *count;
*b_to_a = TransferState::Done;
a_to_b_delay
.replace(Box::pin(tokio::time::sleep(*uplink_timeout_duration)));
continue;
}
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
Poll::Pending => (),
}
}
TransferState::Done => (),
}
match (&a_to_b, &b_to_a) {
(TransferState::Done, TransferState::Done) => break,
_ => return Poll::Pending,
}
}
Poll::Ready(Ok((*a_to_b_count, *b_to_a_count)))
}
}
pub async fn copy_buf_bidirectional_with_timeout<A, B>(
a: &mut A,
b: &mut B,
size: usize,
uplink_timeout_duration: Duration,
downlink_timeout_duration: Duration,
) -> Result<(u64, u64), std::io::Error>
where
A: AsyncRead + AsyncWrite + Unpin + ?Sized,
B: AsyncRead + AsyncWrite + Unpin + ?Sized,
{
CopyBidirectional {
a,
b,
a_to_b: TransferState::Running(CopyBuffer::new_with_capacity(size)),
b_to_a: TransferState::Running(CopyBuffer::new_with_capacity(size)),
a_to_b_count: 0,
b_to_a_count: 0,
a_to_b_delay: None,
b_to_a_delay: None,
uplink_timeout_duration,
downlink_timeout_duration,
}
.await
}

View File

@@ -1,4 +1,5 @@
pub mod crypto;
pub mod io;
pub mod mutex;
pub mod net;
pub mod resolver;

View File

@@ -9,6 +9,7 @@ use tokio::sync::RwLock;
use tokio::time::timeout;
use leaf::proxy::*;
use leaf::session::Session;
pub async fn run_tcp_echo_server<A: ToSocketAddrs>(addr: A) {
let listener = TcpListener::bind(addr).await.unwrap();
@@ -67,6 +68,58 @@ pub fn run_leaf_instances(
leaf_rt_ids
}
fn new_socks_outbound(socks_addr: &str, socks_port: u16) -> AnyOutboundHandler {
// Make use of a socks outbound to initiate a socks request to a leaf instance.
let settings = leaf::config::json::SocksOutboundSettings {
address: Some(socks_addr.to_string()),
port: Some(socks_port),
};
let settings_str = serde_json::to_string(&settings).unwrap();
let raw_settings = serde_json::value::RawValue::from_string(settings_str).unwrap();
let outbounds = vec![leaf::config::json::Outbound {
protocol: "socks".to_string(),
tag: Some("socks".to_string()),
settings: Some(raw_settings),
}];
let mut config = leaf::config::json::Config {
log: None,
inbounds: None,
outbounds: Some(outbounds),
router: None,
dns: None,
api: None,
};
let config = leaf::config::json::to_internal(&mut config).unwrap();
let dns_client = Arc::new(RwLock::new(
leaf::app::dns_client::DnsClient::new(&config.dns).unwrap(),
));
let outbound_manager =
leaf::app::outbound::manager::OutboundManager::new(&config.outbounds, dns_client).unwrap();
let handler = outbound_manager.get("socks").unwrap();
handler
}
pub async fn new_socks_stream(socks_addr: &str, socks_port: u16, sess: &Session) -> AnyStream {
let handler = new_socks_outbound(socks_addr, socks_port);
let stream = tokio::net::TcpStream::connect(format!("{}:{}", socks_addr, socks_port))
.await
.unwrap();
TcpOutboundHandler::handle(handler.as_ref(), sess, Some(Box::new(stream)))
.await
.unwrap()
}
pub async fn new_socks_datagram(
socks_addr: &str,
socks_port: u16,
sess: &Session,
) -> AnyOutboundDatagram {
let handler = new_socks_outbound(socks_addr, socks_port);
UdpOutboundHandler::handle(handler.as_ref(), sess, None)
.await
.unwrap()
}
// Runs multiple leaf instances, thereafter a socks request will be sent to the
// given socks server to test the proxy chain. The proxy chain is expected to
// correctly handle the request to it's destination.
@@ -88,53 +141,16 @@ pub fn test_configs(configs: Vec<String>, socks_addr: &str, socks_port: u16) {
let app_task = async move {
tokio::time::sleep(Duration::from_millis(100)).await;
// Make use of a socks outbound to initiate a socks request to a leaf instance.
let settings = leaf::config::json::SocksOutboundSettings {
address: Some(socks_addr.to_string()),
port: Some(socks_port),
};
let settings_str = serde_json::to_string(&settings).unwrap();
let raw_settings = serde_json::value::RawValue::from_string(settings_str).unwrap();
let outbounds = vec![leaf::config::json::Outbound {
protocol: "socks".to_string(),
tag: Some("socks".to_string()),
settings: Some(raw_settings),
}];
let mut config = leaf::config::json::Config {
log: None,
inbounds: None,
outbounds: Some(outbounds),
router: None,
dns: None,
api: None,
};
let config = leaf::config::json::to_internal(&mut config).unwrap();
let dns_client = Arc::new(RwLock::new(
leaf::app::dns_client::DnsClient::new(&config.dns).unwrap(),
));
let outbound_manager =
leaf::app::outbound::manager::OutboundManager::new(&config.outbounds, dns_client)
.unwrap();
let handler = outbound_manager.get("socks").unwrap();
let mut sess = leaf::session::Session::default();
sess.destination = leaf::session::SocksAddr::Ip("127.0.0.1:3000".parse().unwrap());
// Test TCP
let stream = tokio::net::TcpStream::connect(format!("{}:{}", socks_addr, socks_port))
.await
.unwrap();
let mut s = TcpOutboundHandler::handle(handler.as_ref(), &sess, Some(Box::new(stream)))
.await
.unwrap();
let mut s = new_socks_stream(socks_addr, socks_port, &sess).await;
s.write_all(b"abc").await.unwrap();
let mut buf = Vec::new();
let n = s.read_buf(&mut buf).await.unwrap();
assert_eq!("abc".to_string(), String::from_utf8_lossy(&buf[..n]));
// Test UDP
let dgram = UdpOutboundHandler::handle(handler.as_ref(), &sess, None)
.await
.unwrap();
let mut dgram = new_socks_datagram(socks_addr, socks_port, &sess).await;
let (mut r, mut s) = dgram.split();
let msg = b"def";
let n = s.send_to(&msg.to_vec(), &sess.destination).await.unwrap();
@@ -149,10 +165,8 @@ pub fn test_configs(configs: Vec<String>, socks_addr: &str, socks_port: u16) {
// Test if we can handle a second UDP session. This can fail in stream
// transports if the stream ID has not been correctly set.
let dgram = UdpOutboundHandler::handle(handler.as_ref(), &sess, None)
.await
.unwrap();
let (mut r, mut s) = dgram.split();
let mut dgram2 = new_socks_datagram(socks_addr, socks_port, &sess).await;
let (mut r, mut s) = dgram2.split();
let msg = b"ghi";
let n = s.send_to(&msg.to_vec(), &sess.destination).await.unwrap();
assert_eq!(msg.len(), n);

View File

@@ -0,0 +1,225 @@
mod common;
use std::io::ErrorKind;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::time::timeout;
#[cfg(all(
feature = "outbound-socks",
feature = "inbound-socks",
feature = "outbound-shadowsocks",
feature = "inbound-shadowsocks",
feature = "outbound-trojan",
feature = "inbound-trojan",
feature = "outbound-direct",
))]
#[test]
fn test_tcp_half_close() {
std::env::set_var("TCP_DOWNLINK_TIMEOUT", "3");
std::env::set_var("TCP_UPLINK_TIMEOUT", "3");
let config1 = r#"
{
"inbounds": [
{
"protocol": "socks",
"address": "127.0.0.1",
"port": 1086
}
],
"outbounds": [
{
"protocol": "shadowsocks",
"tag": "shadowsocks",
"settings": {
"address": "127.0.0.1",
"port": 3001,
"method": "chacha20-ietf-poly1305",
"password": "password"
}
},
{
"protocol": "trojan",
"tag": "trojan",
"settings": {
"address": "127.0.0.1",
"port": 3002,
"password": "password"
}
}
]
}
"#;
let config2 = r#"
{
"inbounds": [
{
"protocol": "trojan",
"tag": "trojan",
"address": "127.0.0.1",
"port": 3002,
"settings": {
"passwords": [
"password",
"password2"
]
}
},
{
"protocol": "shadowsocks",
"tag": "shadowsocks",
"address": "127.0.0.1",
"port": 3001,
"settings": {
"method": "chacha20-ietf-poly1305",
"password": "password"
}
}
],
"outbounds": [
{
"protocol": "direct"
}
]
}
"#;
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let leaf_rt_ids =
common::run_leaf_instances(&rt, vec![config1.to_string(), config2.to_string()]);
let res = rt.block_on(rt.spawn(async {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let listener = TcpListener::bind("127.0.0.1:3000").await.unwrap();
let mut sess = leaf::session::Session::default();
sess.destination = leaf::session::SocksAddr::Ip("127.0.0.1:3000".parse().unwrap());
let mut client_stream = common::new_socks_stream("127.0.0.1", 1086, &sess).await;
let (mut server_stream, _) = listener.accept().await.unwrap();
// client <-> server
//
// Ensure both directions work.
//
// When testing with proxy protocols need additional info from the other
// side to initialize itself, such as shadowsocks needs a salt from the
// other side, we must forward some payload first.
client_stream.write_all(b"hello").await.unwrap();
let mut buf = Vec::new();
let n = server_stream.read_buf(&mut buf).await.unwrap();
assert_eq!(String::from_utf8_lossy(&buf[..n]), "hello");
server_stream.write_all(b"world").await.unwrap();
let mut buf = Vec::new();
let n = client_stream.read_buf(&mut buf).await.unwrap();
assert_eq!(String::from_utf8_lossy(&buf[..n]), "world");
// client(shutdown) <-> server
//
// The case client performs a shutdown.
//
// The expected behaiver is, the client socket is no longer writable
// after the shutdown, but can still read data from server socket.
// The server socket can write data to client, a read on the server socket
// will return zero bytes (EOF) immediately. After TCP_DOWNLINK_TIMEOUT and
// reading out all previous transfered data, a read on client socket should
// also return zero bytes immediately even though we havn't explicitly
// shutdown the server socket, this verifies TCP_DOWNLINK_TIMEOUT works as
// expected.
client_stream.shutdown().await.unwrap();
let res = client_stream
.write_all(b"hello")
.await
.map_err(|e| e.kind());
assert_eq!(res, Err(ErrorKind::BrokenPipe));
server_stream.write_all(b"world").await.unwrap();
let mut buf = Vec::new();
let n = client_stream.read_buf(&mut buf).await.unwrap();
assert_eq!(String::from_utf8_lossy(&buf[..n]), "world");
let mut buf = Vec::new();
let n = timeout(Duration::from_millis(20), server_stream.read_buf(&mut buf))
.await
.unwrap()
.unwrap();
assert_eq!(n, 0);
tokio::time::sleep(
Duration::from_secs(*leaf::option::TCP_DOWNLINK_TIMEOUT)
.checked_sub(Duration::from_secs(1))
.unwrap(),
)
.await;
server_stream.write_all(b"world").await.unwrap();
tokio::time::sleep(Duration::from_secs(2)).await;
let res = client_stream.read_buf(&mut buf).await.unwrap();
assert_eq!(res, 5);
let mut buf = Vec::new();
let n = timeout(Duration::from_millis(20), client_stream.read_buf(&mut buf))
.await
.unwrap()
.unwrap();
assert_eq!(n, 0);
let mut client_stream = common::new_socks_stream("127.0.0.1", 1086, &sess).await;
let (mut server_stream, _) = listener.accept().await.unwrap();
// Another direction.
//
// client <-> server
//
// Ensure both directions work.
//
// When testing with proxy protocols need additional info from the other
// side to initialize itself, such as shadowsocks needs a salt from the
// other side, we must forward some payload first.
client_stream.write_all(b"hello").await.unwrap();
let mut buf = Vec::new();
let n = server_stream.read_buf(&mut buf).await.unwrap();
assert_eq!(String::from_utf8_lossy(&buf[..n]), "hello");
server_stream.write_all(b"world").await.unwrap();
let mut buf = Vec::new();
let n = client_stream.read_buf(&mut buf).await.unwrap();
assert_eq!(String::from_utf8_lossy(&buf[..n]), "world");
server_stream.shutdown().await.unwrap();
client_stream.write_all(b"hello").await.unwrap();
let mut buf = Vec::new();
let n = server_stream.read_buf(&mut buf).await.unwrap();
assert_eq!(String::from_utf8_lossy(&buf[..n]), "hello");
let res = server_stream
.write_all(b"world")
.await
.map_err(|e| e.kind());
assert_eq!(res, Err(ErrorKind::BrokenPipe));
let mut buf = Vec::new();
let n = timeout(Duration::from_millis(20), client_stream.read_buf(&mut buf))
.await
.unwrap()
.unwrap();
assert_eq!(n, 0);
tokio::time::sleep(
Duration::from_secs(*leaf::option::TCP_UPLINK_TIMEOUT)
.checked_sub(Duration::from_millis(500))
.unwrap(),
)
.await;
client_stream.write_all(b"world").await.unwrap();
tokio::time::sleep(Duration::from_millis(1000)).await;
let res = server_stream.read_buf(&mut buf).await.unwrap();
assert_eq!(res, 5);
let mut buf = Vec::new();
let n = timeout(Duration::from_millis(20), server_stream.read_buf(&mut buf))
.await
.unwrap()
.unwrap();
assert_eq!(n, 0);
}));
for id in leaf_rt_ids.into_iter() {
leaf::shutdown(id);
}
assert!(res.is_ok());
}