Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add send datagram support in h3 client #255

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 42 additions & 8 deletions h3/src/client/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@ use http::request;
use tracing::{info, instrument, trace};

use crate::{
connection::{self, ConnectionInner, ConnectionState, SharedStateRef},
error::{Code, Error, ErrorLevel},
frame::FrameStream,
proto::{frame::Frame, headers::Header, push::PushId},
qpack,
quic::{self, StreamId},
stream::{self, BufRecvStream},
connection::{self, ConnectionInner, ConnectionState, SharedStateRef},
error::{Code, Error, ErrorLevel},
ext::Datagram,
frame::FrameStream,
proto::{frame::Frame, headers::Header, push::PushId},
qpack, quic::{self, RecvDatagramExt, SendDatagramExt, StreamId},
stream::{self, BufRecvStream}
};

use super::stream::RequestStream;
use super::stream::{ReadDatagram, RequestStream};

/// HTTP/3 request sender
///
Expand Down Expand Up @@ -456,3 +456,37 @@ where
Poll::Pending
}
}

impl<C, B> Connection<C, B>
where
C: quic::Connection<B> + SendDatagramExt<B>,
B: Buf,
{
/// Sends a datagram
#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
pub fn send_datagram(&mut self, stream_id: StreamId, data: B) -> Result<(), Error> {
self.inner
.conn
.send_datagram(Datagram::new(stream_id, data))?;

#[cfg(feature = "tracing")]
tracing::info!("Sent datagram");

Ok(())
}
}

impl<C, B> Connection<C, B>
where
C: quic::Connection<B> + RecvDatagramExt,
B: Buf,
{
/// Reads an incoming datagram
#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
pub fn read_datagram(&mut self) -> ReadDatagram<C, B> {
ReadDatagram {
conn: self,
_marker: PhantomData,
}
}
}
52 changes: 45 additions & 7 deletions h3/src/client/stream.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
use bytes::Buf;
use futures_util::future;
use futures_util::{future, future::Future, ready};
use http::{HeaderMap, Response};
use pin_project_lite::pin_project;
#[cfg(feature = "tracing")]
use tracing::instrument;

use crate::{
connection::{self, ConnectionState, SharedStateRef},
error::{Code, Error, ErrorLevel},
proto::{frame::Frame, headers::Header},
qpack,
quic::{self},
connection::{self, ConnectionState, SharedStateRef}, error::{Code, Error, ErrorLevel}, ext::Datagram, proto::{frame::Frame, headers::Header}, qpack, quic::{self, RecvDatagramExt, SendStream as _, StreamId}
};
use std::convert::TryFrom;
use super::connection::Connection;
use std::{convert::TryFrom, marker::PhantomData, task::{Context, Poll},};

/// Manage request bodies transfer, response and trailers.
///
Expand Down Expand Up @@ -168,6 +166,11 @@ where
// rename `cancel()` ?
self.inner.stream.stop_sending(error_code)
}

/// Returns the underlying stream id
pub fn id(&self) -> StreamId {
self.inner.stream.id()
}
}

impl<S, B> RequestStream<S, B>
Expand Down Expand Up @@ -208,6 +211,11 @@ where
//# implementation resets the sending parts of streams and aborts reading
//# on the receiving parts of streams; see Section 2.4 of
//# [QUIC-TRANSPORT].

/// Returns the underlying stream id
pub fn send_id(&self) -> StreamId {
self.inner.stream.send_id()
}
}

impl<S, B> RequestStream<S, B>
Expand All @@ -226,3 +234,33 @@ where
(RequestStream { inner: send }, RequestStream { inner: recv })
}
}

pin_project! {
/// Future for [`Connection::read_datagram`]
pub struct ReadDatagram<'a, C, B>
where
C: quic::Connection<B>,
B: Buf,
{
pub(super) conn: &'a mut Connection<C, B>,
pub(super) _marker: PhantomData<B>,
}
}

impl<'a, C, B> Future for ReadDatagram<'a, C, B>
where
C: quic::Connection<B> + RecvDatagramExt,
B: Buf,
{
type Output = Result<Option<Datagram<C::Buf>>, Error>;

fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
#[cfg(feature = "tracing")]
tracing::trace!("poll: read_datagram");

match ready!(self.conn.inner.conn.poll_accept_datagram(cx))? {
Some(v) => Poll::Ready(Ok(Some(Datagram::decode(v)?))),
None => Poll::Ready(Ok(None)),
}
}
}
2 changes: 1 addition & 1 deletion h3/src/proto/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ impl StreamId {
}
}

pub(crate) fn into_inner(self) -> u64 {
pub fn into_inner(self) -> u64 {
self.0
}
}
Expand Down