Added statistics manager

This commit is contained in:
eric
2022-04-13 21:19:30 +08:00
parent c88af08409
commit 96294a681b
10 changed files with 345 additions and 220 deletions

View File

@@ -22,7 +22,8 @@ default-ring = [
# quinn supports only rustls as tls backend for now
"inbound-quic",
"outbound-quic",
# "api",
"api",
"stat",
]
default-openssl = [
@@ -30,7 +31,6 @@ default-openssl = [
"all-endpoints",
"openssl-aead",
"openssl-tls",
# "api",
]
# Grouping all features
@@ -109,6 +109,7 @@ inbound-quic = ["quinn", "rustls", "webpki-roots"]
inbound-tls = []
inbound-chain = []
stat = []
api = ["warp"]
auto-reload = ["notify"]
ctrlc = ["tokio/signal"]

View File

@@ -74,6 +74,43 @@ mod handlers {
Ok(StatusCode::ACCEPTED)
}
}
#[cfg(feature = "stat")]
pub async fn stat_html(rm: Arc<RuntimeManager>) -> Result<impl warp::Reply, Infallible> {
let mut body = String::from(
r#"<html>
<head><style>
table, th, td {
border: 1px solid black;
border-collapse: collapse;
text-align: right;
padding: 4;
font-size: small;
}
.highlight {
font-weight: bold;
}
</style></head>
<table style=\"border=4px solid\">
<tr><td>Network</td><td>Destination</td><td>SentBytes</td><td>RecvdBytes</td><td>SendFin</td><td>RecvFin</td></tr>
"#,
);
let sm = rm.stat_manager();
let sm = sm.read().await;
for c in sm.counters.iter() {
body.push_str(&format!(
"<tr><td>{}</td><td>{}</td><td>{}</td><td>{}</td><td>{}</td><td>{}</td></tr>",
&c.sess.network,
&c.sess.destination,
c.bytes_sent(),
c.bytes_recvd(),
c.send_completed(),
c.recv_completed(),
));
}
body.push_str("</table></html>");
Ok(warp::reply::html(body))
}
}
mod filters {
@@ -126,6 +163,17 @@ mod filters {
.and(with_runtime_manager(rm))
.and_then(handlers::runtime_shutdown)
}
// POST /api/v1/runtime/stat/html
#[cfg(feature = "stat")]
pub fn stat_html(
rm: Arc<RuntimeManager>,
) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
warp::path!("api" / "v1" / "runtime" / "stat" / "html")
.and(warp::get())
.and(with_runtime_manager(rm))
.and_then(handlers::stat_html)
}
}
pub struct ApiServer {
@@ -142,6 +190,10 @@ impl ApiServer {
.or(filters::select_get(self.runtime_manager.clone()))
.or(filters::runtime_reload(self.runtime_manager.clone()))
.or(filters::runtime_shutdown(self.runtime_manager.clone()));
#[cfg(feature = "stat")]
let routes = routes.or(filters::stat_html(self.runtime_manager.clone()));
log::info!("api server listening tcp {}", &listen_addr);
Box::pin(warp::serve(routes).bind(listen_addr))
}

View File

@@ -15,6 +15,9 @@ use crate::{
session::{Network, Session, SocksAddr},
};
#[cfg(feature = "stat")]
use crate::app::SyncStatManager;
use super::outbound::manager::OutboundManager;
use super::router::Router;
@@ -52,6 +55,8 @@ pub struct Dispatcher {
outbound_manager: Arc<RwLock<OutboundManager>>,
router: Arc<RwLock<Router>>,
dns_client: SyncDnsClient,
#[cfg(feature = "stat")]
stat_manager: SyncStatManager,
}
impl Dispatcher {
@@ -59,11 +64,14 @@ impl Dispatcher {
outbound_manager: Arc<RwLock<OutboundManager>>,
router: Arc<RwLock<Router>>,
dns_client: SyncDnsClient,
#[cfg(feature = "stat")] stat_manager: SyncStatManager,
) -> Self {
Dispatcher {
outbound_manager,
router,
dns_client,
#[cfg(feature = "stat")]
stat_manager,
}
}
@@ -170,11 +178,19 @@ impl Dispatcher {
}
};
match TcpOutboundHandler::handle(h.as_ref(), sess, stream).await {
#[allow(unused_mut)]
Ok(mut rhs) => {
let elapsed = tokio::time::Instant::now().duration_since(handshake_start);
log_request(sess, h.tag(), h.color(), Some(elapsed.as_millis()));
#[cfg(feature = "stat")]
let mut rhs = self
.stat_manager
.write()
.await
.stat_stream(rhs, sess.clone());
match common::io::copy_buf_bidirectional_with_timeout(
&mut lhs,
&mut rhs,
@@ -267,12 +283,19 @@ impl Dispatcher {
let transport =
crate::proxy::connect_udp_outbound(sess, self.dns_client.clone(), &h).await?;
match UdpOutboundHandler::handle(h.as_ref(), sess, transport).await {
Ok(c) => {
Ok(d) => {
let elapsed = tokio::time::Instant::now().duration_since(handshake_start);
log_request(sess, h.tag(), h.color(), Some(elapsed.as_millis()));
Ok(c)
#[cfg(feature = "stat")]
let d = self
.stat_manager
.write()
.await
.stat_outbound_datagram(d, sess.clone());
Ok(d)
}
Err(e) => {
debug!(

View File

@@ -10,6 +10,9 @@ pub mod nat_manager;
pub mod outbound;
pub mod router;
#[cfg(feature = "stat")]
pub mod stat_manager;
#[cfg(feature = "api")]
pub mod api;
@@ -22,3 +25,6 @@ pub mod api;
pub mod fake_dns;
pub type SyncDnsClient = Arc<RwLock<dns_client::DnsClient>>;
#[cfg(feature = "stat")]
pub type SyncStatManager = Arc<RwLock<stat_manager::StatManager>>;

View File

@@ -0,0 +1,235 @@
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::{io, pin::Pin};
use async_trait::async_trait;
use futures::{
ready,
task::{Context, Poll},
};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use crate::{proxy::*, session::*};
pub struct Stream {
pub inner: AnyStream,
pub bytes_recvd: Arc<AtomicU64>,
pub bytes_sent: Arc<AtomicU64>,
pub recv_completed: Arc<AtomicBool>,
pub send_completed: Arc<AtomicBool>,
}
impl Drop for Stream {
fn drop(&mut self) {
// In case of abnormal shutdown.
self.recv_completed.store(true, Ordering::Relaxed);
self.send_completed.store(true, Ordering::Relaxed);
}
}
impl AsyncRead for Stream {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context,
buf: &mut ReadBuf,
) -> Poll<io::Result<()>> {
ready!(Pin::new(&mut self.inner).poll_read(cx, buf))?;
if buf.filled().is_empty() {
self.recv_completed.store(true, Ordering::Relaxed);
} else {
self.bytes_recvd
.fetch_add(buf.filled().len() as u64, Ordering::Relaxed);
}
Poll::Ready(Ok(()))
}
}
impl AsyncWrite for Stream {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context,
buf: &[u8],
) -> Poll<io::Result<usize>> {
let n = ready!(Pin::new(&mut self.inner).poll_write(cx, buf))?;
self.bytes_sent.fetch_add(n as u64, Ordering::Relaxed);
Poll::Ready(Ok(n))
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
Pin::new(&mut self.inner).poll_flush(cx)
}
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
ready!(Pin::new(&mut self.inner).poll_shutdown(cx))?;
self.send_completed.store(true, Ordering::Relaxed);
Poll::Ready(Ok(()))
}
}
pub struct Datagram {
pub inner: AnyOutboundDatagram,
pub bytes_recvd: Arc<AtomicU64>,
pub bytes_sent: Arc<AtomicU64>,
pub recv_completed: Arc<AtomicBool>,
pub send_completed: Arc<AtomicBool>,
}
impl OutboundDatagram for Datagram {
fn split(
self: Box<Self>,
) -> (
Box<dyn OutboundDatagramRecvHalf>,
Box<dyn OutboundDatagramSendHalf>,
) {
let (r, s) = self.inner.split();
(
Box::new(DatagramRecvHalf(r, self.bytes_recvd, self.recv_completed)),
Box::new(DatagramSendHalf(s, self.bytes_sent, self.send_completed)),
)
}
}
pub struct DatagramRecvHalf(
Box<dyn OutboundDatagramRecvHalf>,
Arc<AtomicU64>,
Arc<AtomicBool>,
);
impl Drop for DatagramRecvHalf {
fn drop(&mut self) {
self.2.store(true, Ordering::Relaxed);
}
}
#[async_trait]
impl OutboundDatagramRecvHalf for DatagramRecvHalf {
async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocksAddr)> {
self.0.recv_from(buf).await.map(|(n, a)| {
self.1.fetch_add(n as u64, Ordering::Relaxed);
(n, a)
})
}
}
pub struct DatagramSendHalf(
Box<dyn OutboundDatagramSendHalf>,
Arc<AtomicU64>,
Arc<AtomicBool>,
);
impl Drop for DatagramSendHalf {
fn drop(&mut self) {
self.2.store(true, Ordering::Relaxed);
}
}
#[async_trait]
impl OutboundDatagramSendHalf for DatagramSendHalf {
async fn send_to(&mut self, buf: &[u8], target: &SocksAddr) -> io::Result<usize> {
self.0.send_to(buf, target).await.map(|n| {
self.1.fetch_add(n as u64, Ordering::Relaxed);
n
})
}
}
pub struct Counter {
pub sess: Session,
pub bytes_recvd: Arc<AtomicU64>,
pub bytes_sent: Arc<AtomicU64>,
pub recv_completed: Arc<AtomicBool>,
pub send_completed: Arc<AtomicBool>,
}
impl Counter {
pub fn bytes_recvd(&self) -> u64 {
self.bytes_recvd.load(Ordering::Relaxed)
}
pub fn bytes_sent(&self) -> u64 {
self.bytes_sent.load(Ordering::Relaxed)
}
pub fn recv_completed(&self) -> bool {
self.recv_completed.load(Ordering::Relaxed)
}
pub fn send_completed(&self) -> bool {
self.send_completed.load(Ordering::Relaxed)
}
}
pub struct StatManager {
pub counters: Vec<Counter>,
}
impl StatManager {
pub fn new() -> Self {
Self {
counters: Vec::new(),
}
}
pub fn cleanup_task(sm: super::SyncStatManager) -> crate::Runner {
Box::pin(async move {
loop {
tokio::time::sleep(std::time::Duration::from_secs(20)).await;
let mut sm = sm.write().await;
let mut i = 0;
while i < sm.counters.len() {
if sm.counters[i].recv_completed() && sm.counters[i].send_completed() {
sm.counters.remove(i);
} else {
i += 1;
}
}
}
})
}
pub fn stat_stream(&mut self, stream: AnyStream, sess: Session) -> AnyStream {
let bytes_recvd = Arc::new(AtomicU64::new(0));
let bytes_sent = Arc::new(AtomicU64::new(0));
let recv_completed = Arc::new(AtomicBool::new(false));
let send_completed = Arc::new(AtomicBool::new(false));
self.counters.push(Counter {
sess,
bytes_recvd: bytes_recvd.clone(),
bytes_sent: bytes_sent.clone(),
recv_completed: recv_completed.clone(),
send_completed: send_completed.clone(),
});
Box::new(Stream {
inner: stream,
bytes_recvd,
bytes_sent,
recv_completed,
send_completed,
})
}
pub fn stat_outbound_datagram(
&mut self,
dgram: AnyOutboundDatagram,
sess: Session,
) -> AnyOutboundDatagram {
let bytes_recvd = Arc::new(AtomicU64::new(0));
let bytes_sent = Arc::new(AtomicU64::new(0));
let recv_completed = Arc::new(AtomicBool::new(false));
let send_completed = Arc::new(AtomicBool::new(false));
self.counters.push(Counter {
sess,
bytes_recvd: bytes_recvd.clone(),
bytes_sent: bytes_sent.clone(),
recv_completed: recv_completed.clone(),
send_completed: send_completed.clone(),
});
Box::new(Datagram {
inner: dgram,
bytes_recvd,
bytes_sent,
recv_completed,
send_completed,
})
}
}

View File

@@ -1201,26 +1201,12 @@ pub fn to_internal(conf: &mut Config) -> Result<internal::Config> {
dns.hosts = hosts;
}
let api = if let Some(ext_general) = &conf.general {
if ext_general.api_interface.is_some() && ext_general.api_port.is_some() {
let mut api_inner = internal::Api::new();
api_inner.address = ext_general.api_interface.as_ref().unwrap().to_string();
api_inner.port = ext_general.api_port.unwrap() as u32;
protobuf::SingularPtrField::some(api_inner)
} else {
protobuf::SingularPtrField::none()
}
} else {
protobuf::SingularPtrField::none()
};
let mut config = internal::Config::new();
config.log = protobuf::SingularPtrField::some(log);
config.inbounds = inbounds;
config.outbounds = outbounds;
config.router = router;
config.dns = protobuf::SingularPtrField::some(dns);
config.api = api;
Ok(config)
}

View File

@@ -1,9 +1,6 @@
syntax = "proto3";
// Every time you make changes to this file, run `make proto-gen` to re-generate protobuf files.
message Api {
string address = 1;
uint32 port = 2;
}
syntax = "proto3";
message Dns {
message Ips {
@@ -214,5 +211,4 @@ message Config {
repeated Outbound outbounds = 3;
Router router = 4;
Dns dns = 5;
Api api = 6;
}

View File

@@ -23,145 +23,6 @@
/// of protobuf runtime.
// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_27_1;
#[derive(PartialEq,Clone,Default,Debug)]
pub struct Api {
// message fields
pub address: ::std::string::String,
pub port: u32,
// special fields
pub unknown_fields: ::protobuf::UnknownFields,
pub cached_size: ::protobuf::CachedSize,
}
impl<'a> ::std::default::Default for &'a Api {
fn default() -> &'a Api {
<Api as ::protobuf::Message>::default_instance()
}
}
impl Api {
pub fn new() -> Api {
::std::default::Default::default()
}
// string address = 1;
pub fn get_address(&self) -> &str {
&self.address
}
// uint32 port = 2;
pub fn get_port(&self) -> u32 {
self.port
}
}
impl ::protobuf::Message for Api {
fn is_initialized(&self) -> bool {
true
}
fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::ProtobufResult<()> {
while !is.eof()? {
let (field_number, wire_type) = is.read_tag_unpack()?;
match field_number {
1 => {
::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.address)?;
},
2 => {
if wire_type != ::protobuf::wire_format::WireTypeVarint {
return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type));
}
let tmp = is.read_uint32()?;
self.port = tmp;
},
_ => {
::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?;
},
};
}
::std::result::Result::Ok(())
}
// Compute sizes of nested messages
#[allow(unused_variables)]
fn compute_size(&self) -> u32 {
let mut my_size = 0;
if !self.address.is_empty() {
my_size += ::protobuf::rt::string_size(1, &self.address);
}
if self.port != 0 {
my_size += ::protobuf::rt::value_size(2, self.port, ::protobuf::wire_format::WireTypeVarint);
}
my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields());
self.cached_size.set(my_size);
my_size
}
fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> {
if !self.address.is_empty() {
os.write_string(1, &self.address)?;
}
if self.port != 0 {
os.write_uint32(2, self.port)?;
}
os.write_unknown_fields(self.get_unknown_fields())?;
::std::result::Result::Ok(())
}
fn get_cached_size(&self) -> u32 {
self.cached_size.get()
}
fn get_unknown_fields(&self) -> &::protobuf::UnknownFields {
&self.unknown_fields
}
fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields {
&mut self.unknown_fields
}
fn as_any(&self) -> &dyn (::std::any::Any) {
self as &dyn (::std::any::Any)
}
fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) {
self as &mut dyn (::std::any::Any)
}
fn into_any(self: ::std::boxed::Box<Self>) -> ::std::boxed::Box<dyn (::std::any::Any)> {
self
}
fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor {
Self::descriptor_static()
}
fn new() -> Api {
Api::new()
}
fn default_instance() -> &'static Api {
static instance: ::protobuf::rt::LazyV2<Api> = ::protobuf::rt::LazyV2::INIT;
instance.get(Api::new)
}
}
impl ::protobuf::Clear for Api {
fn clear(&mut self) {
self.address.clear();
self.port = 0;
self.unknown_fields.clear();
}
}
impl ::protobuf::reflect::ProtobufValue for Api {
fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef {
::protobuf::reflect::ReflectValueRef::Message(self)
}
}
#[derive(PartialEq,Clone,Default,Debug)]
pub struct Dns {
// message fields
@@ -5086,7 +4947,6 @@ pub struct Config {
pub outbounds: ::protobuf::RepeatedField<Outbound>,
pub router: ::protobuf::SingularPtrField<Router>,
pub dns: ::protobuf::SingularPtrField<Dns>,
pub api: ::protobuf::SingularPtrField<Api>,
// special fields
pub unknown_fields: ::protobuf::UnknownFields,
pub cached_size: ::protobuf::CachedSize,
@@ -5137,13 +4997,6 @@ impl Config {
pub fn get_dns(&self) -> &Dns {
self.dns.as_ref().unwrap_or_else(|| <Dns as ::protobuf::Message>::default_instance())
}
// .Api api = 6;
pub fn get_api(&self) -> &Api {
self.api.as_ref().unwrap_or_else(|| <Api as ::protobuf::Message>::default_instance())
}
}
impl ::protobuf::Message for Config {
@@ -5173,11 +5026,6 @@ impl ::protobuf::Message for Config {
return false;
}
};
for v in &self.api {
if !v.is_initialized() {
return false;
}
};
true
}
@@ -5200,9 +5048,6 @@ impl ::protobuf::Message for Config {
5 => {
::protobuf::rt::read_singular_message_into(wire_type, is, &mut self.dns)?;
},
6 => {
::protobuf::rt::read_singular_message_into(wire_type, is, &mut self.api)?;
},
_ => {
::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?;
},
@@ -5235,10 +5080,6 @@ impl ::protobuf::Message for Config {
let len = v.compute_size();
my_size += 1 + ::protobuf::rt::compute_raw_varint32_size(len) + len;
}
if let Some(ref v) = self.api.as_ref() {
let len = v.compute_size();
my_size += 1 + ::protobuf::rt::compute_raw_varint32_size(len) + len;
}
my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields());
self.cached_size.set(my_size);
my_size
@@ -5270,11 +5111,6 @@ impl ::protobuf::Message for Config {
os.write_raw_varint32(v.get_cached_size())?;
v.write_to_with_cached_sizes(os)?;
}
if let Some(ref v) = self.api.as_ref() {
os.write_tag(6, ::protobuf::wire_format::WireTypeLengthDelimited)?;
os.write_raw_varint32(v.get_cached_size())?;
v.write_to_with_cached_sizes(os)?;
}
os.write_unknown_fields(self.get_unknown_fields())?;
::std::result::Result::Ok(())
}
@@ -5322,7 +5158,6 @@ impl ::protobuf::Clear for Config {
self.outbounds.clear();
self.router.clear();
self.dns.clear();
self.api.clear();
self.unknown_fields.clear();
}
}

View File

@@ -9,12 +9,6 @@ use serde_json::value::RawValue;
use crate::config::{external_rule, internal};
#[derive(Serialize, Deserialize, Debug)]
pub struct Api {
pub address: Option<String>,
pub port: Option<u16>,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct Dns {
pub servers: Option<Vec<String>>,
@@ -241,7 +235,6 @@ pub struct Config {
pub outbounds: Option<Vec<Outbound>>,
pub router: Option<Router>,
pub dns: Option<Dns>,
pub api: Option<Api>,
}
pub fn to_internal(json: &mut Config) -> Result<internal::Config> {
@@ -967,28 +960,12 @@ pub fn to_internal(json: &mut Config) -> Result<internal::Config> {
dns.hosts = hosts;
}
let api = if let Some(ext_api) = json.api.as_ref() {
if let (Some(ext_address), Some(ext_port)) =
(ext_api.address.as_ref(), ext_api.port.as_ref())
{
let mut api = internal::Api::new();
api.address = ext_address.to_owned();
api.port = ext_port.to_owned() as u32;
protobuf::SingularPtrField::some(api)
} else {
protobuf::SingularPtrField::none()
}
} else {
protobuf::SingularPtrField::none()
};
let mut config = internal::Config::new();
config.log = protobuf::SingularPtrField::some(log);
config.inbounds = inbounds;
config.outbounds = outbounds;
config.router = router;
config.dns = protobuf::SingularPtrField::some(dns);
config.api = api;
Ok(config)
}

View File

@@ -21,6 +21,9 @@ use app::{
nat_manager::NatManager, outbound::manager::OutboundManager, router::Router,
};
#[cfg(feature = "stat")]
use crate::app::{stat_manager::StatManager, SyncStatManager};
#[cfg(feature = "api")]
use crate::app::api::api_server::ApiServer;
@@ -72,6 +75,8 @@ pub struct RuntimeManager {
router: Arc<RwLock<Router>>,
dns_client: Arc<RwLock<DnsClient>>,
outbound_manager: Arc<RwLock<OutboundManager>>,
#[cfg(feature = "stat")]
stat_manager: SyncStatManager,
#[cfg(feature = "auto-reload")]
watcher: Mutex<Option<RecommendedWatcher>>,
}
@@ -87,6 +92,7 @@ impl RuntimeManager {
router: Arc<RwLock<Router>>,
dns_client: Arc<RwLock<DnsClient>>,
outbound_manager: Arc<RwLock<OutboundManager>>,
#[cfg(feature = "stat")] stat_manager: SyncStatManager,
) -> Arc<Self> {
Arc::new(Self {
#[cfg(feature = "auto-reload")]
@@ -99,11 +105,18 @@ impl RuntimeManager {
router,
dns_client,
outbound_manager,
#[cfg(feature = "stat")]
stat_manager,
#[cfg(feature = "auto-reload")]
watcher: Mutex::new(None),
})
}
#[cfg(feature = "stat")]
pub fn stat_manager(&self) -> SyncStatManager {
self.stat_manager.clone()
}
pub async fn set_outbound_selected(&self, outbound: &str, select: &str) -> Result<(), Error> {
if let Some(selector) = self.outbound_manager.read().await.get_selector(outbound) {
selector
@@ -386,10 +399,16 @@ pub fn start(rt_id: RuntimeId, opts: StartOptions) -> Result<(), Error> {
&mut config.router,
dns_client.clone(),
)));
#[cfg(feature = "stat")]
let stat_manager = Arc::new(RwLock::new(StatManager::new()));
#[cfg(feature = "stat")]
runners.push(StatManager::cleanup_task(stat_manager.clone()));
let dispatcher = Arc::new(Dispatcher::new(
outbound_manager.clone(),
router.clone(),
dns_client.clone(),
#[cfg(feature = "stat")]
stat_manager.clone(),
));
let nat_manager = Arc::new(NatManager::new(dispatcher.clone()));
let inbound_manager =
@@ -449,6 +468,8 @@ pub fn start(rt_id: RuntimeId, opts: StartOptions) -> Result<(), Error> {
router,
dns_client,
outbound_manager,
#[cfg(feature = "stat")]
stat_manager,
);
// Monitor config file changes.
@@ -461,20 +482,13 @@ pub fn start(rt_id: RuntimeId, opts: StartOptions) -> Result<(), Error> {
#[cfg(feature = "api")]
{
use std::net::{IpAddr, SocketAddr};
use std::net::SocketAddr;
let listen_addr = if !(&*option::API_LISTEN).is_empty() {
Some(
(&*option::API_LISTEN)
.parse::<SocketAddr>()
.map_err(|e| Error::Config(anyhow!("parse SocketAddr failed: {}", e)))?,
)
} else if let Some(api) = config.api.as_ref() {
Some(SocketAddr::new(
api.address
.parse::<IpAddr>()
.map_err(|e| Error::Config(anyhow!("parse IpAddr failed: {}", e)))?,
api.port as u16,
))
} else {
None
};