From 7404d82a9b71cc9ca74bd96e18d45a7652be6405 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 18 Nov 2019 14:30:04 +0600 Subject: [PATCH] use concrete types --- actix-codec/src/framed.rs | 7 +- actix-connect/src/connect.rs | 2 +- actix-connect/src/lib.rs | 16 +-- actix-ioframe/src/dispatcher.rs | 4 +- actix-server/src/accept.rs | 2 +- actix-server/src/builder.rs | 4 +- actix-server/src/config.rs | 4 +- actix-server/src/service.rs | 2 +- actix-server/src/ssl/openssl.rs | 2 +- actix-server/src/worker.rs | 2 +- actix-service/Cargo.toml | 1 - actix-service/src/and_then.rs | 118 +++++++++-------- actix-service/src/apply.rs | 63 +++++---- actix-service/src/apply_cfg.rs | 118 ++++++++--------- actix-service/src/fn_service.rs | 130 ++++++++---------- actix-service/src/into.rs | 204 ----------------------------- actix-service/src/lib.rs | 83 +++++++++++- actix-service/src/map.rs | 69 +++++----- actix-service/src/map_err.rs | 67 +++++----- actix-service/src/map_init_err.rs | 19 ++- actix-service/src/pipeline.rs | 50 ++----- actix-service/src/then.rs | 119 +++++++++-------- actix-service/src/transform.rs | 133 +++++++++++++++---- actix-service/src/transform_err.rs | 91 ------------- actix-utils/src/framed.rs | 4 +- 25 files changed, 568 insertions(+), 746 deletions(-) delete mode 100644 actix-service/src/into.rs delete mode 100644 actix-service/src/transform_err.rs diff --git a/actix-codec/src/framed.rs b/actix-codec/src/framed.rs index 29892f93..4b6b7f88 100644 --- a/actix-codec/src/framed.rs +++ b/actix-codec/src/framed.rs @@ -216,11 +216,8 @@ impl Framed { } impl Framed { - /// Force send item - pub fn force_send( - &mut self, - item: ::Item, - ) -> Result<(), ::Error> + /// Serialize item and Write to the inner buffer + pub fn write(&mut self, item: ::Item) -> Result<(), ::Error> where T: AsyncWrite + Unpin, U: Encoder + Unpin, diff --git a/actix-connect/src/connect.rs b/actix-connect/src/connect.rs index 49e75cf6..1b87f4fe 100644 --- a/actix-connect/src/connect.rs +++ b/actix-connect/src/connect.rs @@ -6,7 +6,7 @@ use std::net::SocketAddr; use either::Either; /// Connect request -pub trait Address { +pub trait Address: Unpin { /// Host name of the request fn host(&self) -> &str; diff --git a/actix-connect/src/lib.rs b/actix-connect/src/lib.rs index 6663206a..ba5ec7b2 100644 --- a/actix-connect/src/lib.rs +++ b/actix-connect/src/lib.rs @@ -20,6 +20,10 @@ pub mod ssl; #[cfg(feature = "uri")] mod uri; +use actix_rt::Arbiter; +use actix_service::{pipeline, pipeline_factory, Service, ServiceFactory}; +use tokio_net::tcp::TcpStream; + pub use trust_dns_resolver::config::{ResolverConfig, ResolverOpts}; pub use trust_dns_resolver::system_conf::read_system_conf; pub use trust_dns_resolver::{error::ResolveError, AsyncResolver}; @@ -30,10 +34,6 @@ pub use self::error::ConnectError; pub use self::resolver::{Resolver, ResolverFactory}; pub use self::service::{ConnectService, ConnectServiceFactory, TcpConnectService}; -use actix_rt::Arbiter; -use actix_service::{pipeline, pipeline_factory, Service, ServiceFactory}; -use tokio_net::tcp::TcpStream; - pub fn start_resolver(cfg: ResolverConfig, opts: ResolverOpts) -> AsyncResolver { let (resolver, bg) = AsyncResolver::new(cfg, opts); tokio_executor::current_thread::spawn(bg); @@ -70,7 +70,7 @@ pub fn start_default_resolver() -> AsyncResolver { pub fn new_connector( resolver: AsyncResolver, ) -> impl Service, Response = Connection, Error = ConnectError> -{ + + Clone { pipeline(Resolver::new(resolver)).and_then(TcpConnector::new()) } @@ -83,14 +83,14 @@ pub fn new_connector_factory( Response = Connection, Error = ConnectError, InitError = (), -> { +> + Clone { pipeline_factory(ResolverFactory::new(resolver)).and_then(TcpConnectorFactory::new()) } /// Create connector service with default parameters pub fn default_connector( ) -> impl Service, Response = Connection, Error = ConnectError> -{ + + Clone { pipeline(Resolver::default()).and_then(TcpConnector::new()) } @@ -101,6 +101,6 @@ pub fn default_connector_factory() -> impl ServiceFactory< Response = Connection, Error = ConnectError, InitError = (), -> { +> + Clone { pipeline_factory(ResolverFactory::default()).and_then(TcpConnectorFactory::new()) } diff --git a/actix-ioframe/src/dispatcher.rs b/actix-ioframe/src/dispatcher.rs index da9957b3..ec036220 100644 --- a/actix-ioframe/src/dispatcher.rs +++ b/actix-ioframe/src/dispatcher.rs @@ -330,7 +330,7 @@ where if !buf_empty { match inner.buf.pop_front().unwrap() { Ok(msg) => { - if let Err(err) = framed.force_send(msg) { + if let Err(err) = framed.write(msg) { *dispatch_state = FramedState::FramedError(ServiceError::Encoder(err)); return true; @@ -347,7 +347,7 @@ where if !rx_done && rx.is_some() { match Pin::new(rx.as_mut().unwrap()).poll_next(cx) { Poll::Ready(Some(FramedMessage::Message(msg))) => { - if let Err(err) = framed.force_send(msg) { + if let Err(err) = framed.write(msg) { *dispatch_state = FramedState::FramedError(ServiceError::Encoder(err)); return true; diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index e7b589e5..421aa0ab 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -445,7 +445,7 @@ impl Accept { delay(Instant::now() + Duration::from_millis(510)).await; let _ = r.set_readiness(mio::Ready::readable()); } - .boxed(), + .boxed(), ); return; } diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index 0b2f313e..1a798c38 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -326,7 +326,7 @@ impl ServerBuilder { async move { Worker::start(rx1, rx2, services, avail, timeout); } - .boxed(), + .boxed(), ); worker @@ -401,7 +401,7 @@ impl ServerBuilder { .await; System::current().stop(); } - .boxed(), + .boxed(), ); } ready(()) diff --git a/actix-server/src/config.rs b/actix-server/src/config.rs index 8cc99dfb..36e94a37 100644 --- a/actix-server/src/config.rs +++ b/actix-server/src/config.rs @@ -139,7 +139,7 @@ impl InternalServiceFactory for ConfiguredService { } return Ok(res); } - .boxed_local() + .boxed_local() } } @@ -266,6 +266,6 @@ where } }; } - .boxed_local() + .boxed_local() } } diff --git a/actix-server/src/service.rs b/actix-server/src/service.rs index 8345f17b..2a77c810 100644 --- a/actix-server/src/service.rs +++ b/actix-server/src/service.rs @@ -87,7 +87,7 @@ where let _ = f.await; drop(guard); } - .boxed_local(), + .boxed_local(), ); ok(()) } else { diff --git a/actix-server/src/ssl/openssl.rs b/actix-server/src/ssl/openssl.rs index a9b0debd..69ed6fcd 100644 --- a/actix-server/src/ssl/openssl.rs +++ b/actix-server/src/ssl/openssl.rs @@ -92,7 +92,7 @@ impl Service for OpensslAcceptor let acc = acc; tokio_openssl::accept(&acc, io).await } - .boxed_local(), + .boxed_local(), params: Some(params), } } diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index e39ac493..a1ba6c78 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -187,7 +187,7 @@ impl Worker { } wrk.await } - .boxed_local(), + .boxed_local(), ); } diff --git a/actix-service/Cargo.toml b/actix-service/Cargo.toml index 9741bbfc..5fa51787 100644 --- a/actix-service/Cargo.toml +++ b/actix-service/Cargo.toml @@ -24,7 +24,6 @@ path = "src/lib.rs" [dependencies] futures = "0.3.1" -pin-project = "0.4.5" [dev-dependencies] tokio = "0.2.0-alpha.5" diff --git a/actix-service/src/and_then.rs b/actix-service/src/and_then.rs index 091fd141..5c1bd17b 100644 --- a/actix-service/src/and_then.rs +++ b/actix-service/src/and_then.rs @@ -2,8 +2,6 @@ use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; -use pin_project::pin_project; - use super::{Service, ServiceFactory}; use crate::cell::Cell; @@ -11,14 +9,14 @@ use crate::cell::Cell; /// of another service which completes successfully. /// /// This is created by the `ServiceExt::and_then` method. -pub struct AndThen { +pub struct AndThenService { a: A, b: Cell, } -impl AndThen { +impl AndThenService { /// Create new `AndThen` combinator - pub fn new(a: A, b: B) -> Self + pub(crate) fn new(a: A, b: B) -> Self where A: Service, B: Service, @@ -27,27 +25,29 @@ impl AndThen { } } -impl Clone for AndThen +impl Clone for AndThenService where A: Clone, { fn clone(&self) -> Self { - AndThen { + AndThenService { a: self.a.clone(), b: self.b.clone(), } } } -impl Service for AndThen +impl Service for AndThenService where A: Service, B: Service, + A::Future: Unpin, + B::Future: Unpin, { type Request = A::Request; type Response = B::Response; type Error = A::Error; - type Future = AndThenFuture; + type Future = AndThenServiceResponse; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { let not_ready = !self.a.poll_ready(cx)?.is_ready(); @@ -59,30 +59,29 @@ where } fn call(&mut self, req: A::Request) -> Self::Future { - AndThenFuture::new(self.a.call(req), self.b.clone()) + AndThenServiceResponse::new(self.a.call(req), self.b.clone()) } } -#[pin_project] -pub struct AndThenFuture +pub struct AndThenServiceResponse where A: Service, B: Service, { b: Cell, - #[pin] fut_b: Option, - #[pin] fut_a: Option, } -impl AndThenFuture +impl AndThenServiceResponse where A: Service, B: Service, + A::Future: Unpin, + B::Future: Unpin, { fn new(a: A::Future, b: Cell) -> Self { - AndThenFuture { + AndThenServiceResponse { b, fut_a: Some(a), fut_b: None, @@ -90,34 +89,27 @@ where } } -impl Future for AndThenFuture +impl Future for AndThenServiceResponse where A: Service, B: Service, + A::Future: Unpin, + B::Future: Unpin, { type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.project(); + let mut this = self.get_mut(); loop { - let mut fut_a = this.fut_a.as_mut(); - let mut fut_b = this.fut_b.as_mut(); - - if let Some(fut) = fut_b.as_mut().as_pin_mut() { - return fut.poll(cx); + if let Some(ref mut fut) = this.fut_b { + return Pin::new(fut).poll(cx); } - match fut_a - .as_mut() - .as_pin_mut() - .expect("Bug in actix-service") - .poll(cx) - { + match Pin::new(&mut this.fut_a.as_mut().expect("Bug in actix-service")).poll(cx) { Poll::Ready(Ok(resp)) => { - fut_a.set(None); - let new_fut = this.b.get_mut().call(resp); - fut_b.set(Some(new_fut)); + let _ = this.fut_a.take(); + this.fut_b = Some(this.b.get_mut().call(resp)); } Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), Poll::Pending => return Poll::Pending, @@ -126,8 +118,8 @@ where } } -/// `AndThenNewService` new service combinator -pub struct AndThenNewService +/// `.and_then()` service factory combinator +pub struct AndThenServiceFactory where A: ServiceFactory, B: ServiceFactory, @@ -136,7 +128,7 @@ where b: B, } -impl AndThenNewService +impl AndThenServiceFactory where A: ServiceFactory, B: ServiceFactory< @@ -146,13 +138,13 @@ where InitError = A::InitError, >, { - /// Create new `AndThen` combinator + /// Create new `AndThenFactory` combinator pub fn new(a: A, b: B) -> Self { Self { a, b } } } -impl ServiceFactory for AndThenNewService +impl ServiceFactory for AndThenServiceFactory where A: ServiceFactory, B: ServiceFactory< @@ -161,22 +153,28 @@ where Error = A::Error, InitError = A::InitError, >, + A::Future: Unpin, + A::Service: Unpin, + ::Future: Unpin, + B::Future: Unpin, + B::Service: Unpin, + ::Future: Unpin, { type Request = A::Request; type Response = B::Response; type Error = A::Error; type Config = A::Config; - type Service = AndThen; + type Service = AndThenService; type InitError = A::InitError; - type Future = AndThenNewServiceFuture; + type Future = AndThenServiceFactoryResponse; fn new_service(&self, cfg: &A::Config) -> Self::Future { - AndThenNewServiceFuture::new(self.a.new_service(cfg), self.b.new_service(cfg)) + AndThenServiceFactoryResponse::new(self.a.new_service(cfg), self.b.new_service(cfg)) } } -impl Clone for AndThenNewService +impl Clone for AndThenServiceFactory where A: ServiceFactory + Clone, B: ServiceFactory + Clone, @@ -189,28 +187,31 @@ where } } -#[pin_project] -pub struct AndThenNewServiceFuture +pub struct AndThenServiceFactoryResponse where A: ServiceFactory, B: ServiceFactory, { - #[pin] fut_b: B::Future, - #[pin] fut_a: A::Future, a: Option, b: Option, } -impl AndThenNewServiceFuture +impl AndThenServiceFactoryResponse where A: ServiceFactory, B: ServiceFactory, + A::Future: Unpin, + A::Service: Unpin, + ::Future: Unpin, + B::Future: Unpin, + B::Service: Unpin, + ::Future: Unpin, { fn new(fut_a: A::Future, fut_b: B::Future) -> Self { - AndThenNewServiceFuture { + AndThenServiceFactoryResponse { fut_a, fut_b, a: None, @@ -219,27 +220,34 @@ where } } -impl Future for AndThenNewServiceFuture +impl Future for AndThenServiceFactoryResponse where A: ServiceFactory, B: ServiceFactory, + A::Future: Unpin, + A::Service: Unpin, + ::Future: Unpin, + B::Future: Unpin, + B::Service: Unpin, + ::Future: Unpin, { - type Output = Result, A::InitError>; + type Output = Result, A::InitError>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); + let this = self.get_mut(); + if this.a.is_none() { - if let Poll::Ready(service) = this.fut_a.poll(cx)? { - *this.a = Some(service); + if let Poll::Ready(service) = Pin::new(&mut this.fut_a).poll(cx)? { + this.a = Some(service); } } if this.b.is_none() { - if let Poll::Ready(service) = this.fut_b.poll(cx)? { - *this.b = Some(service); + if let Poll::Ready(service) = Pin::new(&mut this.fut_b).poll(cx)? { + this.b = Some(service); } } if this.a.is_some() && this.b.is_some() { - Poll::Ready(Ok(AndThen::new( + Poll::Ready(Ok(AndThenService::new( this.a.take().unwrap(), this.b.take().unwrap(), ))) diff --git a/actix-service/src/apply.rs b/actix-service/src/apply.rs index e4a42f4d..c14ea37e 100644 --- a/actix-service/src/apply.rs +++ b/actix-service/src/apply.rs @@ -1,4 +1,3 @@ -use pin_project::pin_project; use std::future::Future; use std::marker::PhantomData; use std::pin::Pin; @@ -7,10 +6,7 @@ use std::task::{Context, Poll}; use super::{IntoService, IntoServiceFactory, Service, ServiceFactory}; /// Apply tranform function to a service -pub fn apply_fn( - service: U, - f: F, -) -> impl Service +pub fn apply_fn(service: U, f: F) -> Apply where T: Service, F: FnMut(In, &mut T) -> R, @@ -24,30 +20,21 @@ where pub fn apply_fn_factory( service: U, f: F, -) -> impl ServiceFactory< - Config = T::Config, - Request = In, - Response = Out, - Error = Err, - InitError = T::InitError, -> +) -> ApplyServiceFactory where T: ServiceFactory, F: FnMut(In, &mut T::Service) -> R + Clone, R: Future>, U: IntoServiceFactory, { - ApplyNewService::new(service.into_factory(), f) + ApplyServiceFactory::new(service.into_factory(), f) } -#[doc(hidden)] /// `Apply` service combinator -#[pin_project] -struct Apply +pub struct Apply where T: Service, { - #[pin] service: T, f: F, r: PhantomData<(In, Out, R)>, @@ -89,8 +76,8 @@ where } } -/// `ApplyNewService` new service combinator -struct ApplyNewService +/// `apply()` service factory +pub struct ApplyServiceFactory where T: ServiceFactory, { @@ -99,7 +86,7 @@ where r: PhantomData<(R, In, Out)>, } -impl ApplyNewService +impl ApplyServiceFactory where T: ServiceFactory, F: FnMut(In, &mut T::Service) -> R + Clone, @@ -115,10 +102,11 @@ where } } -impl ServiceFactory for ApplyNewService +impl ServiceFactory for ApplyServiceFactory where T: ServiceFactory, - F: FnMut(In, &mut T::Service) -> R + Clone, + T::Future: Unpin, + F: FnMut(In, &mut T::Service) -> R + Unpin + Clone, R: Future>, { type Request = In; @@ -128,34 +116,32 @@ where type Config = T::Config; type Service = Apply; type InitError = T::InitError; - type Future = ApplyNewServiceFuture; + type Future = ApplyServiceFactoryResponse; fn new_service(&self, cfg: &T::Config) -> Self::Future { - ApplyNewServiceFuture::new(self.service.new_service(cfg), self.f.clone()) + ApplyServiceFactoryResponse::new(self.service.new_service(cfg), self.f.clone()) } } -#[pin_project] -struct ApplyNewServiceFuture +pub struct ApplyServiceFactoryResponse where T: ServiceFactory, F: FnMut(In, &mut T::Service) -> R + Clone, R: Future>, { - #[pin] fut: T::Future, f: Option, r: PhantomData<(In, Out)>, } -impl ApplyNewServiceFuture +impl ApplyServiceFactoryResponse where T: ServiceFactory, F: FnMut(In, &mut T::Service) -> R + Clone, R: Future>, { fn new(fut: T::Future, f: F) -> Self { - ApplyNewServiceFuture { + Self { f: Some(f), fut, r: PhantomData, @@ -163,17 +149,28 @@ where } } -impl Future for ApplyNewServiceFuture +impl Unpin for ApplyServiceFactoryResponse where T: ServiceFactory, - F: FnMut(In, &mut T::Service) -> R + Clone, + T::Future: Unpin, + F: FnMut(In, &mut T::Service) -> R + Unpin + Clone, + R: Future>, +{ +} + +impl Future for ApplyServiceFactoryResponse +where + T: ServiceFactory, + T::Future: Unpin, + F: FnMut(In, &mut T::Service) -> R + Unpin + Clone, R: Future>, { type Output = Result, T::InitError>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - if let Poll::Ready(svc) = this.fut.poll(cx)? { + let this = self.get_mut(); + + if let Poll::Ready(svc) = Pin::new(&mut this.fut).poll(cx)? { Poll::Ready(Ok(Apply::new(svc, this.f.take().unwrap()))) } else { Poll::Pending diff --git a/actix-service/src/apply_cfg.rs b/actix-service/src/apply_cfg.rs index 304ae1ee..312dec23 100644 --- a/actix-service/src/apply_cfg.rs +++ b/actix-service/src/apply_cfg.rs @@ -3,28 +3,15 @@ use std::marker::PhantomData; use std::pin::Pin; use std::task::{Context, Poll}; -use futures::ready; -use pin_project::pin_project; - use crate::cell::Cell; -use crate::{IntoService, Service, ServiceFactory}; +use crate::{Service, ServiceFactory}; /// Convert `Fn(&Config, &mut Service) -> Future` fn to a NewService -pub fn apply_cfg( - srv: T, - f: F, -) -> impl ServiceFactory< - Config = C, - Request = S::Request, - Response = S::Response, - Error = S::Error, - Service = S, - InitError = E, -> + Clone +pub fn apply_cfg(srv: T, f: F) -> ApplyConfigService where F: FnMut(&C, &mut T) -> R, T: Service, - R: Future>, + R: Future> + Unpin, S: Service, { ApplyConfigService { @@ -39,14 +26,7 @@ where pub fn apply_cfg_factory( srv: T, f: F, -) -> impl ServiceFactory< - Config = C, - Request = S::Request, - Response = S::Response, - Error = S::Error, - Service = S, - InitError = T::InitError, -> + Clone +) -> ApplyConfigServiceFactory where C: Clone, F: FnMut(&C, &mut T::Service) -> R, @@ -55,7 +35,7 @@ where R: Future>, S: Service, { - ApplyConfigNewService { + ApplyConfigServiceFactory { f: Cell::new(f), srv: Cell::new(srv), _t: PhantomData, @@ -63,8 +43,7 @@ where } /// Convert `Fn(&Config) -> Future` fn to NewService\ -#[pin_project] -struct ApplyConfigService +pub struct ApplyConfigService where F: FnMut(&C, &mut T) -> R, T: Service, @@ -72,7 +51,6 @@ where S: Service, { f: Cell, - #[pin] srv: Cell, _t: PhantomData<(C, R, S)>, } @@ -97,7 +75,8 @@ impl ServiceFactory for ApplyConfigService where F: FnMut(&C, &mut T) -> R, T: Service, - R: Future>, + T::Future: Unpin, + R: Future> + Unpin, S: Service, { type Config = C; @@ -107,41 +86,46 @@ where type Service = S; type InitError = E; - type Future = FnNewServiceConfigFut; + type Future = ApplyConfigServiceResponse; fn new_service(&self, cfg: &C) -> Self::Future { - FnNewServiceConfigFut { + ApplyConfigServiceResponse { fut: unsafe { (self.f.get_mut_unsafe())(cfg, self.srv.get_mut_unsafe()) }, _t: PhantomData, } } } -#[pin_project] -struct FnNewServiceConfigFut +pub struct ApplyConfigServiceResponse where R: Future>, S: Service, { - #[pin] fut: R, _t: PhantomData<(S,)>, } -impl Future for FnNewServiceConfigFut +impl Unpin for ApplyConfigServiceResponse where - R: Future>, + R: Future> + Unpin, + S: Service, +{ +} + +impl Future for ApplyConfigServiceResponse +where + R: Future> + Unpin, S: Service, { type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - Poll::Ready(Ok(ready!(self.project().fut.poll(cx))?.into_service())) + Pin::new(&mut self.get_mut().fut).poll(cx) } } /// Convert `Fn(&Config) -> Future` fn to NewService -struct ApplyConfigNewService +pub struct ApplyConfigServiceFactory where C: Clone, F: FnMut(&C, &mut T::Service) -> R, @@ -154,7 +138,7 @@ where _t: PhantomData<(C, R, S)>, } -impl Clone for ApplyConfigNewService +impl Clone for ApplyConfigServiceFactory where C: Clone, F: FnMut(&C, &mut T::Service) -> R, @@ -163,7 +147,7 @@ where S: Service, { fn clone(&self) -> Self { - ApplyConfigNewService { + Self { f: self.f.clone(), srv: self.srv.clone(), _t: PhantomData, @@ -171,13 +155,14 @@ where } } -impl ServiceFactory for ApplyConfigNewService +impl ServiceFactory for ApplyConfigServiceFactory where C: Clone, F: FnMut(&C, &mut T::Service) -> R, T: ServiceFactory, + T::Future: Unpin, T::InitError: From, - R: Future>, + R: Future> + Unpin, S: Service, { type Config = C; @@ -187,10 +172,10 @@ where type Service = S; type InitError = T::InitError; - type Future = ApplyConfigNewServiceFut; + type Future = ApplyConfigServiceFactoryResponse; fn new_service(&self, cfg: &C) -> Self::Future { - ApplyConfigNewServiceFut { + ApplyConfigServiceFactoryResponse { f: self.f.clone(), cfg: cfg.clone(), fut: None, @@ -201,8 +186,7 @@ where } } -#[pin_project] -struct ApplyConfigNewServiceFut +pub struct ApplyConfigServiceFactoryResponse where C: Clone, F: FnMut(&C, &mut T::Service) -> R, @@ -214,45 +198,57 @@ where cfg: C, f: Cell, srv: Option, - #[pin] srv_fut: Option, - #[pin] fut: Option, _t: PhantomData<(S,)>, } -impl Future for ApplyConfigNewServiceFut +impl Unpin for ApplyConfigServiceFactoryResponse where C: Clone, F: FnMut(&C, &mut T::Service) -> R, T: ServiceFactory, + T::Future: Unpin, T::InitError: From, - R: Future>, + R: Future> + Unpin, + S: Service, +{ +} + +impl Future for ApplyConfigServiceFactoryResponse +where + C: Clone, + F: FnMut(&C, &mut T::Service) -> R, + T: ServiceFactory, + T::Future: Unpin, + T::InitError: From, + R: Future> + Unpin, S: Service, { type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.project(); - 'poll: loop { - if let Some(fut) = this.srv_fut.as_mut().as_pin_mut() { - match fut.poll(cx)? { + let this = self.get_mut(); + + loop { + if let Some(ref mut fut) = this.srv_fut { + match Pin::new(fut).poll(cx)? { Poll::Pending => return Poll::Pending, Poll::Ready(srv) => { - this.srv_fut.set(None); - *this.srv = Some(srv); - continue 'poll; + let _ = this.srv_fut.take(); + this.srv = Some(srv); + continue; } } } - if let Some(fut) = this.fut.as_mut().as_pin_mut() { - return Poll::Ready(Ok(ready!(fut.poll(cx))?.into_service())); + if let Some(ref mut fut) = this.fut { + return Pin::new(fut).poll(cx); } else if let Some(ref mut srv) = this.srv { match srv.poll_ready(cx)? { Poll::Ready(_) => { - this.fut.set(Some(this.f.get_mut()(&this.cfg, srv))); - continue 'poll; + this.fut = Some(this.f.get_mut()(&this.cfg, srv)); + continue; } Poll::Pending => return Poll::Pending, } diff --git a/actix-service/src/fn_service.rs b/actix-service/src/fn_service.rs index 42c9a5b5..ad82c9c9 100644 --- a/actix-service/src/fn_service.rs +++ b/actix-service/src/fn_service.rs @@ -4,72 +4,49 @@ use std::pin::Pin; use std::task::{Context, Poll}; use futures::future::{ok, Ready}; -use pin_project::pin_project; use crate::{IntoService, IntoServiceFactory, Service, ServiceFactory}; /// Create `ServiceFactory` for function that can act as a `Service` pub fn service_fn( f: F, -) -> impl ServiceFactory - + Clone +) -> FnServiceFactory where F: FnMut(Req) -> Fut + Clone, Fut: Future>, { - NewServiceFn::new(f) + FnServiceFactory::new(f) } -pub fn service_fn2( - f: F, -) -> impl Service +pub fn service_fn2(f: F) -> FnService where F: FnMut(Req) -> Fut, Fut: Future>, { - ServiceFn::new(f) + FnService::new(f) } /// Create `ServiceFactory` for function that can produce services -pub fn factory_fn( - f: F, -) -> impl ServiceFactory< - Config = Cfg, - Service = S, - Request = S::Request, - Response = S::Response, - Error = S::Error, - InitError = Err, - Future = Fut, -> +pub fn factory_fn(f: F) -> FnServiceNoConfig where - S: Service, + Srv: Service, F: Fn() -> Fut, - Fut: Future>, + Fut: Future>, { - FnNewServiceNoConfig::new(f) + FnServiceNoConfig::new(f) } /// Create `ServiceFactory` for function that can produce services with configuration -pub fn factory_fn_cfg( - f: F, -) -> impl ServiceFactory< - Config = Cfg, - Service = Srv, - Request = Srv::Request, - Response = Srv::Response, - Error = Srv::Error, - InitError = Err, -> +pub fn factory_fn_cfg(f: F) -> FnServiceConfig where F: Fn(&Cfg) -> Fut, Fut: Future>, Srv: Service, { - FnNewServiceConfig::new(f) + FnServiceConfig::new(f) } -pub struct ServiceFn +pub struct FnService where F: FnMut(Req) -> Fut, Fut: Future>, @@ -78,27 +55,27 @@ where _t: PhantomData, } -impl ServiceFn +impl FnService where F: FnMut(Req) -> Fut, Fut: Future>, { pub(crate) fn new(f: F) -> Self { - ServiceFn { f, _t: PhantomData } + Self { f, _t: PhantomData } } } -impl Clone for ServiceFn +impl Clone for FnService where F: FnMut(Req) -> Fut + Clone, Fut: Future>, { fn clone(&self) -> Self { - ServiceFn::new(self.f.clone()) + Self::new(self.f.clone()) } } -impl Service for ServiceFn +impl Service for FnService where F: FnMut(Req) -> Fut, Fut: Future>, @@ -117,17 +94,17 @@ where } } -impl IntoService> for F +impl IntoService> for F where F: FnMut(Req) -> Fut, Fut: Future>, { - fn into_service(self) -> ServiceFn { - ServiceFn::new(self) + fn into_service(self) -> FnService { + FnService::new(self) } } -struct NewServiceFn +pub struct FnServiceFactory where F: FnMut(Req) -> Fut, Fut: Future>, @@ -136,27 +113,27 @@ where _t: PhantomData<(Req, Cfg)>, } -impl NewServiceFn +impl FnServiceFactory where F: FnMut(Req) -> Fut + Clone, Fut: Future>, { fn new(f: F) -> Self { - NewServiceFn { f, _t: PhantomData } + FnServiceFactory { f, _t: PhantomData } } } -impl Clone for NewServiceFn +impl Clone for FnServiceFactory where F: FnMut(Req) -> Fut + Clone, Fut: Future>, { fn clone(&self) -> Self { - NewServiceFn::new(self.f.clone()) + Self::new(self.f.clone()) } } -impl ServiceFactory for NewServiceFn +impl ServiceFactory for FnServiceFactory where F: FnMut(Req) -> Fut + Clone, Fut: Future>, @@ -166,17 +143,17 @@ where type Error = Err; type Config = Cfg; - type Service = ServiceFn; + type Service = FnService; type InitError = (); type Future = Ready>; fn new_service(&self, _: &Cfg) -> Self::Future { - ok(ServiceFn::new(self.f.clone())) + ok(FnService::new(self.f.clone())) } } /// Convert `Fn(&Config) -> Future` fn to NewService -struct FnNewServiceConfig +pub struct FnServiceConfig where F: Fn(&Cfg) -> Fut, Fut: Future>, @@ -186,21 +163,21 @@ where _t: PhantomData<(Fut, Cfg, Srv, Err)>, } -impl FnNewServiceConfig +impl FnServiceConfig where F: Fn(&Cfg) -> Fut, Fut: Future>, Srv: Service, { - pub fn new(f: F) -> Self { - FnNewServiceConfig { f, _t: PhantomData } + fn new(f: F) -> Self { + FnServiceConfig { f, _t: PhantomData } } } -impl ServiceFactory for FnNewServiceConfig +impl ServiceFactory for FnServiceConfig where F: Fn(&Cfg) -> Fut, - Fut: Future>, + Fut: Future> + Unpin, Srv: Service, { type Request = Srv::Request; @@ -210,62 +187,67 @@ where type Config = Cfg; type Service = Srv; type InitError = Err; - type Future = FnNewServiceConfigFut; + type Future = NewServiceFnConfigFut; fn new_service(&self, cfg: &Cfg) -> Self::Future { - FnNewServiceConfigFut { + NewServiceFnConfigFut { fut: (self.f)(cfg), _t: PhantomData, } } } -#[pin_project] -struct FnNewServiceConfigFut +pub struct NewServiceFnConfigFut where - R: Future>, + R: Future> + Unpin, S: Service, { - #[pin] fut: R, _t: PhantomData<(S,)>, } -impl Future for FnNewServiceConfigFut +impl Unpin for NewServiceFnConfigFut where - R: Future>, + R: Future> + Unpin, + S: Service, +{ +} + +impl Future for NewServiceFnConfigFut +where + R: Future> + Unpin, S: Service, { type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - Poll::Ready(Ok(futures::ready!(self.project().fut.poll(cx))?)) + Pin::new(&mut self.get_mut().fut).poll(cx) } } /// Converter for `Fn() -> Future` fn -pub struct FnNewServiceNoConfig +pub struct FnServiceNoConfig where F: Fn() -> R, - R: Future>, S: Service, + R: Future>, { f: F, _t: PhantomData, } -impl FnNewServiceNoConfig +impl FnServiceNoConfig where F: Fn() -> R, R: Future>, S: Service, { fn new(f: F) -> Self { - FnNewServiceNoConfig { f, _t: PhantomData } + Self { f, _t: PhantomData } } } -impl ServiceFactory for FnNewServiceNoConfig +impl ServiceFactory for FnServiceNoConfig where F: Fn() -> R, R: Future>, @@ -284,7 +266,7 @@ where } } -impl Clone for FnNewServiceNoConfig +impl Clone for FnServiceNoConfig where F: Fn() -> R + Clone, R: Future>, @@ -295,13 +277,13 @@ where } } -impl IntoServiceFactory> for F +impl IntoServiceFactory> for F where F: Fn() -> R, R: Future>, S: Service, { - fn into_factory(self) -> FnNewServiceNoConfig { - FnNewServiceNoConfig::new(self) + fn into_factory(self) -> FnServiceNoConfig { + FnServiceNoConfig::new(self) } } diff --git a/actix-service/src/into.rs b/actix-service/src/into.rs deleted file mode 100644 index b6b3f612..00000000 --- a/actix-service/src/into.rs +++ /dev/null @@ -1,204 +0,0 @@ -use std::task::{Context, Poll}; - -use crate::map::{Map, MapNewService}; -use crate::map_err::{MapErr, MapErrNewService}; -use crate::map_init_err::MapInitErr; -use crate::{IntoService, IntoServiceFactory, Service, ServiceFactory}; - -#[inline] -/// Convert object of type `U` to a service `T` -pub fn into_service(service: U) -> ServiceMapper -where - U: IntoService, - T: Service, -{ - ServiceMapper { - service: service.into_service(), - } -} - -pub fn into_factory(factory: F) -> ServiceFactoryMapper -where - T: ServiceFactory, - F: IntoServiceFactory, -{ - ServiceFactoryMapper { - factory: factory.into_factory(), - } -} - -pub struct ServiceMapper { - service: T, -} - -pub struct ServiceFactoryMapper { - factory: T, -} - -impl ServiceMapper { - /// Map this service's output to a different type, returning a new service - /// of the resulting type. - /// - /// This function is similar to the `Option::map` or `Iterator::map` where - /// it will change the type of the underlying service. - /// - /// Note that this function consumes the receiving service and returns a - /// wrapped version of it, similar to the existing `map` methods in the - /// standard library. - pub fn map( - self, - f: F, - ) -> ServiceMapper> - where - Self: Sized, - F: FnMut(T::Response) -> R + Clone, - { - ServiceMapper { - service: Map::new(self.service, f), - } - } - - /// Map this service's error to a different error, returning a new service. - /// - /// This function is similar to the `Result::map_err` where it will change - /// the error type of the underlying service. This is useful for example to - /// ensure that services have the same error type. - /// - /// Note that this function consumes the receiving service and returns a - /// wrapped version of it. - pub fn map_err( - self, - f: F, - ) -> ServiceMapper> - where - Self: Sized, - F: Fn(T::Error) -> E + Clone, - { - ServiceMapper { - service: MapErr::new(self, f), - } - } -} - -impl Clone for ServiceMapper -where - T: Clone, -{ - fn clone(&self) -> Self { - ServiceMapper { - service: self.service.clone(), - } - } -} - -impl Service for ServiceMapper { - type Request = T::Request; - type Response = T::Response; - type Error = T::Error; - type Future = T::Future; - - #[inline] - fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll> { - self.service.poll_ready(ctx) - } - - #[inline] - fn call(&mut self, req: T::Request) -> Self::Future { - self.service.call(req) - } -} - -impl ServiceFactoryMapper { - /// Map this service's output to a different type, returning a new service - /// of the resulting type. - pub fn map( - self, - f: F, - ) -> ServiceFactoryMapper< - impl ServiceFactory< - Config = T::Config, - Request = T::Request, - Response = R, - Error = T::Error, - InitError = T::InitError, - >, - > - where - Self: Sized, - F: FnMut(T::Response) -> R + Clone, - { - ServiceFactoryMapper { - factory: MapNewService::new(self.factory, f), - } - } - - /// Map this service's error to a different error, returning a new service. - pub fn map_err( - self, - f: F, - ) -> ServiceFactoryMapper< - impl ServiceFactory< - Config = T::Config, - Request = T::Request, - Response = T::Response, - Error = E, - InitError = T::InitError, - >, - > - where - Self: Sized, - F: Fn(T::Error) -> E + Clone, - { - ServiceFactoryMapper { - factory: MapErrNewService::new(self.factory, f), - } - } - - /// Map this factory's init error to a different error, returning a new service. - pub fn map_init_err( - self, - f: F, - ) -> ServiceFactoryMapper< - impl ServiceFactory< - Config = T::Config, - Request = T::Request, - Response = T::Response, - Error = T::Error, - InitError = E, - >, - > - where - Self: Sized, - F: Fn(T::InitError) -> E + Clone, - { - ServiceFactoryMapper { - factory: MapInitErr::new(self.factory, f), - } - } -} - -impl Clone for ServiceFactoryMapper -where - T: Clone, -{ - fn clone(&self) -> Self { - ServiceFactoryMapper { - factory: self.factory.clone(), - } - } -} - -impl ServiceFactory for ServiceFactoryMapper { - type Config = T::Config; - type Request = T::Request; - type Response = T::Response; - type Error = T::Error; - type Service = T::Service; - type InitError = T::InitError; - type Future = T::Future; - - #[inline] - fn new_service(&self, cfg: &T::Config) -> Self::Future { - self.factory.new_service(cfg) - } -} diff --git a/actix-service/src/lib.rs b/actix-service/src/lib.rs index 16162563..ab7da630 100644 --- a/actix-service/src/lib.rs +++ b/actix-service/src/lib.rs @@ -10,7 +10,6 @@ mod apply_cfg; pub mod boxed; mod cell; mod fn_service; -mod into; mod map; mod map_config; mod map_err; @@ -18,12 +17,10 @@ mod map_init_err; mod pipeline; mod then; mod transform; -mod transform_err; pub use self::apply::{apply_fn, apply_fn_factory}; pub use self::apply_cfg::{apply_cfg, apply_cfg_factory}; pub use self::fn_service::{factory_fn, factory_fn_cfg, service_fn, service_fn2}; -pub use self::into::{into_factory, into_service, ServiceFactoryMapper, ServiceMapper}; pub use self::map_config::{map_config, unit_config, MappedConfig}; pub use self::pipeline::{pipeline, pipeline_factory, Pipeline, PipelineFactory}; pub use self::transform::{apply, Transform}; @@ -63,6 +60,41 @@ pub trait Service { /// Calling `call` without calling `poll_ready` is permitted. The /// implementation must be resilient to this fact. fn call(&mut self, req: Self::Request) -> Self::Future; + + /// Map this service's output to a different type, returning a new service + /// of the resulting type. + /// + /// This function is similar to the `Option::map` or `Iterator::map` where + /// it will change the type of the underlying service. + /// + /// Note that this function consumes the receiving service and returns a + /// wrapped version of it, similar to the existing `map` methods in the + /// standard library. + fn map(self, f: F) -> crate::dev::Map + where + Self: Sized, + Self::Future: Unpin, + F: FnMut(Self::Response) -> R + Unpin, + { + crate::dev::Map::new(self, f) + } + + /// Map this service's error to a different error, returning a new service. + /// + /// This function is similar to the `Result::map_err` where it will change + /// the error type of the underlying service. This is useful for example to + /// ensure that services have the same error type. + /// + /// Note that this function consumes the receiving service and returns a + /// wrapped version of it. + fn map_err(self, f: F) -> crate::dev::MapErr + where + Self: Sized, + Self::Future: Unpin, + F: Fn(Self::Error) -> E, + { + crate::dev::MapErr::new(self, f) + } } /// Creates new `Service` values. @@ -102,6 +134,37 @@ pub trait ServiceFactory { /// Create and return a new service value asynchronously. fn new_service(&self, cfg: &Self::Config) -> Self::Future; + + /// Map this service's output to a different type, returning a new service + /// of the resulting type. + fn map(self, f: F) -> crate::map::MapServiceFactory + where + Self: Sized, + ::Future: Unpin, + F: FnMut(Self::Response) -> R + Unpin + Clone, + { + crate::map::MapServiceFactory::new(self, f) + } + + /// Map this service's error to a different error, returning a new service. + fn map_err(self, f: F) -> crate::map_err::MapErrServiceFactory + where + Self: Sized, + ::Future: Unpin, + F: Fn(Self::Error) -> E + Unpin + Clone, + { + crate::map_err::MapErrServiceFactory::new(self, f) + } + + /// Map this factory's init error to a different error, returning a new service. + fn map_init_err(self, f: F) -> crate::map_init_err::MapInitErr + where + Self: Sized, + ::Future: Unpin, + F: Fn(Self::InitError) -> E + Unpin + Clone, + { + crate::map_init_err::MapInitErr::new(self, f) + } } impl<'a, S> Service for &'a mut S @@ -227,3 +290,17 @@ where self } } + +pub mod dev { + pub use crate::and_then::{AndThenService, AndThenServiceFactory}; + pub use crate::apply::{Apply, ApplyServiceFactory}; + pub use crate::apply_cfg::{ApplyConfigService, ApplyConfigServiceFactory}; + pub use crate::fn_service::{ + FnService, FnServiceConfig, FnServiceFactory, FnServiceNoConfig, + }; + pub use crate::map::{Map, MapServiceFactory}; + pub use crate::map_err::{MapErr, MapErrServiceFactory}; + pub use crate::map_init_err::MapInitErr; + pub use crate::then::{ThenService, ThenServiceFactory}; + pub use crate::transform::{ApplyTransform, TransformMapInitErr}; +} diff --git a/actix-service/src/map.rs b/actix-service/src/map.rs index f877293a..29f3b301 100644 --- a/actix-service/src/map.rs +++ b/actix-service/src/map.rs @@ -3,14 +3,12 @@ use std::marker::PhantomData; use std::pin::Pin; use std::task::{Context, Poll}; -use pin_project::pin_project; - use super::{Service, ServiceFactory}; /// Service for the `map` combinator, changing the type of a service's response. /// /// This is created by the `ServiceExt::map` method. -pub(crate) struct Map { +pub struct Map { service: A, f: F, _t: PhantomData, @@ -18,7 +16,7 @@ pub(crate) struct Map { impl Map { /// Create new `Map` combinator - pub fn new(service: A, f: F) -> Self + pub(crate) fn new(service: A, f: F) -> Self where A: Service, F: FnMut(A::Response) -> Response, @@ -48,7 +46,8 @@ where impl Service for Map where A: Service, - F: FnMut(A::Response) -> Response + Clone, + A::Future: Unpin, + F: FnMut(A::Response) -> Response + Unpin + Clone, { type Request = A::Request; type Response = Response; @@ -64,14 +63,12 @@ where } } -#[pin_project] -pub(crate) struct MapFuture +pub struct MapFuture where A: Service, F: FnMut(A::Response) -> Response, { f: F, - #[pin] fut: A::Future, } @@ -88,13 +85,15 @@ where impl Future for MapFuture where A: Service, - F: FnMut(A::Response) -> Response, + A::Future: Unpin, + F: FnMut(A::Response) -> Response + Unpin, { type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - match this.fut.poll(cx) { + let this = self.get_mut(); + + match Pin::new(&mut this.fut).poll(cx) { Poll::Ready(Ok(resp)) => Poll::Ready(Ok((this.f)(resp))), Poll::Ready(Err(e)) => Poll::Ready(Err(e)), Poll::Pending => Poll::Pending, @@ -103,18 +102,18 @@ where } /// `MapNewService` new service combinator -pub(crate) struct MapNewService { +pub struct MapServiceFactory { a: A, f: F, r: PhantomData, } -impl MapNewService { +impl MapServiceFactory { /// Create new `Map` new service instance - pub fn new(a: A, f: F) -> Self + pub(crate) fn new(a: A, f: F) -> Self where A: ServiceFactory, - F: FnMut(A::Response) -> Res, + F: FnMut(A::Response) -> Res + Unpin, { Self { a, @@ -124,7 +123,7 @@ impl MapNewService { } } -impl Clone for MapNewService +impl Clone for MapServiceFactory where A: Clone, F: Clone, @@ -138,10 +137,12 @@ where } } -impl ServiceFactory for MapNewService +impl ServiceFactory for MapServiceFactory where A: ServiceFactory, - F: FnMut(A::Response) -> Res + Clone, + A::Future: Unpin, + ::Future: Unpin, + F: FnMut(A::Response) -> Res + Unpin + Clone, { type Request = A::Request; type Response = Res; @@ -150,44 +151,44 @@ where type Config = A::Config; type Service = Map; type InitError = A::InitError; - type Future = MapNewServiceFuture; + type Future = MapServiceFuture; fn new_service(&self, cfg: &A::Config) -> Self::Future { - MapNewServiceFuture::new(self.a.new_service(cfg), self.f.clone()) + MapServiceFuture::new(self.a.new_service(cfg), self.f.clone()) } } -#[pin_project] -pub(crate) struct MapNewServiceFuture +pub struct MapServiceFuture where A: ServiceFactory, F: FnMut(A::Response) -> Res, { - #[pin] fut: A::Future, f: Option, } -impl MapNewServiceFuture +impl MapServiceFuture where A: ServiceFactory, - F: FnMut(A::Response) -> Res, + F: FnMut(A::Response) -> Res + Unpin, { fn new(fut: A::Future, f: F) -> Self { - MapNewServiceFuture { f: Some(f), fut } + MapServiceFuture { f: Some(f), fut } } } -impl Future for MapNewServiceFuture +impl Future for MapServiceFuture where A: ServiceFactory, - F: FnMut(A::Response) -> Res, + A::Future: Unpin, + F: FnMut(A::Response) -> Res + Unpin, { type Output = Result, A::InitError>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - if let Poll::Ready(svc) = this.fut.poll(cx)? { + let this = self.get_mut(); + + if let Poll::Ready(svc) = Pin::new(&mut this.fut).poll(cx)? { Poll::Ready(Ok(Map::new(svc, this.f.take().unwrap()))) } else { Poll::Pending @@ -200,7 +201,7 @@ mod tests { use futures::future::{lazy, ok, Ready}; use super::*; - use crate::{into_factory, into_service, Service}; + use crate::{IntoServiceFactory, Service, ServiceFactory}; struct Srv; @@ -221,14 +222,14 @@ mod tests { #[tokio::test] async fn test_poll_ready() { - let mut srv = into_service(Srv).map(|_| "ok"); + let mut srv = Srv.map(|_| "ok"); let res = lazy(|cx| srv.poll_ready(cx)).await; assert_eq!(res, Poll::Ready(Ok(()))); } #[tokio::test] async fn test_call() { - let mut srv = into_service(Srv).map(|_| "ok"); + let mut srv = Srv.map(|_| "ok"); let res = srv.call(()).await; assert!(res.is_ok()); assert_eq!(res.unwrap(), "ok"); @@ -236,7 +237,7 @@ mod tests { #[tokio::test] async fn test_new_service() { - let new_srv = into_factory(|| ok::<_, ()>(Srv)).map(|_| "ok"); + let new_srv = (|| ok::<_, ()>(Srv)).into_factory().map(|_| "ok"); let mut srv = new_srv.new_service(&()).await.unwrap(); let res = srv.call(()).await; assert!(res.is_ok()); diff --git a/actix-service/src/map_err.rs b/actix-service/src/map_err.rs index b76ad2af..b1cf0ae9 100644 --- a/actix-service/src/map_err.rs +++ b/actix-service/src/map_err.rs @@ -3,15 +3,13 @@ use std::marker::PhantomData; use std::pin::Pin; use std::task::{Context, Poll}; -use pin_project::pin_project; - use super::{Service, ServiceFactory}; /// Service for the `map_err` combinator, changing the type of a service's /// error. /// /// This is created by the `ServiceExt::map_err` method. -pub(crate) struct MapErr { +pub struct MapErr { service: A, f: F, _t: PhantomData, @@ -19,7 +17,7 @@ pub(crate) struct MapErr { impl MapErr { /// Create new `MapErr` combinator - pub fn new(service: A, f: F) -> Self + pub(crate) fn new(service: A, f: F) -> Self where A: Service, F: Fn(A::Error) -> E, @@ -49,7 +47,8 @@ where impl Service for MapErr where A: Service, - F: Fn(A::Error) -> E + Clone, + A::Future: Unpin, + F: Fn(A::Error) -> E + Unpin + Clone, { type Request = A::Request; type Response = A::Response; @@ -65,21 +64,21 @@ where } } -#[pin_project] -pub(crate) struct MapErrFuture +pub struct MapErrFuture where A: Service, - F: Fn(A::Error) -> E, + A::Future: Unpin, + F: Fn(A::Error) -> E + Unpin, { f: F, - #[pin] fut: A::Future, } impl MapErrFuture where A: Service, - F: Fn(A::Error) -> E, + A::Future: Unpin, + F: Fn(A::Error) -> E + Unpin, { fn new(fut: A::Future, f: F) -> Self { MapErrFuture { f, fut } @@ -89,13 +88,14 @@ where impl Future for MapErrFuture where A: Service, - F: Fn(A::Error) -> E, + A::Future: Unpin, + F: Fn(A::Error) -> E + Unpin, { type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - this.fut.poll(cx).map_err(this.f) + let this = self.get_mut(); + Pin::new(&mut this.fut).poll(cx).map_err(&this.f) } } @@ -103,7 +103,7 @@ where /// service's error. /// /// This is created by the `NewServiceExt::map_err` method. -pub(crate) struct MapErrNewService +pub struct MapErrServiceFactory where A: ServiceFactory, F: Fn(A::Error) -> E + Clone, @@ -113,7 +113,7 @@ where e: PhantomData, } -impl MapErrNewService +impl MapErrServiceFactory where A: ServiceFactory, F: Fn(A::Error) -> E + Clone, @@ -128,7 +128,7 @@ where } } -impl Clone for MapErrNewService +impl Clone for MapErrServiceFactory where A: ServiceFactory + Clone, F: Fn(A::Error) -> E + Clone, @@ -142,10 +142,12 @@ where } } -impl ServiceFactory for MapErrNewService +impl ServiceFactory for MapErrServiceFactory where A: ServiceFactory, - F: Fn(A::Error) -> E + Clone, + A::Future: Unpin, + ::Future: Unpin, + F: Fn(A::Error) -> E + Unpin + Clone, { type Request = A::Request; type Response = A::Response; @@ -154,44 +156,43 @@ where type Config = A::Config; type Service = MapErr; type InitError = A::InitError; - type Future = MapErrNewServiceFuture; + type Future = MapErrServiceFuture; fn new_service(&self, cfg: &A::Config) -> Self::Future { - MapErrNewServiceFuture::new(self.a.new_service(cfg), self.f.clone()) + MapErrServiceFuture::new(self.a.new_service(cfg), self.f.clone()) } } -#[pin_project] -pub(crate) struct MapErrNewServiceFuture +pub struct MapErrServiceFuture where A: ServiceFactory, F: Fn(A::Error) -> E, { - #[pin] fut: A::Future, f: F, } -impl MapErrNewServiceFuture +impl MapErrServiceFuture where A: ServiceFactory, F: Fn(A::Error) -> E, { fn new(fut: A::Future, f: F) -> Self { - MapErrNewServiceFuture { f, fut } + MapErrServiceFuture { f, fut } } } -impl Future for MapErrNewServiceFuture +impl Future for MapErrServiceFuture where A: ServiceFactory, - F: Fn(A::Error) -> E + Clone, + A::Future: Unpin, + F: Fn(A::Error) -> E + Unpin + Clone, { type Output = Result, A::InitError>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - if let Poll::Ready(svc) = this.fut.poll(cx)? { + let this = self.get_mut(); + if let Poll::Ready(svc) = Pin::new(&mut this.fut).poll(cx)? { Poll::Ready(Ok(MapErr::new(svc, this.f.clone()))) } else { Poll::Pending @@ -204,7 +205,7 @@ mod tests { use futures::future::{err, lazy, ok, Ready}; use super::*; - use crate::{into_factory, into_service, Service}; + use crate::{IntoServiceFactory, Service, ServiceFactory}; struct Srv; @@ -225,14 +226,14 @@ mod tests { #[tokio::test] async fn test_poll_ready() { - let mut srv = into_service(Srv).map_err(|_| "error"); + let mut srv = Srv.map_err(|_| "error"); let res = lazy(|cx| srv.poll_ready(cx)).await; assert_eq!(res, Poll::Ready(Err("error"))); } #[tokio::test] async fn test_call() { - let mut srv = into_service(Srv).map_err(|_| "error"); + let mut srv = Srv.map_err(|_| "error"); let res = srv.call(()).await; assert!(res.is_err()); assert_eq!(res.err().unwrap(), "error"); @@ -240,7 +241,7 @@ mod tests { #[tokio::test] async fn test_new_service() { - let new_srv = into_factory(|| ok::<_, ()>(Srv)).map_err(|_| "error"); + let new_srv = (|| ok::<_, ()>(Srv)).into_factory().map_err(|_| "error"); let mut srv = new_srv.new_service(&()).await.unwrap(); let res = srv.call(()).await; assert!(res.is_err()); diff --git a/actix-service/src/map_init_err.rs b/actix-service/src/map_init_err.rs index 36881692..2ea96cbf 100644 --- a/actix-service/src/map_init_err.rs +++ b/actix-service/src/map_init_err.rs @@ -3,12 +3,10 @@ use std::marker::PhantomData; use std::pin::Pin; use std::task::{Context, Poll}; -use pin_project::pin_project; - use super::ServiceFactory; /// `MapInitErr` service combinator -pub(crate) struct MapInitErr { +pub struct MapInitErr { a: A, f: F, e: PhantomData, @@ -46,7 +44,8 @@ where impl ServiceFactory for MapInitErr where A: ServiceFactory, - F: Fn(A::InitError) -> E + Clone, + A::Future: Unpin, + F: Fn(A::InitError) -> E + Unpin + Clone, { type Request = A::Request; type Response = A::Response; @@ -61,14 +60,13 @@ where MapInitErrFuture::new(self.a.new_service(cfg), self.f.clone()) } } -#[pin_project] -pub(crate) struct MapInitErrFuture + +pub struct MapInitErrFuture where A: ServiceFactory, F: Fn(A::InitError) -> E, { f: F, - #[pin] fut: A::Future, } @@ -85,12 +83,13 @@ where impl Future for MapInitErrFuture where A: ServiceFactory, - F: Fn(A::InitError) -> E, + A::Future: Unpin, + F: Fn(A::InitError) -> E + Unpin, { type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - this.fut.poll(cx).map_err(this.f) + let this = self.get_mut(); + Pin::new(&mut this.fut).poll(cx).map_err(&this.f) } } diff --git a/actix-service/src/pipeline.rs b/actix-service/src/pipeline.rs index 8c771aba..fd58dbbd 100644 --- a/actix-service/src/pipeline.rs +++ b/actix-service/src/pipeline.rs @@ -1,7 +1,7 @@ use std::task::{Context, Poll}; -use crate::and_then::{AndThen, AndThenNewService}; -use crate::then::{Then, ThenNewService}; +use crate::and_then::{AndThenService, AndThenServiceFactory}; +use crate::then::{ThenService, ThenServiceFactory}; use crate::{IntoService, IntoServiceFactory, Service, ServiceFactory}; pub fn pipeline(service: F) -> Pipeline @@ -39,17 +39,14 @@ impl Pipeline { /// /// Note that this function consumes the receiving service and returns a /// wrapped version of it. - pub fn and_then( - self, - service: F, - ) -> Pipeline> + pub fn and_then(self, service: F) -> Pipeline> where Self: Sized, F: IntoService, - U: Service, + U: Service + Unpin, { Pipeline { - service: AndThen::new(self.service, service.into_service()), + service: AndThenService::new(self.service, service.into_service()), } } @@ -58,17 +55,14 @@ impl Pipeline { /// /// Note that this function consumes the receiving pipeline and returns a /// wrapped version of it. - pub fn then( - self, - service: F, - ) -> Pipeline> + pub fn then(self, service: F) -> Pipeline> where Self: Sized, F: IntoService, U: Service, Error = T::Error>, { Pipeline { - service: Then::new(self.service, service.into_service()), + service: ThenService::new(self.service, service.into_service()), } } } @@ -108,18 +102,7 @@ pub struct PipelineFactory { impl PipelineFactory { /// Call another service after call to this one has resolved successfully. - pub fn and_then( - self, - factory: F, - ) -> PipelineFactory< - impl ServiceFactory< - Config = T::Config, - Request = T::Request, - Response = U::Response, - Error = T::Error, - InitError = T::InitError, - >, - > + pub fn and_then(self, factory: F) -> PipelineFactory> where Self: Sized, F: IntoServiceFactory, @@ -131,7 +114,7 @@ impl PipelineFactory { >, { PipelineFactory { - factory: AndThenNewService::new(self.factory, factory.into_factory()), + factory: AndThenServiceFactory::new(self.factory, factory.into_factory()), } } @@ -141,18 +124,7 @@ impl PipelineFactory { /// /// Note that this function consumes the receiving pipeline and returns a /// wrapped version of it. - pub fn then( - self, - factory: F, - ) -> PipelineFactory< - impl ServiceFactory< - Config = T::Config, - Request = T::Request, - Response = U::Response, - Error = T::Error, - InitError = T::InitError, - >, - > + pub fn then(self, factory: F) -> PipelineFactory> where Self: Sized, F: IntoServiceFactory, @@ -164,7 +136,7 @@ impl PipelineFactory { >, { PipelineFactory { - factory: ThenNewService::new(self.factory, factory.into_factory()), + factory: ThenServiceFactory::new(self.factory, factory.into_factory()), } } } diff --git a/actix-service/src/then.rs b/actix-service/src/then.rs index 8fbbe1e5..37ddc21e 100644 --- a/actix-service/src/then.rs +++ b/actix-service/src/then.rs @@ -2,8 +2,6 @@ use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; -use pin_project::pin_project; - use super::{Service, ServiceFactory}; use crate::cell::Cell; @@ -11,43 +9,45 @@ use crate::cell::Cell; /// another service. /// /// This is created by the `ServiceExt::then` method. -pub(crate) struct Then { +pub struct ThenService { a: A, b: Cell, } -impl Then { - /// Create new `Then` combinator - pub fn new(a: A, b: B) -> Then +impl ThenService { + /// Create new `.then()` combinator + pub(crate) fn new(a: A, b: B) -> ThenService where A: Service, B: Service, Error = A::Error>, { - Then { a, b: Cell::new(b) } + Self { a, b: Cell::new(b) } } } -impl Clone for Then +impl Clone for ThenService where A: Clone, { fn clone(&self) -> Self { - Then { + ThenService { a: self.a.clone(), b: self.b.clone(), } } } -impl Service for Then +impl Service for ThenService where A: Service, B: Service, Error = A::Error>, + A::Future: Unpin, + B::Future: Unpin, { type Request = A::Request; type Response = B::Response; type Error = B::Error; - type Future = ThenFuture; + type Future = ThenServiceResponse; fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll> { let not_ready = !self.a.poll_ready(ctx)?.is_ready(); @@ -59,30 +59,29 @@ where } fn call(&mut self, req: A::Request) -> Self::Future { - ThenFuture::new(self.a.call(req), self.b.clone()) + ThenServiceResponse::new(self.a.call(req), self.b.clone()) } } -#[pin_project] -pub(crate) struct ThenFuture +pub struct ThenServiceResponse where A: Service, B: Service>, { b: Cell, - #[pin] fut_b: Option, - #[pin] fut_a: Option, } -impl ThenFuture +impl ThenServiceResponse where A: Service, B: Service>, + A::Future: Unpin, + B::Future: Unpin, { fn new(a: A::Future, b: Cell) -> Self { - ThenFuture { + ThenServiceResponse { b, fut_a: Some(a), fut_b: None, @@ -90,34 +89,26 @@ where } } -impl Future for ThenFuture +impl Future for ThenServiceResponse where A: Service, B: Service>, + A::Future: Unpin, + B::Future: Unpin, { type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.project(); + let this = self.get_mut(); loop { - let mut fut_a = this.fut_a.as_mut(); - let mut fut_b = this.fut_b.as_mut(); - - if let Some(fut) = fut_b.as_mut().as_pin_mut() { - return fut.poll(cx); + if let Some(ref mut fut) = this.fut_b { + return Pin::new(fut).poll(cx); } - match fut_a - .as_mut() - .as_pin_mut() - .expect("Bug in actix-service") - .poll(cx) - { + match Pin::new(this.fut_a.as_mut().expect("Bug in actix-service")).poll(cx) { Poll::Ready(r) => { - fut_a.set(None); - let new_fut = this.b.get_mut().call(r); - fut_b.set(Some(new_fut)); + this.fut_b = Some(this.b.get_mut().call(r)); } Poll::Pending => return Poll::Pending, @@ -127,12 +118,12 @@ where } /// `.then()` service factory combinator -pub(crate) struct ThenNewService { +pub struct ThenServiceFactory { a: A, b: B, } -impl ThenNewService +impl ThenServiceFactory where A: ServiceFactory, B: ServiceFactory< @@ -143,12 +134,12 @@ where >, { /// Create new `AndThen` combinator - pub fn new(a: A, b: B) -> Self { + pub(crate) fn new(a: A, b: B) -> Self { Self { a, b } } } -impl ServiceFactory for ThenNewService +impl ServiceFactory for ThenServiceFactory where A: ServiceFactory, B: ServiceFactory< @@ -157,22 +148,28 @@ where Error = A::Error, InitError = A::InitError, >, + A::Future: Unpin, + A::Service: Unpin, + ::Future: Unpin, + B::Future: Unpin, + B::Service: Unpin, + ::Future: Unpin, { type Request = A::Request; type Response = B::Response; type Error = A::Error; type Config = A::Config; - type Service = Then; + type Service = ThenService; type InitError = A::InitError; - type Future = ThenNewServiceFuture; + type Future = ThenServiceFactoryResponse; fn new_service(&self, cfg: &A::Config) -> Self::Future { - ThenNewServiceFuture::new(self.a.new_service(cfg), self.b.new_service(cfg)) + ThenServiceFactoryResponse::new(self.a.new_service(cfg), self.b.new_service(cfg)) } } -impl Clone for ThenNewService +impl Clone for ThenServiceFactory where A: Clone, B: Clone, @@ -185,8 +182,7 @@ where } } -#[pin_project] -pub(crate) struct ThenNewServiceFuture +pub struct ThenServiceFactoryResponse where A: ServiceFactory, B: ServiceFactory< @@ -196,15 +192,13 @@ where InitError = A::InitError, >, { - #[pin] fut_b: B::Future, - #[pin] fut_a: A::Future, a: Option, b: Option, } -impl ThenNewServiceFuture +impl ThenServiceFactoryResponse where A: ServiceFactory, B: ServiceFactory< @@ -213,9 +207,15 @@ where Error = A::Error, InitError = A::InitError, >, + A::Future: Unpin, + A::Service: Unpin, + ::Future: Unpin, + B::Future: Unpin, + B::Service: Unpin, + ::Future: Unpin, { fn new(fut_a: A::Future, fut_b: B::Future) -> Self { - ThenNewServiceFuture { + Self { fut_a, fut_b, a: None, @@ -224,7 +224,7 @@ where } } -impl Future for ThenNewServiceFuture +impl Future for ThenServiceFactoryResponse where A: ServiceFactory, B: ServiceFactory< @@ -233,23 +233,30 @@ where Error = A::Error, InitError = A::InitError, >, + A::Future: Unpin, + A::Service: Unpin, + ::Future: Unpin, + B::Future: Unpin, + B::Service: Unpin, + ::Future: Unpin, { - type Output = Result, A::InitError>; + type Output = Result, A::InitError>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); + let this = self.get_mut(); + if this.a.is_none() { - if let Poll::Ready(service) = this.fut_a.poll(cx)? { - *this.a = Some(service); + if let Poll::Ready(service) = Pin::new(&mut this.fut_a).poll(cx)? { + this.a = Some(service); } } if this.b.is_none() { - if let Poll::Ready(service) = this.fut_b.poll(cx)? { - *this.b = Some(service); + if let Poll::Ready(service) = Pin::new(&mut this.fut_b).poll(cx)? { + this.b = Some(service); } } if this.a.is_some() && this.b.is_some() { - Poll::Ready(Ok(Then::new( + Poll::Ready(Ok(ThenService::new( this.a.take().unwrap(), this.b.take().unwrap(), ))) diff --git a/actix-service/src/transform.rs b/actix-service/src/transform.rs index 5421ba78..d57a6296 100644 --- a/actix-service/src/transform.rs +++ b/actix-service/src/transform.rs @@ -1,14 +1,12 @@ use std::future::Future; +use std::marker::PhantomData; use std::pin::Pin; use std::rc::Rc; use std::sync::Arc; use std::task::{Context, Poll}; -use crate::transform_err::TransformMapInitErr; use crate::{IntoServiceFactory, Service, ServiceFactory}; -use pin_project::pin_project; - /// The `Transform` trait defines the interface of a Service factory. `Transform` /// is often implemented for middleware, defining how to construct a /// middleware Service. A Service that is constructed by the factory takes @@ -45,7 +43,8 @@ pub trait Transform { fn map_init_err(self, f: F) -> TransformMapInitErr where Self: Sized, - F: Fn(Self::InitError) -> E, + Self::Future: Unpin, + F: Fn(Self::InitError) -> E + Unpin + Clone, { TransformMapInitErr::new(self, f) } @@ -86,27 +85,19 @@ where /// Apply transform to a service. Function returns /// services factory that in initialization creates /// service and applies transform to this service. -pub fn apply( - t: T, - service: U, -) -> impl ServiceFactory< - Config = S::Config, - Request = T::Request, - Response = T::Response, - Error = T::Error, - Service = T::Transform, - InitError = S::InitError, -> + Clone +pub fn apply(t: T, service: U) -> ApplyTransform where S: ServiceFactory, + S::Future: Unpin, T: Transform, + T::Future: Unpin, U: IntoServiceFactory, { ApplyTransform::new(t, service.into_factory()) } /// `Apply` transform to new service -struct ApplyTransform { +pub struct ApplyTransform { s: Rc, t: Rc, } @@ -137,7 +128,9 @@ impl Clone for ApplyTransform { impl ServiceFactory for ApplyTransform where S: ServiceFactory, + S::Future: Unpin, T: Transform, + T::Future: Unpin, { type Request = T::Request; type Response = T::Response; @@ -157,15 +150,12 @@ where } } -#[pin_project] -struct ApplyTransformFuture +pub struct ApplyTransformFuture where S: ServiceFactory, T: Transform, { - #[pin] fut_a: S::Future, - #[pin] fut_t: Option, t_cell: Rc, } @@ -173,23 +163,114 @@ where impl Future for ApplyTransformFuture where S: ServiceFactory, + S::Future: Unpin, T: Transform, + T::Future: Unpin, { type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.project(); + let mut this = self.get_mut(); - if this.fut_t.as_mut().as_pin_mut().is_none() { - if let Poll::Ready(service) = this.fut_a.poll(cx)? { - this.fut_t.set(Some(this.t_cell.new_transform(service))); + if this.fut_t.is_none() { + if let Poll::Ready(service) = Pin::new(&mut this.fut_a).poll(cx)? { + this.fut_t = Some(this.t_cell.new_transform(service)); + } else { + return Poll::Pending; } } - if let Some(fut) = this.fut_t.as_mut().as_pin_mut() { - fut.poll(cx) + if let Some(ref mut fut) = this.fut_t { + Pin::new(fut).poll(cx) } else { Poll::Pending } } } + +/// Transform for the `map_err` combinator, changing the type of a new +/// transform's init error. +/// +/// This is created by the `Transform::map_err` method. +pub struct TransformMapInitErr { + t: T, + f: F, + e: PhantomData<(S, E)>, +} + +impl TransformMapInitErr { + /// Create new `TransformMapErr` new transform instance + pub(crate) fn new(t: T, f: F) -> Self + where + T: Transform, + T::Future: Unpin, + F: Fn(T::InitError) -> E + Unpin + Clone, + { + Self { + t, + f, + e: PhantomData, + } + } +} + +impl Clone for TransformMapInitErr +where + T: Clone, + F: Clone, +{ + fn clone(&self) -> Self { + Self { + t: self.t.clone(), + f: self.f.clone(), + e: PhantomData, + } + } +} + +impl Transform for TransformMapInitErr +where + T: Transform, + T::Future: Unpin, + F: Fn(T::InitError) -> E + Unpin + Clone, +{ + type Request = T::Request; + type Response = T::Response; + type Error = T::Error; + type Transform = T::Transform; + + type InitError = E; + type Future = TransformMapInitErrFuture; + + fn new_transform(&self, service: S) -> Self::Future { + TransformMapInitErrFuture { + fut: self.t.new_transform(service), + f: self.f.clone(), + } + } +} + +pub struct TransformMapInitErrFuture +where + T: Transform, + T::Future: Unpin, + F: Fn(T::InitError) -> E + Unpin, +{ + fut: T::Future, + f: F, +} + +impl Future for TransformMapInitErrFuture +where + T: Transform, + T::Future: Unpin, + F: Fn(T::InitError) -> E + Unpin + Clone, +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + Pin::new(&mut this.fut).poll(cx).map_err(&this.f) + } +} diff --git a/actix-service/src/transform_err.rs b/actix-service/src/transform_err.rs deleted file mode 100644 index d41de357..00000000 --- a/actix-service/src/transform_err.rs +++ /dev/null @@ -1,91 +0,0 @@ -use std::future::Future; -use std::marker::PhantomData; -use std::pin::Pin; -use std::task::{Context, Poll}; - -use pin_project::pin_project; - -use super::Transform; - -/// Transform for the `map_err` combinator, changing the type of a new -/// transform's init error. -/// -/// This is created by the `Transform::map_err` method. -pub struct TransformMapInitErr { - t: T, - f: F, - e: PhantomData<(S, E)>, -} - -impl TransformMapInitErr { - /// Create new `TransformMapErr` new transform instance - pub fn new(t: T, f: F) -> Self - where - T: Transform, - F: Fn(T::InitError) -> E, - { - Self { - t, - f, - e: PhantomData, - } - } -} - -impl Clone for TransformMapInitErr -where - T: Clone, - F: Clone, -{ - fn clone(&self) -> Self { - Self { - t: self.t.clone(), - f: self.f.clone(), - e: PhantomData, - } - } -} - -impl Transform for TransformMapInitErr -where - T: Transform, - F: Fn(T::InitError) -> E + Clone, -{ - type Request = T::Request; - type Response = T::Response; - type Error = T::Error; - type Transform = T::Transform; - - type InitError = E; - type Future = TransformMapInitErrFuture; - - fn new_transform(&self, service: S) -> Self::Future { - TransformMapInitErrFuture { - fut: self.t.new_transform(service), - f: self.f.clone(), - } - } -} -#[pin_project] -pub struct TransformMapInitErrFuture -where - T: Transform, - F: Fn(T::InitError) -> E, -{ - #[pin] - fut: T::Future, - f: F, -} - -impl Future for TransformMapInitErrFuture -where - T: Transform, - F: Fn(T::InitError) -> E + Clone, -{ - type Output = Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - this.fut.poll(cx).map_err(this.f) - } -} diff --git a/actix-utils/src/framed.rs b/actix-utils/src/framed.rs index ccdbabb5..c21d9f8c 100644 --- a/actix-utils/src/framed.rs +++ b/actix-utils/src/framed.rs @@ -325,7 +325,7 @@ where if !buf_empty { match inner.buf.pop_front().unwrap() { Ok(msg) => { - if let Err(err) = framed.force_send(msg) { + if let Err(err) = framed.write(msg) { *state = TransportState::FramedError(FramedTransportError::Encoder(err)); return true; @@ -342,7 +342,7 @@ where if !rx_done && rx.is_some() { match Pin::new(rx.as_mut().unwrap()).poll_next(cx) { Poll::Ready(Some(FramedMessage::Message(msg))) => { - if let Err(err) = framed.force_send(msg) { + if let Err(err) = framed.write(msg) { *state = TransportState::FramedError(FramedTransportError::Encoder(err)); return true;