From 640c39fdc82f5766f0cd6c030dade0c4bdcc8154 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sun, 16 Dec 2018 16:26:24 -0800 Subject: [PATCH] better usage for Framed type --- actix-utils/Cargo.toml | 3 +- actix-utils/src/framed.rs | 86 +++++++++++++++------------------------ 2 files changed, 34 insertions(+), 55 deletions(-) diff --git a/actix-utils/Cargo.toml b/actix-utils/Cargo.toml index 4033cee2..e6303e25 100644 --- a/actix-utils/Cargo.toml +++ b/actix-utils/Cargo.toml @@ -21,8 +21,7 @@ path = "src/lib.rs" actix-service = "0.1.1" actix-codec = "0.1.0" actix-rt = "0.1.0" - -# io bytes = "0.4" futures = "0.1" tokio-timer = "0.2.8" +log = "0.4" \ No newline at end of file diff --git a/actix-utils/src/framed.rs b/actix-utils/src/framed.rs index bb6e9c13..cdceeaf4 100644 --- a/actix-utils/src/framed.rs +++ b/actix-utils/src/framed.rs @@ -7,7 +7,8 @@ use actix_rt::Arbiter; use actix_service::{IntoNewService, IntoService, NewService, Service}; use futures::future::{ok, FutureResult}; use futures::unsync::mpsc; -use futures::{Async, AsyncSink, Future, Poll, Sink, Stream}; +use futures::{Async, Future, Poll, Sink, Stream}; +use log::debug; type Request = ::Item; type Response = ::Item; @@ -179,10 +180,8 @@ where state: TransportState, framed: Framed, request: Option>, - response: Option>, write_rx: mpsc::Receiver, S::Error>>, write_tx: mpsc::Sender, S::Error>>, - flushed: bool, } enum TransportState>, U: Encoder + Decoder> { @@ -210,8 +209,6 @@ where service: service.into_service(), state: TransportState::Processing, request: None, - response: None, - flushed: true, } } @@ -247,7 +244,7 @@ where S::Future: 'static, S::Error: 'static, ::Item: 'static, - ::Error: 'static, + ::Error: std::fmt::Debug + 'static, { fn poll_service(&mut self) -> bool { match self.service.poll_ready() { @@ -307,59 +304,42 @@ where /// write to sink fn poll_response(&mut self) -> bool { - let mut item = self.response.take(); loop { - item = if let Some(msg) = item { - self.flushed = false; - match self.framed.start_send(msg) { - Ok(AsyncSink::Ready) => None, - Ok(AsyncSink::NotReady(item)) => Some(item), - Err(err) => { - self.state = - TransportState::EncoderError(FramedTransportError::Encoder(err)); - return true; - } - } - } else { - None - }; - - // flush sink - if !self.flushed { - match self.framed.poll_complete() { - Ok(Async::Ready(_)) => { - self.flushed = true; - } + while !self.framed.is_write_buf_full() { + match self.write_rx.poll() { + Ok(Async::Ready(Some(msg))) => match msg { + Ok(msg) => { + if let Err(err) = self.framed.force_send(msg) { + self.state = TransportState::EncoderError( + FramedTransportError::Encoder(err), + ); + return true; + } + } + Err(err) => { + self.state = + TransportState::Error(FramedTransportError::Service(err)); + return true; + } + }, Ok(Async::NotReady) => break, - Err(err) => { - self.state = - TransportState::EncoderError(FramedTransportError::Encoder(err)); - return true; - } + Err(_) => panic!("Bug in actix-net code"), + Ok(Async::Ready(None)) => panic!("Bug in actix-net code"), } } - // check channel - if self.flushed { - if item.is_none() { - match self.write_rx.poll() { - Ok(Async::Ready(Some(msg))) => match msg { - Ok(msg) => item = Some(msg), - Err(err) => { - self.state = - TransportState::Error(FramedTransportError::Service(err)); - return true; - } - }, - Ok(Async::NotReady) => break, - Err(_) => panic!("Bug in gw code"), - Ok(Async::Ready(None)) => panic!("Bug in gw code"), + if !self.framed.is_write_buf_empty() { + match self.framed.poll_complete() { + Ok(Async::NotReady) => break, + Err(err) => { + debug!("Error sending data: {:?}", err); + self.state = + TransportState::EncoderError(FramedTransportError::Encoder(err)); + return true; } - } else { - continue; + Ok(Async::Ready(_)) => (), } } else { - self.response = item; break; } } @@ -376,7 +356,7 @@ where S::Future: 'static, S::Error: 'static, ::Item: 'static, - ::Error: 'static, + ::Error: std::fmt::Debug + 'static, { type Item = (); type Error = FramedTransportError; @@ -391,7 +371,7 @@ where } } TransportState::Error(err) => { - if self.poll_response() || self.flushed { + if self.poll_response() || !self.framed.is_write_buf_empty() { Err(err) } else { self.state = TransportState::Error(err);