1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-24 07:53:00 +01:00

handle socket shutdown for h1 connections

This commit is contained in:
Nikolay Kim 2019-03-18 09:44:48 -07:00
parent fd86d73a03
commit 00b7dc7887
7 changed files with 81 additions and 77 deletions

View File

@ -5,7 +5,7 @@ use bytes::Bytes;
use futures::future::{err, Either}; use futures::future::{err, Either};
use futures::{Async, Future, Poll}; use futures::{Async, Future, Poll};
use h2::{client::SendRequest, SendStream}; use h2::{client::SendRequest, SendStream};
use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, DATE, TRANSFER_ENCODING}; use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, TRANSFER_ENCODING};
use http::{request::Request, HttpTryFrom, Method, Version}; use http::{request::Request, HttpTryFrom, Method, Version};
use crate::body::{BodyLength, MessageBody}; use crate::body::{BodyLength, MessageBody};
@ -45,7 +45,7 @@ where
*req.version_mut() = Version::HTTP_2; *req.version_mut() = Version::HTTP_2;
let mut skip_len = true; let mut skip_len = true;
let mut has_date = false; // let mut has_date = false;
// Content length // Content length
let _ = match length { let _ = match length {
@ -72,7 +72,7 @@ where
match *key { match *key {
CONNECTION | TRANSFER_ENCODING => continue, // http2 specific CONNECTION | TRANSFER_ENCODING => continue, // http2 specific
CONTENT_LENGTH if skip_len => continue, CONTENT_LENGTH if skip_len => continue,
DATE => has_date = true, // DATE => has_date = true,
_ => (), _ => (),
} }
req.headers_mut().append(key, value.clone()); req.headers_mut().append(key, value.clone());

View File

@ -85,7 +85,7 @@ impl ServiceConfig {
ka_enabled, ka_enabled,
client_timeout, client_timeout,
client_disconnect, client_disconnect,
timer: DateService::with(Duration::from_millis(500)), timer: DateService::new(),
})) }))
} }
@ -204,14 +204,12 @@ impl fmt::Write for Date {
struct DateService(Rc<DateServiceInner>); struct DateService(Rc<DateServiceInner>);
struct DateServiceInner { struct DateServiceInner {
interval: Duration,
current: UnsafeCell<Option<(Date, Instant)>>, current: UnsafeCell<Option<(Date, Instant)>>,
} }
impl DateServiceInner { impl DateServiceInner {
fn new(interval: Duration) -> Self { fn new() -> Self {
DateServiceInner { DateServiceInner {
interval,
current: UnsafeCell::new(None), current: UnsafeCell::new(None),
} }
} }
@ -232,8 +230,8 @@ impl DateServiceInner {
} }
impl DateService { impl DateService {
fn with(resolution: Duration) -> Self { fn new() -> Self {
DateService(Rc::new(DateServiceInner::new(resolution))) DateService(Rc::new(DateServiceInner::new()))
} }
fn check_date(&self) { fn check_date(&self) {

View File

@ -7,7 +7,7 @@ use actix_codec::{AsyncRead, AsyncWrite, Framed};
use actix_service::Service; use actix_service::Service;
use actix_utils::cloneable::CloneableService; use actix_utils::cloneable::CloneableService;
use bitflags::bitflags; use bitflags::bitflags;
use futures::{try_ready, Async, Future, Poll, Sink, Stream}; use futures::{Async, Future, Poll, Sink, Stream};
use log::{debug, error, trace}; use log::{debug, error, trace};
use tokio_timer::Delay; use tokio_timer::Delay;
@ -32,6 +32,7 @@ bitflags! {
const POLLED = 0b0000_1000; const POLLED = 0b0000_1000;
const SHUTDOWN = 0b0010_0000; const SHUTDOWN = 0b0010_0000;
const DISCONNECTED = 0b0100_0000; const DISCONNECTED = 0b0100_0000;
const DROPPING = 0b1000_0000;
} }
} }
@ -56,7 +57,6 @@ where
state: State<S, B>, state: State<S, B>,
payload: Option<PayloadSender>, payload: Option<PayloadSender>,
messages: VecDeque<DispatcherMessage>, messages: VecDeque<DispatcherMessage>,
unhandled: Option<Request>,
ka_expire: Instant, ka_expire: Instant,
ka_timer: Option<Delay>, ka_timer: Option<Delay>,
@ -131,7 +131,6 @@ where
state: State::None, state: State::None,
error: None, error: None,
messages: VecDeque::new(), messages: VecDeque::new(),
unhandled: None,
service, service,
flags, flags,
config, config,
@ -411,8 +410,19 @@ where
/// keep-alive timer /// keep-alive timer
fn poll_keepalive(&mut self) -> Result<(), DispatchError> { fn poll_keepalive(&mut self) -> Result<(), DispatchError> {
if self.ka_timer.is_none() { if self.ka_timer.is_none() {
return Ok(()); // shutdown timeout
if self.flags.contains(Flags::SHUTDOWN) {
if let Some(interval) = self.config.client_disconnect_timer() {
self.ka_timer = Some(Delay::new(interval));
} else {
self.flags.insert(Flags::DISCONNECTED);
return Ok(());
}
} else {
return Ok(());
}
} }
match self.ka_timer.as_mut().unwrap().poll().map_err(|e| { match self.ka_timer.as_mut().unwrap().poll().map_err(|e| {
error!("Timer error {:?}", e); error!("Timer error {:?}", e);
DispatchError::Unknown DispatchError::Unknown
@ -436,6 +446,8 @@ where
let _ = timer.poll(); let _ = timer.poll();
} }
} else { } else {
// no shutdown timeout, drop socket
self.flags.insert(Flags::DISCONNECTED);
return Ok(()); return Ok(());
} }
} else { } else {
@ -483,61 +495,55 @@ where
#[inline] #[inline]
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let shutdown = if let Some(ref mut inner) = self.inner { let inner = self.inner.as_mut().unwrap();
if inner.flags.contains(Flags::SHUTDOWN) {
inner.poll_keepalive()?; if inner.flags.contains(Flags::SHUTDOWN) {
try_ready!(inner.poll_flush()); inner.poll_keepalive()?;
true if inner.flags.contains(Flags::DISCONNECTED) {
Ok(Async::Ready(()))
} else { } else {
inner.poll_keepalive()?; // try_ready!(inner.poll_flush());
inner.poll_request()?; match inner.framed.get_mut().shutdown()? {
loop { Async::Ready(_) => Ok(Async::Ready(())),
inner.poll_response()?; Async::NotReady => Ok(Async::NotReady),
if let Async::Ready(false) = inner.poll_flush()? {
break;
}
}
if inner.flags.contains(Flags::DISCONNECTED) {
return Ok(Async::Ready(()));
}
// keep-alive and stream errors
if inner.state.is_empty() && inner.framed.is_write_buf_empty() {
if let Some(err) = inner.error.take() {
return Err(err);
}
// unhandled request (upgrade or connect)
else if inner.unhandled.is_some() {
false
}
// disconnect if keep-alive is not enabled
else if inner.flags.contains(Flags::STARTED)
&& !inner.flags.intersects(Flags::KEEPALIVE)
{
true
}
// disconnect if shutdown
else if inner.flags.contains(Flags::SHUTDOWN) {
true
} else {
return Ok(Async::NotReady);
}
} else {
return Ok(Async::NotReady);
} }
} }
} else { } else {
unreachable!() inner.poll_keepalive()?;
}; inner.poll_request()?;
loop {
inner.poll_response()?;
if let Async::Ready(false) = inner.poll_flush()? {
break;
}
}
let mut inner = self.inner.take().unwrap(); if inner.flags.contains(Flags::DISCONNECTED) {
return Ok(Async::Ready(()));
}
// TODO: shutdown // keep-alive and stream errors
Ok(Async::Ready(())) if inner.state.is_empty() && inner.framed.is_write_buf_empty() {
//Ok(Async::Ready(HttpServiceResult::Shutdown( if let Some(err) = inner.error.take() {
// inner.framed.into_inner(), return Err(err);
//))) }
// disconnect if keep-alive is not enabled
else if inner.flags.contains(Flags::STARTED)
&& !inner.flags.intersects(Flags::KEEPALIVE)
{
inner.flags.insert(Flags::SHUTDOWN);
self.poll()
}
// disconnect if shutdown
else if inner.flags.contains(Flags::SHUTDOWN) {
self.poll()
} else {
return Ok(Async::NotReady);
}
} else {
return Ok(Async::NotReady);
}
}
} }
} }

View File

@ -56,7 +56,7 @@ where
config: ServiceConfig, config: ServiceConfig,
timeout: Option<Delay>, timeout: Option<Delay>,
) -> Self { ) -> Self {
let keepalive = config.keep_alive_enabled(); // let keepalive = config.keep_alive_enabled();
// let flags = if keepalive { // let flags = if keepalive {
// Flags::KEEPALIVE | Flags::KEEPALIVE_ENABLED // Flags::KEEPALIVE | Flags::KEEPALIVE_ENABLED
// } else { // } else {

View File

@ -41,7 +41,7 @@ impl From<PayloadStream> for Payload {
impl<S> Payload<S> { impl<S> Payload<S> {
/// Takes current payload and replaces it with `None` value /// Takes current payload and replaces it with `None` value
fn take(&mut self) -> Payload<S> { pub fn take(&mut self) -> Payload<S> {
std::mem::replace(self, Payload::None) std::mem::replace(self, Payload::None)
} }
} }

View File

@ -169,7 +169,7 @@ where
} }
fn call(&mut self, req: Self::Request) -> Self::Future { fn call(&mut self, req: Self::Request) -> Self::Future {
let (io, params, proto) = req.into_parts(); let (io, _, proto) = req.into_parts();
match proto { match proto {
Protocol::Http2 => { Protocol::Http2 => {
let io = Io { let io = Io {

View File

@ -25,19 +25,19 @@ impl Protocol {
} }
} }
fn is_http(self) -> bool { // fn is_http(self) -> bool {
match self { // match self {
Protocol::Https | Protocol::Http => true, // Protocol::Https | Protocol::Http => true,
_ => false, // _ => false,
} // }
} // }
fn is_secure(self) -> bool { // fn is_secure(self) -> bool {
match self { // match self {
Protocol::Https | Protocol::Wss => true, // Protocol::Https | Protocol::Wss => true,
_ => false, // _ => false,
} // }
} // }
fn port(self) -> u16 { fn port(self) -> u16 {
match self { match self {