From a4148de226bcbee70188cf47b41ac0b725775118 Mon Sep 17 00:00:00 2001 From: Maksym Vorobiov Date: Tue, 28 Jan 2020 19:08:03 +0300 Subject: [PATCH 01/21] add test crashing with segfault according to #1321 --- actix-http/src/body.rs | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/actix-http/src/body.rs b/actix-http/src/body.rs index 1a2428e14..1c34b06a0 100644 --- a/actix-http/src/body.rs +++ b/actix-http/src/body.rs @@ -612,6 +612,8 @@ mod tests { mod body_stream { use super::*; + use futures::task::noop_waker; + use futures::stream::once; #[actix_rt::test] async fn skips_empty_chunks() { @@ -629,6 +631,25 @@ mod tests { Some(Bytes::from("2")), ); } + + #[actix_rt::test] + async fn move_pinned_pointer() { + let (sender, receiver) = futures::channel::oneshot::channel(); + let mut body_stream = Ok(BodyStream::new(once(async { + let x = Box::new(0i32); + let y = &x; + receiver.await.unwrap(); + let _z = **y; + Ok::<_, ()>(Bytes::new()) + }))); + + let waker = noop_waker(); + let mut context = Context::from_waker(&waker); + + let _ = body_stream.as_mut().unwrap().poll_next(&mut context); + sender.send(()).unwrap(); + let _ = std::mem::replace(&mut body_stream, Err([0; 32])).unwrap().poll_next(&mut context); + } } mod sized_stream { From 9d04b250f908f3304571162583772e4dc1bddd0e Mon Sep 17 00:00:00 2001 From: Maksym Vorobiov Date: Wed, 29 Jan 2020 11:15:13 +0300 Subject: [PATCH 02/21] This is a squashed commit: - Convert MessageBody to accept Pin in poll_next - add CHANGES and increase versions aligned to semver - update crates to accomodate MessageBody Pin change - fix tests and dependencies --- CHANGES.md | 4 +- Cargo.toml | 8 +- actix-http/CHANGES.md | 4 + actix-http/Cargo.toml | 14 +-- actix-http/src/body.rs | 185 ++++++++++++++++------------- actix-http/src/client/h1proto.rs | 7 +- actix-http/src/client/h2proto.rs | 6 +- actix-http/src/encoding/encoder.rs | 77 +++++++----- actix-http/src/error.rs | 6 + actix-http/src/h1/dispatcher.rs | 9 +- actix-http/src/h1/service.rs | 8 +- actix-http/src/h1/utils.rs | 20 ++-- actix-http/src/h2/dispatcher.rs | 4 +- actix-http/src/response.rs | 2 +- actix-identity/Cargo.toml | 8 +- actix-multipart/Cargo.toml | 4 +- actix-session/Cargo.toml | 6 +- actix-web-actors/Cargo.toml | 2 +- awc/CHANGES.md | 1 + awc/Cargo.toml | 10 +- src/middleware/logger.rs | 20 ++-- src/test.rs | 6 +- test-server/CHANGES.md | 2 + test-server/Cargo.toml | 8 +- 24 files changed, 247 insertions(+), 174 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index ab9caa7bd..ce8ffd619 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,7 +1,7 @@ # Changes -## [2.0.NEXT] - 2020-01-xx +## [2.1.NEXT] - 2020-01-xx ### Added @@ -15,6 +15,8 @@ * Update the `time` dependency to 0.2.7 +* Accomodate breaking change in actix-http: trait actix_http::MessageBody requires Unpin and accepting Pin<&mut Self> instead of &mut self in the poll_next() + ## [2.0.0] - 2019-12-25 ### Changed diff --git a/Cargo.toml b/Cargo.toml index d06e47ef8..11d6292a1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-web" -version = "2.0.0" +version = "3.0.0" authors = ["Nikolay Kim "] description = "Actix web is a simple, pragmatic and extremely fast web framework for Rust." readme = "README.md" @@ -71,8 +71,8 @@ actix-threadpool = "0.3.1" actix-tls = "1.0.0" actix-web-codegen = "0.2.0" -actix-http = "1.0.1" -awc = { version = "1.0.1", default-features = false } +actix-http = { version = "2.0.0", path = "actix-http" } +awc = { version = "2.0.0", path = "awc", default-features = false } bytes = "0.5.3" derive_more = "0.99.2" @@ -107,7 +107,7 @@ opt-level = 3 codegen-units = 1 [patch.crates-io] -actix-web = { path = "." } +actix-web = { path = "." } actix-http = { path = "actix-http" } actix-http-test = { path = "test-server" } actix-web-codegen = { path = "actix-web-codegen" } diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index f2ef217c1..8a25efea1 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -8,6 +8,10 @@ * Moved actors messages support from actix crate, enabled with feature `actors`. +* Breaking change: trait MessageBody requires Unpin and accepting Pin<&mut Self> instead of &mut self in the poll_next(). + +* MessageBody is not implemented for &'static [u8] anymore. + ### Fixed * Allow `SameSite=None` cookies to be sent in a response. diff --git a/actix-http/Cargo.toml b/actix-http/Cargo.toml index afdb548f5..3b586521c 100644 --- a/actix-http/Cargo.toml +++ b/actix-http/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-http" -version = "1.0.1" +version = "2.0.0" authors = ["Nikolay Kim "] description = "Actix http primitives" readme = "README.md" @@ -43,10 +43,10 @@ secure-cookies = ["ring"] actors = ["actix"] [dependencies] -actix-service = "1.0.1" +actix-service = "1.0.5" actix-codec = "0.2.0" -actix-connect = "1.0.1" -actix-utils = "1.0.3" +actix-connect = "1.0.2" +actix-utils = "1.0.6" actix-rt = "1.0.0" actix-threadpool = "0.3.1" actix-tls = { version = "1.0.0", optional = true } @@ -93,9 +93,9 @@ flate2 = { version = "1.0.13", optional = true } fail-ure = { version = "0.1.5", package="failure", optional = true } [dev-dependencies] -actix-server = "1.0.0" -actix-connect = { version = "1.0.0", features=["openssl"] } -actix-http-test = { version = "1.0.0", features=["openssl"] } +actix-server = "1.0.1" +actix-connect = { version = "1.0.2", features=["openssl"] } +actix-http-test = { version = "2.0.0", path = "../test-server", features=["openssl"] } actix-tls = { version = "1.0.0", features=["openssl"] } criterion = "0.3" futures = "0.3.1" diff --git a/actix-http/src/body.rs b/actix-http/src/body.rs index 1c34b06a0..ea742af5f 100644 --- a/actix-http/src/body.rs +++ b/actix-http/src/body.rs @@ -33,10 +33,10 @@ impl BodySize { } /// Type that provides this trait can be streamed to a peer. -pub trait MessageBody { +pub trait MessageBody: Unpin { fn size(&self) -> BodySize; - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>>; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>>; downcast_get_type_id!(); } @@ -48,7 +48,7 @@ impl MessageBody for () { BodySize::Empty } - fn poll_next(&mut self, _: &mut Context<'_>) -> Poll>> { + fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll>> { Poll::Ready(None) } } @@ -58,15 +58,28 @@ impl MessageBody for Box { self.as_ref().size() } - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { - self.as_mut().poll_next(cx) + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>> { + let a: Pin<&mut T> = Pin::new(self.get_mut().as_mut()); + a.poll_next(cx) } } +impl MessageBody for Box { + fn size(&self) -> BodySize { + self.as_ref().size() + } + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>> { + let a: Pin<&mut dyn MessageBody> = Pin::new(self.get_mut().as_mut()); + a.poll_next(cx) + } +} + + #[pin_project] pub enum ResponseBody { - Body(B), - Other(Body), + Body(#[pin] B), + Other(#[pin] Body), } impl ResponseBody { @@ -102,10 +115,12 @@ impl MessageBody for ResponseBody { } } - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { - match self { - ResponseBody::Body(ref mut body) => body.poll_next(cx), - ResponseBody::Other(ref mut body) => body.poll_next(cx), + #[project] + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>> { + #[project] + match self.project() { + ResponseBody::Body(body) => body.poll_next(cx), + ResponseBody::Other(body) => body.poll_next(cx), } } } @@ -120,12 +135,13 @@ impl Stream for ResponseBody { ) -> Poll> { #[project] match self.project() { - ResponseBody::Body(ref mut body) => body.poll_next(cx), - ResponseBody::Other(ref mut body) => body.poll_next(cx), + ResponseBody::Body(body) => body.poll_next(cx), + ResponseBody::Other(body) => body.poll_next(cx), } } } +#[pin_project] /// Represents various types of http message body. pub enum Body { /// Empty response. `Content-Length` header is not set. @@ -135,7 +151,7 @@ pub enum Body { /// Specific response body. Bytes(Bytes), /// Generic message body. - Message(Box), + Message(#[pin] Box), } impl Body { @@ -160,8 +176,10 @@ impl MessageBody for Body { } } - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { - match self { + #[project] + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>> { + #[project] + match self.project() { Body::None => Poll::Ready(None), Body::Empty => Poll::Ready(None), Body::Bytes(ref mut bin) => { @@ -172,7 +190,7 @@ impl MessageBody for Body { Poll::Ready(Some(Ok(mem::replace(bin, Bytes::new())))) } } - Body::Message(ref mut body) => body.poll_next(cx), + Body::Message(body) => body.poll_next(cx), } } } @@ -258,7 +276,7 @@ impl From for Body { impl From> for Body where - S: Stream> + 'static, + S: Stream> + Unpin + 'static, { fn from(s: SizedStream) -> Body { Body::from_message(s) @@ -267,7 +285,7 @@ where impl From> for Body where - S: Stream> + 'static, + S: Stream> + Unpin + 'static, E: Into + 'static, { fn from(s: BodyStream) -> Body { @@ -280,11 +298,11 @@ impl MessageBody for Bytes { BodySize::Sized(self.len()) } - fn poll_next(&mut self, _: &mut Context<'_>) -> Poll>> { + fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll>> { if self.is_empty() { Poll::Ready(None) } else { - Poll::Ready(Some(Ok(mem::replace(self, Bytes::new())))) + Poll::Ready(Some(Ok(mem::replace(self.get_mut(), Bytes::new())))) } } } @@ -294,11 +312,11 @@ impl MessageBody for BytesMut { BodySize::Sized(self.len()) } - fn poll_next(&mut self, _: &mut Context<'_>) -> Poll>> { + fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll>> { if self.is_empty() { Poll::Ready(None) } else { - Poll::Ready(Some(Ok(mem::replace(self, BytesMut::new()).freeze()))) + Poll::Ready(Some(Ok(mem::replace(self.get_mut(), BytesMut::new()).freeze()))) } } } @@ -308,41 +326,27 @@ impl MessageBody for &'static str { BodySize::Sized(self.len()) } - fn poll_next(&mut self, _: &mut Context<'_>) -> Poll>> { + fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll>> { if self.is_empty() { Poll::Ready(None) } else { Poll::Ready(Some(Ok(Bytes::from_static( - mem::replace(self, "").as_ref(), + mem::replace(self.get_mut(), "").as_ref(), )))) } } } -impl MessageBody for &'static [u8] { - fn size(&self) -> BodySize { - BodySize::Sized(self.len()) - } - - fn poll_next(&mut self, _: &mut Context<'_>) -> Poll>> { - if self.is_empty() { - Poll::Ready(None) - } else { - Poll::Ready(Some(Ok(Bytes::from_static(mem::replace(self, b""))))) - } - } -} - impl MessageBody for Vec { fn size(&self) -> BodySize { BodySize::Sized(self.len()) } - fn poll_next(&mut self, _: &mut Context<'_>) -> Poll>> { + fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll>> { if self.is_empty() { Poll::Ready(None) } else { - Poll::Ready(Some(Ok(Bytes::from(mem::replace(self, Vec::new()))))) + Poll::Ready(Some(Ok(Bytes::from(mem::replace(self.get_mut(), Vec::new()))))) } } } @@ -352,12 +356,12 @@ impl MessageBody for String { BodySize::Sized(self.len()) } - fn poll_next(&mut self, _: &mut Context<'_>) -> Poll>> { + fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll>> { if self.is_empty() { Poll::Ready(None) } else { Poll::Ready(Some(Ok(Bytes::from( - mem::replace(self, String::new()).into_bytes(), + mem::replace(self.get_mut(), String::new()).into_bytes(), )))) } } @@ -365,14 +369,16 @@ impl MessageBody for String { /// Type represent streaming body. /// Response does not contain `content-length` header and appropriate transfer encoding is used. -pub struct BodyStream { - stream: Pin>, +#[pin_project] +pub struct BodyStream { + #[pin] + stream: S, _t: PhantomData, } impl BodyStream where - S: Stream>, + S: Stream> + Unpin, E: Into, { pub fn new(stream: S) -> Self { @@ -385,7 +391,7 @@ where impl MessageBody for BodyStream where - S: Stream>, + S: Stream> + Unpin, E: Into, { fn size(&self) -> BodySize { @@ -397,10 +403,11 @@ where /// Empty values are skipped to prevent [`BodyStream`]'s transmission being /// ended on a zero-length chunk, but rather proceed until the underlying /// [`Stream`] ends. - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { - let mut stream = self.stream.as_mut(); + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>> { + let mut stream = self.project().stream; loop { - return Poll::Ready(match ready!(stream.as_mut().poll_next(cx)) { + let stream = stream.as_mut(); + return Poll::Ready(match ready!(stream.poll_next(cx)) { Some(Ok(ref bytes)) if bytes.is_empty() => continue, opt => opt.map(|res| res.map_err(Into::into)), }); @@ -410,14 +417,15 @@ where /// Type represent streaming body. This body implementation should be used /// if total size of stream is known. Data get sent as is without using transfer encoding. -pub struct SizedStream { +#[pin_project] +pub struct SizedStream { size: u64, stream: Pin>, } impl SizedStream where - S: Stream>, + S: Stream> + Unpin, { pub fn new(size: u64, stream: S) -> Self { SizedStream { @@ -429,7 +437,7 @@ where impl MessageBody for SizedStream where - S: Stream>, + S: Stream> + Unpin, { fn size(&self) -> BodySize { BodySize::Sized64(self.size) @@ -440,10 +448,11 @@ where /// Empty values are skipped to prevent [`SizedStream`]'s transmission being /// ended on a zero-length chunk, but rather proceed until the underlying /// [`Stream`] ends. - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { - let mut stream = self.stream.as_mut(); + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>> { + let mut stream: Pin<&mut S> = self.project().stream; loop { - return Poll::Ready(match ready!(stream.as_mut().poll_next(cx)) { + let stream = stream.as_mut(); + return Poll::Ready(match ready!(stream.poll_next(cx)) { Some(Ok(ref bytes)) if bytes.is_empty() => continue, val => val, }); @@ -456,6 +465,7 @@ mod tests { use super::*; use futures::stream; use futures_util::future::poll_fn; + use futures_util::pin_mut; impl Body { pub(crate) fn get_ref(&self) -> &[u8] { @@ -483,7 +493,7 @@ mod tests { assert_eq!("test".size(), BodySize::Sized(4)); assert_eq!( - poll_fn(|cx| "test".poll_next(cx)).await.unwrap().ok(), + poll_fn(|cx| Pin::new(&mut "test").poll_next(cx)).await.unwrap().ok(), Some(Bytes::from("test")) ); } @@ -497,10 +507,12 @@ mod tests { BodySize::Sized(4) ); assert_eq!(Body::from_slice(b"test".as_ref()).get_ref(), b"test"); + let sb = Bytes::from(&b"test"[..]); + pin_mut!(sb); - assert_eq!((&b"test"[..]).size(), BodySize::Sized(4)); + assert_eq!(sb.size(), BodySize::Sized(4)); assert_eq!( - poll_fn(|cx| (&b"test"[..]).poll_next(cx)) + poll_fn(|cx| sb.as_mut().poll_next(cx)) .await .unwrap() .ok(), @@ -512,10 +524,12 @@ mod tests { async fn test_vec() { assert_eq!(Body::from(Vec::from("test")).size(), BodySize::Sized(4)); assert_eq!(Body::from(Vec::from("test")).get_ref(), b"test"); + let test_vec = Vec::from("test"); + pin_mut!(test_vec); - assert_eq!(Vec::from("test").size(), BodySize::Sized(4)); + assert_eq!(test_vec.size(), BodySize::Sized(4)); assert_eq!( - poll_fn(|cx| Vec::from("test").poll_next(cx)) + poll_fn(|cx| test_vec.as_mut().poll_next(cx)) .await .unwrap() .ok(), @@ -525,41 +539,44 @@ mod tests { #[actix_rt::test] async fn test_bytes() { - let mut b = Bytes::from("test"); + let b = Bytes::from("test"); assert_eq!(Body::from(b.clone()).size(), BodySize::Sized(4)); assert_eq!(Body::from(b.clone()).get_ref(), b"test"); + pin_mut!(b); assert_eq!(b.size(), BodySize::Sized(4)); assert_eq!( - poll_fn(|cx| b.poll_next(cx)).await.unwrap().ok(), + poll_fn(|cx| b.as_mut().poll_next(cx)).await.unwrap().ok(), Some(Bytes::from("test")) ); } - + #[actix_rt::test] async fn test_bytes_mut() { - let mut b = BytesMut::from("test"); + let b = BytesMut::from("test"); assert_eq!(Body::from(b.clone()).size(), BodySize::Sized(4)); assert_eq!(Body::from(b.clone()).get_ref(), b"test"); + pin_mut!(b); assert_eq!(b.size(), BodySize::Sized(4)); assert_eq!( - poll_fn(|cx| b.poll_next(cx)).await.unwrap().ok(), + poll_fn(|cx| b.as_mut().poll_next(cx)).await.unwrap().ok(), Some(Bytes::from("test")) ); } #[actix_rt::test] async fn test_string() { - let mut b = "test".to_owned(); + let b = "test".to_owned(); assert_eq!(Body::from(b.clone()).size(), BodySize::Sized(4)); assert_eq!(Body::from(b.clone()).get_ref(), b"test"); assert_eq!(Body::from(&b).size(), BodySize::Sized(4)); assert_eq!(Body::from(&b).get_ref(), b"test"); + pin_mut!(b); assert_eq!(b.size(), BodySize::Sized(4)); assert_eq!( - poll_fn(|cx| b.poll_next(cx)).await.unwrap().ok(), + poll_fn(|cx| b.as_mut().poll_next(cx)).await.unwrap().ok(), Some(Bytes::from("test")) ); } @@ -567,14 +584,15 @@ mod tests { #[actix_rt::test] async fn test_unit() { assert_eq!(().size(), BodySize::Empty); - assert!(poll_fn(|cx| ().poll_next(cx)).await.is_none()); + assert!(poll_fn(|cx| Pin::new(&mut ()).poll_next(cx)).await.is_none()); } #[actix_rt::test] async fn test_box() { - let mut val = Box::new(()); + let val = Box::new(()); + pin_mut!(val); assert_eq!(val.size(), BodySize::Empty); - assert!(poll_fn(|cx| val.poll_next(cx)).await.is_none()); + assert!(poll_fn(|cx| val.as_mut().poll_next(cx)).await.is_none()); } #[actix_rt::test] @@ -612,26 +630,29 @@ mod tests { mod body_stream { use super::*; - use futures::task::noop_waker; - use futures::stream::once; + //use futures::task::noop_waker; + //use futures::stream::once; #[actix_rt::test] async fn skips_empty_chunks() { - let mut body = BodyStream::new(stream::iter( + let body = BodyStream::new(stream::iter( ["1", "", "2"] .iter() .map(|&v| Ok(Bytes::from(v)) as Result), )); + pin_mut!(body); + assert_eq!( - poll_fn(|cx| body.poll_next(cx)).await.unwrap().ok(), + poll_fn(|cx| body.as_mut().poll_next(cx)).await.unwrap().ok(), Some(Bytes::from("1")), ); assert_eq!( - poll_fn(|cx| body.poll_next(cx)).await.unwrap().ok(), + poll_fn(|cx| body.as_mut().poll_next(cx)).await.unwrap().ok(), Some(Bytes::from("2")), ); } + /* Now it does not compile as it should #[actix_rt::test] async fn move_pinned_pointer() { let (sender, receiver) = futures::channel::oneshot::channel(); @@ -645,11 +666,12 @@ mod tests { let waker = noop_waker(); let mut context = Context::from_waker(&waker); - + pin_mut!(body_stream); + let _ = body_stream.as_mut().unwrap().poll_next(&mut context); sender.send(()).unwrap(); let _ = std::mem::replace(&mut body_stream, Err([0; 32])).unwrap().poll_next(&mut context); - } + }*/ } mod sized_stream { @@ -657,16 +679,17 @@ mod tests { #[actix_rt::test] async fn skips_empty_chunks() { - let mut body = SizedStream::new( + let body = SizedStream::new( 2, stream::iter(["1", "", "2"].iter().map(|&v| Ok(Bytes::from(v)))), ); + pin_mut!(body); assert_eq!( - poll_fn(|cx| body.poll_next(cx)).await.unwrap().ok(), + poll_fn(|cx| body.as_mut().poll_next(cx)).await.unwrap().ok(), Some(Bytes::from("1")), ); assert_eq!( - poll_fn(|cx| body.poll_next(cx)).await.unwrap().ok(), + poll_fn(|cx| body.as_mut().poll_next(cx)).await.unwrap().ok(), Some(Bytes::from("2")), ); } diff --git a/actix-http/src/client/h1proto.rs b/actix-http/src/client/h1proto.rs index a0a20edf6..c1863b920 100644 --- a/actix-http/src/client/h1proto.rs +++ b/actix-http/src/client/h1proto.rs @@ -8,7 +8,7 @@ use bytes::buf::BufMutExt; use bytes::{Bytes, BytesMut}; use futures_core::Stream; use futures_util::future::poll_fn; -use futures_util::{SinkExt, StreamExt}; +use futures_util::{SinkExt, StreamExt, pin_mut}; use crate::error::PayloadError; use crate::h1; @@ -120,7 +120,7 @@ where /// send request body to the peer pub(crate) async fn send_body( - mut body: B, + body: B, framed: &mut Framed, ) -> Result<(), SendRequestError> where @@ -128,9 +128,10 @@ where B: MessageBody, { let mut eof = false; + pin_mut!(body); while !eof { while !eof && !framed.is_write_buf_full() { - match poll_fn(|cx| body.poll_next(cx)).await { + match poll_fn(|cx| body.as_mut().poll_next(cx)).await { Some(result) => { framed.write(h1::Message::Chunk(Some(result?)))?; } diff --git a/actix-http/src/client/h2proto.rs b/actix-http/src/client/h2proto.rs index eabf54e97..69d20752a 100644 --- a/actix-http/src/client/h2proto.rs +++ b/actix-http/src/client/h2proto.rs @@ -4,6 +4,7 @@ use std::time; use actix_codec::{AsyncRead, AsyncWrite}; use bytes::Bytes; use futures_util::future::poll_fn; +use futures_util::pin_mut; use h2::{client::SendRequest, SendStream}; use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, TRANSFER_ENCODING}; use http::{request::Request, Method, Version}; @@ -123,13 +124,14 @@ where } async fn send_body( - mut body: B, + body: B, mut send: SendStream, ) -> Result<(), SendRequestError> { let mut buf = None; + pin_mut!(body); loop { if buf.is_none() { - match poll_fn(|cx| body.poll_next(cx)).await { + match poll_fn(|cx| body.as_mut().poll_next(cx)).await { Some(Ok(b)) => { send.reserve_capacity(b.len()); buf = Some(b); diff --git a/actix-http/src/encoding/encoder.rs b/actix-http/src/encoding/encoder.rs index ca04845ab..6530609e1 100644 --- a/actix-http/src/encoding/encoder.rs +++ b/actix-http/src/encoding/encoder.rs @@ -9,6 +9,7 @@ use brotli2::write::BrotliEncoder; use bytes::Bytes; use flate2::write::{GzEncoder, ZlibEncoder}; use futures_core::ready; +use pin_project::{pin_project, project}; use crate::body::{Body, BodySize, MessageBody, ResponseBody}; use crate::http::header::{ContentEncoding, CONTENT_ENCODING}; @@ -19,8 +20,10 @@ use super::Writer; const INPLACE: usize = 1024; +#[pin_project] pub struct Encoder { eof: bool, + #[pin] body: EncoderBody, encoder: Option, fut: Option>, @@ -76,67 +79,83 @@ impl Encoder { } } +#[pin_project] enum EncoderBody { Bytes(Bytes), - Stream(B), - BoxedStream(Box), + Stream(#[pin] B), + BoxedStream(#[pin] Box), } +impl MessageBody for EncoderBody { + fn size(&self) -> BodySize { + match self { + EncoderBody::Bytes(ref b) => b.size(), + EncoderBody::Stream(ref b) => b.size(), + EncoderBody::BoxedStream(ref b) => b.size(), + } + } + + #[project] + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>> { + #[project] + match self.project() { + EncoderBody::Bytes(b) => { + if b.is_empty() { + Poll::Ready(None) + } else { + Poll::Ready(Some(Ok(std::mem::replace(b, Bytes::new())))) + } + } + EncoderBody::Stream(b) => b.poll_next(cx), + EncoderBody::BoxedStream(b) => b.poll_next(cx), + } + } +} + + impl MessageBody for Encoder { fn size(&self) -> BodySize { if self.encoder.is_none() { - match self.body { - EncoderBody::Bytes(ref b) => b.size(), - EncoderBody::Stream(ref b) => b.size(), - EncoderBody::BoxedStream(ref b) => b.size(), - } + self.body.size() } else { BodySize::Stream } } - - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>> { + let mut this = self.project(); loop { - if self.eof { + if *this.eof { return Poll::Ready(None); } - if let Some(ref mut fut) = self.fut { + if let Some(ref mut fut) = this.fut { let mut encoder = match ready!(Pin::new(fut).poll(cx)) { Ok(item) => item, Err(e) => return Poll::Ready(Some(Err(e.into()))), }; let chunk = encoder.take(); - self.encoder = Some(encoder); - self.fut.take(); + *this.encoder = Some(encoder); + this.fut.take(); if !chunk.is_empty() { return Poll::Ready(Some(Ok(chunk))); } } - let result = match self.body { - EncoderBody::Bytes(ref mut b) => { - if b.is_empty() { - Poll::Ready(None) - } else { - Poll::Ready(Some(Ok(std::mem::replace(b, Bytes::new())))) - } - } - EncoderBody::Stream(ref mut b) => b.poll_next(cx), - EncoderBody::BoxedStream(ref mut b) => b.poll_next(cx), - }; + let result = this.body.as_mut().poll_next(cx); + match result { Poll::Ready(Some(Ok(chunk))) => { - if let Some(mut encoder) = self.encoder.take() { + if let Some(mut encoder) = this.encoder.take() { if chunk.len() < INPLACE { encoder.write(&chunk)?; let chunk = encoder.take(); - self.encoder = Some(encoder); + *this.encoder = Some(encoder); if !chunk.is_empty() { return Poll::Ready(Some(Ok(chunk))); } } else { - self.fut = Some(run(move || { + *this.fut = Some(run(move || { encoder.write(&chunk)?; Ok(encoder) })); @@ -146,12 +165,12 @@ impl MessageBody for Encoder { } } Poll::Ready(None) => { - if let Some(encoder) = self.encoder.take() { + if let Some(encoder) = this.encoder.take() { let chunk = encoder.finish()?; if chunk.is_empty() { return Poll::Ready(None); } else { - self.eof = true; + *this.eof = true; return Poll::Ready(Some(Ok(chunk))); } } else { diff --git a/actix-http/src/error.rs b/actix-http/src/error.rs index 4b8f13cf0..8a1c1b5dc 100644 --- a/actix-http/src/error.rs +++ b/actix-http/src/error.rs @@ -59,6 +59,12 @@ impl Error { } } +/// A struct with a private constructor, for use with +/// `__private_get_type_id__`. Its single field is private, +/// ensuring that it can only be constructed from this module +#[doc(hidden)] +pub struct PrivateHelper(()); + /// Error that can be converted to `Response` pub trait ResponseError: fmt::Debug + fmt::Display { /// Response's status code diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index a496fd993..eb60893ff 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -170,7 +170,7 @@ where S: Service, S::Error: Into, S::Response: Into>, - B: MessageBody, + B: MessageBody+Unpin, X: Service, X::Error: Into, U: Service), Response = ()>, @@ -258,7 +258,7 @@ where S: Service, S::Error: Into, S::Response: Into>, - B: MessageBody, + B: MessageBody+Unpin, X: Service, X::Error: Into, U: Service), Response = ()>, @@ -402,9 +402,10 @@ where } } State::SendPayload(ref mut stream) => { + let mut stream = Pin::new(stream); loop { if self.write_buf.len() < HW_BUFFER_SIZE { - match stream.poll_next(cx) { + match stream.as_mut().poll_next(cx) { Poll::Ready(Some(Ok(item))) => { self.codec.encode( Message::Chunk(Some(item)), @@ -687,7 +688,7 @@ where S: Service, S::Error: Into, S::Response: Into>, - B: MessageBody, + B: MessageBody+Unpin, X: Service, X::Error: Into, U: Service), Response = ()>, diff --git a/actix-http/src/h1/service.rs b/actix-http/src/h1/service.rs index 4d1a1dc1b..84e1112e9 100644 --- a/actix-http/src/h1/service.rs +++ b/actix-http/src/h1/service.rs @@ -63,7 +63,7 @@ where S::Error: Into, S::InitError: fmt::Debug, S::Response: Into>, - B: MessageBody, + B: MessageBody+Unpin, X: ServiceFactory, X::Error: Into, X::InitError: fmt::Debug, @@ -106,7 +106,7 @@ mod openssl { S::Error: Into, S::InitError: fmt::Debug, S::Response: Into>, - B: MessageBody, + B: MessageBody+Unpin, X: ServiceFactory, X::Error: Into, X::InitError: fmt::Debug, @@ -250,7 +250,7 @@ where S::Error: Into, S::Response: Into>, S::InitError: fmt::Debug, - B: MessageBody, + B: MessageBody+Unpin, X: ServiceFactory, X::Error: Into, X::InitError: fmt::Debug, @@ -408,7 +408,7 @@ where S: Service, S::Error: Into, S::Response: Into>, - B: MessageBody, + B: MessageBody+Unpin, X: Service, X::Error: Into, U: Service), Response = ()>, diff --git a/actix-http/src/h1/utils.rs b/actix-http/src/h1/utils.rs index 9ba4aa053..be6a42793 100644 --- a/actix-http/src/h1/utils.rs +++ b/actix-http/src/h1/utils.rs @@ -13,6 +13,7 @@ use crate::response::Response; #[pin_project::pin_project] pub struct SendResponse { res: Option, BodySize)>>, + #[pin] body: Option>, framed: Option>, } @@ -39,20 +40,23 @@ where { type Output = Result, Error>; + // TODO: rethink if we need loops in polls fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); + let mut this = self.project(); + let mut body_done = this.body.is_none(); loop { - let mut body_ready = this.body.is_some(); + let mut body_ready = !body_done; let framed = this.framed.as_mut().unwrap(); // send body - if this.res.is_none() && this.body.is_some() { - while body_ready && this.body.is_some() && !framed.is_write_buf_full() { - match this.body.as_mut().unwrap().poll_next(cx)? { + if this.res.is_none() && body_ready { + while body_ready && !body_done && !framed.is_write_buf_full() { + match this.body.as_mut().as_pin_mut().unwrap().poll_next(cx)? { Poll::Ready(item) => { - // body is done - if item.is_none() { + // body is done when item is None + body_done = item.is_none(); + if body_done { let _ = this.body.take(); } framed.write(Message::Chunk(item))?; @@ -82,7 +86,7 @@ where continue; } - if this.body.is_some() { + if body_done { if body_ready { continue; } else { diff --git a/actix-http/src/h2/dispatcher.rs b/actix-http/src/h2/dispatcher.rs index 8b17e9479..4b3752ffe 100644 --- a/actix-http/src/h2/dispatcher.rs +++ b/actix-http/src/h2/dispatcher.rs @@ -168,7 +168,7 @@ struct ServiceResponse { #[pin_project::pin_project] enum ServiceResponseState { ServiceCall(#[pin] F, Option>), - SendPayload(SendStream, ResponseBody), + SendPayload(SendStream, #[pin] ResponseBody), } impl ServiceResponse @@ -338,7 +338,7 @@ where } } } else { - match body.poll_next(cx) { + match body.as_mut().poll_next(cx) { Poll::Pending => return Poll::Pending, Poll::Ready(None) => { if let Err(e) = stream.send_data(Bytes::new(), true) { diff --git a/actix-http/src/response.rs b/actix-http/src/response.rs index fcdcd7cdf..655d565ad 100644 --- a/actix-http/src/response.rs +++ b/actix-http/src/response.rs @@ -637,7 +637,7 @@ impl ResponseBuilder { /// `ResponseBuilder` can not be used after this call. pub fn streaming(&mut self, stream: S) -> Response where - S: Stream> + 'static, + S: Stream> + Unpin + 'static, E: Into + 'static, { self.body(Body::from_message(BodyStream::new(stream))) diff --git a/actix-identity/Cargo.toml b/actix-identity/Cargo.toml index efeb24bda..47c792657 100644 --- a/actix-identity/Cargo.toml +++ b/actix-identity/Cargo.toml @@ -16,8 +16,8 @@ name = "actix_identity" path = "src/lib.rs" [dependencies] -actix-web = { version = "2.0.0", default-features = false, features = ["secure-cookies"] } -actix-service = "1.0.2" +actix-web = { version = "3.0.0", path = "..", default-features = false, features = ["secure-cookies"] } +actix-service = "1.0.5" futures = "0.3.1" serde = "1.0" serde_json = "1.0" @@ -25,5 +25,5 @@ time = { version = "0.2.5", default-features = false, features = ["std"] } [dev-dependencies] actix-rt = "1.0.0" -actix-http = "1.0.1" -bytes = "0.5.3" +actix-http = { version = "2.0.0", path = "../actix-http" } +bytes = "0.5.4" diff --git a/actix-multipart/Cargo.toml b/actix-multipart/Cargo.toml index f9cd7cfd2..7273fb4ce 100644 --- a/actix-multipart/Cargo.toml +++ b/actix-multipart/Cargo.toml @@ -16,7 +16,7 @@ name = "actix_multipart" path = "src/lib.rs" [dependencies] -actix-web = { version = "2.0.0-rc", default-features = false } +actix-web = { version = "2.0.0", default-features = false } actix-service = "1.0.1" actix-utils = "1.0.3" bytes = "0.5.3" @@ -29,4 +29,4 @@ twoway = "0.2" [dev-dependencies] actix-rt = "1.0.0" -actix-http = "1.0.0" +actix-http = "1.0.1" diff --git a/actix-session/Cargo.toml b/actix-session/Cargo.toml index b279c9d89..d886a5d8e 100644 --- a/actix-session/Cargo.toml +++ b/actix-session/Cargo.toml @@ -22,9 +22,9 @@ default = ["cookie-session"] cookie-session = ["actix-web/secure-cookies"] [dependencies] -actix-web = "2.0.0-rc" -actix-service = "1.0.1" -bytes = "0.5.3" +actix-web = { version = "3.0.0", path = ".." } +actix-service = "1.0.5" +bytes = "0.5.4" derive_more = "0.99.2" futures = "0.3.1" serde = "1.0" diff --git a/actix-web-actors/Cargo.toml b/actix-web-actors/Cargo.toml index 6f573e442..7851e1b7f 100644 --- a/actix-web-actors/Cargo.toml +++ b/actix-web-actors/Cargo.toml @@ -17,7 +17,7 @@ path = "src/lib.rs" [dependencies] actix = "0.9.0" -actix-web = "2.0.0-rc" +actix-web = "2.0.0" actix-http = "1.0.1" actix-codec = "0.2.0" bytes = "0.5.2" diff --git a/awc/CHANGES.md b/awc/CHANGES.md index d9b26e453..917549d09 100644 --- a/awc/CHANGES.md +++ b/awc/CHANGES.md @@ -4,6 +4,7 @@ * Fix compilation with default features off +* Accomodate breaking change: trait actix_http::MessageBody requires Unpin and accepting Pin<&mut Self> instead of &mut self in the poll_next() ## [1.0.0] - 2019-12-13 diff --git a/awc/Cargo.toml b/awc/Cargo.toml index 67e0a3ee4..71623e2dc 100644 --- a/awc/Cargo.toml +++ b/awc/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "awc" -version = "1.0.1" +version = "2.0.0" authors = ["Nikolay Kim "] description = "Actix http client." readme = "README.md" @@ -36,7 +36,7 @@ compress = ["actix-http/compress"] [dependencies] actix-codec = "0.2.0" actix-service = "1.0.1" -actix-http = "1.0.0" +actix-http = { version = "2.0.0", path = "../actix-http" } actix-rt = "1.0.0" base64 = "0.11" @@ -55,9 +55,9 @@ rust-tls = { version = "0.16.0", package="rustls", optional = true, features = [ [dev-dependencies] actix-connect = { version = "1.0.1", features=["openssl"] } -actix-web = { version = "2.0.0-rc", features=["openssl"] } -actix-http = { version = "1.0.1", features=["openssl"] } -actix-http-test = { version = "1.0.0", features=["openssl"] } +actix-web = { version = "3.0.0", path = "..", features=["openssl"] } +actix-http = { version = "2.0.0", path = "../actix-http", features=["openssl"] } +actix-http-test = { version = "2.0.0", path = "../test-server", features=["openssl"] } actix-utils = "1.0.3" actix-server = "1.0.0" actix-tls = { version = "1.0.0", features=["openssl", "rustls"] } diff --git a/src/middleware/logger.rs b/src/middleware/logger.rs index d692132ce..661ced966 100644 --- a/src/middleware/logger.rs +++ b/src/middleware/logger.rs @@ -238,19 +238,24 @@ where } } +use pin_project::{pin_project, pinned_drop}; +#[pin_project(PinnedDrop)] pub struct StreamLog { + #[pin] body: ResponseBody, format: Option, size: usize, time: OffsetDateTime, } -impl Drop for StreamLog { - fn drop(&mut self) { - if let Some(ref format) = self.format { +#[pinned_drop] +impl PinnedDrop for StreamLog { + fn drop(self: Pin<&mut Self>) { + let this = self.project(); + if let Some(ref format) = this.format { let render = |fmt: &mut Formatter<'_>| { for unit in &format.0 { - unit.render(fmt, self.size, self.time)?; + unit.render(fmt, *this.size, *this.time)?; } Ok(()) }; @@ -264,10 +269,11 @@ impl MessageBody for StreamLog { self.body.size() } - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { - match self.body.poll_next(cx) { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>> { + let this = self.project(); + match this.body.poll_next(cx) { Poll::Ready(Some(Ok(chunk))) => { - self.size += chunk.len(); + *this.size += chunk.len(); Poll::Ready(Some(Ok(chunk))) } val => val, diff --git a/src/test.rs b/src/test.rs index 956980530..957e66638 100644 --- a/src/test.rs +++ b/src/test.rs @@ -953,7 +953,6 @@ impl Drop for TestServer { #[cfg(test)] mod tests { use actix_http::httpmessage::HttpMessage; - use futures::FutureExt; use serde::{Deserialize, Serialize}; use std::time::SystemTime; @@ -1163,6 +1162,9 @@ mod tests { assert!(res.status().is_success()); } +/* + use futures::FutureExt; + #[actix_rt::test] async fn test_actor() { use actix::Actor; @@ -1183,7 +1185,6 @@ mod tests { } } - let addr = MyActor.start(); let mut app = init_service(App::new().service(web::resource("/index.html").to( move || { @@ -1205,4 +1206,5 @@ mod tests { let res = app.call(req).await.unwrap(); assert!(res.status().is_success()); } +*/ } diff --git a/test-server/CHANGES.md b/test-server/CHANGES.md index f8b29f39d..d262d219c 100644 --- a/test-server/CHANGES.md +++ b/test-server/CHANGES.md @@ -4,6 +4,8 @@ * Update the `time` dependency to 0.2.7 +* Breaking change: trait actix_http::MessageBody requires Unpin and accepting Pin<&mut Self> instead of &mut self in the poll_next() + ## [1.0.0] - 2019-12-13 diff --git a/test-server/Cargo.toml b/test-server/Cargo.toml index 774c8f0b2..7ea0a9ba4 100644 --- a/test-server/Cargo.toml +++ b/test-server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-http-test" -version = "1.0.0" +version = "2.0.0" authors = ["Nikolay Kim "] description = "Actix http test server" readme = "README.md" @@ -37,7 +37,7 @@ actix-utils = "1.0.3" actix-rt = "1.0.0" actix-server = "1.0.0" actix-testing = "1.0.0" -awc = "1.0.0" +awc = { version = "2.0.0", path = "../awc" } base64 = "0.11" bytes = "0.5.3" @@ -55,5 +55,5 @@ time = { version = "0.2.7", default-features = false, features = ["std"] } open-ssl = { version="0.10", package="openssl", optional = true } [dev-dependencies] -actix-web = "2.0.0-rc" -actix-http = "1.0.1" +actix-web = { version = "3.0.0", path = ".." } +actix-http = { version = "2.0.0", path = "../actix-http" } From 62aba424e2a5c8ac41267b7a2e73e69fe6cdc1e2 Mon Sep 17 00:00:00 2001 From: Maksym Vorobiov Date: Thu, 30 Jan 2020 19:33:49 +0200 Subject: [PATCH 03/21] Rollback actix-http-test dependency to show the issue --- actix-http/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/actix-http/Cargo.toml b/actix-http/Cargo.toml index 3b586521c..10e9611fd 100644 --- a/actix-http/Cargo.toml +++ b/actix-http/Cargo.toml @@ -95,7 +95,7 @@ fail-ure = { version = "0.1.5", package="failure", optional = true } [dev-dependencies] actix-server = "1.0.1" actix-connect = { version = "1.0.2", features=["openssl"] } -actix-http-test = { version = "2.0.0", path = "../test-server", features=["openssl"] } +actix-http-test = { version = "1.0.0", features=["openssl"] } actix-tls = { version = "1.0.0", features=["openssl"] } criterion = "0.3" futures = "0.3.1" From 09a391a3ca598a8be03f41c656ab70706fd6d2bd Mon Sep 17 00:00:00 2001 From: Maksym Vorobiov Date: Fri, 31 Jan 2020 10:29:10 +0200 Subject: [PATCH 04/21] rollback changes to actix-web, awc and test-server for now --- Cargo.toml | 8 ++++---- actix-http/src/body.rs | 10 ++++------ actix-http/tests/test_ws.rs | 5 +++++ awc/Cargo.toml | 10 +++++----- src/middleware/logger.rs | 20 +++++++------------- test-server/Cargo.toml | 4 ++-- 6 files changed, 27 insertions(+), 30 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 11d6292a1..b9b647568 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,8 +33,8 @@ members = [ "actix-cors", "actix-files", "actix-framed", - "actix-session", - "actix-identity", +# "actix-session", +# "actix-identity", "actix-multipart", "actix-web-actors", "actix-web-codegen", @@ -71,8 +71,8 @@ actix-threadpool = "0.3.1" actix-tls = "1.0.0" actix-web-codegen = "0.2.0" -actix-http = { version = "2.0.0", path = "actix-http" } -awc = { version = "2.0.0", path = "awc", default-features = false } +actix-http = { version = "1.0.1" } +awc = { version = "1.0.1", default-features = false } bytes = "0.5.3" derive_more = "0.99.2" diff --git a/actix-http/src/body.rs b/actix-http/src/body.rs index ea742af5f..74e6e218d 100644 --- a/actix-http/src/body.rs +++ b/actix-http/src/body.rs @@ -383,7 +383,7 @@ where { pub fn new(stream: S) -> Self { BodyStream { - stream: Box::pin(stream), + stream, _t: PhantomData, } } @@ -420,7 +420,8 @@ where #[pin_project] pub struct SizedStream { size: u64, - stream: Pin>, + #[pin] + stream: S, } impl SizedStream @@ -428,10 +429,7 @@ where S: Stream> + Unpin, { pub fn new(size: u64, stream: S) -> Self { - SizedStream { - size, - stream: Box::pin(stream), - } + SizedStream { size, stream } } } diff --git a/actix-http/tests/test_ws.rs b/actix-http/tests/test_ws.rs index 7152fee48..2f2a28e2f 100644 --- a/actix-http/tests/test_ws.rs +++ b/actix-http/tests/test_ws.rs @@ -81,6 +81,9 @@ async fn service(msg: ws::Frame) -> Result { Ok(msg) } +/* +Temporarily commented out due to dependency on actix-http-test + #[actix_rt::test] async fn test_simple() { let ws_service = WsService::new(); @@ -192,3 +195,5 @@ async fn test_simple() { assert!(ws_service.was_polled()); } + +*/ \ No newline at end of file diff --git a/awc/Cargo.toml b/awc/Cargo.toml index 71623e2dc..f7d5634e0 100644 --- a/awc/Cargo.toml +++ b/awc/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "awc" -version = "2.0.0" +version = "1.0.1" authors = ["Nikolay Kim "] description = "Actix http client." readme = "README.md" @@ -36,7 +36,7 @@ compress = ["actix-http/compress"] [dependencies] actix-codec = "0.2.0" actix-service = "1.0.1" -actix-http = { version = "2.0.0", path = "../actix-http" } +actix-http = { version = "1.0.1" } actix-rt = "1.0.0" base64 = "0.11" @@ -55,9 +55,9 @@ rust-tls = { version = "0.16.0", package="rustls", optional = true, features = [ [dev-dependencies] actix-connect = { version = "1.0.1", features=["openssl"] } -actix-web = { version = "3.0.0", path = "..", features=["openssl"] } -actix-http = { version = "2.0.0", path = "../actix-http", features=["openssl"] } -actix-http-test = { version = "2.0.0", path = "../test-server", features=["openssl"] } +actix-web = { version = "2.0.0", features=["openssl"] } +actix-http = { version = "1.0.1", features=["openssl"] } +actix-http-test = { version = "1.0.0", features=["openssl"] } actix-utils = "1.0.3" actix-server = "1.0.0" actix-tls = { version = "1.0.0", features=["openssl", "rustls"] } diff --git a/src/middleware/logger.rs b/src/middleware/logger.rs index 661ced966..d692132ce 100644 --- a/src/middleware/logger.rs +++ b/src/middleware/logger.rs @@ -238,24 +238,19 @@ where } } -use pin_project::{pin_project, pinned_drop}; -#[pin_project(PinnedDrop)] pub struct StreamLog { - #[pin] body: ResponseBody, format: Option, size: usize, time: OffsetDateTime, } -#[pinned_drop] -impl PinnedDrop for StreamLog { - fn drop(self: Pin<&mut Self>) { - let this = self.project(); - if let Some(ref format) = this.format { +impl Drop for StreamLog { + fn drop(&mut self) { + if let Some(ref format) = self.format { let render = |fmt: &mut Formatter<'_>| { for unit in &format.0 { - unit.render(fmt, *this.size, *this.time)?; + unit.render(fmt, self.size, self.time)?; } Ok(()) }; @@ -269,11 +264,10 @@ impl MessageBody for StreamLog { self.body.size() } - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>> { - let this = self.project(); - match this.body.poll_next(cx) { + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { + match self.body.poll_next(cx) { Poll::Ready(Some(Ok(chunk))) => { - *this.size += chunk.len(); + self.size += chunk.len(); Poll::Ready(Some(Ok(chunk))) } val => val, diff --git a/test-server/Cargo.toml b/test-server/Cargo.toml index 7ea0a9ba4..51c25f8d1 100644 --- a/test-server/Cargo.toml +++ b/test-server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-http-test" -version = "2.0.0" +version = "1.0.0" authors = ["Nikolay Kim "] description = "Actix http test server" readme = "README.md" @@ -37,7 +37,7 @@ actix-utils = "1.0.3" actix-rt = "1.0.0" actix-server = "1.0.0" actix-testing = "1.0.0" -awc = { version = "2.0.0", path = "../awc" } +awc = { version = "1.0.1" } base64 = "0.11" bytes = "0.5.3" From d9c415e5403cdf28f34eafc9aeef240753d5356e Mon Sep 17 00:00:00 2001 From: Maksym Vorobiov Date: Fri, 31 Jan 2020 12:09:18 +0200 Subject: [PATCH 05/21] disable weird poll test until actix-web based on actix-http:2 --- tests/test_weird_poll.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/test_weird_poll.rs b/tests/test_weird_poll.rs index 21d1d611a..768dda1c8 100644 --- a/tests/test_weird_poll.rs +++ b/tests/test_weird_poll.rs @@ -5,6 +5,9 @@ use futures::stream::once; use actix_http::body::{MessageBody, BodyStream}; use bytes::Bytes; +/* +Disable weird poll until actix-web is based on actix-http 2.0.0 + #[test] fn weird_poll() { let (sender, receiver) = futures::channel::oneshot::channel(); @@ -24,3 +27,4 @@ fn weird_poll() { let _ = std::mem::replace(&mut body_stream, Err([0; 32])).unwrap().poll_next(&mut context); } +*/ \ No newline at end of file From 835a00599c1bb68561f9495b9ae38d2a3d119f08 Mon Sep 17 00:00:00 2001 From: Maksym Vorobiov Date: Fri, 31 Jan 2020 12:39:53 +0200 Subject: [PATCH 06/21] rollback missed dependencies and CHANGES in crates except actix-http --- CHANGES.md | 4 +--- Cargo.toml | 2 +- actix-identity/Cargo.toml | 4 ++-- actix-session/Cargo.toml | 2 +- awc/CHANGES.md | 2 -- src/test.rs | 4 ++++ test-server/CHANGES.md | 3 --- test-server/Cargo.toml | 4 ++-- tests/test_weird_poll.rs | 2 +- 9 files changed, 12 insertions(+), 15 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index ce8ffd619..ab9caa7bd 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,7 +1,7 @@ # Changes -## [2.1.NEXT] - 2020-01-xx +## [2.0.NEXT] - 2020-01-xx ### Added @@ -15,8 +15,6 @@ * Update the `time` dependency to 0.2.7 -* Accomodate breaking change in actix-http: trait actix_http::MessageBody requires Unpin and accepting Pin<&mut Self> instead of &mut self in the poll_next() - ## [2.0.0] - 2019-12-25 ### Changed diff --git a/Cargo.toml b/Cargo.toml index b9b647568..5ba358e2c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-web" -version = "3.0.0" +version = "2.0.0" authors = ["Nikolay Kim "] description = "Actix web is a simple, pragmatic and extremely fast web framework for Rust." readme = "README.md" diff --git a/actix-identity/Cargo.toml b/actix-identity/Cargo.toml index 47c792657..910aef48e 100644 --- a/actix-identity/Cargo.toml +++ b/actix-identity/Cargo.toml @@ -16,7 +16,7 @@ name = "actix_identity" path = "src/lib.rs" [dependencies] -actix-web = { version = "3.0.0", path = "..", default-features = false, features = ["secure-cookies"] } +actix-web = { version = "2.0.0", default-features = false, features = ["secure-cookies"] } actix-service = "1.0.5" futures = "0.3.1" serde = "1.0" @@ -25,5 +25,5 @@ time = { version = "0.2.5", default-features = false, features = ["std"] } [dev-dependencies] actix-rt = "1.0.0" -actix-http = { version = "2.0.0", path = "../actix-http" } +actix-http = { version = "1.0.1" } bytes = "0.5.4" diff --git a/actix-session/Cargo.toml b/actix-session/Cargo.toml index d886a5d8e..b0a89ee29 100644 --- a/actix-session/Cargo.toml +++ b/actix-session/Cargo.toml @@ -22,7 +22,7 @@ default = ["cookie-session"] cookie-session = ["actix-web/secure-cookies"] [dependencies] -actix-web = { version = "3.0.0", path = ".." } +actix-web = { version = "2.0.0" } actix-service = "1.0.5" bytes = "0.5.4" derive_more = "0.99.2" diff --git a/awc/CHANGES.md b/awc/CHANGES.md index 917549d09..d410ae514 100644 --- a/awc/CHANGES.md +++ b/awc/CHANGES.md @@ -4,8 +4,6 @@ * Fix compilation with default features off -* Accomodate breaking change: trait actix_http::MessageBody requires Unpin and accepting Pin<&mut Self> instead of &mut self in the poll_next() - ## [1.0.0] - 2019-12-13 * Release diff --git a/src/test.rs b/src/test.rs index 957e66638..6a6ef27c5 100644 --- a/src/test.rs +++ b/src/test.rs @@ -1163,6 +1163,10 @@ mod tests { } /* + + Comment out until actix decoupled of actix-http: + https://github.com/actix/actix/issues/321 + use futures::FutureExt; #[actix_rt::test] diff --git a/test-server/CHANGES.md b/test-server/CHANGES.md index d262d219c..96c010355 100644 --- a/test-server/CHANGES.md +++ b/test-server/CHANGES.md @@ -4,9 +4,6 @@ * Update the `time` dependency to 0.2.7 -* Breaking change: trait actix_http::MessageBody requires Unpin and accepting Pin<&mut Self> instead of &mut self in the poll_next() - - ## [1.0.0] - 2019-12-13 ### Changed diff --git a/test-server/Cargo.toml b/test-server/Cargo.toml index 51c25f8d1..f4ec1e238 100644 --- a/test-server/Cargo.toml +++ b/test-server/Cargo.toml @@ -55,5 +55,5 @@ time = { version = "0.2.7", default-features = false, features = ["std"] } open-ssl = { version="0.10", package="openssl", optional = true } [dev-dependencies] -actix-web = { version = "3.0.0", path = ".." } -actix-http = { version = "2.0.0", path = "../actix-http" } +actix-web = { version = "2.0.0" } +actix-http = { version = "1.0.1" } diff --git a/tests/test_weird_poll.rs b/tests/test_weird_poll.rs index 768dda1c8..571b69f45 100644 --- a/tests/test_weird_poll.rs +++ b/tests/test_weird_poll.rs @@ -27,4 +27,4 @@ fn weird_poll() { let _ = std::mem::replace(&mut body_stream, Err([0; 32])).unwrap().poll_next(&mut context); } -*/ \ No newline at end of file +*/ From eeebc653fdb5f0f2b9673923c8a79a312c78ffd1 Mon Sep 17 00:00:00 2001 From: Maksym Vorobiov Date: Fri, 31 Jan 2020 13:29:51 +0200 Subject: [PATCH 07/21] change actix-http version to alpha --- actix-http/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/actix-http/Cargo.toml b/actix-http/Cargo.toml index 10e9611fd..dfb467d40 100644 --- a/actix-http/Cargo.toml +++ b/actix-http/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-http" -version = "2.0.0" +version = "2.0.0-alpha" authors = ["Nikolay Kim "] description = "Actix http primitives" readme = "README.md" From 2e2ea7ab800e7325bd9a264b6cad177e5390df87 Mon Sep 17 00:00:00 2001 From: Maksym Vorobiov Date: Fri, 31 Jan 2020 22:16:31 +0200 Subject: [PATCH 08/21] remove extra whitespaces and Unpins --- Cargo.toml | 2 +- actix-http/src/h1/dispatcher.rs | 6 +++--- actix-http/src/h1/service.rs | 8 ++++---- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5ba358e2c..550df49dc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -107,7 +107,7 @@ opt-level = 3 codegen-units = 1 [patch.crates-io] -actix-web = { path = "." } +actix-web = { path = "." } actix-http = { path = "actix-http" } actix-http-test = { path = "test-server" } actix-web-codegen = { path = "actix-web-codegen" } diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index eb60893ff..0f897561d 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -170,7 +170,7 @@ where S: Service, S::Error: Into, S::Response: Into>, - B: MessageBody+Unpin, + B: MessageBody, X: Service, X::Error: Into, U: Service), Response = ()>, @@ -258,7 +258,7 @@ where S: Service, S::Error: Into, S::Response: Into>, - B: MessageBody+Unpin, + B: MessageBody, X: Service, X::Error: Into, U: Service), Response = ()>, @@ -688,7 +688,7 @@ where S: Service, S::Error: Into, S::Response: Into>, - B: MessageBody+Unpin, + B: MessageBody, X: Service, X::Error: Into, U: Service), Response = ()>, diff --git a/actix-http/src/h1/service.rs b/actix-http/src/h1/service.rs index 84e1112e9..4d1a1dc1b 100644 --- a/actix-http/src/h1/service.rs +++ b/actix-http/src/h1/service.rs @@ -63,7 +63,7 @@ where S::Error: Into, S::InitError: fmt::Debug, S::Response: Into>, - B: MessageBody+Unpin, + B: MessageBody, X: ServiceFactory, X::Error: Into, X::InitError: fmt::Debug, @@ -106,7 +106,7 @@ mod openssl { S::Error: Into, S::InitError: fmt::Debug, S::Response: Into>, - B: MessageBody+Unpin, + B: MessageBody, X: ServiceFactory, X::Error: Into, X::InitError: fmt::Debug, @@ -250,7 +250,7 @@ where S::Error: Into, S::Response: Into>, S::InitError: fmt::Debug, - B: MessageBody+Unpin, + B: MessageBody, X: ServiceFactory, X::Error: Into, X::InitError: fmt::Debug, @@ -408,7 +408,7 @@ where S: Service, S::Error: Into, S::Response: Into>, - B: MessageBody+Unpin, + B: MessageBody, X: Service, X::Error: Into, U: Service), Response = ()>, From ec5c7797325e63d59eaa54597916d72e95343975 Mon Sep 17 00:00:00 2001 From: Maksym Vorobiov Date: Mon, 3 Feb 2020 22:55:49 +0200 Subject: [PATCH 09/21] unlink MessageBody from Unpin --- actix-http/src/body.rs | 10 +- actix-http/src/h1/dispatcher.rs | 308 ++++++++++++++++++-------------- actix-http/src/h1/utils.rs | 2 +- 3 files changed, 181 insertions(+), 139 deletions(-) diff --git a/actix-http/src/body.rs b/actix-http/src/body.rs index 74e6e218d..26134723d 100644 --- a/actix-http/src/body.rs +++ b/actix-http/src/body.rs @@ -33,7 +33,7 @@ impl BodySize { } /// Type that provides this trait can be streamed to a peer. -pub trait MessageBody: Unpin { +pub trait MessageBody { fn size(&self) -> BodySize; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>>; @@ -53,14 +53,13 @@ impl MessageBody for () { } } -impl MessageBody for Box { +impl MessageBody for Box { fn size(&self) -> BodySize { self.as_ref().size() } fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>> { - let a: Pin<&mut T> = Pin::new(self.get_mut().as_mut()); - a.poll_next(cx) + unsafe { self.map_unchecked_mut(|boxed| boxed.as_mut()) }.poll_next(cx) } } @@ -70,8 +69,7 @@ impl MessageBody for Box { } fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>> { - let a: Pin<&mut dyn MessageBody> = Pin::new(self.get_mut().as_mut()); - a.poll_next(cx) + unsafe { Pin::new_unchecked(self.get_mut().as_mut()) }.poll_next(cx) } } diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index 0f897561d..7429c50f7 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -10,6 +10,7 @@ use actix_service::Service; use bitflags::bitflags; use bytes::{Buf, BytesMut}; use log::{error, trace}; +use pin_project::pin_project; use crate::body::{Body, BodySize, MessageBody, ResponseBody}; use crate::cloneable::CloneableService; @@ -41,6 +42,7 @@ bitflags! { } } +#[pin_project::pin_project] /// Dispatcher for HTTP/1.1 protocol pub struct Dispatcher where @@ -52,9 +54,11 @@ where U: Service), Response = ()>, U::Error: fmt::Display, { + #[pin] inner: DispatcherState, } +#[pin_project] enum DispatcherState where S: Service, @@ -65,11 +69,12 @@ where U: Service), Response = ()>, U::Error: fmt::Display, { - Normal(InnerDispatcher), - Upgrade(Pin>), + Normal(#[pin] InnerDispatcher), + Upgrade(#[pin] U::Future), None, } +#[pin_project] struct InnerDispatcher where S: Service, @@ -88,6 +93,7 @@ where peer_addr: Option, error: Option, + #[pin] state: State, payload: Option, messages: VecDeque, @@ -107,6 +113,7 @@ enum DispatcherMessage { Error(Response<()>), } +#[pin_project] enum State where S: Service, @@ -114,9 +121,9 @@ where B: MessageBody, { None, - ExpectCall(Pin>), - ServiceCall(Pin>), - SendPayload(ResponseBody), + ExpectCall(#[pin] X::Future), + ServiceCall(#[pin] S::Future), + SendPayload(#[pin] ResponseBody), } impl State @@ -142,6 +149,21 @@ where } } +impl DispatcherState +where + S: Service, + S::Error: Into, + B: MessageBody, + X: Service, + X::Error: Into, + U: Service), Response = ()>, + U::Error: fmt::Display, +{ + fn take(self: Pin<&mut Self>) -> Self { + std::mem::replace(unsafe { self.get_unchecked_mut() }, Self::None) + } +} + enum PollResponse { Upgrade(Request), DoNothing, @@ -278,10 +300,11 @@ where } // if checked is set to true, delay disconnect until all tasks have finished. - fn client_disconnected(&mut self) { - self.flags + fn client_disconnected(self: Pin<&mut Self>) { + let this = self.project(); + this.flags .insert(Flags::READ_DISCONNECT | Flags::WRITE_DISCONNECT); - if let Some(mut payload) = self.payload.take() { + if let Some(mut payload) = this.payload.take() { payload.set_error(PayloadError::Incomplete(None)); } } @@ -290,16 +313,18 @@ where /// /// true - got whouldblock /// false - didnt get whouldblock - fn poll_flush(&mut self, cx: &mut Context<'_>) -> Result { + #[pin_project::project] + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Result { if self.write_buf.is_empty() { return Ok(false); } let len = self.write_buf.len(); let mut written = 0; + #[project] + let InnerDispatcher { mut io, write_buf, .. } = self.project(); while written < len { - match Pin::new(&mut self.io) - .poll_write(cx, &self.write_buf[written..]) + match Pin::new(&mut io).poll_write(cx, &write_buf[written..]) { Poll::Ready(Ok(0)) => { return Err(DispatchError::Io(io::Error::new( @@ -312,113 +337,120 @@ where } Poll::Pending => { if written > 0 { - self.write_buf.advance(written); + write_buf.advance(written); } return Ok(true); } Poll::Ready(Err(err)) => return Err(DispatchError::Io(err)), } } - if written == self.write_buf.len() { - unsafe { self.write_buf.set_len(0) } + if written == write_buf.len() { + unsafe { write_buf.set_len(0) } } else { - self.write_buf.advance(written); + write_buf.advance(written); } Ok(false) } fn send_response( - &mut self, + self: Pin<&mut Self>, message: Response<()>, body: ResponseBody, ) -> Result, DispatchError> { - self.codec - .encode(Message::Item((message, body.size())), &mut self.write_buf) + let mut this = self.project(); + this.codec + .encode(Message::Item((message, body.size())), &mut this.write_buf) .map_err(|err| { - if let Some(mut payload) = self.payload.take() { + if let Some(mut payload) = this.payload.take() { payload.set_error(PayloadError::Incomplete(None)); } DispatchError::Io(err) })?; - self.flags.set(Flags::KEEPALIVE, self.codec.keepalive()); + this.flags.set(Flags::KEEPALIVE, this.codec.keepalive()); match body.size() { BodySize::None | BodySize::Empty => Ok(State::None), _ => Ok(State::SendPayload(body)), } } - fn send_continue(&mut self) { - self.write_buf + fn send_continue(self: Pin<&mut Self>) { + self.project().write_buf .extend_from_slice(b"HTTP/1.1 100 Continue\r\n\r\n"); } + #[pin_project::project] fn poll_response( - &mut self, + mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Result { loop { - let state = match self.state { - State::None => match self.messages.pop_front() { + let mut this = self.as_mut().project(); + #[project] + let state = match this.state.project() { + State::None => match this.messages.pop_front() { Some(DispatcherMessage::Item(req)) => { - Some(self.handle_request(req, cx)?) + Some(self.as_mut().handle_request(req, cx)?) } Some(DispatcherMessage::Error(res)) => { - Some(self.send_response(res, ResponseBody::Other(Body::Empty))?) + Some(self.as_mut().send_response(res, ResponseBody::Other(Body::Empty))?) } Some(DispatcherMessage::Upgrade(req)) => { return Ok(PollResponse::Upgrade(req)); } None => None, }, - State::ExpectCall(ref mut fut) => { - match fut.as_mut().poll(cx) { + State::ExpectCall(fut) => { + match fut.poll(cx) { Poll::Ready(Ok(req)) => { - self.send_continue(); - self.state = State::ServiceCall(Box::pin(self.service.call(req))); + self.as_mut().send_continue(); + this = self.as_mut().project(); + this.state.set(State::ServiceCall(this.service.call(req))); continue; } Poll::Ready(Err(e)) => { let res: Response = e.into().into(); let (res, body) = res.replace_body(()); - Some(self.send_response(res, body.into_body())?) + Some(self.as_mut().send_response(res, body.into_body())?) } Poll::Pending => None, } } - State::ServiceCall(ref mut fut) => { - match fut.as_mut().poll(cx) { + State::ServiceCall(fut) => { + match fut.poll(cx) { Poll::Ready(Ok(res)) => { let (res, body) = res.into().replace_body(()); - self.state = self.send_response(res, body)?; + let state = self.as_mut().send_response(res, body)?; + this = self.as_mut().project(); + this.state.set(state); continue; } Poll::Ready(Err(e)) => { let res: Response = e.into().into(); let (res, body) = res.replace_body(()); - Some(self.send_response(res, body.into_body())?) + Some(self.as_mut().send_response(res, body.into_body())?) } Poll::Pending => None, } } - State::SendPayload(ref mut stream) => { - let mut stream = Pin::new(stream); + State::SendPayload(mut stream) => { loop { - if self.write_buf.len() < HW_BUFFER_SIZE { + if this.write_buf.len() < HW_BUFFER_SIZE { match stream.as_mut().poll_next(cx) { Poll::Ready(Some(Ok(item))) => { - self.codec.encode( + this.codec.encode( Message::Chunk(Some(item)), - &mut self.write_buf, + &mut this.write_buf, )?; continue; } Poll::Ready(None) => { - self.codec.encode( + this.codec.encode( Message::Chunk(None), - &mut self.write_buf, + &mut this.write_buf, )?; - self.state = State::None; + this = self.as_mut().project(); + this.state.set(State::None); } Poll::Ready(Some(Err(_))) => { return Err(DispatchError::Unknown) @@ -434,9 +466,11 @@ where } }; + this = self.as_mut().project(); + // set new state if let Some(state) = state { - self.state = state; + this.state.set(state); if !self.state.is_empty() { continue; } @@ -444,7 +478,7 @@ where // if read-backpressure is enabled and we consumed some data. // we may read more data and retry if self.state.is_call() { - if self.poll_request(cx)? { + if self.as_mut().poll_request(cx)? { continue; } } else if !self.messages.is_empty() { @@ -458,16 +492,16 @@ where } fn handle_request( - &mut self, + mut self: Pin<&mut Self>, req: Request, cx: &mut Context<'_>, ) -> Result, DispatchError> { // Handle `EXPECT: 100-Continue` header let req = if req.head().expect() { - let mut task = Box::pin(self.expect.call(req)); - match task.as_mut().poll(cx) { + let mut task = self.as_mut().project().expect.call(req); + match unsafe { Pin::new_unchecked(&mut task) }.poll(cx) { Poll::Ready(Ok(req)) => { - self.send_continue(); + self.as_mut().send_continue(); req } Poll::Pending => return Ok(State::ExpectCall(task)), @@ -483,8 +517,8 @@ where }; // Call service - let mut task = Box::pin(self.service.call(req)); - match task.as_mut().poll(cx) { + let mut task = self.as_mut().project().service.call(req); + match unsafe { Pin::new_unchecked(&mut task) }.poll(cx) { Poll::Ready(Ok(res)) => { let (res, body) = res.into().replace_body(()); self.send_response(res, body) @@ -500,7 +534,7 @@ where /// Process one incoming requests pub(self) fn poll_request( - &mut self, + mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Result { // limit a mount of non processed requests @@ -509,24 +543,25 @@ where } let mut updated = false; + let mut this = self.as_mut().project(); loop { - match self.codec.decode(&mut self.read_buf) { + match this.codec.decode(&mut this.read_buf) { Ok(Some(msg)) => { updated = true; - self.flags.insert(Flags::STARTED); + this.flags.insert(Flags::STARTED); match msg { Message::Item(mut req) => { - let pl = self.codec.message_type(); - req.head_mut().peer_addr = self.peer_addr; + let pl = this.codec.message_type(); + req.head_mut().peer_addr = *this.peer_addr; // set on_connect data - if let Some(ref on_connect) = self.on_connect { + if let Some(ref on_connect) = this.on_connect { on_connect.set(&mut req.extensions_mut()); } - if pl == MessageType::Stream && self.upgrade.is_some() { - self.messages.push_back(DispatcherMessage::Upgrade(req)); + if pl == MessageType::Stream && this.upgrade.is_some() { + this.messages.push_back(DispatcherMessage::Upgrade(req)); break; } if pl == MessageType::Payload || pl == MessageType::Stream { @@ -534,41 +569,43 @@ where let (req1, _) = req.replace_payload(crate::Payload::H1(pl)); req = req1; - self.payload = Some(ps); + *this.payload = Some(ps); } // handle request early - if self.state.is_empty() { - self.state = self.handle_request(req, cx)?; + if this.state.is_empty() { + let state = self.as_mut().handle_request(req, cx)?; + this = self.as_mut().project(); + this.state.set(state); } else { - self.messages.push_back(DispatcherMessage::Item(req)); + this.messages.push_back(DispatcherMessage::Item(req)); } } Message::Chunk(Some(chunk)) => { - if let Some(ref mut payload) = self.payload { + if let Some(ref mut payload) = this.payload { payload.feed_data(chunk); } else { error!( "Internal server error: unexpected payload chunk" ); - self.flags.insert(Flags::READ_DISCONNECT); - self.messages.push_back(DispatcherMessage::Error( + this.flags.insert(Flags::READ_DISCONNECT); + this.messages.push_back(DispatcherMessage::Error( Response::InternalServerError().finish().drop_body(), )); - self.error = Some(DispatchError::InternalError); + *this.error = Some(DispatchError::InternalError); break; } } Message::Chunk(None) => { - if let Some(mut payload) = self.payload.take() { + if let Some(mut payload) = this.payload.take() { payload.feed_eof(); } else { error!("Internal server error: unexpected eof"); - self.flags.insert(Flags::READ_DISCONNECT); - self.messages.push_back(DispatcherMessage::Error( + this.flags.insert(Flags::READ_DISCONNECT); + this.messages.push_back(DispatcherMessage::Error( Response::InternalServerError().finish().drop_body(), )); - self.error = Some(DispatchError::InternalError); + *this.error = Some(DispatchError::InternalError); break; } } @@ -576,44 +613,46 @@ where } Ok(None) => break, Err(ParseError::Io(e)) => { - self.client_disconnected(); - self.error = Some(DispatchError::Io(e)); + self.as_mut().client_disconnected(); + this = self.as_mut().project(); + *this.error = Some(DispatchError::Io(e)); break; } Err(e) => { - if let Some(mut payload) = self.payload.take() { + if let Some(mut payload) = this.payload.take() { payload.set_error(PayloadError::EncodingCorrupted); } // Malformed requests should be responded with 400 - self.messages.push_back(DispatcherMessage::Error( + this.messages.push_back(DispatcherMessage::Error( Response::BadRequest().finish().drop_body(), )); - self.flags.insert(Flags::READ_DISCONNECT); - self.error = Some(e.into()); + this.flags.insert(Flags::READ_DISCONNECT); + *this.error = Some(e.into()); break; } } } - if updated && self.ka_timer.is_some() { - if let Some(expire) = self.codec.config().keep_alive_expire() { - self.ka_expire = expire; + if updated && this.ka_timer.is_some() { + if let Some(expire) = this.codec.config().keep_alive_expire() { + *this.ka_expire = expire; } } Ok(updated) } /// keep-alive timer - fn poll_keepalive(&mut self, cx: &mut Context<'_>) -> Result<(), DispatchError> { - if self.ka_timer.is_none() { + fn poll_keepalive(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Result<(), DispatchError> { + let mut this = self.as_mut().project(); + if this.ka_timer.is_none() { // shutdown timeout - if self.flags.contains(Flags::SHUTDOWN) { - if let Some(interval) = self.codec.config().client_disconnect_timer() { - self.ka_timer = Some(delay_until(interval)); + if this.flags.contains(Flags::SHUTDOWN) { + if let Some(interval) = this.codec.config().client_disconnect_timer() { + *this.ka_timer = Some(delay_until(interval)); } else { - self.flags.insert(Flags::READ_DISCONNECT); - if let Some(mut payload) = self.payload.take() { + this.flags.insert(Flags::READ_DISCONNECT); + if let Some(mut payload) = this.payload.take() { payload.set_error(PayloadError::Incomplete(None)); } return Ok(()); @@ -623,55 +662,56 @@ where } } - match Pin::new(&mut self.ka_timer.as_mut().unwrap()).poll(cx) { + match Pin::new(&mut this.ka_timer.as_mut().unwrap()).poll(cx) { Poll::Ready(()) => { // if we get timeout during shutdown, drop connection - if self.flags.contains(Flags::SHUTDOWN) { + if this.flags.contains(Flags::SHUTDOWN) { return Err(DispatchError::DisconnectTimeout); - } else if self.ka_timer.as_mut().unwrap().deadline() >= self.ka_expire { + } else if this.ka_timer.as_mut().unwrap().deadline() >= *this.ka_expire { // check for any outstanding tasks - if self.state.is_empty() && self.write_buf.is_empty() { - if self.flags.contains(Flags::STARTED) { + if this.state.is_empty() && this.write_buf.is_empty() { + if this.flags.contains(Flags::STARTED) { trace!("Keep-alive timeout, close connection"); - self.flags.insert(Flags::SHUTDOWN); + this.flags.insert(Flags::SHUTDOWN); // start shutdown timer if let Some(deadline) = - self.codec.config().client_disconnect_timer() + this.codec.config().client_disconnect_timer() { - if let Some(mut timer) = self.ka_timer.as_mut() { + if let Some(mut timer) = this.ka_timer.as_mut() { timer.reset(deadline); let _ = Pin::new(&mut timer).poll(cx); } } else { // no shutdown timeout, drop socket - self.flags.insert(Flags::WRITE_DISCONNECT); + this.flags.insert(Flags::WRITE_DISCONNECT); return Ok(()); } } else { // timeout on first request (slow request) return 408 - if !self.flags.contains(Flags::STARTED) { + if !this.flags.contains(Flags::STARTED) { trace!("Slow request timeout"); - let _ = self.send_response( + let _ = self.as_mut().send_response( Response::RequestTimeout().finish().drop_body(), ResponseBody::Other(Body::Empty), ); + this = self.as_mut().project(); } else { trace!("Keep-alive connection timeout"); } - self.flags.insert(Flags::STARTED | Flags::SHUTDOWN); - self.state = State::None; + this.flags.insert(Flags::STARTED | Flags::SHUTDOWN); + this.state.set(State::None); } } else if let Some(deadline) = - self.codec.config().keep_alive_expire() + this.codec.config().keep_alive_expire() { - if let Some(mut timer) = self.ka_timer.as_mut() { + if let Some(mut timer) = this.ka_timer.as_mut() { timer.reset(deadline); let _ = Pin::new(&mut timer).poll(cx); } } - } else if let Some(mut timer) = self.ka_timer.as_mut() { - timer.reset(self.ka_expire); + } else if let Some(mut timer) = this.ka_timer.as_mut() { + timer.reset(*this.ka_expire); let _ = Pin::new(&mut timer).poll(cx); } } @@ -696,22 +736,25 @@ where { type Output = Result<(), DispatchError>; + #[pin_project::project] #[inline] fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.as_mut().inner { - DispatcherState::Normal(ref mut inner) => { - inner.poll_keepalive(cx)?; + let this = self.as_mut().project(); + #[project] + match this.inner.project() { + DispatcherState::Normal(mut inner) => { + inner.as_mut().poll_keepalive(cx)?; if inner.flags.contains(Flags::SHUTDOWN) { if inner.flags.contains(Flags::WRITE_DISCONNECT) { Poll::Ready(Ok(())) } else { // flush buffer - inner.poll_flush(cx)?; + inner.as_mut().poll_flush(cx)?; if !inner.write_buf.is_empty() { Poll::Pending } else { - match Pin::new(&mut inner.io).poll_shutdown(cx) { + match Pin::new(inner.project().io).poll_shutdown(cx) { Poll::Ready(res) => { Poll::Ready(res.map_err(DispatchError::from)) } @@ -723,33 +766,34 @@ where // read socket into a buf let should_disconnect = if !inner.flags.contains(Flags::READ_DISCONNECT) { - read_available(cx, &mut inner.io, &mut inner.read_buf)? + let mut inner_p = inner.as_mut().project(); + read_available(cx, &mut inner_p.io, &mut inner_p.read_buf)? } else { None }; - inner.poll_request(cx)?; + inner.as_mut().poll_request(cx)?; if let Some(true) = should_disconnect { - inner.flags.insert(Flags::READ_DISCONNECT); - if let Some(mut payload) = inner.payload.take() { + let inner_p = inner.as_mut().project(); + inner_p.flags.insert(Flags::READ_DISCONNECT); + if let Some(mut payload) = inner_p.payload.take() { payload.feed_eof(); } }; loop { + let inner_p = inner.as_mut().project(); let remaining = - inner.write_buf.capacity() - inner.write_buf.len(); + inner_p.write_buf.capacity() - inner_p.write_buf.len(); if remaining < LW_BUFFER_SIZE { - inner.write_buf.reserve(HW_BUFFER_SIZE - remaining); + inner_p.write_buf.reserve(HW_BUFFER_SIZE - remaining); } - let result = inner.poll_response(cx)?; + let result = inner.as_mut().poll_response(cx)?; let drain = result == PollResponse::DrainWriteBuf; // switch to upgrade handler if let PollResponse::Upgrade(req) = result { - if let DispatcherState::Normal(inner) = - std::mem::replace(&mut self.inner, DispatcherState::None) - { + if let DispatcherState::Normal(inner) = self.as_mut().project().inner.take() { let mut parts = FramedParts::with_read_buf( inner.io, inner.codec, @@ -757,9 +801,8 @@ where ); parts.write_buf = inner.write_buf; let framed = Framed::from_parts(parts); - self.inner = DispatcherState::Upgrade( - Box::pin(inner.upgrade.unwrap().call((req, framed))), - ); + let upgrade = inner.upgrade.unwrap().call((req, framed)); + self.as_mut().project().inner.set(DispatcherState::Upgrade(upgrade)); return self.poll(cx); } else { panic!() @@ -769,7 +812,7 @@ where // we didnt get WouldBlock from write operation, // so data get written to kernel completely (OSX) // and we have to write again otherwise response can get stuck - if inner.poll_flush(cx)? || !drain { + if inner.as_mut().poll_flush(cx)? || !drain { break; } } @@ -781,25 +824,26 @@ where let is_empty = inner.state.is_empty(); + let inner_p = inner.as_mut().project(); // read half is closed and we do not processing any responses - if inner.flags.contains(Flags::READ_DISCONNECT) && is_empty { - inner.flags.insert(Flags::SHUTDOWN); + if inner_p.flags.contains(Flags::READ_DISCONNECT) && is_empty { + inner_p.flags.insert(Flags::SHUTDOWN); } // keep-alive and stream errors - if is_empty && inner.write_buf.is_empty() { - if let Some(err) = inner.error.take() { + if is_empty && inner_p.write_buf.is_empty() { + if let Some(err) = inner_p.error.take() { Poll::Ready(Err(err)) } // disconnect if keep-alive is not enabled - else if inner.flags.contains(Flags::STARTED) - && !inner.flags.intersects(Flags::KEEPALIVE) + else if inner_p.flags.contains(Flags::STARTED) + && !inner_p.flags.intersects(Flags::KEEPALIVE) { - inner.flags.insert(Flags::SHUTDOWN); + inner_p.flags.insert(Flags::SHUTDOWN); self.poll(cx) } // disconnect if shutdown - else if inner.flags.contains(Flags::SHUTDOWN) { + else if inner_p.flags.contains(Flags::SHUTDOWN) { self.poll(cx) } else { Poll::Pending diff --git a/actix-http/src/h1/utils.rs b/actix-http/src/h1/utils.rs index be6a42793..89013129a 100644 --- a/actix-http/src/h1/utils.rs +++ b/actix-http/src/h1/utils.rs @@ -36,7 +36,7 @@ where impl Future for SendResponse where T: AsyncRead + AsyncWrite, - B: MessageBody, + B: MessageBody + Unpin, { type Output = Result, Error>; From 69dab0063c27bf4ac5b684884d787e7772da1c50 Mon Sep 17 00:00:00 2001 From: Maksym Vorobiov Date: Mon, 10 Feb 2020 12:12:23 +0200 Subject: [PATCH 10/21] Get rid of one more unsafe --- actix-http/src/h1/dispatcher.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index 7429c50f7..6e226a30d 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -853,8 +853,8 @@ where } } } - DispatcherState::Upgrade(ref mut fut) => { - fut.as_mut().poll(cx).map_err(|e| { + DispatcherState::Upgrade(fut) => { + fut.poll(cx).map_err(|e| { error!("Upgrade handler error: {}", e); DispatchError::Upgrade }) From c05f9475c547bdc09049a6e939e2b0acac3c0a0c Mon Sep 17 00:00:00 2001 From: Maksym Vorobiov Date: Mon, 10 Feb 2020 13:17:38 +0200 Subject: [PATCH 11/21] refactor dispatcher to avoid possible UB with DispatcherState Pin --- actix-http/src/h1/dispatcher.rs | 62 +++++++++++---------------------- 1 file changed, 21 insertions(+), 41 deletions(-) diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index 6e226a30d..043271cb5 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -71,7 +71,6 @@ where { Normal(#[pin] InnerDispatcher), Upgrade(#[pin] U::Future), - None, } #[pin_project] @@ -101,7 +100,7 @@ where ka_expire: Instant, ka_timer: Option, - io: T, + io: Option, read_buf: BytesMut, write_buf: BytesMut, codec: Codec, @@ -148,22 +147,6 @@ where } } } - -impl DispatcherState -where - S: Service, - S::Error: Into, - B: MessageBody, - X: Service, - X::Error: Into, - U: Service), Response = ()>, - U::Error: fmt::Display, -{ - fn take(self: Pin<&mut Self>) -> Self { - std::mem::replace(unsafe { self.get_unchecked_mut() }, Self::None) - } -} - enum PollResponse { Upgrade(Request), DoNothing, @@ -258,7 +241,7 @@ where state: State::None, error: None, messages: VecDeque::new(), - io, + io: Some(io), codec, read_buf, service, @@ -322,9 +305,10 @@ where let len = self.write_buf.len(); let mut written = 0; #[project] - let InnerDispatcher { mut io, write_buf, .. } = self.project(); + let InnerDispatcher { io, write_buf, .. } = self.project(); + let mut io = Pin::new(io.as_mut().unwrap()); while written < len { - match Pin::new(&mut io).poll_write(cx, &write_buf[written..]) + match io.as_mut().poll_write(cx, &write_buf[written..]) { Poll::Ready(Ok(0)) => { return Err(DispatchError::Io(io::Error::new( @@ -751,10 +735,10 @@ where } else { // flush buffer inner.as_mut().poll_flush(cx)?; - if !inner.write_buf.is_empty() { + if !inner.write_buf.is_empty() || inner.io.is_none() { Poll::Pending } else { - match Pin::new(inner.project().io).poll_shutdown(cx) { + match Pin::new(inner.project().io).as_pin_mut().unwrap().poll_shutdown(cx) { Poll::Ready(res) => { Poll::Ready(res.map_err(DispatchError::from)) } @@ -767,7 +751,7 @@ where let should_disconnect = if !inner.flags.contains(Flags::READ_DISCONNECT) { let mut inner_p = inner.as_mut().project(); - read_available(cx, &mut inner_p.io, &mut inner_p.read_buf)? + read_available(cx, inner_p.io.as_mut().unwrap(), &mut inner_p.read_buf)? } else { None }; @@ -793,20 +777,17 @@ where // switch to upgrade handler if let PollResponse::Upgrade(req) = result { - if let DispatcherState::Normal(inner) = self.as_mut().project().inner.take() { - let mut parts = FramedParts::with_read_buf( - inner.io, - inner.codec, - inner.read_buf, - ); - parts.write_buf = inner.write_buf; - let framed = Framed::from_parts(parts); - let upgrade = inner.upgrade.unwrap().call((req, framed)); - self.as_mut().project().inner.set(DispatcherState::Upgrade(upgrade)); - return self.poll(cx); - } else { - panic!() - } + let inner_p = inner.as_mut().project(); + let mut parts = FramedParts::with_read_buf( + inner_p.io.take().unwrap(), + std::mem::take(inner_p.codec), + std::mem::take(inner_p.read_buf), + ); + parts.write_buf = std::mem::take(inner_p.write_buf); + let framed = Framed::from_parts(parts); + let upgrade = inner_p.upgrade.take().unwrap().call((req, framed)); + self.as_mut().project().inner.set(DispatcherState::Upgrade(upgrade)); + return self.poll(cx); } // we didnt get WouldBlock from write operation, @@ -859,7 +840,6 @@ where DispatchError::Upgrade }) } - DispatcherState::None => panic!(), } } } @@ -949,9 +929,9 @@ mod tests { Poll::Ready(res) => assert!(res.is_err()), } - if let DispatcherState::Normal(ref inner) = h1.inner { + if let DispatcherState::Normal(ref mut inner) = h1.inner { assert!(inner.flags.contains(Flags::READ_DISCONNECT)); - assert_eq!(&inner.io.write_buf[..26], b"HTTP/1.1 400 Bad Request\r\n"); + assert_eq!(&inner.io.take().unwrap().write_buf[..26], b"HTTP/1.1 400 Bad Request\r\n"); } }) .await; From a84b37199af18d198f4a28c559a932804c8c92e7 Mon Sep 17 00:00:00 2001 From: Maksym Vorobiov Date: Mon, 10 Feb 2020 15:06:11 +0200 Subject: [PATCH 12/21] Add Unpin to Body to get rid of unsafe in MessageBody --- actix-http/src/body.rs | 19 ++++--------------- 1 file changed, 4 insertions(+), 15 deletions(-) diff --git a/actix-http/src/body.rs b/actix-http/src/body.rs index 26134723d..912f22e33 100644 --- a/actix-http/src/body.rs +++ b/actix-http/src/body.rs @@ -59,21 +59,10 @@ impl MessageBody for Box { } fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>> { - unsafe { self.map_unchecked_mut(|boxed| boxed.as_mut()) }.poll_next(cx) + Pin::new(self.get_mut().as_mut()).poll_next(cx) } } -impl MessageBody for Box { - fn size(&self) -> BodySize { - self.as_ref().size() - } - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>> { - unsafe { Pin::new_unchecked(self.get_mut().as_mut()) }.poll_next(cx) - } -} - - #[pin_project] pub enum ResponseBody { Body(#[pin] B), @@ -149,7 +138,7 @@ pub enum Body { /// Specific response body. Bytes(Bytes), /// Generic message body. - Message(#[pin] Box), + Message(Box), } impl Body { @@ -159,7 +148,7 @@ impl Body { } /// Create body from generic message body. - pub fn from_message(body: B) -> Body { + pub fn from_message(body: B) -> Body { Body::Message(Box::new(body)) } } @@ -188,7 +177,7 @@ impl MessageBody for Body { Poll::Ready(Some(Ok(mem::replace(bin, Bytes::new())))) } } - Body::Message(body) => body.poll_next(cx), + Body::Message(ref mut body) => Pin::new(body.as_mut()).poll_next(cx), } } } From e6078bf79272e4c1bfbc0493431d1b356d883fac Mon Sep 17 00:00:00 2001 From: Maksym Vorobiov Date: Mon, 10 Feb 2020 15:19:56 +0200 Subject: [PATCH 13/21] Fix EncoderBody enum to align with Body::Message --- actix-http/src/encoding/encoder.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/actix-http/src/encoding/encoder.rs b/actix-http/src/encoding/encoder.rs index 6530609e1..8a075e8b7 100644 --- a/actix-http/src/encoding/encoder.rs +++ b/actix-http/src/encoding/encoder.rs @@ -83,7 +83,7 @@ impl Encoder { enum EncoderBody { Bytes(Bytes), Stream(#[pin] B), - BoxedStream(#[pin] Box), + BoxedStream(Box), } impl MessageBody for EncoderBody { @@ -107,7 +107,7 @@ impl MessageBody for EncoderBody { } } EncoderBody::Stream(b) => b.poll_next(cx), - EncoderBody::BoxedStream(b) => b.poll_next(cx), + EncoderBody::BoxedStream(ref mut b) => Pin::new(b.as_mut()).poll_next(cx), } } } From de815dd99cfd95e2759080a7d27535b70b6544b4 Mon Sep 17 00:00:00 2001 From: Maksym Vorobiov Date: Mon, 10 Feb 2020 16:19:48 +0200 Subject: [PATCH 14/21] Fixed condition for finishing transfer of response --- actix-http/src/h1/utils.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/actix-http/src/h1/utils.rs b/actix-http/src/h1/utils.rs index 89013129a..c44925c7a 100644 --- a/actix-http/src/h1/utils.rs +++ b/actix-http/src/h1/utils.rs @@ -86,7 +86,7 @@ where continue; } - if body_done { + if !body_done { if body_ready { continue; } else { From 78749a4b7e81fe2f37f4f84469d83c43264a0b08 Mon Sep 17 00:00:00 2001 From: Maksym Vorobiov Date: Sat, 15 Feb 2020 17:26:46 +0200 Subject: [PATCH 15/21] rollback actix-http version change --- Cargo.toml | 2 +- actix-http/Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 550df49dc..af400b7b4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -108,7 +108,7 @@ codegen-units = 1 [patch.crates-io] actix-web = { path = "." } -actix-http = { path = "actix-http" } +#actix-http = { path = "actix-http" } actix-http-test = { path = "test-server" } actix-web-codegen = { path = "actix-web-codegen" } actix-cors = { path = "actix-cors" } diff --git a/actix-http/Cargo.toml b/actix-http/Cargo.toml index dfb467d40..212129331 100644 --- a/actix-http/Cargo.toml +++ b/actix-http/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-http" -version = "2.0.0-alpha" +version = "1.0.1" authors = ["Nikolay Kim "] description = "Actix http primitives" readme = "README.md" From 0a86907dd228f8267940b67ad1b9883474f47625 Mon Sep 17 00:00:00 2001 From: Maksym Vorobiov Date: Sun, 16 Feb 2020 19:48:09 +0200 Subject: [PATCH 16/21] use mem::replace instead of mem::take rust 1.40+ --- actix-http/src/h1/dispatcher.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index 043271cb5..4bfcabab8 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -780,10 +780,10 @@ where let inner_p = inner.as_mut().project(); let mut parts = FramedParts::with_read_buf( inner_p.io.take().unwrap(), - std::mem::take(inner_p.codec), - std::mem::take(inner_p.read_buf), + std::mem::replace(inner_p.codec, Codec::default()), + std::mem::replace(inner_p.read_buf, BytesMut::default()), ); - parts.write_buf = std::mem::take(inner_p.write_buf); + parts.write_buf = std::mem::replace(inner_p.write_buf, BytesMut::default()); let framed = Framed::from_parts(parts); let upgrade = inner_p.upgrade.take().unwrap().call((req, framed)); self.as_mut().project().inner.set(DispatcherState::Upgrade(upgrade)); From e5f2feec45147321b05ece99a049aa4eb1b52025 Mon Sep 17 00:00:00 2001 From: Maksym Vorobiov Date: Tue, 18 Feb 2020 20:28:45 +0200 Subject: [PATCH 17/21] reenable actix-http from local path --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index af400b7b4..550df49dc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -108,7 +108,7 @@ codegen-units = 1 [patch.crates-io] actix-web = { path = "." } -#actix-http = { path = "actix-http" } +actix-http = { path = "actix-http" } actix-http-test = { path = "test-server" } actix-web-codegen = { path = "actix-web-codegen" } actix-cors = { path = "actix-cors" } From 77058ef779449ad53a2ed9dd5b62b8c718b2bed3 Mon Sep 17 00:00:00 2001 From: Maksym Vorobiov Date: Tue, 18 Feb 2020 20:48:37 +0200 Subject: [PATCH 18/21] adopt MessageBody Pin changes to actix-web root --- src/middleware/logger.rs | 17 ++++++++++++----- src/test.rs | 6 +++--- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/src/middleware/logger.rs b/src/middleware/logger.rs index d692132ce..e40fe648a 100644 --- a/src/middleware/logger.rs +++ b/src/middleware/logger.rs @@ -238,15 +238,20 @@ where } } +use pin_project::{pin_project, pinned_drop}; + +#[pin_project(PinnedDrop)] pub struct StreamLog { + #[pin] body: ResponseBody, format: Option, size: usize, time: OffsetDateTime, } -impl Drop for StreamLog { - fn drop(&mut self) { +#[pinned_drop] +impl PinnedDrop for StreamLog { + fn drop(self: Pin<&mut Self>) { if let Some(ref format) = self.format { let render = |fmt: &mut Formatter<'_>| { for unit in &format.0 { @@ -259,15 +264,17 @@ impl Drop for StreamLog { } } + impl MessageBody for StreamLog { fn size(&self) -> BodySize { self.body.size() } - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { - match self.body.poll_next(cx) { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>> { + let this = self.project(); + match this.body.poll_next(cx) { Poll::Ready(Some(Ok(chunk))) => { - self.size += chunk.len(); + *this.size += chunk.len(); Poll::Ready(Some(Ok(chunk))) } val => val, diff --git a/src/test.rs b/src/test.rs index 6a6ef27c5..0eb02ff7c 100644 --- a/src/test.rs +++ b/src/test.rs @@ -150,7 +150,7 @@ where pub async fn read_response(app: &mut S, req: Request) -> Bytes where S: Service, Error = Error>, - B: MessageBody, + B: MessageBody + Unpin, { let mut resp = app .call(req) @@ -193,7 +193,7 @@ where /// ``` pub async fn read_body(mut res: ServiceResponse) -> Bytes where - B: MessageBody, + B: MessageBody + Unpin, { let mut body = res.take_body(); let mut bytes = BytesMut::new(); @@ -251,7 +251,7 @@ where pub async fn read_response_json(app: &mut S, req: Request) -> T where S: Service, Error = Error>, - B: MessageBody, + B: MessageBody + Unpin, T: DeserializeOwned, { let body = read_response(app, req).await; From ea28219d0fd6b7d3b97c84947c2c80ecebe1383f Mon Sep 17 00:00:00 2001 From: Maksym Vorobiov Date: Tue, 18 Feb 2020 23:04:47 +0200 Subject: [PATCH 19/21] reenable actix-http test-ws --- actix-http/tests/test_ws.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/actix-http/tests/test_ws.rs b/actix-http/tests/test_ws.rs index 2f2a28e2f..7152fee48 100644 --- a/actix-http/tests/test_ws.rs +++ b/actix-http/tests/test_ws.rs @@ -81,9 +81,6 @@ async fn service(msg: ws::Frame) -> Result { Ok(msg) } -/* -Temporarily commented out due to dependency on actix-http-test - #[actix_rt::test] async fn test_simple() { let ws_service = WsService::new(); @@ -195,5 +192,3 @@ async fn test_simple() { assert!(ws_service.was_polled()); } - -*/ \ No newline at end of file From cd1765035cf5fec4f83bb0eb84ccdfc4d5b479a5 Mon Sep 17 00:00:00 2001 From: Yuki Okushi Date: Wed, 26 Feb 2020 09:41:15 +0900 Subject: [PATCH 20/21] Avoid re-definition --- actix-http/src/error.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/actix-http/src/error.rs b/actix-http/src/error.rs index 8a1c1b5dc..4b8f13cf0 100644 --- a/actix-http/src/error.rs +++ b/actix-http/src/error.rs @@ -59,12 +59,6 @@ impl Error { } } -/// A struct with a private constructor, for use with -/// `__private_get_type_id__`. Its single field is private, -/// ensuring that it can only be constructed from this module -#[doc(hidden)] -pub struct PrivateHelper(()); - /// Error that can be converted to `Response` pub trait ResponseError: fmt::Debug + fmt::Display { /// Response's status code From d3ccf46e9216a86cbac9777c3a88b8f7ab8a35e5 Mon Sep 17 00:00:00 2001 From: Yuki Okushi Date: Thu, 27 Feb 2020 09:53:27 +0900 Subject: [PATCH 21/21] Clean-up metadata --- Cargo.toml | 2 +- actix-identity/Cargo.toml | 2 +- awc/Cargo.toml | 2 +- test-server/Cargo.toml | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 550df49dc..73d068966 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -71,7 +71,7 @@ actix-threadpool = "0.3.1" actix-tls = "1.0.0" actix-web-codegen = "0.2.0" -actix-http = { version = "1.0.1" } +actix-http = "1.0.1" awc = { version = "1.0.1", default-features = false } bytes = "0.5.3" diff --git a/actix-identity/Cargo.toml b/actix-identity/Cargo.toml index 910aef48e..f97b66291 100644 --- a/actix-identity/Cargo.toml +++ b/actix-identity/Cargo.toml @@ -25,5 +25,5 @@ time = { version = "0.2.5", default-features = false, features = ["std"] } [dev-dependencies] actix-rt = "1.0.0" -actix-http = { version = "1.0.1" } +actix-http = "1.0.1" bytes = "0.5.4" diff --git a/awc/Cargo.toml b/awc/Cargo.toml index f7d5634e0..37392d28e 100644 --- a/awc/Cargo.toml +++ b/awc/Cargo.toml @@ -36,7 +36,7 @@ compress = ["actix-http/compress"] [dependencies] actix-codec = "0.2.0" actix-service = "1.0.1" -actix-http = { version = "1.0.1" } +actix-http = "1.0.1" actix-rt = "1.0.0" base64 = "0.11" diff --git a/test-server/Cargo.toml b/test-server/Cargo.toml index f4ec1e238..a458fb341 100644 --- a/test-server/Cargo.toml +++ b/test-server/Cargo.toml @@ -37,7 +37,7 @@ actix-utils = "1.0.3" actix-rt = "1.0.0" actix-server = "1.0.0" actix-testing = "1.0.0" -awc = { version = "1.0.1" } +awc = "1.0.1" base64 = "0.11" bytes = "0.5.3"