1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-06-28 05:10:36 +02:00

remove pin-project; update Unpin consrtaint

This commit is contained in:
Nikolay Kim
2019-11-18 18:28:54 +06:00
parent 7404d82a9b
commit 1354946460
22 changed files with 225 additions and 161 deletions

View File

@ -24,7 +24,6 @@ actix-utils = "0.5.0-alpha.1"
bytes = "0.4"
either = "1.5.2"
futures = "0.3.1"
pin-project = "0.4.5"
tokio-executor = "=0.2.0-alpha.6"
log = "0.4"

View File

@ -5,7 +5,6 @@ use std::task::{Context, Poll};
use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed};
use actix_utils::mpsc;
use futures::Stream;
use pin_project::pin_project;
use crate::dispatcher::FramedMessage;
use crate::sink::Sink;
@ -42,10 +41,8 @@ where
}
}
#[pin_project]
pub struct ConnectResult<Io, St, Codec: Encoder + Decoder> {
pub(crate) state: St,
#[pin]
pub(crate) framed: Framed<Io, Codec>,
pub(crate) rx: mpsc::Receiver<FramedMessage<<Codec as Encoder>::Item>>,
pub(crate) sink: Sink<<Codec as Encoder>::Item>,
@ -78,6 +75,13 @@ impl<Io, St, Codec: Encoder + Decoder> ConnectResult<Io, St, Codec> {
}
}
impl<Io, St, Codec> Unpin for ConnectResult<Io, St, Codec>
where
Io: AsyncRead + AsyncWrite + Unpin,
Codec: Encoder + Decoder + Unpin,
{
}
impl<Io, St, Codec> Stream for ConnectResult<Io, St, Codec>
where
Io: AsyncRead + AsyncWrite + Unpin,
@ -86,7 +90,7 @@ where
type Item = Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
self.project().framed.poll_next(cx)
self.get_mut().framed.next_item(cx)
}
}
@ -98,21 +102,21 @@ where
type Error = <Codec as Encoder>::Error;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().framed.poll_ready(cx)
self.get_mut().framed.is_ready(cx)
}
fn start_send(
self: Pin<&mut Self>,
item: <Codec as Encoder>::Item,
) -> Result<(), Self::Error> {
self.project().framed.start_send(item)
self.get_mut().framed.write(item)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().framed.poll_flush(cx)
self.get_mut().framed.flush(cx)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().framed.poll_close(cx)
self.get_mut().framed.close(cx)
}
}

View File

@ -1,6 +1,5 @@
//! Framed dispatcher service and related utilities
use std::collections::VecDeque;
use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::rc::Rc;
@ -31,7 +30,6 @@ pub(crate) enum FramedMessage<T> {
/// FramedTransport - is a future that reads frames from Framed object
/// and pass then to the service.
#[pin_project::pin_project]
pub(crate) struct FramedDispatcher<St, S, T, U>
where
S: Service<Request = Request<St, U>, Response = Option<Response<U>>>,
@ -123,32 +121,45 @@ struct FramedDispatcherInner<I, E> {
task: LocalWaker,
}
impl<St, S, T, U> Future for FramedDispatcher<St, S, T, U>
impl<St, S, T, U> Unpin for FramedDispatcher<St, S, T, U>
where
S: Service<Request = Request<St, U>, Response = Option<Response<U>>>,
S::Error: 'static,
S::Future: 'static,
S: Service<Request = Request<St, U>, Response = Option<Response<U>>> + Unpin,
S::Error: Unpin + 'static,
S::Future: Unpin + 'static,
T: AsyncRead + AsyncWrite + Unpin,
U: Decoder + Encoder + Unpin,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
{
type Output = Result<(), ServiceError<S::Error, U>>;
}
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
unsafe { self.inner.get_ref().task.register(cx.waker()) };
impl<St, S, T, U> FramedDispatcher<St, S, T, U>
where
S: Service<Request = Request<St, U>, Response = Option<Response<U>>> + Unpin,
S::Error: 'static,
S::Future: Unpin + 'static,
T: AsyncRead + AsyncWrite + Unpin,
U: Decoder + Encoder + Unpin,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
{
pub(crate) fn poll(
&mut self,
cx: &mut Context,
) -> Poll<Result<(), ServiceError<S::Error, U>>> {
let this = self;
unsafe { this.inner.get_ref().task.register(cx.waker()) };
let this = self.project();
poll(
cx,
this.service,
this.state,
this.sink,
this.framed,
this.dispatch_state,
this.rx,
this.inner,
this.disconnect,
&mut this.service,
&mut this.state,
&mut this.sink,
&mut this.framed,
&mut this.dispatch_state,
&mut this.rx,
&mut this.inner,
&mut this.disconnect,
)
}
}

View File

@ -8,7 +8,6 @@ use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder};
use actix_service::{IntoService, IntoServiceFactory, Service, ServiceFactory};
use either::Either;
use futures::future::{FutureExt, LocalBoxFuture};
use pin_project::{pin_project, project};
use crate::connect::{Connect, ConnectResult};
use crate::dispatcher::FramedDispatcher;
@ -77,6 +76,7 @@ where
Io: AsyncRead + AsyncWrite + Unpin,
C: Service<Request = Connect<Io>, Response = ConnectResult<Io, St, Codec>>,
C::Error: 'static,
C::Future: Unpin,
Codec: Decoder + Encoder + Unpin,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
@ -93,10 +93,7 @@ where
}
/// Provide stream items handler service and construct service factory.
pub fn finish<F, T>(
self,
service: F,
) -> impl Service<Request = Io, Response = (), Error = ServiceError<C::Error, Codec>>
pub fn finish<F, T>(self, service: F) -> FramedServiceImpl<St, C, T, Io, Codec>
where
F: IntoServiceFactory<T>,
T: ServiceFactory<
@ -106,6 +103,9 @@ where
Error = C::Error,
InitError = C::Error,
> + 'static,
T::Future: Unpin,
T::Service: Unpin,
<T::Service as Service>::Future: Unpin + 'static,
{
FramedServiceImpl {
connect: self.connect,
@ -132,7 +132,8 @@ where
Response = ConnectResult<Io, St, Codec>,
>,
C::Error: 'static,
C::Future: 'static,
C::Future: Unpin + 'static,
<C::Service as Service>::Future: Unpin,
Codec: Decoder + Encoder + Unpin,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
@ -148,15 +149,7 @@ where
self
}
pub fn finish<F, T, Cfg>(
self,
service: F,
) -> impl ServiceFactory<
Config = Cfg,
Request = Io,
Response = (),
Error = ServiceError<C::Error, Codec>,
>
pub fn finish<F, T, Cfg>(self, service: F) -> FramedService<St, C, T, Io, Codec, Cfg>
where
F: IntoServiceFactory<T>,
T: ServiceFactory<
@ -166,6 +159,9 @@ where
Error = C::Error,
InitError = C::Error,
> + 'static,
T::Future: Unpin,
T::Service: Unpin,
<T::Service as Service>::Future: Unpin + 'static,
{
FramedService {
connect: self.connect,
@ -176,7 +172,7 @@ where
}
}
pub(crate) struct FramedService<St, C, T, Io, Codec, Cfg> {
pub struct FramedService<St, C, T, Io, Codec, Cfg> {
connect: C,
handler: Rc<T>,
disconnect: Option<Rc<dyn Fn(&mut St, bool)>>,
@ -193,7 +189,8 @@ where
Response = ConnectResult<Io, St, Codec>,
>,
C::Error: 'static,
C::Future: 'static,
C::Future: Unpin + 'static,
<C::Service as Service>::Future: Unpin,
T: ServiceFactory<
Config = St,
Request = RequestItem<St, Codec>,
@ -201,6 +198,9 @@ where
Error = C::Error,
InitError = C::Error,
> + 'static,
T::Future: Unpin,
T::Service: Unpin,
<T::Service as Service>::Future: Unpin + 'static,
Codec: Decoder + Encoder + Unpin,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
@ -244,6 +244,7 @@ where
Io: AsyncRead + AsyncWrite + Unpin,
C: Service<Request = Connect<Io>, Response = ConnectResult<Io, St, Codec>>,
C::Error: 'static,
C::Future: Unpin,
T: ServiceFactory<
Config = St,
Request = RequestItem<St, Codec>,
@ -251,7 +252,9 @@ where
Error = C::Error,
InitError = C::Error,
>,
<<T as ServiceFactory>::Service as Service>::Future: 'static,
T::Future: Unpin,
T::Service: Unpin,
<T::Service as Service>::Future: Unpin + 'static,
Codec: Decoder + Encoder + Unpin,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
@ -276,11 +279,11 @@ where
}
}
#[pin_project]
pub struct FramedServiceImplResponse<St, Io, Codec, C, T>
where
C: Service<Request = Connect<Io>, Response = ConnectResult<Io, St, Codec>>,
C::Error: 'static,
C::Future: Unpin,
T: ServiceFactory<
Config = St,
Request = RequestItem<St, Codec>,
@ -288,7 +291,9 @@ where
Error = C::Error,
InitError = C::Error,
>,
<<T as ServiceFactory>::Service as Service>::Future: 'static,
T::Future: Unpin,
T::Service: Unpin,
<T::Service as Service>::Future: Unpin + 'static,
Io: AsyncRead + AsyncWrite + Unpin,
Codec: Encoder + Decoder + Unpin,
<Codec as Encoder>::Item: 'static,
@ -297,9 +302,10 @@ where
inner: FramedServiceImplResponseInner<St, Io, Codec, C, T>,
}
impl<St, Io, Codec, C, T> Future for FramedServiceImplResponse<St, Io, Codec, C, T>
impl<St, Io, Codec, C, T> Unpin for FramedServiceImplResponse<St, Io, Codec, C, T>
where
C: Service<Request = Connect<Io>, Response = ConnectResult<Io, St, Codec>>,
C::Future: Unpin,
C::Error: 'static,
T: ServiceFactory<
Config = St,
@ -308,7 +314,31 @@ where
Error = C::Error,
InitError = C::Error,
>,
<<T as ServiceFactory>::Service as Service>::Future: 'static,
T::Future: Unpin,
T::Service: Unpin,
<T::Service as Service>::Future: Unpin + 'static,
Io: AsyncRead + AsyncWrite + Unpin,
Codec: Encoder + Decoder + Unpin,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
{
}
impl<St, Io, Codec, C, T> Future for FramedServiceImplResponse<St, Io, Codec, C, T>
where
C: Service<Request = Connect<Io>, Response = ConnectResult<Io, St, Codec>>,
C::Future: Unpin,
C::Error: 'static,
T: ServiceFactory<
Config = St,
Request = RequestItem<St, Codec>,
Response = ResponseItem<Codec>,
Error = C::Error,
InitError = C::Error,
>,
T::Future: Unpin,
T::Service: Unpin,
<T::Service as Service>::Future: Unpin + 'static,
Io: AsyncRead + AsyncWrite + Unpin,
Codec: Encoder + Decoder + Unpin,
<Codec as Encoder>::Item: 'static,
@ -320,7 +350,7 @@ where
let this = self.get_mut();
loop {
match unsafe { Pin::new_unchecked(&mut this.inner) }.poll(cx) {
match this.inner.poll(cx) {
Either::Left(new) => this.inner = new,
Either::Right(poll) => return poll,
};
@ -328,10 +358,10 @@ where
}
}
#[pin_project]
enum FramedServiceImplResponseInner<St, Io, Codec, C, T>
where
C: Service<Request = Connect<Io>, Response = ConnectResult<Io, St, Codec>>,
C::Future: Unpin,
C::Error: 'static,
T: ServiceFactory<
Config = St,
@ -340,15 +370,17 @@ where
Error = C::Error,
InitError = C::Error,
>,
<<T as ServiceFactory>::Service as Service>::Future: 'static,
T::Future: Unpin,
T::Service: Unpin,
<T::Service as Service>::Future: Unpin + 'static,
Io: AsyncRead + AsyncWrite + Unpin,
Codec: Encoder + Decoder + Unpin,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
{
Connect(#[pin] C::Future, Rc<T>, Option<Rc<dyn Fn(&mut St, bool)>>),
Connect(C::Future, Rc<T>, Option<Rc<dyn Fn(&mut St, bool)>>),
Handler(
#[pin] T::Future,
T::Future,
Option<ConnectResult<Io, St, Codec>>,
Option<Rc<dyn Fn(&mut St, bool)>>,
),
@ -358,6 +390,7 @@ where
impl<St, Io, Codec, C, T> FramedServiceImplResponseInner<St, Io, Codec, C, T>
where
C: Service<Request = Connect<Io>, Response = ConnectResult<Io, St, Codec>>,
C::Future: Unpin,
C::Error: 'static,
T: ServiceFactory<
Config = St,
@ -366,22 +399,22 @@ where
Error = C::Error,
InitError = C::Error,
>,
<<T as ServiceFactory>::Service as Service>::Future: 'static,
T::Future: Unpin,
T::Service: Unpin,
<T::Service as Service>::Future: Unpin + 'static,
Io: AsyncRead + AsyncWrite + Unpin,
Codec: Encoder + Decoder + Unpin,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
{
#[project]
fn poll(
self: Pin<&mut Self>,
&mut self,
cx: &mut Context,
) -> Either<
FramedServiceImplResponseInner<St, Io, Codec, C, T>,
Poll<Result<(), ServiceError<C::Error, Codec>>>,
> {
#[project]
match self.project() {
match self {
FramedServiceImplResponseInner::Connect(
ref mut fut,
ref handler,
@ -417,7 +450,7 @@ where
Poll::Ready(Err(e)) => Either::Right(Poll::Ready(Err(e.into()))),
},
FramedServiceImplResponseInner::Dispatcher(ref mut fut) => {
Either::Right(Pin::new(fut).poll(cx))
Either::Right(fut.poll(cx))
}
}
}