5 Commits

Author SHA1 Message Date
eric
338ab32a2d v0.10.7
Some checks failed
ci / test (macos-latest) (push) Has been cancelled
ci / test (ubuntu-latest) (push) Has been cancelled
ci / build-bin-cross (aarch64-unknown-linux-musl) (push) Has been cancelled
ci / build-bin-cross (arm-unknown-linux-musleabi) (push) Has been cancelled
ci / build-bin-cross (armv7-unknown-linux-musleabihf) (push) Has been cancelled
ci / build-bin-cross (i686-unknown-linux-musl) (push) Has been cancelled
ci / build-bin-cross (mips-unknown-linux-musl) (push) Has been cancelled
ci / build-bin-cross (mipsel-unknown-linux-musl) (push) Has been cancelled
ci / build-bin-cross (x86_64-pc-windows-gnu) (push) Has been cancelled
ci / build-bin-cross (x86_64-unknown-linux-musl) (push) Has been cancelled
ci / build-bin-local (macos-latest, x86_64-apple-darwin) (push) Has been cancelled
ci / build-apple (push) Has been cancelled
ci / build-android (push) Has been cancelled
releases / build-bin-cross (aarch64-unknown-linux-musl) (push) Has been cancelled
releases / build-bin-cross (mips-unknown-linux-musl) (push) Has been cancelled
releases / build-bin-cross (x86_64-pc-windows-gnu) (push) Has been cancelled
releases / build-bin-cross (x86_64-unknown-linux-musl) (push) Has been cancelled
releases / build-bin-local (macos-latest, x86_64-apple-darwin) (push) Has been cancelled
releases / build-apple (push) Has been cancelled
releases / build-android (push) Has been cancelled
releases / create-release (push) Has been cancelled
releases / release-bin (aarch64-unknown-linux-musl) (push) Has been cancelled
releases / release-bin (mips-unknown-linux-musl) (push) Has been cancelled
releases / release-bin (x86_64-apple-darwin) (push) Has been cancelled
releases / release-bin (x86_64-pc-windows-gnu) (push) Has been cancelled
releases / release-bin (x86_64-unknown-linux-musl) (push) Has been cancelled
releases / release-mobile-libs (push) Has been cancelled
2023-09-27 08:42:18 +08:00
eric
7305a241c0 Revert a bad change 2023-09-27 08:28:04 +08:00
eric
f7788cdc73 amux: added a few options to configure channel sizes 2023-09-27 08:03:16 +08:00
eric
7718eae9d2 Minor improvements 2023-09-27 07:46:51 +08:00
eric
a59cefc498 log: fix missing inbound tag for datagram inbound 2023-09-27 05:57:09 +08:00
6 changed files with 54 additions and 67 deletions

View File

@@ -1,6 +1,6 @@
[package]
name = "leaf-bin"
version = "0.10.6"
version = "0.10.7"
authors = ["eycorsican <eric.y.corsican@gmail.com>"]
edition = "2021"

View File

@@ -134,13 +134,16 @@ impl NatManager {
return;
}
let sess = sess.cloned().unwrap_or(Session {
let mut sess = sess.cloned().unwrap_or(Session {
network: Network::Udp,
source: dgram_src.address,
destination: pkt.dst_addr.clone(),
inbound_tag: inbound_tag.to_string(),
..Default::default()
});
if sess.inbound_tag.is_empty() {
sess.inbound_tag = inbound_tag.to_string();
}
self.add_session(sess, dgram_src.clone(), client_ch_tx.clone(), &mut guard)
.await;

View File

@@ -136,6 +136,18 @@ lazy_static! {
get_env_var_or("QUIC_ACCEPT_CHANNEL_SIZE", 1024)
};
pub static ref AMUX_ACCEPT_CHANNEL_SIZE: usize = {
get_env_var_or("AMUX_ACCEPT_CHANNEL_SIZE", 1024)
};
pub static ref AMUX_STREAM_CHANNEL_SIZE: usize = {
get_env_var_or("AMUX_STREAM_CHANNEL_SIZE", 16)
};
pub static ref AMUX_FRAME_CHANNEL_SIZE: usize = {
get_env_var_or("AMUX_FRAME_CHANNEL_SIZE", 32)
};
/// Buffer size for UDP datagrams receiving/sending, in KB.
pub static ref DATAGRAM_BUFFER_SIZE: usize = {
get_env_var_or("DATAGRAM_BUFFER_SIZE", 2)

View File

@@ -112,7 +112,8 @@ impl MuxStream {
stream_end: Arc<AtomicBool>,
) -> (Self, Sender<Vec<u8>>) {
trace!("new mux stream {} (session {})", stream_id, session_id);
let (stream_read_tx, stream_read_rx) = mpsc::channel::<Vec<u8>>(1);
let (stream_read_tx, stream_read_rx) =
mpsc::channel::<Vec<u8>>(*crate::option::AMUX_STREAM_CHANNEL_SIZE);
(
MuxStream {
session_id,
@@ -364,8 +365,6 @@ impl<S: AsyncWrite + Unpin> Sink<MuxFrame> for MuxConnection<S> {
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let me = &mut *self;
// ready!(Pin::new(&mut me.inner.write_all(&me.write_buf)).poll(cx))?;
while !me.write_buf.is_empty() {
let n = ready!(Pin::new(&mut me.inner).poll_write(cx, &me.write_buf))?;
if n == 0 {
@@ -528,7 +527,8 @@ impl MuxSession {
S: 'static + AsyncRead + AsyncWrite + Unpin + Send,
{
let (frame_sink, frame_stream) = MuxConnection::new(conn).split();
let (frame_write_tx, frame_write_rx) = mpsc::channel::<MuxFrame>(1);
let (frame_write_tx, frame_write_rx) =
mpsc::channel::<MuxFrame>(*crate::option::AMUX_FRAME_CHANNEL_SIZE);
let (recv_end, send_end) = (Arc::new(Mutex::new(false)), Arc::new(Mutex::new(false)));
let streams: Streams = Arc::new(Mutex::new(HashMap::new()));
let recv_bytes_counter = Arc::new(AtomicUsize::new(0));
@@ -569,9 +569,11 @@ impl MuxSession {
S: 'static + AsyncRead + AsyncWrite + Unpin + Send,
{
let (frame_sink, frame_stream) = MuxConnection::new(conn).split();
let (frame_write_tx, frame_write_rx) = mpsc::channel::<MuxFrame>(1);
let (frame_write_tx, frame_write_rx) =
mpsc::channel::<MuxFrame>(*crate::option::AMUX_FRAME_CHANNEL_SIZE);
let streams: Streams = Arc::new(Mutex::new(HashMap::new()));
let (stream_accept_tx, stream_accept_rx) = mpsc::channel(1);
let (stream_accept_tx, stream_accept_rx) =
mpsc::channel(*crate::option::AMUX_ACCEPT_CHANNEL_SIZE);
let session_id = random_u16();
let recv_handle = Self::run_frame_receive_loop(
streams.clone(),

View File

@@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::collections::HashSet;
use std::io;
use std::net::SocketAddr;
@@ -59,10 +59,8 @@ where
let dst_addr = SocksAddr::read_from(&mut self.0, SocksAddrWireType::PortLast)
.map_err(|e| ProxyError::DatagramFatal(e.into()))
.await?;
let mut buf2 = BytesMut::new();
buf2.resize(2, 0);
let _ = self
.0
let mut buf2 = [0; 4];
self.0
.read_exact(&mut buf2)
.map_err(|e| ProxyError::DatagramFatal(e.into()))
.await?;
@@ -70,21 +68,11 @@ where
if buf.len() < payload_len {
return Err(ProxyError::DatagramFatal(anyhow!("Small buffer")));
}
let _ = self
.0
.read_exact(&mut buf2)
// TODO Check CRLF?
self.0
.read_exact(&mut buf[..payload_len])
.map_err(|e| ProxyError::DatagramFatal(e.into()))
.await?;
if &buf2[..2] != b"\r\n" {
return Err(ProxyError::DatagramFatal(anyhow!("Expeced CRLF")));
}
buf2.resize(payload_len, 0);
let _ = self
.0
.read_exact(&mut buf2)
.map_err(|e| ProxyError::DatagramFatal(e.into()))
.await?;
buf[..payload_len].copy_from_slice(&buf2[..payload_len]);
trace!(
"trojan inbound received UDP {} bytes for {}",
payload_len,
@@ -126,16 +114,16 @@ where
}
pub struct Handler {
keys: HashMap<Vec<u8>, ()>,
keys: HashSet<Vec<u8>>,
}
impl Handler {
pub fn new(passwords: Vec<String>) -> Self {
let mut keys = HashMap::new();
let mut keys = HashSet::new();
for pass in passwords {
let key = Sha224::digest(pass.as_bytes());
let key = hex::encode(&key[..]);
keys.insert(key.as_bytes().to_vec(), ());
keys.insert(key.as_bytes().to_vec());
}
Handler { keys }
}
@@ -148,26 +136,21 @@ impl InboundStreamHandler for Handler {
mut sess: Session,
mut stream: AnyStream,
) -> std::io::Result<AnyInboundTransport> {
let mut buf = BytesMut::new();
let mut buf = [0; 56];
// read key
buf.resize(56, 0);
stream.read_exact(&mut buf).await?;
if !self.keys.contains_key(&buf[..]) {
stream.read_exact(&mut buf[..56]).await?;
if !self.keys.contains(&buf[..]) {
return Err(io::Error::new(io::ErrorKind::Other, "invalid key"));
}
// read crlf
buf.resize(2, 0);
stream.read_exact(&mut buf).await?;
// read cmd
buf.resize(1, 0);
stream.read_exact(&mut buf).await?;
let cmd = buf[0];
// read crlf and cmd
stream.read_exact(&mut buf[..3]).await?;
// TODO Check CRLF?
let cmd = buf[2];
// read addr
let dst_addr = SocksAddr::read_from(&mut stream, SocksAddrWireType::PortLast).await?;
sess.destination = dst_addr;
// read crlf
buf.resize(2, 0);
stream.read_exact(&mut buf).await?;
stream.read_exact(&mut buf[..2]).await?;
match cmd {
// tcp
0x01 => Ok(InboundTransport::Stream(stream, sess)),

View File

@@ -1,4 +1,3 @@
use std::cmp::min;
use std::io;
use async_trait::async_trait;
@@ -94,22 +93,14 @@ where
{
async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocksAddr)> {
let addr = SocksAddr::read_from(&mut self.0, SocksAddrWireType::PortLast).await?;
let mut buf2 = BytesMut::new();
buf2.resize(2, 0);
let _ = self.0.read_exact(&mut buf2).await?;
let payload_len = u16::from_be_bytes(buf2[..2].try_into().unwrap());
let _ = self.0.read_exact(&mut buf2).await?;
if &buf2[..2] != b"\r\n" {
return Err(io::Error::new(io::ErrorKind::Other, "Expected CLRF"));
}
buf2.resize(payload_len as usize, 0);
let _ = self.0.read_exact(&mut buf2).await?;
let to_write = min(buf2.len(), buf.len());
if to_write < buf2.len() {
let mut buf2 = [0; 4];
self.0.read_exact(&mut buf2).await?;
let payload_len = u16::from_be_bytes(buf2[..2].try_into().unwrap()) as usize;
// TODO Check CLRF?
if buf.len() < payload_len {
return Err(io::Error::new(io::ErrorKind::Interrupted, "Small buffer"));
}
buf[..to_write].copy_from_slice(&buf2[..to_write]);
self.0.read_exact(&mut buf[..payload_len]).await?;
// If the initial destination is of domain type, we return that
// domain address instead of the real source address. That also
// means we assume all received packets are comming from a same
@@ -117,17 +108,17 @@ where
if self.1.is_some() {
trace!(
"trojan outbound received UDP {} bytes from {}",
to_write,
payload_len,
self.1.as_ref().unwrap()
);
Ok((to_write, self.1.as_ref().unwrap().clone()))
Ok((payload_len, self.1.as_ref().unwrap().clone()))
} else {
trace!(
"trojan outbound received UDP {} bytes from {}",
to_write,
payload_len,
&addr
);
Ok((to_write, addr))
Ok((payload_len, addr))
}
}
}
@@ -141,10 +132,6 @@ where
{
async fn send_to(&mut self, buf: &[u8], target: &SocksAddr) -> io::Result<usize> {
trace!("trojan outbound send UDP {} bytes to {}", buf.len(), target);
// FIXME we should calculate the return size more carefully.
// max(0, n_written - all_headers_size)
let payload_size = buf.len();
let mut data = BytesMut::new();
target.write_buf(&mut data, SocksAddrWireType::PortLast);
data.put_u16(buf.len() as u16);
@@ -155,11 +142,11 @@ where
if self.1.is_some() {
if let Some(mut head) = self.1.take() {
head.extend_from_slice(&data);
return self.0.write_all(&head).map_ok(|_| payload_size).await;
return self.0.write_all(&head).map_ok(|_| buf.len()).await;
}
}
self.0.write_all(&data).map_ok(|_| payload_size).await
self.0.write_all(&data).map_ok(|_| buf.len()).await
}
async fn close(&mut self) -> io::Result<()> {