Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
338ab32a2d | ||
|
|
7305a241c0 | ||
|
|
f7788cdc73 | ||
|
|
7718eae9d2 | ||
|
|
a59cefc498 |
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "leaf-bin"
|
||||
version = "0.10.6"
|
||||
version = "0.10.7"
|
||||
authors = ["eycorsican <eric.y.corsican@gmail.com>"]
|
||||
edition = "2021"
|
||||
|
||||
|
||||
@@ -134,13 +134,16 @@ impl NatManager {
|
||||
return;
|
||||
}
|
||||
|
||||
let sess = sess.cloned().unwrap_or(Session {
|
||||
let mut sess = sess.cloned().unwrap_or(Session {
|
||||
network: Network::Udp,
|
||||
source: dgram_src.address,
|
||||
destination: pkt.dst_addr.clone(),
|
||||
inbound_tag: inbound_tag.to_string(),
|
||||
..Default::default()
|
||||
});
|
||||
if sess.inbound_tag.is_empty() {
|
||||
sess.inbound_tag = inbound_tag.to_string();
|
||||
}
|
||||
|
||||
self.add_session(sess, dgram_src.clone(), client_ch_tx.clone(), &mut guard)
|
||||
.await;
|
||||
|
||||
@@ -136,6 +136,18 @@ lazy_static! {
|
||||
get_env_var_or("QUIC_ACCEPT_CHANNEL_SIZE", 1024)
|
||||
};
|
||||
|
||||
pub static ref AMUX_ACCEPT_CHANNEL_SIZE: usize = {
|
||||
get_env_var_or("AMUX_ACCEPT_CHANNEL_SIZE", 1024)
|
||||
};
|
||||
|
||||
pub static ref AMUX_STREAM_CHANNEL_SIZE: usize = {
|
||||
get_env_var_or("AMUX_STREAM_CHANNEL_SIZE", 16)
|
||||
};
|
||||
|
||||
pub static ref AMUX_FRAME_CHANNEL_SIZE: usize = {
|
||||
get_env_var_or("AMUX_FRAME_CHANNEL_SIZE", 32)
|
||||
};
|
||||
|
||||
/// Buffer size for UDP datagrams receiving/sending, in KB.
|
||||
pub static ref DATAGRAM_BUFFER_SIZE: usize = {
|
||||
get_env_var_or("DATAGRAM_BUFFER_SIZE", 2)
|
||||
|
||||
@@ -112,7 +112,8 @@ impl MuxStream {
|
||||
stream_end: Arc<AtomicBool>,
|
||||
) -> (Self, Sender<Vec<u8>>) {
|
||||
trace!("new mux stream {} (session {})", stream_id, session_id);
|
||||
let (stream_read_tx, stream_read_rx) = mpsc::channel::<Vec<u8>>(1);
|
||||
let (stream_read_tx, stream_read_rx) =
|
||||
mpsc::channel::<Vec<u8>>(*crate::option::AMUX_STREAM_CHANNEL_SIZE);
|
||||
(
|
||||
MuxStream {
|
||||
session_id,
|
||||
@@ -364,8 +365,6 @@ impl<S: AsyncWrite + Unpin> Sink<MuxFrame> for MuxConnection<S> {
|
||||
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
let me = &mut *self;
|
||||
|
||||
// ready!(Pin::new(&mut me.inner.write_all(&me.write_buf)).poll(cx))?;
|
||||
|
||||
while !me.write_buf.is_empty() {
|
||||
let n = ready!(Pin::new(&mut me.inner).poll_write(cx, &me.write_buf))?;
|
||||
if n == 0 {
|
||||
@@ -528,7 +527,8 @@ impl MuxSession {
|
||||
S: 'static + AsyncRead + AsyncWrite + Unpin + Send,
|
||||
{
|
||||
let (frame_sink, frame_stream) = MuxConnection::new(conn).split();
|
||||
let (frame_write_tx, frame_write_rx) = mpsc::channel::<MuxFrame>(1);
|
||||
let (frame_write_tx, frame_write_rx) =
|
||||
mpsc::channel::<MuxFrame>(*crate::option::AMUX_FRAME_CHANNEL_SIZE);
|
||||
let (recv_end, send_end) = (Arc::new(Mutex::new(false)), Arc::new(Mutex::new(false)));
|
||||
let streams: Streams = Arc::new(Mutex::new(HashMap::new()));
|
||||
let recv_bytes_counter = Arc::new(AtomicUsize::new(0));
|
||||
@@ -569,9 +569,11 @@ impl MuxSession {
|
||||
S: 'static + AsyncRead + AsyncWrite + Unpin + Send,
|
||||
{
|
||||
let (frame_sink, frame_stream) = MuxConnection::new(conn).split();
|
||||
let (frame_write_tx, frame_write_rx) = mpsc::channel::<MuxFrame>(1);
|
||||
let (frame_write_tx, frame_write_rx) =
|
||||
mpsc::channel::<MuxFrame>(*crate::option::AMUX_FRAME_CHANNEL_SIZE);
|
||||
let streams: Streams = Arc::new(Mutex::new(HashMap::new()));
|
||||
let (stream_accept_tx, stream_accept_rx) = mpsc::channel(1);
|
||||
let (stream_accept_tx, stream_accept_rx) =
|
||||
mpsc::channel(*crate::option::AMUX_ACCEPT_CHANNEL_SIZE);
|
||||
let session_id = random_u16();
|
||||
let recv_handle = Self::run_frame_receive_loop(
|
||||
streams.clone(),
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::io;
|
||||
use std::net::SocketAddr;
|
||||
|
||||
@@ -59,10 +59,8 @@ where
|
||||
let dst_addr = SocksAddr::read_from(&mut self.0, SocksAddrWireType::PortLast)
|
||||
.map_err(|e| ProxyError::DatagramFatal(e.into()))
|
||||
.await?;
|
||||
let mut buf2 = BytesMut::new();
|
||||
buf2.resize(2, 0);
|
||||
let _ = self
|
||||
.0
|
||||
let mut buf2 = [0; 4];
|
||||
self.0
|
||||
.read_exact(&mut buf2)
|
||||
.map_err(|e| ProxyError::DatagramFatal(e.into()))
|
||||
.await?;
|
||||
@@ -70,21 +68,11 @@ where
|
||||
if buf.len() < payload_len {
|
||||
return Err(ProxyError::DatagramFatal(anyhow!("Small buffer")));
|
||||
}
|
||||
let _ = self
|
||||
.0
|
||||
.read_exact(&mut buf2)
|
||||
// TODO Check CRLF?
|
||||
self.0
|
||||
.read_exact(&mut buf[..payload_len])
|
||||
.map_err(|e| ProxyError::DatagramFatal(e.into()))
|
||||
.await?;
|
||||
if &buf2[..2] != b"\r\n" {
|
||||
return Err(ProxyError::DatagramFatal(anyhow!("Expeced CRLF")));
|
||||
}
|
||||
buf2.resize(payload_len, 0);
|
||||
let _ = self
|
||||
.0
|
||||
.read_exact(&mut buf2)
|
||||
.map_err(|e| ProxyError::DatagramFatal(e.into()))
|
||||
.await?;
|
||||
buf[..payload_len].copy_from_slice(&buf2[..payload_len]);
|
||||
trace!(
|
||||
"trojan inbound received UDP {} bytes for {}",
|
||||
payload_len,
|
||||
@@ -126,16 +114,16 @@ where
|
||||
}
|
||||
|
||||
pub struct Handler {
|
||||
keys: HashMap<Vec<u8>, ()>,
|
||||
keys: HashSet<Vec<u8>>,
|
||||
}
|
||||
|
||||
impl Handler {
|
||||
pub fn new(passwords: Vec<String>) -> Self {
|
||||
let mut keys = HashMap::new();
|
||||
let mut keys = HashSet::new();
|
||||
for pass in passwords {
|
||||
let key = Sha224::digest(pass.as_bytes());
|
||||
let key = hex::encode(&key[..]);
|
||||
keys.insert(key.as_bytes().to_vec(), ());
|
||||
keys.insert(key.as_bytes().to_vec());
|
||||
}
|
||||
Handler { keys }
|
||||
}
|
||||
@@ -148,26 +136,21 @@ impl InboundStreamHandler for Handler {
|
||||
mut sess: Session,
|
||||
mut stream: AnyStream,
|
||||
) -> std::io::Result<AnyInboundTransport> {
|
||||
let mut buf = BytesMut::new();
|
||||
let mut buf = [0; 56];
|
||||
// read key
|
||||
buf.resize(56, 0);
|
||||
stream.read_exact(&mut buf).await?;
|
||||
if !self.keys.contains_key(&buf[..]) {
|
||||
stream.read_exact(&mut buf[..56]).await?;
|
||||
if !self.keys.contains(&buf[..]) {
|
||||
return Err(io::Error::new(io::ErrorKind::Other, "invalid key"));
|
||||
}
|
||||
// read crlf
|
||||
buf.resize(2, 0);
|
||||
stream.read_exact(&mut buf).await?;
|
||||
// read cmd
|
||||
buf.resize(1, 0);
|
||||
stream.read_exact(&mut buf).await?;
|
||||
let cmd = buf[0];
|
||||
// read crlf and cmd
|
||||
stream.read_exact(&mut buf[..3]).await?;
|
||||
// TODO Check CRLF?
|
||||
let cmd = buf[2];
|
||||
// read addr
|
||||
let dst_addr = SocksAddr::read_from(&mut stream, SocksAddrWireType::PortLast).await?;
|
||||
sess.destination = dst_addr;
|
||||
// read crlf
|
||||
buf.resize(2, 0);
|
||||
stream.read_exact(&mut buf).await?;
|
||||
stream.read_exact(&mut buf[..2]).await?;
|
||||
match cmd {
|
||||
// tcp
|
||||
0x01 => Ok(InboundTransport::Stream(stream, sess)),
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
use std::cmp::min;
|
||||
use std::io;
|
||||
|
||||
use async_trait::async_trait;
|
||||
@@ -94,22 +93,14 @@ where
|
||||
{
|
||||
async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocksAddr)> {
|
||||
let addr = SocksAddr::read_from(&mut self.0, SocksAddrWireType::PortLast).await?;
|
||||
let mut buf2 = BytesMut::new();
|
||||
buf2.resize(2, 0);
|
||||
let _ = self.0.read_exact(&mut buf2).await?;
|
||||
let payload_len = u16::from_be_bytes(buf2[..2].try_into().unwrap());
|
||||
let _ = self.0.read_exact(&mut buf2).await?;
|
||||
if &buf2[..2] != b"\r\n" {
|
||||
return Err(io::Error::new(io::ErrorKind::Other, "Expected CLRF"));
|
||||
}
|
||||
buf2.resize(payload_len as usize, 0);
|
||||
let _ = self.0.read_exact(&mut buf2).await?;
|
||||
let to_write = min(buf2.len(), buf.len());
|
||||
if to_write < buf2.len() {
|
||||
let mut buf2 = [0; 4];
|
||||
self.0.read_exact(&mut buf2).await?;
|
||||
let payload_len = u16::from_be_bytes(buf2[..2].try_into().unwrap()) as usize;
|
||||
// TODO Check CLRF?
|
||||
if buf.len() < payload_len {
|
||||
return Err(io::Error::new(io::ErrorKind::Interrupted, "Small buffer"));
|
||||
}
|
||||
buf[..to_write].copy_from_slice(&buf2[..to_write]);
|
||||
|
||||
self.0.read_exact(&mut buf[..payload_len]).await?;
|
||||
// If the initial destination is of domain type, we return that
|
||||
// domain address instead of the real source address. That also
|
||||
// means we assume all received packets are comming from a same
|
||||
@@ -117,17 +108,17 @@ where
|
||||
if self.1.is_some() {
|
||||
trace!(
|
||||
"trojan outbound received UDP {} bytes from {}",
|
||||
to_write,
|
||||
payload_len,
|
||||
self.1.as_ref().unwrap()
|
||||
);
|
||||
Ok((to_write, self.1.as_ref().unwrap().clone()))
|
||||
Ok((payload_len, self.1.as_ref().unwrap().clone()))
|
||||
} else {
|
||||
trace!(
|
||||
"trojan outbound received UDP {} bytes from {}",
|
||||
to_write,
|
||||
payload_len,
|
||||
&addr
|
||||
);
|
||||
Ok((to_write, addr))
|
||||
Ok((payload_len, addr))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -141,10 +132,6 @@ where
|
||||
{
|
||||
async fn send_to(&mut self, buf: &[u8], target: &SocksAddr) -> io::Result<usize> {
|
||||
trace!("trojan outbound send UDP {} bytes to {}", buf.len(), target);
|
||||
// FIXME we should calculate the return size more carefully.
|
||||
// max(0, n_written - all_headers_size)
|
||||
let payload_size = buf.len();
|
||||
|
||||
let mut data = BytesMut::new();
|
||||
target.write_buf(&mut data, SocksAddrWireType::PortLast);
|
||||
data.put_u16(buf.len() as u16);
|
||||
@@ -155,11 +142,11 @@ where
|
||||
if self.1.is_some() {
|
||||
if let Some(mut head) = self.1.take() {
|
||||
head.extend_from_slice(&data);
|
||||
return self.0.write_all(&head).map_ok(|_| payload_size).await;
|
||||
return self.0.write_all(&head).map_ok(|_| buf.len()).await;
|
||||
}
|
||||
}
|
||||
|
||||
self.0.write_all(&data).map_ok(|_| payload_size).await
|
||||
self.0.write_all(&data).map_ok(|_| buf.len()).await
|
||||
}
|
||||
|
||||
async fn close(&mut self) -> io::Result<()> {
|
||||
|
||||
Reference in New Issue
Block a user