diff --git a/actix-http/src/h2/dispatcher.rs b/actix-http/src/h2/dispatcher.rs index ea149b1e0..7326dfff1 100644 --- a/actix-http/src/h2/dispatcher.rs +++ b/actix-http/src/h2/dispatcher.rs @@ -10,11 +10,15 @@ use std::{ }; use actix_codec::{AsyncRead, AsyncWrite}; +use actix_rt::time::Sleep; use actix_service::Service; use actix_utils::future::poll_fn; use bytes::{Bytes, BytesMut}; use futures_core::ready; -use h2::server::{Connection, SendResponse}; +use h2::{ + server::{Connection, SendResponse}, + Ping, PingPong, +}; use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, DATE, TRANSFER_ENCODING}; use log::{error, trace}; use pin_project_lite::pin_project; @@ -36,29 +40,46 @@ pin_project! { on_connect_data: OnConnectData, config: ServiceConfig, peer_addr: Option, - _phantom: PhantomData, + ping_pong: Option, + _phantom: PhantomData } } -impl Dispatcher { +impl Dispatcher +where + T: AsyncRead + AsyncWrite + Unpin, +{ pub(crate) fn new( flow: Rc>, - connection: Connection, + mut connection: Connection, on_connect_data: OnConnectData, config: ServiceConfig, peer_addr: Option, ) -> Self { + let ping_pong = config.keep_alive_timer().map(|timer| H2PingPong { + timer: Box::pin(timer), + on_flight: false, + ping_pong: connection.ping_pong().unwrap(), + }); + Self { flow, config, peer_addr, connection, on_connect_data, + ping_pong, _phantom: PhantomData, } } } +struct H2PingPong { + timer: Pin>, + on_flight: bool, + ping_pong: PingPong, +} + impl Future for Dispatcher where T: AsyncRead + AsyncWrite + Unpin, @@ -77,54 +98,92 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); - while let Some((req, tx)) = - ready!(Pin::new(&mut this.connection).poll_accept(cx)?) - { - let (parts, body) = req.into_parts(); - let pl = crate::h2::Payload::new(body); - let pl = Payload::::H2(pl); - let mut req = Request::with_payload(pl); + loop { + match Pin::new(&mut this.connection).poll_accept(cx)? { + Poll::Ready(Some((req, tx))) => { + let (parts, body) = req.into_parts(); + let pl = crate::h2::Payload::new(body); + let pl = Payload::::H2(pl); + let mut req = Request::with_payload(pl); - let head = req.head_mut(); - head.uri = parts.uri; - head.method = parts.method; - head.version = parts.version; - head.headers = parts.headers.into(); - head.peer_addr = this.peer_addr; + let head = req.head_mut(); + head.uri = parts.uri; + head.method = parts.method; + head.version = parts.version; + head.headers = parts.headers.into(); + head.peer_addr = this.peer_addr; - // merge on_connect_ext data into request extensions - this.on_connect_data.merge_into(&mut req); + // merge on_connect_ext data into request extensions + this.on_connect_data.merge_into(&mut req); - let fut = this.flow.service.call(req); - let config = this.config.clone(); + let fut = this.flow.service.call(req); + let config = this.config.clone(); - // multiplex request handling with spawn task - actix_rt::spawn(async move { - // resolve service call and send response. - let res = match fut.await { - Ok(res) => handle_response(res.into(), tx, config).await, - Err(err) => { - let res: Response = err.into(); - handle_response(res, tx, config).await - } - }; + // multiplex request handling with spawn task + actix_rt::spawn(async move { + // resolve service call and send response. + let res = match fut.await { + Ok(res) => handle_response(res.into(), tx, config).await, + Err(err) => { + let res: Response = err.into(); + handle_response(res, tx, config).await + } + }; - // log error. - if let Err(err) = res { - match err { - DispatchError::SendResponse(err) => { - trace!("Error sending HTTP/2 response: {:?}", err) + // log error. + if let Err(err) = res { + match err { + DispatchError::SendResponse(err) => { + trace!("Error sending HTTP/2 response: {:?}", err) + } + DispatchError::SendData(err) => warn!("{:?}", err), + DispatchError::ResponseBody(err) => { + error!("Response payload stream error: {:?}", err) + } + } } - DispatchError::SendData(err) => warn!("{:?}", err), - DispatchError::ResponseBody(err) => { - error!("Response payload stream error: {:?}", err) - } - } + }); } - }); - } + Poll::Ready(None) => return Poll::Ready(Ok(())), + Poll::Pending => match this.ping_pong.as_mut() { + Some(ping_pong) => loop { + if ping_pong.on_flight { + // When have on flight ping pong. poll pong and and keep alive timer. + // on success pong received update keep alive timer to determine the next timing of + // ping pong. + match ping_pong.ping_pong.poll_pong(cx)? { + Poll::Ready(_) => { + ping_pong.on_flight = false; - Poll::Ready(Ok(())) + let dead_line = + this.config.keep_alive_expire().unwrap(); + ping_pong.timer.as_mut().reset(dead_line); + } + Poll::Pending => { + return ping_pong + .timer + .as_mut() + .poll(cx) + .map(|_| Ok(())) + } + } + } else { + // When there is no on flight ping pong. keep alive timer is used to wait for next + // timing of ping pong. Therefore at this point it serves as an interval instead. + ready!(ping_pong.timer.as_mut().poll(cx)); + + ping_pong.ping_pong.send_ping(Ping::opaque())?; + + let dead_line = this.config.keep_alive_expire().unwrap(); + ping_pong.timer.as_mut().reset(dead_line); + + ping_pong.on_flight = true; + } + }, + None => return Poll::Pending, + }, + } + } } } diff --git a/actix-http/tests/test_h2_ping_pong.rs b/actix-http/tests/test_h2_ping_pong.rs new file mode 100644 index 000000000..5e03785a1 --- /dev/null +++ b/actix-http/tests/test_h2_ping_pong.rs @@ -0,0 +1,77 @@ +use std::io; + +use actix_http::{error::Error, HttpService, Response}; +use actix_server::Server; + +#[actix_rt::test] +async fn h2_ping_pong() -> io::Result<()> { + let (tx, rx) = std::sync::mpsc::sync_channel(1); + + let lst = std::net::TcpListener::bind("127.0.0.1:0")?; + + let addr = lst.local_addr().unwrap(); + + let join = std::thread::spawn(move || { + actix_rt::System::new().block_on(async move { + let handle = Server::build() + .disable_signals() + .workers(1) + .listen("h2_ping_pong", lst, || { + HttpService::build() + .keep_alive(3) + .h2(|_| async { Ok::<_, Error>(Response::ok()) }) + .tcp() + })? + .run(); + + tx.send(handle.clone()).unwrap(); + + handle.await + }) + }); + + let handle = rx.recv().unwrap(); + + let (sync_tx, rx) = std::sync::mpsc::sync_channel(1); + + // use a separate thread for h2 client so it can be blocked. + std::thread::spawn(move || { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() + .block_on(async move { + let stream = tokio::net::TcpStream::connect(addr).await.unwrap(); + + let (mut tx, conn) = h2::client::handshake(stream).await.unwrap(); + + tokio::spawn(async move { conn.await.unwrap() }); + + let (res, _) = tx.send_request(::http::Request::new(()), true).unwrap(); + let res = res.await.unwrap(); + + assert_eq!(res.status().as_u16(), 200); + + sync_tx.send(()).unwrap(); + + // intentionally block the client thread so it can not answer ping pong. + std::thread::sleep(std::time::Duration::from_secs(1000)); + }) + }); + + rx.recv().unwrap(); + + let now = std::time::Instant::now(); + + // stop server gracefully. this step would take up to 30 seconds. + handle.stop(true).await; + + // join server thread. only when connection are all gone this step would finish. + join.join().unwrap()?; + + // check the time used for join server thread so it's known that the server shutdown + // is from keep alive and not server graceful shutdown timeout. + assert!(now.elapsed() < std::time::Duration::from_secs(30)); + + Ok(()) +}