From 5779da0f49ecb66ce012e714a810a851265843e4 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sun, 29 Dec 2019 13:42:42 +0600 Subject: [PATCH] refactor service and state manahement --- actix-ioframe/CHANGES.md | 8 + actix-ioframe/Cargo.toml | 11 +- actix-ioframe/src/connect.rs | 49 ++-- actix-ioframe/src/dispatcher.rs | 216 +++++++--------- actix-ioframe/src/item.rs | 82 ------ actix-ioframe/src/lib.rs | 8 +- actix-ioframe/src/service.rs | 390 ++++++++++++++--------------- actix-ioframe/src/sink.rs | 45 ---- actix-ioframe/tests/test_server.rs | 63 ++--- 9 files changed, 345 insertions(+), 527 deletions(-) delete mode 100644 actix-ioframe/src/item.rs delete mode 100644 actix-ioframe/src/sink.rs diff --git a/actix-ioframe/CHANGES.md b/actix-ioframe/CHANGES.md index 17b79828..1b332a52 100644 --- a/actix-ioframe/CHANGES.md +++ b/actix-ioframe/CHANGES.md @@ -1,5 +1,13 @@ # Changes +## [0.5.0] - 2019-12-29 + +* Simplify state management + +* Allow to set custom output stream + +* Removed disconnect callback + ## [0.4.1] - 2019-12-11 * Disconnect callback accepts owned state diff --git a/actix-ioframe/Cargo.toml b/actix-ioframe/Cargo.toml index 4bfde348..c20da1ff 100644 --- a/actix-ioframe/Cargo.toml +++ b/actix-ioframe/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-ioframe" -version = "0.4.1" +version = "0.5.0" authors = ["Nikolay Kim "] description = "Actix framed service" keywords = ["network", "framework", "async", "futures"] @@ -10,19 +10,18 @@ documentation = "https://docs.rs/actix-ioframe/" categories = ["network-programming", "asynchronous"] license = "MIT/Apache-2.0" edition = "2018" -workspace = ".." [lib] name = "actix_ioframe" path = "src/lib.rs" [dependencies] -actix-service = "1.0.0" +actix-service = "1.0.1" actix-codec = "0.2.0" -actix-utils = "1.0.1" +actix-utils = "1.0.4" actix-rt = "1.0.0" -bytes = "0.5" -either = "1.5.2" +bytes = "0.5.3" +either = "1.5.3" futures = "0.3.1" pin-project = "0.4.6" log = "0.4" diff --git a/actix-ioframe/src/connect.rs b/actix-ioframe/src/connect.rs index 29fcb1e2..61705951 100644 --- a/actix-ioframe/src/connect.rs +++ b/actix-ioframe/src/connect.rs @@ -3,17 +3,15 @@ use std::pin::Pin; use std::task::{Context, Poll}; use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed}; +use actix_utils::mpsc::Receiver; use futures::Stream; -use crate::sink::Sink; - -pub struct Connect +pub struct Connect where Codec: Encoder + Decoder, { io: Io, - sink: Sink<::Item>, - _t: PhantomData<(St, Codec)>, + _t: PhantomData, } impl Connect @@ -21,36 +19,33 @@ where Io: AsyncRead + AsyncWrite, Codec: Encoder + Decoder, { - pub(crate) fn new(io: Io, sink: Sink<::Item>) -> Self { + pub(crate) fn new(io: Io) -> Self { Self { io, - sink, _t: PhantomData, } } - pub fn codec(self, codec: Codec) -> ConnectResult { + pub fn codec( + self, + codec: Codec, + ) -> ConnectResult::Item>> { ConnectResult { state: (), - sink: self.sink, + out: None, framed: Framed::new(self.io, codec), } } } #[pin_project::pin_project] -pub struct ConnectResult { +pub struct ConnectResult { pub(crate) state: St, + pub(crate) out: Option, pub(crate) framed: Framed, - pub(crate) sink: Sink<::Item>, } -impl ConnectResult { - #[inline] - pub fn sink(&self) -> &Sink<::Item> { - &self.sink - } - +impl ConnectResult { #[inline] pub fn get_ref(&self) -> &Io { self.framed.get_ref() @@ -61,17 +56,28 @@ impl ConnectResult { self.framed.get_mut() } + pub fn out(self, out: U) -> ConnectResult + where + U: Stream::Item> + Unpin, + { + ConnectResult { + state: self.state, + framed: self.framed, + out: Some(out), + } + } + #[inline] - pub fn state(self, state: S) -> ConnectResult { + pub fn state(self, state: S) -> ConnectResult { ConnectResult { state, framed: self.framed, - sink: self.sink, + out: self.out, } } } -impl Stream for ConnectResult +impl Stream for ConnectResult where Io: AsyncRead + AsyncWrite, Codec: Encoder + Decoder, @@ -83,7 +89,8 @@ where } } -impl futures::Sink<::Item> for ConnectResult +impl futures::Sink<::Item> + for ConnectResult where Io: AsyncRead + AsyncWrite, Codec: Encoder + Decoder, diff --git a/actix-ioframe/src/dispatcher.rs b/actix-ioframe/src/dispatcher.rs index 8cb22c3a..1565f797 100644 --- a/actix-ioframe/src/dispatcher.rs +++ b/actix-ioframe/src/dispatcher.rs @@ -1,81 +1,56 @@ //! Framed dispatcher service and related utilities use std::pin::Pin; -use std::rc::Rc; use std::task::{Context, Poll}; use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed}; -use actix_service::{IntoService, Service}; -use actix_utils::{mpsc, oneshot}; -use futures::{FutureExt, Stream}; +use actix_service::Service; +use actix_utils::mpsc; +use futures::Stream; use log::debug; use crate::error::ServiceError; -use crate::item::Item; -use crate::sink::Sink; -type Request = Item; +type Request = ::Item; type Response = ::Item; -pub(crate) enum Message { - Item(T), - WaitClose(oneshot::Sender<()>), - Close, -} - /// FramedTransport - is a future that reads frames from Framed object /// and pass then to the service. -#[pin_project::pin_project] -pub(crate) struct Dispatcher +pub(crate) struct Dispatcher where - St: Clone, - S: Service, Response = Option>>, + S: Service, Response = Option>>, S::Error: 'static, S::Future: 'static, T: AsyncRead + AsyncWrite, U: Encoder + Decoder, ::Item: 'static, ::Error: std::fmt::Debug, + Out: Stream::Item> + Unpin, { service: S, - sink: Sink<::Item>, - state: St, - dispatch_state: FramedState, + sink: Option, + state: FramedState, framed: Framed, - rx: mpsc::Receiver::Item>, S::Error>>, - tx: mpsc::Sender::Item>, S::Error>>, - disconnect: Option>, + rx: mpsc::Receiver::Item, S::Error>>, } -impl Dispatcher +impl Dispatcher where - St: Clone, - S: Service, Response = Option>>, + S: Service, Response = Option>>, S::Error: 'static, S::Future: 'static, T: AsyncRead + AsyncWrite, U: Decoder + Encoder, ::Item: 'static, ::Error: std::fmt::Debug, + Out: Stream::Item> + Unpin, { - pub(crate) fn new>( - framed: Framed, - state: St, - service: F, - sink: Sink<::Item>, - rx: mpsc::Receiver::Item>, S::Error>>, - disconnect: Option>, - ) -> Self { - let tx = rx.sender(); - + pub(crate) fn new(framed: Framed, service: S, sink: Option) -> Self { Dispatcher { - framed, - state, sink, - disconnect, - rx, - tx, - service: service.into_service(), - dispatch_state: FramedState::Processing, + service, + framed, + rx: mpsc::channel().1, + state: FramedState::Processing, } } } @@ -84,33 +59,11 @@ enum FramedState { Processing, Error(ServiceError), FramedError(ServiceError), - FlushAndStop(Vec>), + FlushAndStop, Stopping, } impl FramedState { - fn stop(&mut self, tx: Option>) { - match self { - FramedState::FlushAndStop(ref mut vec) => { - if let Some(tx) = tx { - vec.push(tx) - } - } - FramedState::Processing => { - *self = FramedState::FlushAndStop(if let Some(tx) = tx { - vec![tx] - } else { - Vec::new() - }) - } - FramedState::Error(_) | FramedState::FramedError(_) | FramedState::Stopping => { - if let Some(tx) = tx { - let _ = tx.send(()); - } - } - } - } - fn take_error(&mut self) -> ServiceError { match std::mem::replace(self, FramedState::Processing) { FramedState::Error(err) => err, @@ -126,16 +79,16 @@ impl FramedState { } } -impl Dispatcher +impl Dispatcher where - St: Clone, - S: Service, Response = Option>>, + S: Service, Response = Option>>, S::Error: 'static, S::Future: 'static, T: AsyncRead + AsyncWrite, U: Decoder + Encoder, ::Item: 'static, ::Error: std::fmt::Debug, + Out: Stream::Item> + Unpin, { fn poll_read(&mut self, cx: &mut Context<'_>) -> bool { loop { @@ -144,35 +97,32 @@ where let item = match self.framed.next_item(cx) { Poll::Ready(Some(Ok(el))) => el, Poll::Ready(Some(Err(err))) => { - self.dispatch_state = - FramedState::FramedError(ServiceError::Decoder(err)); + self.state = FramedState::FramedError(ServiceError::Decoder(err)); return true; } Poll::Pending => return false, Poll::Ready(None) => { log::trace!("Client disconnected"); - self.dispatch_state = FramedState::Stopping; + self.state = FramedState::Stopping; return true; } }; - let tx = self.tx.clone(); - actix_rt::spawn( - self.service - .call(Item::new(self.state.clone(), self.sink.clone(), item)) - .map(move |item| { - let item = match item { - Ok(Some(item)) => Ok(Message::Item(item)), - Ok(None) => return, - Err(err) => Err(err), - }; - let _ = tx.send(item); - }), - ); + let tx = self.rx.sender(); + let fut = self.service.call(item); + actix_rt::spawn(async move { + let item = fut.await; + let item = match item { + Ok(Some(item)) => Ok(item), + Ok(None) => return, + Err(err) => Err(err), + }; + let _ = tx.send(item); + }); } Poll::Pending => return false, Poll::Ready(Err(err)) => { - self.dispatch_state = FramedState::Error(ServiceError::Service(err)); + self.state = FramedState::Error(ServiceError::Service(err)); return true; } } @@ -184,27 +134,39 @@ where loop { while !self.framed.is_write_buf_full() { match Pin::new(&mut self.rx).poll_next(cx) { - Poll::Ready(Some(Ok(Message::Item(msg)))) => { + Poll::Ready(Some(Ok(msg))) => { if let Err(err) = self.framed.write(msg) { - self.dispatch_state = - FramedState::FramedError(ServiceError::Encoder(err)); + self.state = FramedState::FramedError(ServiceError::Encoder(err)); return true; } - } - Poll::Ready(Some(Ok(Message::Close))) => { - self.dispatch_state.stop(None); - return true; - } - Poll::Ready(Some(Ok(Message::WaitClose(tx)))) => { - self.dispatch_state.stop(Some(tx)); - return true; + continue; } Poll::Ready(Some(Err(err))) => { - self.dispatch_state = FramedState::Error(ServiceError::Service(err)); + self.state = FramedState::Error(ServiceError::Service(err)); return true; } - Poll::Ready(None) | Poll::Pending => break, + Poll::Ready(None) | Poll::Pending => (), } + + if self.sink.is_some() { + match Pin::new(self.sink.as_mut().unwrap()).poll_next(cx) { + Poll::Ready(Some(msg)) => { + if let Err(err) = self.framed.write(msg) { + self.state = + FramedState::FramedError(ServiceError::Encoder(err)); + return true; + } + continue; + } + Poll::Ready(None) => { + let _ = self.sink.take(); + self.state = FramedState::FlushAndStop; + return true; + } + Poll::Pending => (), + } + } + break; } if !self.framed.is_write_buf_empty() { @@ -213,8 +175,7 @@ where Poll::Ready(Ok(_)) => (), Poll::Ready(Err(err)) => { debug!("Error sending data: {:?}", err); - self.dispatch_state = - FramedState::FramedError(ServiceError::Encoder(err)); + self.state = FramedState::FramedError(ServiceError::Encoder(err)); return true; } } @@ -229,14 +190,16 @@ where &mut self, cx: &mut Context<'_>, ) -> Poll>> { - match self.dispatch_state { - FramedState::Processing => { - if self.poll_read(cx) || self.poll_write(cx) { - self.poll(cx) + match self.state { + FramedState::Processing => loop { + let read = self.poll_read(cx); + let write = self.poll_write(cx); + if read || write { + continue; } else { - Poll::Pending + return Poll::Pending; } - } + }, FramedState::Error(_) => { // flush write buffer if !self.framed.is_write_buf_empty() { @@ -244,12 +207,21 @@ where return Poll::Pending; } } - if let Some(ref disconnect) = self.disconnect { - (&*disconnect)(self.state.clone(), true); - } - Poll::Ready(Err(self.dispatch_state.take_error())) + Poll::Ready(Err(self.state.take_error())) } - FramedState::FlushAndStop(ref mut vec) => { + FramedState::FlushAndStop => { + // drain service responses + match Pin::new(&mut self.rx).poll_next(cx) { + Poll::Ready(Some(Ok(msg))) => { + if let Err(_) = self.framed.write(msg) { + return Poll::Ready(Ok(())); + } + } + Poll::Ready(Some(Err(_))) => return Poll::Ready(Ok(())), + Poll::Ready(None) | Poll::Pending => (), + } + + // flush io if !self.framed.is_write_buf_empty() { match self.framed.flush(cx) { Poll::Ready(Err(err)) => { @@ -261,26 +233,10 @@ where Poll::Ready(_) => (), } }; - for tx in vec.drain(..) { - let _ = tx.send(()); - } - if let Some(ref disconnect) = self.disconnect { - (&*disconnect)(self.state.clone(), false); - } - Poll::Ready(Ok(())) - } - FramedState::FramedError(_) => { - if let Some(ref disconnect) = self.disconnect { - (&*disconnect)(self.state.clone(), true); - } - Poll::Ready(Err(self.dispatch_state.take_framed_error())) - } - FramedState::Stopping => { - if let Some(ref disconnect) = self.disconnect { - (&*disconnect)(self.state.clone(), false); - } Poll::Ready(Ok(())) } + FramedState::FramedError(_) => Poll::Ready(Err(self.state.take_framed_error())), + FramedState::Stopping => Poll::Ready(Ok(())), } } } diff --git a/actix-ioframe/src/item.rs b/actix-ioframe/src/item.rs deleted file mode 100644 index 42008d9e..00000000 --- a/actix-ioframe/src/item.rs +++ /dev/null @@ -1,82 +0,0 @@ -use std::fmt; -use std::ops::{Deref, DerefMut}; - -use actix_codec::{Decoder, Encoder}; - -use crate::sink::Sink; - -pub struct Item { - state: St, - sink: Sink<::Item>, - item: ::Item, -} - -impl Item -where - Codec: Encoder + Decoder, -{ - pub(crate) fn new( - state: St, - sink: Sink<::Item>, - item: ::Item, - ) -> Self { - Item { state, sink, item } - } - - #[inline] - pub fn state(&self) -> &St { - &self.state - } - - #[inline] - pub fn state_mut(&mut self) -> &mut St { - &mut self.state - } - - #[inline] - pub fn sink(&self) -> &Sink<::Item> { - &self.sink - } - - #[inline] - pub fn into_inner(self) -> ::Item { - self.item - } - - #[inline] - pub fn into_parts(self) -> (St, Sink<::Item>, ::Item) { - (self.state, self.sink, self.item) - } -} - -impl Deref for Item -where - Codec: Encoder + Decoder, -{ - type Target = ::Item; - - #[inline] - fn deref(&self) -> &::Item { - &self.item - } -} - -impl DerefMut for Item -where - Codec: Encoder + Decoder, -{ - #[inline] - fn deref_mut(&mut self) -> &mut ::Item { - &mut self.item - } -} - -impl fmt::Debug for Item -where - Codec: Encoder + Decoder, - ::Item: fmt::Debug, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_tuple("Item").field(&self.item).finish() - } -} diff --git a/actix-ioframe/src/lib.rs b/actix-ioframe/src/lib.rs index cb4f4ae2..3f82a29f 100644 --- a/actix-ioframe/src/lib.rs +++ b/actix-ioframe/src/lib.rs @@ -1,15 +1,11 @@ -#![deny(rust_2018_idioms, warnings)] +// #![deny(rust_2018_idioms, warnings)] #![allow(clippy::type_complexity, clippy::too_many_arguments)] mod connect; mod dispatcher; mod error; -mod item; mod service; -mod sink; pub use self::connect::{Connect, ConnectResult}; pub use self::error::ServiceError; -pub use self::item::Item; -pub use self::service::{Builder, NewServiceBuilder, ServiceBuilder}; -pub use self::sink::Sink; +pub use self::service::{Builder, FactoryBuilder}; diff --git a/actix-ioframe/src/service.rs b/actix-ioframe/src/service.rs index a6863a30..c9564d7a 100644 --- a/actix-ioframe/src/service.rs +++ b/actix-ioframe/src/service.rs @@ -4,108 +4,57 @@ use std::pin::Pin; use std::rc::Rc; use std::task::{Context, Poll}; -use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder}; +use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed}; use actix_service::{IntoService, IntoServiceFactory, Service, ServiceFactory}; -use actix_utils::mpsc; use either::Either; -use futures::future::{FutureExt, LocalBoxFuture}; +use futures::{ready, Stream}; use pin_project::project; use crate::connect::{Connect, ConnectResult}; -use crate::dispatcher::{Dispatcher, Message}; +use crate::dispatcher::Dispatcher; use crate::error::ServiceError; -use crate::item::Item; -use crate::sink::Sink; -type RequestItem = Item; +type RequestItem = ::Item; type ResponseItem = Option<::Item>; -type ServiceResult = Result::Item>, E>; /// Service builder - structure that follows the builder pattern /// for building instances for framed services. -pub struct Builder(PhantomData<(St, Codec)>); - -impl Default for Builder { - fn default() -> Builder { - Builder::new() - } -} - -impl Builder { - pub fn new() -> Builder { - Builder(PhantomData) - } - - /// Construct framed handler service with specified connect service - pub fn service(self, connect: F) -> ServiceBuilder - where - F: IntoService, - Io: AsyncRead + AsyncWrite, - C: Service, Response = ConnectResult>, - Codec: Decoder + Encoder, - { - ServiceBuilder { - connect: connect.into_service(), - disconnect: None, - _t: PhantomData, - } - } - - /// Construct framed handler new service with specified connect service - pub fn factory(self, connect: F) -> NewServiceBuilder - where - F: IntoServiceFactory, - Io: AsyncRead + AsyncWrite, - C: ServiceFactory< - Config = (), - Request = Connect, - Response = ConnectResult, - >, - C::Error: 'static, - C::Future: 'static, - Codec: Decoder + Encoder, - { - NewServiceBuilder { - connect: connect.into_factory(), - disconnect: None, - _t: PhantomData, - } - } -} - -pub struct ServiceBuilder { +pub struct Builder { connect: C, - disconnect: Option>, - _t: PhantomData<(St, Io, Codec)>, + _t: PhantomData<(St, Io, Codec, Out)>, } -impl ServiceBuilder +impl Builder where - St: Clone, - C: Service, Response = ConnectResult>, + C: Service, Response = ConnectResult>, Io: AsyncRead + AsyncWrite, Codec: Decoder + Encoder, ::Item: 'static, ::Error: std::fmt::Debug, + Out: Stream::Item> + Unpin, { - /// Callback to execute on disconnect - /// - /// Second parameter indicates error occured during disconnect. - pub fn disconnect(mut self, disconnect: F) -> Self + /// Construct framed handler service with specified connect service + pub fn new(connect: F) -> Builder where - F: Fn(St, bool) + 'static, + F: IntoService, + Io: AsyncRead + AsyncWrite, + C: Service, Response = ConnectResult>, + Codec: Decoder + Encoder, + Out: Stream::Item>, { - self.disconnect = Some(Rc::new(disconnect)); - self + Builder { + connect: connect.into_service(), + _t: PhantomData, + } } /// Provide stream items handler service and construct service factory. - pub fn finish(self, service: F) -> FramedServiceImpl + pub fn build(self, service: F) -> FramedServiceImpl where F: IntoServiceFactory, T: ServiceFactory< Config = St, - Request = RequestItem, + Request = RequestItem, Response = ResponseItem, Error = C::Error, InitError = C::Error, @@ -114,211 +63,261 @@ where FramedServiceImpl { connect: self.connect, handler: Rc::new(service.into_factory()), - disconnect: self.disconnect.clone(), _t: PhantomData, } } } -pub struct NewServiceBuilder { +/// Service builder - structure that follows the builder pattern +/// for building instances for framed services. +pub struct FactoryBuilder { connect: C, - disconnect: Option>, - _t: PhantomData<(St, Io, Codec)>, + _t: PhantomData<(St, Io, Codec, Out)>, } -impl NewServiceBuilder +impl FactoryBuilder where - St: Clone, Io: AsyncRead + AsyncWrite, C: ServiceFactory< Config = (), Request = Connect, - Response = ConnectResult, + Response = ConnectResult, >, - C::Error: 'static, - C::Future: 'static, Codec: Decoder + Encoder, - ::Item: 'static, ::Error: std::fmt::Debug, + Out: Stream::Item> + Unpin, { - /// Callback to execute on disconnect - /// - /// Second parameter indicates error occured during disconnect. - pub fn disconnect(mut self, disconnect: F) -> Self + /// Construct framed handler new service with specified connect service + pub fn new(connect: F) -> FactoryBuilder where - F: Fn(St, bool) + 'static, + F: IntoServiceFactory, + Io: AsyncRead + AsyncWrite, + C: ServiceFactory< + Config = (), + Request = Connect, + Response = ConnectResult, + >, + Codec: Decoder + Encoder, + Out: Stream::Item> + Unpin, { - self.disconnect = Some(Rc::new(disconnect)); - self + FactoryBuilder { + connect: connect.into_factory(), + _t: PhantomData, + } } - pub fn finish(self, service: F) -> FramedService + pub fn build(self, service: F) -> FramedService where F: IntoServiceFactory, T: ServiceFactory< - Config = St, - Request = RequestItem, - Response = ResponseItem, - Error = C::Error, - InitError = C::Error, - > + 'static, + Config = St, + Request = RequestItem, + Response = ResponseItem, + Error = C::Error, + InitError = C::Error, + >, { FramedService { connect: self.connect, handler: Rc::new(service.into_factory()), - disconnect: self.disconnect, _t: PhantomData, } } } -pub struct FramedService { +pub struct FramedService { connect: C, handler: Rc, - disconnect: Option>, - _t: PhantomData<(St, Io, Codec, Cfg)>, + _t: PhantomData<(St, Io, Codec, Out, Cfg)>, } -impl ServiceFactory for FramedService +impl ServiceFactory + for FramedService where - St: Clone + 'static, Io: AsyncRead + AsyncWrite, C: ServiceFactory< Config = (), Request = Connect, - Response = ConnectResult, + Response = ConnectResult, >, - C::Error: 'static, - C::Future: 'static, T: ServiceFactory< - Config = St, - Request = RequestItem, - Response = ResponseItem, - Error = C::Error, - InitError = C::Error, - > + 'static, + Config = St, + Request = RequestItem, + Response = ResponseItem, + Error = C::Error, + InitError = C::Error, + >, + ::Error: 'static, ::Future: 'static, Codec: Decoder + Encoder, ::Item: 'static, ::Error: std::fmt::Debug, + Out: Stream::Item> + Unpin, { type Config = Cfg; type Request = Io; type Response = (); type Error = ServiceError; type InitError = C::InitError; - type Service = FramedServiceImpl; - type Future = LocalBoxFuture<'static, Result>; + type Service = FramedServiceImpl; + type Future = FramedServiceResponse; fn new_service(&self, _: Cfg) -> Self::Future { - let handler = self.handler.clone(); - let disconnect = self.disconnect.clone(); - // create connect service and then create service impl - self.connect - .new_service(()) - .map(move |result| { - result.map(move |connect| FramedServiceImpl { - connect, - handler, - disconnect, - _t: PhantomData, - }) - }) - .boxed_local() + FramedServiceResponse { + fut: self.connect.new_service(()), + handler: self.handler.clone(), + } } } -pub struct FramedServiceImpl { - connect: C, - handler: Rc, - disconnect: Option>, - _t: PhantomData<(St, Io, Codec)>, -} - -impl Service for FramedServiceImpl +#[pin_project::pin_project] +pub struct FramedServiceResponse where - St: Clone, Io: AsyncRead + AsyncWrite, - C: Service, Response = ConnectResult>, - C::Error: 'static, + C: ServiceFactory< + Config = (), + Request = Connect, + Response = ConnectResult, + >, T: ServiceFactory< Config = St, - Request = RequestItem, + Request = RequestItem, Response = ResponseItem, Error = C::Error, InitError = C::Error, >, + ::Error: 'static, ::Future: 'static, Codec: Decoder + Encoder, ::Item: 'static, ::Error: std::fmt::Debug, + Out: Stream::Item> + Unpin, +{ + #[pin] + fut: C::Future, + handler: Rc, +} + +impl Future for FramedServiceResponse +where + Io: AsyncRead + AsyncWrite, + C: ServiceFactory< + Config = (), + Request = Connect, + Response = ConnectResult, + >, + T: ServiceFactory< + Config = St, + Request = RequestItem, + Response = ResponseItem, + Error = C::Error, + InitError = C::Error, + >, + ::Error: 'static, + ::Future: 'static, + Codec: Decoder + Encoder, + ::Item: 'static, + ::Error: std::fmt::Debug, + Out: Stream::Item> + Unpin, +{ + type Output = Result, C::InitError>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let connect = ready!(this.fut.poll(cx))?; + + Poll::Ready(Ok(FramedServiceImpl { + connect, + handler: this.handler.clone(), + _t: PhantomData, + })) + } +} + +pub struct FramedServiceImpl { + connect: C, + handler: Rc, + _t: PhantomData<(St, Io, Codec, Out)>, +} + +impl Service for FramedServiceImpl +where + Io: AsyncRead + AsyncWrite, + C: Service, Response = ConnectResult>, + T: ServiceFactory< + Config = St, + Request = RequestItem, + Response = ResponseItem, + Error = C::Error, + InitError = C::Error, + >, + ::Error: 'static, + ::Future: 'static, + Codec: Decoder + Encoder, + ::Item: 'static, + ::Error: std::fmt::Debug, + Out: Stream::Item> + Unpin, { type Request = Io; type Response = (); type Error = ServiceError; - type Future = FramedServiceImplResponse; + type Future = FramedServiceImplResponse; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.connect.poll_ready(cx).map_err(|e| e.into()) } fn call(&mut self, req: Io) -> Self::Future { - let (tx, rx) = mpsc::channel(); - let sink = Sink::new(Rc::new(move |msg| { - let _ = tx.send(Ok(msg)); - })); FramedServiceImplResponse { inner: FramedServiceImplResponseInner::Connect( - self.connect.call(Connect::new(req, sink.clone())), + self.connect.call(Connect::new(req)), self.handler.clone(), - self.disconnect.clone(), - Some(rx), ), } } } #[pin_project::pin_project] -pub struct FramedServiceImplResponse +pub struct FramedServiceImplResponse where - St: Clone, - C: Service, Response = ConnectResult>, - C::Error: 'static, + C: Service, Response = ConnectResult>, T: ServiceFactory< Config = St, - Request = RequestItem, + Request = RequestItem, Response = ResponseItem, Error = C::Error, InitError = C::Error, >, + ::Error: 'static, ::Future: 'static, Io: AsyncRead + AsyncWrite, Codec: Encoder + Decoder, ::Item: 'static, ::Error: std::fmt::Debug, + Out: Stream::Item> + Unpin, { #[pin] - inner: FramedServiceImplResponseInner, + inner: FramedServiceImplResponseInner, } -impl Future for FramedServiceImplResponse +impl Future for FramedServiceImplResponse where - St: Clone, - C: Service, Response = ConnectResult>, - C::Error: 'static, + C: Service, Response = ConnectResult>, T: ServiceFactory< Config = St, - Request = RequestItem, + Request = RequestItem, Response = ResponseItem, Error = C::Error, InitError = C::Error, >, + ::Error: 'static, ::Future: 'static, Io: AsyncRead + AsyncWrite, Codec: Encoder + Decoder, ::Item: 'static, ::Error: std::fmt::Debug, + Out: Stream::Item> + Unpin, { type Output = Result<(), ServiceError>; @@ -338,94 +337,71 @@ where } #[pin_project::pin_project] -enum FramedServiceImplResponseInner +enum FramedServiceImplResponseInner where - St: Clone, - C: Service, Response = ConnectResult>, - C::Error: 'static, + C: Service, Response = ConnectResult>, T: ServiceFactory< Config = St, - Request = RequestItem, + Request = RequestItem, Response = ResponseItem, Error = C::Error, InitError = C::Error, >, + ::Error: 'static, ::Future: 'static, Io: AsyncRead + AsyncWrite, Codec: Encoder + Decoder, ::Item: 'static, ::Error: std::fmt::Debug, + Out: Stream::Item> + Unpin, { - Connect( - #[pin] C::Future, - Rc, - Option>, - Option>>, - ), - Handler( - #[pin] T::Future, - Option>, - Option>, - Option>>, - ), - Dispatcher(Dispatcher), + Connect(#[pin] C::Future, Rc), + Handler(#[pin] T::Future, Option>, Option), + Dispatcher(Dispatcher), } -impl FramedServiceImplResponseInner +impl FramedServiceImplResponseInner where - St: Clone, - C: Service, Response = ConnectResult>, - C::Error: 'static, + C: Service, Response = ConnectResult>, T: ServiceFactory< Config = St, - Request = RequestItem, + Request = RequestItem, Response = ResponseItem, Error = C::Error, InitError = C::Error, >, + ::Error: 'static, ::Future: 'static, Io: AsyncRead + AsyncWrite, Codec: Encoder + Decoder, ::Item: 'static, ::Error: std::fmt::Debug, + Out: Stream::Item> + Unpin, { #[project] fn poll( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Either< - FramedServiceImplResponseInner, + FramedServiceImplResponseInner, Poll>>, > { #[project] match self.project() { - FramedServiceImplResponseInner::Connect(fut, handler, disconnect, rx) => { - match fut.poll(cx) { - Poll::Ready(Ok(res)) => { - Either::Left(FramedServiceImplResponseInner::Handler( - handler.new_service(res.state.clone()), - Some(res), - disconnect.take(), - rx.take(), - )) - } - Poll::Pending => Either::Right(Poll::Pending), - Poll::Ready(Err(e)) => Either::Right(Poll::Ready(Err(e.into()))), - } - } - FramedServiceImplResponseInner::Handler(fut, res, disconnect, rx) => { + FramedServiceImplResponseInner::Connect(fut, handler) => match fut.poll(cx) { + Poll::Ready(Ok(res)) => Either::Left(FramedServiceImplResponseInner::Handler( + handler.new_service(res.state), + Some(res.framed), + res.out, + )), + Poll::Pending => Either::Right(Poll::Pending), + Poll::Ready(Err(e)) => Either::Right(Poll::Ready(Err(e.into()))), + }, + FramedServiceImplResponseInner::Handler(fut, framed, out) => { match fut.poll(cx) { Poll::Ready(Ok(handler)) => { - let res = res.take().unwrap(); Either::Left(FramedServiceImplResponseInner::Dispatcher( - Dispatcher::new( - res.framed, - res.state, - handler, - res.sink, - rx.take().unwrap(), - disconnect.take(), - ), + Dispatcher::new(framed.take().unwrap(), handler, out.take()), )) } Poll::Pending => Either::Right(Poll::Pending), diff --git a/actix-ioframe/src/sink.rs b/actix-ioframe/src/sink.rs deleted file mode 100644 index eb4ff703..00000000 --- a/actix-ioframe/src/sink.rs +++ /dev/null @@ -1,45 +0,0 @@ -use std::fmt; -use std::rc::Rc; - -use actix_utils::oneshot; -use futures::future::{Future, FutureExt}; - -use crate::dispatcher::Message; - -pub struct Sink(Rc)>); - -impl Clone for Sink { - fn clone(&self) -> Self { - Sink(self.0.clone()) - } -} - -impl Sink { - pub(crate) fn new(tx: Rc)>) -> Self { - Sink(tx) - } - - /// Close connection - pub fn close(&self) { - (self.0)(Message::Close); - } - - /// Close connection - pub fn wait_close(&self) -> impl Future { - let (tx, rx) = oneshot::channel(); - (self.0)(Message::WaitClose(tx)); - - rx.map(|_| ()) - } - - /// Send item - pub fn send(&self, item: T) { - (self.0)(Message::Item(item)); - } -} - -impl fmt::Debug for Sink { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("Sink").finish() - } -} diff --git a/actix-ioframe/tests/test_server.rs b/actix-ioframe/tests/test_server.rs index 0a6746f8..b1f7f301 100644 --- a/actix-ioframe/tests/test_server.rs +++ b/actix-ioframe/tests/test_server.rs @@ -1,43 +1,49 @@ -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; -use std::time::Duration; +use std::cell::Cell; +use std::rc::Rc; use actix_codec::BytesCodec; -use actix_rt::time::delay_for; -use actix_service::{fn_service, Service}; +use actix_service::{fn_factory_with_config, fn_service, IntoService, Service}; use actix_testing::TestServer; +use actix_utils::mpsc; +use bytes::{Bytes, BytesMut}; use futures::future::ok; -use actix_ioframe::{Builder, Connect}; +use actix_ioframe::{Builder, Connect, FactoryBuilder}; #[derive(Clone)] -struct State; +struct State(Option>); #[actix_rt::test] -async fn test_disconnect() -> std::io::Result<()> { - let disconnect = Arc::new(AtomicBool::new(false)); - let disconnect1 = disconnect.clone(); +async fn test_basic() { + let client_item = Rc::new(Cell::new(false)); let srv = TestServer::with(move || { - let disconnect1 = disconnect1.clone(); - - Builder::new() - .factory(fn_service(|conn: Connect<_, _>| { - ok(conn.codec(BytesCodec).state(State)) - })) - .disconnect(move |_, _| { - disconnect1.store(true, Ordering::Relaxed); - }) - .finish(fn_service(|_t| ok(None))) + FactoryBuilder::new(fn_service(|conn: Connect<_, _>| { + ok(conn.codec(BytesCodec).state(State(None))) + })) + // echo + .build(fn_service(|t: BytesMut| ok(Some(t.freeze())))) }); - let mut client = Builder::new() - .service(|conn: Connect<_, _>| { - let conn = conn.codec(BytesCodec).state(State); - conn.sink().close(); - ok(conn) + let item = client_item.clone(); + let mut client = Builder::new(fn_service(move |conn: Connect<_, _>| { + async move { + let (tx, rx) = mpsc::channel(); + let _ = tx.send(Bytes::from_static(b"Hello")); + Ok(conn.codec(BytesCodec).out(rx).state(State(Some(tx)))) + } + })) + .build(fn_factory_with_config(move |mut cfg: State| { + let item = item.clone(); + ok((move |t: BytesMut| { + assert_eq!(t.freeze(), Bytes::from_static(b"Hello")); + item.set(true); + // drop Sender, which will close connection + cfg.0.take(); + ok::<_, ()>(None) }) - .finish(fn_service(|_t| ok(None))); + .into_service()) + })); let conn = actix_connect::default_connector() .call(actix_connect::Connect::with(String::new(), srv.addr())) @@ -45,8 +51,5 @@ async fn test_disconnect() -> std::io::Result<()> { .unwrap(); client.call(conn.into_parts().0).await.unwrap(); - let _ = delay_for(Duration::from_millis(100)).await; - assert!(disconnect.load(Ordering::Relaxed)); - - Ok(()) + assert!(client_item.get()); }