1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-06-28 05:10:36 +02:00

cleanup Unpin constraint; simplify Framed impl

This commit is contained in:
Nikolay Kim
2019-11-19 14:51:40 +06:00
parent 617e40a7e9
commit 3bf83c1d98
30 changed files with 493 additions and 1252 deletions

View File

@ -24,6 +24,7 @@ actix-utils = "0.5.0-alpha.1"
bytes = "0.4"
either = "1.5.2"
futures = "0.3.1"
pin-project = "0.4.5"
tokio-executor = "=0.2.0-alpha.6"
log = "0.4"

View File

@ -16,7 +16,7 @@ pub struct Connect<Io, St = (), Codec = ()> {
impl<Io> Connect<Io>
where
Io: AsyncRead + AsyncWrite + Unpin,
Io: AsyncRead + AsyncWrite,
{
pub(crate) fn new(io: Io) -> Self {
Self {
@ -27,7 +27,7 @@ where
pub fn codec<Codec>(self, codec: Codec) -> ConnectResult<Io, (), Codec>
where
Codec: Encoder + Decoder + Unpin,
Codec: Encoder + Decoder,
{
let (tx, rx) = mpsc::channel();
let sink = Sink::new(tx);
@ -41,6 +41,7 @@ where
}
}
#[pin_project::pin_project]
pub struct ConnectResult<Io, St, Codec: Encoder + Decoder> {
pub(crate) state: St,
pub(crate) framed: Framed<Io, Codec>,
@ -75,41 +76,38 @@ impl<Io, St, Codec: Encoder + Decoder> ConnectResult<Io, St, Codec> {
}
}
impl<Io, St, Codec> Unpin for ConnectResult<Io, St, Codec>
where
Io: AsyncRead + AsyncWrite + Unpin,
Codec: Encoder + Decoder + Unpin,
{
}
impl<Io, St, Codec> Stream for ConnectResult<Io, St, Codec>
where
Io: AsyncRead + AsyncWrite + Unpin,
Codec: Encoder + Decoder + Unpin,
Io: AsyncRead + AsyncWrite,
Codec: Encoder + Decoder,
{
type Item = Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
self.get_mut().framed.next_item(cx)
self.project().framed.next_item(cx)
}
}
impl<Io, St, Codec> futures::Sink<<Codec as Encoder>::Item> for ConnectResult<Io, St, Codec>
where
Io: AsyncRead + AsyncWrite + Unpin,
Codec: Encoder + Decoder + Unpin,
Io: AsyncRead + AsyncWrite,
Codec: Encoder + Decoder,
{
type Error = <Codec as Encoder>::Error;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.get_mut().framed.is_ready(cx)
fn poll_ready(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
if self.framed.is_ready() {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
}
fn start_send(
self: Pin<&mut Self>,
item: <Codec as Encoder>::Item,
) -> Result<(), Self::Error> {
self.get_mut().framed.write(item)
self.project().framed.write(item)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {

View File

@ -30,13 +30,14 @@ pub(crate) enum FramedMessage<T> {
/// FramedTransport - is a future that reads frames from Framed object
/// and pass then to the service.
#[pin_project::pin_project]
pub(crate) struct FramedDispatcher<St, S, T, U>
where
S: Service<Request = Request<St, U>, Response = Option<Response<U>>>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite + Unpin,
U: Encoder + Decoder + Unpin,
T: AsyncRead + AsyncWrite,
U: Encoder + Decoder,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
{
@ -55,8 +56,8 @@ where
S: Service<Request = Request<St, U>, Response = Option<Response<U>>>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite + Unpin,
U: Decoder + Encoder + Unpin,
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
{
@ -84,7 +85,7 @@ where
}
}
enum FramedState<S: Service, U: Encoder + Decoder + Unpin> {
enum FramedState<S: Service, U: Encoder + Decoder> {
Processing,
Error(ServiceError<S::Error, U>),
FramedError(ServiceError<S::Error, U>),
@ -92,7 +93,7 @@ enum FramedState<S: Service, U: Encoder + Decoder + Unpin> {
Stopping,
}
impl<S: Service, U: Encoder + Decoder + Unpin> FramedState<S, U> {
impl<S: Service, U: Encoder + Decoder> FramedState<S, U> {
fn stop(&mut self, tx: Option<oneshot::Sender<()>>) {
match self {
FramedState::FlushAndStop(ref mut vec) => {
@ -121,25 +122,13 @@ struct FramedDispatcherInner<I, E> {
task: LocalWaker,
}
impl<St, S, T, U> Unpin for FramedDispatcher<St, S, T, U>
where
S: Service<Request = Request<St, U>, Response = Option<Response<U>>> + Unpin,
S::Error: Unpin + 'static,
S::Future: Unpin + 'static,
T: AsyncRead + AsyncWrite + Unpin,
U: Decoder + Encoder + Unpin,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
{
}
impl<St, S, T, U> FramedDispatcher<St, S, T, U>
where
S: Service<Request = Request<St, U>, Response = Option<Response<U>>> + Unpin,
S: Service<Request = Request<St, U>, Response = Option<Response<U>>>,
S::Error: 'static,
S::Future: Unpin + 'static,
T: AsyncRead + AsyncWrite + Unpin,
U: Decoder + Encoder + Unpin,
S::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
{
@ -179,8 +168,8 @@ where
S: Service<Request = Request<St, U>, Response = Option<Response<U>>>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite + Unpin,
U: Decoder + Encoder + Unpin,
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
{
@ -267,8 +256,8 @@ where
S: Service<Request = Request<St, U>, Response = Option<Response<U>>>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite + Unpin,
U: Decoder + Encoder + Unpin,
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
{
@ -328,8 +317,8 @@ where
S: Service<Request = Request<St, U>, Response = Option<Response<U>>>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite + Unpin,
U: Decoder + Encoder + Unpin,
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
{

View File

@ -8,6 +8,7 @@ use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder};
use actix_service::{IntoService, IntoServiceFactory, Service, ServiceFactory};
use either::Either;
use futures::future::{FutureExt, LocalBoxFuture};
use pin_project::project;
use crate::connect::{Connect, ConnectResult};
use crate::dispatcher::FramedDispatcher;
@ -31,9 +32,9 @@ impl<St, Codec> Builder<St, Codec> {
pub fn service<Io, C, F>(self, connect: F) -> ServiceBuilder<St, C, Io, Codec>
where
F: IntoService<C>,
Io: AsyncRead + AsyncWrite + Unpin,
Io: AsyncRead + AsyncWrite,
C: Service<Request = Connect<Io>, Response = ConnectResult<Io, St, Codec>>,
Codec: Decoder + Encoder + Unpin,
Codec: Decoder + Encoder,
{
ServiceBuilder {
connect: connect.into_service(),
@ -46,7 +47,7 @@ impl<St, Codec> Builder<St, Codec> {
pub fn factory<Io, C, F>(self, connect: F) -> NewServiceBuilder<St, C, Io, Codec>
where
F: IntoServiceFactory<C>,
Io: AsyncRead + AsyncWrite + Unpin,
Io: AsyncRead + AsyncWrite,
C: ServiceFactory<
Config = (),
Request = Connect<Io>,
@ -54,7 +55,7 @@ impl<St, Codec> Builder<St, Codec> {
>,
C::Error: 'static,
C::Future: 'static,
Codec: Decoder + Encoder + Unpin,
Codec: Decoder + Encoder,
{
NewServiceBuilder {
connect: connect.into_factory(),
@ -73,11 +74,10 @@ pub struct ServiceBuilder<St, C, Io, Codec> {
impl<St, C, Io, Codec> ServiceBuilder<St, C, Io, Codec>
where
St: 'static,
Io: AsyncRead + AsyncWrite + Unpin,
C: Service<Request = Connect<Io>, Response = ConnectResult<Io, St, Codec>>,
C::Error: 'static,
C::Future: Unpin,
Codec: Decoder + Encoder + Unpin,
Io: AsyncRead + AsyncWrite,
Codec: Decoder + Encoder,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
{
@ -103,9 +103,6 @@ where
Error = C::Error,
InitError = C::Error,
> + 'static,
T::Future: Unpin,
T::Service: Unpin,
<T::Service as Service>::Future: Unpin + 'static,
{
FramedServiceImpl {
connect: self.connect,
@ -125,16 +122,15 @@ pub struct NewServiceBuilder<St, C, Io, Codec> {
impl<St, C, Io, Codec> NewServiceBuilder<St, C, Io, Codec>
where
St: 'static,
Io: AsyncRead + AsyncWrite + Unpin,
Io: AsyncRead + AsyncWrite,
C: ServiceFactory<
Config = (),
Request = Connect<Io>,
Response = ConnectResult<Io, St, Codec>,
>,
C::Error: 'static,
C::Future: Unpin + 'static,
<C::Service as Service>::Future: Unpin,
Codec: Decoder + Encoder + Unpin,
C::Future: 'static,
Codec: Decoder + Encoder,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
{
@ -159,9 +155,6 @@ where
Error = C::Error,
InitError = C::Error,
> + 'static,
T::Future: Unpin,
T::Service: Unpin,
<T::Service as Service>::Future: Unpin + 'static,
{
FramedService {
connect: self.connect,
@ -182,15 +175,14 @@ pub struct FramedService<St, C, T, Io, Codec, Cfg> {
impl<St, C, T, Io, Codec, Cfg> ServiceFactory for FramedService<St, C, T, Io, Codec, Cfg>
where
St: 'static,
Io: AsyncRead + AsyncWrite + Unpin,
Io: AsyncRead + AsyncWrite,
C: ServiceFactory<
Config = (),
Request = Connect<Io>,
Response = ConnectResult<Io, St, Codec>,
>,
C::Error: 'static,
C::Future: Unpin + 'static,
<C::Service as Service>::Future: Unpin,
C::Future: 'static,
T: ServiceFactory<
Config = St,
Request = RequestItem<St, Codec>,
@ -198,10 +190,8 @@ where
Error = C::Error,
InitError = C::Error,
> + 'static,
T::Future: Unpin,
T::Service: Unpin,
<T::Service as Service>::Future: Unpin + 'static,
Codec: Decoder + Encoder + Unpin,
<T::Service as Service>::Future: 'static,
Codec: Decoder + Encoder,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
{
@ -241,10 +231,9 @@ pub struct FramedServiceImpl<St, C, T, Io, Codec> {
impl<St, C, T, Io, Codec> Service for FramedServiceImpl<St, C, T, Io, Codec>
where
Io: AsyncRead + AsyncWrite + Unpin,
Io: AsyncRead + AsyncWrite,
C: Service<Request = Connect<Io>, Response = ConnectResult<Io, St, Codec>>,
C::Error: 'static,
C::Future: Unpin,
T: ServiceFactory<
Config = St,
Request = RequestItem<St, Codec>,
@ -252,10 +241,8 @@ where
Error = C::Error,
InitError = C::Error,
>,
T::Future: Unpin,
T::Service: Unpin,
<T::Service as Service>::Future: Unpin + 'static,
Codec: Decoder + Encoder + Unpin,
<T::Service as Service>::Future: 'static,
Codec: Decoder + Encoder,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
{
@ -279,11 +266,11 @@ where
}
}
#[pin_project::pin_project]
pub struct FramedServiceImplResponse<St, Io, Codec, C, T>
where
C: Service<Request = Connect<Io>, Response = ConnectResult<Io, St, Codec>>,
C::Error: 'static,
C::Future: Unpin,
T: ServiceFactory<
Config = St,
Request = RequestItem<St, Codec>,
@ -291,43 +278,19 @@ where
Error = C::Error,
InitError = C::Error,
>,
T::Future: Unpin,
T::Service: Unpin,
<T::Service as Service>::Future: Unpin + 'static,
Io: AsyncRead + AsyncWrite + Unpin,
Codec: Encoder + Decoder + Unpin,
<T::Service as Service>::Future: 'static,
Io: AsyncRead + AsyncWrite,
Codec: Encoder + Decoder,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
{
#[pin]
inner: FramedServiceImplResponseInner<St, Io, Codec, C, T>,
}
impl<St, Io, Codec, C, T> Unpin for FramedServiceImplResponse<St, Io, Codec, C, T>
where
C: Service<Request = Connect<Io>, Response = ConnectResult<Io, St, Codec>>,
C::Future: Unpin,
C::Error: 'static,
T: ServiceFactory<
Config = St,
Request = RequestItem<St, Codec>,
Response = ResponseItem<Codec>,
Error = C::Error,
InitError = C::Error,
>,
T::Future: Unpin,
T::Service: Unpin,
<T::Service as Service>::Future: Unpin + 'static,
Io: AsyncRead + AsyncWrite + Unpin,
Codec: Encoder + Decoder + Unpin,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
{
}
impl<St, Io, Codec, C, T> Future for FramedServiceImplResponse<St, Io, Codec, C, T>
where
C: Service<Request = Connect<Io>, Response = ConnectResult<Io, St, Codec>>,
C::Future: Unpin,
C::Error: 'static,
T: ServiceFactory<
Config = St,
@ -336,32 +299,33 @@ where
Error = C::Error,
InitError = C::Error,
>,
T::Future: Unpin,
T::Service: Unpin,
<T::Service as Service>::Future: Unpin + 'static,
Io: AsyncRead + AsyncWrite + Unpin,
Codec: Encoder + Decoder + Unpin,
<T::Service as Service>::Future: 'static,
Io: AsyncRead + AsyncWrite,
Codec: Encoder + Decoder,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
{
type Output = Result<(), ServiceError<C::Error, Codec>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = self.get_mut();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let mut this = self.as_mut().project();
loop {
match this.inner.poll(cx) {
Either::Left(new) => this.inner = new,
Either::Left(new) => {
this = self.as_mut().project();
this.inner.set(new)
}
Either::Right(poll) => return poll,
};
}
}
}
#[pin_project::pin_project]
enum FramedServiceImplResponseInner<St, Io, Codec, C, T>
where
C: Service<Request = Connect<Io>, Response = ConnectResult<Io, St, Codec>>,
C::Future: Unpin,
C::Error: 'static,
T: ServiceFactory<
Config = St,
@ -370,27 +334,24 @@ where
Error = C::Error,
InitError = C::Error,
>,
T::Future: Unpin,
T::Service: Unpin,
<T::Service as Service>::Future: Unpin + 'static,
Io: AsyncRead + AsyncWrite + Unpin,
Codec: Encoder + Decoder + Unpin,
<T::Service as Service>::Future: 'static,
Io: AsyncRead + AsyncWrite,
Codec: Encoder + Decoder,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
{
Connect(C::Future, Rc<T>, Option<Rc<dyn Fn(&mut St, bool)>>),
Connect(#[pin] C::Future, Rc<T>, Option<Rc<dyn Fn(&mut St, bool)>>),
Handler(
T::Future,
#[pin] T::Future,
Option<ConnectResult<Io, St, Codec>>,
Option<Rc<dyn Fn(&mut St, bool)>>,
),
Dispatcher(FramedDispatcher<St, T::Service, Io, Codec>),
Dispatcher(#[pin] FramedDispatcher<St, T::Service, Io, Codec>),
}
impl<St, Io, Codec, C, T> FramedServiceImplResponseInner<St, Io, Codec, C, T>
where
C: Service<Request = Connect<Io>, Response = ConnectResult<Io, St, Codec>>,
C::Future: Unpin,
C::Error: 'static,
T: ServiceFactory<
Config = St,
@ -399,40 +360,37 @@ where
Error = C::Error,
InitError = C::Error,
>,
T::Future: Unpin,
T::Service: Unpin,
<T::Service as Service>::Future: Unpin + 'static,
Io: AsyncRead + AsyncWrite + Unpin,
Codec: Encoder + Decoder + Unpin,
<T::Service as Service>::Future: 'static,
Io: AsyncRead + AsyncWrite,
Codec: Encoder + Decoder,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
{
#[project]
fn poll(
&mut self,
self: Pin<&mut Self>,
cx: &mut Context,
) -> Either<
FramedServiceImplResponseInner<St, Io, Codec, C, T>,
Poll<Result<(), ServiceError<C::Error, Codec>>>,
> {
match self {
FramedServiceImplResponseInner::Connect(
ref mut fut,
ref handler,
ref mut disconnect,
) => match Pin::new(fut).poll(cx) {
Poll::Ready(Ok(res)) => Either::Left(FramedServiceImplResponseInner::Handler(
handler.new_service(&res.state),
Some(res),
disconnect.take(),
)),
Poll::Pending => Either::Right(Poll::Pending),
Poll::Ready(Err(e)) => Either::Right(Poll::Ready(Err(e.into()))),
},
FramedServiceImplResponseInner::Handler(
ref mut fut,
ref mut res,
ref mut disconnect,
) => match Pin::new(fut).poll(cx) {
#[project]
match self.project() {
FramedServiceImplResponseInner::Connect(fut, handler, disconnect) => {
match fut.poll(cx) {
Poll::Ready(Ok(res)) => {
Either::Left(FramedServiceImplResponseInner::Handler(
handler.new_service(&res.state),
Some(res),
disconnect.take(),
))
}
Poll::Pending => Either::Right(Poll::Pending),
Poll::Ready(Err(e)) => Either::Right(Poll::Ready(Err(e.into()))),
}
}
FramedServiceImplResponseInner::Handler(fut, res, disconnect) => match fut.poll(cx)
{
Poll::Ready(Ok(handler)) => {
let res = res.take().unwrap();
Either::Left(FramedServiceImplResponseInner::Dispatcher(