1
0
mirror of https://github.com/fafhrd91/actix-web synced 2024-11-24 00:21:08 +01:00

Merge pull request #1367 from actix/msg-body

Merge `MessageBody` improvements
This commit is contained in:
Yuki Okushi 2020-02-27 10:42:14 +09:00 committed by GitHub
commit 95c18dbdf3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 414 additions and 315 deletions

View File

@ -33,8 +33,8 @@ members = [
"actix-cors", "actix-cors",
"actix-files", "actix-files",
"actix-framed", "actix-framed",
"actix-session", # "actix-session",
"actix-identity", # "actix-identity",
"actix-multipart", "actix-multipart",
"actix-web-actors", "actix-web-actors",
"actix-web-codegen", "actix-web-codegen",

View File

@ -8,6 +8,10 @@
* Moved actors messages support from actix crate, enabled with feature `actors`. * Moved actors messages support from actix crate, enabled with feature `actors`.
* Breaking change: trait MessageBody requires Unpin and accepting Pin<&mut Self> instead of &mut self in the poll_next().
* MessageBody is not implemented for &'static [u8] anymore.
### Fixed ### Fixed
* Allow `SameSite=None` cookies to be sent in a response. * Allow `SameSite=None` cookies to be sent in a response.

View File

@ -43,10 +43,10 @@ secure-cookies = ["ring"]
actors = ["actix"] actors = ["actix"]
[dependencies] [dependencies]
actix-service = "1.0.1" actix-service = "1.0.5"
actix-codec = "0.2.0" actix-codec = "0.2.0"
actix-connect = "1.0.1" actix-connect = "1.0.2"
actix-utils = "1.0.3" actix-utils = "1.0.6"
actix-rt = "1.0.0" actix-rt = "1.0.0"
actix-threadpool = "0.3.1" actix-threadpool = "0.3.1"
actix-tls = { version = "1.0.0", optional = true } actix-tls = { version = "1.0.0", optional = true }
@ -93,8 +93,8 @@ flate2 = { version = "1.0.13", optional = true }
fail-ure = { version = "0.1.5", package="failure", optional = true } fail-ure = { version = "0.1.5", package="failure", optional = true }
[dev-dependencies] [dev-dependencies]
actix-server = "1.0.0" actix-server = "1.0.1"
actix-connect = { version = "1.0.0", features=["openssl"] } actix-connect = { version = "1.0.2", features=["openssl"] }
actix-http-test = { version = "1.0.0", features=["openssl"] } actix-http-test = { version = "1.0.0", features=["openssl"] }
actix-tls = { version = "1.0.0", features=["openssl"] } actix-tls = { version = "1.0.0", features=["openssl"] }
criterion = "0.3" criterion = "0.3"

View File

@ -36,7 +36,7 @@ impl BodySize {
pub trait MessageBody { pub trait MessageBody {
fn size(&self) -> BodySize; fn size(&self) -> BodySize;
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>>;
downcast_get_type_id!(); downcast_get_type_id!();
} }
@ -48,25 +48,25 @@ impl MessageBody for () {
BodySize::Empty BodySize::Empty
} }
fn poll_next(&mut self, _: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> { fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> {
Poll::Ready(None) Poll::Ready(None)
} }
} }
impl<T: MessageBody> MessageBody for Box<T> { impl<T: MessageBody + Unpin> MessageBody for Box<T> {
fn size(&self) -> BodySize { fn size(&self) -> BodySize {
self.as_ref().size() self.as_ref().size()
} }
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> {
self.as_mut().poll_next(cx) Pin::new(self.get_mut().as_mut()).poll_next(cx)
} }
} }
#[pin_project] #[pin_project]
pub enum ResponseBody<B> { pub enum ResponseBody<B> {
Body(B), Body(#[pin] B),
Other(Body), Other(#[pin] Body),
} }
impl ResponseBody<Body> { impl ResponseBody<Body> {
@ -102,10 +102,12 @@ impl<B: MessageBody> MessageBody for ResponseBody<B> {
} }
} }
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> { #[project]
match self { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> {
ResponseBody::Body(ref mut body) => body.poll_next(cx), #[project]
ResponseBody::Other(ref mut body) => body.poll_next(cx), match self.project() {
ResponseBody::Body(body) => body.poll_next(cx),
ResponseBody::Other(body) => body.poll_next(cx),
} }
} }
} }
@ -120,12 +122,13 @@ impl<B: MessageBody> Stream for ResponseBody<B> {
) -> Poll<Option<Self::Item>> { ) -> Poll<Option<Self::Item>> {
#[project] #[project]
match self.project() { match self.project() {
ResponseBody::Body(ref mut body) => body.poll_next(cx), ResponseBody::Body(body) => body.poll_next(cx),
ResponseBody::Other(ref mut body) => body.poll_next(cx), ResponseBody::Other(body) => body.poll_next(cx),
} }
} }
} }
#[pin_project]
/// Represents various types of http message body. /// Represents various types of http message body.
pub enum Body { pub enum Body {
/// Empty response. `Content-Length` header is not set. /// Empty response. `Content-Length` header is not set.
@ -135,7 +138,7 @@ pub enum Body {
/// Specific response body. /// Specific response body.
Bytes(Bytes), Bytes(Bytes),
/// Generic message body. /// Generic message body.
Message(Box<dyn MessageBody>), Message(Box<dyn MessageBody + Unpin>),
} }
impl Body { impl Body {
@ -145,7 +148,7 @@ impl Body {
} }
/// Create body from generic message body. /// Create body from generic message body.
pub fn from_message<B: MessageBody + 'static>(body: B) -> Body { pub fn from_message<B: MessageBody + Unpin + 'static>(body: B) -> Body {
Body::Message(Box::new(body)) Body::Message(Box::new(body))
} }
} }
@ -160,8 +163,10 @@ impl MessageBody for Body {
} }
} }
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> { #[project]
match self { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> {
#[project]
match self.project() {
Body::None => Poll::Ready(None), Body::None => Poll::Ready(None),
Body::Empty => Poll::Ready(None), Body::Empty => Poll::Ready(None),
Body::Bytes(ref mut bin) => { Body::Bytes(ref mut bin) => {
@ -172,7 +177,7 @@ impl MessageBody for Body {
Poll::Ready(Some(Ok(mem::replace(bin, Bytes::new())))) Poll::Ready(Some(Ok(mem::replace(bin, Bytes::new()))))
} }
} }
Body::Message(ref mut body) => body.poll_next(cx), Body::Message(ref mut body) => Pin::new(body.as_mut()).poll_next(cx),
} }
} }
} }
@ -258,7 +263,7 @@ impl From<serde_json::Value> for Body {
impl<S> From<SizedStream<S>> for Body impl<S> From<SizedStream<S>> for Body
where where
S: Stream<Item = Result<Bytes, Error>> + 'static, S: Stream<Item = Result<Bytes, Error>> + Unpin + 'static,
{ {
fn from(s: SizedStream<S>) -> Body { fn from(s: SizedStream<S>) -> Body {
Body::from_message(s) Body::from_message(s)
@ -267,7 +272,7 @@ where
impl<S, E> From<BodyStream<S, E>> for Body impl<S, E> From<BodyStream<S, E>> for Body
where where
S: Stream<Item = Result<Bytes, E>> + 'static, S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
E: Into<Error> + 'static, E: Into<Error> + 'static,
{ {
fn from(s: BodyStream<S, E>) -> Body { fn from(s: BodyStream<S, E>) -> Body {
@ -280,11 +285,11 @@ impl MessageBody for Bytes {
BodySize::Sized(self.len()) BodySize::Sized(self.len())
} }
fn poll_next(&mut self, _: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> { fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> {
if self.is_empty() { if self.is_empty() {
Poll::Ready(None) Poll::Ready(None)
} else { } else {
Poll::Ready(Some(Ok(mem::replace(self, Bytes::new())))) Poll::Ready(Some(Ok(mem::replace(self.get_mut(), Bytes::new()))))
} }
} }
} }
@ -294,11 +299,11 @@ impl MessageBody for BytesMut {
BodySize::Sized(self.len()) BodySize::Sized(self.len())
} }
fn poll_next(&mut self, _: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> { fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> {
if self.is_empty() { if self.is_empty() {
Poll::Ready(None) Poll::Ready(None)
} else { } else {
Poll::Ready(Some(Ok(mem::replace(self, BytesMut::new()).freeze()))) Poll::Ready(Some(Ok(mem::replace(self.get_mut(), BytesMut::new()).freeze())))
} }
} }
} }
@ -308,41 +313,27 @@ impl MessageBody for &'static str {
BodySize::Sized(self.len()) BodySize::Sized(self.len())
} }
fn poll_next(&mut self, _: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> { fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> {
if self.is_empty() { if self.is_empty() {
Poll::Ready(None) Poll::Ready(None)
} else { } else {
Poll::Ready(Some(Ok(Bytes::from_static( Poll::Ready(Some(Ok(Bytes::from_static(
mem::replace(self, "").as_ref(), mem::replace(self.get_mut(), "").as_ref(),
)))) ))))
} }
} }
} }
impl MessageBody for &'static [u8] {
fn size(&self) -> BodySize {
BodySize::Sized(self.len())
}
fn poll_next(&mut self, _: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> {
if self.is_empty() {
Poll::Ready(None)
} else {
Poll::Ready(Some(Ok(Bytes::from_static(mem::replace(self, b"")))))
}
}
}
impl MessageBody for Vec<u8> { impl MessageBody for Vec<u8> {
fn size(&self) -> BodySize { fn size(&self) -> BodySize {
BodySize::Sized(self.len()) BodySize::Sized(self.len())
} }
fn poll_next(&mut self, _: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> { fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> {
if self.is_empty() { if self.is_empty() {
Poll::Ready(None) Poll::Ready(None)
} else { } else {
Poll::Ready(Some(Ok(Bytes::from(mem::replace(self, Vec::new()))))) Poll::Ready(Some(Ok(Bytes::from(mem::replace(self.get_mut(), Vec::new())))))
} }
} }
} }
@ -352,12 +343,12 @@ impl MessageBody for String {
BodySize::Sized(self.len()) BodySize::Sized(self.len())
} }
fn poll_next(&mut self, _: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> { fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> {
if self.is_empty() { if self.is_empty() {
Poll::Ready(None) Poll::Ready(None)
} else { } else {
Poll::Ready(Some(Ok(Bytes::from( Poll::Ready(Some(Ok(Bytes::from(
mem::replace(self, String::new()).into_bytes(), mem::replace(self.get_mut(), String::new()).into_bytes(),
)))) ))))
} }
} }
@ -365,19 +356,21 @@ impl MessageBody for String {
/// Type represent streaming body. /// Type represent streaming body.
/// Response does not contain `content-length` header and appropriate transfer encoding is used. /// Response does not contain `content-length` header and appropriate transfer encoding is used.
pub struct BodyStream<S, E> { #[pin_project]
stream: Pin<Box<S>>, pub struct BodyStream<S: Unpin, E> {
#[pin]
stream: S,
_t: PhantomData<E>, _t: PhantomData<E>,
} }
impl<S, E> BodyStream<S, E> impl<S, E> BodyStream<S, E>
where where
S: Stream<Item = Result<Bytes, E>>, S: Stream<Item = Result<Bytes, E>> + Unpin,
E: Into<Error>, E: Into<Error>,
{ {
pub fn new(stream: S) -> Self { pub fn new(stream: S) -> Self {
BodyStream { BodyStream {
stream: Box::pin(stream), stream,
_t: PhantomData, _t: PhantomData,
} }
} }
@ -385,7 +378,7 @@ where
impl<S, E> MessageBody for BodyStream<S, E> impl<S, E> MessageBody for BodyStream<S, E>
where where
S: Stream<Item = Result<Bytes, E>>, S: Stream<Item = Result<Bytes, E>> + Unpin,
E: Into<Error>, E: Into<Error>,
{ {
fn size(&self) -> BodySize { fn size(&self) -> BodySize {
@ -397,10 +390,11 @@ where
/// Empty values are skipped to prevent [`BodyStream`]'s transmission being /// Empty values are skipped to prevent [`BodyStream`]'s transmission being
/// ended on a zero-length chunk, but rather proceed until the underlying /// ended on a zero-length chunk, but rather proceed until the underlying
/// [`Stream`] ends. /// [`Stream`] ends.
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> {
let mut stream = self.stream.as_mut(); let mut stream = self.project().stream;
loop { loop {
return Poll::Ready(match ready!(stream.as_mut().poll_next(cx)) { let stream = stream.as_mut();
return Poll::Ready(match ready!(stream.poll_next(cx)) {
Some(Ok(ref bytes)) if bytes.is_empty() => continue, Some(Ok(ref bytes)) if bytes.is_empty() => continue,
opt => opt.map(|res| res.map_err(Into::into)), opt => opt.map(|res| res.map_err(Into::into)),
}); });
@ -410,26 +404,25 @@ where
/// Type represent streaming body. This body implementation should be used /// Type represent streaming body. This body implementation should be used
/// if total size of stream is known. Data get sent as is without using transfer encoding. /// if total size of stream is known. Data get sent as is without using transfer encoding.
pub struct SizedStream<S> { #[pin_project]
pub struct SizedStream<S: Unpin> {
size: u64, size: u64,
stream: Pin<Box<S>>, #[pin]
stream: S,
} }
impl<S> SizedStream<S> impl<S> SizedStream<S>
where where
S: Stream<Item = Result<Bytes, Error>>, S: Stream<Item = Result<Bytes, Error>> + Unpin,
{ {
pub fn new(size: u64, stream: S) -> Self { pub fn new(size: u64, stream: S) -> Self {
SizedStream { SizedStream { size, stream }
size,
stream: Box::pin(stream),
}
} }
} }
impl<S> MessageBody for SizedStream<S> impl<S> MessageBody for SizedStream<S>
where where
S: Stream<Item = Result<Bytes, Error>>, S: Stream<Item = Result<Bytes, Error>> + Unpin,
{ {
fn size(&self) -> BodySize { fn size(&self) -> BodySize {
BodySize::Sized64(self.size) BodySize::Sized64(self.size)
@ -440,10 +433,11 @@ where
/// Empty values are skipped to prevent [`SizedStream`]'s transmission being /// Empty values are skipped to prevent [`SizedStream`]'s transmission being
/// ended on a zero-length chunk, but rather proceed until the underlying /// ended on a zero-length chunk, but rather proceed until the underlying
/// [`Stream`] ends. /// [`Stream`] ends.
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> {
let mut stream = self.stream.as_mut(); let mut stream: Pin<&mut S> = self.project().stream;
loop { loop {
return Poll::Ready(match ready!(stream.as_mut().poll_next(cx)) { let stream = stream.as_mut();
return Poll::Ready(match ready!(stream.poll_next(cx)) {
Some(Ok(ref bytes)) if bytes.is_empty() => continue, Some(Ok(ref bytes)) if bytes.is_empty() => continue,
val => val, val => val,
}); });
@ -456,6 +450,7 @@ mod tests {
use super::*; use super::*;
use futures::stream; use futures::stream;
use futures_util::future::poll_fn; use futures_util::future::poll_fn;
use futures_util::pin_mut;
impl Body { impl Body {
pub(crate) fn get_ref(&self) -> &[u8] { pub(crate) fn get_ref(&self) -> &[u8] {
@ -483,7 +478,7 @@ mod tests {
assert_eq!("test".size(), BodySize::Sized(4)); assert_eq!("test".size(), BodySize::Sized(4));
assert_eq!( assert_eq!(
poll_fn(|cx| "test".poll_next(cx)).await.unwrap().ok(), poll_fn(|cx| Pin::new(&mut "test").poll_next(cx)).await.unwrap().ok(),
Some(Bytes::from("test")) Some(Bytes::from("test"))
); );
} }
@ -497,10 +492,12 @@ mod tests {
BodySize::Sized(4) BodySize::Sized(4)
); );
assert_eq!(Body::from_slice(b"test".as_ref()).get_ref(), b"test"); assert_eq!(Body::from_slice(b"test".as_ref()).get_ref(), b"test");
let sb = Bytes::from(&b"test"[..]);
pin_mut!(sb);
assert_eq!((&b"test"[..]).size(), BodySize::Sized(4)); assert_eq!(sb.size(), BodySize::Sized(4));
assert_eq!( assert_eq!(
poll_fn(|cx| (&b"test"[..]).poll_next(cx)) poll_fn(|cx| sb.as_mut().poll_next(cx))
.await .await
.unwrap() .unwrap()
.ok(), .ok(),
@ -512,10 +509,12 @@ mod tests {
async fn test_vec() { async fn test_vec() {
assert_eq!(Body::from(Vec::from("test")).size(), BodySize::Sized(4)); assert_eq!(Body::from(Vec::from("test")).size(), BodySize::Sized(4));
assert_eq!(Body::from(Vec::from("test")).get_ref(), b"test"); assert_eq!(Body::from(Vec::from("test")).get_ref(), b"test");
let test_vec = Vec::from("test");
pin_mut!(test_vec);
assert_eq!(Vec::from("test").size(), BodySize::Sized(4)); assert_eq!(test_vec.size(), BodySize::Sized(4));
assert_eq!( assert_eq!(
poll_fn(|cx| Vec::from("test").poll_next(cx)) poll_fn(|cx| test_vec.as_mut().poll_next(cx))
.await .await
.unwrap() .unwrap()
.ok(), .ok(),
@ -525,41 +524,44 @@ mod tests {
#[actix_rt::test] #[actix_rt::test]
async fn test_bytes() { async fn test_bytes() {
let mut b = Bytes::from("test"); let b = Bytes::from("test");
assert_eq!(Body::from(b.clone()).size(), BodySize::Sized(4)); assert_eq!(Body::from(b.clone()).size(), BodySize::Sized(4));
assert_eq!(Body::from(b.clone()).get_ref(), b"test"); assert_eq!(Body::from(b.clone()).get_ref(), b"test");
pin_mut!(b);
assert_eq!(b.size(), BodySize::Sized(4)); assert_eq!(b.size(), BodySize::Sized(4));
assert_eq!( assert_eq!(
poll_fn(|cx| b.poll_next(cx)).await.unwrap().ok(), poll_fn(|cx| b.as_mut().poll_next(cx)).await.unwrap().ok(),
Some(Bytes::from("test")) Some(Bytes::from("test"))
); );
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_bytes_mut() { async fn test_bytes_mut() {
let mut b = BytesMut::from("test"); let b = BytesMut::from("test");
assert_eq!(Body::from(b.clone()).size(), BodySize::Sized(4)); assert_eq!(Body::from(b.clone()).size(), BodySize::Sized(4));
assert_eq!(Body::from(b.clone()).get_ref(), b"test"); assert_eq!(Body::from(b.clone()).get_ref(), b"test");
pin_mut!(b);
assert_eq!(b.size(), BodySize::Sized(4)); assert_eq!(b.size(), BodySize::Sized(4));
assert_eq!( assert_eq!(
poll_fn(|cx| b.poll_next(cx)).await.unwrap().ok(), poll_fn(|cx| b.as_mut().poll_next(cx)).await.unwrap().ok(),
Some(Bytes::from("test")) Some(Bytes::from("test"))
); );
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_string() { async fn test_string() {
let mut b = "test".to_owned(); let b = "test".to_owned();
assert_eq!(Body::from(b.clone()).size(), BodySize::Sized(4)); assert_eq!(Body::from(b.clone()).size(), BodySize::Sized(4));
assert_eq!(Body::from(b.clone()).get_ref(), b"test"); assert_eq!(Body::from(b.clone()).get_ref(), b"test");
assert_eq!(Body::from(&b).size(), BodySize::Sized(4)); assert_eq!(Body::from(&b).size(), BodySize::Sized(4));
assert_eq!(Body::from(&b).get_ref(), b"test"); assert_eq!(Body::from(&b).get_ref(), b"test");
pin_mut!(b);
assert_eq!(b.size(), BodySize::Sized(4)); assert_eq!(b.size(), BodySize::Sized(4));
assert_eq!( assert_eq!(
poll_fn(|cx| b.poll_next(cx)).await.unwrap().ok(), poll_fn(|cx| b.as_mut().poll_next(cx)).await.unwrap().ok(),
Some(Bytes::from("test")) Some(Bytes::from("test"))
); );
} }
@ -567,14 +569,15 @@ mod tests {
#[actix_rt::test] #[actix_rt::test]
async fn test_unit() { async fn test_unit() {
assert_eq!(().size(), BodySize::Empty); assert_eq!(().size(), BodySize::Empty);
assert!(poll_fn(|cx| ().poll_next(cx)).await.is_none()); assert!(poll_fn(|cx| Pin::new(&mut ()).poll_next(cx)).await.is_none());
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_box() { async fn test_box() {
let mut val = Box::new(()); let val = Box::new(());
pin_mut!(val);
assert_eq!(val.size(), BodySize::Empty); assert_eq!(val.size(), BodySize::Empty);
assert!(poll_fn(|cx| val.poll_next(cx)).await.is_none()); assert!(poll_fn(|cx| val.as_mut().poll_next(cx)).await.is_none());
} }
#[actix_rt::test] #[actix_rt::test]
@ -612,23 +615,48 @@ mod tests {
mod body_stream { mod body_stream {
use super::*; use super::*;
//use futures::task::noop_waker;
//use futures::stream::once;
#[actix_rt::test] #[actix_rt::test]
async fn skips_empty_chunks() { async fn skips_empty_chunks() {
let mut body = BodyStream::new(stream::iter( let body = BodyStream::new(stream::iter(
["1", "", "2"] ["1", "", "2"]
.iter() .iter()
.map(|&v| Ok(Bytes::from(v)) as Result<Bytes, ()>), .map(|&v| Ok(Bytes::from(v)) as Result<Bytes, ()>),
)); ));
pin_mut!(body);
assert_eq!( assert_eq!(
poll_fn(|cx| body.poll_next(cx)).await.unwrap().ok(), poll_fn(|cx| body.as_mut().poll_next(cx)).await.unwrap().ok(),
Some(Bytes::from("1")), Some(Bytes::from("1")),
); );
assert_eq!( assert_eq!(
poll_fn(|cx| body.poll_next(cx)).await.unwrap().ok(), poll_fn(|cx| body.as_mut().poll_next(cx)).await.unwrap().ok(),
Some(Bytes::from("2")), Some(Bytes::from("2")),
); );
} }
/* Now it does not compile as it should
#[actix_rt::test]
async fn move_pinned_pointer() {
let (sender, receiver) = futures::channel::oneshot::channel();
let mut body_stream = Ok(BodyStream::new(once(async {
let x = Box::new(0i32);
let y = &x;
receiver.await.unwrap();
let _z = **y;
Ok::<_, ()>(Bytes::new())
})));
let waker = noop_waker();
let mut context = Context::from_waker(&waker);
pin_mut!(body_stream);
let _ = body_stream.as_mut().unwrap().poll_next(&mut context);
sender.send(()).unwrap();
let _ = std::mem::replace(&mut body_stream, Err([0; 32])).unwrap().poll_next(&mut context);
}*/
} }
mod sized_stream { mod sized_stream {
@ -636,16 +664,17 @@ mod tests {
#[actix_rt::test] #[actix_rt::test]
async fn skips_empty_chunks() { async fn skips_empty_chunks() {
let mut body = SizedStream::new( let body = SizedStream::new(
2, 2,
stream::iter(["1", "", "2"].iter().map(|&v| Ok(Bytes::from(v)))), stream::iter(["1", "", "2"].iter().map(|&v| Ok(Bytes::from(v)))),
); );
pin_mut!(body);
assert_eq!( assert_eq!(
poll_fn(|cx| body.poll_next(cx)).await.unwrap().ok(), poll_fn(|cx| body.as_mut().poll_next(cx)).await.unwrap().ok(),
Some(Bytes::from("1")), Some(Bytes::from("1")),
); );
assert_eq!( assert_eq!(
poll_fn(|cx| body.poll_next(cx)).await.unwrap().ok(), poll_fn(|cx| body.as_mut().poll_next(cx)).await.unwrap().ok(),
Some(Bytes::from("2")), Some(Bytes::from("2")),
); );
} }

View File

@ -8,7 +8,7 @@ use bytes::buf::BufMutExt;
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use futures_core::Stream; use futures_core::Stream;
use futures_util::future::poll_fn; use futures_util::future::poll_fn;
use futures_util::{SinkExt, StreamExt}; use futures_util::{SinkExt, StreamExt, pin_mut};
use crate::error::PayloadError; use crate::error::PayloadError;
use crate::h1; use crate::h1;
@ -120,7 +120,7 @@ where
/// send request body to the peer /// send request body to the peer
pub(crate) async fn send_body<I, B>( pub(crate) async fn send_body<I, B>(
mut body: B, body: B,
framed: &mut Framed<I, h1::ClientCodec>, framed: &mut Framed<I, h1::ClientCodec>,
) -> Result<(), SendRequestError> ) -> Result<(), SendRequestError>
where where
@ -128,9 +128,10 @@ where
B: MessageBody, B: MessageBody,
{ {
let mut eof = false; let mut eof = false;
pin_mut!(body);
while !eof { while !eof {
while !eof && !framed.is_write_buf_full() { while !eof && !framed.is_write_buf_full() {
match poll_fn(|cx| body.poll_next(cx)).await { match poll_fn(|cx| body.as_mut().poll_next(cx)).await {
Some(result) => { Some(result) => {
framed.write(h1::Message::Chunk(Some(result?)))?; framed.write(h1::Message::Chunk(Some(result?)))?;
} }

View File

@ -4,6 +4,7 @@ use std::time;
use actix_codec::{AsyncRead, AsyncWrite}; use actix_codec::{AsyncRead, AsyncWrite};
use bytes::Bytes; use bytes::Bytes;
use futures_util::future::poll_fn; use futures_util::future::poll_fn;
use futures_util::pin_mut;
use h2::{client::SendRequest, SendStream}; use h2::{client::SendRequest, SendStream};
use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, TRANSFER_ENCODING}; use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, TRANSFER_ENCODING};
use http::{request::Request, Method, Version}; use http::{request::Request, Method, Version};
@ -123,13 +124,14 @@ where
} }
async fn send_body<B: MessageBody>( async fn send_body<B: MessageBody>(
mut body: B, body: B,
mut send: SendStream<Bytes>, mut send: SendStream<Bytes>,
) -> Result<(), SendRequestError> { ) -> Result<(), SendRequestError> {
let mut buf = None; let mut buf = None;
pin_mut!(body);
loop { loop {
if buf.is_none() { if buf.is_none() {
match poll_fn(|cx| body.poll_next(cx)).await { match poll_fn(|cx| body.as_mut().poll_next(cx)).await {
Some(Ok(b)) => { Some(Ok(b)) => {
send.reserve_capacity(b.len()); send.reserve_capacity(b.len());
buf = Some(b); buf = Some(b);

View File

@ -9,6 +9,7 @@ use brotli2::write::BrotliEncoder;
use bytes::Bytes; use bytes::Bytes;
use flate2::write::{GzEncoder, ZlibEncoder}; use flate2::write::{GzEncoder, ZlibEncoder};
use futures_core::ready; use futures_core::ready;
use pin_project::{pin_project, project};
use crate::body::{Body, BodySize, MessageBody, ResponseBody}; use crate::body::{Body, BodySize, MessageBody, ResponseBody};
use crate::http::header::{ContentEncoding, CONTENT_ENCODING}; use crate::http::header::{ContentEncoding, CONTENT_ENCODING};
@ -19,8 +20,10 @@ use super::Writer;
const INPLACE: usize = 1024; const INPLACE: usize = 1024;
#[pin_project]
pub struct Encoder<B> { pub struct Encoder<B> {
eof: bool, eof: bool,
#[pin]
body: EncoderBody<B>, body: EncoderBody<B>,
encoder: Option<ContentEncoder>, encoder: Option<ContentEncoder>,
fut: Option<CpuFuture<ContentEncoder, io::Error>>, fut: Option<CpuFuture<ContentEncoder, io::Error>>,
@ -76,67 +79,83 @@ impl<B: MessageBody> Encoder<B> {
} }
} }
#[pin_project]
enum EncoderBody<B> { enum EncoderBody<B> {
Bytes(Bytes), Bytes(Bytes),
Stream(B), Stream(#[pin] B),
BoxedStream(Box<dyn MessageBody>), BoxedStream(Box<dyn MessageBody + Unpin>),
} }
impl<B: MessageBody> MessageBody for EncoderBody<B> {
fn size(&self) -> BodySize {
match self {
EncoderBody::Bytes(ref b) => b.size(),
EncoderBody::Stream(ref b) => b.size(),
EncoderBody::BoxedStream(ref b) => b.size(),
}
}
#[project]
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> {
#[project]
match self.project() {
EncoderBody::Bytes(b) => {
if b.is_empty() {
Poll::Ready(None)
} else {
Poll::Ready(Some(Ok(std::mem::replace(b, Bytes::new()))))
}
}
EncoderBody::Stream(b) => b.poll_next(cx),
EncoderBody::BoxedStream(ref mut b) => Pin::new(b.as_mut()).poll_next(cx),
}
}
}
impl<B: MessageBody> MessageBody for Encoder<B> { impl<B: MessageBody> MessageBody for Encoder<B> {
fn size(&self) -> BodySize { fn size(&self) -> BodySize {
if self.encoder.is_none() { if self.encoder.is_none() {
match self.body { self.body.size()
EncoderBody::Bytes(ref b) => b.size(),
EncoderBody::Stream(ref b) => b.size(),
EncoderBody::BoxedStream(ref b) => b.size(),
}
} else { } else {
BodySize::Stream BodySize::Stream
} }
} }
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> {
let mut this = self.project();
loop { loop {
if self.eof { if *this.eof {
return Poll::Ready(None); return Poll::Ready(None);
} }
if let Some(ref mut fut) = self.fut { if let Some(ref mut fut) = this.fut {
let mut encoder = match ready!(Pin::new(fut).poll(cx)) { let mut encoder = match ready!(Pin::new(fut).poll(cx)) {
Ok(item) => item, Ok(item) => item,
Err(e) => return Poll::Ready(Some(Err(e.into()))), Err(e) => return Poll::Ready(Some(Err(e.into()))),
}; };
let chunk = encoder.take(); let chunk = encoder.take();
self.encoder = Some(encoder); *this.encoder = Some(encoder);
self.fut.take(); this.fut.take();
if !chunk.is_empty() { if !chunk.is_empty() {
return Poll::Ready(Some(Ok(chunk))); return Poll::Ready(Some(Ok(chunk)));
} }
} }
let result = match self.body { let result = this.body.as_mut().poll_next(cx);
EncoderBody::Bytes(ref mut b) => {
if b.is_empty() {
Poll::Ready(None)
} else {
Poll::Ready(Some(Ok(std::mem::replace(b, Bytes::new()))))
}
}
EncoderBody::Stream(ref mut b) => b.poll_next(cx),
EncoderBody::BoxedStream(ref mut b) => b.poll_next(cx),
};
match result { match result {
Poll::Ready(Some(Ok(chunk))) => { Poll::Ready(Some(Ok(chunk))) => {
if let Some(mut encoder) = self.encoder.take() { if let Some(mut encoder) = this.encoder.take() {
if chunk.len() < INPLACE { if chunk.len() < INPLACE {
encoder.write(&chunk)?; encoder.write(&chunk)?;
let chunk = encoder.take(); let chunk = encoder.take();
self.encoder = Some(encoder); *this.encoder = Some(encoder);
if !chunk.is_empty() { if !chunk.is_empty() {
return Poll::Ready(Some(Ok(chunk))); return Poll::Ready(Some(Ok(chunk)));
} }
} else { } else {
self.fut = Some(run(move || { *this.fut = Some(run(move || {
encoder.write(&chunk)?; encoder.write(&chunk)?;
Ok(encoder) Ok(encoder)
})); }));
@ -146,12 +165,12 @@ impl<B: MessageBody> MessageBody for Encoder<B> {
} }
} }
Poll::Ready(None) => { Poll::Ready(None) => {
if let Some(encoder) = self.encoder.take() { if let Some(encoder) = this.encoder.take() {
let chunk = encoder.finish()?; let chunk = encoder.finish()?;
if chunk.is_empty() { if chunk.is_empty() {
return Poll::Ready(None); return Poll::Ready(None);
} else { } else {
self.eof = true; *this.eof = true;
return Poll::Ready(Some(Ok(chunk))); return Poll::Ready(Some(Ok(chunk)));
} }
} else { } else {

View File

@ -10,6 +10,7 @@ use actix_service::Service;
use bitflags::bitflags; use bitflags::bitflags;
use bytes::{Buf, BytesMut}; use bytes::{Buf, BytesMut};
use log::{error, trace}; use log::{error, trace};
use pin_project::pin_project;
use crate::body::{Body, BodySize, MessageBody, ResponseBody}; use crate::body::{Body, BodySize, MessageBody, ResponseBody};
use crate::cloneable::CloneableService; use crate::cloneable::CloneableService;
@ -41,6 +42,7 @@ bitflags! {
} }
} }
#[pin_project::pin_project]
/// Dispatcher for HTTP/1.1 protocol /// Dispatcher for HTTP/1.1 protocol
pub struct Dispatcher<T, S, B, X, U> pub struct Dispatcher<T, S, B, X, U>
where where
@ -52,9 +54,11 @@ where
U: Service<Request = (Request, Framed<T, Codec>), Response = ()>, U: Service<Request = (Request, Framed<T, Codec>), Response = ()>,
U::Error: fmt::Display, U::Error: fmt::Display,
{ {
#[pin]
inner: DispatcherState<T, S, B, X, U>, inner: DispatcherState<T, S, B, X, U>,
} }
#[pin_project]
enum DispatcherState<T, S, B, X, U> enum DispatcherState<T, S, B, X, U>
where where
S: Service<Request = Request>, S: Service<Request = Request>,
@ -65,11 +69,11 @@ where
U: Service<Request = (Request, Framed<T, Codec>), Response = ()>, U: Service<Request = (Request, Framed<T, Codec>), Response = ()>,
U::Error: fmt::Display, U::Error: fmt::Display,
{ {
Normal(InnerDispatcher<T, S, B, X, U>), Normal(#[pin] InnerDispatcher<T, S, B, X, U>),
Upgrade(Pin<Box<U::Future>>), Upgrade(#[pin] U::Future),
None,
} }
#[pin_project]
struct InnerDispatcher<T, S, B, X, U> struct InnerDispatcher<T, S, B, X, U>
where where
S: Service<Request = Request>, S: Service<Request = Request>,
@ -88,6 +92,7 @@ where
peer_addr: Option<net::SocketAddr>, peer_addr: Option<net::SocketAddr>,
error: Option<DispatchError>, error: Option<DispatchError>,
#[pin]
state: State<S, B, X>, state: State<S, B, X>,
payload: Option<PayloadSender>, payload: Option<PayloadSender>,
messages: VecDeque<DispatcherMessage>, messages: VecDeque<DispatcherMessage>,
@ -95,7 +100,7 @@ where
ka_expire: Instant, ka_expire: Instant,
ka_timer: Option<Delay>, ka_timer: Option<Delay>,
io: T, io: Option<T>,
read_buf: BytesMut, read_buf: BytesMut,
write_buf: BytesMut, write_buf: BytesMut,
codec: Codec, codec: Codec,
@ -107,6 +112,7 @@ enum DispatcherMessage {
Error(Response<()>), Error(Response<()>),
} }
#[pin_project]
enum State<S, B, X> enum State<S, B, X>
where where
S: Service<Request = Request>, S: Service<Request = Request>,
@ -114,9 +120,9 @@ where
B: MessageBody, B: MessageBody,
{ {
None, None,
ExpectCall(Pin<Box<X::Future>>), ExpectCall(#[pin] X::Future),
ServiceCall(Pin<Box<S::Future>>), ServiceCall(#[pin] S::Future),
SendPayload(ResponseBody<B>), SendPayload(#[pin] ResponseBody<B>),
} }
impl<S, B, X> State<S, B, X> impl<S, B, X> State<S, B, X>
@ -141,7 +147,6 @@ where
} }
} }
} }
enum PollResponse { enum PollResponse {
Upgrade(Request), Upgrade(Request),
DoNothing, DoNothing,
@ -236,7 +241,7 @@ where
state: State::None, state: State::None,
error: None, error: None,
messages: VecDeque::new(), messages: VecDeque::new(),
io, io: Some(io),
codec, codec,
read_buf, read_buf,
service, service,
@ -278,10 +283,11 @@ where
} }
// if checked is set to true, delay disconnect until all tasks have finished. // if checked is set to true, delay disconnect until all tasks have finished.
fn client_disconnected(&mut self) { fn client_disconnected(self: Pin<&mut Self>) {
self.flags let this = self.project();
this.flags
.insert(Flags::READ_DISCONNECT | Flags::WRITE_DISCONNECT); .insert(Flags::READ_DISCONNECT | Flags::WRITE_DISCONNECT);
if let Some(mut payload) = self.payload.take() { if let Some(mut payload) = this.payload.take() {
payload.set_error(PayloadError::Incomplete(None)); payload.set_error(PayloadError::Incomplete(None));
} }
} }
@ -290,16 +296,19 @@ where
/// ///
/// true - got whouldblock /// true - got whouldblock
/// false - didnt get whouldblock /// false - didnt get whouldblock
fn poll_flush(&mut self, cx: &mut Context<'_>) -> Result<bool, DispatchError> { #[pin_project::project]
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Result<bool, DispatchError> {
if self.write_buf.is_empty() { if self.write_buf.is_empty() {
return Ok(false); return Ok(false);
} }
let len = self.write_buf.len(); let len = self.write_buf.len();
let mut written = 0; let mut written = 0;
#[project]
let InnerDispatcher { io, write_buf, .. } = self.project();
let mut io = Pin::new(io.as_mut().unwrap());
while written < len { while written < len {
match Pin::new(&mut self.io) match io.as_mut().poll_write(cx, &write_buf[written..])
.poll_write(cx, &self.write_buf[written..])
{ {
Poll::Ready(Ok(0)) => { Poll::Ready(Ok(0)) => {
return Err(DispatchError::Io(io::Error::new( return Err(DispatchError::Io(io::Error::new(
@ -312,112 +321,120 @@ where
} }
Poll::Pending => { Poll::Pending => {
if written > 0 { if written > 0 {
self.write_buf.advance(written); write_buf.advance(written);
} }
return Ok(true); return Ok(true);
} }
Poll::Ready(Err(err)) => return Err(DispatchError::Io(err)), Poll::Ready(Err(err)) => return Err(DispatchError::Io(err)),
} }
} }
if written == self.write_buf.len() { if written == write_buf.len() {
unsafe { self.write_buf.set_len(0) } unsafe { write_buf.set_len(0) }
} else { } else {
self.write_buf.advance(written); write_buf.advance(written);
} }
Ok(false) Ok(false)
} }
fn send_response( fn send_response(
&mut self, self: Pin<&mut Self>,
message: Response<()>, message: Response<()>,
body: ResponseBody<B>, body: ResponseBody<B>,
) -> Result<State<S, B, X>, DispatchError> { ) -> Result<State<S, B, X>, DispatchError> {
self.codec let mut this = self.project();
.encode(Message::Item((message, body.size())), &mut self.write_buf) this.codec
.encode(Message::Item((message, body.size())), &mut this.write_buf)
.map_err(|err| { .map_err(|err| {
if let Some(mut payload) = self.payload.take() { if let Some(mut payload) = this.payload.take() {
payload.set_error(PayloadError::Incomplete(None)); payload.set_error(PayloadError::Incomplete(None));
} }
DispatchError::Io(err) DispatchError::Io(err)
})?; })?;
self.flags.set(Flags::KEEPALIVE, self.codec.keepalive()); this.flags.set(Flags::KEEPALIVE, this.codec.keepalive());
match body.size() { match body.size() {
BodySize::None | BodySize::Empty => Ok(State::None), BodySize::None | BodySize::Empty => Ok(State::None),
_ => Ok(State::SendPayload(body)), _ => Ok(State::SendPayload(body)),
} }
} }
fn send_continue(&mut self) { fn send_continue(self: Pin<&mut Self>) {
self.write_buf self.project().write_buf
.extend_from_slice(b"HTTP/1.1 100 Continue\r\n\r\n"); .extend_from_slice(b"HTTP/1.1 100 Continue\r\n\r\n");
} }
#[pin_project::project]
fn poll_response( fn poll_response(
&mut self, mut self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Result<PollResponse, DispatchError> { ) -> Result<PollResponse, DispatchError> {
loop { loop {
let state = match self.state { let mut this = self.as_mut().project();
State::None => match self.messages.pop_front() { #[project]
let state = match this.state.project() {
State::None => match this.messages.pop_front() {
Some(DispatcherMessage::Item(req)) => { Some(DispatcherMessage::Item(req)) => {
Some(self.handle_request(req, cx)?) Some(self.as_mut().handle_request(req, cx)?)
} }
Some(DispatcherMessage::Error(res)) => { Some(DispatcherMessage::Error(res)) => {
Some(self.send_response(res, ResponseBody::Other(Body::Empty))?) Some(self.as_mut().send_response(res, ResponseBody::Other(Body::Empty))?)
} }
Some(DispatcherMessage::Upgrade(req)) => { Some(DispatcherMessage::Upgrade(req)) => {
return Ok(PollResponse::Upgrade(req)); return Ok(PollResponse::Upgrade(req));
} }
None => None, None => None,
}, },
State::ExpectCall(ref mut fut) => { State::ExpectCall(fut) => {
match fut.as_mut().poll(cx) { match fut.poll(cx) {
Poll::Ready(Ok(req)) => { Poll::Ready(Ok(req)) => {
self.send_continue(); self.as_mut().send_continue();
self.state = State::ServiceCall(Box::pin(self.service.call(req))); this = self.as_mut().project();
this.state.set(State::ServiceCall(this.service.call(req)));
continue; continue;
} }
Poll::Ready(Err(e)) => { Poll::Ready(Err(e)) => {
let res: Response = e.into().into(); let res: Response = e.into().into();
let (res, body) = res.replace_body(()); let (res, body) = res.replace_body(());
Some(self.send_response(res, body.into_body())?) Some(self.as_mut().send_response(res, body.into_body())?)
} }
Poll::Pending => None, Poll::Pending => None,
} }
} }
State::ServiceCall(ref mut fut) => { State::ServiceCall(fut) => {
match fut.as_mut().poll(cx) { match fut.poll(cx) {
Poll::Ready(Ok(res)) => { Poll::Ready(Ok(res)) => {
let (res, body) = res.into().replace_body(()); let (res, body) = res.into().replace_body(());
self.state = self.send_response(res, body)?; let state = self.as_mut().send_response(res, body)?;
this = self.as_mut().project();
this.state.set(state);
continue; continue;
} }
Poll::Ready(Err(e)) => { Poll::Ready(Err(e)) => {
let res: Response = e.into().into(); let res: Response = e.into().into();
let (res, body) = res.replace_body(()); let (res, body) = res.replace_body(());
Some(self.send_response(res, body.into_body())?) Some(self.as_mut().send_response(res, body.into_body())?)
} }
Poll::Pending => None, Poll::Pending => None,
} }
} }
State::SendPayload(ref mut stream) => { State::SendPayload(mut stream) => {
loop { loop {
if self.write_buf.len() < HW_BUFFER_SIZE { if this.write_buf.len() < HW_BUFFER_SIZE {
match stream.poll_next(cx) { match stream.as_mut().poll_next(cx) {
Poll::Ready(Some(Ok(item))) => { Poll::Ready(Some(Ok(item))) => {
self.codec.encode( this.codec.encode(
Message::Chunk(Some(item)), Message::Chunk(Some(item)),
&mut self.write_buf, &mut this.write_buf,
)?; )?;
continue; continue;
} }
Poll::Ready(None) => { Poll::Ready(None) => {
self.codec.encode( this.codec.encode(
Message::Chunk(None), Message::Chunk(None),
&mut self.write_buf, &mut this.write_buf,
)?; )?;
self.state = State::None; this = self.as_mut().project();
this.state.set(State::None);
} }
Poll::Ready(Some(Err(_))) => { Poll::Ready(Some(Err(_))) => {
return Err(DispatchError::Unknown) return Err(DispatchError::Unknown)
@ -433,9 +450,11 @@ where
} }
}; };
this = self.as_mut().project();
// set new state // set new state
if let Some(state) = state { if let Some(state) = state {
self.state = state; this.state.set(state);
if !self.state.is_empty() { if !self.state.is_empty() {
continue; continue;
} }
@ -443,7 +462,7 @@ where
// if read-backpressure is enabled and we consumed some data. // if read-backpressure is enabled and we consumed some data.
// we may read more data and retry // we may read more data and retry
if self.state.is_call() { if self.state.is_call() {
if self.poll_request(cx)? { if self.as_mut().poll_request(cx)? {
continue; continue;
} }
} else if !self.messages.is_empty() { } else if !self.messages.is_empty() {
@ -457,16 +476,16 @@ where
} }
fn handle_request( fn handle_request(
&mut self, mut self: Pin<&mut Self>,
req: Request, req: Request,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Result<State<S, B, X>, DispatchError> { ) -> Result<State<S, B, X>, DispatchError> {
// Handle `EXPECT: 100-Continue` header // Handle `EXPECT: 100-Continue` header
let req = if req.head().expect() { let req = if req.head().expect() {
let mut task = Box::pin(self.expect.call(req)); let mut task = self.as_mut().project().expect.call(req);
match task.as_mut().poll(cx) { match unsafe { Pin::new_unchecked(&mut task) }.poll(cx) {
Poll::Ready(Ok(req)) => { Poll::Ready(Ok(req)) => {
self.send_continue(); self.as_mut().send_continue();
req req
} }
Poll::Pending => return Ok(State::ExpectCall(task)), Poll::Pending => return Ok(State::ExpectCall(task)),
@ -482,8 +501,8 @@ where
}; };
// Call service // Call service
let mut task = Box::pin(self.service.call(req)); let mut task = self.as_mut().project().service.call(req);
match task.as_mut().poll(cx) { match unsafe { Pin::new_unchecked(&mut task) }.poll(cx) {
Poll::Ready(Ok(res)) => { Poll::Ready(Ok(res)) => {
let (res, body) = res.into().replace_body(()); let (res, body) = res.into().replace_body(());
self.send_response(res, body) self.send_response(res, body)
@ -499,7 +518,7 @@ where
/// Process one incoming requests /// Process one incoming requests
pub(self) fn poll_request( pub(self) fn poll_request(
&mut self, mut self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Result<bool, DispatchError> { ) -> Result<bool, DispatchError> {
// limit a mount of non processed requests // limit a mount of non processed requests
@ -508,24 +527,25 @@ where
} }
let mut updated = false; let mut updated = false;
let mut this = self.as_mut().project();
loop { loop {
match self.codec.decode(&mut self.read_buf) { match this.codec.decode(&mut this.read_buf) {
Ok(Some(msg)) => { Ok(Some(msg)) => {
updated = true; updated = true;
self.flags.insert(Flags::STARTED); this.flags.insert(Flags::STARTED);
match msg { match msg {
Message::Item(mut req) => { Message::Item(mut req) => {
let pl = self.codec.message_type(); let pl = this.codec.message_type();
req.head_mut().peer_addr = self.peer_addr; req.head_mut().peer_addr = *this.peer_addr;
// set on_connect data // set on_connect data
if let Some(ref on_connect) = self.on_connect { if let Some(ref on_connect) = this.on_connect {
on_connect.set(&mut req.extensions_mut()); on_connect.set(&mut req.extensions_mut());
} }
if pl == MessageType::Stream && self.upgrade.is_some() { if pl == MessageType::Stream && this.upgrade.is_some() {
self.messages.push_back(DispatcherMessage::Upgrade(req)); this.messages.push_back(DispatcherMessage::Upgrade(req));
break; break;
} }
if pl == MessageType::Payload || pl == MessageType::Stream { if pl == MessageType::Payload || pl == MessageType::Stream {
@ -533,41 +553,43 @@ where
let (req1, _) = let (req1, _) =
req.replace_payload(crate::Payload::H1(pl)); req.replace_payload(crate::Payload::H1(pl));
req = req1; req = req1;
self.payload = Some(ps); *this.payload = Some(ps);
} }
// handle request early // handle request early
if self.state.is_empty() { if this.state.is_empty() {
self.state = self.handle_request(req, cx)?; let state = self.as_mut().handle_request(req, cx)?;
this = self.as_mut().project();
this.state.set(state);
} else { } else {
self.messages.push_back(DispatcherMessage::Item(req)); this.messages.push_back(DispatcherMessage::Item(req));
} }
} }
Message::Chunk(Some(chunk)) => { Message::Chunk(Some(chunk)) => {
if let Some(ref mut payload) = self.payload { if let Some(ref mut payload) = this.payload {
payload.feed_data(chunk); payload.feed_data(chunk);
} else { } else {
error!( error!(
"Internal server error: unexpected payload chunk" "Internal server error: unexpected payload chunk"
); );
self.flags.insert(Flags::READ_DISCONNECT); this.flags.insert(Flags::READ_DISCONNECT);
self.messages.push_back(DispatcherMessage::Error( this.messages.push_back(DispatcherMessage::Error(
Response::InternalServerError().finish().drop_body(), Response::InternalServerError().finish().drop_body(),
)); ));
self.error = Some(DispatchError::InternalError); *this.error = Some(DispatchError::InternalError);
break; break;
} }
} }
Message::Chunk(None) => { Message::Chunk(None) => {
if let Some(mut payload) = self.payload.take() { if let Some(mut payload) = this.payload.take() {
payload.feed_eof(); payload.feed_eof();
} else { } else {
error!("Internal server error: unexpected eof"); error!("Internal server error: unexpected eof");
self.flags.insert(Flags::READ_DISCONNECT); this.flags.insert(Flags::READ_DISCONNECT);
self.messages.push_back(DispatcherMessage::Error( this.messages.push_back(DispatcherMessage::Error(
Response::InternalServerError().finish().drop_body(), Response::InternalServerError().finish().drop_body(),
)); ));
self.error = Some(DispatchError::InternalError); *this.error = Some(DispatchError::InternalError);
break; break;
} }
} }
@ -575,44 +597,46 @@ where
} }
Ok(None) => break, Ok(None) => break,
Err(ParseError::Io(e)) => { Err(ParseError::Io(e)) => {
self.client_disconnected(); self.as_mut().client_disconnected();
self.error = Some(DispatchError::Io(e)); this = self.as_mut().project();
*this.error = Some(DispatchError::Io(e));
break; break;
} }
Err(e) => { Err(e) => {
if let Some(mut payload) = self.payload.take() { if let Some(mut payload) = this.payload.take() {
payload.set_error(PayloadError::EncodingCorrupted); payload.set_error(PayloadError::EncodingCorrupted);
} }
// Malformed requests should be responded with 400 // Malformed requests should be responded with 400
self.messages.push_back(DispatcherMessage::Error( this.messages.push_back(DispatcherMessage::Error(
Response::BadRequest().finish().drop_body(), Response::BadRequest().finish().drop_body(),
)); ));
self.flags.insert(Flags::READ_DISCONNECT); this.flags.insert(Flags::READ_DISCONNECT);
self.error = Some(e.into()); *this.error = Some(e.into());
break; break;
} }
} }
} }
if updated && self.ka_timer.is_some() { if updated && this.ka_timer.is_some() {
if let Some(expire) = self.codec.config().keep_alive_expire() { if let Some(expire) = this.codec.config().keep_alive_expire() {
self.ka_expire = expire; *this.ka_expire = expire;
} }
} }
Ok(updated) Ok(updated)
} }
/// keep-alive timer /// keep-alive timer
fn poll_keepalive(&mut self, cx: &mut Context<'_>) -> Result<(), DispatchError> { fn poll_keepalive(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Result<(), DispatchError> {
if self.ka_timer.is_none() { let mut this = self.as_mut().project();
if this.ka_timer.is_none() {
// shutdown timeout // shutdown timeout
if self.flags.contains(Flags::SHUTDOWN) { if this.flags.contains(Flags::SHUTDOWN) {
if let Some(interval) = self.codec.config().client_disconnect_timer() { if let Some(interval) = this.codec.config().client_disconnect_timer() {
self.ka_timer = Some(delay_until(interval)); *this.ka_timer = Some(delay_until(interval));
} else { } else {
self.flags.insert(Flags::READ_DISCONNECT); this.flags.insert(Flags::READ_DISCONNECT);
if let Some(mut payload) = self.payload.take() { if let Some(mut payload) = this.payload.take() {
payload.set_error(PayloadError::Incomplete(None)); payload.set_error(PayloadError::Incomplete(None));
} }
return Ok(()); return Ok(());
@ -622,55 +646,56 @@ where
} }
} }
match Pin::new(&mut self.ka_timer.as_mut().unwrap()).poll(cx) { match Pin::new(&mut this.ka_timer.as_mut().unwrap()).poll(cx) {
Poll::Ready(()) => { Poll::Ready(()) => {
// if we get timeout during shutdown, drop connection // if we get timeout during shutdown, drop connection
if self.flags.contains(Flags::SHUTDOWN) { if this.flags.contains(Flags::SHUTDOWN) {
return Err(DispatchError::DisconnectTimeout); return Err(DispatchError::DisconnectTimeout);
} else if self.ka_timer.as_mut().unwrap().deadline() >= self.ka_expire { } else if this.ka_timer.as_mut().unwrap().deadline() >= *this.ka_expire {
// check for any outstanding tasks // check for any outstanding tasks
if self.state.is_empty() && self.write_buf.is_empty() { if this.state.is_empty() && this.write_buf.is_empty() {
if self.flags.contains(Flags::STARTED) { if this.flags.contains(Flags::STARTED) {
trace!("Keep-alive timeout, close connection"); trace!("Keep-alive timeout, close connection");
self.flags.insert(Flags::SHUTDOWN); this.flags.insert(Flags::SHUTDOWN);
// start shutdown timer // start shutdown timer
if let Some(deadline) = if let Some(deadline) =
self.codec.config().client_disconnect_timer() this.codec.config().client_disconnect_timer()
{ {
if let Some(mut timer) = self.ka_timer.as_mut() { if let Some(mut timer) = this.ka_timer.as_mut() {
timer.reset(deadline); timer.reset(deadline);
let _ = Pin::new(&mut timer).poll(cx); let _ = Pin::new(&mut timer).poll(cx);
} }
} else { } else {
// no shutdown timeout, drop socket // no shutdown timeout, drop socket
self.flags.insert(Flags::WRITE_DISCONNECT); this.flags.insert(Flags::WRITE_DISCONNECT);
return Ok(()); return Ok(());
} }
} else { } else {
// timeout on first request (slow request) return 408 // timeout on first request (slow request) return 408
if !self.flags.contains(Flags::STARTED) { if !this.flags.contains(Flags::STARTED) {
trace!("Slow request timeout"); trace!("Slow request timeout");
let _ = self.send_response( let _ = self.as_mut().send_response(
Response::RequestTimeout().finish().drop_body(), Response::RequestTimeout().finish().drop_body(),
ResponseBody::Other(Body::Empty), ResponseBody::Other(Body::Empty),
); );
this = self.as_mut().project();
} else { } else {
trace!("Keep-alive connection timeout"); trace!("Keep-alive connection timeout");
} }
self.flags.insert(Flags::STARTED | Flags::SHUTDOWN); this.flags.insert(Flags::STARTED | Flags::SHUTDOWN);
self.state = State::None; this.state.set(State::None);
} }
} else if let Some(deadline) = } else if let Some(deadline) =
self.codec.config().keep_alive_expire() this.codec.config().keep_alive_expire()
{ {
if let Some(mut timer) = self.ka_timer.as_mut() { if let Some(mut timer) = this.ka_timer.as_mut() {
timer.reset(deadline); timer.reset(deadline);
let _ = Pin::new(&mut timer).poll(cx); let _ = Pin::new(&mut timer).poll(cx);
} }
} }
} else if let Some(mut timer) = self.ka_timer.as_mut() { } else if let Some(mut timer) = this.ka_timer.as_mut() {
timer.reset(self.ka_expire); timer.reset(*this.ka_expire);
let _ = Pin::new(&mut timer).poll(cx); let _ = Pin::new(&mut timer).poll(cx);
} }
} }
@ -695,22 +720,25 @@ where
{ {
type Output = Result<(), DispatchError>; type Output = Result<(), DispatchError>;
#[pin_project::project]
#[inline] #[inline]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.as_mut().inner { let this = self.as_mut().project();
DispatcherState::Normal(ref mut inner) => { #[project]
inner.poll_keepalive(cx)?; match this.inner.project() {
DispatcherState::Normal(mut inner) => {
inner.as_mut().poll_keepalive(cx)?;
if inner.flags.contains(Flags::SHUTDOWN) { if inner.flags.contains(Flags::SHUTDOWN) {
if inner.flags.contains(Flags::WRITE_DISCONNECT) { if inner.flags.contains(Flags::WRITE_DISCONNECT) {
Poll::Ready(Ok(())) Poll::Ready(Ok(()))
} else { } else {
// flush buffer // flush buffer
inner.poll_flush(cx)?; inner.as_mut().poll_flush(cx)?;
if !inner.write_buf.is_empty() { if !inner.write_buf.is_empty() || inner.io.is_none() {
Poll::Pending Poll::Pending
} else { } else {
match Pin::new(&mut inner.io).poll_shutdown(cx) { match Pin::new(inner.project().io).as_pin_mut().unwrap().poll_shutdown(cx) {
Poll::Ready(res) => { Poll::Ready(res) => {
Poll::Ready(res.map_err(DispatchError::from)) Poll::Ready(res.map_err(DispatchError::from))
} }
@ -722,53 +750,50 @@ where
// read socket into a buf // read socket into a buf
let should_disconnect = let should_disconnect =
if !inner.flags.contains(Flags::READ_DISCONNECT) { if !inner.flags.contains(Flags::READ_DISCONNECT) {
read_available(cx, &mut inner.io, &mut inner.read_buf)? let mut inner_p = inner.as_mut().project();
read_available(cx, inner_p.io.as_mut().unwrap(), &mut inner_p.read_buf)?
} else { } else {
None None
}; };
inner.poll_request(cx)?; inner.as_mut().poll_request(cx)?;
if let Some(true) = should_disconnect { if let Some(true) = should_disconnect {
inner.flags.insert(Flags::READ_DISCONNECT); let inner_p = inner.as_mut().project();
if let Some(mut payload) = inner.payload.take() { inner_p.flags.insert(Flags::READ_DISCONNECT);
if let Some(mut payload) = inner_p.payload.take() {
payload.feed_eof(); payload.feed_eof();
} }
}; };
loop { loop {
let inner_p = inner.as_mut().project();
let remaining = let remaining =
inner.write_buf.capacity() - inner.write_buf.len(); inner_p.write_buf.capacity() - inner_p.write_buf.len();
if remaining < LW_BUFFER_SIZE { if remaining < LW_BUFFER_SIZE {
inner.write_buf.reserve(HW_BUFFER_SIZE - remaining); inner_p.write_buf.reserve(HW_BUFFER_SIZE - remaining);
} }
let result = inner.poll_response(cx)?; let result = inner.as_mut().poll_response(cx)?;
let drain = result == PollResponse::DrainWriteBuf; let drain = result == PollResponse::DrainWriteBuf;
// switch to upgrade handler // switch to upgrade handler
if let PollResponse::Upgrade(req) = result { if let PollResponse::Upgrade(req) = result {
if let DispatcherState::Normal(inner) = let inner_p = inner.as_mut().project();
std::mem::replace(&mut self.inner, DispatcherState::None) let mut parts = FramedParts::with_read_buf(
{ inner_p.io.take().unwrap(),
let mut parts = FramedParts::with_read_buf( std::mem::replace(inner_p.codec, Codec::default()),
inner.io, std::mem::replace(inner_p.read_buf, BytesMut::default()),
inner.codec, );
inner.read_buf, parts.write_buf = std::mem::replace(inner_p.write_buf, BytesMut::default());
); let framed = Framed::from_parts(parts);
parts.write_buf = inner.write_buf; let upgrade = inner_p.upgrade.take().unwrap().call((req, framed));
let framed = Framed::from_parts(parts); self.as_mut().project().inner.set(DispatcherState::Upgrade(upgrade));
self.inner = DispatcherState::Upgrade( return self.poll(cx);
Box::pin(inner.upgrade.unwrap().call((req, framed))),
);
return self.poll(cx);
} else {
panic!()
}
} }
// we didnt get WouldBlock from write operation, // we didnt get WouldBlock from write operation,
// so data get written to kernel completely (OSX) // so data get written to kernel completely (OSX)
// and we have to write again otherwise response can get stuck // and we have to write again otherwise response can get stuck
if inner.poll_flush(cx)? || !drain { if inner.as_mut().poll_flush(cx)? || !drain {
break; break;
} }
} }
@ -780,25 +805,26 @@ where
let is_empty = inner.state.is_empty(); let is_empty = inner.state.is_empty();
let inner_p = inner.as_mut().project();
// read half is closed and we do not processing any responses // read half is closed and we do not processing any responses
if inner.flags.contains(Flags::READ_DISCONNECT) && is_empty { if inner_p.flags.contains(Flags::READ_DISCONNECT) && is_empty {
inner.flags.insert(Flags::SHUTDOWN); inner_p.flags.insert(Flags::SHUTDOWN);
} }
// keep-alive and stream errors // keep-alive and stream errors
if is_empty && inner.write_buf.is_empty() { if is_empty && inner_p.write_buf.is_empty() {
if let Some(err) = inner.error.take() { if let Some(err) = inner_p.error.take() {
Poll::Ready(Err(err)) Poll::Ready(Err(err))
} }
// disconnect if keep-alive is not enabled // disconnect if keep-alive is not enabled
else if inner.flags.contains(Flags::STARTED) else if inner_p.flags.contains(Flags::STARTED)
&& !inner.flags.intersects(Flags::KEEPALIVE) && !inner_p.flags.intersects(Flags::KEEPALIVE)
{ {
inner.flags.insert(Flags::SHUTDOWN); inner_p.flags.insert(Flags::SHUTDOWN);
self.poll(cx) self.poll(cx)
} }
// disconnect if shutdown // disconnect if shutdown
else if inner.flags.contains(Flags::SHUTDOWN) { else if inner_p.flags.contains(Flags::SHUTDOWN) {
self.poll(cx) self.poll(cx)
} else { } else {
Poll::Pending Poll::Pending
@ -808,13 +834,12 @@ where
} }
} }
} }
DispatcherState::Upgrade(ref mut fut) => { DispatcherState::Upgrade(fut) => {
fut.as_mut().poll(cx).map_err(|e| { fut.poll(cx).map_err(|e| {
error!("Upgrade handler error: {}", e); error!("Upgrade handler error: {}", e);
DispatchError::Upgrade DispatchError::Upgrade
}) })
} }
DispatcherState::None => panic!(),
} }
} }
} }
@ -904,9 +929,9 @@ mod tests {
Poll::Ready(res) => assert!(res.is_err()), Poll::Ready(res) => assert!(res.is_err()),
} }
if let DispatcherState::Normal(ref inner) = h1.inner { if let DispatcherState::Normal(ref mut inner) = h1.inner {
assert!(inner.flags.contains(Flags::READ_DISCONNECT)); assert!(inner.flags.contains(Flags::READ_DISCONNECT));
assert_eq!(&inner.io.write_buf[..26], b"HTTP/1.1 400 Bad Request\r\n"); assert_eq!(&inner.io.take().unwrap().write_buf[..26], b"HTTP/1.1 400 Bad Request\r\n");
} }
}) })
.await; .await;

View File

@ -13,6 +13,7 @@ use crate::response::Response;
#[pin_project::pin_project] #[pin_project::pin_project]
pub struct SendResponse<T, B> { pub struct SendResponse<T, B> {
res: Option<Message<(Response<()>, BodySize)>>, res: Option<Message<(Response<()>, BodySize)>>,
#[pin]
body: Option<ResponseBody<B>>, body: Option<ResponseBody<B>>,
framed: Option<Framed<T, Codec>>, framed: Option<Framed<T, Codec>>,
} }
@ -35,24 +36,27 @@ where
impl<T, B> Future for SendResponse<T, B> impl<T, B> Future for SendResponse<T, B>
where where
T: AsyncRead + AsyncWrite, T: AsyncRead + AsyncWrite,
B: MessageBody, B: MessageBody + Unpin,
{ {
type Output = Result<Framed<T, Codec>, Error>; type Output = Result<Framed<T, Codec>, Error>;
// TODO: rethink if we need loops in polls
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut(); let mut this = self.project();
let mut body_done = this.body.is_none();
loop { loop {
let mut body_ready = this.body.is_some(); let mut body_ready = !body_done;
let framed = this.framed.as_mut().unwrap(); let framed = this.framed.as_mut().unwrap();
// send body // send body
if this.res.is_none() && this.body.is_some() { if this.res.is_none() && body_ready {
while body_ready && this.body.is_some() && !framed.is_write_buf_full() { while body_ready && !body_done && !framed.is_write_buf_full() {
match this.body.as_mut().unwrap().poll_next(cx)? { match this.body.as_mut().as_pin_mut().unwrap().poll_next(cx)? {
Poll::Ready(item) => { Poll::Ready(item) => {
// body is done // body is done when item is None
if item.is_none() { body_done = item.is_none();
if body_done {
let _ = this.body.take(); let _ = this.body.take();
} }
framed.write(Message::Chunk(item))?; framed.write(Message::Chunk(item))?;
@ -82,7 +86,7 @@ where
continue; continue;
} }
if this.body.is_some() { if !body_done {
if body_ready { if body_ready {
continue; continue;
} else { } else {

View File

@ -168,7 +168,7 @@ struct ServiceResponse<F, I, E, B> {
#[pin_project::pin_project] #[pin_project::pin_project]
enum ServiceResponseState<F, B> { enum ServiceResponseState<F, B> {
ServiceCall(#[pin] F, Option<SendResponse<Bytes>>), ServiceCall(#[pin] F, Option<SendResponse<Bytes>>),
SendPayload(SendStream<Bytes>, ResponseBody<B>), SendPayload(SendStream<Bytes>, #[pin] ResponseBody<B>),
} }
impl<F, I, E, B> ServiceResponse<F, I, E, B> impl<F, I, E, B> ServiceResponse<F, I, E, B>
@ -338,7 +338,7 @@ where
} }
} }
} else { } else {
match body.poll_next(cx) { match body.as_mut().poll_next(cx) {
Poll::Pending => return Poll::Pending, Poll::Pending => return Poll::Pending,
Poll::Ready(None) => { Poll::Ready(None) => {
if let Err(e) = stream.send_data(Bytes::new(), true) { if let Err(e) = stream.send_data(Bytes::new(), true) {

View File

@ -637,7 +637,7 @@ impl ResponseBuilder {
/// `ResponseBuilder` can not be used after this call. /// `ResponseBuilder` can not be used after this call.
pub fn streaming<S, E>(&mut self, stream: S) -> Response pub fn streaming<S, E>(&mut self, stream: S) -> Response
where where
S: Stream<Item = Result<Bytes, E>> + 'static, S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
E: Into<Error> + 'static, E: Into<Error> + 'static,
{ {
self.body(Body::from_message(BodyStream::new(stream))) self.body(Body::from_message(BodyStream::new(stream)))

View File

@ -17,7 +17,7 @@ path = "src/lib.rs"
[dependencies] [dependencies]
actix-web = { version = "2.0.0", default-features = false, features = ["secure-cookies"] } actix-web = { version = "2.0.0", default-features = false, features = ["secure-cookies"] }
actix-service = "1.0.2" actix-service = "1.0.5"
futures = "0.3.1" futures = "0.3.1"
serde = "1.0" serde = "1.0"
serde_json = "1.0" serde_json = "1.0"
@ -26,4 +26,4 @@ time = { version = "0.2.5", default-features = false, features = ["std"] }
[dev-dependencies] [dev-dependencies]
actix-rt = "1.0.0" actix-rt = "1.0.0"
actix-http = "1.0.1" actix-http = "1.0.1"
bytes = "0.5.3" bytes = "0.5.4"

View File

@ -16,7 +16,7 @@ name = "actix_multipart"
path = "src/lib.rs" path = "src/lib.rs"
[dependencies] [dependencies]
actix-web = { version = "2.0.0-rc", default-features = false } actix-web = { version = "2.0.0", default-features = false }
actix-service = "1.0.1" actix-service = "1.0.1"
actix-utils = "1.0.3" actix-utils = "1.0.3"
bytes = "0.5.3" bytes = "0.5.3"
@ -29,4 +29,4 @@ twoway = "0.2"
[dev-dependencies] [dev-dependencies]
actix-rt = "1.0.0" actix-rt = "1.0.0"
actix-http = "1.0.0" actix-http = "1.0.1"

View File

@ -22,9 +22,9 @@ default = ["cookie-session"]
cookie-session = ["actix-web/secure-cookies"] cookie-session = ["actix-web/secure-cookies"]
[dependencies] [dependencies]
actix-web = "2.0.0-rc" actix-web = { version = "2.0.0" }
actix-service = "1.0.1" actix-service = "1.0.5"
bytes = "0.5.3" bytes = "0.5.4"
derive_more = "0.99.2" derive_more = "0.99.2"
futures = "0.3.1" futures = "0.3.1"
serde = "1.0" serde = "1.0"

View File

@ -17,7 +17,7 @@ path = "src/lib.rs"
[dependencies] [dependencies]
actix = "0.9.0" actix = "0.9.0"
actix-web = "2.0.0-rc" actix-web = "2.0.0"
actix-http = "1.0.1" actix-http = "1.0.1"
actix-codec = "0.2.0" actix-codec = "0.2.0"
bytes = "0.5.2" bytes = "0.5.2"

View File

@ -4,7 +4,6 @@
* Fix compilation with default features off * Fix compilation with default features off
## [1.0.0] - 2019-12-13 ## [1.0.0] - 2019-12-13
* Release * Release

View File

@ -36,7 +36,7 @@ compress = ["actix-http/compress"]
[dependencies] [dependencies]
actix-codec = "0.2.0" actix-codec = "0.2.0"
actix-service = "1.0.1" actix-service = "1.0.1"
actix-http = "1.0.0" actix-http = "1.0.1"
actix-rt = "1.0.0" actix-rt = "1.0.0"
base64 = "0.11" base64 = "0.11"
@ -55,7 +55,7 @@ rust-tls = { version = "0.16.0", package="rustls", optional = true, features = [
[dev-dependencies] [dev-dependencies]
actix-connect = { version = "1.0.1", features=["openssl"] } actix-connect = { version = "1.0.1", features=["openssl"] }
actix-web = { version = "2.0.0-rc", features=["openssl"] } actix-web = { version = "2.0.0", features=["openssl"] }
actix-http = { version = "1.0.1", features=["openssl"] } actix-http = { version = "1.0.1", features=["openssl"] }
actix-http-test = { version = "1.0.0", features=["openssl"] } actix-http-test = { version = "1.0.0", features=["openssl"] }
actix-utils = "1.0.3" actix-utils = "1.0.3"

View File

@ -238,15 +238,20 @@ where
} }
} }
use pin_project::{pin_project, pinned_drop};
#[pin_project(PinnedDrop)]
pub struct StreamLog<B> { pub struct StreamLog<B> {
#[pin]
body: ResponseBody<B>, body: ResponseBody<B>,
format: Option<Format>, format: Option<Format>,
size: usize, size: usize,
time: OffsetDateTime, time: OffsetDateTime,
} }
impl<B> Drop for StreamLog<B> { #[pinned_drop]
fn drop(&mut self) { impl<B> PinnedDrop for StreamLog<B> {
fn drop(self: Pin<&mut Self>) {
if let Some(ref format) = self.format { if let Some(ref format) = self.format {
let render = |fmt: &mut Formatter<'_>| { let render = |fmt: &mut Formatter<'_>| {
for unit in &format.0 { for unit in &format.0 {
@ -259,15 +264,17 @@ impl<B> Drop for StreamLog<B> {
} }
} }
impl<B: MessageBody> MessageBody for StreamLog<B> { impl<B: MessageBody> MessageBody for StreamLog<B> {
fn size(&self) -> BodySize { fn size(&self) -> BodySize {
self.body.size() self.body.size()
} }
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> {
match self.body.poll_next(cx) { let this = self.project();
match this.body.poll_next(cx) {
Poll::Ready(Some(Ok(chunk))) => { Poll::Ready(Some(Ok(chunk))) => {
self.size += chunk.len(); *this.size += chunk.len();
Poll::Ready(Some(Ok(chunk))) Poll::Ready(Some(Ok(chunk)))
} }
val => val, val => val,

View File

@ -150,7 +150,7 @@ where
pub async fn read_response<S, B>(app: &mut S, req: Request) -> Bytes pub async fn read_response<S, B>(app: &mut S, req: Request) -> Bytes
where where
S: Service<Request = Request, Response = ServiceResponse<B>, Error = Error>, S: Service<Request = Request, Response = ServiceResponse<B>, Error = Error>,
B: MessageBody, B: MessageBody + Unpin,
{ {
let mut resp = app let mut resp = app
.call(req) .call(req)
@ -193,7 +193,7 @@ where
/// ``` /// ```
pub async fn read_body<B>(mut res: ServiceResponse<B>) -> Bytes pub async fn read_body<B>(mut res: ServiceResponse<B>) -> Bytes
where where
B: MessageBody, B: MessageBody + Unpin,
{ {
let mut body = res.take_body(); let mut body = res.take_body();
let mut bytes = BytesMut::new(); let mut bytes = BytesMut::new();
@ -251,7 +251,7 @@ where
pub async fn read_response_json<S, B, T>(app: &mut S, req: Request) -> T pub async fn read_response_json<S, B, T>(app: &mut S, req: Request) -> T
where where
S: Service<Request = Request, Response = ServiceResponse<B>, Error = Error>, S: Service<Request = Request, Response = ServiceResponse<B>, Error = Error>,
B: MessageBody, B: MessageBody + Unpin,
T: DeserializeOwned, T: DeserializeOwned,
{ {
let body = read_response(app, req).await; let body = read_response(app, req).await;
@ -953,7 +953,6 @@ impl Drop for TestServer {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use actix_http::httpmessage::HttpMessage; use actix_http::httpmessage::HttpMessage;
use futures::FutureExt;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::time::SystemTime; use std::time::SystemTime;
@ -1163,6 +1162,13 @@ mod tests {
assert!(res.status().is_success()); assert!(res.status().is_success());
} }
/*
Comment out until actix decoupled of actix-http:
https://github.com/actix/actix/issues/321
use futures::FutureExt;
#[actix_rt::test] #[actix_rt::test]
async fn test_actor() { async fn test_actor() {
use actix::Actor; use actix::Actor;
@ -1183,7 +1189,6 @@ mod tests {
} }
} }
let addr = MyActor.start();
let mut app = init_service(App::new().service(web::resource("/index.html").to( let mut app = init_service(App::new().service(web::resource("/index.html").to(
move || { move || {
@ -1205,4 +1210,5 @@ mod tests {
let res = app.call(req).await.unwrap(); let res = app.call(req).await.unwrap();
assert!(res.status().is_success()); assert!(res.status().is_success());
} }
*/
} }

View File

@ -4,7 +4,6 @@
* Update the `time` dependency to 0.2.7 * Update the `time` dependency to 0.2.7
## [1.0.0] - 2019-12-13 ## [1.0.0] - 2019-12-13
### Changed ### Changed

View File

@ -37,7 +37,7 @@ actix-utils = "1.0.3"
actix-rt = "1.0.0" actix-rt = "1.0.0"
actix-server = "1.0.0" actix-server = "1.0.0"
actix-testing = "1.0.0" actix-testing = "1.0.0"
awc = "1.0.0" awc = "1.0.1"
base64 = "0.11" base64 = "0.11"
bytes = "0.5.3" bytes = "0.5.3"
@ -55,5 +55,5 @@ time = { version = "0.2.7", default-features = false, features = ["std"] }
open-ssl = { version="0.10", package="openssl", optional = true } open-ssl = { version="0.10", package="openssl", optional = true }
[dev-dependencies] [dev-dependencies]
actix-web = "2.0.0-rc" actix-web = { version = "2.0.0" }
actix-http = "1.0.1" actix-http = { version = "1.0.1" }

View File

@ -5,6 +5,9 @@ use futures::stream::once;
use actix_http::body::{MessageBody, BodyStream}; use actix_http::body::{MessageBody, BodyStream};
use bytes::Bytes; use bytes::Bytes;
/*
Disable weird poll until actix-web is based on actix-http 2.0.0
#[test] #[test]
fn weird_poll() { fn weird_poll() {
let (sender, receiver) = futures::channel::oneshot::channel(); let (sender, receiver) = futures::channel::oneshot::channel();
@ -24,3 +27,4 @@ fn weird_poll() {
let _ = std::mem::replace(&mut body_stream, Err([0; 32])).unwrap().poll_next(&mut context); let _ = std::mem::replace(&mut body_stream, Err([0; 32])).unwrap().poll_next(&mut context);
} }
*/