1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-28 09:42:40 +01:00

add kee-alive tests

This commit is contained in:
Nikolay Kim 2018-11-20 10:55:50 -08:00
parent e1fc6dea84
commit 186d3d727a
2 changed files with 146 additions and 11 deletions

View File

@ -409,7 +409,7 @@ where
if self.flags.contains(Flags::SHUTDOWN) { if self.flags.contains(Flags::SHUTDOWN) {
return Err(DispatchError::DisconnectTimeout); return Err(DispatchError::DisconnectTimeout);
} else if self.ka_timer.as_mut().unwrap().deadline() >= self.ka_expire { } else if self.ka_timer.as_mut().unwrap().deadline() >= self.ka_expire {
// check for any outstanding response processing // check for any outstanding tasks
if self.state.is_empty() && self.framed.is_write_buf_empty() { if self.state.is_empty() && self.framed.is_write_buf_empty() {
if self.flags.contains(Flags::STARTED) { if self.flags.contains(Flags::STARTED) {
trace!("Keep-alive timeout, close connection"); trace!("Keep-alive timeout, close connection");
@ -427,12 +427,16 @@ where
} }
} else { } else {
// timeout on first request (slow request) return 408 // timeout on first request (slow request) return 408
if !self.flags.contains(Flags::STARTED) {
trace!("Slow request timeout"); trace!("Slow request timeout");
self.flags.insert(Flags::STARTED | Flags::DISCONNECTED);
let _ = self.send_response( let _ = self.send_response(
Response::RequestTimeout().finish().drop_body(), Response::RequestTimeout().finish().drop_body(),
(), (),
); );
} else {
trace!("Keep-alive connection timeout");
}
self.flags.insert(Flags::STARTED | Flags::SHUTDOWN);
self.state = State::None; self.state = State::None;
} }
} else if let Some(deadline) = self.config.keep_alive_expire() { } else if let Some(deadline) = self.config.keep_alive_expire() {
@ -493,11 +497,14 @@ where
false false
} }
// disconnect if keep-alive is not enabled // disconnect if keep-alive is not enabled
else if inner.flags.contains(Flags::STARTED) && !inner else if inner.flags.contains(Flags::STARTED)
.flags && !inner.flags.intersects(Flags::KEEPALIVE)
.intersects(Flags::KEEPALIVE | Flags::KEEPALIVE_ENABLED)
{ {
true true
}
// disconnect if shutdown
else if inner.flags.contains(Flags::SHUTDOWN) {
true
} else { } else {
return Ok(Async::NotReady); return Ok(Async::NotReady);
} }

View File

@ -4,7 +4,9 @@ extern crate actix_net;
extern crate bytes; extern crate bytes;
extern crate futures; extern crate futures;
use std::{io::Read, io::Write, net}; use std::io::{Read, Write};
use std::time::Duration;
use std::{net, thread};
use actix_net::service::NewServiceExt; use actix_net::service::NewServiceExt;
use bytes::Bytes; use bytes::Bytes;
@ -62,6 +64,132 @@ fn test_malformed_request() {
assert!(data.starts_with("HTTP/1.1 400 Bad Request")); assert!(data.starts_with("HTTP/1.1 400 Bad Request"));
} }
#[test]
fn test_keepalive() {
let srv = test::TestServer::with_factory(|| {
h1::H1Service::build()
.finish(|_| future::ok::<_, ()>(Response::Ok().finish()))
.map(|_| ())
});
let mut stream = net::TcpStream::connect(srv.addr()).unwrap();
let _ = stream.write_all(b"GET /test/tests/test HTTP/1.1\r\n\r\n");
let mut data = vec![0; 1024];
let _ = stream.read(&mut data);
assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n");
let _ = stream.write_all(b"GET /test/tests/test HTTP/1.1\r\n\r\n");
let mut data = vec![0; 1024];
let _ = stream.read(&mut data);
assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n");
}
#[test]
fn test_keepalive_timeout() {
let srv = test::TestServer::with_factory(|| {
h1::H1Service::build()
.keep_alive(1)
.finish(|_| future::ok::<_, ()>(Response::Ok().finish()))
.map(|_| ())
});
let mut stream = net::TcpStream::connect(srv.addr()).unwrap();
let _ = stream.write_all(b"GET /test/tests/test HTTP/1.1\r\n\r\n");
let mut data = vec![0; 1024];
let _ = stream.read(&mut data);
assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n");
thread::sleep(Duration::from_millis(1100));
let mut data = vec![0; 1024];
let res = stream.read(&mut data).unwrap();
assert_eq!(res, 0);
}
#[test]
fn test_keepalive_close() {
let srv = test::TestServer::with_factory(|| {
h1::H1Service::build()
.finish(|_| future::ok::<_, ()>(Response::Ok().finish()))
.map(|_| ())
});
let mut stream = net::TcpStream::connect(srv.addr()).unwrap();
let _ =
stream.write_all(b"GET /test/tests/test HTTP/1.1\r\nconnection: close\r\n\r\n");
let mut data = vec![0; 1024];
let _ = stream.read(&mut data);
assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n");
let mut data = vec![0; 1024];
let res = stream.read(&mut data).unwrap();
assert_eq!(res, 0);
}
#[test]
fn test_keepalive_http10_default_close() {
let srv = test::TestServer::with_factory(|| {
h1::H1Service::build()
.finish(|_| future::ok::<_, ()>(Response::Ok().finish()))
.map(|_| ())
});
let mut stream = net::TcpStream::connect(srv.addr()).unwrap();
let _ = stream.write_all(b"GET /test/tests/test HTTP/1.0\r\n\r\n");
let mut data = vec![0; 1024];
let _ = stream.read(&mut data);
assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n");
let mut data = vec![0; 1024];
let res = stream.read(&mut data).unwrap();
assert_eq!(res, 0);
}
#[test]
fn test_keepalive_http10() {
let srv = test::TestServer::with_factory(|| {
h1::H1Service::build()
.finish(|_| future::ok::<_, ()>(Response::Ok().finish()))
.map(|_| ())
});
let mut stream = net::TcpStream::connect(srv.addr()).unwrap();
let _ = stream
.write_all(b"GET /test/tests/test HTTP/1.0\r\nconnection: keep-alive\r\n\r\n");
let mut data = vec![0; 1024];
let _ = stream.read(&mut data);
assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n");
let mut stream = net::TcpStream::connect(srv.addr()).unwrap();
let _ = stream.write_all(b"GET /test/tests/test HTTP/1.0\r\n\r\n");
let mut data = vec![0; 1024];
let _ = stream.read(&mut data);
assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n");
let mut data = vec![0; 1024];
let res = stream.read(&mut data).unwrap();
assert_eq!(res, 0);
}
#[test]
fn test_keepalive_disabled() {
let srv = test::TestServer::with_factory(|| {
h1::H1Service::build()
.keep_alive(KeepAlive::Disabled)
.finish(|_| future::ok::<_, ()>(Response::Ok().finish()))
.map(|_| ())
});
let mut stream = net::TcpStream::connect(srv.addr()).unwrap();
let _ = stream.write_all(b"GET /test/tests/test HTTP/1.1\r\n\r\n");
let mut data = vec![0; 1024];
let _ = stream.read(&mut data);
assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n");
let mut data = vec![0; 1024];
let res = stream.read(&mut data).unwrap();
assert_eq!(res, 0);
}
#[test] #[test]
fn test_content_length() { fn test_content_length() {
use actix_http::http::{ use actix_http::http::{