From c7a8743bf97dd397e8d1924b84a1f7f422a74432 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Wed, 11 Dec 2019 16:44:09 +0600 Subject: [PATCH] remove E param --- actix-ioframe/CHANGES.md | 4 + actix-ioframe/Cargo.toml | 2 +- actix-ioframe/src/connect.rs | 25 ++-- actix-ioframe/src/dispatcher.rs | 24 ++-- actix-ioframe/src/item.rs | 24 ++-- actix-ioframe/src/service.rs | 190 ++++++++++++----------------- actix-ioframe/src/sink.rs | 19 +-- actix-ioframe/tests/test_server.rs | 4 +- 8 files changed, 130 insertions(+), 162 deletions(-) diff --git a/actix-ioframe/CHANGES.md b/actix-ioframe/CHANGES.md index 28fb886d..03d710b7 100644 --- a/actix-ioframe/CHANGES.md +++ b/actix-ioframe/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.4.0] - 2019-12-1 + +* Remove `E` param + ## [0.3.0-alpha.3] - 2019-12-07 * Migrate to tokio 0.2 diff --git a/actix-ioframe/Cargo.toml b/actix-ioframe/Cargo.toml index 1df73c98..1fb4438d 100644 --- a/actix-ioframe/Cargo.toml +++ b/actix-ioframe/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-ioframe" -version = "0.3.0" +version = "0.4.0" authors = ["Nikolay Kim "] description = "Actix framed service" keywords = ["network", "framework", "async", "futures"] diff --git a/actix-ioframe/src/connect.rs b/actix-ioframe/src/connect.rs index f7534413..29fcb1e2 100644 --- a/actix-ioframe/src/connect.rs +++ b/actix-ioframe/src/connect.rs @@ -7,21 +7,21 @@ use futures::Stream; use crate::sink::Sink; -pub struct Connect +pub struct Connect where Codec: Encoder + Decoder, { io: Io, - sink: Sink<::Item, Err>, + sink: Sink<::Item>, _t: PhantomData<(St, Codec)>, } -impl Connect +impl Connect where Io: AsyncRead + AsyncWrite, Codec: Encoder + Decoder, { - pub(crate) fn new(io: Io, sink: Sink<::Item, Err>) -> Self { + pub(crate) fn new(io: Io, sink: Sink<::Item>) -> Self { Self { io, sink, @@ -29,7 +29,7 @@ where } } - pub fn codec(self, codec: Codec) -> ConnectResult { + pub fn codec(self, codec: Codec) -> ConnectResult { ConnectResult { state: (), sink: self.sink, @@ -39,15 +39,15 @@ where } #[pin_project::pin_project] -pub struct ConnectResult { +pub struct ConnectResult { pub(crate) state: St, pub(crate) framed: Framed, - pub(crate) sink: Sink<::Item, Err>, + pub(crate) sink: Sink<::Item>, } -impl ConnectResult { +impl ConnectResult { #[inline] - pub fn sink(&self) -> &Sink<::Item, Err> { + pub fn sink(&self) -> &Sink<::Item> { &self.sink } @@ -62,7 +62,7 @@ impl ConnectResult { } #[inline] - pub fn state(self, state: S) -> ConnectResult { + pub fn state(self, state: S) -> ConnectResult { ConnectResult { state, framed: self.framed, @@ -71,7 +71,7 @@ impl ConnectResult { } } -impl Stream for ConnectResult +impl Stream for ConnectResult where Io: AsyncRead + AsyncWrite, Codec: Encoder + Decoder, @@ -83,8 +83,7 @@ where } } -impl futures::Sink<::Item> - for ConnectResult +impl futures::Sink<::Item> for ConnectResult where Io: AsyncRead + AsyncWrite, Codec: Encoder + Decoder, diff --git a/actix-ioframe/src/dispatcher.rs b/actix-ioframe/src/dispatcher.rs index 8db9a436..965215f7 100644 --- a/actix-ioframe/src/dispatcher.rs +++ b/actix-ioframe/src/dispatcher.rs @@ -13,7 +13,7 @@ use crate::error::ServiceError; use crate::item::Item; use crate::sink::Sink; -type Request = Item; +type Request = Item; type Response = ::Item; pub(crate) enum Message { @@ -25,10 +25,10 @@ pub(crate) enum Message { /// FramedTransport - is a future that reads frames from Framed object /// and pass then to the service. #[pin_project::pin_project] -pub(crate) struct Dispatcher +pub(crate) struct Dispatcher where St: Clone, - S: Service, Response = Option>, Error = E>, + S: Service, Response = Option>>, S::Error: 'static, S::Future: 'static, T: AsyncRead + AsyncWrite, @@ -37,19 +37,19 @@ where ::Error: std::fmt::Debug, { service: S, - sink: Sink<::Item, E>, + sink: Sink<::Item>, state: St, dispatch_state: FramedState, framed: Framed, - rx: mpsc::Receiver::Item>, E>>, - tx: mpsc::Sender::Item>, E>>, + rx: mpsc::Receiver::Item>, S::Error>>, + tx: mpsc::Sender::Item>, S::Error>>, disconnect: Option>, } -impl Dispatcher +impl Dispatcher where St: Clone, - S: Service, Response = Option>, Error = E>, + S: Service, Response = Option>>, S::Error: 'static, S::Future: 'static, T: AsyncRead + AsyncWrite, @@ -61,8 +61,8 @@ where framed: Framed, state: St, service: F, - sink: Sink<::Item, E>, - rx: mpsc::Receiver::Item>, E>>, + sink: Sink<::Item>, + rx: mpsc::Receiver::Item>, S::Error>>, disconnect: Option>, ) -> Self { let tx = rx.sender(); @@ -126,10 +126,10 @@ impl FramedState { } } -impl Dispatcher +impl Dispatcher where St: Clone, - S: Service, Response = Option>, Error = E>, + S: Service, Response = Option>>, S::Error: 'static, S::Future: 'static, T: AsyncRead + AsyncWrite, diff --git a/actix-ioframe/src/item.rs b/actix-ioframe/src/item.rs index 03db8131..42008d9e 100644 --- a/actix-ioframe/src/item.rs +++ b/actix-ioframe/src/item.rs @@ -5,19 +5,19 @@ use actix_codec::{Decoder, Encoder}; use crate::sink::Sink; -pub struct Item { +pub struct Item { state: St, - sink: Sink<::Item, E>, + sink: Sink<::Item>, item: ::Item, } -impl Item +impl Item where Codec: Encoder + Decoder, { pub(crate) fn new( state: St, - sink: Sink<::Item, E>, + sink: Sink<::Item>, item: ::Item, ) -> Self { Item { state, sink, item } @@ -34,7 +34,7 @@ where } #[inline] - pub fn sink(&self) -> &Sink<::Item, E> { + pub fn sink(&self) -> &Sink<::Item> { &self.sink } @@ -44,18 +44,12 @@ where } #[inline] - pub fn into_parts( - self, - ) -> ( - St, - Sink<::Item, E>, - ::Item, - ) { + pub fn into_parts(self) -> (St, Sink<::Item>, ::Item) { (self.state, self.sink, self.item) } } -impl Deref for Item +impl Deref for Item where Codec: Encoder + Decoder, { @@ -67,7 +61,7 @@ where } } -impl DerefMut for Item +impl DerefMut for Item where Codec: Encoder + Decoder, { @@ -77,7 +71,7 @@ where } } -impl fmt::Debug for Item +impl fmt::Debug for Item where Codec: Encoder + Decoder, ::Item: fmt::Debug, diff --git a/actix-ioframe/src/service.rs b/actix-ioframe/src/service.rs index 82576f04..ee781970 100644 --- a/actix-ioframe/src/service.rs +++ b/actix-ioframe/src/service.rs @@ -17,7 +17,7 @@ use crate::error::ServiceError; use crate::item::Item; use crate::sink::Sink; -type RequestItem = Item; +type RequestItem = Item; type ResponseItem = Option<::Item>; type ServiceResult = Result::Item>, E>; @@ -37,15 +37,11 @@ impl Builder { } /// Construct framed handler service with specified connect service - pub fn service(self, connect: F) -> ServiceBuilder + pub fn service(self, connect: F) -> ServiceBuilder where F: IntoService, Io: AsyncRead + AsyncWrite, - C: Service< - Request = Connect, - Response = ConnectResult, - Error = E, - >, + C: Service, Response = ConnectResult>, Codec: Decoder + Encoder, { ServiceBuilder { @@ -56,17 +52,16 @@ impl Builder { } /// Construct framed handler new service with specified connect service - pub fn factory(self, connect: F) -> NewServiceBuilder + pub fn factory(self, connect: F) -> NewServiceBuilder where F: IntoServiceFactory, - E: 'static, Io: AsyncRead + AsyncWrite, C: ServiceFactory< Config = (), - Request = Connect, - Response = ConnectResult, - Error = E, + Request = Connect, + Response = ConnectResult, >, + C::Error: 'static, C::Future: 'static, Codec: Decoder + Encoder, { @@ -78,20 +73,16 @@ impl Builder { } } -pub struct ServiceBuilder { +pub struct ServiceBuilder { connect: C, disconnect: Option>, - _t: PhantomData<(St, Io, Codec, Err)>, + _t: PhantomData<(St, Io, Codec)>, } -impl ServiceBuilder +impl ServiceBuilder where St: Clone, - C: Service< - Request = Connect, - Response = ConnectResult, - Error = Err, - >, + C: Service, Response = ConnectResult>, Io: AsyncRead + AsyncWrite, Codec: Decoder + Encoder, ::Item: 'static, @@ -109,15 +100,15 @@ where } /// Provide stream items handler service and construct service factory. - pub fn finish(self, service: F) -> FramedServiceImpl + pub fn finish(self, service: F) -> FramedServiceImpl where F: IntoServiceFactory, T: ServiceFactory< Config = St, - Request = RequestItem, + Request = RequestItem, Response = ResponseItem, - Error = Err, - InitError = Err, + Error = C::Error, + InitError = C::Error, >, { FramedServiceImpl { @@ -129,23 +120,22 @@ where } } -pub struct NewServiceBuilder { +pub struct NewServiceBuilder { connect: C, disconnect: Option>, - _t: PhantomData<(St, Io, Codec, Err)>, + _t: PhantomData<(St, Io, Codec)>, } -impl NewServiceBuilder +impl NewServiceBuilder where St: Clone, Io: AsyncRead + AsyncWrite, - Err: 'static, C: ServiceFactory< Config = (), - Request = Connect, - Response = ConnectResult, - Error = Err, + Request = Connect, + Response = ConnectResult, >, + C::Error: 'static, C::Future: 'static, Codec: Decoder + Encoder, ::Item: 'static, @@ -162,15 +152,15 @@ where self } - pub fn finish(self, service: F) -> FramedService + pub fn finish(self, service: F) -> FramedService where F: IntoServiceFactory, T: ServiceFactory< Config = St, - Request = RequestItem, + Request = RequestItem, Response = ResponseItem, - Error = Err, - InitError = Err, + Error = C::Error, + InitError = C::Error, > + 'static, { FramedService { @@ -182,34 +172,32 @@ where } } -pub struct FramedService { +pub struct FramedService { connect: C, handler: Rc, disconnect: Option>, - _t: PhantomData<(St, Io, Codec, Err, Cfg)>, + _t: PhantomData<(St, Io, Codec, Cfg)>, } -impl ServiceFactory - for FramedService +impl ServiceFactory for FramedService where St: Clone + 'static, Io: AsyncRead + AsyncWrite, C: ServiceFactory< Config = (), - Request = Connect, - Response = ConnectResult, - Error = Err, + Request = Connect, + Response = ConnectResult, >, + C::Error: 'static, C::Future: 'static, T: ServiceFactory< Config = St, - Request = RequestItem, + Request = RequestItem, Response = ResponseItem, - Error = Err, - InitError = Err, + Error = C::Error, + InitError = C::Error, > + 'static, ::Future: 'static, - Err: 'static, Codec: Decoder + Encoder, ::Item: 'static, ::Error: std::fmt::Debug, @@ -219,7 +207,7 @@ where type Response = (); type Error = ServiceError; type InitError = C::InitError; - type Service = FramedServiceImpl; + type Service = FramedServiceImpl; type Future = LocalBoxFuture<'static, Result>; fn new_service(&self, _: Cfg) -> Self::Future { @@ -241,29 +229,25 @@ where } } -pub struct FramedServiceImpl { +pub struct FramedServiceImpl { connect: C, handler: Rc, disconnect: Option>, - _t: PhantomData<(St, Io, Codec, Err)>, + _t: PhantomData<(St, Io, Codec)>, } -impl Service for FramedServiceImpl +impl Service for FramedServiceImpl where St: Clone, Io: AsyncRead + AsyncWrite, - C: Service< - Request = Connect, - Response = ConnectResult, - Error = Err, - >, - Err: 'static, + C: Service, Response = ConnectResult>, + C::Error: 'static, T: ServiceFactory< Config = St, - Request = RequestItem, + Request = RequestItem, Response = ResponseItem, - Error = Err, - InitError = Err, + Error = C::Error, + InitError = C::Error, >, ::Future: 'static, Codec: Decoder + Encoder, @@ -272,8 +256,8 @@ where { type Request = Io; type Response = (); - type Error = ServiceError; - type Future = FramedServiceImplResponse; + type Error = ServiceError; + type Future = FramedServiceImplResponse; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.connect.poll_ready(cx).map_err(|e| e.into()) @@ -281,7 +265,9 @@ where fn call(&mut self, req: Io) -> Self::Future { let (tx, rx) = mpsc::channel(); - let sink = Sink::new(tx); + let sink = Sink::new(Rc::new(move |msg| { + let _ = tx.send(Ok(msg)); + })); FramedServiceImplResponse { inner: FramedServiceImplResponseInner::Connect( self.connect.call(Connect::new(req, sink.clone())), @@ -294,21 +280,17 @@ where } #[pin_project::pin_project] -pub struct FramedServiceImplResponse +pub struct FramedServiceImplResponse where St: Clone, - C: Service< - Request = Connect, - Response = ConnectResult, - Error = Err, - >, - Err: 'static, + C: Service, Response = ConnectResult>, + C::Error: 'static, T: ServiceFactory< Config = St, - Request = RequestItem, + Request = RequestItem, Response = ResponseItem, - Error = Err, - InitError = Err, + Error = C::Error, + InitError = C::Error, >, ::Future: 'static, Io: AsyncRead + AsyncWrite, @@ -317,24 +299,20 @@ where ::Error: std::fmt::Debug, { #[pin] - inner: FramedServiceImplResponseInner, + inner: FramedServiceImplResponseInner, } -impl Future for FramedServiceImplResponse +impl Future for FramedServiceImplResponse where St: Clone, - C: Service< - Request = Connect, - Response = ConnectResult, - Error = Err, - >, - Err: 'static, + C: Service, Response = ConnectResult>, + C::Error: 'static, T: ServiceFactory< Config = St, - Request = RequestItem, + Request = RequestItem, Response = ResponseItem, - Error = Err, - InitError = Err, + Error = C::Error, + InitError = C::Error, >, ::Future: 'static, Io: AsyncRead + AsyncWrite, @@ -342,7 +320,7 @@ where ::Item: 'static, ::Error: std::fmt::Debug, { - type Output = Result<(), ServiceError>; + type Output = Result<(), ServiceError>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut this = self.as_mut().project(); @@ -360,21 +338,17 @@ where } #[pin_project::pin_project] -enum FramedServiceImplResponseInner +enum FramedServiceImplResponseInner where St: Clone, - C: Service< - Request = Connect, - Response = ConnectResult, - Error = Err, - >, - Err: 'static, + C: Service, Response = ConnectResult>, + C::Error: 'static, T: ServiceFactory< Config = St, - Request = RequestItem, + Request = RequestItem, Response = ResponseItem, - Error = Err, - InitError = Err, + Error = C::Error, + InitError = C::Error, >, ::Future: 'static, Io: AsyncRead + AsyncWrite, @@ -386,32 +360,28 @@ where #[pin] C::Future, Rc, Option>, - Option>>, + Option>>, ), Handler( #[pin] T::Future, - Option>, + Option>, Option>, - Option>>, + Option>>, ), - Dispatcher(Dispatcher), + Dispatcher(Dispatcher), } -impl FramedServiceImplResponseInner +impl FramedServiceImplResponseInner where St: Clone, - C: Service< - Request = Connect, - Response = ConnectResult, - Error = Err, - >, - Err: 'static, + C: Service, Response = ConnectResult>, + C::Error: 'static, T: ServiceFactory< Config = St, - Request = RequestItem, + Request = RequestItem, Response = ResponseItem, - Error = Err, - InitError = Err, + Error = C::Error, + InitError = C::Error, >, ::Future: 'static, Io: AsyncRead + AsyncWrite, @@ -424,8 +394,8 @@ where self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Either< - FramedServiceImplResponseInner, - Poll>>, + FramedServiceImplResponseInner, + Poll>>, > { #[project] match self.project() { diff --git a/actix-ioframe/src/sink.rs b/actix-ioframe/src/sink.rs index 4e0fe025..eb4ff703 100644 --- a/actix-ioframe/src/sink.rs +++ b/actix-ioframe/src/sink.rs @@ -1,43 +1,44 @@ use std::fmt; +use std::rc::Rc; -use actix_utils::{mpsc, oneshot}; +use actix_utils::oneshot; use futures::future::{Future, FutureExt}; use crate::dispatcher::Message; -pub struct Sink(mpsc::Sender, E>>); +pub struct Sink(Rc)>); -impl Clone for Sink { +impl Clone for Sink { fn clone(&self) -> Self { Sink(self.0.clone()) } } -impl Sink { - pub(crate) fn new(tx: mpsc::Sender, E>>) -> Self { +impl Sink { + pub(crate) fn new(tx: Rc)>) -> Self { Sink(tx) } /// Close connection pub fn close(&self) { - let _ = self.0.send(Ok(Message::Close)); + (self.0)(Message::Close); } /// Close connection pub fn wait_close(&self) -> impl Future { let (tx, rx) = oneshot::channel(); - let _ = self.0.send(Ok(Message::WaitClose(tx))); + (self.0)(Message::WaitClose(tx)); rx.map(|_| ()) } /// Send item pub fn send(&self, item: T) { - let _ = self.0.send(Ok(Message::Item(item))); + (self.0)(Message::Item(item)); } } -impl fmt::Debug for Sink { +impl fmt::Debug for Sink { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("Sink").finish() } diff --git a/actix-ioframe/tests/test_server.rs b/actix-ioframe/tests/test_server.rs index 35bdb983..0a6746f8 100644 --- a/actix-ioframe/tests/test_server.rs +++ b/actix-ioframe/tests/test_server.rs @@ -22,7 +22,7 @@ async fn test_disconnect() -> std::io::Result<()> { let disconnect1 = disconnect1.clone(); Builder::new() - .factory(fn_service(|conn: Connect<_, _, _>| { + .factory(fn_service(|conn: Connect<_, _>| { ok(conn.codec(BytesCodec).state(State)) })) .disconnect(move |_, _| { @@ -32,7 +32,7 @@ async fn test_disconnect() -> std::io::Result<()> { }); let mut client = Builder::new() - .service(|conn: Connect<_, _, _>| { + .service(|conn: Connect<_, _>| { let conn = conn.codec(BytesCodec).state(State); conn.sink().close(); ok(conn)