1
0
mirror of https://github.com/fafhrd91/actix-web synced 2024-11-30 18:44:35 +01:00

add keep alive to h2 through ping pong (#2433)

This commit is contained in:
fakeshadow 2021-11-04 23:15:23 +08:00 committed by GitHub
parent ec6d284a8e
commit 6ec2d7b909
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 180 additions and 44 deletions

View File

@ -10,11 +10,15 @@ use std::{
}; };
use actix_codec::{AsyncRead, AsyncWrite}; use actix_codec::{AsyncRead, AsyncWrite};
use actix_rt::time::Sleep;
use actix_service::Service; use actix_service::Service;
use actix_utils::future::poll_fn; use actix_utils::future::poll_fn;
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use futures_core::ready; use futures_core::ready;
use h2::server::{Connection, SendResponse}; use h2::{
server::{Connection, SendResponse},
Ping, PingPong,
};
use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, DATE, TRANSFER_ENCODING}; use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, DATE, TRANSFER_ENCODING};
use log::{error, trace}; use log::{error, trace};
use pin_project_lite::pin_project; use pin_project_lite::pin_project;
@ -36,29 +40,46 @@ pin_project! {
on_connect_data: OnConnectData, on_connect_data: OnConnectData,
config: ServiceConfig, config: ServiceConfig,
peer_addr: Option<net::SocketAddr>, peer_addr: Option<net::SocketAddr>,
_phantom: PhantomData<B>, ping_pong: Option<H2PingPong>,
_phantom: PhantomData<B>
} }
} }
impl<T, S, B, X, U> Dispatcher<T, S, B, X, U> { impl<T, S, B, X, U> Dispatcher<T, S, B, X, U>
where
T: AsyncRead + AsyncWrite + Unpin,
{
pub(crate) fn new( pub(crate) fn new(
flow: Rc<HttpFlow<S, X, U>>, flow: Rc<HttpFlow<S, X, U>>,
connection: Connection<T, Bytes>, mut connection: Connection<T, Bytes>,
on_connect_data: OnConnectData, on_connect_data: OnConnectData,
config: ServiceConfig, config: ServiceConfig,
peer_addr: Option<net::SocketAddr>, peer_addr: Option<net::SocketAddr>,
) -> Self { ) -> Self {
let ping_pong = config.keep_alive_timer().map(|timer| H2PingPong {
timer: Box::pin(timer),
on_flight: false,
ping_pong: connection.ping_pong().unwrap(),
});
Self { Self {
flow, flow,
config, config,
peer_addr, peer_addr,
connection, connection,
on_connect_data, on_connect_data,
ping_pong,
_phantom: PhantomData, _phantom: PhantomData,
} }
} }
} }
struct H2PingPong {
timer: Pin<Box<Sleep>>,
on_flight: bool,
ping_pong: PingPong,
}
impl<T, S, B, X, U> Future for Dispatcher<T, S, B, X, U> impl<T, S, B, X, U> Future for Dispatcher<T, S, B, X, U>
where where
T: AsyncRead + AsyncWrite + Unpin, T: AsyncRead + AsyncWrite + Unpin,
@ -77,9 +98,9 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut(); let this = self.get_mut();
while let Some((req, tx)) = loop {
ready!(Pin::new(&mut this.connection).poll_accept(cx)?) match Pin::new(&mut this.connection).poll_accept(cx)? {
{ Poll::Ready(Some((req, tx))) => {
let (parts, body) = req.into_parts(); let (parts, body) = req.into_parts();
let pl = crate::h2::Payload::new(body); let pl = crate::h2::Payload::new(body);
let pl = Payload::<crate::payload::PayloadStream>::H2(pl); let pl = Payload::<crate::payload::PayloadStream>::H2(pl);
@ -123,8 +144,46 @@ where
} }
}); });
} }
Poll::Ready(None) => return Poll::Ready(Ok(())),
Poll::Pending => match this.ping_pong.as_mut() {
Some(ping_pong) => loop {
if ping_pong.on_flight {
// When have on flight ping pong. poll pong and and keep alive timer.
// on success pong received update keep alive timer to determine the next timing of
// ping pong.
match ping_pong.ping_pong.poll_pong(cx)? {
Poll::Ready(_) => {
ping_pong.on_flight = false;
Poll::Ready(Ok(())) let dead_line =
this.config.keep_alive_expire().unwrap();
ping_pong.timer.as_mut().reset(dead_line);
}
Poll::Pending => {
return ping_pong
.timer
.as_mut()
.poll(cx)
.map(|_| Ok(()))
}
}
} else {
// When there is no on flight ping pong. keep alive timer is used to wait for next
// timing of ping pong. Therefore at this point it serves as an interval instead.
ready!(ping_pong.timer.as_mut().poll(cx));
ping_pong.ping_pong.send_ping(Ping::opaque())?;
let dead_line = this.config.keep_alive_expire().unwrap();
ping_pong.timer.as_mut().reset(dead_line);
ping_pong.on_flight = true;
}
},
None => return Poll::Pending,
},
}
}
} }
} }

View File

@ -0,0 +1,77 @@
use std::io;
use actix_http::{error::Error, HttpService, Response};
use actix_server::Server;
#[actix_rt::test]
async fn h2_ping_pong() -> io::Result<()> {
let (tx, rx) = std::sync::mpsc::sync_channel(1);
let lst = std::net::TcpListener::bind("127.0.0.1:0")?;
let addr = lst.local_addr().unwrap();
let join = std::thread::spawn(move || {
actix_rt::System::new().block_on(async move {
let handle = Server::build()
.disable_signals()
.workers(1)
.listen("h2_ping_pong", lst, || {
HttpService::build()
.keep_alive(3)
.h2(|_| async { Ok::<_, Error>(Response::ok()) })
.tcp()
})?
.run();
tx.send(handle.clone()).unwrap();
handle.await
})
});
let handle = rx.recv().unwrap();
let (sync_tx, rx) = std::sync::mpsc::sync_channel(1);
// use a separate thread for h2 client so it can be blocked.
std::thread::spawn(move || {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async move {
let stream = tokio::net::TcpStream::connect(addr).await.unwrap();
let (mut tx, conn) = h2::client::handshake(stream).await.unwrap();
tokio::spawn(async move { conn.await.unwrap() });
let (res, _) = tx.send_request(::http::Request::new(()), true).unwrap();
let res = res.await.unwrap();
assert_eq!(res.status().as_u16(), 200);
sync_tx.send(()).unwrap();
// intentionally block the client thread so it can not answer ping pong.
std::thread::sleep(std::time::Duration::from_secs(1000));
})
});
rx.recv().unwrap();
let now = std::time::Instant::now();
// stop server gracefully. this step would take up to 30 seconds.
handle.stop(true).await;
// join server thread. only when connection are all gone this step would finish.
join.join().unwrap()?;
// check the time used for join server thread so it's known that the server shutdown
// is from keep alive and not server graceful shutdown timeout.
assert!(now.elapsed() < std::time::Duration::from_secs(30));
Ok(())
}