//! HTTP/2 protocol. use std::{ future::Future, pin::Pin, task::{Context, Poll}, }; use actix_codec::{AsyncRead, AsyncWrite}; use actix_rt::time::{sleep_until, Sleep}; use bytes::Bytes; use futures_core::{ready, Stream}; use h2::{ server::{handshake, Connection, Handshake}, RecvStream, }; use crate::{ config::ServiceConfig, error::{DispatchError, PayloadError}, }; mod dispatcher; mod service; pub use self::dispatcher::Dispatcher; pub use self::service::H2Service; /// HTTP/2 peer stream. pub struct Payload { stream: RecvStream, } impl Payload { pub(crate) fn new(stream: RecvStream) -> Self { Self { stream } } } impl Stream for Payload { type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); match ready!(Pin::new(&mut this.stream).poll_data(cx)) { Some(Ok(chunk)) => { let len = chunk.len(); match this.stream.flow_control().release_capacity(len) { Ok(()) => Poll::Ready(Some(Ok(chunk))), Err(err) => Poll::Ready(Some(Err(err.into()))), } } Some(Err(err)) => Poll::Ready(Some(Err(err.into()))), None => Poll::Ready(None), } } } pub(crate) fn handshake_with_timeout( io: T, config: &ServiceConfig, ) -> HandshakeWithTimeout where T: AsyncRead + AsyncWrite + Unpin, { HandshakeWithTimeout { handshake: handshake(io), timer: config .client_request_deadline() .map(|deadline| Box::pin(sleep_until(deadline.into()))), } } pub(crate) struct HandshakeWithTimeout { handshake: Handshake, timer: Option>>, } impl Future for HandshakeWithTimeout where T: AsyncRead + AsyncWrite + Unpin, { type Output = Result<(Connection, Option>>), DispatchError>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); match Pin::new(&mut this.handshake).poll(cx)? { // return the timer on success handshake; its slot can be re-used for h2 ping-pong Poll::Ready(conn) => Poll::Ready(Ok((conn, this.timer.take()))), Poll::Pending => match this.timer.as_mut() { Some(timer) => { ready!(timer.as_mut().poll(cx)); Poll::Ready(Err(DispatchError::SlowRequestTimeout)) } None => Poll::Pending, }, } } } #[cfg(test)] mod tests { use std::panic::{RefUnwindSafe, UnwindSafe}; use static_assertions::assert_impl_all; use super::*; assert_impl_all!(Payload: Unpin, Send, Sync, UnwindSafe, RefUnwindSafe); }