diff --git a/actix-http/examples/echo.rs b/actix-http/examples/echo.rs index 3d57a472a..b2b88a7ea 100644 --- a/actix-http/examples/echo.rs +++ b/actix-http/examples/echo.rs @@ -17,23 +17,18 @@ async fn main() -> io::Result<()> { HttpService::build() .client_timeout(1000) .client_disconnect(1000) - .finish(|mut req: Request| { - async move { - let mut body = BytesMut::new(); - while let Some(item) = req.payload().next().await { - body.extend_from_slice(&item?); - } - - info!("request body: {:?}", body); - Ok::<_, Error>( - Response::Ok() - .header( - "x-head", - HeaderValue::from_static("dummy value!"), - ) - .body(body), - ) + .finish(|mut req: Request| async move { + let mut body = BytesMut::new(); + while let Some(item) = req.payload().next().await { + body.extend_from_slice(&item?); } + + info!("request body: {:?}", body); + Ok::<_, Error>( + Response::Ok() + .header("x-head", HeaderValue::from_static("dummy value!")) + .body(body), + ) }) .tcp() })? diff --git a/actix-http/src/body.rs b/actix-http/src/body.rs index 912f22e33..c581db604 100644 --- a/actix-http/src/body.rs +++ b/actix-http/src/body.rs @@ -36,7 +36,10 @@ impl BodySize { pub trait MessageBody { fn size(&self) -> BodySize; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>>; + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>>; downcast_get_type_id!(); } @@ -48,7 +51,10 @@ impl MessageBody for () { BodySize::Empty } - fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll>> { + fn poll_next( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll>> { Poll::Ready(None) } } @@ -58,7 +64,10 @@ impl MessageBody for Box { self.as_ref().size() } - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>> { + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { Pin::new(self.get_mut().as_mut()).poll_next(cx) } } @@ -103,7 +112,10 @@ impl MessageBody for ResponseBody { } #[project] - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>> { + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { #[project] match self.project() { ResponseBody::Body(body) => body.poll_next(cx), @@ -164,7 +176,10 @@ impl MessageBody for Body { } #[project] - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>> { + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { #[project] match self.project() { Body::None => Poll::Ready(None), @@ -285,7 +300,10 @@ impl MessageBody for Bytes { BodySize::Sized(self.len()) } - fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll>> { + fn poll_next( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll>> { if self.is_empty() { Poll::Ready(None) } else { @@ -299,11 +317,16 @@ impl MessageBody for BytesMut { BodySize::Sized(self.len()) } - fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll>> { + fn poll_next( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll>> { if self.is_empty() { Poll::Ready(None) } else { - Poll::Ready(Some(Ok(mem::replace(self.get_mut(), BytesMut::new()).freeze()))) + Poll::Ready(Some(Ok( + mem::replace(self.get_mut(), BytesMut::new()).freeze() + ))) } } } @@ -313,7 +336,10 @@ impl MessageBody for &'static str { BodySize::Sized(self.len()) } - fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll>> { + fn poll_next( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll>> { if self.is_empty() { Poll::Ready(None) } else { @@ -329,11 +355,17 @@ impl MessageBody for Vec { BodySize::Sized(self.len()) } - fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll>> { + fn poll_next( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll>> { if self.is_empty() { Poll::Ready(None) } else { - Poll::Ready(Some(Ok(Bytes::from(mem::replace(self.get_mut(), Vec::new()))))) + Poll::Ready(Some(Ok(Bytes::from(mem::replace( + self.get_mut(), + Vec::new(), + ))))) } } } @@ -343,7 +375,10 @@ impl MessageBody for String { BodySize::Sized(self.len()) } - fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll>> { + fn poll_next( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll>> { if self.is_empty() { Poll::Ready(None) } else { @@ -390,7 +425,10 @@ where /// Empty values are skipped to prevent [`BodyStream`]'s transmission being /// ended on a zero-length chunk, but rather proceed until the underlying /// [`Stream`] ends. - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>> { + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { let mut stream = self.project().stream; loop { let stream = stream.as_mut(); @@ -433,7 +471,10 @@ where /// Empty values are skipped to prevent [`SizedStream`]'s transmission being /// ended on a zero-length chunk, but rather proceed until the underlying /// [`Stream`] ends. - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>> { + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { let mut stream: Pin<&mut S> = self.project().stream; loop { let stream = stream.as_mut(); @@ -478,7 +519,10 @@ mod tests { assert_eq!("test".size(), BodySize::Sized(4)); assert_eq!( - poll_fn(|cx| Pin::new(&mut "test").poll_next(cx)).await.unwrap().ok(), + poll_fn(|cx| Pin::new(&mut "test").poll_next(cx)) + .await + .unwrap() + .ok(), Some(Bytes::from("test")) ); } @@ -497,10 +541,7 @@ mod tests { assert_eq!(sb.size(), BodySize::Sized(4)); assert_eq!( - poll_fn(|cx| sb.as_mut().poll_next(cx)) - .await - .unwrap() - .ok(), + poll_fn(|cx| sb.as_mut().poll_next(cx)).await.unwrap().ok(), Some(Bytes::from("test")) ); } @@ -535,7 +576,7 @@ mod tests { Some(Bytes::from("test")) ); } - + #[actix_rt::test] async fn test_bytes_mut() { let b = BytesMut::from("test"); @@ -569,7 +610,9 @@ mod tests { #[actix_rt::test] async fn test_unit() { assert_eq!(().size(), BodySize::Empty); - assert!(poll_fn(|cx| Pin::new(&mut ()).poll_next(cx)).await.is_none()); + assert!(poll_fn(|cx| Pin::new(&mut ()).poll_next(cx)) + .await + .is_none()); } #[actix_rt::test] @@ -628,11 +671,17 @@ mod tests { pin_mut!(body); assert_eq!( - poll_fn(|cx| body.as_mut().poll_next(cx)).await.unwrap().ok(), + poll_fn(|cx| body.as_mut().poll_next(cx)) + .await + .unwrap() + .ok(), Some(Bytes::from("1")), ); assert_eq!( - poll_fn(|cx| body.as_mut().poll_next(cx)).await.unwrap().ok(), + poll_fn(|cx| body.as_mut().poll_next(cx)) + .await + .unwrap() + .ok(), Some(Bytes::from("2")), ); } @@ -648,14 +697,14 @@ mod tests { let _z = **y; Ok::<_, ()>(Bytes::new()) }))); - + let waker = noop_waker(); let mut context = Context::from_waker(&waker); pin_mut!(body_stream); let _ = body_stream.as_mut().unwrap().poll_next(&mut context); sender.send(()).unwrap(); - let _ = std::mem::replace(&mut body_stream, Err([0; 32])).unwrap().poll_next(&mut context); + let _ = std::mem::replace(&mut body_stream, Err([0; 32])).unwrap().poll_next(&mut context); }*/ } @@ -670,11 +719,17 @@ mod tests { ); pin_mut!(body); assert_eq!( - poll_fn(|cx| body.as_mut().poll_next(cx)).await.unwrap().ok(), + poll_fn(|cx| body.as_mut().poll_next(cx)) + .await + .unwrap() + .ok(), Some(Bytes::from("1")), ); assert_eq!( - poll_fn(|cx| body.as_mut().poll_next(cx)).await.unwrap().ok(), + poll_fn(|cx| body.as_mut().poll_next(cx)) + .await + .unwrap() + .ok(), Some(Bytes::from("2")), ); } diff --git a/actix-http/src/client/h1proto.rs b/actix-http/src/client/h1proto.rs index c1863b920..51e853b3d 100644 --- a/actix-http/src/client/h1proto.rs +++ b/actix-http/src/client/h1proto.rs @@ -8,7 +8,7 @@ use bytes::buf::BufMutExt; use bytes::{Bytes, BytesMut}; use futures_core::Stream; use futures_util::future::poll_fn; -use futures_util::{SinkExt, StreamExt, pin_mut}; +use futures_util::{pin_mut, SinkExt, StreamExt}; use crate::error::PayloadError; use crate::h1; diff --git a/actix-http/src/client/pool.rs b/actix-http/src/client/pool.rs index 139cf9f66..38a51b558 100644 --- a/actix-http/src/client/pool.rs +++ b/actix-http/src/client/pool.rs @@ -16,8 +16,8 @@ use fxhash::FxHashMap; use h2::client::{handshake, Connection, SendRequest}; use http::uri::Authority; use indexmap::IndexSet; -use slab::Slab; use pin_project::pin_project; +use slab::Slab; use super::connection::{ConnectionType, IoConnection}; use super::error::ConnectError; diff --git a/actix-http/src/config.rs b/actix-http/src/config.rs index a38a80e76..899046231 100644 --- a/actix-http/src/config.rs +++ b/actix-http/src/config.rs @@ -211,7 +211,12 @@ impl Date { } fn update(&mut self) { self.pos = 0; - write!(self, "{}", OffsetDateTime::now().format("%a, %d %b %Y %H:%M:%S GMT")).unwrap(); + write!( + self, + "{}", + OffsetDateTime::now().format("%a, %d %b %Y %H:%M:%S GMT") + ) + .unwrap(); } } @@ -282,7 +287,6 @@ impl DateService { mod tests { use super::*; - // Test modifying the date from within the closure // passed to `set_date` #[test] @@ -290,9 +294,7 @@ mod tests { let service = DateService::new(); // Make sure that `check_date` doesn't try to spawn a task service.0.update(); - service.set_date(|_| { - service.0.reset() - }); + service.set_date(|_| service.0.reset()); } #[test] diff --git a/actix-http/src/cookie/builder.rs b/actix-http/src/cookie/builder.rs index c3820abf0..80e7ee71f 100644 --- a/actix-http/src/cookie/builder.rs +++ b/actix-http/src/cookie/builder.rs @@ -109,7 +109,8 @@ impl CookieBuilder { pub fn max_age_time(mut self, value: Duration) -> CookieBuilder { // Truncate any nanoseconds from the Duration, as they aren't represented within `Max-Age` // and would cause two otherwise identical `Cookie` instances to not be equivalent to one another. - self.cookie.set_max_age(Duration::seconds(value.whole_seconds())); + self.cookie + .set_max_age(Duration::seconds(value.whole_seconds())); self } diff --git a/actix-http/src/cookie/jar.rs b/actix-http/src/cookie/jar.rs index 64922897b..dd4ec477e 100644 --- a/actix-http/src/cookie/jar.rs +++ b/actix-http/src/cookie/jar.rs @@ -533,8 +533,8 @@ mod test { #[test] #[cfg(feature = "secure-cookies")] fn delta() { - use time::Duration; use std::collections::HashMap; + use time::Duration; let mut c = CookieJar::new(); diff --git a/actix-http/src/cookie/mod.rs b/actix-http/src/cookie/mod.rs index 8dccd0b6d..360d80883 100644 --- a/actix-http/src/cookie/mod.rs +++ b/actix-http/src/cookie/mod.rs @@ -1015,7 +1015,9 @@ mod tests { assert_eq!(&cookie.to_string(), "foo=bar; Domain=www.rust-lang.org"); let time_str = "Wed, 21 Oct 2015 07:28:00 GMT"; - let expires = PrimitiveDateTime::parse(time_str, "%a, %d %b %Y %H:%M:%S").unwrap().assume_utc(); + let expires = PrimitiveDateTime::parse(time_str, "%a, %d %b %Y %H:%M:%S") + .unwrap() + .assume_utc(); let cookie = Cookie::build("foo", "bar").expires(expires).finish(); assert_eq!( &cookie.to_string(), diff --git a/actix-http/src/cookie/parse.rs b/actix-http/src/cookie/parse.rs index 537069de3..ce261c758 100644 --- a/actix-http/src/cookie/parse.rs +++ b/actix-http/src/cookie/parse.rs @@ -376,7 +376,9 @@ mod tests { ); let time_str = "Wed, 21 Oct 2015 07:28:00 GMT"; - let expires = PrimitiveDateTime::parse(time_str, "%a, %d %b %Y %H:%M:%S").unwrap().assume_utc(); + let expires = PrimitiveDateTime::parse(time_str, "%a, %d %b %Y %H:%M:%S") + .unwrap() + .assume_utc(); expected.set_expires(expires); assert_eq_parse!( " foo=bar ;HttpOnly; Secure; Max-Age=4; Path=/foo; \ @@ -385,7 +387,9 @@ mod tests { ); unexpected.set_domain("foo.com"); - let bad_expires = PrimitiveDateTime::parse(time_str, "%a, %d %b %Y %H:%S:%M").unwrap().assume_utc(); + let bad_expires = PrimitiveDateTime::parse(time_str, "%a, %d %b %Y %H:%S:%M") + .unwrap() + .assume_utc(); expected.set_expires(bad_expires); assert_ne_parse!( " foo=bar ;HttpOnly; Secure; Max-Age=4; Path=/foo; \ @@ -414,8 +418,15 @@ mod tests { #[test] fn do_not_panic_on_large_max_ages() { let max_duration = Duration::max_value(); - let expected = Cookie::build("foo", "bar").max_age_time(max_duration).finish(); - let overflow_duration = max_duration.checked_add(Duration::nanoseconds(1)).unwrap_or(max_duration); - assert_eq_parse!(format!(" foo=bar; Max-Age={:?}", overflow_duration.whole_seconds()), expected); + let expected = Cookie::build("foo", "bar") + .max_age_time(max_duration) + .finish(); + let overflow_duration = max_duration + .checked_add(Duration::nanoseconds(1)) + .unwrap_or(max_duration); + assert_eq_parse!( + format!(" foo=bar; Max-Age={:?}", overflow_duration.whole_seconds()), + expected + ); } } diff --git a/actix-http/src/encoding/encoder.rs b/actix-http/src/encoding/encoder.rs index 8a075e8b7..72bb7d603 100644 --- a/actix-http/src/encoding/encoder.rs +++ b/actix-http/src/encoding/encoder.rs @@ -94,9 +94,12 @@ impl MessageBody for EncoderBody { EncoderBody::BoxedStream(ref b) => b.size(), } } - + #[project] - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>> { + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { #[project] match self.project() { EncoderBody::Bytes(b) => { @@ -112,7 +115,6 @@ impl MessageBody for EncoderBody { } } - impl MessageBody for Encoder { fn size(&self) -> BodySize { if self.encoder.is_none() { @@ -121,8 +123,11 @@ impl MessageBody for Encoder { BodySize::Stream } } - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>> { + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { let mut this = self.project(); loop { if *this.eof { diff --git a/actix-http/src/error.rs b/actix-http/src/error.rs index 4b8f13cf0..b13481551 100644 --- a/actix-http/src/error.rs +++ b/actix-http/src/error.rs @@ -954,12 +954,12 @@ impl ResponseError for fail_ure::Error {} #[cfg(feature = "actors")] /// `InternalServerError` for `actix::MailboxError` /// This is supported on feature=`actors` only -impl ResponseError for actix::MailboxError {} +impl ResponseError for actix::MailboxError {} #[cfg(feature = "actors")] /// `InternalServerError` for `actix::ResolverError` /// This is supported on feature=`actors` only -impl ResponseError for actix::actors::resolver::ResolverError {} +impl ResponseError for actix::actors::resolver::ResolverError {} #[cfg(test)] mod tests { diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index 4bfcabab8..acbb09960 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -297,7 +297,10 @@ where /// true - got whouldblock /// false - didnt get whouldblock #[pin_project::project] - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Result { + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Result { if self.write_buf.is_empty() { return Ok(false); } @@ -308,8 +311,7 @@ where let InnerDispatcher { io, write_buf, .. } = self.project(); let mut io = Pin::new(io.as_mut().unwrap()); while written < len { - match io.as_mut().poll_write(cx, &write_buf[written..]) - { + match io.as_mut().poll_write(cx, &write_buf[written..]) { Poll::Ready(Ok(0)) => { return Err(DispatchError::Io(io::Error::new( io::ErrorKind::WriteZero, @@ -359,7 +361,8 @@ where } fn send_continue(self: Pin<&mut Self>) { - self.project().write_buf + self.project() + .write_buf .extend_from_slice(b"HTTP/1.1 100 Continue\r\n\r\n"); } @@ -376,47 +379,44 @@ where Some(DispatcherMessage::Item(req)) => { Some(self.as_mut().handle_request(req, cx)?) } - Some(DispatcherMessage::Error(res)) => { - Some(self.as_mut().send_response(res, ResponseBody::Other(Body::Empty))?) - } + Some(DispatcherMessage::Error(res)) => Some( + self.as_mut() + .send_response(res, ResponseBody::Other(Body::Empty))?, + ), Some(DispatcherMessage::Upgrade(req)) => { return Ok(PollResponse::Upgrade(req)); } None => None, }, - State::ExpectCall(fut) => { - match fut.poll(cx) { - Poll::Ready(Ok(req)) => { - self.as_mut().send_continue(); - this = self.as_mut().project(); - this.state.set(State::ServiceCall(this.service.call(req))); - continue; - } - Poll::Ready(Err(e)) => { - let res: Response = e.into().into(); - let (res, body) = res.replace_body(()); - Some(self.as_mut().send_response(res, body.into_body())?) - } - Poll::Pending => None, + State::ExpectCall(fut) => match fut.poll(cx) { + Poll::Ready(Ok(req)) => { + self.as_mut().send_continue(); + this = self.as_mut().project(); + this.state.set(State::ServiceCall(this.service.call(req))); + continue; } - } - State::ServiceCall(fut) => { - match fut.poll(cx) { - Poll::Ready(Ok(res)) => { - let (res, body) = res.into().replace_body(()); - let state = self.as_mut().send_response(res, body)?; - this = self.as_mut().project(); - this.state.set(state); - continue; - } - Poll::Ready(Err(e)) => { - let res: Response = e.into().into(); - let (res, body) = res.replace_body(()); - Some(self.as_mut().send_response(res, body.into_body())?) - } - Poll::Pending => None, + Poll::Ready(Err(e)) => { + let res: Response = e.into().into(); + let (res, body) = res.replace_body(()); + Some(self.as_mut().send_response(res, body.into_body())?) } - } + Poll::Pending => None, + }, + State::ServiceCall(fut) => match fut.poll(cx) { + Poll::Ready(Ok(res)) => { + let (res, body) = res.into().replace_body(()); + let state = self.as_mut().send_response(res, body)?; + this = self.as_mut().project(); + this.state.set(state); + continue; + } + Poll::Ready(Err(e)) => { + let res: Response = e.into().into(); + let (res, body) = res.replace_body(()); + Some(self.as_mut().send_response(res, body.into_body())?) + } + Poll::Pending => None, + }, State::SendPayload(mut stream) => { loop { if this.write_buf.len() < HW_BUFFER_SIZE { @@ -627,7 +627,10 @@ where } /// keep-alive timer - fn poll_keepalive(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Result<(), DispatchError> { + fn poll_keepalive( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Result<(), DispatchError> { let mut this = self.as_mut().project(); if this.ka_timer.is_none() { // shutdown timeout @@ -738,7 +741,11 @@ where if !inner.write_buf.is_empty() || inner.io.is_none() { Poll::Pending } else { - match Pin::new(inner.project().io).as_pin_mut().unwrap().poll_shutdown(cx) { + match Pin::new(inner.project().io) + .as_pin_mut() + .unwrap() + .poll_shutdown(cx) + { Poll::Ready(res) => { Poll::Ready(res.map_err(DispatchError::from)) } @@ -751,7 +758,11 @@ where let should_disconnect = if !inner.flags.contains(Flags::READ_DISCONNECT) { let mut inner_p = inner.as_mut().project(); - read_available(cx, inner_p.io.as_mut().unwrap(), &mut inner_p.read_buf)? + read_available( + cx, + inner_p.io.as_mut().unwrap(), + &mut inner_p.read_buf, + )? } else { None }; @@ -783,11 +794,18 @@ where std::mem::replace(inner_p.codec, Codec::default()), std::mem::replace(inner_p.read_buf, BytesMut::default()), ); - parts.write_buf = std::mem::replace(inner_p.write_buf, BytesMut::default()); + parts.write_buf = std::mem::replace( + inner_p.write_buf, + BytesMut::default(), + ); let framed = Framed::from_parts(parts); - let upgrade = inner_p.upgrade.take().unwrap().call((req, framed)); - self.as_mut().project().inner.set(DispatcherState::Upgrade(upgrade)); - return self.poll(cx); + let upgrade = + inner_p.upgrade.take().unwrap().call((req, framed)); + self.as_mut() + .project() + .inner + .set(DispatcherState::Upgrade(upgrade)); + return self.poll(cx); } // we didnt get WouldBlock from write operation, @@ -834,12 +852,10 @@ where } } } - DispatcherState::Upgrade(fut) => { - fut.poll(cx).map_err(|e| { - error!("Upgrade handler error: {}", e); - DispatchError::Upgrade - }) - } + DispatcherState::Upgrade(fut) => fut.poll(cx).map_err(|e| { + error!("Upgrade handler error: {}", e); + DispatchError::Upgrade + }), } } } @@ -931,7 +947,10 @@ mod tests { if let DispatcherState::Normal(ref mut inner) = h1.inner { assert!(inner.flags.contains(Flags::READ_DISCONNECT)); - assert_eq!(&inner.io.take().unwrap().write_buf[..26], b"HTTP/1.1 400 Bad Request\r\n"); + assert_eq!( + &inner.io.take().unwrap().write_buf[..26], + b"HTTP/1.1 400 Bad Request\r\n" + ); } }) .await; diff --git a/actix-http/src/h2/dispatcher.rs b/actix-http/src/h2/dispatcher.rs index 4b3752ffe..b07764a03 100644 --- a/actix-http/src/h2/dispatcher.rs +++ b/actix-http/src/h2/dispatcher.rs @@ -255,63 +255,60 @@ where #[project] match this.state.project() { - ServiceResponseState::ServiceCall(call, send) => { - match call.poll(cx) { - Poll::Ready(Ok(res)) => { - let (res, body) = res.into().replace_body(()); + ServiceResponseState::ServiceCall(call, send) => match call.poll(cx) { + Poll::Ready(Ok(res)) => { + let (res, body) = res.into().replace_body(()); - let mut send = send.take().unwrap(); - let mut size = body.size(); - let h2_res = - self.as_mut().prepare_response(res.head(), &mut size); - this = self.as_mut().project(); + let mut send = send.take().unwrap(); + let mut size = body.size(); + let h2_res = self.as_mut().prepare_response(res.head(), &mut size); + this = self.as_mut().project(); - let stream = match send.send_response(h2_res, size.is_eof()) { - Err(e) => { - trace!("Error sending h2 response: {:?}", e); - return Poll::Ready(()); - } - Ok(stream) => stream, - }; - - if size.is_eof() { - Poll::Ready(()) - } else { - this.state.set(ServiceResponseState::SendPayload(stream, body)); - self.poll(cx) + let stream = match send.send_response(h2_res, size.is_eof()) { + Err(e) => { + trace!("Error sending h2 response: {:?}", e); + return Poll::Ready(()); } - } - Poll::Pending => Poll::Pending, - Poll::Ready(Err(e)) => { - let res: Response = e.into().into(); - let (res, body) = res.replace_body(()); + Ok(stream) => stream, + }; - let mut send = send.take().unwrap(); - let mut size = body.size(); - let h2_res = - self.as_mut().prepare_response(res.head(), &mut size); - this = self.as_mut().project(); - - let stream = match send.send_response(h2_res, size.is_eof()) { - Err(e) => { - trace!("Error sending h2 response: {:?}", e); - return Poll::Ready(()); - } - Ok(stream) => stream, - }; - - if size.is_eof() { - Poll::Ready(()) - } else { - this.state.set(ServiceResponseState::SendPayload( - stream, - body.into_body(), - )); - self.poll(cx) - } + if size.is_eof() { + Poll::Ready(()) + } else { + this.state + .set(ServiceResponseState::SendPayload(stream, body)); + self.poll(cx) } } - } + Poll::Pending => Poll::Pending, + Poll::Ready(Err(e)) => { + let res: Response = e.into().into(); + let (res, body) = res.replace_body(()); + + let mut send = send.take().unwrap(); + let mut size = body.size(); + let h2_res = self.as_mut().prepare_response(res.head(), &mut size); + this = self.as_mut().project(); + + let stream = match send.send_response(h2_res, size.is_eof()) { + Err(e) => { + trace!("Error sending h2 response: {:?}", e); + return Poll::Ready(()); + } + Ok(stream) => stream, + }; + + if size.is_eof() { + Poll::Ready(()) + } else { + this.state.set(ServiceResponseState::SendPayload( + stream, + body.into_body(), + )); + self.poll(cx) + } + } + }, ServiceResponseState::SendPayload(ref mut stream, ref mut body) => loop { loop { if let Some(ref mut buffer) = this.buffer { diff --git a/actix-http/src/h2/service.rs b/actix-http/src/h2/service.rs index ff3f69faf..eef5dd02c 100644 --- a/actix-http/src/h2/service.rs +++ b/actix-http/src/h2/service.rs @@ -83,13 +83,11 @@ where Error = DispatchError, InitError = S::InitError, > { - pipeline_factory(fn_factory(|| { - async { - Ok::<_, S::InitError>(fn_service(|io: TcpStream| { - let peer_addr = io.peer_addr().ok(); - ok::<_, DispatchError>((io, peer_addr)) - })) - } + pipeline_factory(fn_factory(|| async { + Ok::<_, S::InitError>(fn_service(|io: TcpStream| { + let peer_addr = io.peer_addr().ok(); + ok::<_, DispatchError>((io, peer_addr)) + })) })) .and_then(self) } diff --git a/actix-http/src/header/shared/httpdate.rs b/actix-http/src/header/shared/httpdate.rs index 5227118fa..81caf6d53 100644 --- a/actix-http/src/header/shared/httpdate.rs +++ b/actix-http/src/header/shared/httpdate.rs @@ -5,7 +5,7 @@ use std::time::{SystemTime, UNIX_EPOCH}; use bytes::{buf::BufMutExt, BytesMut}; use http::header::{HeaderValue, InvalidHeaderValue}; -use time::{PrimitiveDateTime, OffsetDateTime, offset}; +use time::{offset, OffsetDateTime, PrimitiveDateTime}; use crate::error::ParseError; use crate::header::IntoHeaderValue; @@ -21,7 +21,7 @@ impl FromStr for HttpDate { fn from_str(s: &str) -> Result { match time_parser::parse_http_date(s) { Some(t) => Ok(HttpDate(t.assume_utc())), - None => Err(ParseError::Header) + None => Err(ParseError::Header), } } } @@ -49,7 +49,14 @@ impl IntoHeaderValue for HttpDate { fn try_into(self) -> Result { let mut wrt = BytesMut::with_capacity(29).writer(); - write!(wrt, "{}", self.0.to_offset(offset!(UTC)).format("%a, %d %b %Y %H:%M:%S GMT")).unwrap(); + write!( + wrt, + "{}", + self.0 + .to_offset(offset!(UTC)) + .format("%a, %d %b %Y %H:%M:%S GMT") + ) + .unwrap(); HeaderValue::from_maybe_shared(wrt.get_mut().split().freeze()) } } @@ -66,14 +73,13 @@ impl From for SystemTime { #[cfg(test)] mod tests { use super::HttpDate; - use time::{PrimitiveDateTime, date, time}; + use time::{date, time, PrimitiveDateTime}; #[test] fn test_date() { - let nov_07 = HttpDate(PrimitiveDateTime::new( - date!(1994-11-07), - time!(8:48:37) - ).assume_utc()); + let nov_07 = HttpDate( + PrimitiveDateTime::new(date!(1994 - 11 - 07), time!(8:48:37)).assume_utc(), + ); assert_eq!( "Sun, 07 Nov 1994 08:48:37 GMT".parse::().unwrap(), diff --git a/actix-http/src/macros.rs b/actix-http/src/macros.rs index 0aaf1abec..b970b14f2 100644 --- a/actix-http/src/macros.rs +++ b/actix-http/src/macros.rs @@ -21,7 +21,7 @@ macro_rules! downcast_get_type_id { { (std::any::TypeId::of::(), PrivateHelper(())) } - } + }; } //Generate implementation for dyn $name diff --git a/actix-http/src/time_parser.rs b/actix-http/src/time_parser.rs index 34fac139e..b5b07ccba 100644 --- a/actix-http/src/time_parser.rs +++ b/actix-http/src/time_parser.rs @@ -1,4 +1,4 @@ -use time::{OffsetDateTime, PrimitiveDateTime, Date}; +use time::{Date, OffsetDateTime, PrimitiveDateTime}; /// Attempt to parse a `time` string as one of either RFC 1123, RFC 850, or asctime. pub fn parse_http_date(time: &str) -> Option { @@ -29,10 +29,10 @@ fn try_parse_rfc_850(time: &str) -> Option { match Date::try_from_ymd(expanded_year, dt.month(), dt.day()) { Ok(date) => Some(PrimitiveDateTime::new(date, dt.time())), - Err(_) => None + Err(_) => None, } } - Err(_) => None + Err(_) => None, } } diff --git a/actix-http/tests/test_openssl.rs b/actix-http/tests/test_openssl.rs index b25f05272..77caa045b 100644 --- a/actix-http/tests/test_openssl.rs +++ b/actix-http/tests/test_openssl.rs @@ -97,11 +97,9 @@ async fn test_h2_body() -> io::Result<()> { let data = "HELLOWORLD".to_owned().repeat(64 * 1024); let mut srv = test_server(move || { HttpService::build() - .h2(|mut req: Request<_>| { - async move { - let body = load_body(req.take_payload()).await?; - Ok::<_, Error>(Response::Ok().body(body)) - } + .h2(|mut req: Request<_>| async move { + let body = load_body(req.take_payload()).await?; + Ok::<_, Error>(Response::Ok().body(body)) }) .openssl(ssl_acceptor()) .map_err(|_| ()) diff --git a/actix-http/tests/test_rustls.rs b/actix-http/tests/test_rustls.rs index bc0c91cc3..933a6c894 100644 --- a/actix-http/tests/test_rustls.rs +++ b/actix-http/tests/test_rustls.rs @@ -104,11 +104,9 @@ async fn test_h2_body1() -> io::Result<()> { let data = "HELLOWORLD".to_owned().repeat(64 * 1024); let mut srv = test_server(move || { HttpService::build() - .h2(|mut req: Request<_>| { - async move { - let body = load_body(req.take_payload()).await?; - Ok::<_, Error>(Response::Ok().body(body)) - } + .h2(|mut req: Request<_>| async move { + let body = load_body(req.take_payload()).await?; + Ok::<_, Error>(Response::Ok().body(body)) }) .rustls(ssl_acceptor()) });