From 205cac82ce372103bd6bb089c6ba81bb2a4dee4a Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Wed, 26 Jun 2019 15:19:40 +0600 Subject: [PATCH] add custom framed dispatcher service --- Cargo.toml | 1 + actix-ioframe/CHANGES.md | 5 + actix-ioframe/Cargo.toml | 30 +++ actix-ioframe/LICENSE-APACHE | 1 + actix-ioframe/LICENSE-MIT | 1 + actix-ioframe/src/cell.rs | 39 ++++ actix-ioframe/src/connect.rs | 122 ++++++++++++ actix-ioframe/src/dispatcher.rs | 302 +++++++++++++++++++++++++++++ actix-ioframe/src/error.rs | 49 +++++ actix-ioframe/src/item.rs | 78 ++++++++ actix-ioframe/src/lib.rs | 13 ++ actix-ioframe/src/service.rs | 323 ++++++++++++++++++++++++++++++++ actix-ioframe/src/sink.rs | 44 +++++ 13 files changed, 1008 insertions(+) create mode 100644 actix-ioframe/CHANGES.md create mode 100644 actix-ioframe/Cargo.toml create mode 120000 actix-ioframe/LICENSE-APACHE create mode 120000 actix-ioframe/LICENSE-MIT create mode 100644 actix-ioframe/src/cell.rs create mode 100644 actix-ioframe/src/connect.rs create mode 100644 actix-ioframe/src/dispatcher.rs create mode 100644 actix-ioframe/src/error.rs create mode 100644 actix-ioframe/src/item.rs create mode 100644 actix-ioframe/src/lib.rs create mode 100644 actix-ioframe/src/service.rs create mode 100644 actix-ioframe/src/sink.rs diff --git a/Cargo.toml b/Cargo.toml index 13313ba1..3f778044 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ members = [ "actix-test-server", "actix-threadpool", "actix-tower", + "actix-ioframe", "actix-utils", "router", ] diff --git a/actix-ioframe/CHANGES.md b/actix-ioframe/CHANGES.md new file mode 100644 index 00000000..a958a328 --- /dev/null +++ b/actix-ioframe/CHANGES.md @@ -0,0 +1,5 @@ +# Changes + +## [0.1.0] - 2019-xx-xx + +* Initial release diff --git a/actix-ioframe/Cargo.toml b/actix-ioframe/Cargo.toml new file mode 100644 index 00000000..6d17fb0a --- /dev/null +++ b/actix-ioframe/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "actix-ioframe" +version = "0.1.0" +authors = ["Nikolay Kim "] +description = "Actix framed service" +keywords = ["network", "framework", "async", "futures"] +homepage = "https://actix.rs" +repository = "https://github.com/actix/actix-net.git" +documentation = "https://docs.rs/actix-ioframed/" +categories = ["network-programming", "asynchronous"] +license = "MIT/Apache-2.0" +exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"] +edition = "2018" +workspace = ".." + +[lib] +name = "actix_ioframe" +path = "src/lib.rs" + +[dependencies] +actix-service = "0.4.0" +actix-codec = "0.1.1" +bytes = "0.4" +either = "1.5.2" +futures = "0.1.25" +tokio-current-thread = "0.1.4" +log = "0.4" + +[dev-dependencies] +actix-rt = "0.2.2" diff --git a/actix-ioframe/LICENSE-APACHE b/actix-ioframe/LICENSE-APACHE new file mode 120000 index 00000000..965b606f --- /dev/null +++ b/actix-ioframe/LICENSE-APACHE @@ -0,0 +1 @@ +../LICENSE-APACHE \ No newline at end of file diff --git a/actix-ioframe/LICENSE-MIT b/actix-ioframe/LICENSE-MIT new file mode 120000 index 00000000..76219eb7 --- /dev/null +++ b/actix-ioframe/LICENSE-MIT @@ -0,0 +1 @@ +../LICENSE-MIT \ No newline at end of file diff --git a/actix-ioframe/src/cell.rs b/actix-ioframe/src/cell.rs new file mode 100644 index 00000000..419709ce --- /dev/null +++ b/actix-ioframe/src/cell.rs @@ -0,0 +1,39 @@ +//! Custom cell impl + +use std::cell::UnsafeCell; +use std::fmt; +use std::rc::Rc; + +pub(crate) struct Cell { + inner: Rc>, +} + +impl Clone for Cell { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + } + } +} + +impl fmt::Debug for Cell { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + self.inner.fmt(f) + } +} + +impl Cell { + pub fn new(inner: T) -> Self { + Self { + inner: Rc::new(UnsafeCell::new(inner)), + } + } + + pub fn get_ref(&self) -> &T { + unsafe { &*self.inner.as_ref().get() } + } + + pub fn get_mut(&mut self) -> &mut T { + unsafe { &mut *self.inner.as_ref().get() } + } +} diff --git a/actix-ioframe/src/connect.rs b/actix-ioframe/src/connect.rs new file mode 100644 index 00000000..3302e141 --- /dev/null +++ b/actix-ioframe/src/connect.rs @@ -0,0 +1,122 @@ +use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed}; +use futures::unsync::mpsc; + +use crate::cell::Cell; +use crate::dispatcher::FramedMessage; +use crate::sink::Sink; + +pub struct Connect { + io: Io, + codec: Codec, + state: St, + // rx: mpsc::UnboundedReceiver::Item>>, + // sink: Sink<::Item>, +} + +impl Connect { + pub(crate) fn new(io: Io) -> Self { + Self { + io, + codec: (), + state: (), + } + } +} + +impl Connect { + pub fn codec(self, codec: Codec) -> Connect { + Connect { + codec, + io: self.io, + state: self.state, + } + } + + pub fn state(self, state: St) -> Connect { + Connect { + state, + io: self.io, + codec: self.codec, + } + } + + pub fn state_fn(self, f: F) -> Connect + where + F: FnOnce(&Connect) -> St, + { + Connect { + state: f(&self), + io: self.io, + codec: self.codec, + } + } +} + +impl Connect +where + C: Encoder + Decoder, + Io: AsyncRead + AsyncWrite, +{ + pub fn into_result(self) -> ConnectResult { + let (tx, rx) = mpsc::unbounded(); + let sink = Sink::new(tx); + + ConnectResult { + state: Cell::new(self.state), + framed: Framed::new(self.io, self.codec), + rx, + sink, + } + } +} + +pub struct ConnectResult { + pub(crate) state: Cell, + pub(crate) framed: Framed, + pub(crate) rx: mpsc::UnboundedReceiver::Item>>, + pub(crate) sink: Sink<::Item>, +} + +impl ConnectResult { + #[inline] + pub fn sink(&self) -> &Sink<::Item> { + &self.sink + } +} + +impl futures::Stream for ConnectResult +where + Io: AsyncRead + AsyncWrite, + Codec: Encoder + Decoder, +{ + type Item = ::Item; + type Error = ::Error; + + fn poll(&mut self) -> futures::Poll, Self::Error> { + self.framed.poll() + } +} + +impl futures::Sink for ConnectResult +where + Io: AsyncRead + AsyncWrite, + Codec: Encoder + Decoder, +{ + type SinkItem = ::Item; + type SinkError = ::Error; + + fn start_send( + &mut self, + item: Self::SinkItem, + ) -> futures::StartSend { + self.framed.start_send(item) + } + + fn poll_complete(&mut self) -> futures::Poll<(), Self::SinkError> { + self.framed.poll_complete() + } + + fn close(&mut self) -> futures::Poll<(), Self::SinkError> { + self.framed.close() + } +} diff --git a/actix-ioframe/src/dispatcher.rs b/actix-ioframe/src/dispatcher.rs new file mode 100644 index 00000000..e27e62d7 --- /dev/null +++ b/actix-ioframe/src/dispatcher.rs @@ -0,0 +1,302 @@ +//! Framed dispatcher service and related utilities +use std::collections::VecDeque; +use std::mem; + +use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed}; +use actix_service::{IntoService, Service}; +use futures::task::AtomicTask; +use futures::unsync::{mpsc, oneshot}; +use futures::{Async, Future, Poll, Sink as FutureSink, Stream}; +use log::debug; + +use crate::cell::Cell; +use crate::error::ServiceError; +use crate::item::Item; +use crate::sink::Sink; + +type Request = Item; +type Response = ::Item; + +pub(crate) enum FramedMessage { + Message(T), + Close, + WaitClose(oneshot::Sender<()>), +} + +/// FramedTransport - is a future that reads frames from Framed object +/// and pass then to the service. +pub(crate) struct FramedDispatcher +where + S: Service, Response = Option>>, + S::Error: 'static, + S::Future: 'static, + T: AsyncRead + AsyncWrite, + U: Encoder + Decoder, + ::Item: 'static, + ::Error: std::fmt::Debug, +{ + service: S, + sink: Sink<::Item>, + state: Cell, + dispatch_state: State, + framed: Framed, + rx: Option::Item>>>, + inner: Cell::Item, S::Error>>, +} + +impl FramedDispatcher +where + S: Service, Response = Option>>, + S::Error: 'static, + S::Future: 'static, + T: AsyncRead + AsyncWrite, + U: Decoder + Encoder, + ::Item: 'static, + ::Error: std::fmt::Debug, +{ + pub(crate) fn new>( + framed: Framed, + state: Cell, + service: F, + rx: mpsc::UnboundedReceiver::Item>>, + sink: Sink<::Item>, + ) -> Self { + FramedDispatcher { + framed, + state, + sink, + rx: Some(rx), + service: service.into_service(), + dispatch_state: State::Processing, + inner: Cell::new(FramedDispatcherInner { + buf: VecDeque::new(), + task: AtomicTask::new(), + }), + } + } +} + +enum State { + Processing, + Error(ServiceError), + FramedError(ServiceError), + FlushAndStop(Vec>), + Stopping, +} + +impl State { + fn stop(&mut self, tx: Option>) { + match self { + State::FlushAndStop(ref mut vec) => { + if let Some(tx) = tx { + vec.push(tx) + } + } + State::Processing => { + *self = State::FlushAndStop(if let Some(tx) = tx { + vec![tx] + } else { + Vec::new() + }) + } + State::Error(_) | State::FramedError(_) | State::Stopping => { + if let Some(tx) = tx { + let _ = tx.send(()); + } + } + } + } +} + +struct FramedDispatcherInner { + buf: VecDeque>, + task: AtomicTask, +} + +impl FramedDispatcher +where + S: Service, Response = Option>>, + S::Error: 'static, + S::Future: 'static, + T: AsyncRead + AsyncWrite, + U: Decoder + Encoder, + ::Item: 'static, + ::Error: std::fmt::Debug, +{ + fn poll_read(&mut self) -> bool { + loop { + match self.service.poll_ready() { + Ok(Async::Ready(_)) => { + let item = match self.framed.poll() { + Ok(Async::Ready(Some(el))) => el, + Err(err) => { + self.dispatch_state = + State::FramedError(ServiceError::Decoder(err)); + return true; + } + Ok(Async::NotReady) => return false, + Ok(Async::Ready(None)) => { + self.dispatch_state = State::Stopping; + return true; + } + }; + + let mut cell = self.inner.clone(); + cell.get_mut().task.register(); + tokio_current_thread::spawn( + self.service + .call(Item::new(self.state.clone(), self.sink.clone(), item)) + .then(move |item| { + let item = match item { + Ok(Some(item)) => Ok(item), + Ok(None) => return Ok(()), + Err(err) => Err(err), + }; + let inner = cell.get_mut(); + inner.buf.push_back(item); + inner.task.notify(); + Ok(()) + }), + ); + } + Ok(Async::NotReady) => return false, + Err(err) => { + self.dispatch_state = State::Error(ServiceError::Service(err)); + return true; + } + } + } + } + + /// write to framed object + fn poll_write(&mut self) -> bool { + let inner = self.inner.get_mut(); + let mut rx_done = self.rx.is_none(); + let mut buf_empty = inner.buf.is_empty(); + loop { + while !self.framed.is_write_buf_full() { + if !buf_empty { + match inner.buf.pop_front().unwrap() { + Ok(msg) => { + if let Err(err) = self.framed.force_send(msg) { + self.dispatch_state = + State::FramedError(ServiceError::Encoder(err)); + return true; + } + buf_empty = inner.buf.is_empty(); + } + Err(err) => { + self.dispatch_state = State::Error(ServiceError::Service(err)); + return true; + } + } + } + + if !rx_done && self.rx.is_some() { + match self.rx.as_mut().unwrap().poll() { + Ok(Async::Ready(Some(FramedMessage::Message(msg)))) => { + if let Err(err) = self.framed.force_send(msg) { + self.dispatch_state = + State::FramedError(ServiceError::Encoder(err)); + return true; + } + } + Ok(Async::Ready(Some(FramedMessage::Close))) => { + self.dispatch_state.stop(None); + return true; + } + Ok(Async::Ready(Some(FramedMessage::WaitClose(tx)))) => { + self.dispatch_state.stop(Some(tx)); + return true; + } + Ok(Async::Ready(None)) => { + rx_done = true; + let _ = self.rx.take(); + } + Ok(Async::NotReady) => rx_done = true, + Err(_e) => { + rx_done = true; + let _ = self.rx.take(); + } + } + } + + if rx_done && buf_empty { + break; + } + } + + if !self.framed.is_write_buf_empty() { + match self.framed.poll_complete() { + Ok(Async::NotReady) => break, + Err(err) => { + debug!("Error sending data: {:?}", err); + self.dispatch_state = State::FramedError(ServiceError::Encoder(err)); + return true; + } + Ok(Async::Ready(_)) => (), + } + } else { + break; + } + } + + false + } +} + +impl Future for FramedDispatcher +where + S: Service, Response = Option>>, + S::Error: 'static, + S::Future: 'static, + T: AsyncRead + AsyncWrite, + U: Decoder + Encoder, + ::Item: 'static, + ::Error: std::fmt::Debug, +{ + type Item = (); + type Error = ServiceError; + + fn poll(&mut self) -> Poll { + match mem::replace(&mut self.dispatch_state, State::Processing) { + State::Processing => { + if self.poll_read() || self.poll_write() { + self.poll() + } else { + Ok(Async::NotReady) + } + } + State::Error(err) => { + if self.framed.is_write_buf_empty() + || (self.poll_write() || self.framed.is_write_buf_empty()) + { + Err(err) + } else { + self.dispatch_state = State::Error(err); + Ok(Async::NotReady) + } + } + State::FlushAndStop(mut vec) => { + if !self.framed.is_write_buf_empty() { + match self.framed.poll_complete() { + Err(err) => { + debug!("Error sending data: {:?}", err); + } + Ok(Async::NotReady) => { + self.dispatch_state = State::FlushAndStop(vec); + return Ok(Async::NotReady); + } + Ok(Async::Ready(_)) => (), + } + }; + for tx in vec.drain(..) { + let _ = tx.send(()); + } + Ok(Async::Ready(())) + } + State::FramedError(err) => Err(err), + State::Stopping => Ok(Async::Ready(())), + } + } +} diff --git a/actix-ioframe/src/error.rs b/actix-ioframe/src/error.rs new file mode 100644 index 00000000..e5d9b1b9 --- /dev/null +++ b/actix-ioframe/src/error.rs @@ -0,0 +1,49 @@ +use std::fmt; + +use actix_codec::{Decoder, Encoder}; + +/// Framed service errors +pub enum ServiceError { + /// Inner service error + Service(E), + /// Encoder parse error + Encoder(::Error), + /// Decoder parse error + Decoder(::Error), +} + +impl From for ServiceError { + fn from(err: E) -> Self { + ServiceError::Service(err) + } +} + +impl fmt::Debug for ServiceError +where + E: fmt::Debug, + ::Error: fmt::Debug, + ::Error: fmt::Debug, +{ + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + match *self { + ServiceError::Service(ref e) => write!(fmt, "ServiceError::Service({:?})", e), + ServiceError::Encoder(ref e) => write!(fmt, "ServiceError::Encoder({:?})", e), + ServiceError::Decoder(ref e) => write!(fmt, "ServiceError::Encoder({:?})", e), + } + } +} + +impl fmt::Display for ServiceError +where + E: fmt::Display, + ::Error: fmt::Debug, + ::Error: fmt::Debug, +{ + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + match *self { + ServiceError::Service(ref e) => write!(fmt, "{}", e), + ServiceError::Encoder(ref e) => write!(fmt, "{:?}", e), + ServiceError::Decoder(ref e) => write!(fmt, "{:?}", e), + } + } +} diff --git a/actix-ioframe/src/item.rs b/actix-ioframe/src/item.rs new file mode 100644 index 00000000..f326d0a9 --- /dev/null +++ b/actix-ioframe/src/item.rs @@ -0,0 +1,78 @@ +use std::fmt; +use std::ops::{Deref, DerefMut}; + +use actix_codec::{Decoder, Encoder}; + +use crate::cell::Cell; +use crate::sink::Sink; + +pub struct Item { + state: Cell, + sink: Sink<::Item>, + item: ::Item, +} + +impl Item +where + Codec: Encoder + Decoder, +{ + pub(crate) fn new( + state: Cell, + sink: Sink<::Item>, + item: ::Item, + ) -> Self { + Item { state, sink, item } + } + + #[inline] + pub fn state(&self) -> &St { + self.state.get_ref() + } + + #[inline] + pub fn state_mut(&mut self) -> &mut St { + self.state.get_mut() + } + + #[inline] + pub fn sink(&self) -> &Sink<::Item> { + &self.sink + } + + #[inline] + pub fn into_inner(self) -> ::Item { + self.item + } +} + +impl Deref for Item +where + Codec: Encoder + Decoder, +{ + type Target = ::Item; + + #[inline] + fn deref(&self) -> &::Item { + &self.item + } +} + +impl DerefMut for Item +where + Codec: Encoder + Decoder, +{ + #[inline] + fn deref_mut(&mut self) -> &mut ::Item { + &mut self.item + } +} + +impl fmt::Debug for Item +where + Codec: Encoder + Decoder, + ::Item: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_tuple("FramedItem").field(&self.item).finish() + } +} diff --git a/actix-ioframe/src/lib.rs b/actix-ioframe/src/lib.rs new file mode 100644 index 00000000..fabfaa8d --- /dev/null +++ b/actix-ioframe/src/lib.rs @@ -0,0 +1,13 @@ +mod cell; +mod connect; +mod dispatcher; +mod error; +mod item; +mod service; +mod sink; + +pub use self::connect::{Connect, ConnectResult}; +pub use self::error::ServiceError; +pub use self::item::Item; +pub use self::service::{Builder, NewServiceBuilder, ServiceBuilder}; +pub use self::sink::Sink; diff --git a/actix-ioframe/src/service.rs b/actix-ioframe/src/service.rs new file mode 100644 index 00000000..9819c9d1 --- /dev/null +++ b/actix-ioframe/src/service.rs @@ -0,0 +1,323 @@ +use std::marker::PhantomData; +use std::rc::Rc; + +use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder}; +use actix_service::{IntoNewService, IntoService, NewService, Service}; +use futures::{Async, Future, Poll}; + +use crate::connect::{Connect, ConnectResult}; +use crate::dispatcher::FramedDispatcher; +use crate::error::ServiceError; +use crate::item::Item; + +type RequestItem = Item; +type ResponseItem = Option<::Item>; + +/// Service builder - structure that follows the builder pattern +/// for building instances for framed services. +pub struct Builder(PhantomData<(St, Codec)>); + +impl Builder { + pub fn new() -> Builder { + Builder(PhantomData) + } + + /// Construct framed handler service with specified connect service + pub fn service(self, connect: F) -> ServiceBuilder + where + F: IntoService, + Io: AsyncRead + AsyncWrite, + C: Service, Response = ConnectResult>, + Codec: Decoder + Encoder, + { + ServiceBuilder { + connect: connect.into_service(), + _t: PhantomData, + } + } + + /// Construct framed handler new service with specified connect service + pub fn factory(self, connect: F) -> NewServiceBuilder + where + F: IntoNewService, + Io: AsyncRead + AsyncWrite, + C: NewService< + Config = (), + Request = Connect, + Response = ConnectResult, + >, + C::Error: 'static, + C::Future: 'static, + Codec: Decoder + Encoder, + { + NewServiceBuilder { + connect: connect.into_new_service(), + _t: PhantomData, + } + } +} + +pub struct ServiceBuilder { + connect: C, + _t: PhantomData<(St, Io, Codec)>, +} + +impl ServiceBuilder +where + St: 'static, + Io: AsyncRead + AsyncWrite, + C: Service, Response = ConnectResult>, + C::Error: 'static, + Codec: Decoder + Encoder, + ::Item: 'static, + ::Error: std::fmt::Debug, +{ + pub fn finish( + self, + service: F, + ) -> impl Service> + where + F: IntoNewService, + T: NewService< + Config = St, + Request = RequestItem, + Response = ResponseItem, + Error = C::Error, + InitError = C::Error, + > + 'static, + { + FramedServiceImpl { + connect: self.connect, + handler: Rc::new(service.into_new_service()), + _t: PhantomData, + } + } +} + +pub struct NewServiceBuilder { + connect: C, + _t: PhantomData<(St, Io, Codec)>, +} + +impl NewServiceBuilder +where + St: 'static, + Io: AsyncRead + AsyncWrite, + C: NewService, Response = ConnectResult>, + C::Error: 'static, + C::Future: 'static, + Codec: Decoder + Encoder, + ::Item: 'static, + ::Error: std::fmt::Debug, +{ + pub fn finish( + self, + service: F, + ) -> impl NewService< + Config = (), + Request = Io, + Response = (), + Error = ServiceError, + > + where + F: IntoNewService, + T: NewService< + Config = St, + Request = RequestItem, + Response = ResponseItem, + Error = C::Error, + InitError = C::Error, + > + 'static, + { + FramedService { + connect: self.connect, + handler: Rc::new(service.into_new_service()), + _t: PhantomData, + } + } +} + +pub(crate) struct FramedService { + connect: C, + handler: Rc, + _t: PhantomData<(St, Io, Codec)>, +} + +impl NewService for FramedService +where + St: 'static, + Io: AsyncRead + AsyncWrite, + C: NewService, Response = ConnectResult>, + C::Error: 'static, + C::Future: 'static, + T: NewService< + Config = St, + Request = RequestItem, + Response = ResponseItem, + Error = C::Error, + InitError = C::Error, + > + 'static, + Codec: Decoder + Encoder, + ::Item: 'static, + ::Error: std::fmt::Debug, +{ + type Config = (); + type Request = Io; + type Response = (); + type Error = ServiceError; + type InitError = C::InitError; + type Service = FramedServiceImpl; + type Future = Box>; + + fn new_service(&self, _: &()) -> Self::Future { + let handler = self.handler.clone(); + + // create connect service and then create service impl + Box::new( + self.connect + .new_service(&()) + .map(move |connect| FramedServiceImpl { + connect, + handler, + _t: PhantomData, + }), + ) + } +} + +pub struct FramedServiceImpl { + connect: C, + handler: Rc, + _t: PhantomData<(St, Io, Codec)>, +} + +impl Service for FramedServiceImpl +where + // St: 'static, + Io: AsyncRead + AsyncWrite, + C: Service, Response = ConnectResult>, + C::Error: 'static, + T: NewService< + Config = St, + Request = RequestItem, + Response = ResponseItem, + Error = C::Error, + InitError = C::Error, + >, + <::Service as Service>::Future: 'static, + Codec: Decoder + Encoder, + ::Item: 'static, + ::Error: std::fmt::Debug, +{ + type Request = Io; + type Response = (); + type Error = ServiceError; + type Future = FramedServiceImplResponse; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + self.connect.poll_ready().map_err(|e| e.into()) + } + + fn call(&mut self, req: Io) -> Self::Future { + FramedServiceImplResponse { + inner: FramedServiceImplResponseInner::Connect( + self.connect.call(Connect::new(req)), + self.handler.clone(), + ), + } + } +} + +pub struct FramedServiceImplResponse +where + C: Service, Response = ConnectResult>, + C::Error: 'static, + T: NewService< + Config = St, + Request = RequestItem, + Response = ResponseItem, + Error = C::Error, + InitError = C::Error, + >, + <::Service as Service>::Future: 'static, + Io: AsyncRead + AsyncWrite, + Codec: Encoder + Decoder, + ::Item: 'static, + ::Error: std::fmt::Debug, +{ + inner: FramedServiceImplResponseInner, +} + +enum FramedServiceImplResponseInner +where + C: Service, Response = ConnectResult>, + C::Error: 'static, + T: NewService< + Config = St, + Request = RequestItem, + Response = ResponseItem, + Error = C::Error, + InitError = C::Error, + >, + <::Service as Service>::Future: 'static, + Io: AsyncRead + AsyncWrite, + Codec: Encoder + Decoder, + ::Item: 'static, + ::Error: std::fmt::Debug, +{ + Connect(C::Future, Rc), + Handler(T::Future, Option>), + Dispatcher(FramedDispatcher), +} + +impl Future for FramedServiceImplResponse +where + C: Service, Response = ConnectResult>, + C::Error: 'static, + T: NewService< + Config = St, + Request = RequestItem, + Response = ResponseItem, + Error = C::Error, + InitError = C::Error, + >, + <::Service as Service>::Future: 'static, + Io: AsyncRead + AsyncWrite, + Codec: Encoder + Decoder, + ::Item: 'static, + ::Error: std::fmt::Debug, +{ + type Item = (); + type Error = ServiceError; + + fn poll(&mut self) -> Poll { + match self.inner { + FramedServiceImplResponseInner::Connect(ref mut fut, ref handler) => { + match fut.poll()? { + Async::Ready(res) => { + self.inner = FramedServiceImplResponseInner::Handler( + handler.new_service(res.state.get_ref()), + Some(res), + ); + self.poll() + } + Async::NotReady => Ok(Async::NotReady), + } + } + FramedServiceImplResponseInner::Handler(ref mut fut, ref mut res) => { + match fut.poll()? { + Async::Ready(handler) => { + let res = res.take().unwrap(); + self.inner = + FramedServiceImplResponseInner::Dispatcher(FramedDispatcher::new( + res.framed, res.state, handler, res.rx, res.sink, + )); + self.poll() + } + Async::NotReady => Ok(Async::NotReady), + } + } + FramedServiceImplResponseInner::Dispatcher(ref mut fut) => fut.poll(), + } + } +} diff --git a/actix-ioframe/src/sink.rs b/actix-ioframe/src/sink.rs new file mode 100644 index 00000000..4ebf7909 --- /dev/null +++ b/actix-ioframe/src/sink.rs @@ -0,0 +1,44 @@ +use std::fmt; + +use futures::unsync::{mpsc, oneshot}; +use futures::Future; + +use crate::dispatcher::FramedMessage; + +pub struct Sink(mpsc::UnboundedSender>); + +impl Clone for Sink { + fn clone(&self) -> Self { + Sink(self.0.clone()) + } +} + +impl Sink { + pub(crate) fn new(tx: mpsc::UnboundedSender>) -> Self { + Sink(tx) + } + + /// Close connection + pub fn close(&self) { + let _ = self.0.unbounded_send(FramedMessage::Close); + } + + /// Close connection + pub fn wait_close(&self) -> impl Future { + let (tx, rx) = oneshot::channel(); + let _ = self.0.unbounded_send(FramedMessage::WaitClose(tx)); + + rx.map_err(|_| ()) + } + + /// Send item + pub fn send(&self, item: T) { + let _ = self.0.unbounded_send(FramedMessage::Message(item)); + } +} + +impl fmt::Debug for Sink { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("Sink").finish() + } +}