Version
1.4.1 full
Platform
Windows 11, 64-bit
Description
For (possibly infinite) SSE streams, when the client disconnects (e.g. closes the connection), the hyper connection always returns an error of type hyper::Error(IncompleteMessage).
I tried this code:
I used the server example from the official website, with a tokio IntervalStream as the response body:
use std::convert::Infallible;
use std::net::{Ipv4Addr, SocketAddr};
use std::time::Duration;

use futures::StreamExt;
use http_body_util::combinators::BoxBody;
use http_body_util::{BodyExt, StreamBody};
use hyper::body::{Bytes, Frame};
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{header, Request, Response};
use hyper_util::rt::TokioIo;
use tokio::net::TcpListener;
use tokio_stream::wrappers::IntervalStream;

async fn hello(
    _: Request<hyper::body::Incoming>,
) -> Result<Response<BoxBody<Bytes, Infallible>>, Infallible> {
    let stream = IntervalStream::new(tokio::time::interval(Duration::from_secs(1)))
        .enumerate()
        .map(|(count, _)| {
            let data = Bytes::from(format!("data: Hello {}!\n\n", count));
            let frame = Frame::data(data);
            Ok::<_, Infallible>(frame)
        });
    let http_response = Response::builder()
        .header(header::CONTENT_TYPE, "text/event-stream")
        .body(BodyExt::boxed(StreamBody::new(stream)))
        .unwrap();
    Ok(http_response)
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let addr = SocketAddr::from((Ipv4Addr::UNSPECIFIED, 3000));
    let listener = TcpListener::bind(addr).await?;
    loop {
        let (stream, _) = listener.accept().await?;
        let io = TokioIo::new(stream);
        tokio::task::spawn(async move {
            if let Err(err) = http1::Builder::new()
                .serve_connection(io, service_fn(hello))
                .await
            {
                eprintln!("Error serving connection: {}", err);
            }
        });
    }
    Ok(())
}
I expected to see this happen: serve_connection shouldn't return Err on client disconnect. Ok should be returned and the stream dropped.
Instead, this happened: an error of type hyper::Error(IncompleteMessage) is returned.
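To make the expected behavior concrete, here is a minimal sketch using only the standard library; it is not hyper's implementation or API. It assumes a client disconnect shows up as one of a few io::ErrorKind values, and the names is_client_disconnect, stream_events, and DisconnectAfter are hypothetical, introduced only for illustration. The writer loop treats a disconnect as a normal end of the stream and returns Ok, while any other I/O error is still propagated.

```rust
use std::io::{self, ErrorKind, Write};

/// Hypothetical helper: true when the error only means the peer went away.
/// Assumption: these three kinds are how a closed connection is reported.
fn is_client_disconnect(err: &io::Error) -> bool {
    matches!(
        err.kind(),
        ErrorKind::BrokenPipe | ErrorKind::ConnectionReset | ErrorKind::ConnectionAborted
    )
}

/// Writes up to `max_events` SSE events to `out`.
/// Returns the number of events delivered. A client disconnect ends the
/// stream with Ok; any other I/O error is returned as Err.
fn stream_events<W: Write>(out: &mut W, max_events: usize) -> io::Result<usize> {
    for count in 0..max_events {
        let event = format!("data: Hello {}!\n\n", count);
        match out.write_all(event.as_bytes()).and_then(|_| out.flush()) {
            Ok(()) => {}
            // The peer closed the connection: stop quietly, this is not a failure.
            Err(err) if is_client_disconnect(&err) => return Ok(count),
            Err(err) => return Err(err),
        }
    }
    Ok(max_events)
}

/// Hypothetical test double: accepts `writes_left` writes, then behaves
/// like a socket whose peer has closed the connection.
struct DisconnectAfter {
    writes_left: usize,
}

impl Write for DisconnectAfter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        if self.writes_left == 0 {
            return Err(io::Error::from(ErrorKind::BrokenPipe));
        }
        self.writes_left -= 1;
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
```

Calling stream_events(&mut DisconnectAfter { writes_left: 3 }, 10) returns Ok(3) rather than an error, which mirrors what this report expects from serve_connection. With the hyper code above, a comparable filter in the spawned task could presumably check hyper::Error::is_incomplete_message() before logging, though that would only hide the error rather than change what serve_connection returns.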