3 Commits

Author SHA1 Message Date
eric
162ccef1e4 inbound/quic: make an option for QUIC inbound accept channel size
Some checks failed
ci / test (macos-latest) (push) Has been cancelled
ci / test (ubuntu-latest) (push) Has been cancelled
ci / build-bin-cross (aarch64-unknown-linux-musl) (push) Has been cancelled
ci / build-bin-cross (arm-unknown-linux-musleabi) (push) Has been cancelled
ci / build-bin-cross (armv7-unknown-linux-musleabihf) (push) Has been cancelled
ci / build-bin-cross (i686-unknown-linux-musl) (push) Has been cancelled
ci / build-bin-cross (mips-unknown-linux-musl) (push) Has been cancelled
ci / build-bin-cross (mipsel-unknown-linux-musl) (push) Has been cancelled
ci / build-bin-cross (x86_64-pc-windows-gnu) (push) Has been cancelled
ci / build-bin-cross (x86_64-unknown-linux-musl) (push) Has been cancelled
ci / build-bin-local (macos-latest, x86_64-apple-darwin) (push) Has been cancelled
ci / build-apple (push) Has been cancelled
ci / build-android (push) Has been cancelled
releases / build-bin-cross (aarch64-unknown-linux-musl) (push) Has been cancelled
releases / build-bin-cross (mips-unknown-linux-musl) (push) Has been cancelled
releases / build-bin-cross (x86_64-pc-windows-gnu) (push) Has been cancelled
releases / build-bin-cross (x86_64-unknown-linux-musl) (push) Has been cancelled
releases / build-bin-local (macos-latest, x86_64-apple-darwin) (push) Has been cancelled
releases / build-apple (push) Has been cancelled
releases / build-android (push) Has been cancelled
releases / create-release (push) Has been cancelled
releases / release-bin (aarch64-unknown-linux-musl) (push) Has been cancelled
releases / release-bin (mips-unknown-linux-musl) (push) Has been cancelled
releases / release-bin (x86_64-apple-darwin) (push) Has been cancelled
releases / release-bin (x86_64-pc-windows-gnu) (push) Has been cancelled
releases / release-bin (x86_64-unknown-linux-musl) (push) Has been cancelled
releases / release-mobile-libs (push) Has been cancelled
2023-09-25 17:53:09 +08:00
eric
c54a2843eb outbound/amux: new streams on existing connections randomly 2023-09-24 20:46:37 +08:00
eric
8517bb33fc outbound/amux: new parameters max-recv-bytes and max-lifetime to control connection behavior 2023-09-24 04:05:27 +08:00
8 changed files with 168 additions and 18 deletions

View File

@@ -504,6 +504,8 @@ impl OutboundManager {
actors.clone(),
settings.max_accepts as usize,
settings.concurrency as usize,
settings.max_recv_bytes as usize,
settings.max_lifetime,
dns_client.clone(),
);
let handler = HandlerBuilder::default()

View File

@@ -77,6 +77,8 @@ pub struct Proxy {
pub amux: Option<bool>,
pub amux_max: Option<i32>,
pub amux_con: Option<i32>,
pub amux_max_recv: Option<u64>,
pub amux_max_lifetime: Option<u64>,
pub quic: Option<bool>,
}
@@ -106,6 +108,8 @@ impl Default for Proxy {
amux: Some(false),
amux_max: Some(8),
amux_con: Some(2),
amux_max_recv: Some(0),
amux_max_lifetime: Some(0),
quic: Some(false),
}
}
@@ -430,6 +434,22 @@ pub fn from_lines(lines: Vec<io::Result<String>>) -> Result<Config> {
};
proxy.amux_con = i;
}
"amux-max-recv" => {
let i = if let Ok(i) = v.parse::<u64>() {
Some(i)
} else {
None
};
proxy.amux_max_recv = i;
}
"amux-max-lifetime" => {
let i = if let Ok(i) = v.parse::<u64>() {
Some(i)
} else {
None
};
proxy.amux_max_lifetime = i;
}
"quic" => proxy.quic = if v == "true" { Some(true) } else { Some(false) },
"interface" => {
proxy.interface = v.to_string();
@@ -978,6 +998,8 @@ pub fn to_internal(conf: &mut Config) -> Result<internal::Config> {
if let Some(ext_concurrency) = &ext_proxy.amux_con {
amux_settings.concurrency = *ext_concurrency as u32;
}
amux_settings.max_recv_bytes = ext_proxy.amux_max_recv.unwrap_or_default();
amux_settings.max_lifetime = ext_proxy.amux_max_lifetime.unwrap_or_default();
let amux_settings = amux_settings.write_to_bytes().unwrap();
amux_outbound.settings = amux_settings;
amux_outbound.protocol = "amux".to_string();

View File

@@ -148,6 +148,8 @@ message AMuxOutboundSettings {
repeated string actors = 3;
uint32 max_accepts = 4;
uint32 concurrency = 5;
uint64 max_recv_bytes = 6;
uint64 max_lifetime = 7;
}
message QuicOutboundSettings {

View File

@@ -2655,6 +2655,10 @@ pub struct AMuxOutboundSettings {
pub max_accepts: u32,
// @@protoc_insertion_point(field:AMuxOutboundSettings.concurrency)
pub concurrency: u32,
// @@protoc_insertion_point(field:AMuxOutboundSettings.max_recv_bytes)
pub max_recv_bytes: u64,
// @@protoc_insertion_point(field:AMuxOutboundSettings.max_lifetime)
pub max_lifetime: u64,
// special fields
// @@protoc_insertion_point(special_field:AMuxOutboundSettings.special_fields)
pub special_fields: ::protobuf::SpecialFields,
@@ -2697,6 +2701,12 @@ impl ::protobuf::Message for AMuxOutboundSettings {
40 => {
self.concurrency = is.read_uint32()?;
},
48 => {
self.max_recv_bytes = is.read_uint64()?;
},
56 => {
self.max_lifetime = is.read_uint64()?;
},
tag => {
::protobuf::rt::read_unknown_or_skip_group(tag, is, self.special_fields.mut_unknown_fields())?;
},
@@ -2724,6 +2734,12 @@ impl ::protobuf::Message for AMuxOutboundSettings {
if self.concurrency != 0 {
my_size += ::protobuf::rt::uint32_size(5, self.concurrency);
}
if self.max_recv_bytes != 0 {
my_size += ::protobuf::rt::uint64_size(6, self.max_recv_bytes);
}
if self.max_lifetime != 0 {
my_size += ::protobuf::rt::uint64_size(7, self.max_lifetime);
}
my_size += ::protobuf::rt::unknown_fields_size(self.special_fields.unknown_fields());
self.special_fields.cached_size().set(my_size as u32);
my_size
@@ -2745,6 +2761,12 @@ impl ::protobuf::Message for AMuxOutboundSettings {
if self.concurrency != 0 {
os.write_uint32(5, self.concurrency)?;
}
if self.max_recv_bytes != 0 {
os.write_uint64(6, self.max_recv_bytes)?;
}
if self.max_lifetime != 0 {
os.write_uint64(7, self.max_lifetime)?;
}
os.write_unknown_fields(self.special_fields.unknown_fields())?;
::std::result::Result::Ok(())
}
@@ -2767,6 +2789,8 @@ impl ::protobuf::Message for AMuxOutboundSettings {
self.actors.clear();
self.max_accepts = 0;
self.concurrency = 0;
self.max_recv_bytes = 0;
self.max_lifetime = 0;
self.special_fields.clear();
}
@@ -2777,6 +2801,8 @@ impl ::protobuf::Message for AMuxOutboundSettings {
actors: ::std::vec::Vec::new(),
max_accepts: 0,
concurrency: 0,
max_recv_bytes: 0,
max_lifetime: 0,
special_fields: ::protobuf::SpecialFields::new(),
};
&instance

View File

@@ -132,6 +132,10 @@ lazy_static! {
get_env_var_or("UDP_DOWNLINK_CHANNEL_SIZE", 256)
};
pub static ref QUIC_ACCEPT_CHANNEL_SIZE: usize = {
get_env_var_or("QUIC_ACCEPT_CHANNEL_SIZE", 1024)
};
/// Buffer size for UDP datagrams receiving/sending, in KB.
pub static ref DATAGRAM_BUFFER_SIZE: usize = {
get_env_var_or("DATAGRAM_BUFFER_SIZE", 2)

View File

@@ -1,7 +1,7 @@
use std::cmp::min;
use std::collections::HashMap;
use std::convert::TryInto;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use std::{io, pin::Pin};
@@ -19,11 +19,11 @@ use futures::{
task::{Context, Poll},
Future, TryFutureExt,
};
use log::trace;
use log::{debug, trace};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::sync::Mutex;
use tokio::time::sleep;
use tokio::time::{sleep, Instant};
#[cfg(feature = "inbound-amux")]
pub mod inbound;
@@ -404,6 +404,7 @@ impl MuxSession {
mut frame_stream: SplitStream<MuxConnection<S>>,
recv_end: Option<Arc<Mutex<bool>>>,
mut accept: Option<Accept>,
recv_bytes_counter: Option<Arc<AtomicUsize>>,
) -> AbortHandle
where
S: 'static + AsyncRead + AsyncWrite + Unpin + Send,
@@ -440,6 +441,9 @@ impl MuxSession {
if let Some(stream_read_tx) =
streams.lock().await.get(&stream_id).cloned()
{
if let Some(c) = recv_bytes_counter.as_ref() {
c.fetch_add(data.len(), Ordering::Relaxed);
}
// FIXME error
let _ = stream_read_tx.send(data).await;
}
@@ -513,7 +517,13 @@ impl MuxSession {
handle
}
pub fn connector<S>(conn: S, max_accepts: usize, concurrency: usize) -> MuxConnector
pub fn connector<S>(
conn: S,
max_accepts: usize,
concurrency: usize,
max_recv_bytes: usize,
max_lifetime: u64,
) -> MuxConnector
where
S: 'static + AsyncRead + AsyncWrite + Unpin + Send,
{
@@ -521,11 +531,13 @@ impl MuxSession {
let (frame_write_tx, frame_write_rx) = mpsc::channel::<MuxFrame>(1);
let (recv_end, send_end) = (Arc::new(Mutex::new(false)), Arc::new(Mutex::new(false)));
let streams: Streams = Arc::new(Mutex::new(HashMap::new()));
let recv_bytes_counter = Arc::new(AtomicUsize::new(0));
let recv_handle = Self::run_frame_receive_loop(
streams.clone(),
frame_stream,
Some(recv_end.clone()),
None,
Some(recv_bytes_counter.clone()),
);
let send_handle = Self::run_frame_send_loop(
streams.clone(),
@@ -534,9 +546,14 @@ impl MuxSession {
Some(send_end.clone()),
);
let session_id = random_u16();
let started_at = Instant::now();
MuxConnector::new(
max_accepts,
concurrency,
max_recv_bytes,
recv_bytes_counter,
max_lifetime,
started_at,
session_id,
streams,
frame_write_tx,
@@ -565,6 +582,7 @@ impl MuxSession {
stream_accept_tx,
frame_write_tx,
}),
None,
);
let send_handle = Self::run_frame_send_loop(streams, frame_sink, frame_write_rx, None);
MuxAcceptor::new(session_id, stream_accept_rx, recv_handle, send_handle)
@@ -576,6 +594,16 @@ pub struct MuxConnector {
max_accepts: usize,
// Stream concurrency.
concurrency: usize,
// New streams will not be created on the connection if total received
// bytes of the connection exceeds this value.
max_recv_bytes: usize,
// A counter to count currently received bytes on the connection.
recv_bytes_counter: Arc<AtomicUsize>,
// New streams will not be created on the connection if the lifetime of
// the connection exceeds this value, in seconds.
max_lifetime: u64,
// The time the connection is started.
started_at: Instant,
// ID for debugging purposes.
session_id: SessionId,
// Counter for number of streams created.
@@ -604,6 +632,10 @@ impl MuxConnector {
pub fn new(
max_accepts: usize,
concurrency: usize,
max_recv_bytes: usize,
recv_bytes_counter: Arc<AtomicUsize>,
max_lifetime: u64,
started_at: Instant,
session_id: SessionId,
streams: Streams,
frame_write_tx: Sender<MuxFrame>,
@@ -621,6 +653,10 @@ impl MuxConnector {
MuxConnector {
max_accepts,
concurrency,
max_recv_bytes,
recv_bytes_counter,
max_lifetime,
started_at,
session_id,
total_accepted: 0,
streams,
@@ -642,15 +678,21 @@ impl MuxConnector {
if self.done.load(Ordering::SeqCst) {
return true;
} else {
if self.total_accepted >= self.max_accepts {
if self.total_accepted >= self.max_accepts
|| (self.max_recv_bytes > 0
&& self.recv_bytes_counter.load(Ordering::Relaxed) >= self.max_recv_bytes)
|| (self.max_lifetime > 0
&& Instant::now().duration_since(self.started_at).as_secs()
>= self.max_lifetime)
{
for end in self.stream_ends.iter() {
if !end.load(Ordering::Relaxed) {
return false;
}
}
return true;
true
} else {
return false;
false
}
}
}
@@ -667,13 +709,39 @@ impl MuxConnector {
self.done.store(true, Ordering::Relaxed);
return None;
}
if self.max_recv_bytes > 0
&& self.recv_bytes_counter.load(Ordering::Relaxed) >= self.max_recv_bytes
{
debug!(
"exceeding allowed received bytes ({}): {}",
self.session_id, self.max_recv_bytes
);
return None;
}
if self.max_lifetime > 0
&& Instant::now().duration_since(self.started_at).as_secs() >= self.max_lifetime
{
debug!(
"exceeding allowed lifetime ({}): {}s",
self.session_id, self.max_lifetime
);
return None;
}
if self.total_accepted >= self.max_accepts {
if self.streams.lock().await.is_empty() {
self.done.store(true, Ordering::Relaxed);
}
debug!(
"exceeding allowed accpets ({}): {}",
self.session_id, self.max_accepts
);
return None;
}
if self.streams.lock().await.len() >= self.concurrency {
debug!(
"exceeding allowed concurrency ({}): {}",
self.session_id, self.concurrency
);
return None;
}
let frame_write_tx = self.frame_write_tx.clone();

View File

@@ -6,6 +6,9 @@ use async_trait::async_trait;
use futures::future::BoxFuture;
use futures::future::{abortable, AbortHandle};
use futures::FutureExt;
use rand::prelude::SliceRandom;
use rand::rngs::StdRng;
use rand::SeedableRng;
use tokio::sync::Mutex;
use crate::{
@@ -24,6 +27,8 @@ pub struct MuxManager {
pub actors: Vec<AnyOutboundHandler>,
pub max_accepts: usize,
pub concurrency: usize,
pub max_recv_bytes: usize,
pub max_lifetime: u64,
pub dns_client: SyncDnsClient,
// TODO Verify whether the run loops in connectors are aborted after
// a config reload.
@@ -38,6 +43,8 @@ impl MuxManager {
actors: Vec<AnyOutboundHandler>,
max_accepts: usize,
concurrency: usize,
max_recv_bytes: usize,
max_lifetime: u64,
dns_client: SyncDnsClient,
) -> (Self, Vec<AbortHandle>) {
let mut abort_handles = Vec::new();
@@ -48,8 +55,7 @@ impl MuxManager {
let fut = async move {
loop {
connectors2.lock().await.retain(|c| !c.is_done());
log::trace!("active connectors {}", connectors2.lock().await.len());
tokio::time::sleep(Duration::from_secs(20)).await;
tokio::time::sleep(Duration::from_secs(5)).await;
}
};
let (abortable, abort_handle) = abortable(fut);
@@ -62,6 +68,8 @@ impl MuxManager {
actors,
max_accepts,
concurrency,
max_recv_bytes,
max_lifetime,
dns_client,
connectors,
monitor_task: Mutex::new(Some(monitor_task)),
@@ -80,7 +88,9 @@ impl MuxManager {
if !sess.new_conn_once {
// Try to create the stream from existing connections.
for c in self.connectors.lock().await.iter_mut() {
let mut conns = self.connectors.lock().await;
conns.shuffle(&mut StdRng::from_entropy());
for c in conns.iter_mut() {
if let Some(s) = c.new_stream().await {
return Ok(s);
}
@@ -104,17 +114,24 @@ impl MuxManager {
// Create the stream over this new connection.
let mut connector = {
if sess.new_conn_once {
MuxSession::connector(conn, 1, 1)
MuxSession::connector(conn, 1, 1, 0, 0)
} else {
MuxSession::connector(conn, self.max_accepts, self.concurrency)
MuxSession::connector(
conn,
self.max_accepts,
self.concurrency,
self.max_recv_bytes,
self.max_lifetime,
)
}
};
let s = match connector.new_stream().await {
Some(s) => s,
None => return Err(io::Error::new(io::ErrorKind::Other, "new stream failed")),
};
self.connectors.lock().await.push(connector);
let mut conns = self.connectors.lock().await;
conns.push(connector);
log::debug!("created new amux conn, total: {}", conns.len());
Ok(s)
}
}
@@ -132,10 +149,20 @@ impl Handler {
actors: Vec<AnyOutboundHandler>,
max_accepts: usize,
concurrency: usize,
max_recv_bytes: usize,
max_lifetime: u64,
dns_client: SyncDnsClient,
) -> (Self, Vec<AbortHandle>) {
let (manager, abort_handles) =
MuxManager::new(address, port, actors, max_accepts, concurrency, dns_client);
let (manager, abort_handles) = MuxManager::new(
address,
port,
actors,
max_accepts,
concurrency,
max_recv_bytes,
max_lifetime,
dns_client,
);
(Handler { manager }, abort_handles)
}
}

View File

@@ -135,8 +135,7 @@ async fn handle_conn(
#[async_trait]
impl InboundDatagramHandler for Handler {
async fn handle<'a>(&'a self, socket: AnyInboundDatagram) -> io::Result<AnyInboundTransport> {
// FIXME What is a good size for this?
let (stream_tx, stream_rx) = channel(1024);
let (stream_tx, stream_rx) = channel(*crate::option::QUIC_ACCEPT_CHANNEL_SIZE);
let endpoint = quinn::Endpoint::new(
quinn::EndpointConfig::default(),
Some(self.server_config.clone()),