1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-02-17 14:43:31 +01:00

better usage for Framed type

This commit is contained in:
Nikolay Kim 2018-12-16 16:26:24 -08:00
parent cd5435e5ee
commit 640c39fdc8
2 changed files with 34 additions and 55 deletions

View File

@ -21,8 +21,7 @@ path = "src/lib.rs"
actix-service = "0.1.1" actix-service = "0.1.1"
actix-codec = "0.1.0" actix-codec = "0.1.0"
actix-rt = "0.1.0" actix-rt = "0.1.0"
# io
bytes = "0.4" bytes = "0.4"
futures = "0.1" futures = "0.1"
tokio-timer = "0.2.8" tokio-timer = "0.2.8"
log = "0.4"

View File

@ -7,7 +7,8 @@ use actix_rt::Arbiter;
use actix_service::{IntoNewService, IntoService, NewService, Service}; use actix_service::{IntoNewService, IntoService, NewService, Service};
use futures::future::{ok, FutureResult}; use futures::future::{ok, FutureResult};
use futures::unsync::mpsc; use futures::unsync::mpsc;
use futures::{Async, AsyncSink, Future, Poll, Sink, Stream}; use futures::{Async, Future, Poll, Sink, Stream};
use log::debug;
type Request<U> = <U as Decoder>::Item; type Request<U> = <U as Decoder>::Item;
type Response<U> = <U as Encoder>::Item; type Response<U> = <U as Encoder>::Item;
@ -179,10 +180,8 @@ where
state: TransportState<S, U>, state: TransportState<S, U>,
framed: Framed<T, U>, framed: Framed<T, U>,
request: Option<Request<U>>, request: Option<Request<U>>,
response: Option<Response<U>>,
write_rx: mpsc::Receiver<Result<Response<U>, S::Error>>, write_rx: mpsc::Receiver<Result<Response<U>, S::Error>>,
write_tx: mpsc::Sender<Result<Response<U>, S::Error>>, write_tx: mpsc::Sender<Result<Response<U>, S::Error>>,
flushed: bool,
} }
enum TransportState<S: Service<Request<U>>, U: Encoder + Decoder> { enum TransportState<S: Service<Request<U>>, U: Encoder + Decoder> {
@ -210,8 +209,6 @@ where
service: service.into_service(), service: service.into_service(),
state: TransportState::Processing, state: TransportState::Processing,
request: None, request: None,
response: None,
flushed: true,
} }
} }
@ -247,7 +244,7 @@ where
S::Future: 'static, S::Future: 'static,
S::Error: 'static, S::Error: 'static,
<U as Encoder>::Item: 'static, <U as Encoder>::Item: 'static,
<U as Encoder>::Error: 'static, <U as Encoder>::Error: std::fmt::Debug + 'static,
{ {
fn poll_service(&mut self) -> bool { fn poll_service(&mut self) -> bool {
match self.service.poll_ready() { match self.service.poll_ready() {
@ -307,59 +304,42 @@ where
/// write to sink /// write to sink
fn poll_response(&mut self) -> bool { fn poll_response(&mut self) -> bool {
let mut item = self.response.take();
loop { loop {
item = if let Some(msg) = item { while !self.framed.is_write_buf_full() {
self.flushed = false; match self.write_rx.poll() {
match self.framed.start_send(msg) { Ok(Async::Ready(Some(msg))) => match msg {
Ok(AsyncSink::Ready) => None, Ok(msg) => {
Ok(AsyncSink::NotReady(item)) => Some(item), if let Err(err) = self.framed.force_send(msg) {
Err(err) => { self.state = TransportState::EncoderError(
self.state = FramedTransportError::Encoder(err),
TransportState::EncoderError(FramedTransportError::Encoder(err)); );
return true; return true;
} }
} }
} else { Err(err) => {
None self.state =
}; TransportState::Error(FramedTransportError::Service(err));
return true;
// flush sink }
if !self.flushed { },
match self.framed.poll_complete() {
Ok(Async::Ready(_)) => {
self.flushed = true;
}
Ok(Async::NotReady) => break, Ok(Async::NotReady) => break,
Err(err) => { Err(_) => panic!("Bug in actix-net code"),
self.state = Ok(Async::Ready(None)) => panic!("Bug in actix-net code"),
TransportState::EncoderError(FramedTransportError::Encoder(err));
return true;
}
} }
} }
// check channel if !self.framed.is_write_buf_empty() {
if self.flushed { match self.framed.poll_complete() {
if item.is_none() { Ok(Async::NotReady) => break,
match self.write_rx.poll() { Err(err) => {
Ok(Async::Ready(Some(msg))) => match msg { debug!("Error sending data: {:?}", err);
Ok(msg) => item = Some(msg), self.state =
Err(err) => { TransportState::EncoderError(FramedTransportError::Encoder(err));
self.state = return true;
TransportState::Error(FramedTransportError::Service(err));
return true;
}
},
Ok(Async::NotReady) => break,
Err(_) => panic!("Bug in gw code"),
Ok(Async::Ready(None)) => panic!("Bug in gw code"),
} }
} else { Ok(Async::Ready(_)) => (),
continue;
} }
} else { } else {
self.response = item;
break; break;
} }
} }
@ -376,7 +356,7 @@ where
S::Future: 'static, S::Future: 'static,
S::Error: 'static, S::Error: 'static,
<U as Encoder>::Item: 'static, <U as Encoder>::Item: 'static,
<U as Encoder>::Error: 'static, <U as Encoder>::Error: std::fmt::Debug + 'static,
{ {
type Item = (); type Item = ();
type Error = FramedTransportError<S::Error, U>; type Error = FramedTransportError<S::Error, U>;
@ -391,7 +371,7 @@ where
} }
} }
TransportState::Error(err) => { TransportState::Error(err) => {
if self.poll_response() || self.flushed { if self.poll_response() || !self.framed.is_write_buf_empty() {
Err(err) Err(err)
} else { } else {
self.state = TransportState::Error(err); self.state = TransportState::Error(err);