1
0
mirror of https://github.com/fafhrd91/actix-web synced 2025-07-22 17:07:19 +02:00

Compare commits

...

16 Commits

Author SHA1 Message Date
Nikolay Kim
f40153fca4 fix node::insert() method, missing next element 2018-09-17 11:39:03 -07:00
Nikolay Kim
764103566d update changes 2018-09-17 10:48:37 -07:00
Nikolay Kim
bfb2f2e9e1 fix node.remove(), update next node pointer 2018-09-17 10:25:45 -07:00
Nikolay Kim
599e6b3385 refactor channel node remove operation 2018-09-17 05:29:07 -07:00
Nikolay Kim
03e318f446 update changes 2018-09-15 17:10:53 -07:00
Nikolay Kim
7449884ce3 fix wrong error message for path deserialize for i32 #510 2018-09-15 17:09:07 -07:00
Nikolay Kim
bbe69e5b8d update version 2018-09-15 10:00:54 -07:00
Nikolay Kim
9d1eefc38f use 5 seconds keep-alive timer by default 2018-09-15 09:57:54 -07:00
Nikolay Kim
d65c72b44d use server keep-alive timer as slow request timer 2018-09-15 09:55:38 -07:00
Nikolay Kim
c3f8b5cf22 clippy warnings 2018-09-11 11:25:32 -07:00
Nikolay Kim
70a3f317d3 fix failing requests to test server #508 2018-09-11 11:24:05 -07:00
Nikolay Kim
513c8ec1ce Merge pull request #505 from Neopallium/master
Fix issue with HttpChannel linked list.
2018-09-11 11:18:33 -07:00
Robert G. Jakabosky
04608b2ea6 Update changes. 2018-09-12 00:27:15 +08:00
Robert G. Jakabosky
70b45659e2 Make Node's traverse method take a closure instead of calling shutdown on each HttpChannel. 2018-09-12 00:27:15 +08:00
Robert G. Jakabosky
e0ae6b10cd Fix bug with HttpChannel linked list. 2018-09-12 00:27:15 +08:00
Maciej Piechotka
003b05b095 Don't ignore errors in std::fmt::Debug implementations (#506) 2018-09-11 14:57:55 +03:00
17 changed files with 163 additions and 102 deletions

View File

@@ -1,5 +1,29 @@
# Changes # Changes
## [0.7.8] - 2018-09-17
### Added
* Use server `Keep-Alive` setting as slow request timeout #439
### Changed
* Use 5 seconds keep-alive timer by default.
### Fixed
* Fixed wrong error message for i16 type #510
## [0.7.7] - 2018-09-11
### Fixed
* Fix linked list of HttpChannels #504
* Fix requests to TestServer fail #508
## [0.7.6] - 2018-09-07 ## [0.7.6] - 2018-09-07
### Fixed ### Fixed

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "actix-web" name = "actix-web"
version = "0.7.6" version = "0.7.8"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix web is a simple, pragmatic and extremely fast web framework for Rust." description = "Actix web is a simple, pragmatic and extremely fast web framework for Rust."
readme = "README.md" readme = "README.md"

View File

@@ -254,16 +254,16 @@ impl ClientRequest {
impl fmt::Debug for ClientRequest { impl fmt::Debug for ClientRequest {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let res = writeln!( writeln!(
f, f,
"\nClientRequest {:?} {}:{}", "\nClientRequest {:?} {}:{}",
self.version, self.method, self.uri self.version, self.method, self.uri
); )?;
let _ = writeln!(f, " headers:"); writeln!(f, " headers:")?;
for (key, val) in self.headers.iter() { for (key, val) in self.headers.iter() {
let _ = writeln!(f, " {:?}: {:?}", key, val); writeln!(f, " {:?}: {:?}", key, val)?;
} }
res Ok(())
} }
} }
@@ -750,16 +750,16 @@ fn parts<'a>(
impl fmt::Debug for ClientRequestBuilder { impl fmt::Debug for ClientRequestBuilder {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
if let Some(ref parts) = self.request { if let Some(ref parts) = self.request {
let res = writeln!( writeln!(
f, f,
"\nClientRequestBuilder {:?} {}:{}", "\nClientRequestBuilder {:?} {}:{}",
parts.version, parts.method, parts.uri parts.version, parts.method, parts.uri
); )?;
let _ = writeln!(f, " headers:"); writeln!(f, " headers:")?;
for (key, val) in parts.headers.iter() { for (key, val) in parts.headers.iter() {
let _ = writeln!(f, " {:?}: {:?}", key, val); writeln!(f, " {:?}: {:?}", key, val)?;
} }
res Ok(())
} else { } else {
write!(f, "ClientRequestBuilder(Consumed)") write!(f, "ClientRequestBuilder(Consumed)")
} }

View File

@@ -95,12 +95,12 @@ impl ClientResponse {
impl fmt::Debug for ClientResponse { impl fmt::Debug for ClientResponse {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let res = writeln!(f, "\nClientResponse {:?} {}", self.version(), self.status()); writeln!(f, "\nClientResponse {:?} {}", self.version(), self.status())?;
let _ = writeln!(f, " headers:"); writeln!(f, " headers:")?;
for (key, val) in self.headers().iter() { for (key, val) in self.headers().iter() {
let _ = writeln!(f, " {:?}: {:?}", key, val); writeln!(f, " {:?}: {:?}", key, val)?;
} }
res Ok(())
} }
} }

View File

@@ -175,7 +175,7 @@ impl<'de, S: 'de> Deserializer<'de> for PathDeserializer<'de, S> {
parse_single_value!(deserialize_bool, visit_bool, "bool"); parse_single_value!(deserialize_bool, visit_bool, "bool");
parse_single_value!(deserialize_i8, visit_i8, "i8"); parse_single_value!(deserialize_i8, visit_i8, "i8");
parse_single_value!(deserialize_i16, visit_i16, "i16"); parse_single_value!(deserialize_i16, visit_i16, "i16");
parse_single_value!(deserialize_i32, visit_i32, "i16"); parse_single_value!(deserialize_i32, visit_i32, "i32");
parse_single_value!(deserialize_i64, visit_i64, "i64"); parse_single_value!(deserialize_i64, visit_i64, "i64");
parse_single_value!(deserialize_u8, visit_u8, "u8"); parse_single_value!(deserialize_u8, visit_u8, "u8");
parse_single_value!(deserialize_u16, visit_u16, "u16"); parse_single_value!(deserialize_u16, visit_u16, "u16");

View File

@@ -354,24 +354,24 @@ impl<S> FromRequest<S> for HttpRequest<S> {
impl<S> fmt::Debug for HttpRequest<S> { impl<S> fmt::Debug for HttpRequest<S> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let res = writeln!( writeln!(
f, f,
"\nHttpRequest {:?} {}:{}", "\nHttpRequest {:?} {}:{}",
self.version(), self.version(),
self.method(), self.method(),
self.path() self.path()
); )?;
if !self.query_string().is_empty() { if !self.query_string().is_empty() {
let _ = writeln!(f, " query: ?{:?}", self.query_string()); writeln!(f, " query: ?{:?}", self.query_string())?;
} }
if !self.match_info().is_empty() { if !self.match_info().is_empty() {
let _ = writeln!(f, " params: {:?}", self.match_info()); writeln!(f, " params: {:?}", self.match_info())?;
} }
let _ = writeln!(f, " headers:"); writeln!(f, " headers:")?;
for (key, val) in self.headers().iter() { for (key, val) in self.headers().iter() {
let _ = writeln!(f, " {:?}: {:?}", key, val); writeln!(f, " {:?}: {:?}", key, val)?;
} }
res Ok(())
} }
} }

View File

@@ -441,13 +441,13 @@ where
impl<S> fmt::Debug for Field<S> { impl<S> fmt::Debug for Field<S> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let res = writeln!(f, "\nMultipartField: {}", self.ct); writeln!(f, "\nMultipartField: {}", self.ct)?;
let _ = writeln!(f, " boundary: {}", self.inner.borrow().boundary); writeln!(f, " boundary: {}", self.inner.borrow().boundary)?;
let _ = writeln!(f, " headers:"); writeln!(f, " headers:")?;
for (key, val) in self.headers.iter() { for (key, val) in self.headers.iter() {
let _ = writeln!(f, " {:?}: {:?}", key, val); writeln!(f, " {:?}: {:?}", key, val)?;
} }
res Ok(())
} }
} }

View File

@@ -832,7 +832,7 @@ impl ResourceDef {
}).expect("malformed param"); }).expect("malformed param");
let (mut param, rem) = pattern.split_at(close_idx + 1); let (mut param, rem) = pattern.split_at(close_idx + 1);
param = &param[1..param.len() - 1]; // Remove outer brackets param = &param[1..param.len() - 1]; // Remove outer brackets
let (name, pattern) = match param.find(":") { let (name, pattern) = match param.find(':') {
Some(idx) => { Some(idx) => {
let (name, pattern) = param.split_at(idx); let (name, pattern) = param.split_at(idx);
(name, &pattern[1..]) (name, &pattern[1..])
@@ -849,7 +849,7 @@ impl ResourceDef {
fn parse( fn parse(
mut pattern: &str, for_prefix: bool, mut pattern: &str, for_prefix: bool,
) -> (String, Vec<PatternElement>, bool, usize) { ) -> (String, Vec<PatternElement>, bool, usize) {
if pattern.find("{").is_none() { if pattern.find('{').is_none() {
return ( return (
String::from(pattern), String::from(pattern),
vec![PatternElement::Str(String::from(pattern))], vec![PatternElement::Str(String::from(pattern))],
@@ -861,7 +861,7 @@ impl ResourceDef {
let mut elems = Vec::new(); let mut elems = Vec::new();
let mut re = String::from("^"); let mut re = String::from("^");
while let Some(idx) = pattern.find("{") { while let Some(idx) = pattern.find('{') {
let (prefix, rem) = pattern.split_at(idx); let (prefix, rem) = pattern.split_at(idx);
elems.push(PatternElement::Str(String::from(prefix))); elems.push(PatternElement::Str(String::from(prefix)));
re.push_str(&escape(prefix)); re.push_str(&escape(prefix));

View File

@@ -821,11 +821,9 @@ mod tests {
scope scope
.route("/path1", Method::GET, |_: HttpRequest<_>| { .route("/path1", Method::GET, |_: HttpRequest<_>| {
HttpResponse::Ok() HttpResponse::Ok()
}).route( }).route("/path1", Method::DELETE, |_: HttpRequest<_>| {
"/path1", HttpResponse::Ok()
Method::DELETE, })
|_: HttpRequest<_>| HttpResponse::Ok(),
)
}).finish(); }).finish();
let req = TestRequest::with_uri("/app/path1").request(); let req = TestRequest::with_uri("/app/path1").request();

View File

@@ -451,10 +451,13 @@ impl Accept {
Delay::new( Delay::new(
Instant::now() + Duration::from_millis(510), Instant::now() + Duration::from_millis(510),
).map_err(|_| ()) ).map_err(|_| ())
.and_then(move |_| { .and_then(
let _ = r.set_readiness(mio::Ready::readable()); move |_| {
let _ =
r.set_readiness(mio::Ready::readable());
Ok(()) Ok(())
}), },
),
); );
Ok(()) Ok(())
}, },

View File

@@ -5,6 +5,7 @@ use std::{io, ptr, time};
use bytes::{Buf, BufMut, BytesMut}; use bytes::{Buf, BufMut, BytesMut};
use futures::{Async, Future, Poll}; use futures::{Async, Future, Poll};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::Delay;
use super::settings::WorkerSettings; use super::settings::WorkerSettings;
use super::{h1, h2, ConnectionTag, HttpHandler, IoStream}; use super::{h1, h2, ConnectionTag, HttpHandler, IoStream};
@@ -30,6 +31,7 @@ where
{ {
proto: Option<HttpProtocol<T, H>>, proto: Option<HttpProtocol<T, H>>,
node: Option<Node<HttpChannel<T, H>>>, node: Option<Node<HttpChannel<T, H>>>,
ka_timeout: Option<Delay>,
_tag: ConnectionTag, _tag: ConnectionTag,
} }
@@ -42,9 +44,11 @@ where
settings: Rc<WorkerSettings<H>>, io: T, peer: Option<SocketAddr>, settings: Rc<WorkerSettings<H>>, io: T, peer: Option<SocketAddr>,
) -> HttpChannel<T, H> { ) -> HttpChannel<T, H> {
let _tag = settings.connection(); let _tag = settings.connection();
let ka_timeout = settings.keep_alive_timer();
HttpChannel { HttpChannel {
_tag, _tag,
ka_timeout,
node: None, node: None,
proto: Some(HttpProtocol::Unknown( proto: Some(HttpProtocol::Unknown(
settings, settings,
@@ -55,7 +59,7 @@ where
} }
} }
fn shutdown(&mut self) { pub(crate) fn shutdown(&mut self) {
match self.proto { match self.proto {
Some(HttpProtocol::H1(ref mut h1)) => { Some(HttpProtocol::H1(ref mut h1)) => {
let io = h1.io(); let io = h1.io();
@@ -68,6 +72,18 @@ where
} }
} }
impl<T, H> Drop for HttpChannel<T, H>
where
T: IoStream,
H: HttpHandler + 'static,
{
fn drop(&mut self) {
if let Some(mut node) = self.node.take() {
node.remove()
}
}
}
impl<T, H> Future for HttpChannel<T, H> impl<T, H> Future for HttpChannel<T, H>
where where
T: IoStream, T: IoStream,
@@ -77,7 +93,19 @@ where
type Error = (); type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if self.node.is_some() { // keep-alive timer
if let Some(ref mut timer) = self.ka_timeout {
match timer.poll() {
Ok(Async::Ready(_)) => {
trace!("Slow request timed out, close connection");
return Ok(Async::Ready(()));
}
Ok(Async::NotReady) => (),
Err(_) => panic!("Something is really wrong"),
}
}
if self.node.is_none() {
let el = self as *mut _; let el = self as *mut _;
self.node = Some(Node::new(el)); self.node = Some(Node::new(el));
let _ = match self.proto { let _ = match self.proto {
@@ -97,28 +125,10 @@ where
let mut is_eof = false; let mut is_eof = false;
let kind = match self.proto { let kind = match self.proto {
Some(HttpProtocol::H1(ref mut h1)) => { Some(HttpProtocol::H1(ref mut h1)) => {
let result = h1.poll(); return h1.poll();
match result {
Ok(Async::Ready(())) | Err(_) => {
if let Some(n) = self.node.as_mut() {
n.remove()
};
}
_ => (),
}
return result;
} }
Some(HttpProtocol::H2(ref mut h2)) => { Some(HttpProtocol::H2(ref mut h2)) => {
let result = h2.poll(); return h2.poll();
match result {
Ok(Async::Ready(())) | Err(_) => {
if let Some(n) = self.node.as_mut() {
n.remove()
};
}
_ => (),
}
return result;
} }
Some(HttpProtocol::Unknown(_, _, ref mut io, ref mut buf)) => { Some(HttpProtocol::Unknown(_, _, ref mut io, ref mut buf)) => {
let mut disconnect = false; let mut disconnect = false;
@@ -137,9 +147,6 @@ where
} }
if disconnect { if disconnect {
debug!("Ignored premature client disconnection"); debug!("Ignored premature client disconnection");
if let Some(n) = self.node.as_mut() {
n.remove()
};
return Err(()); return Err(());
} }
@@ -161,7 +168,12 @@ where
match kind { match kind {
ProtocolKind::Http1 => { ProtocolKind::Http1 => {
self.proto = Some(HttpProtocol::H1(h1::Http1::new( self.proto = Some(HttpProtocol::H1(h1::Http1::new(
settings, io, addr, buf, is_eof, settings,
io,
addr,
buf,
is_eof,
self.ka_timeout.take(),
))); )));
return self.poll(); return self.poll();
} }
@@ -171,6 +183,7 @@ where
io, io,
addr, addr,
buf.freeze(), buf.freeze(),
self.ka_timeout.take(),
))); )));
return self.poll(); return self.poll();
} }
@@ -195,13 +208,14 @@ impl<T> Node<T> {
} }
} }
fn insert<I>(&mut self, next: &mut Node<I>) { fn insert<I>(&mut self, next_el: &mut Node<I>) {
unsafe { unsafe {
let next: *mut Node<T> = next as *const _ as *mut _; let next: *mut Node<T> = next_el as *const _ as *mut _;
if let Some(ref mut next2) = self.next { if let Some(next2) = self.next {
let n = next2.as_mut().unwrap(); let n = next2.as_mut().unwrap();
n.prev = Some(next); n.prev = Some(next);
next_el.next = Some(next2 as *mut _);
} }
self.next = Some(next); self.next = Some(next);
@@ -214,11 +228,14 @@ impl<T> Node<T> {
unsafe { unsafe {
self.element = ptr::null_mut(); self.element = ptr::null_mut();
let next = self.next.take(); let next = self.next.take();
let mut prev = self.prev.take(); let prev = self.prev.take();
if let Some(ref mut prev) = prev { if let Some(prev) = prev {
prev.as_mut().unwrap().next = next; prev.as_mut().unwrap().next = next;
} }
if let Some(next) = next {
next.as_mut().unwrap().prev = prev;
}
} }
} }
} }
@@ -232,7 +249,7 @@ impl Node<()> {
} }
} }
pub(crate) fn traverse<T, H>(&self) pub(crate) fn traverse<T, H, F: Fn(&mut HttpChannel<T, H>)>(&self, f: F)
where where
T: IoStream, T: IoStream,
H: HttpHandler + 'static, H: HttpHandler + 'static,
@@ -247,7 +264,7 @@ impl Node<()> {
if !n.element.is_null() { if !n.element.is_null() {
let ch: &mut HttpChannel<T, H> = let ch: &mut HttpChannel<T, H> =
&mut *(&mut *(n.element as *mut _) as *mut () as *mut _); &mut *(&mut *(n.element as *mut _) as *mut () as *mut _);
ch.shutdown(); f(ch);
} }
} }
} else { } else {

View File

@@ -91,7 +91,7 @@ where
{ {
pub fn new( pub fn new(
settings: Rc<WorkerSettings<H>>, stream: T, addr: Option<SocketAddr>, settings: Rc<WorkerSettings<H>>, stream: T, addr: Option<SocketAddr>,
buf: BytesMut, is_eof: bool, buf: BytesMut, is_eof: bool, keepalive_timer: Option<Delay>,
) -> Self { ) -> Self {
Http1 { Http1 {
flags: if is_eof { flags: if is_eof {
@@ -103,10 +103,10 @@ where
decoder: H1Decoder::new(), decoder: H1Decoder::new(),
payload: None, payload: None,
tasks: VecDeque::new(), tasks: VecDeque::new(),
keepalive_timer: None,
addr, addr,
buf, buf,
settings, settings,
keepalive_timer,
} }
} }
@@ -364,7 +364,7 @@ where
if self.keepalive_timer.is_none() && keep_alive > 0 { if self.keepalive_timer.is_none() && keep_alive > 0 {
trace!("Start keep-alive timer"); trace!("Start keep-alive timer");
let mut timer = let mut timer =
Delay::new(Instant::now() + Duration::new(keep_alive, 0)); Delay::new(Instant::now() + Duration::from_secs(keep_alive));
// register timer // register timer
let _ = timer.poll(); let _ = timer.poll();
self.keepalive_timer = Some(timer); self.keepalive_timer = Some(timer);
@@ -632,7 +632,7 @@ mod tests {
let readbuf = BytesMut::new(); let readbuf = BytesMut::new();
let settings = Rc::new(wrk_settings()); let settings = Rc::new(wrk_settings());
let mut h1 = Http1::new(Rc::clone(&settings), buf, None, readbuf, false); let mut h1 = Http1::new(Rc::clone(&settings), buf, None, readbuf, false, None);
h1.poll_io(); h1.poll_io();
h1.poll_io(); h1.poll_io();
assert_eq!(h1.tasks.len(), 1); assert_eq!(h1.tasks.len(), 1);
@@ -645,7 +645,7 @@ mod tests {
BytesMut::from(Vec::<u8>::from(&b"GET /test HTTP/1.1\r\n\r\n"[..])); BytesMut::from(Vec::<u8>::from(&b"GET /test HTTP/1.1\r\n\r\n"[..]));
let settings = Rc::new(wrk_settings()); let settings = Rc::new(wrk_settings());
let mut h1 = Http1::new(Rc::clone(&settings), buf, None, readbuf, true); let mut h1 = Http1::new(Rc::clone(&settings), buf, None, readbuf, true, None);
h1.poll_io(); h1.poll_io();
assert_eq!(h1.tasks.len(), 1); assert_eq!(h1.tasks.len(), 1);
} }
@@ -656,7 +656,7 @@ mod tests {
let readbuf = BytesMut::new(); let readbuf = BytesMut::new();
let settings = Rc::new(wrk_settings()); let settings = Rc::new(wrk_settings());
let mut h1 = Http1::new(Rc::clone(&settings), buf, None, readbuf, false); let mut h1 = Http1::new(Rc::clone(&settings), buf, None, readbuf, false, None);
h1.poll_io(); h1.poll_io();
h1.poll_io(); h1.poll_io();
assert!(h1.flags.contains(Flags::ERROR)); assert!(h1.flags.contains(Flags::ERROR));

View File

@@ -59,6 +59,7 @@ where
{ {
pub fn new( pub fn new(
settings: Rc<WorkerSettings<H>>, io: T, addr: Option<SocketAddr>, buf: Bytes, settings: Rc<WorkerSettings<H>>, io: T, addr: Option<SocketAddr>, buf: Bytes,
keepalive_timer: Option<Delay>,
) -> Self { ) -> Self {
let extensions = io.extensions(); let extensions = io.extensions();
Http2 { Http2 {
@@ -68,10 +69,10 @@ where
unread: if buf.is_empty() { None } else { Some(buf) }, unread: if buf.is_empty() { None } else { Some(buf) },
inner: io, inner: io,
})), })),
keepalive_timer: None,
addr, addr,
settings, settings,
extensions, extensions,
keepalive_timer,
} }
} }

View File

@@ -3,12 +3,11 @@ use std::rc::Rc;
use std::sync::Arc; use std::sync::Arc;
use std::{io, mem, net, time}; use std::{io, mem, net, time};
use actix::{Actor, Addr, AsyncContext, Context, Handler, System}; use actix::{Actor, Addr, Arbiter, AsyncContext, Context, Handler, System};
use futures::{Future, Stream}; use futures::{Future, Stream};
use net2::{TcpBuilder, TcpStreamExt}; use net2::{TcpBuilder, TcpStreamExt};
use num_cpus; use num_cpus;
use tokio_current_thread::spawn;
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use tokio_tcp::TcpStream; use tokio_tcp::TcpStream;
@@ -71,7 +70,7 @@ where
factory: Arc::new(f), factory: Arc::new(f),
host: None, host: None,
backlog: 2048, backlog: 2048,
keep_alive: KeepAlive::Os, keep_alive: KeepAlive::Timeout(5),
shutdown_timeout: 30, shutdown_timeout: 30,
exit: false, exit: false,
no_http2: false, no_http2: false,
@@ -132,7 +131,7 @@ where
/// Set server keep-alive setting. /// Set server keep-alive setting.
/// ///
/// By default keep alive is set to a `Os`. /// By default keep alive is set to a 5 seconds.
pub fn keep_alive<T: Into<KeepAlive>>(mut self, val: T) -> Self { pub fn keep_alive<T: Into<KeepAlive>>(mut self, val: T) -> Self {
self.keep_alive = val.into(); self.keep_alive = val.into();
self self
@@ -585,7 +584,7 @@ where
type Result = (); type Result = ();
fn handle(&mut self, msg: Conn<T>, _: &mut Context<Self>) -> Self::Result { fn handle(&mut self, msg: Conn<T>, _: &mut Context<Self>) -> Self::Result {
spawn(HttpChannel::new( Arbiter::spawn(HttpChannel::new(
Rc::clone(&self.settings), Rc::clone(&self.settings),
msg.io, msg.io,
msg.peer, msg.peer,
@@ -637,7 +636,9 @@ where
fn shutdown(&self, force: bool) { fn shutdown(&self, force: bool) {
if force { if force {
self.settings.head().traverse::<TcpStream, H>(); self.settings
.head()
.traverse(|ch: &mut HttpChannel<TcpStream, H>| ch.shutdown());
} }
} }
} }
@@ -693,7 +694,7 @@ where
}; };
let _ = io.set_nodelay(true); let _ = io.set_nodelay(true);
spawn(HttpChannel::new(h, io, peer)); Arbiter::spawn(HttpChannel::new(h, io, peer));
} }
} }
@@ -753,10 +754,10 @@ where
let _ = io.set_nodelay(true); let _ = io.set_nodelay(true);
let rate = h.connection_rate(); let rate = h.connection_rate();
spawn(self.acceptor.accept(io).then(move |res| { Arbiter::spawn(self.acceptor.accept(io).then(move |res| {
drop(rate); drop(rate);
match res { match res {
Ok(io) => spawn(HttpChannel::new(h, io, peer)), Ok(io) => Arbiter::spawn(HttpChannel::new(h, io, peer)),
Err(err) => trace!("Can not establish connection: {}", err), Err(err) => trace!("Can not establish connection: {}", err),
} }
Ok(()) Ok(())

View File

@@ -13,7 +13,7 @@ use http::StatusCode;
use lazycell::LazyCell; use lazycell::LazyCell;
use parking_lot::Mutex; use parking_lot::Mutex;
use time; use time;
use tokio_timer::Interval; use tokio_timer::{Delay, Interval};
use super::channel::Node; use super::channel::Node;
use super::message::{Request, RequestPool}; use super::message::{Request, RequestPool};
@@ -197,6 +197,16 @@ impl<H> WorkerSettings<H> {
&self.h &self.h
} }
pub fn keep_alive_timer(&self) -> Option<Delay> {
if self.keep_alive != 0 {
Some(Delay::new(
Instant::now() + Duration::from_secs(self.keep_alive),
))
} else {
None
}
}
pub fn keep_alive(&self) -> u64 { pub fn keep_alive(&self) -> u64 {
self.keep_alive self.keep_alive
} }

View File

@@ -120,6 +120,7 @@ impl TestServer {
HttpServer::new(factory) HttpServer::new(factory)
.disable_signals() .disable_signals()
.listen(tcp) .listen(tcp)
.keep_alive(5)
.start(); .start();
tx.send((System::current(), local_addr, TestServer::get_conn())) tx.send((System::current(), local_addr, TestServer::get_conn()))
@@ -328,6 +329,7 @@ impl<S: 'static> TestServerBuilder<S> {
config(&mut app); config(&mut app);
vec![app] vec![app]
}).workers(1) }).workers(1)
.keep_alive(5)
.disable_signals(); .disable_signals();
tx.send((System::current(), addr, TestServer::get_conn())) tx.send((System::current(), addr, TestServer::get_conn()))

View File

@@ -407,24 +407,29 @@ fn test_client_cookie_handling() {
let cookie2 = cookie2b.clone(); let cookie2 = cookie2b.clone();
app.handler(move |req: &HttpRequest| { app.handler(move |req: &HttpRequest| {
// Check cookies were sent correctly // Check cookies were sent correctly
req.cookie("cookie1").ok_or_else(err) req.cookie("cookie1")
.and_then(|c1| if c1.value() == "value1" { .ok_or_else(err)
.and_then(|c1| {
if c1.value() == "value1" {
Ok(()) Ok(())
} else { } else {
Err(err()) Err(err())
}) }
.and_then(|()| req.cookie("cookie2").ok_or_else(err)) }).and_then(|()| req.cookie("cookie2").ok_or_else(err))
.and_then(|c2| if c2.value() == "value2" { .and_then(|c2| {
if c2.value() == "value2" {
Ok(()) Ok(())
} else { } else {
Err(err()) Err(err())
}
}) })
// Send some cookies back // Send some cookies back
.map(|_| HttpResponse::Ok() .map(|_| {
HttpResponse::Ok()
.cookie(cookie1.clone()) .cookie(cookie1.clone())
.cookie(cookie2.clone()) .cookie(cookie2.clone())
.finish() .finish()
) })
}) })
}); });