8 Commits

Author SHA1 Message Date
eric
52f7a21939 v0.9.0
Some checks failed
ci / test (macos-latest) (push) Has been cancelled
ci / test (ubuntu-latest) (push) Has been cancelled
ci / build-bin-cross (aarch64-unknown-linux-musl) (push) Has been cancelled
ci / build-bin-cross (arm-unknown-linux-musleabi) (push) Has been cancelled
ci / build-bin-cross (armv7-unknown-linux-musleabihf) (push) Has been cancelled
ci / build-bin-cross (i686-unknown-linux-musl) (push) Has been cancelled
ci / build-bin-cross (mips-unknown-linux-musl) (push) Has been cancelled
ci / build-bin-cross (mipsel-unknown-linux-musl) (push) Has been cancelled
ci / build-bin-cross (x86_64-pc-windows-gnu) (push) Has been cancelled
ci / build-bin-cross (x86_64-unknown-linux-musl) (push) Has been cancelled
ci / build-bin-local (macos-latest, x86_64-apple-darwin) (push) Has been cancelled
ci / build-apple (push) Has been cancelled
ci / build-android (push) Has been cancelled
releases / build-bin-cross (aarch64-unknown-linux-musl) (push) Has been cancelled
releases / build-bin-cross (mips-unknown-linux-musl) (push) Has been cancelled
releases / build-bin-cross (x86_64-pc-windows-gnu) (push) Has been cancelled
releases / build-bin-cross (x86_64-unknown-linux-musl) (push) Has been cancelled
releases / build-bin-local (macos-latest, x86_64-apple-darwin) (push) Has been cancelled
releases / build-apple (push) Has been cancelled
releases / build-android (push) Has been cancelled
releases / create-release (push) Has been cancelled
releases / release-bin (aarch64-unknown-linux-musl) (push) Has been cancelled
releases / release-bin (mips-unknown-linux-musl) (push) Has been cancelled
releases / release-bin (x86_64-apple-darwin) (push) Has been cancelled
releases / release-bin (x86_64-pc-windows-gnu) (push) Has been cancelled
releases / release-bin (x86_64-unknown-linux-musl) (push) Has been cancelled
releases / release-mobile-libs (push) Has been cancelled
2023-05-24 04:20:45 +08:00
eric
b4b7da5d14 bugfix, outbound/failover: fail-timeout should cover the connect time 2023-05-24 04:09:24 +08:00
Trump Founder
bd24848bc2 outbound/failover: catch more error to try next actor 2023-05-24 03:28:36 +08:00
Trump Founder
2f09e6896f inbound/http: handle one-line head 2023-05-10 17:06:05 +08:00
Trump Founder
b7ce5d3382 inbound/http: ensure the host header field as the same as absolute uri 2023-05-09 15:37:39 +08:00
eric
e3663a4c3f Enable http inbound feature 2023-05-07 18:35:47 +08:00
Trump Founder
9a5abfded9 inbound/http: fixed plain http request failure 2023-05-07 18:34:43 +08:00
zh-yjie
4b4338b59f ignore network number and broadcast IP address 2023-03-31 12:29:19 +08:00
6 changed files with 275 additions and 104 deletions

View File

@@ -1,6 +1,6 @@
[package]
name = "leaf-bin"
version = "0.8.2"
version = "0.9.0"
authors = ["eycorsican <eric.y.corsican@gmail.com>"]
edition = "2021"

View File

@@ -47,7 +47,7 @@ all-endpoints = [
"inbound-ws",
"inbound-tls",
"inbound-trojan",
# "inbound-http",
"inbound-http",
"inbound-shadowsocks",
"inbound-socks",
"inbound-tun",
@@ -107,7 +107,7 @@ outbound-vmess = ["lz_fnv", "cfb-mode", "hmac", "aes", "sha3", "digest", "uuid",
inbound-trojan = ["sha2", "hex"]
inbound-shadowsocks = ["hkdf", "sha-1", "md-5", "tokio-util"]
inbound-socks = []
inbound-http = ["hyper/server", "hyper/http1"]
inbound-http = ["http"]
inbound-tun = ["tun", "netstack-lwip"]
inbound-ws = ["tungstenite", "tokio-tungstenite", "url", "http"]
inbound-amux = ["tokio-util"]
@@ -188,9 +188,6 @@ tokio-tungstenite = { version = "0.16", optional = true }
url = { version = "2.2", optional = true }
http = { version = "0.2", optional = true }
# HTTP inbound
hyper = { version = "0.14", default-features = false, optional = true }
# SOCKS outbound
async-socks5 = { version = "0.5", optional = true }

View File

@@ -180,13 +180,21 @@ impl FakeDnsImpl {
// multiple domains point to a same IP.
self.domain_to_ip.remove(&prev_domain);
}
let ip = self.get_ip();
self.domain_to_ip.insert(domain.to_owned(), self.cursor);
let ip = Self::u32_to_ip(self.cursor);
self.cursor += 1;
ip
}
fn get_ip(&mut self) -> Ipv4Addr {
if self.cursor > self.max_cursor {
self.cursor = self.min_cursor;
}
ip
let ip = Self::u32_to_ip(self.cursor);
match ip.octets()[3] {
0 | 255 => { self.cursor += 1;self.get_ip() },
_ => ip,
}
}
fn accept(&self, domain: &str) -> bool {

View File

@@ -120,29 +120,46 @@ impl OutboundDatagramHandler for Handler {
let a = &self.actors[i];
debug!(
"failover handles udp [{}] to [{}]",
"[{}] handles [{}:{}] to [{}]",
a.tag(),
sess.network,
sess.destination,
a.tag()
);
match timeout(
Duration::from_secs(self.fail_timeout as u64),
a.datagram()?.handle(
sess,
connect_datagram_outbound(sess, self.dns_client.clone(), a).await?,
),
)
.await
{
// return before timeout
let try_outbound = async move {
a.datagram()?
.handle(
sess,
connect_datagram_outbound(sess, self.dns_client.clone(), a).await?,
)
.await
};
match timeout(Duration::from_secs(self.fail_timeout as u64), try_outbound).await {
Ok(t) => match t {
// return ok
Ok(v) => return Ok(v),
// return err
Err(_) => continue,
Err(e) => {
trace!(
"[{}] failed to handle [{}:{}]: {}",
a.tag(),
sess.network,
sess.destination,
e,
);
continue;
}
},
// after timeout
Err(_) => continue,
Err(e) => {
trace!(
"[{}] failed to handle [{}:{}]: {}",
a.tag(),
sess.network,
sess.destination,
e,
);
continue;
}
}
}

View File

@@ -159,21 +159,23 @@ impl OutboundStreamHandler for Handler {
let a = &self.actors[actor_idx];
debug!(
"failover handles tcp [{}] to [{}]",
"[{}] handles [{}:{}] to [{}]",
a.tag(),
sess.network,
sess.destination,
a.tag()
);
match timeout(
Duration::from_secs(self.fail_timeout as u64),
a.stream()?.handle(
sess,
connect_stream_outbound(sess, self.dns_client.clone(), a).await?,
),
)
.await
{
// return before timeout
let try_outbound = async move {
a.stream()?
.handle(
sess,
connect_stream_outbound(sess, self.dns_client.clone(), a).await?,
)
.await
};
match timeout(Duration::from_secs(self.fail_timeout as u64), try_outbound).await {
Ok(t) => match t {
Ok(v) => {
// Only cache for fallback actors.
@@ -188,8 +190,9 @@ impl OutboundStreamHandler for Handler {
}
Err(e) => {
trace!(
"[{}] failed to handle [{}]: {}",
"[{}] failed to handle [{}:{}]: {}",
a.tag(),
sess.network,
sess.destination,
e,
);
@@ -198,8 +201,9 @@ impl OutboundStreamHandler for Handler {
},
Err(e) => {
trace!(
"[{}] failed to handle [{}]: {}",
"[{}] failed to handle [{}:{}]: {}",
a.tag(),
sess.network,
sess.destination,
e,
);

View File

@@ -1,53 +1,228 @@
use std::convert::TryFrom;
use std::io;
use std::{net::IpAddr, pin::Pin, task::Poll};
use std::str;
use std::cmp;
use std::convert::TryFrom;
use std::{net::IpAddr, pin::Pin, task::Poll, task::Context};
use anyhow::Result;
use tokio::io::{AsyncWriteExt, AsyncReadExt, ReadBuf};
use bytes::BytesMut;
use async_trait::async_trait;
use futures::future::{self, Future};
use hyper::{server::conn::Http, service::Service, Body, Request, Response};
use log::*;
use ::http::{Method, Uri};
use crate::{
proxy::*,
session::{Session, SocksAddr},
};
struct ProxyService {
uri: String,
const BUFFER_SIZE: usize = 1024;
const EOL: [u8; 2] = [13, 10];
const EOH: [u8; 4] = [13, 10, 13, 10];
fn bad_request() -> io::Error {
io::Error::new(io::ErrorKind::Other, "bad request")
}
impl ProxyService {
pub fn new() -> Self {
ProxyService {
uri: "".to_string(),
fn split_slice_once(s: &[u8], sep: &[u8]) -> Option<(Vec<u8>, Vec<u8>)> {
s.windows(sep.len()).position(|w| w == sep).map(|loc| (s[..loc].to_vec(), s[loc..].to_vec()))
}
/// Parse destination
impl TryFrom<&Uri> for SocksAddr {
type Error = io::Error;
fn try_from(uri: &Uri) -> Result<Self, Self::Error> {
let (host, port) = (
uri.host().ok_or(bad_request())?,
uri.port_u16().or_else(|| {
match uri.scheme_str() {
Some("http") => Some(80),
Some("https") => Some(443),
_ => None,
}
}).ok_or(bad_request())?
);
let addr = if let Ok(host) = host.parse::<IpAddr>() {
SocksAddr::from((host, port))
} else {
SocksAddr::try_from((host, port))?
};
Ok(addr)
}
}
/// https://www.rfc-editor.org/rfc/rfc7230#section-5.3
enum TargetFormat {
Origin,
Absolute,
Authority,
Asterisk,
}
struct RequestHead {
method: Method,
uri: Uri,
version: String,
headers: Vec<(String, String)>,
target_format: TargetFormat,
}
impl RequestHead {
fn parse_request_line(request_line: &[u8]) -> io::Result<(Method, Uri, String)> {
let mut tokens = str::from_utf8(request_line).unwrap_or("").splitn(3, ' ');
let method = match Method::try_from(tokens.next().unwrap_or("")) {
Ok(v) => v,
Err(_e) => return Err(bad_request()),
};
let uri = match Uri::try_from(tokens.next().unwrap_or("")) {
Ok(v) => v,
Err(_e) => return Err(bad_request()),
};
let version = tokens.next().unwrap_or("HTTP/1.1");
Ok((method, uri, version.to_string()))
}
fn parse_headers(header_lines: &[u8]) -> io::Result<Vec<(String, String)>> {
let mut headers = Vec::new();
let lines = str::from_utf8(header_lines).unwrap_or("").split("\r\n");
for line in lines {
let (name, value) = match line.split_once(':') {
Some((n, v)) => (n.trim(), v.trim()),
None => continue,
};
headers.push((name.to_string(), value.to_string()));
}
Ok(headers)
}
fn set_header(&mut self, name: String, value: String) {
for (i, (n, _v)) in self.headers.iter().enumerate() {
if n.to_lowercase() == name.to_lowercase() {
self.headers[i] = (n.clone(), value);
return;
}
}
self.headers.push((name, value));
}
}
impl Into<Vec<u8>> for RequestHead {
fn into(self) -> Vec<u8> {
let mut head = Vec::new();
let request_line = format!("{} {} {}\r\n", self.method, self.uri, self.version);
head.append(&mut request_line.into_bytes());
for (name, value) in self.headers {
let header = format!("{}: {}\r\n", name, value);
head.append(&mut header.into_bytes());
}
head.extend_from_slice("\r\n".as_bytes());
head
}
}
impl TryFrom<Vec<u8>> for RequestHead {
type Error = io::Error;
fn try_from(head: Vec<u8>) -> Result<Self, Self::Error> {
let (request_line, header) = split_slice_once(&head, &EOL)
.unwrap_or((head, Vec::new()));
let (method, uri, version) = RequestHead::parse_request_line(&request_line)?;
let headers = RequestHead::parse_headers(&header)?;
let target_format = if uri.to_string() == "*" {
TargetFormat::Asterisk
} else if uri.scheme().is_some() {
TargetFormat::Absolute
} else if method == Method::CONNECT {
TargetFormat::Authority
} else {
TargetFormat::Origin
};
Ok(RequestHead {
method,
uri,
version,
headers,
target_format,
})
}
}
struct HttpStream {
cache: Vec<u8>,
destination: Option<SocksAddr>,
origin: AnyStream,
}
impl HttpStream {
async fn sniff(&mut self) -> io::Result<()> {
let (head_buf, mut rest_buf) = self.drain(&EOH).await?;
let mut head = RequestHead::try_from(head_buf)?;
let addr = SocksAddr::try_from(&head.uri)?;
self.destination = Some(addr.clone());
match head.target_format {
TargetFormat::Absolute => {
let path_and_query = head.uri.path_and_query().map(|paq| paq.as_str()).unwrap_or("/");
head.uri = path_and_query.parse().unwrap();
head.set_header("host".to_string(), addr.to_string());
self.cache.clear();
self.cache.append(&mut head.into());
self.cache.append(&mut rest_buf);
return Ok(());
},
TargetFormat::Authority => {
self.origin.write_all(b"HTTP/1.1 200 Connection established\r\n\r\n").await?;
return Ok(());
},
_ => return Err(bad_request()),
}
}
pub fn get_uri(&self) -> &String {
&self.uri
async fn drain(&mut self, stop_sign: &[u8]) -> io::Result<(Vec<u8>, Vec<u8>)> {
let mut data = Vec::new();
let mut buf = BytesMut::with_capacity(BUFFER_SIZE);
loop {
buf.clear();
let n = self.origin.read_buf(&mut buf).await?;
data.extend_from_slice(&buf[..n]);
match split_slice_once(&data, stop_sign) {
Some(v) => return Ok(v),
None => continue,
}
}
}
}
#[allow(clippy::type_complexity)]
impl Service<Request<Body>> for ProxyService {
type Error = Box<dyn std::error::Error + Send + Sync>;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
type Response = Response<Body>;
impl AsyncRead for HttpStream {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
if !self.cache.is_empty() {
let n = cmp::min(buf.capacity(), self.cache.len());
let cached_data = self.cache.drain(..n);
buf.put_slice(cached_data.as_slice());
return Poll::Ready(Ok(()));
}
Pin::new(&mut self.origin).poll_read(cx, buf)
}
}
fn call(&mut self, req: Request<Body>) -> Self::Future {
self.uri = req.uri().to_string();
Box::pin(future::ready(Ok(Response::builder()
.status(200)
.body(hyper::Body::empty())
.unwrap())))
impl AsyncWrite for HttpStream {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.origin).poll_write(cx, buf)
}
fn poll_ready(
&mut self,
_cx: &mut std::task::Context<'_>,
) -> Poll<std::result::Result<(), Self::Error>> {
Poll::Ready(Ok(()))
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
Pin::new(&mut self.origin).poll_flush(cx)
}
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
Pin::new(&mut self.origin).poll_shutdown(cx)
}
}
@@ -58,47 +233,17 @@ impl InboundStreamHandler for Handler {
async fn handle<'a>(
&'a self,
mut sess: Session,
stream: Box<dyn ProxyStream>,
stream: AnyStream,
) -> std::io::Result<AnyInboundTransport> {
let http = Http::new();
let proxy_service = ProxyService::new();
let conn = http
.serve_connection(stream, proxy_service)
.without_shutdown();
let parts = match conn.await {
Ok(v) => v,
Err(err) => {
debug!("accept conn failed: {}", err);
return Err(io::Error::new(io::ErrorKind::Other, "unspecified"));
}
let mut http_stream = HttpStream {
cache: Vec::new(),
destination: None,
origin: stream,
};
http_stream.sniff().await?;
let uri = parts.service.get_uri();
let host_port: Vec<&str> = uri.split(':').collect();
if host_port.len() != 2 {
debug!("invalid target {:?}", uri);
return Err(io::Error::new(io::ErrorKind::Other, "unspecified"));
}
sess.destination = http_stream.destination.clone().ok_or(bad_request())?;
let destination = if let Ok(port) = host_port[1].parse::<u16>() {
if let Ok(ip) = host_port[0].parse::<IpAddr>() {
SocksAddr::from((ip, port))
} else {
match SocksAddr::try_from((host_port[0], port)) {
Ok(v) => v,
Err(err) => {
debug!("invalid target {:?}: {}", uri, err);
return Err(io::Error::new(io::ErrorKind::Other, "unspecified"));
}
}
}
} else {
debug!("invalid target {:?}", uri);
return Err(io::Error::new(io::ErrorKind::Other, "unspecified"));
};
sess.destination = destination;
Ok(InboundTransport::Stream(Box::new(parts.io), sess))
Ok(InboundTransport::Stream(Box::new(http_stream), sess))
}
}