diff --git a/src/h1/dispatcher.rs b/src/h1/dispatcher.rs index bf0abb046..550d27c24 100644 --- a/src/h1/dispatcher.rs +++ b/src/h1/dispatcher.rs @@ -409,7 +409,7 @@ where if self.flags.contains(Flags::SHUTDOWN) { return Err(DispatchError::DisconnectTimeout); } else if self.ka_timer.as_mut().unwrap().deadline() >= self.ka_expire { - // check for any outstanding response processing + // check for any outstanding tasks if self.state.is_empty() && self.framed.is_write_buf_empty() { if self.flags.contains(Flags::STARTED) { trace!("Keep-alive timeout, close connection"); @@ -427,12 +427,16 @@ where } } else { // timeout on first request (slow request) return 408 - trace!("Slow request timeout"); - self.flags.insert(Flags::STARTED | Flags::DISCONNECTED); - let _ = self.send_response( - Response::RequestTimeout().finish().drop_body(), - (), - ); + if !self.flags.contains(Flags::STARTED) { + trace!("Slow request timeout"); + let _ = self.send_response( + Response::RequestTimeout().finish().drop_body(), + (), + ); + } else { + trace!("Keep-alive connection timeout"); + } + self.flags.insert(Flags::STARTED | Flags::SHUTDOWN); self.state = State::None; } } else if let Some(deadline) = self.config.keep_alive_expire() { @@ -493,11 +497,14 @@ where false } // disconnect if keep-alive is not enabled - else if inner.flags.contains(Flags::STARTED) && !inner - .flags - .intersects(Flags::KEEPALIVE | Flags::KEEPALIVE_ENABLED) + else if inner.flags.contains(Flags::STARTED) + && !inner.flags.intersects(Flags::KEEPALIVE) { true + } + // disconnect if shutdown + else if inner.flags.contains(Flags::SHUTDOWN) { + true } else { return Ok(Async::NotReady); } diff --git a/tests/test_server.rs b/tests/test_server.rs index c6e03e285..16c65aac1 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -4,7 +4,9 @@ extern crate actix_net; extern crate bytes; extern crate futures; -use std::{io::Read, io::Write, net}; +use std::io::{Read, Write}; +use std::time::Duration; +use std::{net, thread}; use actix_net::service::NewServiceExt; use bytes::Bytes; @@ -62,6 +64,132 @@ fn test_malformed_request() { assert!(data.starts_with("HTTP/1.1 400 Bad Request")); } +#[test] +fn test_keepalive() { + let srv = test::TestServer::with_factory(|| { + h1::H1Service::build() + .finish(|_| future::ok::<_, ()>(Response::Ok().finish())) + .map(|_| ()) + }); + + let mut stream = net::TcpStream::connect(srv.addr()).unwrap(); + let _ = stream.write_all(b"GET /test/tests/test HTTP/1.1\r\n\r\n"); + let mut data = vec![0; 1024]; + let _ = stream.read(&mut data); + assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n"); + + let _ = stream.write_all(b"GET /test/tests/test HTTP/1.1\r\n\r\n"); + let mut data = vec![0; 1024]; + let _ = stream.read(&mut data); + assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n"); +} + +#[test] +fn test_keepalive_timeout() { + let srv = test::TestServer::with_factory(|| { + h1::H1Service::build() + .keep_alive(1) + .finish(|_| future::ok::<_, ()>(Response::Ok().finish())) + .map(|_| ()) + }); + + let mut stream = net::TcpStream::connect(srv.addr()).unwrap(); + let _ = stream.write_all(b"GET /test/tests/test HTTP/1.1\r\n\r\n"); + let mut data = vec![0; 1024]; + let _ = stream.read(&mut data); + assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n"); + thread::sleep(Duration::from_millis(1100)); + + let mut data = vec![0; 1024]; + let res = stream.read(&mut data).unwrap(); + assert_eq!(res, 0); +} + +#[test] +fn test_keepalive_close() { + let srv = test::TestServer::with_factory(|| { + h1::H1Service::build() + .finish(|_| future::ok::<_, ()>(Response::Ok().finish())) + .map(|_| ()) + }); + + let mut stream = net::TcpStream::connect(srv.addr()).unwrap(); + let _ = + stream.write_all(b"GET /test/tests/test HTTP/1.1\r\nconnection: close\r\n\r\n"); + let mut data = vec![0; 1024]; + let _ = stream.read(&mut data); + assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n"); + + let mut data = vec![0; 1024]; + let res = stream.read(&mut data).unwrap(); + assert_eq!(res, 0); +} + +#[test] +fn test_keepalive_http10_default_close() { + let srv = test::TestServer::with_factory(|| { + h1::H1Service::build() + .finish(|_| future::ok::<_, ()>(Response::Ok().finish())) + .map(|_| ()) + }); + + let mut stream = net::TcpStream::connect(srv.addr()).unwrap(); + let _ = stream.write_all(b"GET /test/tests/test HTTP/1.0\r\n\r\n"); + let mut data = vec![0; 1024]; + let _ = stream.read(&mut data); + assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n"); + + let mut data = vec![0; 1024]; + let res = stream.read(&mut data).unwrap(); + assert_eq!(res, 0); +} + +#[test] +fn test_keepalive_http10() { + let srv = test::TestServer::with_factory(|| { + h1::H1Service::build() + .finish(|_| future::ok::<_, ()>(Response::Ok().finish())) + .map(|_| ()) + }); + + let mut stream = net::TcpStream::connect(srv.addr()).unwrap(); + let _ = stream + .write_all(b"GET /test/tests/test HTTP/1.0\r\nconnection: keep-alive\r\n\r\n"); + let mut data = vec![0; 1024]; + let _ = stream.read(&mut data); + assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n"); + + let mut stream = net::TcpStream::connect(srv.addr()).unwrap(); + let _ = stream.write_all(b"GET /test/tests/test HTTP/1.0\r\n\r\n"); + let mut data = vec![0; 1024]; + let _ = stream.read(&mut data); + assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n"); + + let mut data = vec![0; 1024]; + let res = stream.read(&mut data).unwrap(); + assert_eq!(res, 0); +} + +#[test] +fn test_keepalive_disabled() { + let srv = test::TestServer::with_factory(|| { + h1::H1Service::build() + .keep_alive(KeepAlive::Disabled) + .finish(|_| future::ok::<_, ()>(Response::Ok().finish())) + .map(|_| ()) + }); + + let mut stream = net::TcpStream::connect(srv.addr()).unwrap(); + let _ = stream.write_all(b"GET /test/tests/test HTTP/1.1\r\n\r\n"); + let mut data = vec![0; 1024]; + let _ = stream.read(&mut data); + assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n"); + + let mut data = vec![0; 1024]; + let res = stream.read(&mut data).unwrap(); + assert_eq!(res, 0); +} + #[test] fn test_content_length() { use actix_http::http::{