amux: fix stream frame payload size

This commit is contained in:
eric
2022-02-23 15:51:49 +08:00
parent 32d6c95226
commit 61968bd077

View File

@@ -32,6 +32,7 @@ pub mod outbound;
pub const FRAME_STREAM: u8 = 0x01;
pub const FRAME_STREAM_FIN: u8 = 0x02;
pub const MAX_STREAM_FRAME_DATA_LEN: u16 = u16::MAX;
pub fn random_u16() -> u16 {
use rand::{rngs::StdRng, RngCore, SeedableRng};
@@ -59,7 +60,8 @@ impl MuxFrame {
MuxFrame::Stream(id, data) => {
buf.put_u8(FRAME_STREAM);
buf.put_u16(*id as u16);
buf.put_u16(data.len() as u16); // FIXME check len
assert!(data.len() <= MAX_STREAM_FRAME_DATA_LEN.into());
buf.put_u16(data.len() as u16);
buf.put_slice(data);
}
MuxFrame::StreamFin(id) => {
@@ -88,7 +90,7 @@ pub type Streams = Arc<Mutex<HashMap<StreamId, Sender<Vec<u8>>>>>;
enum TaskState {
Idle,
Pending(Pin<Box<dyn Future<Output = io::Result<()>> + 'static + Sync + Send>>),
Pending(Pin<Box<dyn Future<Output = io::Result<usize>> + 'static + Sync + Send>>),
}
pub struct MuxStream {
@@ -180,14 +182,19 @@ impl AsyncWrite for MuxStream {
loop {
match self.write_state {
TaskState::Idle => {
let frame = MuxFrame::Stream(self.stream_id, buf.to_vec());
let to_write = min(buf.len(), MAX_STREAM_FRAME_DATA_LEN.into());
let frame = MuxFrame::Stream(self.stream_id, buf[..to_write].to_vec());
let tx = self.frame_write_tx.clone();
let task =
Box::pin(async move { tx.send(frame).map_err(|_| broken_pipe()).await });
let task = Box::pin(async move {
tx.send(frame)
.map_ok(|_| to_write)
.map_err(|_| broken_pipe())
.await
});
self.write_state = TaskState::Pending(task);
}
TaskState::Pending(ref mut task) => {
let res = ready!(task.as_mut().poll(cx).map_ok(|_| buf.len()));
let res = ready!(task.as_mut().poll(cx));
self.write_state = TaskState::Idle;
return Poll::Ready(res);
}
@@ -205,8 +212,12 @@ impl AsyncWrite for MuxStream {
TaskState::Idle => {
let frame = MuxFrame::StreamFin(self.stream_id);
let tx = self.frame_write_tx.clone();
let task =
Box::pin(async move { tx.send(frame).map_err(|_| broken_pipe()).await });
let task = Box::pin(async move {
tx.send(frame)
.map_ok(|_| 0) // FIXME temp workaround the signature
.map_err(|_| broken_pipe())
.await
});
self.shutdown_state = TaskState::Pending(task);
}
TaskState::Pending(ref mut task) => {
@@ -226,8 +237,11 @@ pub struct MuxConnection<S> {
backpressure_boundary: usize,
}
fn unknown_frame() -> io::Error {
io::Error::new(io::ErrorKind::Interrupted, "unknown frame type")
fn unknown_frame(t: u8) -> io::Error {
io::Error::new(
io::ErrorKind::Interrupted,
format!("unknown frame type {}", t),
)
}
impl<S> MuxConnection<S> {
@@ -293,7 +307,7 @@ impl<S> MuxConnection<S> {
Ok(Some(frame))
}
_ => Err(unknown_frame()),
_ => Err(unknown_frame(buf[0])),
}
}
@@ -442,7 +456,8 @@ impl MuxSession {
}
}
// Borken pipe.
Err(_) => {
Err(e) => {
log::debug!("receiving frame failed: {}", e);
break;
}
}