From 135494646055399f60cc0362ea8f0201375dd355 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 18 Nov 2019 18:28:54 +0600 Subject: [PATCH] remove pin-project; update Unpin consrtaint --- actix-codec/src/framed.rs | 17 ++++++ actix-connect/Cargo.toml | 1 - actix-connect/src/connector.rs | 6 +- actix-connect/src/resolver.rs | 7 +-- actix-ioframe/Cargo.toml | 1 - actix-ioframe/src/connect.rs | 20 ++++--- actix-ioframe/src/dispatcher.rs | 47 ++++++++++------ actix-ioframe/src/service.rs | 99 ++++++++++++++++++++++----------- actix-server/Cargo.toml | 1 - actix-server/src/ssl/openssl.rs | 18 +++--- actix-server/src/ssl/rustls.rs | 14 ++--- actix-service/src/and_then.rs | 17 ++++-- actix-service/src/apply.rs | 3 +- actix-service/src/lib.rs | 6 +- actix-service/src/map_config.rs | 31 +++-------- actix-service/src/pipeline.rs | 2 +- actix-service/src/then.rs | 22 ++++++-- actix-utils/Cargo.toml | 1 - actix-utils/src/either.rs | 23 +++++--- actix-utils/src/framed.rs | 23 ++++---- actix-utils/src/inflight.rs | 17 ++++-- actix-utils/src/timeout.rs | 10 ++-- 22 files changed, 225 insertions(+), 161 deletions(-) diff --git a/actix-codec/src/framed.rs b/actix-codec/src/framed.rs index 4b6b7f88..949e7b97 100644 --- a/actix-codec/src/framed.rs +++ b/actix-codec/src/framed.rs @@ -225,6 +225,15 @@ impl Framed { self.inner.get_mut().force_send(item) } + pub fn is_ready(&mut self, cx: &mut Context<'_>) -> Poll> + where + T: AsyncWrite + Unpin, + U: Encoder + Unpin, + U::Error: From, + { + Pin::new(&mut self.inner.get_mut()).poll_ready(cx) + } + pub fn next_item(&mut self, cx: &mut Context<'_>) -> Poll>> where T: AsyncRead + Unpin, @@ -240,6 +249,14 @@ impl Framed { { Pin::new(self.inner.get_mut()).poll_flush(cx) } + + pub fn close(&mut self, cx: &mut Context<'_>) -> Poll> + where + T: AsyncWrite + Unpin, + U: Encoder + Unpin, + { + Pin::new(&mut self.inner.get_mut()).poll_close(cx) + } } impl Stream for Framed diff --git a/actix-connect/Cargo.toml b/actix-connect/Cargo.toml index 4ede79b8..0198cd58 100644 --- a/actix-connect/Cargo.toml +++ b/actix-connect/Cargo.toml @@ -40,7 +40,6 @@ actix-rt = "1.0.0-alpha.1" derive_more = "0.15" either = "1.5.2" futures = "0.3.1" -pin-project = "0.4.5" http = { version = "0.1.17", optional = true } log = "0.4" tokio-net = "=0.2.0-alpha.6" diff --git a/actix-connect/src/connector.rs b/actix-connect/src/connector.rs index 4b811bca..b8c54eeb 100644 --- a/actix-connect/src/connector.rs +++ b/actix-connect/src/connector.rs @@ -8,7 +8,6 @@ use std::task::{Context, Poll}; use actix_service::{Service, ServiceFactory}; use futures::future::{err, ok, BoxFuture, Either, FutureExt, Ready}; -use pin_project::pin_project; use tokio_net::tcp::TcpStream; use super::connect::{Address, Connect, Connection}; @@ -94,7 +93,6 @@ impl Service for TcpConnector { } } -#[pin_project] #[doc(hidden)] /// Tcp stream connector response future pub struct TcpConnectorResponse { @@ -137,7 +135,7 @@ impl Future for TcpConnectorResponse { type Output = Result, ConnectError>; fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let this = self.project(); + let this = self.get_mut(); // connect loop { @@ -167,7 +165,7 @@ impl Future for TcpConnectorResponse { // try to connect let addr = this.addrs.as_mut().unwrap().pop_front().unwrap(); - *this.stream = Some(TcpStream::connect(addr).boxed()); + this.stream = Some(TcpStream::connect(addr).boxed()); } } } diff --git a/actix-connect/src/resolver.rs b/actix-connect/src/resolver.rs index 56df979e..58349817 100644 --- a/actix-connect/src/resolver.rs +++ b/actix-connect/src/resolver.rs @@ -6,7 +6,6 @@ use std::task::{Context, Poll}; use actix_service::{Service, ServiceFactory}; use futures::future::{ok, Either, Ready}; -use pin_project::pin_project; use trust_dns_resolver::lookup_ip::LookupIpFuture; use trust_dns_resolver::{AsyncResolver, Background}; @@ -129,12 +128,10 @@ impl Service for Resolver { } } -#[pin_project] #[doc(hidden)] /// Resolver future pub struct ResolverFuture { req: Option>, - #[pin] lookup: Background, } @@ -157,9 +154,9 @@ impl Future for ResolverFuture { type Output = Result, ConnectError>; fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let this = self.project(); + let this = self.get_mut(); - match this.lookup.poll(cx) { + match Pin::new(&mut this.lookup).poll(cx) { Poll::Pending => Poll::Pending, Poll::Ready(Ok(ips)) => { let req = this.req.take().unwrap(); diff --git a/actix-ioframe/Cargo.toml b/actix-ioframe/Cargo.toml index df86ad13..7c93fe26 100644 --- a/actix-ioframe/Cargo.toml +++ b/actix-ioframe/Cargo.toml @@ -24,7 +24,6 @@ actix-utils = "0.5.0-alpha.1" bytes = "0.4" either = "1.5.2" futures = "0.3.1" -pin-project = "0.4.5" tokio-executor = "=0.2.0-alpha.6" log = "0.4" diff --git a/actix-ioframe/src/connect.rs b/actix-ioframe/src/connect.rs index 703c3b14..0f445b39 100644 --- a/actix-ioframe/src/connect.rs +++ b/actix-ioframe/src/connect.rs @@ -5,7 +5,6 @@ use std::task::{Context, Poll}; use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed}; use actix_utils::mpsc; use futures::Stream; -use pin_project::pin_project; use crate::dispatcher::FramedMessage; use crate::sink::Sink; @@ -42,10 +41,8 @@ where } } -#[pin_project] pub struct ConnectResult { pub(crate) state: St, - #[pin] pub(crate) framed: Framed, pub(crate) rx: mpsc::Receiver::Item>>, pub(crate) sink: Sink<::Item>, @@ -78,6 +75,13 @@ impl ConnectResult { } } +impl Unpin for ConnectResult +where + Io: AsyncRead + AsyncWrite + Unpin, + Codec: Encoder + Decoder + Unpin, +{ +} + impl Stream for ConnectResult where Io: AsyncRead + AsyncWrite + Unpin, @@ -86,7 +90,7 @@ where type Item = Result<::Item, ::Error>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - self.project().framed.poll_next(cx) + self.get_mut().framed.next_item(cx) } } @@ -98,21 +102,21 @@ where type Error = ::Error; fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().framed.poll_ready(cx) + self.get_mut().framed.is_ready(cx) } fn start_send( self: Pin<&mut Self>, item: ::Item, ) -> Result<(), Self::Error> { - self.project().framed.start_send(item) + self.get_mut().framed.write(item) } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().framed.poll_flush(cx) + self.get_mut().framed.flush(cx) } fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().framed.poll_close(cx) + self.get_mut().framed.close(cx) } } diff --git a/actix-ioframe/src/dispatcher.rs b/actix-ioframe/src/dispatcher.rs index ec036220..a3d8158e 100644 --- a/actix-ioframe/src/dispatcher.rs +++ b/actix-ioframe/src/dispatcher.rs @@ -1,6 +1,5 @@ //! Framed dispatcher service and related utilities use std::collections::VecDeque; -use std::future::Future; use std::mem; use std::pin::Pin; use std::rc::Rc; @@ -31,7 +30,6 @@ pub(crate) enum FramedMessage { /// FramedTransport - is a future that reads frames from Framed object /// and pass then to the service. -#[pin_project::pin_project] pub(crate) struct FramedDispatcher where S: Service, Response = Option>>, @@ -123,32 +121,45 @@ struct FramedDispatcherInner { task: LocalWaker, } -impl Future for FramedDispatcher +impl Unpin for FramedDispatcher where - S: Service, Response = Option>>, - S::Error: 'static, - S::Future: 'static, + S: Service, Response = Option>> + Unpin, + S::Error: Unpin + 'static, + S::Future: Unpin + 'static, T: AsyncRead + AsyncWrite + Unpin, U: Decoder + Encoder + Unpin, ::Item: 'static, ::Error: std::fmt::Debug, { - type Output = Result<(), ServiceError>; +} - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - unsafe { self.inner.get_ref().task.register(cx.waker()) }; +impl FramedDispatcher +where + S: Service, Response = Option>> + Unpin, + S::Error: 'static, + S::Future: Unpin + 'static, + T: AsyncRead + AsyncWrite + Unpin, + U: Decoder + Encoder + Unpin, + ::Item: 'static, + ::Error: std::fmt::Debug, +{ + pub(crate) fn poll( + &mut self, + cx: &mut Context, + ) -> Poll>> { + let this = self; + unsafe { this.inner.get_ref().task.register(cx.waker()) }; - let this = self.project(); poll( cx, - this.service, - this.state, - this.sink, - this.framed, - this.dispatch_state, - this.rx, - this.inner, - this.disconnect, + &mut this.service, + &mut this.state, + &mut this.sink, + &mut this.framed, + &mut this.dispatch_state, + &mut this.rx, + &mut this.inner, + &mut this.disconnect, ) } } diff --git a/actix-ioframe/src/service.rs b/actix-ioframe/src/service.rs index 96cdab47..24b1d6a0 100644 --- a/actix-ioframe/src/service.rs +++ b/actix-ioframe/src/service.rs @@ -8,7 +8,6 @@ use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder}; use actix_service::{IntoService, IntoServiceFactory, Service, ServiceFactory}; use either::Either; use futures::future::{FutureExt, LocalBoxFuture}; -use pin_project::{pin_project, project}; use crate::connect::{Connect, ConnectResult}; use crate::dispatcher::FramedDispatcher; @@ -77,6 +76,7 @@ where Io: AsyncRead + AsyncWrite + Unpin, C: Service, Response = ConnectResult>, C::Error: 'static, + C::Future: Unpin, Codec: Decoder + Encoder + Unpin, ::Item: 'static, ::Error: std::fmt::Debug, @@ -93,10 +93,7 @@ where } /// Provide stream items handler service and construct service factory. - pub fn finish( - self, - service: F, - ) -> impl Service> + pub fn finish(self, service: F) -> FramedServiceImpl where F: IntoServiceFactory, T: ServiceFactory< @@ -106,6 +103,9 @@ where Error = C::Error, InitError = C::Error, > + 'static, + T::Future: Unpin, + T::Service: Unpin, + ::Future: Unpin + 'static, { FramedServiceImpl { connect: self.connect, @@ -132,7 +132,8 @@ where Response = ConnectResult, >, C::Error: 'static, - C::Future: 'static, + C::Future: Unpin + 'static, + ::Future: Unpin, Codec: Decoder + Encoder + Unpin, ::Item: 'static, ::Error: std::fmt::Debug, @@ -148,15 +149,7 @@ where self } - pub fn finish( - self, - service: F, - ) -> impl ServiceFactory< - Config = Cfg, - Request = Io, - Response = (), - Error = ServiceError, - > + pub fn finish(self, service: F) -> FramedService where F: IntoServiceFactory, T: ServiceFactory< @@ -166,6 +159,9 @@ where Error = C::Error, InitError = C::Error, > + 'static, + T::Future: Unpin, + T::Service: Unpin, + ::Future: Unpin + 'static, { FramedService { connect: self.connect, @@ -176,7 +172,7 @@ where } } -pub(crate) struct FramedService { +pub struct FramedService { connect: C, handler: Rc, disconnect: Option>, @@ -193,7 +189,8 @@ where Response = ConnectResult, >, C::Error: 'static, - C::Future: 'static, + C::Future: Unpin + 'static, + ::Future: Unpin, T: ServiceFactory< Config = St, Request = RequestItem, @@ -201,6 +198,9 @@ where Error = C::Error, InitError = C::Error, > + 'static, + T::Future: Unpin, + T::Service: Unpin, + ::Future: Unpin + 'static, Codec: Decoder + Encoder + Unpin, ::Item: 'static, ::Error: std::fmt::Debug, @@ -244,6 +244,7 @@ where Io: AsyncRead + AsyncWrite + Unpin, C: Service, Response = ConnectResult>, C::Error: 'static, + C::Future: Unpin, T: ServiceFactory< Config = St, Request = RequestItem, @@ -251,7 +252,9 @@ where Error = C::Error, InitError = C::Error, >, - <::Service as Service>::Future: 'static, + T::Future: Unpin, + T::Service: Unpin, + ::Future: Unpin + 'static, Codec: Decoder + Encoder + Unpin, ::Item: 'static, ::Error: std::fmt::Debug, @@ -276,11 +279,11 @@ where } } -#[pin_project] pub struct FramedServiceImplResponse where C: Service, Response = ConnectResult>, C::Error: 'static, + C::Future: Unpin, T: ServiceFactory< Config = St, Request = RequestItem, @@ -288,7 +291,9 @@ where Error = C::Error, InitError = C::Error, >, - <::Service as Service>::Future: 'static, + T::Future: Unpin, + T::Service: Unpin, + ::Future: Unpin + 'static, Io: AsyncRead + AsyncWrite + Unpin, Codec: Encoder + Decoder + Unpin, ::Item: 'static, @@ -297,9 +302,10 @@ where inner: FramedServiceImplResponseInner, } -impl Future for FramedServiceImplResponse +impl Unpin for FramedServiceImplResponse where C: Service, Response = ConnectResult>, + C::Future: Unpin, C::Error: 'static, T: ServiceFactory< Config = St, @@ -308,7 +314,31 @@ where Error = C::Error, InitError = C::Error, >, - <::Service as Service>::Future: 'static, + T::Future: Unpin, + T::Service: Unpin, + ::Future: Unpin + 'static, + Io: AsyncRead + AsyncWrite + Unpin, + Codec: Encoder + Decoder + Unpin, + ::Item: 'static, + ::Error: std::fmt::Debug, +{ +} + +impl Future for FramedServiceImplResponse +where + C: Service, Response = ConnectResult>, + C::Future: Unpin, + C::Error: 'static, + T: ServiceFactory< + Config = St, + Request = RequestItem, + Response = ResponseItem, + Error = C::Error, + InitError = C::Error, + >, + T::Future: Unpin, + T::Service: Unpin, + ::Future: Unpin + 'static, Io: AsyncRead + AsyncWrite + Unpin, Codec: Encoder + Decoder + Unpin, ::Item: 'static, @@ -320,7 +350,7 @@ where let this = self.get_mut(); loop { - match unsafe { Pin::new_unchecked(&mut this.inner) }.poll(cx) { + match this.inner.poll(cx) { Either::Left(new) => this.inner = new, Either::Right(poll) => return poll, }; @@ -328,10 +358,10 @@ where } } -#[pin_project] enum FramedServiceImplResponseInner where C: Service, Response = ConnectResult>, + C::Future: Unpin, C::Error: 'static, T: ServiceFactory< Config = St, @@ -340,15 +370,17 @@ where Error = C::Error, InitError = C::Error, >, - <::Service as Service>::Future: 'static, + T::Future: Unpin, + T::Service: Unpin, + ::Future: Unpin + 'static, Io: AsyncRead + AsyncWrite + Unpin, Codec: Encoder + Decoder + Unpin, ::Item: 'static, ::Error: std::fmt::Debug, { - Connect(#[pin] C::Future, Rc, Option>), + Connect(C::Future, Rc, Option>), Handler( - #[pin] T::Future, + T::Future, Option>, Option>, ), @@ -358,6 +390,7 @@ where impl FramedServiceImplResponseInner where C: Service, Response = ConnectResult>, + C::Future: Unpin, C::Error: 'static, T: ServiceFactory< Config = St, @@ -366,22 +399,22 @@ where Error = C::Error, InitError = C::Error, >, - <::Service as Service>::Future: 'static, + T::Future: Unpin, + T::Service: Unpin, + ::Future: Unpin + 'static, Io: AsyncRead + AsyncWrite + Unpin, Codec: Encoder + Decoder + Unpin, ::Item: 'static, ::Error: std::fmt::Debug, { - #[project] fn poll( - self: Pin<&mut Self>, + &mut self, cx: &mut Context, ) -> Either< FramedServiceImplResponseInner, Poll>>, > { - #[project] - match self.project() { + match self { FramedServiceImplResponseInner::Connect( ref mut fut, ref handler, @@ -417,7 +450,7 @@ where Poll::Ready(Err(e)) => Either::Right(Poll::Ready(Err(e.into()))), }, FramedServiceImplResponseInner::Dispatcher(ref mut fut) => { - Either::Right(Pin::new(fut).poll(cx)) + Either::Right(fut.poll(cx)) } } } diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index 126ee37b..37f7fd30 100644 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -33,7 +33,6 @@ actix-server-config = "0.3.0-alpha.1" log = "0.4" num_cpus = "1.0" -pin-project = "0.4.5" mio = "0.6.19" net2 = "0.2" futures = "0.3.1" diff --git a/actix-server/src/ssl/openssl.rs b/actix-server/src/ssl/openssl.rs index 69ed6fcd..24a50294 100644 --- a/actix-server/src/ssl/openssl.rs +++ b/actix-server/src/ssl/openssl.rs @@ -6,7 +6,6 @@ use std::task::{Context, Poll}; use actix_service::{Service, ServiceFactory}; use futures::future::{ok, FutureExt, LocalBoxFuture, Ready}; use open_ssl::ssl::SslAcceptor; -use pin_project::pin_project; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_openssl::{HandshakeError, SslStream}; @@ -41,7 +40,9 @@ impl Clone for OpensslAcceptor { } } -impl ServiceFactory for OpensslAcceptor { +impl ServiceFactory + for OpensslAcceptor +{ type Request = Io; type Response = Io, P>; type Error = HandshakeError; @@ -69,7 +70,9 @@ pub struct OpensslAcceptorService { io: PhantomData<(T, P)>, } -impl Service for OpensslAcceptorService { +impl Service + for OpensslAcceptorService +{ type Request = Io; type Response = Io, P>; type Error = HandshakeError; @@ -98,24 +101,23 @@ impl Service for OpensslAcceptor } } -#[pin_project] pub struct OpensslAcceptorServiceFut where + P: Unpin, T: AsyncRead + AsyncWrite, { - #[pin] fut: LocalBoxFuture<'static, Result, HandshakeError>>, params: Option

, _guard: CounterGuard, } -impl Future for OpensslAcceptorServiceFut { +impl Future for OpensslAcceptorServiceFut { type Output = Result, P>, HandshakeError>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); + let this = self.get_mut(); - let io = futures::ready!(this.fut.poll(cx))?; + let io = futures::ready!(Pin::new(&mut this.fut).poll(cx))?; let proto = if let Some(protos) = io.ssl().selected_alpn_protocol() { const H2: &[u8] = b"\x02h2"; const HTTP10: &[u8] = b"\x08http/1.0"; diff --git a/actix-server/src/ssl/rustls.rs b/actix-server/src/ssl/rustls.rs index f2c30f60..12ed1b83 100644 --- a/actix-server/src/ssl/rustls.rs +++ b/actix-server/src/ssl/rustls.rs @@ -7,7 +7,6 @@ use std::task::{Context, Poll}; use actix_service::{Service, ServiceFactory}; use futures::future::{ok, Ready}; -use pin_project::pin_project; use rust_tls::ServerConfig; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_rustls::{server::TlsStream, Accept, TlsAcceptor}; @@ -43,7 +42,7 @@ impl Clone for RustlsAcceptor { } } -impl ServiceFactory for RustlsAcceptor { +impl ServiceFactory for RustlsAcceptor { type Request = Io; type Response = Io, P>; type Error = io::Error; @@ -72,7 +71,7 @@ pub struct RustlsAcceptorService { conns: Counter, } -impl Service for RustlsAcceptorService { +impl Service for RustlsAcceptorService { type Request = Io; type Response = Io, P>; type Error = io::Error; @@ -96,25 +95,24 @@ impl Service for RustlsAcceptorService where T: AsyncRead + AsyncWrite + Unpin, + P: Unpin, { - #[pin] fut: Accept, params: Option

, _guard: CounterGuard, } -impl Future for RustlsAcceptorServiceFut { +impl Future for RustlsAcceptorServiceFut { type Output = Result, P>, io::Error>; fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let this = self.project(); + let this = self.get_mut(); let params = this.params.take().unwrap(); Poll::Ready( - futures::ready!(this.fut.poll(cx)) + futures::ready!(Pin::new(&mut this.fut).poll(cx)) .map(move |io| Io::from_parts(io, params, Protocol::Unknown)), ) } diff --git a/actix-service/src/and_then.rs b/actix-service/src/and_then.rs index 5c1bd17b..68260129 100644 --- a/actix-service/src/and_then.rs +++ b/actix-service/src/and_then.rs @@ -154,10 +154,8 @@ where InitError = A::InitError, >, A::Future: Unpin, - A::Service: Unpin, ::Future: Unpin, B::Future: Unpin, - B::Service: Unpin, ::Future: Unpin, { type Request = A::Request; @@ -204,10 +202,8 @@ where A: ServiceFactory, B: ServiceFactory, A::Future: Unpin, - A::Service: Unpin, ::Future: Unpin, B::Future: Unpin, - B::Service: Unpin, ::Future: Unpin, { fn new(fut_a: A::Future, fut_b: B::Future) -> Self { @@ -220,15 +216,24 @@ where } } +impl Unpin for AndThenServiceFactoryResponse +where + A: ServiceFactory, + B: ServiceFactory, + A::Future: Unpin, + ::Future: Unpin, + B::Future: Unpin, + ::Future: Unpin, +{ +} + impl Future for AndThenServiceFactoryResponse where A: ServiceFactory, B: ServiceFactory, A::Future: Unpin, - A::Service: Unpin, ::Future: Unpin, B::Future: Unpin, - B::Service: Unpin, ::Future: Unpin, { type Output = Result, A::InitError>; diff --git a/actix-service/src/apply.rs b/actix-service/src/apply.rs index c14ea37e..e78af397 100644 --- a/actix-service/src/apply.rs +++ b/actix-service/src/apply.rs @@ -23,7 +23,8 @@ pub fn apply_fn_factory( ) -> ApplyServiceFactory where T: ServiceFactory, - F: FnMut(In, &mut T::Service) -> R + Clone, + T::Future: Unpin, + F: FnMut(In, &mut T::Service) -> R + Unpin + Clone, R: Future>, U: IntoServiceFactory, { diff --git a/actix-service/src/lib.rs b/actix-service/src/lib.rs index ab7da630..141d25b0 100644 --- a/actix-service/src/lib.rs +++ b/actix-service/src/lib.rs @@ -73,7 +73,6 @@ pub trait Service { fn map(self, f: F) -> crate::dev::Map where Self: Sized, - Self::Future: Unpin, F: FnMut(Self::Response) -> R + Unpin, { crate::dev::Map::new(self, f) @@ -90,7 +89,6 @@ pub trait Service { fn map_err(self, f: F) -> crate::dev::MapErr where Self: Sized, - Self::Future: Unpin, F: Fn(Self::Error) -> E, { crate::dev::MapErr::new(self, f) @@ -140,7 +138,6 @@ pub trait ServiceFactory { fn map(self, f: F) -> crate::map::MapServiceFactory where Self: Sized, - ::Future: Unpin, F: FnMut(Self::Response) -> R + Unpin + Clone, { crate::map::MapServiceFactory::new(self, f) @@ -150,7 +147,6 @@ pub trait ServiceFactory { fn map_err(self, f: F) -> crate::map_err::MapErrServiceFactory where Self: Sized, - ::Future: Unpin, F: Fn(Self::Error) -> E + Unpin + Clone, { crate::map_err::MapErrServiceFactory::new(self, f) @@ -160,7 +156,6 @@ pub trait ServiceFactory { fn map_init_err(self, f: F) -> crate::map_init_err::MapInitErr where Self: Sized, - ::Future: Unpin, F: Fn(Self::InitError) -> E + Unpin + Clone, { crate::map_init_err::MapInitErr::new(self, f) @@ -299,6 +294,7 @@ pub mod dev { FnService, FnServiceConfig, FnServiceFactory, FnServiceNoConfig, }; pub use crate::map::{Map, MapServiceFactory}; + pub use crate::map_config::{MapConfig, UnitConfig}; pub use crate::map_err::{MapErr, MapErrServiceFactory}; pub use crate::map_init_err::MapInitErr; pub use crate::then::{ThenService, ThenServiceFactory}; diff --git a/actix-service/src/map_config.rs b/actix-service/src/map_config.rs index ace723e4..101a5eaf 100644 --- a/actix-service/src/map_config.rs +++ b/actix-service/src/map_config.rs @@ -8,16 +8,7 @@ pub enum MappedConfig<'a, T> { } /// Adapt external config to a config for provided new service -pub fn map_config( - factory: T, - f: F, -) -> impl ServiceFactory< - Config = C, - Request = T::Request, - Response = T::Response, - Error = T::Error, - InitError = T::InitError, -> +pub fn map_config(factory: T, f: F) -> MapConfig where T: ServiceFactory, F: Fn(&C) -> MappedConfig, @@ -26,23 +17,15 @@ where } /// Replace config with unit -pub fn unit_config( - new_service: T, -) -> impl ServiceFactory< - Config = C, - Request = T::Request, - Response = T::Response, - Error = T::Error, - InitError = T::InitError, -> +pub fn unit_config(new_service: T) -> UnitConfig where T: ServiceFactory, { UnitConfig::new(new_service) } -/// `MapInitErr` service combinator -pub(crate) struct MapConfig { +/// `.map_config()` service combinator +pub struct MapConfig { a: A, f: F, e: PhantomData, @@ -50,7 +33,7 @@ pub(crate) struct MapConfig { impl MapConfig { /// Create new `MapConfig` combinator - pub fn new(a: A, f: F) -> Self + pub(crate) fn new(a: A, f: F) -> Self where A: ServiceFactory, F: Fn(&C) -> MappedConfig, @@ -99,8 +82,8 @@ where } } -/// `MapInitErr` service combinator -pub(crate) struct UnitConfig { +/// `unit_config()` config combinator +pub struct UnitConfig { a: A, e: PhantomData, } diff --git a/actix-service/src/pipeline.rs b/actix-service/src/pipeline.rs index fd58dbbd..9c63d2c4 100644 --- a/actix-service/src/pipeline.rs +++ b/actix-service/src/pipeline.rs @@ -43,7 +43,7 @@ impl Pipeline { where Self: Sized, F: IntoService, - U: Service + Unpin, + U: Service, { Pipeline { service: AndThenService::new(self.service, service.into_service()), diff --git a/actix-service/src/then.rs b/actix-service/src/then.rs index 37ddc21e..190eb5c5 100644 --- a/actix-service/src/then.rs +++ b/actix-service/src/then.rs @@ -149,10 +149,8 @@ where InitError = A::InitError, >, A::Future: Unpin, - A::Service: Unpin, ::Future: Unpin, B::Future: Unpin, - B::Service: Unpin, ::Future: Unpin, { type Request = A::Request; @@ -208,10 +206,8 @@ where InitError = A::InitError, >, A::Future: Unpin, - A::Service: Unpin, ::Future: Unpin, B::Future: Unpin, - B::Service: Unpin, ::Future: Unpin, { fn new(fut_a: A::Future, fut_b: B::Future) -> Self { @@ -224,6 +220,22 @@ where } } +impl Unpin for ThenServiceFactoryResponse +where + A: ServiceFactory, + B: ServiceFactory< + Config = A::Config, + Request = Result, + Error = A::Error, + InitError = A::InitError, + >, + A::Future: Unpin, + ::Future: Unpin, + B::Future: Unpin, + ::Future: Unpin, +{ +} + impl Future for ThenServiceFactoryResponse where A: ServiceFactory, @@ -234,10 +246,8 @@ where InitError = A::InitError, >, A::Future: Unpin, - A::Service: Unpin, ::Future: Unpin, B::Future: Unpin, - B::Service: Unpin, ::Future: Unpin, { type Output = Result, A::InitError>; diff --git a/actix-utils/Cargo.toml b/actix-utils/Cargo.toml index 85f4212e..acb75340 100644 --- a/actix-utils/Cargo.toml +++ b/actix-utils/Cargo.toml @@ -23,7 +23,6 @@ actix-codec = "0.2.0-alpha.1" bytes = "0.4" either = "1.5.2" futures = "0.3.1" -pin-project = "0.4.5" tokio-timer = "0.3.0-alpha.6" tokio-executor = { version="=0.2.0-alpha.6", features=["current-thread"] } log = "0.4" diff --git a/actix-utils/src/either.rs b/actix-utils/src/either.rs index 2bc07611..09533a20 100644 --- a/actix-utils/src/either.rs +++ b/actix-utils/src/either.rs @@ -4,7 +4,6 @@ use std::task::{Context, Poll}; use actix_service::{Service, ServiceFactory}; use futures::{future, ready, Future}; -use pin_project::pin_project; /// Combine two different service types into a single type. /// @@ -84,6 +83,8 @@ where Error = A::Error, InitError = A::InitError, >, + A::Future: Unpin, + B::Future: Unpin, { type Request = either::Either; type Response = A::Response; @@ -112,32 +113,40 @@ impl Clone for Either { } } -#[pin_project] #[doc(hidden)] pub struct EitherNewService { left: Option, right: Option, - #[pin] left_fut: A::Future, - #[pin] right_fut: B::Future, } +impl Unpin for EitherNewService +where + A: ServiceFactory, + B: ServiceFactory, + A::Future: Unpin, + B::Future: Unpin, +{ +} + impl Future for EitherNewService where A: ServiceFactory, B: ServiceFactory, + A::Future: Unpin, + B::Future: Unpin, { type Output = Result, A::InitError>; fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let this = self.project(); + let this = self.get_mut(); if this.left.is_none() { - *this.left = Some(ready!(this.left_fut.poll(cx))?); + this.left = Some(ready!(Pin::new(&mut this.left_fut).poll(cx))?); } if this.right.is_none() { - *this.right = Some(ready!(this.right_fut.poll(cx))?); + this.right = Some(ready!(Pin::new(&mut this.right_fut).poll(cx))?); } if this.left.is_some() && this.right.is_some() { diff --git a/actix-utils/src/framed.rs b/actix-utils/src/framed.rs index c21d9f8c..ee95354c 100644 --- a/actix-utils/src/framed.rs +++ b/actix-utils/src/framed.rs @@ -10,7 +10,6 @@ use actix_service::{IntoService, Service}; use futures::future::{ready, FutureExt}; use futures::{Future, Sink, Stream}; use log::debug; -use pin_project::pin_project; use crate::cell::Cell; use crate::mpsc; @@ -78,7 +77,6 @@ type Inner = Cell::Item, S::E /// FramedTransport - is a future that reads frames from Framed object /// and pass then to the service. -#[pin_project] pub struct FramedTransport where S: Service, Response = Response>, @@ -111,7 +109,7 @@ struct FramedTransportInner { impl FramedTransport where - S: Service, Response = Response>, + S: Service, Response = Response> + Unpin, S::Error: 'static, S::Future: 'static, T: AsyncRead + AsyncWrite + Unpin, @@ -167,27 +165,28 @@ where impl Future for FramedTransport where - S: Service, Response = Response>, - S::Error: 'static, + S: Service, Response = Response> + Unpin, + S::Error: Unpin + 'static, S::Future: 'static, T: AsyncRead + AsyncWrite + Unpin, U: Decoder + Encoder + Unpin, ::Item: 'static, - ::Error: std::fmt::Debug, + ::Error: Unpin + std::fmt::Debug, + ::Error: Unpin + std::fmt::Debug, { type Output = Result<(), FramedTransportError>; fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { self.inner.get_ref().task.register(cx.waker()); - let this = self.project(); + let this = self.get_mut(); poll( cx, - this.service, - this.state, - this.framed, - this.rx, - this.inner, + &mut this.service, + &mut this.state, + &mut this.framed, + &mut this.rx, + &mut this.inner, ) } } diff --git a/actix-utils/src/inflight.rs b/actix-utils/src/inflight.rs index 35cd904d..28761d84 100644 --- a/actix-utils/src/inflight.rs +++ b/actix-utils/src/inflight.rs @@ -5,7 +5,6 @@ use std::task::{Context, Poll}; use actix_service::{IntoService, Service, Transform}; use futures::future::{ok, Ready}; -use pin_project::pin_project; use super::counter::{Counter, CounterGuard}; @@ -29,7 +28,11 @@ impl Default for InFlight { } } -impl Transform for InFlight { +impl Transform for InFlight +where + S: Service, + S::Future: Unpin, +{ type Request = S::Request; type Response = S::Response; type Error = S::Error; @@ -65,6 +68,7 @@ where impl Service for InFlightService where T: Service, + T::Future: Unpin, { type Request = T::Request; type Response = T::Response; @@ -90,19 +94,20 @@ where } } -#[pin_project] #[doc(hidden)] pub struct InFlightServiceResponse { - #[pin] fut: T::Future, _guard: CounterGuard, } -impl Future for InFlightServiceResponse { +impl Future for InFlightServiceResponse +where + T::Future: Unpin, +{ type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - self.project().fut.poll(cx) + Pin::new(&mut self.get_mut().fut).poll(cx) } } diff --git a/actix-utils/src/timeout.rs b/actix-utils/src/timeout.rs index 2e8939cb..7601ba49 100644 --- a/actix-utils/src/timeout.rs +++ b/actix-utils/src/timeout.rs @@ -10,7 +10,6 @@ use std::{fmt, time}; use actix_service::{IntoService, Service, Transform}; use futures::future::{ok, Ready}; -use pin_project::pin_project; use tokio_timer::{clock, delay, Delay}; /// Applies a timeout to requests. @@ -85,6 +84,7 @@ impl Clone for Timeout { impl Transform for Timeout where S: Service, + S::Future: Unpin, { type Request = S::Request; type Response = S::Response; @@ -126,6 +126,7 @@ where impl Service for TimeoutService where S: Service, + S::Future: Unpin, { type Request = S::Request; type Response = S::Response; @@ -145,10 +146,8 @@ where } /// `TimeoutService` response future -#[pin_project] #[derive(Debug)] pub struct TimeoutServiceResponse { - #[pin] fut: T::Future, sleep: Delay, } @@ -156,14 +155,15 @@ pub struct TimeoutServiceResponse { impl Future for TimeoutServiceResponse where T: Service, + T::Future: Unpin, { type Output = Result>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.project(); + let this = self.get_mut(); // First, try polling the future - match this.fut.poll(cx) { + match Pin::new(&mut this.fut).poll(cx) { Poll::Ready(Ok(v)) => return Poll::Ready(Ok(v)), Poll::Ready(Err(e)) => return Poll::Ready(Err(TimeoutError::Service(e))), Poll::Pending => {}