net: enable TCP keepalive
This commit is contained in:
@@ -5,7 +5,7 @@ use anyhow::Result;
|
||||
|
||||
use futures::stream::StreamExt;
|
||||
use log::*;
|
||||
use tokio::net::{TcpListener, TcpStream, UdpSocket};
|
||||
use tokio::net::{TcpStream, UdpSocket};
|
||||
use tokio::sync::mpsc::channel as tokio_channel;
|
||||
use tokio::sync::mpsc::{Receiver as TokioReceiver, Sender as TokioSender};
|
||||
|
||||
@@ -14,7 +14,7 @@ use crate::app::nat_manager::{NatManager, UdpPacket};
|
||||
use crate::proxy::InboundHandler;
|
||||
use crate::proxy::{
|
||||
InboundDatagram, InboundTransport, SimpleInboundDatagram, SimpleProxyStream,
|
||||
SingleInboundTransport,
|
||||
SingleInboundTransport, TcpListener,
|
||||
};
|
||||
use crate::session::{Network, Session, SocksAddr};
|
||||
use crate::Runner;
|
||||
|
||||
@@ -12,15 +12,20 @@ use futures::stream::Stream;
|
||||
use futures::TryFutureExt;
|
||||
use lazy_static::lazy_static;
|
||||
use log::*;
|
||||
use socket2::SockRef;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tokio::net::{TcpSocket, UdpSocket};
|
||||
use tokio::net::{TcpSocket, TcpStream, UdpSocket};
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::time::timeout;
|
||||
|
||||
#[cfg(unix)]
|
||||
use std::os::unix::io::AsRawFd;
|
||||
#[cfg(windows)]
|
||||
use std::os::windows::io::AsRawSocket;
|
||||
#[cfg(target_os = "android")]
|
||||
use {
|
||||
std::os::unix::io::AsRawFd, std::os::unix::io::RawFd, tokio::io::AsyncReadExt,
|
||||
tokio::io::AsyncWriteExt, tokio::net::UnixStream,
|
||||
std::os::unix::io::RawFd, tokio::io::AsyncReadExt, tokio::io::AsyncWriteExt,
|
||||
tokio::net::UnixStream,
|
||||
};
|
||||
|
||||
use crate::{
|
||||
@@ -146,10 +151,10 @@ pub async fn set_outbound_binds(binds: Vec<OutboundBind>) {
|
||||
}
|
||||
|
||||
#[cfg(target_os = "android")]
|
||||
async fn protect_socket(fd: RawFd) -> io::Result<()> {
|
||||
async fn protect_socket<S: AsRawFd>(socket: S) -> io::Result<()> {
|
||||
if let Some(path) = SOCKET_PROTECT_PATH.lock().await.as_ref() {
|
||||
let mut stream = UnixStream::connect(path).await?;
|
||||
stream.write_i32(fd as i32).await?;
|
||||
stream.write_i32(socket.as_raw_fd() as i32).await?;
|
||||
if stream.read_i32().await? != 0 {
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
@@ -160,9 +165,6 @@ async fn protect_socket(fd: RawFd) -> io::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(any(target_os = "macos", target_os = "linux"))]
|
||||
use std::os::unix::io::AsRawFd;
|
||||
|
||||
#[cfg(any(target_os = "macos", target_os = "linux"))]
|
||||
trait BindSocket: AsRawFd {
|
||||
fn bind(&self, bind_addr: &SocketAddr) -> io::Result<()>;
|
||||
@@ -185,6 +187,24 @@ impl BindSocket for socket2::Socket {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct TcpListener {
|
||||
inner: tokio::net::TcpListener,
|
||||
}
|
||||
|
||||
impl TcpListener {
|
||||
pub async fn bind(addr: &SocketAddr) -> io::Result<Self> {
|
||||
Ok(Self {
|
||||
inner: tokio::net::TcpListener::bind(addr).await?,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
|
||||
let (stream, addr) = self.inner.accept().await?;
|
||||
apply_socket_opts(&stream)?;
|
||||
Ok((stream, addr))
|
||||
}
|
||||
}
|
||||
|
||||
async fn bind_socket<T: BindSocket>(
|
||||
socket: &T,
|
||||
bind_addr: &SocketAddr,
|
||||
@@ -304,11 +324,27 @@ async fn create_udp_socket(
|
||||
bind_socket(&socket, bind_addr, indicator).await?;
|
||||
|
||||
#[cfg(target_os = "android")]
|
||||
protect_socket(socket.as_raw_fd()).await?;
|
||||
protect_socket(&socket).await?;
|
||||
|
||||
UdpSocket::from_std(socket.into())
|
||||
}
|
||||
|
||||
fn apply_socket_opts_internal(s: SockRef) -> io::Result<()> {
|
||||
s.set_keepalive(true)
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
fn apply_socket_opts<S: AsRawFd>(socket: &S) -> io::Result<()> {
|
||||
let sock_ref = SockRef::from(socket);
|
||||
apply_socket_opts_internal(sock_ref)
|
||||
}
|
||||
|
||||
#[cfg(windows)]
|
||||
fn apply_socket_opts<S: AsRawSocket>(socket: &S) -> io::Result<()> {
|
||||
let sock_ref = SockRef::from(socket);
|
||||
apply_socket_opts_internal(sock_ref)
|
||||
}
|
||||
|
||||
// A single TCP dial.
|
||||
async fn tcp_dial_task(
|
||||
dial_addr: SocketAddr,
|
||||
@@ -322,7 +358,7 @@ async fn tcp_dial_task(
|
||||
bind_socket(&socket, bind_addr, &dial_addr).await?;
|
||||
|
||||
#[cfg(target_os = "android")]
|
||||
protect_socket(socket.as_raw_fd()).await?;
|
||||
protect_socket(&socket).await?;
|
||||
|
||||
trace!("tcp dialing {}", &dial_addr);
|
||||
let stream = timeout(
|
||||
@@ -330,6 +366,9 @@ async fn tcp_dial_task(
|
||||
socket.connect(dial_addr),
|
||||
)
|
||||
.await??;
|
||||
|
||||
apply_socket_opts(&stream)?;
|
||||
|
||||
trace!("tcp connected {} <-> {}", stream.local_addr()?, &dial_addr);
|
||||
Ok((Box::new(SimpleProxyStream(stream)), dial_addr))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user