Compare commits
10 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
533dfae715 | ||
|
|
9a6f28c0bc | ||
|
|
52f7a21939 | ||
|
|
b4b7da5d14 | ||
|
|
bd24848bc2 | ||
|
|
2f09e6896f | ||
|
|
b7ce5d3382 | ||
|
|
e3663a4c3f | ||
|
|
9a5abfded9 | ||
|
|
4b4338b59f |
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "leaf-bin"
|
||||
version = "0.8.2"
|
||||
version = "0.9.1"
|
||||
authors = ["eycorsican <eric.y.corsican@gmail.com>"]
|
||||
edition = "2021"
|
||||
|
||||
|
||||
@@ -47,7 +47,7 @@ all-endpoints = [
|
||||
"inbound-ws",
|
||||
"inbound-tls",
|
||||
"inbound-trojan",
|
||||
# "inbound-http",
|
||||
"inbound-http",
|
||||
"inbound-shadowsocks",
|
||||
"inbound-socks",
|
||||
"inbound-tun",
|
||||
@@ -107,7 +107,7 @@ outbound-vmess = ["lz_fnv", "cfb-mode", "hmac", "aes", "sha3", "digest", "uuid",
|
||||
inbound-trojan = ["sha2", "hex"]
|
||||
inbound-shadowsocks = ["hkdf", "sha-1", "md-5", "tokio-util"]
|
||||
inbound-socks = []
|
||||
inbound-http = ["hyper/server", "hyper/http1"]
|
||||
inbound-http = ["http"]
|
||||
inbound-tun = ["tun", "netstack-lwip"]
|
||||
inbound-ws = ["tungstenite", "tokio-tungstenite", "url", "http"]
|
||||
inbound-amux = ["tokio-util"]
|
||||
@@ -188,9 +188,6 @@ tokio-tungstenite = { version = "0.16", optional = true }
|
||||
url = { version = "2.2", optional = true }
|
||||
http = { version = "0.2", optional = true }
|
||||
|
||||
# HTTP inbound
|
||||
hyper = { version = "0.14", default-features = false, optional = true }
|
||||
|
||||
# SOCKS outbound
|
||||
async-socks5 = { version = "0.5", optional = true }
|
||||
|
||||
|
||||
@@ -30,26 +30,28 @@ fn log_request(
|
||||
handshake_time: Option<u128>,
|
||||
) {
|
||||
let hs = handshake_time.map_or("failed".to_string(), |hs| format!("{}ms", hs));
|
||||
if !*crate::option::LOG_NO_COLOR {
|
||||
let (network, outbound_tag) = if !*crate::option::LOG_NO_COLOR {
|
||||
use colored::Colorize;
|
||||
let network_color = match sess.network {
|
||||
Network::Tcp => colored::Color::Blue,
|
||||
Network::Udp => colored::Color::Yellow,
|
||||
};
|
||||
info!(
|
||||
"[{}] [{}] [{}] [{}] {}",
|
||||
&sess.inbound_tag,
|
||||
sess.network.to_string().color(network_color),
|
||||
outbound_tag.color(*outbound_tag_color),
|
||||
hs,
|
||||
&sess.destination,
|
||||
);
|
||||
(
|
||||
sess.network.to_string().color(network_color).to_string(),
|
||||
outbound_tag.color(*outbound_tag_color).to_string(),
|
||||
)
|
||||
} else {
|
||||
info!(
|
||||
"[{}] [{}] [{}] [{}] {}",
|
||||
sess.network, &sess.inbound_tag, outbound_tag, hs, &sess.destination,
|
||||
);
|
||||
}
|
||||
(sess.network.to_string(), outbound_tag.to_string())
|
||||
};
|
||||
info!(
|
||||
"[{}] [{}] [{}] [{}] [{}] [{}]",
|
||||
sess.forwarded_source.unwrap_or_else(|| sess.source.ip()),
|
||||
network,
|
||||
&sess.inbound_tag,
|
||||
outbound_tag,
|
||||
hs,
|
||||
&sess.destination,
|
||||
);
|
||||
}
|
||||
|
||||
pub struct Dispatcher {
|
||||
|
||||
@@ -180,13 +180,21 @@ impl FakeDnsImpl {
|
||||
// multiple domains point to a same IP.
|
||||
self.domain_to_ip.remove(&prev_domain);
|
||||
}
|
||||
let ip = self.get_ip();
|
||||
self.domain_to_ip.insert(domain.to_owned(), self.cursor);
|
||||
let ip = Self::u32_to_ip(self.cursor);
|
||||
self.cursor += 1;
|
||||
ip
|
||||
}
|
||||
|
||||
fn get_ip(&mut self) -> Ipv4Addr {
|
||||
if self.cursor > self.max_cursor {
|
||||
self.cursor = self.min_cursor;
|
||||
}
|
||||
ip
|
||||
let ip = Self::u32_to_ip(self.cursor);
|
||||
match ip.octets()[3] {
|
||||
0 | 255 => { self.cursor += 1;self.get_ip() },
|
||||
_ => ip,
|
||||
}
|
||||
}
|
||||
|
||||
fn accept(&self, domain: &str) -> bool {
|
||||
|
||||
@@ -163,6 +163,22 @@ impl Counter {
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn log_session_end(c: &Counter) {
|
||||
log::info!(
|
||||
"[{}] [{}] [{}] [{}] [{}] [{}] [{}] [END]",
|
||||
c.sess
|
||||
.forwarded_source
|
||||
.unwrap_or_else(|| c.sess.source.ip()),
|
||||
c.sess.network,
|
||||
c.sess.inbound_tag,
|
||||
c.sess.outbound_tag,
|
||||
c.sess.destination,
|
||||
c.bytes_sent(),
|
||||
c.bytes_recvd(),
|
||||
);
|
||||
}
|
||||
|
||||
pub struct StatManager {
|
||||
pub counters: Vec<Counter>,
|
||||
}
|
||||
@@ -182,7 +198,8 @@ impl StatManager {
|
||||
let mut i = 0;
|
||||
while i < sm.counters.len() {
|
||||
if sm.counters[i].recv_completed() && sm.counters[i].send_completed() {
|
||||
sm.counters.swap_remove(i);
|
||||
let c = sm.counters.swap_remove(i);
|
||||
log_session_end(&c);
|
||||
} else {
|
||||
i += 1;
|
||||
}
|
||||
|
||||
@@ -120,29 +120,46 @@ impl OutboundDatagramHandler for Handler {
|
||||
let a = &self.actors[i];
|
||||
|
||||
debug!(
|
||||
"failover handles udp [{}] to [{}]",
|
||||
"[{}] handles [{}:{}] to [{}]",
|
||||
a.tag(),
|
||||
sess.network,
|
||||
sess.destination,
|
||||
a.tag()
|
||||
);
|
||||
|
||||
match timeout(
|
||||
Duration::from_secs(self.fail_timeout as u64),
|
||||
a.datagram()?.handle(
|
||||
sess,
|
||||
connect_datagram_outbound(sess, self.dns_client.clone(), a).await?,
|
||||
),
|
||||
)
|
||||
.await
|
||||
{
|
||||
// return before timeout
|
||||
let try_outbound = async move {
|
||||
a.datagram()?
|
||||
.handle(
|
||||
sess,
|
||||
connect_datagram_outbound(sess, self.dns_client.clone(), a).await?,
|
||||
)
|
||||
.await
|
||||
};
|
||||
|
||||
match timeout(Duration::from_secs(self.fail_timeout as u64), try_outbound).await {
|
||||
Ok(t) => match t {
|
||||
// return ok
|
||||
Ok(v) => return Ok(v),
|
||||
// return err
|
||||
Err(_) => continue,
|
||||
Err(e) => {
|
||||
trace!(
|
||||
"[{}] failed to handle [{}:{}]: {}",
|
||||
a.tag(),
|
||||
sess.network,
|
||||
sess.destination,
|
||||
e,
|
||||
);
|
||||
continue;
|
||||
}
|
||||
},
|
||||
// after timeout
|
||||
Err(_) => continue,
|
||||
Err(e) => {
|
||||
trace!(
|
||||
"[{}] failed to handle [{}:{}]: {}",
|
||||
a.tag(),
|
||||
sess.network,
|
||||
sess.destination,
|
||||
e,
|
||||
);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -159,21 +159,23 @@ impl OutboundStreamHandler for Handler {
|
||||
let a = &self.actors[actor_idx];
|
||||
|
||||
debug!(
|
||||
"failover handles tcp [{}] to [{}]",
|
||||
"[{}] handles [{}:{}] to [{}]",
|
||||
a.tag(),
|
||||
sess.network,
|
||||
sess.destination,
|
||||
a.tag()
|
||||
);
|
||||
|
||||
match timeout(
|
||||
Duration::from_secs(self.fail_timeout as u64),
|
||||
a.stream()?.handle(
|
||||
sess,
|
||||
connect_stream_outbound(sess, self.dns_client.clone(), a).await?,
|
||||
),
|
||||
)
|
||||
.await
|
||||
{
|
||||
// return before timeout
|
||||
let try_outbound = async move {
|
||||
a.stream()?
|
||||
.handle(
|
||||
sess,
|
||||
connect_stream_outbound(sess, self.dns_client.clone(), a).await?,
|
||||
)
|
||||
.await
|
||||
};
|
||||
|
||||
match timeout(Duration::from_secs(self.fail_timeout as u64), try_outbound).await {
|
||||
Ok(t) => match t {
|
||||
Ok(v) => {
|
||||
// Only cache for fallback actors.
|
||||
@@ -188,8 +190,9 @@ impl OutboundStreamHandler for Handler {
|
||||
}
|
||||
Err(e) => {
|
||||
trace!(
|
||||
"[{}] failed to handle [{}]: {}",
|
||||
"[{}] failed to handle [{}:{}]: {}",
|
||||
a.tag(),
|
||||
sess.network,
|
||||
sess.destination,
|
||||
e,
|
||||
);
|
||||
@@ -198,8 +201,9 @@ impl OutboundStreamHandler for Handler {
|
||||
},
|
||||
Err(e) => {
|
||||
trace!(
|
||||
"[{}] failed to handle [{}]: {}",
|
||||
"[{}] failed to handle [{}:{}]: {}",
|
||||
a.tag(),
|
||||
sess.network,
|
||||
sess.destination,
|
||||
e,
|
||||
);
|
||||
|
||||
@@ -1,53 +1,228 @@
|
||||
use std::convert::TryFrom;
|
||||
use std::io;
|
||||
use std::{net::IpAddr, pin::Pin, task::Poll};
|
||||
use std::str;
|
||||
use std::cmp;
|
||||
use std::convert::TryFrom;
|
||||
use std::{net::IpAddr, pin::Pin, task::Poll, task::Context};
|
||||
|
||||
use anyhow::Result;
|
||||
use tokio::io::{AsyncWriteExt, AsyncReadExt, ReadBuf};
|
||||
use bytes::BytesMut;
|
||||
use async_trait::async_trait;
|
||||
use futures::future::{self, Future};
|
||||
use hyper::{server::conn::Http, service::Service, Body, Request, Response};
|
||||
use log::*;
|
||||
use ::http::{Method, Uri};
|
||||
|
||||
use crate::{
|
||||
proxy::*,
|
||||
session::{Session, SocksAddr},
|
||||
};
|
||||
|
||||
struct ProxyService {
|
||||
uri: String,
|
||||
const BUFFER_SIZE: usize = 1024;
|
||||
const EOL: [u8; 2] = [13, 10];
|
||||
const EOH: [u8; 4] = [13, 10, 13, 10];
|
||||
|
||||
fn bad_request() -> io::Error {
|
||||
io::Error::new(io::ErrorKind::Other, "bad request")
|
||||
}
|
||||
|
||||
impl ProxyService {
|
||||
pub fn new() -> Self {
|
||||
ProxyService {
|
||||
uri: "".to_string(),
|
||||
fn split_slice_once(s: &[u8], sep: &[u8]) -> Option<(Vec<u8>, Vec<u8>)> {
|
||||
s.windows(sep.len()).position(|w| w == sep).map(|loc| (s[..loc].to_vec(), s[loc..].to_vec()))
|
||||
}
|
||||
|
||||
/// Parse destination
|
||||
impl TryFrom<&Uri> for SocksAddr {
|
||||
type Error = io::Error;
|
||||
fn try_from(uri: &Uri) -> Result<Self, Self::Error> {
|
||||
let (host, port) = (
|
||||
uri.host().ok_or(bad_request())?,
|
||||
uri.port_u16().or_else(|| {
|
||||
match uri.scheme_str() {
|
||||
Some("http") => Some(80),
|
||||
Some("https") => Some(443),
|
||||
_ => None,
|
||||
}
|
||||
}).ok_or(bad_request())?
|
||||
);
|
||||
let addr = if let Ok(host) = host.parse::<IpAddr>() {
|
||||
SocksAddr::from((host, port))
|
||||
} else {
|
||||
SocksAddr::try_from((host, port))?
|
||||
};
|
||||
Ok(addr)
|
||||
}
|
||||
}
|
||||
|
||||
/// https://www.rfc-editor.org/rfc/rfc7230#section-5.3
|
||||
enum TargetFormat {
|
||||
Origin,
|
||||
Absolute,
|
||||
Authority,
|
||||
Asterisk,
|
||||
}
|
||||
|
||||
struct RequestHead {
|
||||
method: Method,
|
||||
uri: Uri,
|
||||
version: String,
|
||||
headers: Vec<(String, String)>,
|
||||
target_format: TargetFormat,
|
||||
}
|
||||
|
||||
impl RequestHead {
|
||||
fn parse_request_line(request_line: &[u8]) -> io::Result<(Method, Uri, String)> {
|
||||
let mut tokens = str::from_utf8(request_line).unwrap_or("").splitn(3, ' ');
|
||||
let method = match Method::try_from(tokens.next().unwrap_or("")) {
|
||||
Ok(v) => v,
|
||||
Err(_e) => return Err(bad_request()),
|
||||
};
|
||||
let uri = match Uri::try_from(tokens.next().unwrap_or("")) {
|
||||
Ok(v) => v,
|
||||
Err(_e) => return Err(bad_request()),
|
||||
};
|
||||
let version = tokens.next().unwrap_or("HTTP/1.1");
|
||||
Ok((method, uri, version.to_string()))
|
||||
}
|
||||
|
||||
fn parse_headers(header_lines: &[u8]) -> io::Result<Vec<(String, String)>> {
|
||||
let mut headers = Vec::new();
|
||||
let lines = str::from_utf8(header_lines).unwrap_or("").split("\r\n");
|
||||
for line in lines {
|
||||
let (name, value) = match line.split_once(':') {
|
||||
Some((n, v)) => (n.trim(), v.trim()),
|
||||
None => continue,
|
||||
};
|
||||
headers.push((name.to_string(), value.to_string()));
|
||||
}
|
||||
Ok(headers)
|
||||
}
|
||||
|
||||
fn set_header(&mut self, name: String, value: String) {
|
||||
for (i, (n, _v)) in self.headers.iter().enumerate() {
|
||||
if n.to_lowercase() == name.to_lowercase() {
|
||||
self.headers[i] = (n.clone(), value);
|
||||
return;
|
||||
}
|
||||
}
|
||||
self.headers.push((name, value));
|
||||
}
|
||||
}
|
||||
|
||||
impl Into<Vec<u8>> for RequestHead {
|
||||
fn into(self) -> Vec<u8> {
|
||||
let mut head = Vec::new();
|
||||
let request_line = format!("{} {} {}\r\n", self.method, self.uri, self.version);
|
||||
head.append(&mut request_line.into_bytes());
|
||||
for (name, value) in self.headers {
|
||||
let header = format!("{}: {}\r\n", name, value);
|
||||
head.append(&mut header.into_bytes());
|
||||
}
|
||||
head.extend_from_slice("\r\n".as_bytes());
|
||||
head
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<Vec<u8>> for RequestHead {
|
||||
type Error = io::Error;
|
||||
fn try_from(head: Vec<u8>) -> Result<Self, Self::Error> {
|
||||
let (request_line, header) = split_slice_once(&head, &EOL)
|
||||
.unwrap_or((head, Vec::new()));
|
||||
let (method, uri, version) = RequestHead::parse_request_line(&request_line)?;
|
||||
let headers = RequestHead::parse_headers(&header)?;
|
||||
let target_format = if uri.to_string() == "*" {
|
||||
TargetFormat::Asterisk
|
||||
} else if uri.scheme().is_some() {
|
||||
TargetFormat::Absolute
|
||||
} else if method == Method::CONNECT {
|
||||
TargetFormat::Authority
|
||||
} else {
|
||||
TargetFormat::Origin
|
||||
};
|
||||
Ok(RequestHead {
|
||||
method,
|
||||
uri,
|
||||
version,
|
||||
headers,
|
||||
target_format,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
struct HttpStream {
|
||||
cache: Vec<u8>,
|
||||
destination: Option<SocksAddr>,
|
||||
origin: AnyStream,
|
||||
}
|
||||
|
||||
impl HttpStream {
|
||||
async fn sniff(&mut self) -> io::Result<()> {
|
||||
let (head_buf, mut rest_buf) = self.drain(&EOH).await?;
|
||||
let mut head = RequestHead::try_from(head_buf)?;
|
||||
|
||||
let addr = SocksAddr::try_from(&head.uri)?;
|
||||
self.destination = Some(addr.clone());
|
||||
|
||||
match head.target_format {
|
||||
TargetFormat::Absolute => {
|
||||
let path_and_query = head.uri.path_and_query().map(|paq| paq.as_str()).unwrap_or("/");
|
||||
head.uri = path_and_query.parse().unwrap();
|
||||
head.set_header("host".to_string(), addr.to_string());
|
||||
self.cache.clear();
|
||||
self.cache.append(&mut head.into());
|
||||
self.cache.append(&mut rest_buf);
|
||||
return Ok(());
|
||||
},
|
||||
TargetFormat::Authority => {
|
||||
self.origin.write_all(b"HTTP/1.1 200 Connection established\r\n\r\n").await?;
|
||||
return Ok(());
|
||||
},
|
||||
_ => return Err(bad_request()),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_uri(&self) -> &String {
|
||||
&self.uri
|
||||
async fn drain(&mut self, stop_sign: &[u8]) -> io::Result<(Vec<u8>, Vec<u8>)> {
|
||||
let mut data = Vec::new();
|
||||
let mut buf = BytesMut::with_capacity(BUFFER_SIZE);
|
||||
loop {
|
||||
buf.clear();
|
||||
let n = self.origin.read_buf(&mut buf).await?;
|
||||
data.extend_from_slice(&buf[..n]);
|
||||
match split_slice_once(&data, stop_sign) {
|
||||
Some(v) => return Ok(v),
|
||||
None => continue,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::type_complexity)]
|
||||
impl Service<Request<Body>> for ProxyService {
|
||||
type Error = Box<dyn std::error::Error + Send + Sync>;
|
||||
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
|
||||
type Response = Response<Body>;
|
||||
impl AsyncRead for HttpStream {
|
||||
fn poll_read(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &mut ReadBuf<'_>,
|
||||
) -> Poll<io::Result<()>> {
|
||||
if !self.cache.is_empty() {
|
||||
let n = cmp::min(buf.capacity(), self.cache.len());
|
||||
let cached_data = self.cache.drain(..n);
|
||||
buf.put_slice(cached_data.as_slice());
|
||||
return Poll::Ready(Ok(()));
|
||||
}
|
||||
Pin::new(&mut self.origin).poll_read(cx, buf)
|
||||
}
|
||||
}
|
||||
|
||||
fn call(&mut self, req: Request<Body>) -> Self::Future {
|
||||
self.uri = req.uri().to_string();
|
||||
Box::pin(future::ready(Ok(Response::builder()
|
||||
.status(200)
|
||||
.body(hyper::Body::empty())
|
||||
.unwrap())))
|
||||
impl AsyncWrite for HttpStream {
|
||||
fn poll_write(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context,
|
||||
buf: &[u8],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
Pin::new(&mut self.origin).poll_write(cx, buf)
|
||||
}
|
||||
|
||||
fn poll_ready(
|
||||
&mut self,
|
||||
_cx: &mut std::task::Context<'_>,
|
||||
) -> Poll<std::result::Result<(), Self::Error>> {
|
||||
Poll::Ready(Ok(()))
|
||||
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
|
||||
Pin::new(&mut self.origin).poll_flush(cx)
|
||||
}
|
||||
|
||||
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
|
||||
Pin::new(&mut self.origin).poll_shutdown(cx)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -58,47 +233,17 @@ impl InboundStreamHandler for Handler {
|
||||
async fn handle<'a>(
|
||||
&'a self,
|
||||
mut sess: Session,
|
||||
stream: Box<dyn ProxyStream>,
|
||||
stream: AnyStream,
|
||||
) -> std::io::Result<AnyInboundTransport> {
|
||||
let http = Http::new();
|
||||
let proxy_service = ProxyService::new();
|
||||
let conn = http
|
||||
.serve_connection(stream, proxy_service)
|
||||
.without_shutdown();
|
||||
let parts = match conn.await {
|
||||
Ok(v) => v,
|
||||
Err(err) => {
|
||||
debug!("accept conn failed: {}", err);
|
||||
return Err(io::Error::new(io::ErrorKind::Other, "unspecified"));
|
||||
}
|
||||
let mut http_stream = HttpStream {
|
||||
cache: Vec::new(),
|
||||
destination: None,
|
||||
origin: stream,
|
||||
};
|
||||
http_stream.sniff().await?;
|
||||
|
||||
let uri = parts.service.get_uri();
|
||||
let host_port: Vec<&str> = uri.split(':').collect();
|
||||
if host_port.len() != 2 {
|
||||
debug!("invalid target {:?}", uri);
|
||||
return Err(io::Error::new(io::ErrorKind::Other, "unspecified"));
|
||||
}
|
||||
sess.destination = http_stream.destination.clone().ok_or(bad_request())?;
|
||||
|
||||
let destination = if let Ok(port) = host_port[1].parse::<u16>() {
|
||||
if let Ok(ip) = host_port[0].parse::<IpAddr>() {
|
||||
SocksAddr::from((ip, port))
|
||||
} else {
|
||||
match SocksAddr::try_from((host_port[0], port)) {
|
||||
Ok(v) => v,
|
||||
Err(err) => {
|
||||
debug!("invalid target {:?}: {}", uri, err);
|
||||
return Err(io::Error::new(io::ErrorKind::Other, "unspecified"));
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
debug!("invalid target {:?}", uri);
|
||||
return Err(io::Error::new(io::ErrorKind::Other, "unspecified"));
|
||||
};
|
||||
|
||||
sess.destination = destination;
|
||||
|
||||
Ok(InboundTransport::Stream(Box::new(parts.io), sess))
|
||||
Ok(InboundTransport::Stream(Box::new(http_stream), sess))
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user