
Streaming responses (from an AsyncWrite) #570

@Nerixyz

I think these might be two issues in one:

  1. There's no example (except maybe the SSE one) that demonstrates streaming responses, probably because it's hard to come up with a good one.
  2. I haven't found good documentation on how to "pipe" an AsyncWrite into a Stream<Item = Bytes(Mut)> (maybe this is something for actix-web-lab).

Note: see also the discussion on the actix-web Discord: https://discord.com/channels/771444961383153695/771447545154371646/1009110232473014383

The proposed example streams the files in a directory ("files") as a zip archive, i.e. it creates the zip file on the fly.

For this, I'm using async_zip, which exposes an API that requires the user to pass in an AsyncWrite (ZipFileWriter::new).
To "pipe" the AsyncWrite into a Stream, I'm using a DuplexStream and the BytesCodec.
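For readers new to the pattern, a blocking, std-only analogue of what the DuplexStream/BytesCodec combination in main.rs does may help: writes land in a bounded buffer, the other side yields them as chunks, and dropping the writer ends the stream. This is only a sketch of the idea, not something to use inside an async handler: a `std::sync::mpsc::sync_channel` stands in for the async pipe, and `ChunkWriter`, `chunk_pipe` and `collect_body` are hypothetical names.

```rust
use std::io::{self, Write};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::thread;

/// Hypothetical writer half: each `write` call becomes one chunk on a bounded
/// channel, so the writer blocks once `max_chunks` chunks are waiting unread.
struct ChunkWriter(SyncSender<Vec<u8>>);

impl Write for ChunkWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.0
            .send(buf.to_vec())
            // the reader was dropped (e.g. the client went away)
            .map_err(|_| io::Error::from(io::ErrorKind::BrokenPipe))?;
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Hypothetical constructor: the `Receiver` is the "stream" half and yields
/// chunks until every writer has been dropped.
fn chunk_pipe(max_chunks: usize) -> (ChunkWriter, Receiver<Vec<u8>>) {
    let (tx, rx) = sync_channel(max_chunks);
    (ChunkWriter(tx), rx)
}

/// Produces `n` records on another thread and collects them on this one.
fn collect_body(n: usize, max_chunks: usize) -> String {
    let (mut writer, reader) = chunk_pipe(max_chunks);
    let producer = thread::spawn(move || {
        for i in 0..n {
            write!(writer, "chunk {i};").unwrap();
        }
        // `writer` is dropped here, which ends the reader's iteration
    });
    let body: Vec<u8> = reader.into_iter().flatten().collect();
    producer.join().unwrap();
    String::from_utf8(body).unwrap()
}

fn main() {
    println!("{}", collect_body(3, 2));
}
```

The chunk boundaries the reader sees need not match the writer's logical records (`write!` may issue several `write` calls), which is also true of the BytesCodec stream: only the concatenated bytes matter for the response body.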

main.rs
use actix_web::{get, http, App, HttpResponse, HttpServer, Responder};
use async_zip::write::{EntryOptions, ZipFileWriter};
use futures::stream::TryStreamExt;
use std::io;
use tokio::io::AsyncWrite;
use tokio_util::codec;

#[get("/")]
async fn index() -> impl Responder {
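    // `to_write` is handed to the zip writer; `to_read` becomes the response body.
    // The 2048-byte buffer bounds how far the zip writer can run ahead of the client.
    let (to_write, to_read) = tokio::io::duplex(2048);
    tokio::spawn(async move {
        let mut zipper = ZipFileWriter::new(to_write);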
        if let Err(e) = read_dir(&mut zipper).await {
            // TODO: do something
            eprintln!("Failed to write files from directory to zip: {e}")
        }
        if let Err(e) = zipper.close().await {
            // TODO: do something
            eprintln!("Failed to close zipper: {e}")
        }
    });

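    // `BytesCodec` yields whatever bytes are currently available as `BytesMut`;
    // `freeze` turns each chunk into the `Bytes` that `.streaming()` expects.
    let stream = codec::FramedRead::new(to_read, codec::BytesCodec::new()).map_ok(|b| b.freeze());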
    HttpResponse::Ok()
        .append_header((
            http::header::CONTENT_DISPOSITION,
            r#"attachment; filename="folder.zip""#,
        ))
        // not sure if this is really necessary,
        // but we're already sending compressed data,
        // so make sure other middleware won't compress this again
        .append_header((http::header::CONTENT_ENCODING, "identity"))
        .streaming(stream)
}

async fn read_dir<W>(zipper: &mut ZipFileWriter<W>) -> Result<(), io::Error>
where
    W: AsyncWrite + Unpin,
{
    let mut dir = tokio::fs::read_dir("files").await?;
    // propagate directory read errors instead of silently ending the archive early
    while let Some(entry) = dir.next_entry().await? {
        if !entry.metadata().await.map(|m| m.is_file()).unwrap_or(false) {
            continue;
        }
        let mut file = match tokio::fs::OpenOptions::new()
            .read(true)
            .open(entry.path())
            .await
        {
            Ok(f) => f,
            Err(_) => continue, // we can't read the file
        };
        let filename = match entry.file_name().into_string() {
            Ok(s) => s,
            Err(_) => continue, // the file has a non UTF-8 name
        };

        let mut entry_writer = zipper
            .write_entry_stream(EntryOptions::new(filename, async_zip::Compression::Deflate))
            .await
            .map_err(zip_to_io_err)?;
        // copy the file contents into the (deflate-compressed) zip entry
        tokio::io::copy(&mut file, &mut entry_writer).await?;
        entry_writer.close().await.map_err(zip_to_io_err)?;
    }
    Ok(())
}

fn zip_to_io_err(e: async_zip::error::ZipError) -> io::Error {
    io::Error::new(io::ErrorKind::Other, e)
}

#[actix_web::main]
async fn main() -> io::Result<()> {
    HttpServer::new(move || App::new().service(index))
        .bind(("127.0.0.1", 8080))?
        .run()
        .await
}

As I explained on Discord, using a DuplexStream is probably overkill, since it's meant to be used bidirectionally (see the example in the tokio docs). So I extracted the internal Pipe used by tokio's implementation and built a pipe specifically for (buffered) piping of an AsyncWrite into a Stream<Item = BytesMut>. I'm not sure whether this should be included in actix-web-lab as a utility for dealing with AsyncWrite (or maybe in some other crate).
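The core of the dedicated pipe is a bounded buffer plus two parked wakers. The std-only model below mirrors the `poll_write_internal`/`poll_read_internal` logic from async_pipe.rs so the backpressure can be checked without tokio or bytes. It assumes a `Vec<u8>` in place of `BytesMut`, plain methods in place of the `AsyncWrite`/`Stream` impls, a string error in place of `io::ErrorKind::BrokenPipe`, and a hypothetical `CountingWaker` that only records wake-ups.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Simplified model of `Pipe`: a bounded byte buffer with parked wakers.
struct Pipe {
    buffer: Vec<u8>,
    max_buf_size: usize,
    is_closed: bool,
    read_waker: Option<Waker>,
    write_waker: Option<Waker>,
}

impl Pipe {
    fn new(max_buf_size: usize) -> Self {
        Pipe { buffer: Vec::new(), max_buf_size, is_closed: false, read_waker: None, write_waker: None }
    }

    fn poll_write(&mut self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize, &'static str>> {
        if self.is_closed {
            return Poll::Ready(Err("broken pipe"));
        }
        let avail = self.max_buf_size - self.buffer.len();
        if avail == 0 {
            // buffer full: park the writer until the reader drains it
            self.write_waker = Some(cx.waker().clone());
            return Poll::Pending;
        }
        // accept only what fits; the caller retries with the rest (partial write)
        let len = buf.len().min(avail);
        self.buffer.extend_from_slice(&buf[..len]);
        if let Some(waker) = self.read_waker.take() {
            waker.wake();
        }
        Poll::Ready(Ok(len))
    }

    fn poll_read(&mut self, cx: &mut Context<'_>) -> Poll<Option<Vec<u8>>> {
        if !self.buffer.is_empty() {
            // hand out everything buffered as one chunk and unpark the writer
            let chunk = std::mem::take(&mut self.buffer);
            if let Some(waker) = self.write_waker.take() {
                waker.wake();
            }
            Poll::Ready(Some(chunk))
        } else if self.is_closed {
            Poll::Ready(None) // writer closed and buffer drained: end of stream
        } else {
            self.read_waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }

    fn close_write(&mut self) {
        self.is_closed = true;
        if let Some(waker) = self.read_waker.take() {
            waker.wake();
        }
    }
}

/// Hypothetical test waker that only counts how often it was woken.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

fn main() {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);
    let mut pipe = Pipe::new(4);

    assert_eq!(pipe.poll_write(&mut cx, b"abcdef"), Poll::Ready(Ok(4))); // only 4 bytes fit
    assert_eq!(pipe.poll_write(&mut cx, b"ef"), Poll::Pending); // full: writer parked
    assert_eq!(pipe.poll_read(&mut cx), Poll::Ready(Some(b"abcd".to_vec())));
    assert_eq!(counter.0.load(Ordering::SeqCst), 1); // the read woke the writer
    assert_eq!(pipe.poll_write(&mut cx, b"ef"), Poll::Ready(Ok(2)));
    pipe.close_write();
    assert_eq!(pipe.poll_read(&mut cx), Poll::Ready(Some(b"ef".to_vec())));
    assert_eq!(pipe.poll_read(&mut cx), Poll::Ready(None));
}
```

Because the reader takes the whole buffer at once, each read frees all `max_buf_size` bytes for the writer, so a larger buffer means fewer, larger chunks in the response body.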

async_pipe.rs
use std::{
    pin::Pin,
    sync::{Arc, Mutex, MutexGuard},
    task::{self, Poll, Waker},
};

use bytes::{BytesMut, Buf};
use futures::Stream;
use tokio::io::AsyncWrite;

/// The `AsyncWrite` half of an [`async_pipe`]
pub struct AsyncPipeWriter(Arc<Mutex<Pipe>>);
/// The `Stream` half of an [`async_pipe`]
pub struct AsyncPipeReader(Arc<Mutex<Pipe>>);

/// Creates a buffered pipe that forwards writes from an `AsyncWrite` to a `Stream<Item = BytesMut>`.
///
/// `max_buf_size` is the maximum number of bytes that can be held in the pipe's internal buffer
/// before further writes return `Poll::Pending`.
pub fn async_pipe(max_buf_size: usize) -> (AsyncPipeWriter, AsyncPipeReader) {
    let pipe = Arc::new(Mutex::new(Pipe::new(max_buf_size)));
    (AsyncPipeWriter(pipe.clone()), AsyncPipeReader(pipe))
}

/// A unidirectional IO over a piece of memory.
///
/// Data can be written to the pipe, and reading will return that data.
///
/// Adapted from [tokio's](https://github.com/tokio-rs/tokio/blob/de81985762a242c77361a6ab9de198372ca85987/tokio/src/io/util/mem.rs#L54-L76)
/// internal representation of a pipe for a `tokio::io::DuplexStream`.
#[derive(Debug)]
struct Pipe {
    /// The buffer storing the bytes written, also read from.
    ///
    /// Using a `BytesMut` because it has efficient `Buf` and `BufMut`
    /// functionality already.
    buffer: BytesMut,
    /// Determines if the write side has been closed.
    is_closed: bool,
    /// The maximum number of bytes that can be buffered before writes return
    /// `Poll::Pending`.
    max_buf_size: usize,
    /// If the `read` side has been polled and is pending, this is the waker
    /// for that parked task.
    read_waker: Option<Waker>,
    /// If the `write` side has filled the `max_buf_size` and returned
    /// `Poll::Pending`, this is the waker for that parked task.
    write_waker: Option<Waker>,
}

impl Pipe {
    fn new(max_buf_size: usize) -> Self {
        Pipe {
            buffer: BytesMut::new(),
            is_closed: false,
            max_buf_size,
            read_waker: None,
            write_waker: None,
        }
    }

    fn close_write(&mut self) {
        self.is_closed = true;
        // needs to notify any readers that no more data will come
        if let Some(waker) = self.read_waker.take() {
            waker.wake();
        }
    }

    fn close_read(&mut self) {
        self.is_closed = true;
        // needs to notify any writers that they have to abort
        if let Some(waker) = self.write_waker.take() {
            waker.wake();
        }
    }

    fn poll_read_internal(
        mut self: Pin<&mut Self>,
        cx: &mut task::Context<'_>
    ) -> Poll<Option<BytesMut>> {
        if self.buffer.has_remaining() {
            let bytes = std::mem::take(&mut self.buffer);
            if let Some(waker) = self.write_waker.take() {
                waker.wake();
            }
            Poll::Ready(Some(bytes))
        } else if self.is_closed {
            Poll::Ready(None)
        } else {
            self.read_waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }

    fn poll_write_internal(
        mut self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
        buf: &[u8],
    ) -> Poll<std::io::Result<usize>> {
        if self.is_closed {
            return Poll::Ready(Err(std::io::ErrorKind::BrokenPipe.into()));
        }
        let avail = self.max_buf_size - self.buffer.len();
        if avail == 0 {
            self.write_waker = Some(cx.waker().clone());
            return Poll::Pending;
        }

        let len = buf.len().min(avail);
        self.buffer.extend_from_slice(&buf[..len]);
        if let Some(waker) = self.read_waker.take() {
            waker.wake();
        }
        Poll::Ready(Ok(len))
    }
}

impl AsyncWrite for Pipe {
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
        buf: &[u8],
    ) -> Poll<std::io::Result<usize>> {
        self.poll_write_internal(cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, _: &mut task::Context<'_>) -> Poll<std::io::Result<()>> {
        Poll::Ready(Ok(()))
    }

    fn poll_shutdown(
        mut self: Pin<&mut Self>,
        _: &mut task::Context<'_>,
    ) -> Poll<std::io::Result<()>> {
        self.close_write();
        Poll::Ready(Ok(()))
    }
}

impl AsyncWrite for AsyncPipeWriter {
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
        buf: &[u8],
    ) -> Poll<std::io::Result<usize>> {
        Pin::new(&mut *always_lock(&self.0)).poll_write(cx, buf)
    }

    fn poll_flush(
        self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
    ) -> Poll<std::io::Result<()>> {
        Pin::new(&mut *always_lock(&self.0)).poll_flush(cx)
    }

    fn poll_shutdown(
        self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
    ) -> Poll<std::io::Result<()>> {
        Pin::new(&mut *always_lock(&self.0)).poll_shutdown(cx)
    }
}

impl Stream for Pipe {
    type Item = BytesMut;

    fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
        self.poll_read_internal(cx)
    }

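    fn size_hint(&self) -> (usize, Option<usize>) {
        // `Stream::size_hint` counts remaining *items* (chunks), not bytes.
        // At most one chunk is ready now; more may follow until the pipe is closed.
        let ready = usize::from(self.buffer.has_remaining());
        (ready, if self.is_closed { Some(ready) } else { None })
    }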
}

impl Stream for AsyncPipeReader {
    type Item = BytesMut;

    fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
        Pin::new(&mut *always_lock(&self.0)).poll_next(cx)
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        always_lock(&self.0).size_hint()
    }
}

impl Drop for AsyncPipeWriter {
    fn drop(&mut self) {
        // notify the other side of the closure
        always_lock(&self.0).close_write();
    }
}

impl Drop for AsyncPipeReader {
    fn drop(&mut self) {
        // notify the other side of the closure
        always_lock(&self.0).close_read();
    }
}

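/// Locks the mutex, recovering the guard even if it was poisoned by a panic,
/// so that the `Drop` impls can always signal closure to the other half.
#[inline]
fn always_lock<T>(mtx: &Mutex<T>) -> MutexGuard<'_, T> {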
    match mtx.lock() {
        Ok(g) => g,
        Err(e) => e.into_inner(),
    }
}
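`always_lock` presumably exists so the `Drop` impls never panic on a poisoned lock: if one half panicked while holding the mutex, `lock()` returns `Err`, but `PoisonError::into_inner` still yields the guard. A small std-only check of that behavior, reusing the helper as written above (with the elided lifetime spelled `'_`); `poisoned_counter` is a hypothetical helper that poisons a mutex on purpose:

```rust
use std::sync::{Arc, Mutex, MutexGuard};
use std::thread;

/// Same helper as in async_pipe.rs: returns the guard even if the mutex is poisoned.
fn always_lock<T>(mtx: &Mutex<T>) -> MutexGuard<'_, T> {
    match mtx.lock() {
        Ok(g) => g,
        Err(e) => e.into_inner(),
    }
}

/// Poisons a mutex by panicking on another thread while the lock is held,
/// after the protected value was already modified.
fn poisoned_counter() -> Arc<Mutex<u32>> {
    let shared = Arc::new(Mutex::new(0u32));
    let cloned = Arc::clone(&shared);
    let result = thread::spawn(move || {
        let mut guard = cloned.lock().unwrap();
        *guard += 1;
        panic!("panicked while holding the lock");
    })
    .join();
    assert!(result.is_err());
    shared
}

fn main() {
    let shared = poisoned_counter();
    assert!(shared.is_poisoned());
    assert!(shared.lock().is_err()); // plain `lock()` now fails...
    assert_eq!(*always_lock(&shared), 1); // ...but `always_lock` still sees the data
}
```

Recovering the guard does not clear the poison flag, so every later `lock()` keeps failing; that is fine here because the pipe only ever locks through `always_lock`.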
index handler with async_pipe
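// Additional items needed in main.rs for this variant:
mod async_pipe; // the async_pipe.rs file above
use async_pipe::async_pipe;
use futures::StreamExt; // provides `.map` on the reader stream

#[get("/")]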
async fn index() -> impl Responder {
    let (to_write, stream) = async_pipe(2048);
    tokio::spawn(async move {
        let mut zipper = async_zip::write::ZipFileWriter::new(to_write);
        if let Err(e) = read_dir(&mut zipper).await {
            // TODO: do something
            eprintln!("Failed to write files from directory to zip: {e}")
        }
        if let Err(e) = zipper.close().await {
            // TODO: do something
            eprintln!("Failed to close zipper: {e}")
        }
    });
    HttpResponse::Ok()
        .append_header((
            http::header::CONTENT_DISPOSITION,
            r#"attachment; filename="folder.zip""#,
        ))
        // not sure if this is really necessary,
        // but we're already sending compressed data,
        // so make sure other middleware won't compress this again
        .append_header((http::header::CONTENT_ENCODING, "identity"))
        .streaming(stream.map(|b| Ok::<_, io::Error>(b.freeze())))
}
Cargo.toml
[package]
name = "actix-zippy"
version = "0.1.0"
edition = "2021"

[dependencies]
actix-web = "4.1.0"
async_zip = "0.0.8"
bytes = "1.2.1"
futures = "0.3.23"
# "rt" is needed for tokio::spawn (actix-web also enables it transitively)
tokio = { version = "1.20.1", features = ["io-util", "fs", "rt"] }
tokio-stream = "0.1.9"
tokio-util = { version = "0.7.3", features = ["codec"] }
