Skip to content

Infinite (SSE) streams always raises hyper::Error(IncompleteMessage) / connection closed before message completed #3759

Open
@peku33

Description

@peku33

Version
1.4.1 full

Platform
windows 11 64b

Description
For (possibly infinite) SSE streams, when client disconnects (ex. closes connection), hyper connection always returns error of type hyper::Error(IncompleteMessage).

I tried this code:
I used server example from official website + tokio IntervalStream for body:

use std::convert::Infallible;
use std::net::{Ipv4Addr, SocketAddr};

use futures::StreamExt;
use http_body_util::combinators::BoxBody;
use http_body_util::{BodyExt, StreamBody};
use hyper::body::{Bytes, Frame};
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{header, Request, Response};
use hyper_util::rt::TokioIo;
use std::time::Duration;
use tokio::net::TcpListener;
use tokio_stream::wrappers::IntervalStream;

async fn hello(
    _: Request<hyper::body::Incoming>,
) -> Result<Response<BoxBody<Bytes, Infallible>>, Infallible> {
    let stream = IntervalStream::new(tokio::time::interval(Duration::from_secs(1)))
        .enumerate()
        .map(|(count, _)| {
            let data = Bytes::from(format!("data: Hello {}!\n\n", count));
            let frame = Frame::data(data);
            Ok::<_, Infallible>(frame)
        });

    let http_response = Response::builder()
        .header(header::CONTENT_TYPE, "text/event-stream")
        .body(BodyExt::boxed(StreamBody::new(stream)))
        .unwrap();

    Ok(http_response)
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let addr = SocketAddr::from((Ipv4Addr::UNSPECIFIED, 3000));
    let listener = TcpListener::bind(addr).await?;
    loop {
        let (stream, _) = listener.accept().await?;
        let io = TokioIo::new(stream);
        tokio::task::spawn(async move {
            if let Err(err) = http1::Builder::new()
                .serve_connection(io, service_fn(hello))
                .await
            {
                eprintln!("Error serving connection: {}", err);
            }
        });
    }

    Ok(())
}

I expected to see this happen: serve_connection shouldn't return Err on client disconnect. Ok should be returned and stream dropped.

Instead, this happened: Error of type hyper::Error(IncompleteMessage) is returned.

Metadata

Metadata

Assignees

No one assigned

    Labels

    C-bugCategory: bug. Something is wrong. This is bad!

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions