mirror of
https://github.com/actix/actix-extras.git
synced 2025-01-23 15:24:36 +01:00
handle socket read disconnect
This commit is contained in:
parent
e59abfd716
commit
2d51831899
@ -6,6 +6,11 @@
|
|||||||
|
|
||||||
* Added the ability to pass a custom `TlsConnector`.
|
* Added the ability to pass a custom `TlsConnector`.
|
||||||
|
|
||||||
|
### Fixed
|
||||||
|
|
||||||
|
* Handle socket read disconnect
|
||||||
|
|
||||||
|
|
||||||
## [0.7.4] - 2018-08-23
|
## [0.7.4] - 2018-08-23
|
||||||
|
|
||||||
### Added
|
### Added
|
||||||
|
@ -22,13 +22,14 @@ use super::{HttpHandler, HttpHandlerTask, IoStream};
|
|||||||
const MAX_PIPELINED_MESSAGES: usize = 16;
|
const MAX_PIPELINED_MESSAGES: usize = 16;
|
||||||
|
|
||||||
bitflags! {
|
bitflags! {
|
||||||
struct Flags: u8 {
|
pub struct Flags: u8 {
|
||||||
const STARTED = 0b0000_0001;
|
const STARTED = 0b0000_0001;
|
||||||
const ERROR = 0b0000_0010;
|
const ERROR = 0b0000_0010;
|
||||||
const KEEPALIVE = 0b0000_0100;
|
const KEEPALIVE = 0b0000_0100;
|
||||||
const SHUTDOWN = 0b0000_1000;
|
const SHUTDOWN = 0b0000_1000;
|
||||||
const DISCONNECTED = 0b0001_0000;
|
const READ_DISCONNECTED = 0b0001_0000;
|
||||||
const POLLED = 0b0010_0000;
|
const WRITE_DISCONNECTED = 0b0010_0000;
|
||||||
|
const POLLED = 0b0100_0000;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -93,7 +94,7 @@ where
|
|||||||
buf: BytesMut, is_eof: bool,
|
buf: BytesMut, is_eof: bool,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Http1 {
|
Http1 {
|
||||||
flags: Flags::KEEPALIVE | if is_eof { Flags::DISCONNECTED } else { Flags::empty() },
|
flags: if is_eof { Flags::READ_DISCONNECTED } else { Flags::KEEPALIVE },
|
||||||
stream: H1Writer::new(stream, Rc::clone(&settings)),
|
stream: H1Writer::new(stream, Rc::clone(&settings)),
|
||||||
decoder: H1Decoder::new(),
|
decoder: H1Decoder::new(),
|
||||||
payload: None,
|
payload: None,
|
||||||
@ -117,6 +118,10 @@ where
|
|||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn can_read(&self) -> bool {
|
fn can_read(&self) -> bool {
|
||||||
|
if self.flags.intersects(Flags::ERROR | Flags::READ_DISCONNECTED) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
if let Some(ref info) = self.payload {
|
if let Some(ref info) = self.payload {
|
||||||
info.need_read() == PayloadStatus::Read
|
info.need_read() == PayloadStatus::Read
|
||||||
} else {
|
} else {
|
||||||
@ -125,6 +130,8 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn notify_disconnect(&mut self) {
|
fn notify_disconnect(&mut self) {
|
||||||
|
self.flags.insert(Flags::WRITE_DISCONNECTED);
|
||||||
|
|
||||||
// notify all tasks
|
// notify all tasks
|
||||||
self.stream.disconnected();
|
self.stream.disconnected();
|
||||||
for task in &mut self.tasks {
|
for task in &mut self.tasks {
|
||||||
@ -163,11 +170,15 @@ where
|
|||||||
|
|
||||||
// shutdown
|
// shutdown
|
||||||
if self.flags.contains(Flags::SHUTDOWN) {
|
if self.flags.contains(Flags::SHUTDOWN) {
|
||||||
|
if self.flags.intersects(
|
||||||
|
Flags::ERROR | Flags::READ_DISCONNECTED | Flags::WRITE_DISCONNECTED) {
|
||||||
|
return Ok(Async::Ready(()))
|
||||||
|
}
|
||||||
match self.stream.poll_completed(true) {
|
match self.stream.poll_completed(true) {
|
||||||
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
||||||
Ok(Async::Ready(_)) => return Ok(Async::Ready(())),
|
Ok(Async::Ready(_)) => return Ok(Async::Ready(())),
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
debug!("Error sending data: {}", err);
|
debug!("Error sendips ng data: {}", err);
|
||||||
return Err(());
|
return Err(());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -197,11 +208,9 @@ where
|
|||||||
self.flags.insert(Flags::POLLED);
|
self.flags.insert(Flags::POLLED);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// read io from socket
|
// read io from socket
|
||||||
if !self.flags.intersects(Flags::ERROR)
|
if self.can_read() && self.tasks.len() < MAX_PIPELINED_MESSAGES {
|
||||||
&& self.tasks.len() < MAX_PIPELINED_MESSAGES
|
|
||||||
&& self.can_read()
|
|
||||||
{
|
|
||||||
match self.stream.get_mut().read_available(&mut self.buf) {
|
match self.stream.get_mut().read_available(&mut self.buf) {
|
||||||
Ok(Async::Ready((read_some, disconnected))) => {
|
Ok(Async::Ready((read_some, disconnected))) => {
|
||||||
if read_some {
|
if read_some {
|
||||||
@ -209,7 +218,7 @@ where
|
|||||||
}
|
}
|
||||||
if disconnected {
|
if disconnected {
|
||||||
// delay disconnect until all tasks have finished.
|
// delay disconnect until all tasks have finished.
|
||||||
self.flags.insert(Flags::DISCONNECTED);
|
self.flags.insert(Flags::READ_DISCONNECTED);
|
||||||
if self.tasks.is_empty() {
|
if self.tasks.is_empty() {
|
||||||
self.client_disconnect();
|
self.client_disconnect();
|
||||||
}
|
}
|
||||||
@ -231,7 +240,9 @@ where
|
|||||||
let mut idx = 0;
|
let mut idx = 0;
|
||||||
while idx < self.tasks.len() {
|
while idx < self.tasks.len() {
|
||||||
// only one task can do io operation in http/1
|
// only one task can do io operation in http/1
|
||||||
if !io && !self.tasks[idx].flags.contains(EntryFlags::EOF) {
|
if !io && !self.tasks[idx].flags.contains(EntryFlags::EOF)
|
||||||
|
&& !self.flags.contains(Flags::WRITE_DISCONNECTED)
|
||||||
|
{
|
||||||
// io is corrupted, send buffer
|
// io is corrupted, send buffer
|
||||||
if self.tasks[idx].flags.contains(EntryFlags::ERROR) {
|
if self.tasks[idx].flags.contains(EntryFlags::ERROR) {
|
||||||
if let Ok(Async::NotReady) = self.stream.poll_completed(true) {
|
if let Ok(Async::NotReady) = self.stream.poll_completed(true) {
|
||||||
@ -295,7 +306,6 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
// cleanup finished tasks
|
// cleanup finished tasks
|
||||||
let max = self.tasks.len() >= MAX_PIPELINED_MESSAGES;
|
|
||||||
while !self.tasks.is_empty() {
|
while !self.tasks.is_empty() {
|
||||||
if self.tasks[0]
|
if self.tasks[0]
|
||||||
.flags
|
.flags
|
||||||
@ -306,15 +316,13 @@ where
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// read more message
|
|
||||||
if max && self.tasks.len() >= MAX_PIPELINED_MESSAGES {
|
|
||||||
return Ok(Async::Ready(true));
|
|
||||||
}
|
|
||||||
|
|
||||||
// check stream state
|
// check stream state
|
||||||
if self.flags.contains(Flags::STARTED) {
|
if self.flags.contains(Flags::STARTED) {
|
||||||
match self.stream.poll_completed(false) {
|
match self.stream.poll_completed(false) {
|
||||||
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
Ok(Async::NotReady) => {
|
||||||
|
return Ok(Async::NotReady)
|
||||||
|
},
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
debug!("Error sending data: {}", err);
|
debug!("Error sending data: {}", err);
|
||||||
self.notify_disconnect();
|
self.notify_disconnect();
|
||||||
@ -332,8 +340,7 @@ where
|
|||||||
// deal with keep-alive and steam eof (client-side write shutdown)
|
// deal with keep-alive and steam eof (client-side write shutdown)
|
||||||
if self.tasks.is_empty() {
|
if self.tasks.is_empty() {
|
||||||
// handle stream eof
|
// handle stream eof
|
||||||
if self.flags.contains(Flags::DISCONNECTED) {
|
if self.flags.contains(Flags::READ_DISCONNECTED) {
|
||||||
self.client_disconnect();
|
|
||||||
return Ok(Async::Ready(false));
|
return Ok(Async::Ready(false));
|
||||||
}
|
}
|
||||||
// no keep-alive
|
// no keep-alive
|
||||||
@ -451,7 +458,12 @@ where
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(None) => break,
|
Ok(None) => {
|
||||||
|
if self.flags.contains(Flags::READ_DISCONNECTED) && self.tasks.is_empty() {
|
||||||
|
self.client_disconnect();
|
||||||
|
}
|
||||||
|
break
|
||||||
|
},
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
self.flags.insert(Flags::ERROR);
|
self.flags.insert(Flags::ERROR);
|
||||||
if let Some(mut payload) = self.payload.take() {
|
if let Some(mut payload) = self.payload.take() {
|
||||||
|
@ -63,7 +63,9 @@ impl<T: AsyncWrite, H: 'static> H1Writer<T, H> {
|
|||||||
self.flags = Flags::KEEPALIVE;
|
self.flags = Flags::KEEPALIVE;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn disconnected(&mut self) {}
|
pub fn disconnected(&mut self) {
|
||||||
|
self.flags.insert(Flags::DISCONNECTED);
|
||||||
|
}
|
||||||
|
|
||||||
pub fn keepalive(&self) -> bool {
|
pub fn keepalive(&self) -> bool {
|
||||||
self.flags.contains(Flags::KEEPALIVE) && !self.flags.contains(Flags::UPGRADE)
|
self.flags.contains(Flags::KEEPALIVE) && !self.flags.contains(Flags::UPGRADE)
|
||||||
@ -268,10 +270,7 @@ impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
|
|||||||
let pl: &[u8] = payload.as_ref();
|
let pl: &[u8] = payload.as_ref();
|
||||||
let n = match Self::write_data(&mut self.stream, pl) {
|
let n = match Self::write_data(&mut self.stream, pl) {
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
if err.kind() == io::ErrorKind::WriteZero {
|
|
||||||
self.disconnected();
|
self.disconnected();
|
||||||
}
|
|
||||||
|
|
||||||
return Err(err);
|
return Err(err);
|
||||||
}
|
}
|
||||||
Ok(val) => val,
|
Ok(val) => val,
|
||||||
@ -315,14 +314,15 @@ impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
|
|||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn poll_completed(&mut self, shutdown: bool) -> Poll<(), io::Error> {
|
fn poll_completed(&mut self, shutdown: bool) -> Poll<(), io::Error> {
|
||||||
|
if self.flags.contains(Flags::DISCONNECTED) {
|
||||||
|
return Err(io::Error::new(io::ErrorKind::Other, "disconnected"));
|
||||||
|
}
|
||||||
|
|
||||||
if !self.buffer.is_empty() {
|
if !self.buffer.is_empty() {
|
||||||
let written = {
|
let written = {
|
||||||
match Self::write_data(&mut self.stream, self.buffer.as_ref().as_ref()) {
|
match Self::write_data(&mut self.stream, self.buffer.as_ref().as_ref()) {
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
if err.kind() == io::ErrorKind::WriteZero {
|
|
||||||
self.disconnected();
|
self.disconnected();
|
||||||
}
|
|
||||||
|
|
||||||
return Err(err);
|
return Err(err);
|
||||||
}
|
}
|
||||||
Ok(val) => val,
|
Ok(val) => val,
|
||||||
@ -339,7 +339,7 @@ impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
|
|||||||
self.stream.poll_flush()?;
|
self.stream.poll_flush()?;
|
||||||
self.stream.shutdown()
|
self.stream.shutdown()
|
||||||
} else {
|
} else {
|
||||||
self.stream.poll_flush()
|
Ok(self.stream.poll_flush()?)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user