From 9f0a288e4b3b6578e186c72af5d60d4dd17be803 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sat, 26 Jan 2019 21:41:28 -0800 Subject: [PATCH] refactor FramedTransport --- actix-utils/src/framed.rs | 268 ++++++++++++++++++-------------------- 1 file changed, 124 insertions(+), 144 deletions(-) diff --git a/actix-utils/src/framed.rs b/actix-utils/src/framed.rs index 980ff5e4..044483f4 100644 --- a/actix-utils/src/framed.rs +++ b/actix-utils/src/framed.rs @@ -1,14 +1,17 @@ //! Framed dispatcher service and related utilities +use std::collections::VecDeque; use std::marker::PhantomData; use std::mem; use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed}; use actix_service::{IntoNewService, IntoService, NewService, Service}; use futures::future::{ok, FutureResult}; -use futures::unsync::mpsc; +use futures::task::AtomicTask; use futures::{Async, Future, Poll, Sink, Stream}; use log::debug; +use crate::cell::Cell; + type Request = ::Item; type Response = ::Item; @@ -19,13 +22,11 @@ pub struct FramedNewService { impl FramedNewService where - T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, S: NewService, Response = Response>, - <>>::Service as Service>>::Future: 'static, - <>>::Service as Service>>::Error: 'static, - ::Item: 'static, - ::Error: 'static, + S::Service: 'static, + T: AsyncRead + AsyncWrite + 'static, + U: Decoder + Encoder + 'static, + ::Error: std::fmt::Debug, { pub fn new>>(factory: F1) -> Self { Self { @@ -49,13 +50,11 @@ where impl NewService> for FramedNewService where - T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, S: NewService, Response = Response> + Clone, - <>>::Service as Service>>::Future: 'static, - <>>::Service as Service>>::Error: 'static, - ::Item: 'static, - ::Error: 'static, + S::Service: 'static, + T: AsyncRead + AsyncWrite + 'static, + U: Decoder + Encoder + 'static, + ::Error: std::fmt::Debug, { type Response = FramedTransport; type Error = S::InitError; @@ -90,13 +89,11 @@ where impl Service> for FramedService where - T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, S: NewService, Response = Response>, - <>>::Service as Service>>::Future: 'static, - <>>::Service as Service>>::Error: 'static, - ::Item: 'static, - ::Error: 'static, + S::Service: 'static, + T: AsyncRead + AsyncWrite + 'static, + U: Decoder + Encoder + 'static, + ::Error: std::fmt::Debug, { type Response = FramedTransport; type Error = S::InitError; @@ -118,13 +115,11 @@ where #[doc(hidden)] pub struct FramedServiceResponseFuture where - T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, S: NewService, Response = Response>, - <>>::Service as Service>>::Future: 'static, - <>>::Service as Service>>::Error: 'static, - ::Item: 'static, - ::Error: 'static, + S::Service: 'static, + T: AsyncRead + AsyncWrite + 'static, + U: Decoder + Encoder + 'static, + ::Error: std::fmt::Debug, { fut: S::Future, framed: Option>, @@ -132,13 +127,11 @@ where impl Future for FramedServiceResponseFuture where - T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, S: NewService, Response = Response>, - <>>::Service as Service>>::Future: 'static, - <>>::Service as Service>>::Error: 'static, - ::Item: 'static, - ::Error: 'static, + S::Service: 'static, + T: AsyncRead + AsyncWrite + 'static, + U: Decoder + Encoder + 'static, + ::Error: std::fmt::Debug, { type Item = FramedTransport; type Error = S::InitError; @@ -171,16 +164,13 @@ impl From for FramedTransportError { /// and pass then to the service. pub struct FramedTransport where - S: Service, Response = Response>, - T: AsyncRead + AsyncWrite, - U: Encoder + Decoder, + S: Service, Response = Response> + 'static, + T: AsyncRead + AsyncWrite + 'static, + U: Encoder + Decoder + 'static, + ::Error: std::fmt::Debug, { - service: S, - state: TransportState, - framed: Framed, - request: Option>, - write_rx: mpsc::Receiver, S::Error>>, - write_tx: mpsc::Sender, S::Error>>, + inner: Cell>, + inner2: Cell>, } enum TransportState>, U: Encoder + Decoder> { @@ -190,74 +180,31 @@ enum TransportState>, U: Encoder + Decoder> { Stopping, } -impl FramedTransport +struct FramedTransportInner where - T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, - S: Service, Response = Response>, - S::Future: 'static, - S::Error: 'static, - ::Error: 'static, + S: Service, Response = Response> + 'static, + T: AsyncRead + AsyncWrite + 'static, + U: Encoder + Decoder + 'static, + ::Error: std::fmt::Debug, { - pub fn new>>(framed: Framed, service: F) -> Self { - let (write_tx, write_rx) = mpsc::channel(16); - FramedTransport { - framed, - write_rx, - write_tx, - service: service.into_service(), - state: TransportState::Processing, - request: None, - } - } - - /// Get reference to a service wrapped by `FramedTransport` instance. - pub fn get_ref(&self) -> &S { - &self.service - } - - /// Get mutable reference to a service wrapped by `FramedTransport` - /// instance. - pub fn get_mut(&mut self) -> &mut S { - &mut self.service - } - - /// Get reference to a framed instance wrapped by `FramedTransport` - /// instance. - pub fn get_framed(&self) -> &Framed { - &self.framed - } - - /// Get mutable reference to a framed instance wrapped by `FramedTransport` - /// instance. - pub fn get_framed_mut(&mut self) -> &mut Framed { - &mut self.framed - } + service: S, + state: TransportState, + framed: Framed, + buf: VecDeque, S::Error>>, + task: AtomicTask, } -impl FramedTransport +impl FramedTransportInner where - T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, - S: Service, Response = Response>, - S::Future: 'static, - S::Error: 'static, - ::Item: 'static, - ::Error: std::fmt::Debug + 'static, + S: Service, Response = Response> + 'static, + T: AsyncRead + AsyncWrite + 'static, + U: Decoder + Encoder + 'static, + ::Error: std::fmt::Debug, { - fn poll_service(&mut self) -> bool { - match self.service.poll_ready() { - Ok(Async::Ready(_)) => { - if let Some(item) = self.request.take() { - let sender = self.write_tx.clone(); - tokio_current_thread::spawn( - self.service - .call(item) - .then(|item| sender.send(item).map(|_| ()).map_err(|_| ())), - ); - } - - loop { + fn poll_service(&mut self, cell: &Cell>) -> bool { + loop { + match self.service.poll_ready() { + Ok(Async::Ready(_)) => loop { let item = match self.framed.poll() { Ok(Async::Ready(Some(el))) => el, Err(err) => { @@ -272,32 +219,21 @@ where } }; - match self.service.poll_ready() { - Ok(Async::Ready(_)) => { - let sender = self.write_tx.clone(); - tokio_current_thread::spawn( - self.service - .call(item) - .then(|item| sender.send(item).map(|_| ()).map_err(|_| ())), - ); - } - Ok(Async::NotReady) => { - self.request = Some(item); - return false; - } - Err(err) => { - self.state = - TransportState::Error(FramedTransportError::Service(err)); - return true; - } - } + self.task.register(); + let mut cell = cell.clone(); + tokio_current_thread::spawn(self.service.call(item).then(move |item| { + let inner = cell.get_mut(); + inner.buf.push_back(item); + inner.task.notify(); + Ok(()) + })); + }, + Ok(Async::NotReady) => return false, + Err(err) => { + self.state = TransportState::Error(FramedTransportError::Service(err)); + return true; } } - Ok(Async::NotReady) => false, - Err(err) => { - self.state = TransportState::Error(FramedTransportError::Service(err)); - true - } } } @@ -305,8 +241,8 @@ where fn poll_response(&mut self) -> bool { loop { while !self.framed.is_write_buf_full() { - match self.write_rx.poll() { - Ok(Async::Ready(Some(msg))) => match msg { + if let Some(msg) = self.buf.pop_front() { + match msg { Ok(msg) => { if let Err(err) = self.framed.force_send(msg) { self.state = TransportState::FramedError( @@ -320,10 +256,9 @@ where TransportState::Error(FramedTransportError::Service(err)); return true; } - }, - Ok(Async::NotReady) => break, - Err(_) => panic!("Bug in actix-net code"), - Ok(Async::Ready(None)) => panic!("Bug in actix-net code"), + } + } else { + break; } } @@ -347,35 +282,80 @@ where } } +impl FramedTransport +where + S: Service, Response = Response> + 'static, + T: AsyncRead + AsyncWrite + 'static, + U: Decoder + Encoder + 'static, + ::Error: std::fmt::Debug, +{ + pub fn new>>(framed: Framed, service: F) -> Self { + let inner = Cell::new(FramedTransportInner { + framed, + service: service.into_service(), + state: TransportState::Processing, + buf: VecDeque::new(), + task: AtomicTask::new(), + }); + + FramedTransport { + inner2: inner.clone(), + inner, + } + } + + /// Get reference to a service wrapped by `FramedTransport` instance. + pub fn get_ref(&self) -> &S { + &self.inner.get_ref().service + } + + /// Get mutable reference to a service wrapped by `FramedTransport` + /// instance. + pub fn get_mut(&mut self) -> &mut S { + &mut self.inner.get_mut().service + } + + /// Get reference to a framed instance wrapped by `FramedTransport` + /// instance. + pub fn get_framed(&self) -> &Framed { + &self.inner.get_ref().framed + } + + /// Get mutable reference to a framed instance wrapped by `FramedTransport` + /// instance. + pub fn get_framed_mut(&mut self) -> &mut Framed { + &mut self.inner.get_mut().framed + } +} + impl Future for FramedTransport where - T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, - S: Service, Response = Response>, - S::Future: 'static, - S::Error: 'static, - ::Item: 'static, - ::Error: std::fmt::Debug + 'static, + S: Service, Response = Response> + 'static, + T: AsyncRead + AsyncWrite + 'static, + U: Decoder + Encoder + 'static, + ::Error: std::fmt::Debug, { type Item = (); type Error = FramedTransportError; fn poll(&mut self) -> Poll { - match mem::replace(&mut self.state, TransportState::Processing) { + let inner = self.inner.get_mut(); + + match mem::replace(&mut inner.state, TransportState::Processing) { TransportState::Processing => { - if self.poll_service() || self.poll_response() { + if inner.poll_service(&self.inner2) || inner.poll_response() { self.poll() } else { Ok(Async::NotReady) } } TransportState::Error(err) => { - if self.framed.is_write_buf_empty() - || (self.poll_response() || self.framed.is_write_buf_empty()) + if inner.framed.is_write_buf_empty() + || (inner.poll_response() || inner.framed.is_write_buf_empty()) { Err(err) } else { - self.state = TransportState::Error(err); + inner.state = TransportState::Error(err); Ok(Async::NotReady) } }