1
0
mirror of https://github.com/fafhrd91/actix-web synced 2024-11-28 01:52:57 +01:00

drop connection if handler get dropped without consuming payload

This commit is contained in:
Nikolay Kim 2018-02-27 16:08:57 -08:00
parent 9b06eac720
commit e2c8f17c2c
5 changed files with 35 additions and 30 deletions

View File

@ -71,24 +71,12 @@ impl ClientResponse {
self.as_ref().version self.as_ref().version
} }
/// Get a mutable reference to the headers.
#[inline]
pub fn headers_mut(&mut self) -> &mut HeaderMap {
&mut self.as_mut().headers
}
/// Get the status from the server. /// Get the status from the server.
#[inline] #[inline]
pub fn status(&self) -> StatusCode { pub fn status(&self) -> StatusCode {
self.as_ref().status self.as_ref().status
} }
/// Set the `StatusCode` for this response.
#[inline]
pub fn set_status(&mut self, status: StatusCode) {
self.as_mut().status = status
}
/// Load request cookies. /// Load request cookies.
pub fn cookies(&self) -> Result<&Vec<Cookie<'static>>, CookieParseError> { pub fn cookies(&self) -> Result<&Vec<Cookie<'static>>, CookieParseError> {
if self.as_ref().cookies.is_none() { if self.as_ref().cookies.is_none() {

View File

@ -8,6 +8,13 @@ use futures::{Async, Poll, Stream};
use error::PayloadError; use error::PayloadError;
#[derive(Debug, PartialEq)]
pub(crate) enum PayloadStatus {
Read,
Pause,
Dropped,
}
/// Buffered stream of bytes chunks /// Buffered stream of bytes chunks
/// ///
/// Payload stores chunks in a vector. First chunk can be received with `.readany()` method. /// Payload stores chunks in a vector. First chunk can be received with `.readany()` method.
@ -100,7 +107,7 @@ pub(crate) trait PayloadWriter {
fn feed_data(&mut self, data: Bytes); fn feed_data(&mut self, data: Bytes);
/// Need read data /// Need read data
fn need_read(&self) -> bool; fn need_read(&self) -> PayloadStatus;
} }
/// Sender part of the payload stream /// Sender part of the payload stream
@ -129,11 +136,17 @@ impl PayloadWriter for PayloadSender {
} }
#[inline] #[inline]
fn need_read(&self) -> bool { fn need_read(&self) -> PayloadStatus {
// we check need_read only if Payload (other side) is alive,
// otherwise always return true (consume payload)
if let Some(shared) = self.inner.upgrade() { if let Some(shared) = self.inner.upgrade() {
shared.borrow().need_read if shared.borrow().need_read {
PayloadStatus::Read
} else {
PayloadStatus::Pause
}
} else { } else {
false PayloadStatus::Dropped
} }
} }
} }

View File

@ -18,7 +18,7 @@ use body::{Body, Binary};
use error::PayloadError; use error::PayloadError;
use httprequest::HttpInnerMessage; use httprequest::HttpInnerMessage;
use httpresponse::HttpResponse; use httpresponse::HttpResponse;
use payload::{PayloadSender, PayloadWriter}; use payload::{PayloadSender, PayloadWriter, PayloadStatus};
use super::shared::SharedBytes; use super::shared::SharedBytes;
@ -120,7 +120,7 @@ impl PayloadWriter for PayloadType {
} }
#[inline] #[inline]
fn need_read(&self) -> bool { fn need_read(&self) -> PayloadStatus {
match *self { match *self {
PayloadType::Sender(ref sender) => sender.need_read(), PayloadType::Sender(ref sender) => sender.need_read(),
PayloadType::Encoding(ref enc) => enc.need_read(), PayloadType::Encoding(ref enc) => enc.need_read(),
@ -352,7 +352,7 @@ impl PayloadWriter for EncodedPayload {
} }
#[inline] #[inline]
fn need_read(&self) -> bool { fn need_read(&self) -> PayloadStatus {
self.inner.need_read() self.inner.need_read()
} }
} }

View File

@ -18,7 +18,7 @@ use pipeline::Pipeline;
use httpcodes::HTTPNotFound; use httpcodes::HTTPNotFound;
use httprequest::HttpRequest; use httprequest::HttpRequest;
use error::{ParseError, PayloadError, ResponseError}; use error::{ParseError, PayloadError, ResponseError};
use payload::{Payload, PayloadWriter}; use payload::{Payload, PayloadWriter, PayloadStatus};
use super::{utils, Writer}; use super::{utils, Writer};
use super::h1writer::H1Writer; use super::h1writer::H1Writer;
@ -190,7 +190,7 @@ impl<T, H> Http1<T, H>
true true
}; };
let retry = self.reader.need_read(); let retry = self.reader.need_read() == PayloadStatus::Read;
loop { loop {
// check in-flight messages // check in-flight messages
@ -227,7 +227,7 @@ impl<T, H> Http1<T, H>
}, },
// no more IO for this iteration // no more IO for this iteration
Ok(Async::NotReady) => { Ok(Async::NotReady) => {
if self.reader.need_read() && !retry { if self.reader.need_read() == PayloadStatus::Read && !retry {
return Ok(Async::Ready(true)); return Ok(Async::Ready(true));
} }
io = true; io = true;
@ -341,6 +341,7 @@ struct PayloadInfo {
enum ReaderError { enum ReaderError {
Disconnect, Disconnect,
Payload, Payload,
PayloadDropped,
Error(ParseError), Error(ParseError),
} }
@ -352,11 +353,11 @@ impl Reader {
} }
#[inline] #[inline]
fn need_read(&self) -> bool { fn need_read(&self) -> PayloadStatus {
if let Some(ref info) = self.payload { if let Some(ref info) = self.payload {
info.tx.need_read() info.tx.need_read()
} else { } else {
true PayloadStatus::Read
} }
} }
@ -392,8 +393,10 @@ impl Reader {
settings: &WorkerSettings<H>) -> Poll<HttpRequest, ReaderError> settings: &WorkerSettings<H>) -> Poll<HttpRequest, ReaderError>
where T: IoStream where T: IoStream
{ {
if !self.need_read() { match self.need_read() {
return Ok(Async::NotReady) PayloadStatus::Read => (),
PayloadStatus::Pause => return Ok(Async::NotReady),
PayloadStatus::Dropped => return Err(ReaderError::PayloadDropped),
} }
// read payload // read payload

View File

@ -21,7 +21,7 @@ use error::PayloadError;
use httpcodes::HTTPNotFound; use httpcodes::HTTPNotFound;
use httpmessage::HttpMessage; use httpmessage::HttpMessage;
use httprequest::HttpRequest; use httprequest::HttpRequest;
use payload::{Payload, PayloadWriter}; use payload::{Payload, PayloadWriter, PayloadStatus};
use super::h2writer::H2Writer; use super::h2writer::H2Writer;
use super::encoding::PayloadType; use super::encoding::PayloadType;
@ -105,7 +105,7 @@ impl<T, H> Http2<T, H>
item.poll_payload(); item.poll_payload();
if !item.flags.contains(EntryFlags::EOF) { if !item.flags.contains(EntryFlags::EOF) {
let retry = item.payload.need_read(); let retry = item.payload.need_read() == PayloadStatus::Read;
loop { loop {
match item.task.poll_io(&mut item.stream) { match item.task.poll_io(&mut item.stream) {
Ok(Async::Ready(ready)) => { Ok(Async::Ready(ready)) => {
@ -116,7 +116,8 @@ impl<T, H> Http2<T, H>
not_ready = false; not_ready = false;
}, },
Ok(Async::NotReady) => { Ok(Async::NotReady) => {
if item.payload.need_read() && !retry { if item.payload.need_read() == PayloadStatus::Read && !retry
{
continue continue
} }
}, },
@ -307,7 +308,7 @@ impl Entry {
fn poll_payload(&mut self) { fn poll_payload(&mut self) {
if !self.flags.contains(EntryFlags::REOF) { if !self.flags.contains(EntryFlags::REOF) {
if self.payload.need_read() { if self.payload.need_read() == PayloadStatus::Read {
if let Err(err) = self.recv.release_capacity().release_capacity(32_768) { if let Err(err) = self.recv.release_capacity().release_capacity(32_768) {
self.payload.set_error(PayloadError::Http2(err)) self.payload.set_error(PayloadError::Http2(err))
} }