Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
162ccef1e4 | ||
|
|
c54a2843eb | ||
|
|
8517bb33fc |
@@ -504,6 +504,8 @@ impl OutboundManager {
|
||||
actors.clone(),
|
||||
settings.max_accepts as usize,
|
||||
settings.concurrency as usize,
|
||||
settings.max_recv_bytes as usize,
|
||||
settings.max_lifetime,
|
||||
dns_client.clone(),
|
||||
);
|
||||
let handler = HandlerBuilder::default()
|
||||
|
||||
@@ -77,6 +77,8 @@ pub struct Proxy {
|
||||
pub amux: Option<bool>,
|
||||
pub amux_max: Option<i32>,
|
||||
pub amux_con: Option<i32>,
|
||||
pub amux_max_recv: Option<u64>,
|
||||
pub amux_max_lifetime: Option<u64>,
|
||||
|
||||
pub quic: Option<bool>,
|
||||
}
|
||||
@@ -106,6 +108,8 @@ impl Default for Proxy {
|
||||
amux: Some(false),
|
||||
amux_max: Some(8),
|
||||
amux_con: Some(2),
|
||||
amux_max_recv: Some(0),
|
||||
amux_max_lifetime: Some(0),
|
||||
quic: Some(false),
|
||||
}
|
||||
}
|
||||
@@ -430,6 +434,22 @@ pub fn from_lines(lines: Vec<io::Result<String>>) -> Result<Config> {
|
||||
};
|
||||
proxy.amux_con = i;
|
||||
}
|
||||
"amux-max-recv" => {
|
||||
let i = if let Ok(i) = v.parse::<u64>() {
|
||||
Some(i)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
proxy.amux_max_recv = i;
|
||||
}
|
||||
"amux-max-lifetime" => {
|
||||
let i = if let Ok(i) = v.parse::<u64>() {
|
||||
Some(i)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
proxy.amux_max_lifetime = i;
|
||||
}
|
||||
"quic" => proxy.quic = if v == "true" { Some(true) } else { Some(false) },
|
||||
"interface" => {
|
||||
proxy.interface = v.to_string();
|
||||
@@ -978,6 +998,8 @@ pub fn to_internal(conf: &mut Config) -> Result<internal::Config> {
|
||||
if let Some(ext_concurrency) = &ext_proxy.amux_con {
|
||||
amux_settings.concurrency = *ext_concurrency as u32;
|
||||
}
|
||||
amux_settings.max_recv_bytes = ext_proxy.amux_max_recv.unwrap_or_default();
|
||||
amux_settings.max_lifetime = ext_proxy.amux_max_lifetime.unwrap_or_default();
|
||||
let amux_settings = amux_settings.write_to_bytes().unwrap();
|
||||
amux_outbound.settings = amux_settings;
|
||||
amux_outbound.protocol = "amux".to_string();
|
||||
|
||||
@@ -148,6 +148,8 @@ message AMuxOutboundSettings {
|
||||
repeated string actors = 3;
|
||||
uint32 max_accepts = 4;
|
||||
uint32 concurrency = 5;
|
||||
uint64 max_recv_bytes = 6;
|
||||
uint64 max_lifetime = 7;
|
||||
}
|
||||
|
||||
message QuicOutboundSettings {
|
||||
|
||||
@@ -2655,6 +2655,10 @@ pub struct AMuxOutboundSettings {
|
||||
pub max_accepts: u32,
|
||||
// @@protoc_insertion_point(field:AMuxOutboundSettings.concurrency)
|
||||
pub concurrency: u32,
|
||||
// @@protoc_insertion_point(field:AMuxOutboundSettings.max_recv_bytes)
|
||||
pub max_recv_bytes: u64,
|
||||
// @@protoc_insertion_point(field:AMuxOutboundSettings.max_lifetime)
|
||||
pub max_lifetime: u64,
|
||||
// special fields
|
||||
// @@protoc_insertion_point(special_field:AMuxOutboundSettings.special_fields)
|
||||
pub special_fields: ::protobuf::SpecialFields,
|
||||
@@ -2697,6 +2701,12 @@ impl ::protobuf::Message for AMuxOutboundSettings {
|
||||
40 => {
|
||||
self.concurrency = is.read_uint32()?;
|
||||
},
|
||||
48 => {
|
||||
self.max_recv_bytes = is.read_uint64()?;
|
||||
},
|
||||
56 => {
|
||||
self.max_lifetime = is.read_uint64()?;
|
||||
},
|
||||
tag => {
|
||||
::protobuf::rt::read_unknown_or_skip_group(tag, is, self.special_fields.mut_unknown_fields())?;
|
||||
},
|
||||
@@ -2724,6 +2734,12 @@ impl ::protobuf::Message for AMuxOutboundSettings {
|
||||
if self.concurrency != 0 {
|
||||
my_size += ::protobuf::rt::uint32_size(5, self.concurrency);
|
||||
}
|
||||
if self.max_recv_bytes != 0 {
|
||||
my_size += ::protobuf::rt::uint64_size(6, self.max_recv_bytes);
|
||||
}
|
||||
if self.max_lifetime != 0 {
|
||||
my_size += ::protobuf::rt::uint64_size(7, self.max_lifetime);
|
||||
}
|
||||
my_size += ::protobuf::rt::unknown_fields_size(self.special_fields.unknown_fields());
|
||||
self.special_fields.cached_size().set(my_size as u32);
|
||||
my_size
|
||||
@@ -2745,6 +2761,12 @@ impl ::protobuf::Message for AMuxOutboundSettings {
|
||||
if self.concurrency != 0 {
|
||||
os.write_uint32(5, self.concurrency)?;
|
||||
}
|
||||
if self.max_recv_bytes != 0 {
|
||||
os.write_uint64(6, self.max_recv_bytes)?;
|
||||
}
|
||||
if self.max_lifetime != 0 {
|
||||
os.write_uint64(7, self.max_lifetime)?;
|
||||
}
|
||||
os.write_unknown_fields(self.special_fields.unknown_fields())?;
|
||||
::std::result::Result::Ok(())
|
||||
}
|
||||
@@ -2767,6 +2789,8 @@ impl ::protobuf::Message for AMuxOutboundSettings {
|
||||
self.actors.clear();
|
||||
self.max_accepts = 0;
|
||||
self.concurrency = 0;
|
||||
self.max_recv_bytes = 0;
|
||||
self.max_lifetime = 0;
|
||||
self.special_fields.clear();
|
||||
}
|
||||
|
||||
@@ -2777,6 +2801,8 @@ impl ::protobuf::Message for AMuxOutboundSettings {
|
||||
actors: ::std::vec::Vec::new(),
|
||||
max_accepts: 0,
|
||||
concurrency: 0,
|
||||
max_recv_bytes: 0,
|
||||
max_lifetime: 0,
|
||||
special_fields: ::protobuf::SpecialFields::new(),
|
||||
};
|
||||
&instance
|
||||
|
||||
@@ -132,6 +132,10 @@ lazy_static! {
|
||||
get_env_var_or("UDP_DOWNLINK_CHANNEL_SIZE", 256)
|
||||
};
|
||||
|
||||
pub static ref QUIC_ACCEPT_CHANNEL_SIZE: usize = {
|
||||
get_env_var_or("QUIC_ACCEPT_CHANNEL_SIZE", 1024)
|
||||
};
|
||||
|
||||
/// Buffer size for UDP datagrams receiving/sending, in KB.
|
||||
pub static ref DATAGRAM_BUFFER_SIZE: usize = {
|
||||
get_env_var_or("DATAGRAM_BUFFER_SIZE", 2)
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use std::cmp::min;
|
||||
use std::collections::HashMap;
|
||||
use std::convert::TryInto;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::{io, pin::Pin};
|
||||
@@ -19,11 +19,11 @@ use futures::{
|
||||
task::{Context, Poll},
|
||||
Future, TryFutureExt,
|
||||
};
|
||||
use log::trace;
|
||||
use log::{debug, trace};
|
||||
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
|
||||
use tokio::sync::mpsc::{self, Receiver, Sender};
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::time::sleep;
|
||||
use tokio::time::{sleep, Instant};
|
||||
|
||||
#[cfg(feature = "inbound-amux")]
|
||||
pub mod inbound;
|
||||
@@ -404,6 +404,7 @@ impl MuxSession {
|
||||
mut frame_stream: SplitStream<MuxConnection<S>>,
|
||||
recv_end: Option<Arc<Mutex<bool>>>,
|
||||
mut accept: Option<Accept>,
|
||||
recv_bytes_counter: Option<Arc<AtomicUsize>>,
|
||||
) -> AbortHandle
|
||||
where
|
||||
S: 'static + AsyncRead + AsyncWrite + Unpin + Send,
|
||||
@@ -440,6 +441,9 @@ impl MuxSession {
|
||||
if let Some(stream_read_tx) =
|
||||
streams.lock().await.get(&stream_id).cloned()
|
||||
{
|
||||
if let Some(c) = recv_bytes_counter.as_ref() {
|
||||
c.fetch_add(data.len(), Ordering::Relaxed);
|
||||
}
|
||||
// FIXME error
|
||||
let _ = stream_read_tx.send(data).await;
|
||||
}
|
||||
@@ -513,7 +517,13 @@ impl MuxSession {
|
||||
handle
|
||||
}
|
||||
|
||||
pub fn connector<S>(conn: S, max_accepts: usize, concurrency: usize) -> MuxConnector
|
||||
pub fn connector<S>(
|
||||
conn: S,
|
||||
max_accepts: usize,
|
||||
concurrency: usize,
|
||||
max_recv_bytes: usize,
|
||||
max_lifetime: u64,
|
||||
) -> MuxConnector
|
||||
where
|
||||
S: 'static + AsyncRead + AsyncWrite + Unpin + Send,
|
||||
{
|
||||
@@ -521,11 +531,13 @@ impl MuxSession {
|
||||
let (frame_write_tx, frame_write_rx) = mpsc::channel::<MuxFrame>(1);
|
||||
let (recv_end, send_end) = (Arc::new(Mutex::new(false)), Arc::new(Mutex::new(false)));
|
||||
let streams: Streams = Arc::new(Mutex::new(HashMap::new()));
|
||||
let recv_bytes_counter = Arc::new(AtomicUsize::new(0));
|
||||
let recv_handle = Self::run_frame_receive_loop(
|
||||
streams.clone(),
|
||||
frame_stream,
|
||||
Some(recv_end.clone()),
|
||||
None,
|
||||
Some(recv_bytes_counter.clone()),
|
||||
);
|
||||
let send_handle = Self::run_frame_send_loop(
|
||||
streams.clone(),
|
||||
@@ -534,9 +546,14 @@ impl MuxSession {
|
||||
Some(send_end.clone()),
|
||||
);
|
||||
let session_id = random_u16();
|
||||
let started_at = Instant::now();
|
||||
MuxConnector::new(
|
||||
max_accepts,
|
||||
concurrency,
|
||||
max_recv_bytes,
|
||||
recv_bytes_counter,
|
||||
max_lifetime,
|
||||
started_at,
|
||||
session_id,
|
||||
streams,
|
||||
frame_write_tx,
|
||||
@@ -565,6 +582,7 @@ impl MuxSession {
|
||||
stream_accept_tx,
|
||||
frame_write_tx,
|
||||
}),
|
||||
None,
|
||||
);
|
||||
let send_handle = Self::run_frame_send_loop(streams, frame_sink, frame_write_rx, None);
|
||||
MuxAcceptor::new(session_id, stream_accept_rx, recv_handle, send_handle)
|
||||
@@ -576,6 +594,16 @@ pub struct MuxConnector {
|
||||
max_accepts: usize,
|
||||
// Stream concurrency.
|
||||
concurrency: usize,
|
||||
// New streams will not be created on the connection if total received
|
||||
// bytes of the connection exceeds this value.
|
||||
max_recv_bytes: usize,
|
||||
// A counter to count currently received bytes on the connection.
|
||||
recv_bytes_counter: Arc<AtomicUsize>,
|
||||
// New streams will not be created on the connection if the lifetime of
|
||||
// the connection exceeds this value, in seconds.
|
||||
max_lifetime: u64,
|
||||
// The time the connection is started.
|
||||
started_at: Instant,
|
||||
// ID for debugging purposes.
|
||||
session_id: SessionId,
|
||||
// Counter for number of streams created.
|
||||
@@ -604,6 +632,10 @@ impl MuxConnector {
|
||||
pub fn new(
|
||||
max_accepts: usize,
|
||||
concurrency: usize,
|
||||
max_recv_bytes: usize,
|
||||
recv_bytes_counter: Arc<AtomicUsize>,
|
||||
max_lifetime: u64,
|
||||
started_at: Instant,
|
||||
session_id: SessionId,
|
||||
streams: Streams,
|
||||
frame_write_tx: Sender<MuxFrame>,
|
||||
@@ -621,6 +653,10 @@ impl MuxConnector {
|
||||
MuxConnector {
|
||||
max_accepts,
|
||||
concurrency,
|
||||
max_recv_bytes,
|
||||
recv_bytes_counter,
|
||||
max_lifetime,
|
||||
started_at,
|
||||
session_id,
|
||||
total_accepted: 0,
|
||||
streams,
|
||||
@@ -642,15 +678,21 @@ impl MuxConnector {
|
||||
if self.done.load(Ordering::SeqCst) {
|
||||
return true;
|
||||
} else {
|
||||
if self.total_accepted >= self.max_accepts {
|
||||
if self.total_accepted >= self.max_accepts
|
||||
|| (self.max_recv_bytes > 0
|
||||
&& self.recv_bytes_counter.load(Ordering::Relaxed) >= self.max_recv_bytes)
|
||||
|| (self.max_lifetime > 0
|
||||
&& Instant::now().duration_since(self.started_at).as_secs()
|
||||
>= self.max_lifetime)
|
||||
{
|
||||
for end in self.stream_ends.iter() {
|
||||
if !end.load(Ordering::Relaxed) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
true
|
||||
} else {
|
||||
return false;
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -667,13 +709,39 @@ impl MuxConnector {
|
||||
self.done.store(true, Ordering::Relaxed);
|
||||
return None;
|
||||
}
|
||||
if self.max_recv_bytes > 0
|
||||
&& self.recv_bytes_counter.load(Ordering::Relaxed) >= self.max_recv_bytes
|
||||
{
|
||||
debug!(
|
||||
"exceeding allowed received bytes ({}): {}",
|
||||
self.session_id, self.max_recv_bytes
|
||||
);
|
||||
return None;
|
||||
}
|
||||
if self.max_lifetime > 0
|
||||
&& Instant::now().duration_since(self.started_at).as_secs() >= self.max_lifetime
|
||||
{
|
||||
debug!(
|
||||
"exceeding allowed lifetime ({}): {}s",
|
||||
self.session_id, self.max_lifetime
|
||||
);
|
||||
return None;
|
||||
}
|
||||
if self.total_accepted >= self.max_accepts {
|
||||
if self.streams.lock().await.is_empty() {
|
||||
self.done.store(true, Ordering::Relaxed);
|
||||
}
|
||||
debug!(
|
||||
"exceeding allowed accpets ({}): {}",
|
||||
self.session_id, self.max_accepts
|
||||
);
|
||||
return None;
|
||||
}
|
||||
if self.streams.lock().await.len() >= self.concurrency {
|
||||
debug!(
|
||||
"exceeding allowed concurrency ({}): {}",
|
||||
self.session_id, self.concurrency
|
||||
);
|
||||
return None;
|
||||
}
|
||||
let frame_write_tx = self.frame_write_tx.clone();
|
||||
|
||||
@@ -6,6 +6,9 @@ use async_trait::async_trait;
|
||||
use futures::future::BoxFuture;
|
||||
use futures::future::{abortable, AbortHandle};
|
||||
use futures::FutureExt;
|
||||
use rand::prelude::SliceRandom;
|
||||
use rand::rngs::StdRng;
|
||||
use rand::SeedableRng;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use crate::{
|
||||
@@ -24,6 +27,8 @@ pub struct MuxManager {
|
||||
pub actors: Vec<AnyOutboundHandler>,
|
||||
pub max_accepts: usize,
|
||||
pub concurrency: usize,
|
||||
pub max_recv_bytes: usize,
|
||||
pub max_lifetime: u64,
|
||||
pub dns_client: SyncDnsClient,
|
||||
// TODO Verify whether the run loops in connectors are aborted after
|
||||
// a config reload.
|
||||
@@ -38,6 +43,8 @@ impl MuxManager {
|
||||
actors: Vec<AnyOutboundHandler>,
|
||||
max_accepts: usize,
|
||||
concurrency: usize,
|
||||
max_recv_bytes: usize,
|
||||
max_lifetime: u64,
|
||||
dns_client: SyncDnsClient,
|
||||
) -> (Self, Vec<AbortHandle>) {
|
||||
let mut abort_handles = Vec::new();
|
||||
@@ -48,8 +55,7 @@ impl MuxManager {
|
||||
let fut = async move {
|
||||
loop {
|
||||
connectors2.lock().await.retain(|c| !c.is_done());
|
||||
log::trace!("active connectors {}", connectors2.lock().await.len());
|
||||
tokio::time::sleep(Duration::from_secs(20)).await;
|
||||
tokio::time::sleep(Duration::from_secs(5)).await;
|
||||
}
|
||||
};
|
||||
let (abortable, abort_handle) = abortable(fut);
|
||||
@@ -62,6 +68,8 @@ impl MuxManager {
|
||||
actors,
|
||||
max_accepts,
|
||||
concurrency,
|
||||
max_recv_bytes,
|
||||
max_lifetime,
|
||||
dns_client,
|
||||
connectors,
|
||||
monitor_task: Mutex::new(Some(monitor_task)),
|
||||
@@ -80,7 +88,9 @@ impl MuxManager {
|
||||
|
||||
if !sess.new_conn_once {
|
||||
// Try to create the stream from existing connections.
|
||||
for c in self.connectors.lock().await.iter_mut() {
|
||||
let mut conns = self.connectors.lock().await;
|
||||
conns.shuffle(&mut StdRng::from_entropy());
|
||||
for c in conns.iter_mut() {
|
||||
if let Some(s) = c.new_stream().await {
|
||||
return Ok(s);
|
||||
}
|
||||
@@ -104,17 +114,24 @@ impl MuxManager {
|
||||
// Create the stream over this new connection.
|
||||
let mut connector = {
|
||||
if sess.new_conn_once {
|
||||
MuxSession::connector(conn, 1, 1)
|
||||
MuxSession::connector(conn, 1, 1, 0, 0)
|
||||
} else {
|
||||
MuxSession::connector(conn, self.max_accepts, self.concurrency)
|
||||
MuxSession::connector(
|
||||
conn,
|
||||
self.max_accepts,
|
||||
self.concurrency,
|
||||
self.max_recv_bytes,
|
||||
self.max_lifetime,
|
||||
)
|
||||
}
|
||||
};
|
||||
let s = match connector.new_stream().await {
|
||||
Some(s) => s,
|
||||
None => return Err(io::Error::new(io::ErrorKind::Other, "new stream failed")),
|
||||
};
|
||||
self.connectors.lock().await.push(connector);
|
||||
|
||||
let mut conns = self.connectors.lock().await;
|
||||
conns.push(connector);
|
||||
log::debug!("created new amux conn, total: {}", conns.len());
|
||||
Ok(s)
|
||||
}
|
||||
}
|
||||
@@ -132,10 +149,20 @@ impl Handler {
|
||||
actors: Vec<AnyOutboundHandler>,
|
||||
max_accepts: usize,
|
||||
concurrency: usize,
|
||||
max_recv_bytes: usize,
|
||||
max_lifetime: u64,
|
||||
dns_client: SyncDnsClient,
|
||||
) -> (Self, Vec<AbortHandle>) {
|
||||
let (manager, abort_handles) =
|
||||
MuxManager::new(address, port, actors, max_accepts, concurrency, dns_client);
|
||||
let (manager, abort_handles) = MuxManager::new(
|
||||
address,
|
||||
port,
|
||||
actors,
|
||||
max_accepts,
|
||||
concurrency,
|
||||
max_recv_bytes,
|
||||
max_lifetime,
|
||||
dns_client,
|
||||
);
|
||||
(Handler { manager }, abort_handles)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -135,8 +135,7 @@ async fn handle_conn(
|
||||
#[async_trait]
|
||||
impl InboundDatagramHandler for Handler {
|
||||
async fn handle<'a>(&'a self, socket: AnyInboundDatagram) -> io::Result<AnyInboundTransport> {
|
||||
// FIXME What is a good size for this?
|
||||
let (stream_tx, stream_rx) = channel(1024);
|
||||
let (stream_tx, stream_rx) = channel(*crate::option::QUIC_ACCEPT_CHANNEL_SIZE);
|
||||
let endpoint = quinn::Endpoint::new(
|
||||
quinn::EndpointConfig::default(),
|
||||
Some(self.server_config.clone()),
|
||||
|
||||
Reference in New Issue
Block a user