Refactor
This commit is contained in:
@@ -240,62 +240,57 @@ impl DnsClient {
|
||||
async move {
|
||||
let (mut r, mut s) = socket.split();
|
||||
let server = SocksAddr::from(server);
|
||||
let mut last_err = None;
|
||||
for _i in 0..*option::MAX_DNS_RETRIES {
|
||||
debug!("looking up host {} on {}", host, server);
|
||||
for i in 0..*option::MAX_DNS_RETRIES {
|
||||
debug!(
|
||||
"looking up host {} on {} ({}/{})",
|
||||
host,
|
||||
server,
|
||||
i + 1,
|
||||
*option::MAX_DNS_RETRIES
|
||||
);
|
||||
let start = tokio::time::Instant::now();
|
||||
// 1) send DNS request
|
||||
|
||||
if let Err(err) = s.send_to(&request, &server).await {
|
||||
last_err = Some(anyhow!("send DNS request to {} failed: {}", server, err));
|
||||
// socket send_to error, retry
|
||||
debug!("send DNS query failed: {}", err);
|
||||
continue;
|
||||
}
|
||||
// 2) wait response
|
||||
|
||||
let mut buf = vec![0u8; 512];
|
||||
let recv_result = match timeout(
|
||||
let n = match timeout(
|
||||
Duration::from_secs(*option::DNS_TIMEOUT),
|
||||
r.recv_from(&mut buf),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(Ok((n, _))) => Ok((n, ())),
|
||||
Ok(Err(err)) => {
|
||||
Err(anyhow!("recv DNS response from {} failed: {}", server, err))
|
||||
} // socket recv_from error
|
||||
Err(e) => Err(anyhow!("recv DNS response from {} timeout: {}", server, e)), // timeout
|
||||
Ok(Ok((n, _))) => n,
|
||||
Ok(Err(e)) => {
|
||||
debug!("recv DNS response from {} failed: {}", server, e);
|
||||
continue;
|
||||
}
|
||||
Err(e) => {
|
||||
debug!("recv DNS response from {} failed: {}", server, e);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
// retry
|
||||
if let Err(err) = recv_result {
|
||||
last_err = Some(err);
|
||||
continue;
|
||||
}
|
||||
// happy path !!
|
||||
let n: usize = recv_result.unwrap().0;
|
||||
// 3) parse resp
|
||||
|
||||
let resp = match Message::from_vec(&buf[..n]) {
|
||||
Ok(resp) => resp,
|
||||
Err(err) => {
|
||||
last_err =
|
||||
Some(anyhow!("parse DNS message from {} failed: {}", server, err));
|
||||
// broken response, no retry
|
||||
debug!("parse DNS message from {} failed: {}", server, err);
|
||||
break;
|
||||
}
|
||||
};
|
||||
// 4) check resp code
|
||||
|
||||
if resp.response_code() != ResponseCode::NoError {
|
||||
last_err = Some(anyhow!(
|
||||
"DNS response from {} for {} error: {}",
|
||||
debug!(
|
||||
"error DNS response from {} for {}: {}",
|
||||
server,
|
||||
host,
|
||||
resp.response_code()
|
||||
));
|
||||
// error response, no retry
|
||||
//
|
||||
// TODO Needs more careful investigations, I'm not quite sure about
|
||||
// this.
|
||||
);
|
||||
break;
|
||||
}
|
||||
// 5) find address
|
||||
|
||||
let mut ips = Vec::new();
|
||||
for ans in resp.answers() {
|
||||
// TODO checks?
|
||||
@@ -316,14 +311,10 @@ impl DnsClient {
|
||||
// response with 0 records
|
||||
//
|
||||
// TODO Not sure how to due with this.
|
||||
last_err = Some(anyhow!(
|
||||
"no records in DNS response from {} for {}",
|
||||
server,
|
||||
host
|
||||
));
|
||||
debug!("no records in DNS response from {} for {}", server, host);
|
||||
break;
|
||||
}
|
||||
// 6) return cache entry
|
||||
|
||||
let elapsed = tokio::time::Instant::now().duration_since(start);
|
||||
let ttl = resp.answers().iter().next().unwrap().ttl();
|
||||
debug!(
|
||||
@@ -337,15 +328,15 @@ impl DnsClient {
|
||||
|
||||
let Some(deadline) = Instant::now().checked_add(Duration::from_secs(ttl.into()))
|
||||
else {
|
||||
last_err = Some(anyhow!("invalid ttl"));
|
||||
debug!("invalid ttl");
|
||||
break;
|
||||
};
|
||||
|
||||
let entry = CacheEntry { ips, deadline };
|
||||
debug!("ips for {}: {:#?}", host, &entry);
|
||||
debug!("ips for {}: {:?}", host, &entry);
|
||||
return Ok(entry);
|
||||
}
|
||||
Err(last_err.unwrap_or_else(|| anyhow!("all lookup attempts for {} failed", host)))
|
||||
Err(anyhow!("all lookup attempts for {} failed", host))
|
||||
}
|
||||
.instrument(span)
|
||||
.await
|
||||
|
||||
@@ -739,9 +739,10 @@ impl MuxConnector {
|
||||
return None;
|
||||
}
|
||||
if self.streams.lock().await.len() >= self.concurrency {
|
||||
debug!(
|
||||
trace!(
|
||||
"exceeding allowed concurrency ({}): {}",
|
||||
self.session_id, self.concurrency
|
||||
self.session_id,
|
||||
self.concurrency
|
||||
);
|
||||
return None;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user