From cfad5bf1f343a472f8dbaf9c6dba525c59cb19b9 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 8 Oct 2018 07:47:42 -0700 Subject: [PATCH] enable slow request timeout for h2 dispatcher --- src/server/h1.rs | 27 +++++------- src/server/h2.rs | 106 +++++++++++++++++++++++++++------------------ src/server/http.rs | 5 --- src/server/mod.rs | 14 ++---- 4 files changed, 78 insertions(+), 74 deletions(-) diff --git a/src/server/h1.rs b/src/server/h1.rs index 4fb730f7..0fb72ef7 100644 --- a/src/server/h1.rs +++ b/src/server/h1.rs @@ -203,7 +203,7 @@ where #[inline] pub fn poll(&mut self) -> Poll<(), HttpDispatchError> { // check connection keep-alive - self.poll_keep_alive()?; + self.poll_keepalive()?; // shutdown if self.flags.contains(Flags::SHUTDOWN) { @@ -277,23 +277,21 @@ where } /// keep-alive timer. returns `true` is keep-alive, otherwise drop - fn poll_keep_alive(&mut self) -> Result<(), HttpDispatchError> { + fn poll_keepalive(&mut self) -> Result<(), HttpDispatchError> { if let Some(ref mut timer) = self.ka_timer { match timer.poll() { Ok(Async::Ready(_)) => { + // if we get timer during shutdown, just drop connection + if self.flags.contains(Flags::SHUTDOWN) { + let io = self.stream.get_mut(); + let _ = IoStream::set_linger(io, Some(Duration::from_secs(0))); + let _ = IoStream::shutdown(io, Shutdown::Both); + return Err(HttpDispatchError::ShutdownTimeout); + } if timer.deadline() >= self.ka_expire { // check for any outstanding request handling if self.tasks.is_empty() { - // if we get timer during shutdown, just drop connection - if self.flags.contains(Flags::SHUTDOWN) { - let io = self.stream.get_mut(); - let _ = IoStream::set_linger( - io, - Some(Duration::from_secs(0)), - ); - let _ = IoStream::shutdown(io, Shutdown::Both); - return Err(HttpDispatchError::ShutdownTimeout); - } else if !self.flags.contains(Flags::STARTED) { + if !self.flags.contains(Flags::STARTED) { // timeout on first request (slow request) return 408 trace!("Slow request timeout"); self.flags @@ -315,9 +313,8 @@ where return Ok(()); } } - } else if let Some(deadline) = self.settings.keep_alive_expire() - { - timer.reset(deadline) + } else if let Some(dl) = self.settings.keep_alive_expire() { + timer.reset(dl) } } else { timer.reset(self.ka_expire) diff --git a/src/server/h2.rs b/src/server/h2.rs index 2fe2fa07..6ad9af70 100644 --- a/src/server/h2.rs +++ b/src/server/h2.rs @@ -27,7 +27,8 @@ use super::{HttpHandler, HttpHandlerTask, IoStream, Writer}; bitflags! { struct Flags: u8 { - const DISCONNECTED = 0b0000_0010; + const DISCONNECTED = 0b0000_0001; + const SHUTDOWN = 0b0000_0010; } } @@ -42,8 +43,9 @@ where addr: Option, state: State>, tasks: VecDeque>, - keepalive_timer: Option, extensions: Option>, + ka_expire: Instant, + ka_timer: Option, } enum State { @@ -62,6 +64,16 @@ where ) -> Self { let addr = io.peer_addr(); let extensions = io.extensions(); + + // keep-alive timeout + let (ka_expire, ka_timer) = if let Some(delay) = keepalive_timer { + (delay.deadline(), Some(delay)) + } else if let Some(delay) = settings.keep_alive_timer() { + (delay.deadline(), Some(delay)) + } else { + (settings.now(), None) + }; + Http2 { flags: Flags::empty(), tasks: VecDeque::new(), @@ -72,14 +84,14 @@ where addr, settings, extensions, - keepalive_timer, + ka_expire, + ka_timer, } } pub(crate) fn shutdown(&mut self) { self.state = State::Empty; self.tasks.clear(); - self.keepalive_timer.take(); } pub fn settings(&self) -> &ServiceConfig { @@ -87,21 +99,16 @@ where } pub fn poll(&mut self) -> Poll<(), HttpDispatchError> { + self.poll_keepalive()?; + // server if let State::Connection(ref mut conn) = self.state { - // keep-alive timer - if let Some(ref mut timeout) = self.keepalive_timer { - match timeout.poll() { - Ok(Async::Ready(_)) => { - trace!("Keep-alive timeout, close connection"); - return Ok(Async::Ready(())); - } - Ok(Async::NotReady) => (), - Err(_) => unreachable!(), - } - } - loop { + // shutdown connection + if self.flags.contains(Flags::SHUTDOWN) { + return conn.poll_close().map_err(|e| e.into()); + } + let mut not_ready = true; let disconnected = self.flags.contains(Flags::DISCONNECTED); @@ -216,8 +223,12 @@ where not_ready = false; let (parts, body) = req.into_parts(); - // stop keepalive timer - self.keepalive_timer.take(); + // update keep-alive expire + if self.ka_timer.is_some() { + if let Some(expire) = self.settings.keep_alive_expire() { + self.ka_expire = expire; + } + } self.tasks.push_back(Entry::new( parts, @@ -228,36 +239,14 @@ where self.extensions.clone(), )); } - Ok(Async::NotReady) => { - // start keep-alive timer - if self.tasks.is_empty() { - if self.settings.keep_alive_enabled() { - if self.keepalive_timer.is_none() { - if let Some(ka) = self.settings.keep_alive() { - trace!("Start keep-alive timer"); - let mut timeout = - Delay::new(Instant::now() + ka); - // register timeout - let _ = timeout.poll(); - self.keepalive_timer = Some(timeout); - } - } - } else { - // keep-alive disable, drop connection - return conn.poll_close().map_err(|e| e.into()); - } - } else { - // keep-alive unset, rely on operating system - return Ok(Async::NotReady); - } - } + Ok(Async::NotReady) => return Ok(Async::NotReady), Err(err) => { trace!("Connection error: {}", err); - self.flags.insert(Flags::DISCONNECTED); + self.flags.insert(Flags::SHUTDOWN); for entry in &mut self.tasks { entry.task.disconnected() } - self.keepalive_timer.take(); + continue; } } } @@ -289,6 +278,37 @@ where self.poll() } + + /// keep-alive timer. returns `true` is keep-alive, otherwise drop + fn poll_keepalive(&mut self) -> Result<(), HttpDispatchError> { + if let Some(ref mut timer) = self.ka_timer { + match timer.poll() { + Ok(Async::Ready(_)) => { + // if we get timer during shutdown, just drop connection + if self.flags.contains(Flags::SHUTDOWN) { + return Err(HttpDispatchError::ShutdownTimeout); + } + if timer.deadline() >= self.ka_expire { + // check for any outstanding request handling + if self.tasks.is_empty() { + return Err(HttpDispatchError::ShutdownTimeout); + } else if let Some(dl) = self.settings.keep_alive_expire() { + timer.reset(dl) + } + } else { + timer.reset(self.ka_expire) + } + } + Ok(Async::NotReady) => (), + Err(e) => { + error!("Timer error {:?}", e); + return Err(HttpDispatchError::Unknown); + } + } + } + + Ok(()) + } } bitflags! { diff --git a/src/server/http.rs b/src/server/http.rs index 6a7790c1..9ecd4a5d 100644 --- a/src/server/http.rs +++ b/src/server/http.rs @@ -197,11 +197,6 @@ where } /// Disable `HTTP/2` support - // #[doc(hidden)] - // #[deprecated( - // since = "0.7.4", - // note = "please use acceptor service with proper ServerFlags parama" - // )] pub fn no_http2(mut self) -> Self { self.no_http2 = true; self diff --git a/src/server/mod.rs b/src/server/mod.rs index 3277dba5..8d719516 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -12,7 +12,7 @@ //! to serve incoming HTTP requests. //! //! As the server uses worker pool, the factory function is restricted to trait bounds -//! `Sync + Send + 'static` so that each worker would be able to accept Application +//! `Send + Clone + 'static` so that each worker would be able to accept Application //! without a need for synchronization. //! //! If you wish to share part of state among all workers you should @@ -29,13 +29,9 @@ //! Each TLS implementation is provided with [AcceptorService](trait.AcceptorService.html) //! that describes how HTTP Server accepts connections. //! -//! For `bind` and `listen` there are corresponding `bind_with` and `listen_with` that accepts +//! For `bind` and `listen` there are corresponding `bind_ssl|tls|rustls` and `listen_ssl|tls|rustls` that accepts //! these services. //! -//! By default, acceptor would work with both HTTP2 and HTTP1 protocols. -//! But it can be controlled using [ServerFlags](struct.ServerFlags.html) which -//! can be supplied when creating `AcceptorService`. -//! //! **NOTE:** `native-tls` doesn't support `HTTP2` yet //! //! ## Signal handling and shutdown @@ -87,17 +83,13 @@ //! // load ssl keys //! let config = load_ssl(); //! -//! // Create acceptor service for only HTTP1 protocol -//! // You can use ::new(config) to leave defaults -//! let acceptor = server::RustlsAcceptor::with_flags(config, actix_web::server::ServerFlags::HTTP1); -//! //! // create and start server at once //! server::new(|| { //! App::new() //! // register simple handler, handle all methods //! .resource("/index.html", |r| r.f(index)) //! })) -//! }).bind_with("127.0.0.1:8080", acceptor) +//! }).bind_rustls("127.0.0.1:8443", config) //! .unwrap() //! .start(); //!