Bug with response using struct implementing bytes::Buf #3650

@tom-kuchler

Version
hyper: 1.3.1
hyper-util: 0.1.3
bytes: 1.6.0
tokio: 1.37.0

Platform
Linux LAPTOP-CSN0P74T 5.15.146.1-microsoft-standard-WSL2

Summary
For a project I'm building, I wanted to serialize some internal structures and send them out as responses.
To do this, I wrote a struct that implements hyper::body::Body and implemented bytes::Buf for my internal structure.
When sending the structure, I found that parts of the Buf had been replaced by different data.
Specifically, not all parts of the Buf were read, and different data was inserted in their place.

Minimal Viable Example
Here is a minimal example that implements the Buf interface over a Vec, but whose chunk() only ever returns single-byte slices.

use std::{cmp::min, convert::Infallible, net::SocketAddr};

use hyper::{body::Frame, server::conn::http1, service::service_fn, Error, Request, Response};
use tokio::{net::TcpListener, spawn};

const TEST_MESSAGE: &str =
    "test message that needs to be a few bytes long for the sake of demonstration";

struct MinimalBytes {
    current: usize,
    data: Vec<u8>,
}

impl bytes::Buf for MinimalBytes {
    fn remaining(&self) -> usize {
        return self.data.len() - self.current;
    }

    fn advance(&mut self, cnt: usize) {
        if self.current + cnt > self.data.len() {
            panic!("Attempted to advance past end of MinimalBytes");
        }
        self.current += cnt;
    }

    fn chunk(&self) -> &[u8] {
        // Deliberately return at most one byte. Buf allows chunk() to be
        // shorter than remaining().
        let end = min(self.current + 1, self.data.len());
        return &self.data[self.current..end];
    }
}

struct MinimalBody {
    frame: Option<MinimalBytes>,
}

impl hyper::body::Body for MinimalBody {
    type Data = MinimalBytes;
    type Error = Error;
    fn poll_frame(
        self: std::pin::Pin<&mut Self>,
        _cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
        std::task::Poll::Ready(
            self.get_mut()
                .frame
                .take()
                .map(|min_bytes| Ok(Frame::data(min_bytes))),
        )
    }
}

async fn test_service(
    _: Request<hyper::body::Incoming>,
) -> Result<Response<MinimalBody>, Infallible> {
    return Ok(Response::new(MinimalBody {
        frame: Some(MinimalBytes {
            current: 0,
            data: String::from(TEST_MESSAGE).into_bytes(),
        }),
    }));
}

#[tokio::main]
async fn main() {
    // start server
    let addr = SocketAddr::from(([0, 0, 0, 0], 8080));
    let listener = TcpListener::bind(addr).await.unwrap();
    loop {
        let (stream, _) = listener.accept().await.unwrap();

        let io = hyper_util::rt::TokioIo::new(stream);

        spawn(async move {
            if let Err(err) = http1::Builder::new()
                .serve_connection(io, service_fn(test_service))
                .await
            {
                eprintln!("Error serving connection: {:?}", err);
            }
        });
    }
}
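
As a sanity check on the buffer itself, here is a dependency-free sketch of how a consumer is expected to walk a buffer whose chunk() returns less than remaining(): copy what chunk() returned, then advance by exactly that length. OneByteChunks and drain are hypothetical names; the struct mirrors MinimalBytes with inherent methods rather than the real bytes::Buf trait, so it compiles without external crates. It says nothing about what hyper does internally.

```rust
/// Stand-in for MinimalBytes, using inherent methods instead of the
/// bytes::Buf trait so the sketch compiles without external crates.
struct OneByteChunks {
    current: usize,
    data: Vec<u8>,
}

impl OneByteChunks {
    fn remaining(&self) -> usize {
        self.data.len() - self.current
    }

    fn advance(&mut self, cnt: usize) {
        assert!(self.current + cnt <= self.data.len(), "advance past end");
        self.current += cnt;
    }

    /// Returns at most one byte, like chunk() in the example above.
    fn chunk(&self) -> &[u8] {
        let end = (self.current + 1).min(self.data.len());
        &self.data[self.current..end]
    }
}

/// Drains the buffer, advancing only by what chunk() actually returned.
fn drain(buf: &mut OneByteChunks) -> Vec<u8> {
    let mut out = Vec::with_capacity(buf.remaining());
    while buf.remaining() > 0 {
        let n = {
            let chunk = buf.chunk();
            out.extend_from_slice(chunk);
            chunk.len()
        };
        buf.advance(n);
    }
    out
}
```

Draining a OneByteChunks built from the test message this way yields the message unchanged, which suggests the one-byte chunk() is not by itself the source of the corruption.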

What I would expect is a response that contains the test message. What happens instead is that after every byte of the message, the 7 bytes \r\n0\r\n\r\n are inserted, and the buffer is then advanced past 7 bytes of actual data without chunk() ever being called for them.
I have checked this with a few different step sizes, and these specific 7 bytes always seem to be inserted whenever chunk() returns a slice shorter than remaining().
If the chunk length is equal to remaining(), everything works.
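
To make the symptom concrete, the following sketch models the corruption pattern described above: each partial chunk is followed by the 7 inserted bytes, and 7 bytes of real data are skipped. The 7 bytes look like the CRLF that closes a chunk followed by the zero-length last chunk of HTTP/1.1 chunked transfer encoding. model_observed_body and INSERTED are hypothetical names, and the function only reproduces the reported pattern; it is not hyper's code path. Clamping the skip at the end of the data is an assumption of the model.

```rust
/// The 7 bytes the report sees after each partial chunk.
const INSERTED: &[u8] = b"\r\n0\r\n\r\n";

/// Models the reported symptom only; this is not hyper's code path.
/// `step` is the number of bytes each chunk() call would return.
fn model_observed_body(data: &[u8], step: usize) -> Vec<u8> {
    assert!(step > 0, "step must be non-zero");
    let mut out = Vec::new();
    let mut pos = 0;
    while pos < data.len() {
        let end = (pos + step).min(data.len());
        out.extend_from_slice(&data[pos..end]);
        pos = end;
        if pos < data.len() {
            // The chunk was shorter than remaining(): 7 foreign bytes
            // appear and 7 real bytes are skipped. The clamp to
            // data.len() is an assumption of this model.
            out.extend_from_slice(INSERTED);
            pos = (pos + INSERTED.len()).min(data.len());
        }
    }
    out
}
```

With step equal to data.len(), the function returns the input unchanged, matching the observation that a chunk covering all of remaining() works.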

Let me know if there is some additional information I could provide or something I could easily look into to help fix this.

Labels

B-upstream (Blocked: needs a change in a dependency or the compiler.)
C-bug (Category: bug. Something is wrong. This is bad!)