From 3bf83c1d9846658442963c2a444d0c9e2d937126 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 19 Nov 2019 14:51:40 +0600 Subject: [PATCH] cleanup Unpin constraint; simplify Framed impl --- actix-codec/Cargo.toml | 2 +- actix-codec/src/framed.rs | 378 ++++++++++++++++-------------- actix-codec/src/framed_read.rs | 248 -------------------- actix-codec/src/framed_write.rs | 333 -------------------------- actix-codec/src/lib.rs | 4 - actix-connect/src/service.rs | 24 +- actix-connect/src/ssl/openssl.rs | 4 +- actix-connect/src/ssl/rustls.rs | 8 +- actix-ioframe/Cargo.toml | 1 + actix-ioframe/src/connect.rs | 32 ++- actix-ioframe/src/dispatcher.rs | 45 ++-- actix-ioframe/src/service.rs | 162 +++++-------- actix-server/src/ssl/openssl.rs | 13 +- actix-server/src/ssl/rustls.rs | 9 +- actix-service/Cargo.toml | 3 +- actix-service/src/and_then.rs | 65 ++--- actix-service/src/apply.rs | 24 +- actix-service/src/apply_cfg.rs | 84 ++----- actix-service/src/fn_service.rs | 38 +-- actix-service/src/lib.rs | 8 +- actix-service/src/map.rs | 29 ++- actix-service/src/map_err.rs | 31 ++- actix-service/src/map_init_err.rs | 12 +- actix-service/src/then.rs | 68 ++---- actix-service/src/transform.rs | 30 +-- actix-utils/Cargo.toml | 1 + actix-utils/src/either.rs | 22 +- actix-utils/src/framed.rs | 47 ++-- actix-utils/src/inflight.rs | 11 +- actix-utils/src/timeout.rs | 9 +- 30 files changed, 493 insertions(+), 1252 deletions(-) delete mode 100644 actix-codec/src/framed_read.rs delete mode 100644 actix-codec/src/framed_write.rs diff --git a/actix-codec/Cargo.toml b/actix-codec/Cargo.toml index 51a7b21b..c9970269 100644 --- a/actix-codec/Cargo.toml +++ b/actix-codec/Cargo.toml @@ -19,8 +19,8 @@ path = "src/lib.rs" [dependencies] bytes = "0.4.12" -pin-utils = "0.1.0-alpha.4" futures = "0.3.1" +pin-project = "0.4.5" tokio-io = "0.2.0-alpha.6" tokio-codec = "0.2.0-alpha.6" log = "0.4" \ No newline at end of file diff --git a/actix-codec/src/framed.rs b/actix-codec/src/framed.rs index 949e7b97..a8c388cd 100644 --- a/actix-codec/src/framed.rs +++ b/actix-codec/src/framed.rs @@ -1,35 +1,40 @@ #![allow(deprecated)] use std::fmt; -use std::io::{self, Read, Write}; +use std::io::{self}; use std::pin::Pin; use std::task::{Context, Poll}; use bytes::BytesMut; -use futures::{Sink, Stream}; +use futures::{ready, Sink, Stream}; +use pin_project::pin_project; use tokio_codec::{Decoder, Encoder}; use tokio_io::{AsyncRead, AsyncWrite}; -use super::framed_read::{framed_read2, framed_read2_with_buffer, FramedRead2}; -use super::framed_write::{framed_write2, framed_write2_with_buffer, FramedWrite2}; - const LW: usize = 1024; const HW: usize = 8 * 1024; +const INITIAL_CAPACITY: usize = 8 * 1024; /// A unified `Stream` and `Sink` interface to an underlying I/O object, using /// the `Encoder` and `Decoder` traits to encode and decode frames. /// /// You can create a `Framed` instance by using the `AsyncRead::framed` adapter. +#[pin_project] pub struct Framed { - inner: FramedRead2>>, + io: T, + codec: U, + eof: bool, + is_readable: bool, + read_buf: BytesMut, + write_buf: BytesMut, + write_lw: usize, + write_hw: usize, } -pub struct Fuse(pub T, pub U); - impl Framed where - T: AsyncRead + AsyncWrite + Unpin, - U: Decoder + Encoder + Unpin, + T: AsyncRead + AsyncWrite, + U: Decoder + Encoder, { /// Provides a `Stream` and `Sink` interface for reading and writing to this /// `Io` object, using `Decode` and `Encode` to read and write the raw data. @@ -48,17 +53,31 @@ where /// If you want to work more directly with the streams and sink, consider /// calling `split` on the `Framed` returned by this method, which will /// break them into separate objects, allowing them to interact more easily. - pub fn new(inner: T, codec: U) -> Framed { + pub fn new(io: T, codec: U) -> Framed { Framed { - inner: framed_read2(framed_write2(Fuse(inner, codec), LW, HW)), + io, + codec, + eof: false, + is_readable: false, + read_buf: BytesMut::with_capacity(INITIAL_CAPACITY), + write_buf: BytesMut::with_capacity(HW), + write_lw: LW, + write_hw: HW, } } /// Same as `Framed::new()` with ability to specify write buffer low/high capacity watermarks. - pub fn new_with_caps(inner: T, codec: U, lw: usize, hw: usize) -> Framed { + pub fn new_with_caps(io: T, codec: U, lw: usize, hw: usize) -> Framed { debug_assert!((lw < hw) && hw != 0); Framed { - inner: framed_read2(framed_write2(Fuse(inner, codec), lw, hw)), + io, + codec, + eof: false, + is_readable: false, + read_buf: BytesMut::with_capacity(INITIAL_CAPACITY), + write_buf: BytesMut::with_capacity(hw), + write_lw: lw, + write_hw: hw, } } } @@ -87,26 +106,25 @@ impl Framed { /// break them into separate objects, allowing them to interact more easily. pub fn from_parts(parts: FramedParts) -> Framed { Framed { - inner: framed_read2_with_buffer( - framed_write2_with_buffer( - Fuse(parts.io, parts.codec), - parts.write_buf, - parts.write_buf_lw, - parts.write_buf_hw, - ), - parts.read_buf, - ), + io: parts.io, + codec: parts.codec, + eof: false, + is_readable: false, + write_buf: parts.write_buf, + write_lw: parts.write_buf_lw, + write_hw: parts.write_buf_hw, + read_buf: parts.read_buf, } } /// Returns a reference to the underlying codec. pub fn get_codec(&self) -> &U { - &self.inner.get_ref().get_ref().1 + &self.codec } /// Returns a mutable reference to the underlying codec. pub fn get_codec_mut(&mut self) -> &mut U { - &mut self.inner.get_mut().get_mut().1 + &mut self.codec } /// Returns a reference to the underlying I/O stream wrapped by @@ -116,7 +134,7 @@ impl Framed { /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn get_ref(&self) -> &T { - &self.inner.get_ref().get_ref().0 + &self.io } /// Returns a mutable reference to the underlying I/O stream wrapped by @@ -126,17 +144,17 @@ impl Framed { /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn get_mut(&mut self) -> &mut T { - &mut self.inner.get_mut().get_mut().0 + &mut self.io } /// Check if write buffer is empty. pub fn is_write_buf_empty(&self) -> bool { - self.inner.get_ref().is_empty() + self.write_buf.is_empty() } /// Check if write buffer is full. pub fn is_write_buf_full(&self) -> bool { - self.inner.get_ref().is_full() + self.write_buf.len() >= self.write_hw } /// Consumes the `Frame`, returning its underlying I/O stream. @@ -145,19 +163,20 @@ impl Framed { /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn into_inner(self) -> T { - self.inner.into_inner().into_inner().0 + self.io } /// Consume the `Frame`, returning `Frame` with different codec. pub fn into_framed(self, codec: U2) -> Framed { - let (inner, read_buf) = self.inner.into_parts(); - let (inner, write_buf, lw, hw) = inner.into_parts(); - Framed { - inner: framed_read2_with_buffer( - framed_write2_with_buffer(Fuse(inner.0, codec), write_buf, lw, hw), - read_buf, - ), + io: self.io, + codec, + eof: self.eof, + is_readable: self.is_readable, + read_buf: self.read_buf, + write_buf: self.write_buf, + write_lw: self.write_lw, + write_hw: self.write_hw, } } @@ -166,14 +185,15 @@ impl Framed { where F: Fn(T) -> T2, { - let (inner, read_buf) = self.inner.into_parts(); - let (inner, write_buf, lw, hw) = inner.into_parts(); - Framed { - inner: framed_read2_with_buffer( - framed_write2_with_buffer(Fuse(f(inner.0), inner.1), write_buf, lw, hw), - read_buf, - ), + io: f(self.io), + codec: self.codec, + eof: self.eof, + is_readable: self.is_readable, + read_buf: self.read_buf, + write_buf: self.write_buf, + write_lw: self.write_lw, + write_hw: self.write_hw, } } @@ -182,14 +202,15 @@ impl Framed { where F: Fn(U) -> U2, { - let (inner, read_buf) = self.inner.into_parts(); - let (inner, write_buf, lw, hw) = inner.into_parts(); - Framed { - inner: framed_read2_with_buffer( - framed_write2_with_buffer(Fuse(inner.0, f(inner.1)), write_buf, lw, hw), - read_buf, - ), + io: self.io, + codec: f(self.codec), + eof: self.eof, + is_readable: self.is_readable, + read_buf: self.read_buf, + write_buf: self.write_buf, + write_lw: self.write_lw, + write_hw: self.write_hw, } } @@ -200,16 +221,13 @@ impl Framed { /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn into_parts(self) -> FramedParts { - let (inner, read_buf) = self.inner.into_parts(); - let (inner, write_buf, write_buf_lw, write_buf_hw) = inner.into_parts(); - FramedParts { - io: inner.0, - codec: inner.1, - read_buf, - write_buf, - write_buf_lw, - write_buf_hw, + io: self.io, + codec: self.codec, + read_buf: self.read_buf, + write_buf: self.write_buf, + write_buf_lw: self.write_lw, + write_buf_hw: self.write_hw, _priv: (), } } @@ -219,92 +237,168 @@ impl Framed { /// Serialize item and Write to the inner buffer pub fn write(&mut self, item: ::Item) -> Result<(), ::Error> where - T: AsyncWrite + Unpin, - U: Encoder + Unpin, + T: AsyncWrite, + U: Encoder, { - self.inner.get_mut().force_send(item) + let len = self.write_buf.len(); + if len < self.write_lw { + self.write_buf.reserve(self.write_hw - len) + } + self.codec.encode(item, &mut self.write_buf)?; + Ok(()) } - pub fn is_ready(&mut self, cx: &mut Context<'_>) -> Poll> - where - T: AsyncWrite + Unpin, - U: Encoder + Unpin, - U::Error: From, - { - Pin::new(&mut self.inner.get_mut()).poll_ready(cx) + pub fn is_ready(&self) -> bool { + let len = self.write_buf.len(); + len < self.write_hw } - pub fn next_item(&mut self, cx: &mut Context<'_>) -> Poll>> + pub fn next_item(&mut self, cx: &mut Context) -> Poll>> where - T: AsyncRead + Unpin, - U: Decoder + Unpin, + T: AsyncRead, + U: Decoder, { - Pin::new(&mut self.inner).poll_next(cx) + loop { + // Repeatedly call `decode` or `decode_eof` as long as it is + // "readable". Readable is defined as not having returned `None`. If + // the upstream has returned EOF, and the decoder is no longer + // readable, it can be assumed that the decoder will never become + // readable again, at which point the stream is terminated. + + if self.is_readable { + if self.eof { + match self.codec.decode_eof(&mut self.read_buf) { + Ok(Some(frame)) => return Poll::Ready(Some(Ok(frame))), + Ok(None) => return Poll::Ready(None), + Err(e) => return Poll::Ready(Some(Err(e))), + } + } + + log::trace!("attempting to decode a frame"); + + match self.codec.decode(&mut self.read_buf) { + Ok(Some(frame)) => { + log::trace!("frame decoded from buffer"); + return Poll::Ready(Some(Ok(frame))); + } + Err(e) => return Poll::Ready(Some(Err(e))), + _ => { + // Need more data + } + } + + self.is_readable = false; + } + + assert!(!self.eof); + + // Otherwise, try to read more data and try again. Make sure we've + // got room for at least one byte to read to ensure that we don't + // get a spurious 0 that looks like EOF + self.read_buf.reserve(1); + let cnt = unsafe { + match Pin::new_unchecked(&mut self.io).poll_read_buf(cx, &mut self.read_buf) { + Poll::Pending => return Poll::Pending, + Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e.into()))), + Poll::Ready(Ok(cnt)) => cnt, + } + }; + + if cnt == 0 { + self.eof = true; + } + self.is_readable = true; + } } - pub fn flush(&mut self, cx: &mut Context<'_>) -> Poll> + pub fn flush(&mut self, cx: &mut Context) -> Poll> where - T: AsyncWrite + Unpin, - U: Encoder + Unpin, + T: AsyncWrite, + U: Encoder, { - Pin::new(self.inner.get_mut()).poll_flush(cx) + log::trace!("flushing framed transport"); + + while !self.write_buf.is_empty() { + log::trace!("writing; remaining={}", self.write_buf.len()); + + let n = ready!( + unsafe { Pin::new_unchecked(&mut self.io) }.poll_write(cx, &self.write_buf) + )?; + + if n == 0 { + return Poll::Ready(Err(io::Error::new( + io::ErrorKind::WriteZero, + "failed to \ + write frame to transport", + ) + .into())); + } + + // TODO: Add a way to `bytes` to do this w/o returning the drained + // data. + let _ = self.write_buf.split_to(n); + } + + // Try flushing the underlying IO + ready!(unsafe { Pin::new_unchecked(&mut self.io) }.poll_flush(cx))?; + + log::trace!("framed transport flushed"); + Poll::Ready(Ok(())) } - pub fn close(&mut self, cx: &mut Context<'_>) -> Poll> + pub fn close(&mut self, cx: &mut Context) -> Poll> where - T: AsyncWrite + Unpin, - U: Encoder + Unpin, + T: AsyncWrite, + U: Encoder, { - Pin::new(&mut self.inner.get_mut()).poll_close(cx) + ready!(unsafe { Pin::new_unchecked(&mut self.io) }.poll_flush(cx))?; + ready!(unsafe { Pin::new_unchecked(&mut self.io) }.poll_shutdown(cx))?; + + Poll::Ready(Ok(())) } } impl Stream for Framed where - T: AsyncRead + Unpin, - U: Decoder + Unpin, + T: AsyncRead, + U: Decoder, { type Item = Result; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.as_mut().inner).poll_next(cx) + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + self.next_item(cx) } } impl Sink for Framed where - T: AsyncWrite + Unpin, - U: Encoder + Unpin, + T: AsyncWrite, + U: Encoder, U::Error: From, { type Error = U::Error; - fn poll_ready( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - Pin::new(&mut self.as_mut().inner.get_mut()).poll_ready(cx) + fn poll_ready(self: Pin<&mut Self>, _: &mut Context) -> Poll> { + if self.is_ready() { + Poll::Ready(Ok(())) + } else { + Poll::Pending + } } fn start_send( mut self: Pin<&mut Self>, item: ::Item, ) -> Result<(), Self::Error> { - Pin::new(&mut self.as_mut().inner.get_mut()).start_send(item) + self.write(item) } - fn poll_flush( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - Pin::new(&mut self.as_mut().inner.get_mut()).poll_flush(cx) + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + self.flush(cx) } - fn poll_close( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - Pin::new(&mut self.as_mut().inner.get_mut()).poll_close(cx) + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + self.close(cx) } } @@ -315,84 +409,12 @@ where { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Framed") - .field("io", &self.inner.get_ref().get_ref().0) - .field("codec", &self.inner.get_ref().get_ref().1) + .field("io", &self.io) + .field("codec", &self.codec) .finish() } } -// ===== impl Fuse ===== - -impl Read for Fuse { - fn read(&mut self, dst: &mut [u8]) -> io::Result { - self.0.read(dst) - } -} - -impl AsyncRead for Fuse { - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { - self.0.prepare_uninitialized_buffer(buf) - } - - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - unsafe { self.map_unchecked_mut(|s| &mut s.0).poll_read(cx, buf) } - } -} - -impl Write for Fuse { - fn write(&mut self, src: &[u8]) -> io::Result { - self.0.write(src) - } - - fn flush(&mut self) -> io::Result<()> { - self.0.flush() - } -} - -impl AsyncWrite for Fuse { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - unsafe { self.map_unchecked_mut(|s| &mut s.0).poll_write(cx, buf) } - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - unsafe { self.map_unchecked_mut(|s| &mut s.0).poll_flush(cx) } - } - - fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - unsafe { self.map_unchecked_mut(|s| &mut s.0).poll_shutdown(cx) } - } -} - -impl Decoder for Fuse { - type Item = U::Item; - type Error = U::Error; - - fn decode(&mut self, buffer: &mut BytesMut) -> Result, Self::Error> { - self.1.decode(buffer) - } - - fn decode_eof(&mut self, buffer: &mut BytesMut) -> Result, Self::Error> { - self.1.decode_eof(buffer) - } -} - -impl Encoder for Fuse { - type Item = U::Item; - type Error = U::Error; - - fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { - self.1.encode(item, dst) - } -} - /// `FramedParts` contains an export of the data of a Framed transport. /// It can be used to construct a new `Framed` with a different codec. /// It contains all current buffers and the inner transport. diff --git a/actix-codec/src/framed_read.rs b/actix-codec/src/framed_read.rs deleted file mode 100644 index 5d8d5ee8..00000000 --- a/actix-codec/src/framed_read.rs +++ /dev/null @@ -1,248 +0,0 @@ -use std::fmt; -use std::pin::Pin; -use std::task::{Context, Poll}; - -use bytes::BytesMut; -use futures::{Sink, Stream}; -use log::trace; -use tokio_codec::Decoder; -use tokio_io::AsyncRead; - -use super::framed::Fuse; - -/// A `Stream` of messages decoded from an `AsyncRead`. -pub struct FramedRead { - inner: FramedRead2>, -} - -pub struct FramedRead2 { - inner: T, - eof: bool, - is_readable: bool, - buffer: BytesMut, -} - -const INITIAL_CAPACITY: usize = 8 * 1024; - -// ===== impl FramedRead ===== - -impl FramedRead -where - T: AsyncRead, - D: Decoder, -{ - /// Creates a new `FramedRead` with the given `decoder`. - pub fn new(inner: T, decoder: D) -> FramedRead { - FramedRead { - inner: framed_read2(Fuse(inner, decoder)), - } - } -} - -impl FramedRead { - /// Returns a reference to the underlying I/O stream wrapped by - /// `FramedRead`. - /// - /// Note that care should be taken to not tamper with the underlying stream - /// of data coming in as it may corrupt the stream of frames otherwise - /// being worked with. - pub fn get_ref(&self) -> &T { - &self.inner.inner.0 - } - - /// Returns a mutable reference to the underlying I/O stream wrapped by - /// `FramedRead`. - /// - /// Note that care should be taken to not tamper with the underlying stream - /// of data coming in as it may corrupt the stream of frames otherwise - /// being worked with. - pub fn get_mut(&mut self) -> &mut T { - &mut self.inner.inner.0 - } - - /// Consumes the `FramedRead`, returning its underlying I/O stream. - /// - /// Note that care should be taken to not tamper with the underlying stream - /// of data coming in as it may corrupt the stream of frames otherwise - /// being worked with. - pub fn into_inner(self) -> T { - self.inner.inner.0 - } - - /// Returns a reference to the underlying decoder. - pub fn decoder(&self) -> &D { - &self.inner.inner.1 - } - - /// Returns a mutable reference to the underlying decoder. - pub fn decoder_mut(&mut self) -> &mut D { - &mut self.inner.inner.1 - } -} - -impl Stream for FramedRead -where - T: AsyncRead, - D: Decoder, -{ - type Item = Result; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll_next(cx) } - } -} - -impl Sink for FramedRead -where - T: Sink, -{ - type Error = T::Error; - - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - unsafe { - self.map_unchecked_mut(|s| &mut s.inner.inner.0) - .poll_ready(cx) - } - } - - fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { - unsafe { - self.map_unchecked_mut(|s| &mut s.inner.inner.0) - .start_send(item) - } - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - unsafe { - self.map_unchecked_mut(|s| &mut s.inner.inner.0) - .poll_flush(cx) - } - } - - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - unsafe { - self.map_unchecked_mut(|s| &mut s.inner.inner.0) - .poll_close(cx) - } - } -} - -impl fmt::Debug for FramedRead -where - T: fmt::Debug, - D: fmt::Debug, -{ - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("FramedRead") - .field("inner", &self.inner.inner.0) - .field("decoder", &self.inner.inner.1) - .field("eof", &self.inner.eof) - .field("is_readable", &self.inner.is_readable) - .field("buffer", &self.inner.buffer) - .finish() - } -} - -// ===== impl FramedRead2 ===== - -pub fn framed_read2(inner: T) -> FramedRead2 { - FramedRead2 { - inner, - eof: false, - is_readable: false, - buffer: BytesMut::with_capacity(INITIAL_CAPACITY), - } -} - -pub fn framed_read2_with_buffer(inner: T, mut buf: BytesMut) -> FramedRead2 { - if buf.capacity() < INITIAL_CAPACITY { - let bytes_to_reserve = INITIAL_CAPACITY - buf.capacity(); - buf.reserve(bytes_to_reserve); - } - FramedRead2 { - inner, - eof: false, - is_readable: !buf.is_empty(), - buffer: buf, - } -} - -impl FramedRead2 { - pub fn get_ref(&self) -> &T { - &self.inner - } - - pub fn into_inner(self) -> T { - self.inner - } - - pub fn into_parts(self) -> (T, BytesMut) { - (self.inner, self.buffer) - } - - pub fn get_mut(&mut self) -> &mut T { - &mut self.inner - } -} - -impl Stream for FramedRead2 -where - T: tokio_io::AsyncRead + Decoder, -{ - type Item = Result; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut this = unsafe { self.get_unchecked_mut() }; - loop { - // Repeatedly call `decode` or `decode_eof` as long as it is - // "readable". Readable is defined as not having returned `None`. If - // the upstream has returned EOF, and the decoder is no longer - // readable, it can be assumed that the decoder will never become - // readable again, at which point the stream is terminated. - - if this.is_readable { - if this.eof { - match this.inner.decode_eof(&mut this.buffer) { - Ok(Some(frame)) => return Poll::Ready(Some(Ok(frame))), - Ok(None) => return Poll::Ready(None), - Err(e) => return Poll::Ready(Some(Err(e))), - } - } - - trace!("attempting to decode a frame"); - - match this.inner.decode(&mut this.buffer) { - Ok(Some(frame)) => { - trace!("frame decoded from buffer"); - return Poll::Ready(Some(Ok(frame))); - } - Err(e) => return Poll::Ready(Some(Err(e))), - _ => { - // Need more data - } - } - - this.is_readable = false; - } - - assert!(!this.eof); - - // Otherwise, try to read more data and try again. Make sure we've - // got room for at least one byte to read to ensure that we don't - // get a spurious 0 that looks like EOF - this.buffer.reserve(1); - let cnt = unsafe { - match Pin::new_unchecked(&mut this.inner).poll_read_buf(cx, &mut this.buffer) { - Poll::Pending => return Poll::Pending, - Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e.into()))), - Poll::Ready(Ok(cnt)) => cnt, - } - }; - - if cnt == 0 { - this.eof = true; - } - this.is_readable = true; - } - } -} diff --git a/actix-codec/src/framed_write.rs b/actix-codec/src/framed_write.rs deleted file mode 100644 index 2791208a..00000000 --- a/actix-codec/src/framed_write.rs +++ /dev/null @@ -1,333 +0,0 @@ -use std::fmt; -use std::io::{self, Read}; -use std::pin::Pin; -use std::task::{Context, Poll}; - -use bytes::BytesMut; -use futures::{ready, Sink, Stream}; -use log::trace; -use tokio_codec::{Decoder, Encoder}; -use tokio_io::{AsyncRead, AsyncWrite}; - -use super::framed::Fuse; - -/// A `Sink` of frames encoded to an `AsyncWrite`. -pub struct FramedWrite { - inner: FramedWrite2>, -} - -pub struct FramedWrite2 { - inner: T, - buffer: BytesMut, - low_watermark: usize, - high_watermark: usize, -} - -impl FramedWrite -where - T: AsyncWrite, - E: Encoder, -{ - /// Creates a new `FramedWrite` with the given `encoder`. - pub fn new(inner: T, encoder: E, lw: usize, hw: usize) -> FramedWrite { - FramedWrite { - inner: framed_write2(Fuse(inner, encoder), lw, hw), - } - } -} - -impl FramedWrite { - /// Returns a reference to the underlying I/O stream wrapped by - /// `FramedWrite`. - /// - /// Note that care should be taken to not tamper with the underlying stream - /// of data coming in as it may corrupt the stream of frames otherwise - /// being worked with. - pub fn get_ref(&self) -> &T { - &self.inner.inner.0 - } - - /// Returns a mutable reference to the underlying I/O stream wrapped by - /// `FramedWrite`. - /// - /// Note that care should be taken to not tamper with the underlying stream - /// of data coming in as it may corrupt the stream of frames otherwise - /// being worked with. - pub fn get_mut(&mut self) -> &mut T { - &mut self.inner.inner.0 - } - - /// Consumes the `FramedWrite`, returning its underlying I/O stream. - /// - /// Note that care should be taken to not tamper with the underlying stream - /// of data coming in as it may corrupt the stream of frames otherwise - /// being worked with. - pub fn into_inner(self) -> T { - self.inner.inner.0 - } - - /// Returns a reference to the underlying decoder. - pub fn encoder(&self) -> &E { - &self.inner.inner.1 - } - - /// Returns a mutable reference to the underlying decoder. - pub fn encoder_mut(&mut self) -> &mut E { - &mut self.inner.inner.1 - } - - /// Check if write buffer is full - pub fn is_full(&self) -> bool { - self.inner.is_full() - } - - /// Check if write buffer is empty. - pub fn is_empty(&self) -> bool { - self.inner.is_empty() - } -} - -impl FramedWrite -where - E: Encoder, -{ - /// Force send item - pub fn force_send(&mut self, item: E::Item) -> Result<(), E::Error> { - self.inner.force_send(item) - } -} - -impl Sink for FramedWrite -where - T: AsyncWrite, - E: Encoder, -{ - type Error = E::Error; - - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll_ready(cx) } - } - - fn start_send(self: Pin<&mut Self>, item: ::Item) -> Result<(), Self::Error> { - unsafe { self.map_unchecked_mut(|s| &mut s.inner).start_send(item) } - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll_flush(cx) } - } - - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll_close(cx) } - } -} - -impl Stream for FramedWrite -where - T: Stream, -{ - type Item = T::Item; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - unsafe { - self.map_unchecked_mut(|s| &mut s.inner.inner.0) - .poll_next(cx) - } - } -} - -impl fmt::Debug for FramedWrite -where - T: fmt::Debug, - U: fmt::Debug, -{ - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("FramedWrite") - .field("inner", &self.inner.get_ref().0) - .field("encoder", &self.inner.get_ref().1) - .field("buffer", &self.inner.buffer) - .finish() - } -} - -// ===== impl FramedWrite2 ===== - -pub fn framed_write2( - inner: T, - low_watermark: usize, - high_watermark: usize, -) -> FramedWrite2 { - FramedWrite2 { - inner, - low_watermark, - high_watermark, - buffer: BytesMut::with_capacity(high_watermark), - } -} - -pub fn framed_write2_with_buffer( - inner: T, - mut buffer: BytesMut, - low_watermark: usize, - high_watermark: usize, -) -> FramedWrite2 { - if buffer.capacity() < high_watermark { - let bytes_to_reserve = high_watermark - buffer.capacity(); - buffer.reserve(bytes_to_reserve); - } - FramedWrite2 { - inner, - buffer, - low_watermark, - high_watermark, - } -} - -impl FramedWrite2 { - pub fn get_ref(&self) -> &T { - &self.inner - } - - pub fn into_inner(self) -> T { - self.inner - } - - pub fn into_parts(self) -> (T, BytesMut, usize, usize) { - ( - self.inner, - self.buffer, - self.low_watermark, - self.high_watermark, - ) - } - - pub fn get_mut(&mut self) -> &mut T { - &mut self.inner - } - - pub fn is_full(&self) -> bool { - self.buffer.len() >= self.high_watermark - } - - pub fn is_empty(&self) -> bool { - self.buffer.is_empty() - } -} - -impl FramedWrite2 -where - T: Encoder, -{ - pub fn force_send(&mut self, item: T::Item) -> Result<(), T::Error> { - let len = self.buffer.len(); - if len < self.low_watermark { - self.buffer.reserve(self.high_watermark - len) - } - self.inner.encode(item, &mut self.buffer)?; - Ok(()) - } -} - -impl Sink for FramedWrite2 -where - T: AsyncWrite + Encoder, -{ - type Error = T::Error; - - fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - let len = self.buffer.len(); - if len >= self.high_watermark { - Poll::Pending - } else { - Poll::Ready(Ok(())) - } - } - - fn start_send(self: Pin<&mut Self>, item: ::Item) -> Result<(), Self::Error> { - let this = unsafe { self.get_unchecked_mut() }; - - // Check the buffer capacity - let len = this.buffer.len(); - if len < this.low_watermark { - this.buffer.reserve(this.high_watermark - len) - } - - this.inner.encode(item, &mut this.buffer)?; - - Ok(()) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = unsafe { self.get_unchecked_mut() }; - trace!("flushing framed transport"); - - while !this.buffer.is_empty() { - trace!("writing; remaining={}", this.buffer.len()); - - let n = ready!( - unsafe { Pin::new_unchecked(&mut this.inner) }.poll_write(cx, &this.buffer) - )?; - - if n == 0 { - return Poll::Ready(Err(io::Error::new( - io::ErrorKind::WriteZero, - "failed to \ - write frame to transport", - ) - .into())); - } - - // TODO: Add a way to `bytes` to do this w/o returning the drained - // data. - let _ = this.buffer.split_to(n); - } - - // Try flushing the underlying IO - ready!(unsafe { Pin::new_unchecked(&mut this.inner) }.poll_flush(cx))?; - - trace!("framed transport flushed"); - Poll::Ready(Ok(())) - } - - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut this = unsafe { self.get_unchecked_mut() }; - ready!( - unsafe { Pin::new_unchecked(&mut this).map_unchecked_mut(|s| *s) }.poll_flush(cx) - )?; - ready!(unsafe { Pin::new_unchecked(&mut this.inner) }.poll_shutdown(cx))?; - - Poll::Ready(Ok(())) - } -} - -impl Decoder for FramedWrite2 { - type Item = T::Item; - type Error = T::Error; - - fn decode(&mut self, src: &mut BytesMut) -> Result, T::Error> { - self.inner.decode(src) - } - - fn decode_eof(&mut self, src: &mut BytesMut) -> Result, T::Error> { - self.inner.decode_eof(src) - } -} - -impl Read for FramedWrite2 { - fn read(&mut self, dst: &mut [u8]) -> io::Result { - self.inner.read(dst) - } -} - -impl AsyncRead for FramedWrite2 { - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { - self.inner.prepare_uninitialized_buffer(buf) - } - - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll_read(cx, buf) } - } -} diff --git a/actix-codec/src/lib.rs b/actix-codec/src/lib.rs index 77102a8c..4ea33aef 100644 --- a/actix-codec/src/lib.rs +++ b/actix-codec/src/lib.rs @@ -12,13 +12,9 @@ mod bcodec; mod framed; -mod framed_read; -mod framed_write; pub use self::bcodec::BytesCodec; pub use self::framed::{Framed, FramedParts}; -pub use self::framed_read::FramedRead; -pub use self::framed_write::FramedWrite; pub use tokio_codec::{Decoder, Encoder}; pub use tokio_io::{AsyncRead, AsyncWrite}; diff --git a/actix-connect/src/service.rs b/actix-connect/src/service.rs index 917c849d..65fa6d53 100644 --- a/actix-connect/src/service.rs +++ b/actix-connect/src/service.rs @@ -18,7 +18,7 @@ pub struct ConnectServiceFactory { resolver: ResolverFactory, } -impl ConnectServiceFactory { +impl ConnectServiceFactory { /// Construct new ConnectService factory pub fn new() -> Self { ConnectServiceFactory { @@ -70,7 +70,7 @@ impl Clone for ConnectServiceFactory { } } -impl ServiceFactory for ConnectServiceFactory { +impl ServiceFactory for ConnectServiceFactory { type Request = Connect; type Response = Connection; type Error = ConnectError; @@ -90,7 +90,7 @@ pub struct ConnectService { resolver: Resolver, } -impl Service for ConnectService { +impl Service for ConnectService { type Request = Connect; type Response = Connection; type Error = ConnectError; @@ -108,12 +108,12 @@ impl Service for ConnectService { } } -enum ConnectState { +enum ConnectState { Resolve( as Service>::Future), Connect( as Service>::Future), } -impl ConnectState { +impl ConnectState { fn poll( &mut self, cx: &mut Context, @@ -129,12 +129,12 @@ impl ConnectState { } } -pub struct ConnectServiceResponse { +pub struct ConnectServiceResponse { state: ConnectState, tcp: TcpConnector, } -impl Future for ConnectServiceResponse { +impl Future for ConnectServiceResponse { type Output = Result, ConnectError>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { @@ -159,7 +159,7 @@ pub struct TcpConnectService { resolver: Resolver, } -impl Service for TcpConnectService { +impl Service for TcpConnectService { type Request = Connect; type Response = TcpStream; type Error = ConnectError; @@ -177,12 +177,12 @@ impl Service for TcpConnectService { } } -enum TcpConnectState { +enum TcpConnectState { Resolve( as Service>::Future), Connect( as Service>::Future), } -impl TcpConnectState { +impl TcpConnectState { fn poll( &mut self, cx: &mut Context, @@ -206,12 +206,12 @@ impl TcpConnectState { } } -pub struct TcpConnectServiceResponse { +pub struct TcpConnectServiceResponse { state: TcpConnectState, tcp: TcpConnector, } -impl Future for TcpConnectServiceResponse { +impl Future for TcpConnectServiceResponse { type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { diff --git a/actix-connect/src/ssl/openssl.rs b/actix-connect/src/ssl/openssl.rs index 072e8677..6cbe3d62 100644 --- a/actix-connect/src/ssl/openssl.rs +++ b/actix-connect/src/ssl/openssl.rs @@ -33,7 +33,7 @@ impl OpensslConnector { impl OpensslConnector where - T: Address + Unpin + 'static, + T: Address + 'static, U: AsyncRead + AsyncWrite + Unpin + fmt::Debug + 'static, { pub fn service(connector: SslConnector) -> OpensslConnectorService { @@ -154,7 +154,7 @@ pub struct OpensslConnectServiceFactory { openssl: OpensslConnector, } -impl OpensslConnectServiceFactory { +impl OpensslConnectServiceFactory { /// Construct new OpensslConnectService factory pub fn new(connector: SslConnector) -> Self { OpensslConnectServiceFactory { diff --git a/actix-connect/src/ssl/rustls.rs b/actix-connect/src/ssl/rustls.rs index b5593420..ef6c8278 100644 --- a/actix-connect/src/ssl/rustls.rs +++ b/actix-connect/src/ssl/rustls.rs @@ -30,7 +30,7 @@ impl RustlsConnector { impl RustlsConnector where - T: Address + Unpin, + T: Address, U: AsyncRead + AsyncWrite + Unpin + fmt::Debug, { pub fn service(connector: Arc) -> RustlsConnectorService { @@ -50,7 +50,7 @@ impl Clone for RustlsConnector { } } -impl ServiceFactory for RustlsConnector +impl ServiceFactory for RustlsConnector where U: AsyncRead + AsyncWrite + Unpin + fmt::Debug, { @@ -84,7 +84,7 @@ impl Clone for RustlsConnectorService { } } -impl Service for RustlsConnectorService +impl Service for RustlsConnectorService where U: AsyncRead + AsyncWrite + Unpin + fmt::Debug, { @@ -114,7 +114,7 @@ pub struct ConnectAsyncExt { stream: Option>, } -impl Future for ConnectAsyncExt +impl Future for ConnectAsyncExt where U: AsyncRead + AsyncWrite + Unpin + fmt::Debug, { diff --git a/actix-ioframe/Cargo.toml b/actix-ioframe/Cargo.toml index 7c93fe26..df86ad13 100644 --- a/actix-ioframe/Cargo.toml +++ b/actix-ioframe/Cargo.toml @@ -24,6 +24,7 @@ actix-utils = "0.5.0-alpha.1" bytes = "0.4" either = "1.5.2" futures = "0.3.1" +pin-project = "0.4.5" tokio-executor = "=0.2.0-alpha.6" log = "0.4" diff --git a/actix-ioframe/src/connect.rs b/actix-ioframe/src/connect.rs index 0f445b39..60dd1869 100644 --- a/actix-ioframe/src/connect.rs +++ b/actix-ioframe/src/connect.rs @@ -16,7 +16,7 @@ pub struct Connect { impl Connect where - Io: AsyncRead + AsyncWrite + Unpin, + Io: AsyncRead + AsyncWrite, { pub(crate) fn new(io: Io) -> Self { Self { @@ -27,7 +27,7 @@ where pub fn codec(self, codec: Codec) -> ConnectResult where - Codec: Encoder + Decoder + Unpin, + Codec: Encoder + Decoder, { let (tx, rx) = mpsc::channel(); let sink = Sink::new(tx); @@ -41,6 +41,7 @@ where } } +#[pin_project::pin_project] pub struct ConnectResult { pub(crate) state: St, pub(crate) framed: Framed, @@ -75,41 +76,38 @@ impl ConnectResult { } } -impl Unpin for ConnectResult -where - Io: AsyncRead + AsyncWrite + Unpin, - Codec: Encoder + Decoder + Unpin, -{ -} - impl Stream for ConnectResult where - Io: AsyncRead + AsyncWrite + Unpin, - Codec: Encoder + Decoder + Unpin, + Io: AsyncRead + AsyncWrite, + Codec: Encoder + Decoder, { type Item = Result<::Item, ::Error>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - self.get_mut().framed.next_item(cx) + self.project().framed.next_item(cx) } } impl futures::Sink<::Item> for ConnectResult where - Io: AsyncRead + AsyncWrite + Unpin, - Codec: Encoder + Decoder + Unpin, + Io: AsyncRead + AsyncWrite, + Codec: Encoder + Decoder, { type Error = ::Error; - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.get_mut().framed.is_ready(cx) + fn poll_ready(self: Pin<&mut Self>, _: &mut Context) -> Poll> { + if self.framed.is_ready() { + Poll::Ready(Ok(())) + } else { + Poll::Pending + } } fn start_send( self: Pin<&mut Self>, item: ::Item, ) -> Result<(), Self::Error> { - self.get_mut().framed.write(item) + self.project().framed.write(item) } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { diff --git a/actix-ioframe/src/dispatcher.rs b/actix-ioframe/src/dispatcher.rs index a3d8158e..60313f6e 100644 --- a/actix-ioframe/src/dispatcher.rs +++ b/actix-ioframe/src/dispatcher.rs @@ -30,13 +30,14 @@ pub(crate) enum FramedMessage { /// FramedTransport - is a future that reads frames from Framed object /// and pass then to the service. +#[pin_project::pin_project] pub(crate) struct FramedDispatcher where S: Service, Response = Option>>, S::Error: 'static, S::Future: 'static, - T: AsyncRead + AsyncWrite + Unpin, - U: Encoder + Decoder + Unpin, + T: AsyncRead + AsyncWrite, + U: Encoder + Decoder, ::Item: 'static, ::Error: std::fmt::Debug, { @@ -55,8 +56,8 @@ where S: Service, Response = Option>>, S::Error: 'static, S::Future: 'static, - T: AsyncRead + AsyncWrite + Unpin, - U: Decoder + Encoder + Unpin, + T: AsyncRead + AsyncWrite, + U: Decoder + Encoder, ::Item: 'static, ::Error: std::fmt::Debug, { @@ -84,7 +85,7 @@ where } } -enum FramedState { +enum FramedState { Processing, Error(ServiceError), FramedError(ServiceError), @@ -92,7 +93,7 @@ enum FramedState { Stopping, } -impl FramedState { +impl FramedState { fn stop(&mut self, tx: Option>) { match self { FramedState::FlushAndStop(ref mut vec) => { @@ -121,25 +122,13 @@ struct FramedDispatcherInner { task: LocalWaker, } -impl Unpin for FramedDispatcher -where - S: Service, Response = Option>> + Unpin, - S::Error: Unpin + 'static, - S::Future: Unpin + 'static, - T: AsyncRead + AsyncWrite + Unpin, - U: Decoder + Encoder + Unpin, - ::Item: 'static, - ::Error: std::fmt::Debug, -{ -} - impl FramedDispatcher where - S: Service, Response = Option>> + Unpin, + S: Service, Response = Option>>, S::Error: 'static, - S::Future: Unpin + 'static, - T: AsyncRead + AsyncWrite + Unpin, - U: Decoder + Encoder + Unpin, + S::Future: 'static, + T: AsyncRead + AsyncWrite, + U: Decoder + Encoder, ::Item: 'static, ::Error: std::fmt::Debug, { @@ -179,8 +168,8 @@ where S: Service, Response = Option>>, S::Error: 'static, S::Future: 'static, - T: AsyncRead + AsyncWrite + Unpin, - U: Decoder + Encoder + Unpin, + T: AsyncRead + AsyncWrite, + U: Decoder + Encoder, ::Item: 'static, ::Error: std::fmt::Debug, { @@ -267,8 +256,8 @@ where S: Service, Response = Option>>, S::Error: 'static, S::Future: 'static, - T: AsyncRead + AsyncWrite + Unpin, - U: Decoder + Encoder + Unpin, + T: AsyncRead + AsyncWrite, + U: Decoder + Encoder, ::Item: 'static, ::Error: std::fmt::Debug, { @@ -328,8 +317,8 @@ where S: Service, Response = Option>>, S::Error: 'static, S::Future: 'static, - T: AsyncRead + AsyncWrite + Unpin, - U: Decoder + Encoder + Unpin, + T: AsyncRead + AsyncWrite, + U: Decoder + Encoder, ::Item: 'static, ::Error: std::fmt::Debug, { diff --git a/actix-ioframe/src/service.rs b/actix-ioframe/src/service.rs index 24b1d6a0..0700f8b3 100644 --- a/actix-ioframe/src/service.rs +++ b/actix-ioframe/src/service.rs @@ -8,6 +8,7 @@ use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder}; use actix_service::{IntoService, IntoServiceFactory, Service, ServiceFactory}; use either::Either; use futures::future::{FutureExt, LocalBoxFuture}; +use pin_project::project; use crate::connect::{Connect, ConnectResult}; use crate::dispatcher::FramedDispatcher; @@ -31,9 +32,9 @@ impl Builder { pub fn service(self, connect: F) -> ServiceBuilder where F: IntoService, - Io: AsyncRead + AsyncWrite + Unpin, + Io: AsyncRead + AsyncWrite, C: Service, Response = ConnectResult>, - Codec: Decoder + Encoder + Unpin, + Codec: Decoder + Encoder, { ServiceBuilder { connect: connect.into_service(), @@ -46,7 +47,7 @@ impl Builder { pub fn factory(self, connect: F) -> NewServiceBuilder where F: IntoServiceFactory, - Io: AsyncRead + AsyncWrite + Unpin, + Io: AsyncRead + AsyncWrite, C: ServiceFactory< Config = (), Request = Connect, @@ -54,7 +55,7 @@ impl Builder { >, C::Error: 'static, C::Future: 'static, - Codec: Decoder + Encoder + Unpin, + Codec: Decoder + Encoder, { NewServiceBuilder { connect: connect.into_factory(), @@ -73,11 +74,10 @@ pub struct ServiceBuilder { impl ServiceBuilder where St: 'static, - Io: AsyncRead + AsyncWrite + Unpin, C: Service, Response = ConnectResult>, C::Error: 'static, - C::Future: Unpin, - Codec: Decoder + Encoder + Unpin, + Io: AsyncRead + AsyncWrite, + Codec: Decoder + Encoder, ::Item: 'static, ::Error: std::fmt::Debug, { @@ -103,9 +103,6 @@ where Error = C::Error, InitError = C::Error, > + 'static, - T::Future: Unpin, - T::Service: Unpin, - ::Future: Unpin + 'static, { FramedServiceImpl { connect: self.connect, @@ -125,16 +122,15 @@ pub struct NewServiceBuilder { impl NewServiceBuilder where St: 'static, - Io: AsyncRead + AsyncWrite + Unpin, + Io: AsyncRead + AsyncWrite, C: ServiceFactory< Config = (), Request = Connect, Response = ConnectResult, >, C::Error: 'static, - C::Future: Unpin + 'static, - ::Future: Unpin, - Codec: Decoder + Encoder + Unpin, + C::Future: 'static, + Codec: Decoder + Encoder, ::Item: 'static, ::Error: std::fmt::Debug, { @@ -159,9 +155,6 @@ where Error = C::Error, InitError = C::Error, > + 'static, - T::Future: Unpin, - T::Service: Unpin, - ::Future: Unpin + 'static, { FramedService { connect: self.connect, @@ -182,15 +175,14 @@ pub struct FramedService { impl ServiceFactory for FramedService where St: 'static, - Io: AsyncRead + AsyncWrite + Unpin, + Io: AsyncRead + AsyncWrite, C: ServiceFactory< Config = (), Request = Connect, Response = ConnectResult, >, C::Error: 'static, - C::Future: Unpin + 'static, - ::Future: Unpin, + C::Future: 'static, T: ServiceFactory< Config = St, Request = RequestItem, @@ -198,10 +190,8 @@ where Error = C::Error, InitError = C::Error, > + 'static, - T::Future: Unpin, - T::Service: Unpin, - ::Future: Unpin + 'static, - Codec: Decoder + Encoder + Unpin, + ::Future: 'static, + Codec: Decoder + Encoder, ::Item: 'static, ::Error: std::fmt::Debug, { @@ -241,10 +231,9 @@ pub struct FramedServiceImpl { impl Service for FramedServiceImpl where - Io: AsyncRead + AsyncWrite + Unpin, + Io: AsyncRead + AsyncWrite, C: Service, Response = ConnectResult>, C::Error: 'static, - C::Future: Unpin, T: ServiceFactory< Config = St, Request = RequestItem, @@ -252,10 +241,8 @@ where Error = C::Error, InitError = C::Error, >, - T::Future: Unpin, - T::Service: Unpin, - ::Future: Unpin + 'static, - Codec: Decoder + Encoder + Unpin, + ::Future: 'static, + Codec: Decoder + Encoder, ::Item: 'static, ::Error: std::fmt::Debug, { @@ -279,11 +266,11 @@ where } } +#[pin_project::pin_project] pub struct FramedServiceImplResponse where C: Service, Response = ConnectResult>, C::Error: 'static, - C::Future: Unpin, T: ServiceFactory< Config = St, Request = RequestItem, @@ -291,43 +278,19 @@ where Error = C::Error, InitError = C::Error, >, - T::Future: Unpin, - T::Service: Unpin, - ::Future: Unpin + 'static, - Io: AsyncRead + AsyncWrite + Unpin, - Codec: Encoder + Decoder + Unpin, + ::Future: 'static, + Io: AsyncRead + AsyncWrite, + Codec: Encoder + Decoder, ::Item: 'static, ::Error: std::fmt::Debug, { + #[pin] inner: FramedServiceImplResponseInner, } -impl Unpin for FramedServiceImplResponse -where - C: Service, Response = ConnectResult>, - C::Future: Unpin, - C::Error: 'static, - T: ServiceFactory< - Config = St, - Request = RequestItem, - Response = ResponseItem, - Error = C::Error, - InitError = C::Error, - >, - T::Future: Unpin, - T::Service: Unpin, - ::Future: Unpin + 'static, - Io: AsyncRead + AsyncWrite + Unpin, - Codec: Encoder + Decoder + Unpin, - ::Item: 'static, - ::Error: std::fmt::Debug, -{ -} - impl Future for FramedServiceImplResponse where C: Service, Response = ConnectResult>, - C::Future: Unpin, C::Error: 'static, T: ServiceFactory< Config = St, @@ -336,32 +299,33 @@ where Error = C::Error, InitError = C::Error, >, - T::Future: Unpin, - T::Service: Unpin, - ::Future: Unpin + 'static, - Io: AsyncRead + AsyncWrite + Unpin, - Codec: Encoder + Decoder + Unpin, + ::Future: 'static, + Io: AsyncRead + AsyncWrite, + Codec: Encoder + Decoder, ::Item: 'static, ::Error: std::fmt::Debug, { type Output = Result<(), ServiceError>; - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let this = self.get_mut(); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let mut this = self.as_mut().project(); loop { match this.inner.poll(cx) { - Either::Left(new) => this.inner = new, + Either::Left(new) => { + this = self.as_mut().project(); + this.inner.set(new) + } Either::Right(poll) => return poll, }; } } } +#[pin_project::pin_project] enum FramedServiceImplResponseInner where C: Service, Response = ConnectResult>, - C::Future: Unpin, C::Error: 'static, T: ServiceFactory< Config = St, @@ -370,27 +334,24 @@ where Error = C::Error, InitError = C::Error, >, - T::Future: Unpin, - T::Service: Unpin, - ::Future: Unpin + 'static, - Io: AsyncRead + AsyncWrite + Unpin, - Codec: Encoder + Decoder + Unpin, + ::Future: 'static, + Io: AsyncRead + AsyncWrite, + Codec: Encoder + Decoder, ::Item: 'static, ::Error: std::fmt::Debug, { - Connect(C::Future, Rc, Option>), + Connect(#[pin] C::Future, Rc, Option>), Handler( - T::Future, + #[pin] T::Future, Option>, Option>, ), - Dispatcher(FramedDispatcher), + Dispatcher(#[pin] FramedDispatcher), } impl FramedServiceImplResponseInner where C: Service, Response = ConnectResult>, - C::Future: Unpin, C::Error: 'static, T: ServiceFactory< Config = St, @@ -399,40 +360,37 @@ where Error = C::Error, InitError = C::Error, >, - T::Future: Unpin, - T::Service: Unpin, - ::Future: Unpin + 'static, - Io: AsyncRead + AsyncWrite + Unpin, - Codec: Encoder + Decoder + Unpin, + ::Future: 'static, + Io: AsyncRead + AsyncWrite, + Codec: Encoder + Decoder, ::Item: 'static, ::Error: std::fmt::Debug, { + #[project] fn poll( - &mut self, + self: Pin<&mut Self>, cx: &mut Context, ) -> Either< FramedServiceImplResponseInner, Poll>>, > { - match self { - FramedServiceImplResponseInner::Connect( - ref mut fut, - ref handler, - ref mut disconnect, - ) => match Pin::new(fut).poll(cx) { - Poll::Ready(Ok(res)) => Either::Left(FramedServiceImplResponseInner::Handler( - handler.new_service(&res.state), - Some(res), - disconnect.take(), - )), - Poll::Pending => Either::Right(Poll::Pending), - Poll::Ready(Err(e)) => Either::Right(Poll::Ready(Err(e.into()))), - }, - FramedServiceImplResponseInner::Handler( - ref mut fut, - ref mut res, - ref mut disconnect, - ) => match Pin::new(fut).poll(cx) { + #[project] + match self.project() { + FramedServiceImplResponseInner::Connect(fut, handler, disconnect) => { + match fut.poll(cx) { + Poll::Ready(Ok(res)) => { + Either::Left(FramedServiceImplResponseInner::Handler( + handler.new_service(&res.state), + Some(res), + disconnect.take(), + )) + } + Poll::Pending => Either::Right(Poll::Pending), + Poll::Ready(Err(e)) => Either::Right(Poll::Ready(Err(e.into()))), + } + } + FramedServiceImplResponseInner::Handler(fut, res, disconnect) => match fut.poll(cx) + { Poll::Ready(Ok(handler)) => { let res = res.take().unwrap(); Either::Left(FramedServiceImplResponseInner::Dispatcher( diff --git a/actix-server/src/ssl/openssl.rs b/actix-server/src/ssl/openssl.rs index 24a50294..5cbe988b 100644 --- a/actix-server/src/ssl/openssl.rs +++ b/actix-server/src/ssl/openssl.rs @@ -40,9 +40,7 @@ impl Clone for OpensslAcceptor { } } -impl ServiceFactory - for OpensslAcceptor -{ +impl ServiceFactory for OpensslAcceptor { type Request = Io; type Response = Io, P>; type Error = HandshakeError; @@ -70,9 +68,7 @@ pub struct OpensslAcceptorService { io: PhantomData<(T, P)>, } -impl Service - for OpensslAcceptorService -{ +impl Service for OpensslAcceptorService { type Request = Io; type Response = Io, P>; type Error = HandshakeError; @@ -103,7 +99,6 @@ impl Service pub struct OpensslAcceptorServiceFut where - P: Unpin, T: AsyncRead + AsyncWrite, { fut: LocalBoxFuture<'static, Result, HandshakeError>>, @@ -111,7 +106,9 @@ where _guard: CounterGuard, } -impl Future for OpensslAcceptorServiceFut { +impl Unpin for OpensslAcceptorServiceFut {} + +impl Future for OpensslAcceptorServiceFut { type Output = Result, P>, HandshakeError>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { diff --git a/actix-server/src/ssl/rustls.rs b/actix-server/src/ssl/rustls.rs index 12ed1b83..e822e9a0 100644 --- a/actix-server/src/ssl/rustls.rs +++ b/actix-server/src/ssl/rustls.rs @@ -42,7 +42,7 @@ impl Clone for RustlsAcceptor { } } -impl ServiceFactory for RustlsAcceptor { +impl ServiceFactory for RustlsAcceptor { type Request = Io; type Response = Io, P>; type Error = io::Error; @@ -71,7 +71,7 @@ pub struct RustlsAcceptorService { conns: Counter, } -impl Service for RustlsAcceptorService { +impl Service for RustlsAcceptorService { type Request = Io; type Response = Io, P>; type Error = io::Error; @@ -98,14 +98,15 @@ impl Service for RustlsAcceptorServ pub struct RustlsAcceptorServiceFut where T: AsyncRead + AsyncWrite + Unpin, - P: Unpin, { fut: Accept, params: Option

, _guard: CounterGuard, } -impl Future for RustlsAcceptorServiceFut { +impl Unpin for RustlsAcceptorServiceFut {} + +impl Future for RustlsAcceptorServiceFut { type Output = Result, P>, io::Error>; fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { diff --git a/actix-service/Cargo.toml b/actix-service/Cargo.toml index 5fa51787..ad399205 100644 --- a/actix-service/Cargo.toml +++ b/actix-service/Cargo.toml @@ -24,7 +24,8 @@ path = "src/lib.rs" [dependencies] futures = "0.3.1" +pin-project = "0.4.5" [dev-dependencies] -tokio = "0.2.0-alpha.5" +tokio = "0.2.0-alpha.6" actix-rt = "0.2" diff --git a/actix-service/src/and_then.rs b/actix-service/src/and_then.rs index 68260129..d2be3012 100644 --- a/actix-service/src/and_then.rs +++ b/actix-service/src/and_then.rs @@ -41,8 +41,6 @@ impl Service for AndThenService where A: Service, B: Service, - A::Future: Unpin, - B::Future: Unpin, { type Request = A::Request; type Response = B::Response; @@ -63,13 +61,16 @@ where } } +#[pin_project::pin_project] pub struct AndThenServiceResponse where A: Service, B: Service, { b: Cell, + #[pin] fut_b: Option, + #[pin] fut_a: Option, } @@ -77,8 +78,6 @@ impl AndThenServiceResponse where A: Service, B: Service, - A::Future: Unpin, - B::Future: Unpin, { fn new(a: A::Future, b: Cell) -> Self { AndThenServiceResponse { @@ -93,23 +92,27 @@ impl Future for AndThenServiceResponse where A: Service, B: Service, - A::Future: Unpin, - B::Future: Unpin, { type Output = Result; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.get_mut(); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.as_mut().project(); loop { - if let Some(ref mut fut) = this.fut_b { - return Pin::new(fut).poll(cx); + if let Some(fut) = this.fut_b.as_pin_mut() { + return fut.poll(cx); } - match Pin::new(&mut this.fut_a.as_mut().expect("Bug in actix-service")).poll(cx) { + match this + .fut_a + .as_pin_mut() + .expect("Bug in actix-service") + .poll(cx) + { Poll::Ready(Ok(resp)) => { - let _ = this.fut_a.take(); - this.fut_b = Some(this.b.get_mut().call(resp)); + this = self.as_mut().project(); + this.fut_a.set(None); + this.fut_b.set(Some(this.b.get_mut().call(resp))); } Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), Poll::Pending => return Poll::Pending, @@ -153,10 +156,6 @@ where Error = A::Error, InitError = A::InitError, >, - A::Future: Unpin, - ::Future: Unpin, - B::Future: Unpin, - ::Future: Unpin, { type Request = A::Request; type Response = B::Response; @@ -185,12 +184,15 @@ where } } +#[pin_project::pin_project] pub struct AndThenServiceFactoryResponse where A: ServiceFactory, B: ServiceFactory, { + #[pin] fut_b: B::Future, + #[pin] fut_a: A::Future, a: Option, @@ -201,10 +203,6 @@ impl AndThenServiceFactoryResponse where A: ServiceFactory, B: ServiceFactory, - A::Future: Unpin, - ::Future: Unpin, - B::Future: Unpin, - ::Future: Unpin, { fn new(fut_a: A::Future, fut_b: B::Future) -> Self { AndThenServiceFactoryResponse { @@ -216,39 +214,24 @@ where } } -impl Unpin for AndThenServiceFactoryResponse -where - A: ServiceFactory, - B: ServiceFactory, - A::Future: Unpin, - ::Future: Unpin, - B::Future: Unpin, - ::Future: Unpin, -{ -} - impl Future for AndThenServiceFactoryResponse where A: ServiceFactory, B: ServiceFactory, - A::Future: Unpin, - ::Future: Unpin, - B::Future: Unpin, - ::Future: Unpin, { type Output = Result, A::InitError>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); + let this = self.project(); if this.a.is_none() { - if let Poll::Ready(service) = Pin::new(&mut this.fut_a).poll(cx)? { - this.a = Some(service); + if let Poll::Ready(service) = this.fut_a.poll(cx)? { + *this.a = Some(service); } } if this.b.is_none() { - if let Poll::Ready(service) = Pin::new(&mut this.fut_b).poll(cx)? { - this.b = Some(service); + if let Poll::Ready(service) = this.fut_b.poll(cx)? { + *this.b = Some(service); } } if this.a.is_some() && this.b.is_some() { diff --git a/actix-service/src/apply.rs b/actix-service/src/apply.rs index e78af397..3eefae27 100644 --- a/actix-service/src/apply.rs +++ b/actix-service/src/apply.rs @@ -23,8 +23,7 @@ pub fn apply_fn_factory( ) -> ApplyServiceFactory where T: ServiceFactory, - T::Future: Unpin, - F: FnMut(In, &mut T::Service) -> R + Unpin + Clone, + F: FnMut(In, &mut T::Service) -> R + Clone, R: Future>, U: IntoServiceFactory, { @@ -106,8 +105,7 @@ where impl ServiceFactory for ApplyServiceFactory where T: ServiceFactory, - T::Future: Unpin, - F: FnMut(In, &mut T::Service) -> R + Unpin + Clone, + F: FnMut(In, &mut T::Service) -> R + Clone, R: Future>, { type Request = In; @@ -124,12 +122,14 @@ where } } +#[pin_project::pin_project] pub struct ApplyServiceFactoryResponse where T: ServiceFactory, F: FnMut(In, &mut T::Service) -> R + Clone, R: Future>, { + #[pin] fut: T::Future, f: Option, r: PhantomData<(In, Out)>, @@ -150,28 +150,18 @@ where } } -impl Unpin for ApplyServiceFactoryResponse -where - T: ServiceFactory, - T::Future: Unpin, - F: FnMut(In, &mut T::Service) -> R + Unpin + Clone, - R: Future>, -{ -} - impl Future for ApplyServiceFactoryResponse where T: ServiceFactory, - T::Future: Unpin, - F: FnMut(In, &mut T::Service) -> R + Unpin + Clone, + F: FnMut(In, &mut T::Service) -> R + Clone, R: Future>, { type Output = Result, T::InitError>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); + let this = self.project(); - if let Poll::Ready(svc) = Pin::new(&mut this.fut).poll(cx)? { + if let Poll::Ready(svc) = this.fut.poll(cx)? { Poll::Ready(Ok(Apply::new(svc, this.f.take().unwrap()))) } else { Poll::Pending diff --git a/actix-service/src/apply_cfg.rs b/actix-service/src/apply_cfg.rs index 312dec23..7e461567 100644 --- a/actix-service/src/apply_cfg.rs +++ b/actix-service/src/apply_cfg.rs @@ -11,7 +11,7 @@ pub fn apply_cfg(srv: T, f: F) -> ApplyConfigService R, T: Service, - R: Future> + Unpin, + R: Future>, S: Service, { ApplyConfigService { @@ -75,8 +75,7 @@ impl ServiceFactory for ApplyConfigService where F: FnMut(&C, &mut T) -> R, T: Service, - T::Future: Unpin, - R: Future> + Unpin, + R: Future>, S: Service, { type Config = C; @@ -86,41 +85,10 @@ where type Service = S; type InitError = E; - type Future = ApplyConfigServiceResponse; + type Future = R; fn new_service(&self, cfg: &C) -> Self::Future { - ApplyConfigServiceResponse { - fut: unsafe { (self.f.get_mut_unsafe())(cfg, self.srv.get_mut_unsafe()) }, - _t: PhantomData, - } - } -} - -pub struct ApplyConfigServiceResponse -where - R: Future>, - S: Service, -{ - fut: R, - _t: PhantomData<(S,)>, -} - -impl Unpin for ApplyConfigServiceResponse -where - R: Future> + Unpin, - S: Service, -{ -} - -impl Future for ApplyConfigServiceResponse -where - R: Future> + Unpin, - S: Service, -{ - type Output = Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - Pin::new(&mut self.get_mut().fut).poll(cx) + unsafe { (self.f.get_mut_unsafe())(cfg, self.srv.get_mut_unsafe()) } } } @@ -160,9 +128,8 @@ where C: Clone, F: FnMut(&C, &mut T::Service) -> R, T: ServiceFactory, - T::Future: Unpin, T::InitError: From, - R: Future> + Unpin, + R: Future>, S: Service, { type Config = C; @@ -186,6 +153,7 @@ where } } +#[pin_project::pin_project] pub struct ApplyConfigServiceFactoryResponse where C: Clone, @@ -198,56 +166,48 @@ where cfg: C, f: Cell, srv: Option, + #[pin] srv_fut: Option, + #[pin] fut: Option, _t: PhantomData<(S,)>, } -impl Unpin for ApplyConfigServiceFactoryResponse -where - C: Clone, - F: FnMut(&C, &mut T::Service) -> R, - T: ServiceFactory, - T::Future: Unpin, - T::InitError: From, - R: Future> + Unpin, - S: Service, -{ -} - impl Future for ApplyConfigServiceFactoryResponse where C: Clone, F: FnMut(&C, &mut T::Service) -> R, T: ServiceFactory, - T::Future: Unpin, T::InitError: From, - R: Future> + Unpin, + R: Future>, S: Service, { type Output = Result; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.as_mut().project(); loop { - if let Some(ref mut fut) = this.srv_fut { - match Pin::new(fut).poll(cx)? { + if let Some(fut) = this.srv_fut.as_pin_mut() { + match fut.poll(cx)? { Poll::Pending => return Poll::Pending, Poll::Ready(srv) => { - let _ = this.srv_fut.take(); - this.srv = Some(srv); + this = self.as_mut().project(); + this.srv_fut.set(None); + *this.srv = Some(srv); continue; } } } - if let Some(ref mut fut) = this.fut { - return Pin::new(fut).poll(cx); - } else if let Some(ref mut srv) = this.srv { + if let Some(fut) = this.fut.as_pin_mut() { + return fut.poll(cx); + } else if let Some(srv) = this.srv { match srv.poll_ready(cx)? { Poll::Ready(_) => { - this.fut = Some(this.f.get_mut()(&this.cfg, srv)); + let fut = this.f.get_mut()(&this.cfg, srv); + this = self.as_mut().project(); + this.fut.set(Some(fut)); continue; } Poll::Pending => return Poll::Pending, diff --git a/actix-service/src/fn_service.rs b/actix-service/src/fn_service.rs index 693d7f48..7737b77b 100644 --- a/actix-service/src/fn_service.rs +++ b/actix-service/src/fn_service.rs @@ -1,6 +1,5 @@ use std::future::Future; use std::marker::PhantomData; -use std::pin::Pin; use std::task::{Context, Poll}; use futures::future::{ok, Ready}; @@ -188,7 +187,7 @@ where impl ServiceFactory for FnServiceConfig where F: Fn(&Cfg) -> Fut, - Fut: Future> + Unpin, + Fut: Future>, Srv: Service, { type Request = Srv::Request; @@ -198,41 +197,10 @@ where type Config = Cfg; type Service = Srv; type InitError = Err; - type Future = NewServiceFnConfigFut; + type Future = Fut; fn new_service(&self, cfg: &Cfg) -> Self::Future { - NewServiceFnConfigFut { - fut: (self.f)(cfg), - _t: PhantomData, - } - } -} - -pub struct NewServiceFnConfigFut -where - R: Future> + Unpin, - S: Service, -{ - fut: R, - _t: PhantomData<(S,)>, -} - -impl Unpin for NewServiceFnConfigFut -where - R: Future> + Unpin, - S: Service, -{ -} - -impl Future for NewServiceFnConfigFut -where - R: Future> + Unpin, - S: Service, -{ - type Output = Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - Pin::new(&mut self.get_mut().fut).poll(cx) + (self.f)(cfg) } } diff --git a/actix-service/src/lib.rs b/actix-service/src/lib.rs index c0645c78..84410534 100644 --- a/actix-service/src/lib.rs +++ b/actix-service/src/lib.rs @@ -73,7 +73,7 @@ pub trait Service { fn map(self, f: F) -> crate::dev::Map where Self: Sized, - F: FnMut(Self::Response) -> R + Unpin, + F: FnMut(Self::Response) -> R, { crate::dev::Map::new(self, f) } @@ -138,7 +138,7 @@ pub trait ServiceFactory { fn map(self, f: F) -> crate::map::MapServiceFactory where Self: Sized, - F: FnMut(Self::Response) -> R + Unpin + Clone, + F: FnMut(Self::Response) -> R + Clone, { crate::map::MapServiceFactory::new(self, f) } @@ -147,7 +147,7 @@ pub trait ServiceFactory { fn map_err(self, f: F) -> crate::map_err::MapErrServiceFactory where Self: Sized, - F: Fn(Self::Error) -> E + Unpin + Clone, + F: Fn(Self::Error) -> E + Clone, { crate::map_err::MapErrServiceFactory::new(self, f) } @@ -156,7 +156,7 @@ pub trait ServiceFactory { fn map_init_err(self, f: F) -> crate::map_init_err::MapInitErr where Self: Sized, - F: Fn(Self::InitError) -> E + Unpin + Clone, + F: Fn(Self::InitError) -> E + Clone, { crate::map_init_err::MapInitErr::new(self, f) } diff --git a/actix-service/src/map.rs b/actix-service/src/map.rs index 29f3b301..6e533c84 100644 --- a/actix-service/src/map.rs +++ b/actix-service/src/map.rs @@ -46,8 +46,7 @@ where impl Service for Map where A: Service, - A::Future: Unpin, - F: FnMut(A::Response) -> Response + Unpin + Clone, + F: FnMut(A::Response) -> Response + Clone, { type Request = A::Request; type Response = Response; @@ -63,12 +62,14 @@ where } } +#[pin_project::pin_project] pub struct MapFuture where A: Service, F: FnMut(A::Response) -> Response, { f: F, + #[pin] fut: A::Future, } @@ -85,15 +86,14 @@ where impl Future for MapFuture where A: Service, - A::Future: Unpin, - F: FnMut(A::Response) -> Response + Unpin, + F: FnMut(A::Response) -> Response, { type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); + let this = self.project(); - match Pin::new(&mut this.fut).poll(cx) { + match this.fut.poll(cx) { Poll::Ready(Ok(resp)) => Poll::Ready(Ok((this.f)(resp))), Poll::Ready(Err(e)) => Poll::Ready(Err(e)), Poll::Pending => Poll::Pending, @@ -113,7 +113,7 @@ impl MapServiceFactory { pub(crate) fn new(a: A, f: F) -> Self where A: ServiceFactory, - F: FnMut(A::Response) -> Res + Unpin, + F: FnMut(A::Response) -> Res, { Self { a, @@ -140,9 +140,7 @@ where impl ServiceFactory for MapServiceFactory where A: ServiceFactory, - A::Future: Unpin, - ::Future: Unpin, - F: FnMut(A::Response) -> Res + Unpin + Clone, + F: FnMut(A::Response) -> Res + Clone, { type Request = A::Request; type Response = Res; @@ -158,11 +156,13 @@ where } } +#[pin_project::pin_project] pub struct MapServiceFuture where A: ServiceFactory, F: FnMut(A::Response) -> Res, { + #[pin] fut: A::Future, f: Option, } @@ -170,7 +170,7 @@ where impl MapServiceFuture where A: ServiceFactory, - F: FnMut(A::Response) -> Res + Unpin, + F: FnMut(A::Response) -> Res, { fn new(fut: A::Future, f: F) -> Self { MapServiceFuture { f: Some(f), fut } @@ -180,15 +180,14 @@ where impl Future for MapServiceFuture where A: ServiceFactory, - A::Future: Unpin, - F: FnMut(A::Response) -> Res + Unpin, + F: FnMut(A::Response) -> Res, { type Output = Result, A::InitError>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); + let this = self.project(); - if let Poll::Ready(svc) = Pin::new(&mut this.fut).poll(cx)? { + if let Poll::Ready(svc) = this.fut.poll(cx)? { Poll::Ready(Ok(Map::new(svc, this.f.take().unwrap()))) } else { Poll::Pending diff --git a/actix-service/src/map_err.rs b/actix-service/src/map_err.rs index b1cf0ae9..1b858d80 100644 --- a/actix-service/src/map_err.rs +++ b/actix-service/src/map_err.rs @@ -47,8 +47,7 @@ where impl Service for MapErr where A: Service, - A::Future: Unpin, - F: Fn(A::Error) -> E + Unpin + Clone, + F: Fn(A::Error) -> E + Clone, { type Request = A::Request; type Response = A::Response; @@ -64,21 +63,21 @@ where } } +#[pin_project::pin_project] pub struct MapErrFuture where A: Service, - A::Future: Unpin, - F: Fn(A::Error) -> E + Unpin, + F: Fn(A::Error) -> E, { f: F, + #[pin] fut: A::Future, } impl MapErrFuture where A: Service, - A::Future: Unpin, - F: Fn(A::Error) -> E + Unpin, + F: Fn(A::Error) -> E, { fn new(fut: A::Future, f: F) -> Self { MapErrFuture { f, fut } @@ -88,14 +87,13 @@ where impl Future for MapErrFuture where A: Service, - A::Future: Unpin, - F: Fn(A::Error) -> E + Unpin, + F: Fn(A::Error) -> E, { type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); - Pin::new(&mut this.fut).poll(cx).map_err(&this.f) + let this = self.project(); + this.fut.poll(cx).map_err(this.f) } } @@ -145,9 +143,7 @@ where impl ServiceFactory for MapErrServiceFactory where A: ServiceFactory, - A::Future: Unpin, - ::Future: Unpin, - F: Fn(A::Error) -> E + Unpin + Clone, + F: Fn(A::Error) -> E + Clone, { type Request = A::Request; type Response = A::Response; @@ -163,11 +159,13 @@ where } } +#[pin_project::pin_project] pub struct MapErrServiceFuture where A: ServiceFactory, F: Fn(A::Error) -> E, { + #[pin] fut: A::Future, f: F, } @@ -185,14 +183,13 @@ where impl Future for MapErrServiceFuture where A: ServiceFactory, - A::Future: Unpin, - F: Fn(A::Error) -> E + Unpin + Clone, + F: Fn(A::Error) -> E + Clone, { type Output = Result, A::InitError>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); - if let Poll::Ready(svc) = Pin::new(&mut this.fut).poll(cx)? { + let this = self.project(); + if let Poll::Ready(svc) = this.fut.poll(cx)? { Poll::Ready(Ok(MapErr::new(svc, this.f.clone()))) } else { Poll::Pending diff --git a/actix-service/src/map_init_err.rs b/actix-service/src/map_init_err.rs index 2ea96cbf..04c89ed8 100644 --- a/actix-service/src/map_init_err.rs +++ b/actix-service/src/map_init_err.rs @@ -44,8 +44,7 @@ where impl ServiceFactory for MapInitErr where A: ServiceFactory, - A::Future: Unpin, - F: Fn(A::InitError) -> E + Unpin + Clone, + F: Fn(A::InitError) -> E + Clone, { type Request = A::Request; type Response = A::Response; @@ -61,12 +60,14 @@ where } } +#[pin_project::pin_project] pub struct MapInitErrFuture where A: ServiceFactory, F: Fn(A::InitError) -> E, { f: F, + #[pin] fut: A::Future, } @@ -83,13 +84,12 @@ where impl Future for MapInitErrFuture where A: ServiceFactory, - A::Future: Unpin, - F: Fn(A::InitError) -> E + Unpin, + F: Fn(A::InitError) -> E, { type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); - Pin::new(&mut this.fut).poll(cx).map_err(&this.f) + let this = self.project(); + this.fut.poll(cx).map_err(this.f) } } diff --git a/actix-service/src/then.rs b/actix-service/src/then.rs index 190eb5c5..9e5e7917 100644 --- a/actix-service/src/then.rs +++ b/actix-service/src/then.rs @@ -41,8 +41,6 @@ impl Service for ThenService where A: Service, B: Service, Error = A::Error>, - A::Future: Unpin, - B::Future: Unpin, { type Request = A::Request; type Response = B::Response; @@ -63,13 +61,16 @@ where } } +#[pin_project::pin_project] pub struct ThenServiceResponse where A: Service, B: Service>, { b: Cell, + #[pin] fut_b: Option, + #[pin] fut_a: Option, } @@ -77,8 +78,6 @@ impl ThenServiceResponse where A: Service, B: Service>, - A::Future: Unpin, - B::Future: Unpin, { fn new(a: A::Future, b: Cell) -> Self { ThenServiceResponse { @@ -93,22 +92,26 @@ impl Future for ThenServiceResponse where A: Service, B: Service>, - A::Future: Unpin, - B::Future: Unpin, { type Output = Result; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.as_mut().project(); loop { - if let Some(ref mut fut) = this.fut_b { - return Pin::new(fut).poll(cx); + if let Some(fut) = this.fut_b.as_pin_mut() { + return fut.poll(cx); } - match Pin::new(this.fut_a.as_mut().expect("Bug in actix-service")).poll(cx) { + match this + .fut_a + .as_pin_mut() + .expect("Bug in actix-service") + .poll(cx) + { Poll::Ready(r) => { - this.fut_b = Some(this.b.get_mut().call(r)); + this = self.as_mut().project(); + this.fut_b.set(Some(this.b.get_mut().call(r))); } Poll::Pending => return Poll::Pending, @@ -148,10 +151,6 @@ where Error = A::Error, InitError = A::InitError, >, - A::Future: Unpin, - ::Future: Unpin, - B::Future: Unpin, - ::Future: Unpin, { type Request = A::Request; type Response = B::Response; @@ -180,6 +179,7 @@ where } } +#[pin_project::pin_project] pub struct ThenServiceFactoryResponse where A: ServiceFactory, @@ -190,7 +190,9 @@ where InitError = A::InitError, >, { + #[pin] fut_b: B::Future, + #[pin] fut_a: A::Future, a: Option, b: Option, @@ -205,10 +207,6 @@ where Error = A::Error, InitError = A::InitError, >, - A::Future: Unpin, - ::Future: Unpin, - B::Future: Unpin, - ::Future: Unpin, { fn new(fut_a: A::Future, fut_b: B::Future) -> Self { Self { @@ -220,22 +218,6 @@ where } } -impl Unpin for ThenServiceFactoryResponse -where - A: ServiceFactory, - B: ServiceFactory< - Config = A::Config, - Request = Result, - Error = A::Error, - InitError = A::InitError, - >, - A::Future: Unpin, - ::Future: Unpin, - B::Future: Unpin, - ::Future: Unpin, -{ -} - impl Future for ThenServiceFactoryResponse where A: ServiceFactory, @@ -245,24 +227,20 @@ where Error = A::Error, InitError = A::InitError, >, - A::Future: Unpin, - ::Future: Unpin, - B::Future: Unpin, - ::Future: Unpin, { type Output = Result, A::InitError>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); + let this = self.project(); if this.a.is_none() { - if let Poll::Ready(service) = Pin::new(&mut this.fut_a).poll(cx)? { - this.a = Some(service); + if let Poll::Ready(service) = this.fut_a.poll(cx)? { + *this.a = Some(service); } } if this.b.is_none() { - if let Poll::Ready(service) = Pin::new(&mut this.fut_b).poll(cx)? { - this.b = Some(service); + if let Poll::Ready(service) = this.fut_b.poll(cx)? { + *this.b = Some(service); } } if this.a.is_some() && this.b.is_some() { diff --git a/actix-service/src/transform.rs b/actix-service/src/transform.rs index 2b22b25f..bd298163 100644 --- a/actix-service/src/transform.rs +++ b/actix-service/src/transform.rs @@ -76,9 +76,7 @@ where pub fn apply(t: T, service: U) -> ApplyTransform where S: ServiceFactory, - S::Future: Unpin, T: Transform, - T::Future: Unpin, U: IntoServiceFactory, { ApplyTransform::new(t, service.into_factory()) @@ -116,9 +114,7 @@ impl Clone for ApplyTransform { impl ServiceFactory for ApplyTransform where S: ServiceFactory, - S::Future: Unpin, T: Transform, - T::Future: Unpin, { type Request = T::Request; type Response = T::Response; @@ -138,12 +134,15 @@ where } } +#[pin_project::pin_project] pub struct ApplyTransformFuture where S: ServiceFactory, T: Transform, { + #[pin] fut_a: S::Future, + #[pin] fut_t: Option, t_cell: Rc, } @@ -151,27 +150,24 @@ where impl Future for ApplyTransformFuture where S: ServiceFactory, - S::Future: Unpin, T: Transform, - T::Future: Unpin, { type Output = Result; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.get_mut(); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.as_mut().project(); - if this.fut_t.is_none() { - if let Poll::Ready(service) = Pin::new(&mut this.fut_a).poll(cx)? { - this.fut_t = Some(this.t_cell.new_transform(service)); - } else { - return Poll::Pending; - } + if let Some(fut) = this.fut_t.as_pin_mut() { + return fut.poll(cx); } - if let Some(ref mut fut) = this.fut_t { - Pin::new(fut).poll(cx) + if let Poll::Ready(service) = this.fut_a.poll(cx)? { + let fut = this.t_cell.new_transform(service); + this = self.as_mut().project(); + this.fut_t.set(Some(fut)); + this.fut_t.as_pin_mut().unwrap().poll(cx) } else { - Poll::Pending + return Poll::Pending; } } } diff --git a/actix-utils/Cargo.toml b/actix-utils/Cargo.toml index acb75340..85f4212e 100644 --- a/actix-utils/Cargo.toml +++ b/actix-utils/Cargo.toml @@ -23,6 +23,7 @@ actix-codec = "0.2.0-alpha.1" bytes = "0.4" either = "1.5.2" futures = "0.3.1" +pin-project = "0.4.5" tokio-timer = "0.3.0-alpha.6" tokio-executor = { version="=0.2.0-alpha.6", features=["current-thread"] } log = "0.4" diff --git a/actix-utils/src/either.rs b/actix-utils/src/either.rs index 09533a20..21075526 100644 --- a/actix-utils/src/either.rs +++ b/actix-utils/src/either.rs @@ -83,8 +83,6 @@ where Error = A::Error, InitError = A::InitError, >, - A::Future: Unpin, - B::Future: Unpin, { type Request = either::Either; type Response = A::Response; @@ -114,39 +112,31 @@ impl Clone for Either { } #[doc(hidden)] +#[pin_project::pin_project] pub struct EitherNewService { left: Option, right: Option, + #[pin] left_fut: A::Future, + #[pin] right_fut: B::Future, } -impl Unpin for EitherNewService -where - A: ServiceFactory, - B: ServiceFactory, - A::Future: Unpin, - B::Future: Unpin, -{ -} - impl Future for EitherNewService where A: ServiceFactory, B: ServiceFactory, - A::Future: Unpin, - B::Future: Unpin, { type Output = Result, A::InitError>; fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let this = self.get_mut(); + let this = self.project(); if this.left.is_none() { - this.left = Some(ready!(Pin::new(&mut this.left_fut).poll(cx))?); + *this.left = Some(ready!(this.left_fut.poll(cx))?); } if this.right.is_none() { - this.right = Some(ready!(Pin::new(&mut this.right_fut).poll(cx))?); + *this.right = Some(ready!(this.right_fut.poll(cx))?); } if this.left.is_some() && this.right.is_some() { diff --git a/actix-utils/src/framed.rs b/actix-utils/src/framed.rs index ee95354c..3a7086f1 100644 --- a/actix-utils/src/framed.rs +++ b/actix-utils/src/framed.rs @@ -77,13 +77,14 @@ type Inner = Cell::Item, S::E /// FramedTransport - is a future that reads frames from Framed object /// and pass then to the service. +#[pin_project::pin_project] pub struct FramedTransport where S: Service, Response = Response>, S::Error: 'static, S::Future: 'static, - T: AsyncRead + AsyncWrite + Unpin, - U: Encoder + Decoder + Unpin, + T: AsyncRead + AsyncWrite, + U: Encoder + Decoder, ::Item: 'static, ::Error: std::fmt::Debug, { @@ -109,11 +110,11 @@ struct FramedTransportInner { impl FramedTransport where - S: Service, Response = Response> + Unpin, + S: Service, Response = Response>, S::Error: 'static, S::Future: 'static, - T: AsyncRead + AsyncWrite + Unpin, - U: Decoder + Encoder + Unpin, + T: AsyncRead + AsyncWrite, + U: Decoder + Encoder, ::Item: 'static, ::Error: std::fmt::Debug, { @@ -165,28 +166,28 @@ where impl Future for FramedTransport where - S: Service, Response = Response> + Unpin, - S::Error: Unpin + 'static, + S: Service, Response = Response>, + S::Error: 'static, S::Future: 'static, - T: AsyncRead + AsyncWrite + Unpin, - U: Decoder + Encoder + Unpin, + T: AsyncRead + AsyncWrite, + U: Decoder + Encoder, ::Item: 'static, - ::Error: Unpin + std::fmt::Debug, - ::Error: Unpin + std::fmt::Debug, + ::Error: std::fmt::Debug, + ::Error: std::fmt::Debug, { type Output = Result<(), FramedTransportError>; fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { self.inner.get_ref().task.register(cx.waker()); - let this = self.get_mut(); + let this = self.project(); poll( cx, - &mut this.service, - &mut this.state, - &mut this.framed, - &mut this.rx, - &mut this.inner, + this.service, + this.state, + this.framed, + this.rx, + this.inner, ) } } @@ -203,8 +204,8 @@ where S: Service, Response = Response>, S::Error: 'static, S::Future: 'static, - T: AsyncRead + AsyncWrite + Unpin, - U: Decoder + Encoder + Unpin, + T: AsyncRead + AsyncWrite, + U: Decoder + Encoder, ::Item: 'static, ::Error: std::fmt::Debug, { @@ -257,8 +258,8 @@ where S: Service, Response = Response>, S::Error: 'static, S::Future: 'static, - T: AsyncRead + AsyncWrite + Unpin, - U: Decoder + Encoder + Unpin, + T: AsyncRead + AsyncWrite, + U: Decoder + Encoder, ::Item: 'static, ::Error: std::fmt::Debug, { @@ -309,8 +310,8 @@ where S: Service, Response = Response>, S::Error: 'static, S::Future: 'static, - T: AsyncRead + AsyncWrite + Unpin, - U: Decoder + Encoder + Unpin, + T: AsyncRead + AsyncWrite, + U: Decoder + Encoder, ::Item: 'static, ::Error: std::fmt::Debug, { diff --git a/actix-utils/src/inflight.rs b/actix-utils/src/inflight.rs index 28761d84..8beff7e7 100644 --- a/actix-utils/src/inflight.rs +++ b/actix-utils/src/inflight.rs @@ -31,7 +31,6 @@ impl Default for InFlight { impl Transform for InFlight where S: Service, - S::Future: Unpin, { type Request = S::Request; type Response = S::Response; @@ -68,7 +67,6 @@ where impl Service for InFlightService where T: Service, - T::Future: Unpin, { type Request = T::Request; type Response = T::Response; @@ -95,19 +93,18 @@ where } #[doc(hidden)] +#[pin_project::pin_project] pub struct InFlightServiceResponse { + #[pin] fut: T::Future, _guard: CounterGuard, } -impl Future for InFlightServiceResponse -where - T::Future: Unpin, -{ +impl Future for InFlightServiceResponse { type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - Pin::new(&mut self.get_mut().fut).poll(cx) + self.project().fut.poll(cx) } } diff --git a/actix-utils/src/timeout.rs b/actix-utils/src/timeout.rs index 7601ba49..b2fcdbd9 100644 --- a/actix-utils/src/timeout.rs +++ b/actix-utils/src/timeout.rs @@ -84,7 +84,6 @@ impl Clone for Timeout { impl Transform for Timeout where S: Service, - S::Future: Unpin, { type Request = S::Request; type Response = S::Response; @@ -126,7 +125,6 @@ where impl Service for TimeoutService where S: Service, - S::Future: Unpin, { type Request = S::Request; type Response = S::Response; @@ -146,8 +144,10 @@ where } /// `TimeoutService` response future +#[pin_project::pin_project] #[derive(Debug)] pub struct TimeoutServiceResponse { + #[pin] fut: T::Future, sleep: Delay, } @@ -155,15 +155,14 @@ pub struct TimeoutServiceResponse { impl Future for TimeoutServiceResponse where T: Service, - T::Future: Unpin, { type Output = Result>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); + let mut this = self.project(); // First, try polling the future - match Pin::new(&mut this.fut).poll(cx) { + match this.fut.poll(cx) { Poll::Ready(Ok(v)) => return Poll::Ready(Ok(v)), Poll::Ready(Err(e)) => return Poll::Ready(Err(TimeoutError::Service(e))), Poll::Pending => {}