diff --git a/.cargo/config.toml b/.cargo/config.toml index 9d0d9da8..40fe3e57 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,3 +1,3 @@ [alias] -chk = "hack check --workspace --tests --examples" +chk = "hack check --workspace --all-features --tests --examples" lint = "hack --clean-per-run clippy --workspace --tests --examples" diff --git a/Cargo.toml b/Cargo.toml index 7dd7635c..15f9f5fe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -83,7 +83,7 @@ actix-router = "0.2.7" actix-rt = "2.2" actix-server = "2.0.0-beta.3" actix-service = "2.0.0-beta.4" -actix-utils = "3.0.0-beta.2" +actix-utils = "3.0.0-beta.4" actix-tls = { version = "3.0.0-beta.5", default-features = false, optional = true } actix-web-codegen = "0.5.0-beta.2" diff --git a/actix-files/Cargo.toml b/actix-files/Cargo.toml index 472bd036..0cc02c6b 100644 --- a/actix-files/Cargo.toml +++ b/actix-files/Cargo.toml @@ -19,12 +19,12 @@ path = "src/lib.rs" [dependencies] actix-web = { version = "4.0.0-beta.4", default-features = false } actix-service = "2.0.0-beta.4" +actix-utils = "3.0.0-beta.4" askama_escape = "0.10" bitflags = "1" bytes = "1" -futures-core = { version = "0.3.7", default-features = false } -futures-util = { version = "0.3.7", default-features = false } +futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] } http-range = "0.1.4" derive_more = "0.99.5" log = "0.4" diff --git a/actix-files/src/files.rs b/actix-files/src/files.rs index 292e3fdf..ff424134 100644 --- a/actix-files/src/files.rs +++ b/actix-files/src/files.rs @@ -1,6 +1,7 @@ use std::{cell::RefCell, fmt, io, path::PathBuf, rc::Rc}; use actix_service::{boxed, IntoServiceFactory, ServiceFactory, ServiceFactoryExt}; +use actix_utils::future::ok; use actix_web::{ dev::{AppService, HttpServiceFactory, ResourceDef, ServiceRequest, ServiceResponse}, error::Error, @@ -8,7 +9,7 @@ use actix_web::{ http::header::DispositionType, HttpRequest, }; -use futures_util::future::{ok, FutureExt, LocalBoxFuture}; +use futures_core::future::LocalBoxFuture; use crate::{ directory_listing, named, Directory, DirectoryRenderer, FilesService, HttpNewService, @@ -263,18 +264,18 @@ impl ServiceFactory for Files { }; if let Some(ref default) = *self.default.borrow() { - default - .new_service(()) - .map(move |result| match result { + let fut = default.new_service(()); + Box::pin(async { + match fut.await { Ok(default) => { srv.default = Some(default); Ok(srv) } Err(_) => Err(()), - }) - .boxed_local() + } + }) } else { - ok(srv).boxed_local() + Box::pin(ok(srv)) } } } diff --git a/actix-files/src/lib.rs b/actix-files/src/lib.rs index 018079b2..f8583feb 100644 --- a/actix-files/src/lib.rs +++ b/actix-files/src/lib.rs @@ -65,6 +65,7 @@ mod tests { }; use actix_service::ServiceFactory; + use actix_utils::future::ok; use actix_web::{ guard, http::{ @@ -76,7 +77,6 @@ mod tests { web::{self, Bytes}, App, HttpResponse, Responder, }; - use futures_util::future::ok; use super::*; diff --git a/actix-files/src/path_buf.rs b/actix-files/src/path_buf.rs index dd8e5b50..8a87acd5 100644 --- a/actix-files/src/path_buf.rs +++ b/actix-files/src/path_buf.rs @@ -3,8 +3,8 @@ use std::{ str::FromStr, }; +use actix_utils::future::{ready, Ready}; use actix_web::{dev::Payload, FromRequest, HttpRequest}; -use futures_util::future::{ready, Ready}; use crate::error::UriSegmentError; diff --git a/actix-files/src/service.rs b/actix-files/src/service.rs index 3214963e..d2db8503 100644 --- a/actix-files/src/service.rs +++ b/actix-files/src/service.rs @@ -1,6 +1,7 @@ use std::{fmt, io, path::PathBuf, rc::Rc}; use actix_service::Service; +use actix_utils::future::ok; use actix_web::{ dev::{ServiceRequest, ServiceResponse}, error::Error, @@ -8,7 +9,7 @@ use actix_web::{ http::{header, Method}, HttpResponse, }; -use futures_util::future::{ok, Either, LocalBoxFuture, Ready}; +use futures_core::future::LocalBoxFuture; use crate::{ named, Directory, DirectoryRenderer, FilesError, HttpService, MimeOverride, NamedFile, @@ -29,19 +30,18 @@ pub struct FilesService { pub(crate) hidden_files: bool, } -type FilesServiceFuture = Either< - Ready>, - LocalBoxFuture<'static, Result>, ->; - impl FilesService { - fn handle_err(&self, e: io::Error, req: ServiceRequest) -> FilesServiceFuture { - log::debug!("Failed to handle {}: {}", req.path(), e); + fn handle_err( + &self, + err: io::Error, + req: ServiceRequest, + ) -> LocalBoxFuture<'static, Result> { + log::debug!("error handling {}: {}", req.path(), err); if let Some(ref default) = self.default { - Either::Right(default.call(req)) + Box::pin(default.call(req)) } else { - Either::Left(ok(req.error_response(e))) + Box::pin(ok(req.error_response(err))) } } } @@ -55,7 +55,7 @@ impl fmt::Debug for FilesService { impl Service for FilesService { type Response = ServiceResponse; type Error = Error; - type Future = FilesServiceFuture; + type Future = LocalBoxFuture<'static, Result>; actix_service::always_ready!(); @@ -69,7 +69,7 @@ impl Service for FilesService { }; if !is_method_valid { - return Either::Left(ok(req.into_response( + return Box::pin(ok(req.into_response( actix_web::HttpResponse::MethodNotAllowed() .insert_header(header::ContentType(mime::TEXT_PLAIN_UTF_8)) .body("Request did not meet this resource's requirements."), @@ -79,13 +79,13 @@ impl Service for FilesService { let real_path = match PathBufWrap::parse_path(req.match_info().path(), self.hidden_files) { Ok(item) => item, - Err(e) => return Either::Left(ok(req.error_response(e))), + Err(e) => return Box::pin(ok(req.error_response(e))), }; // full file path let path = match self.directory.join(&real_path).canonicalize() { Ok(path) => path, - Err(e) => return self.handle_err(e, req), + Err(err) => return Box::pin(self.handle_err(err, req)), }; if path.is_dir() { @@ -93,7 +93,7 @@ impl Service for FilesService { if self.redirect_to_slash && !req.path().ends_with('/') { let redirect_to = format!("{}/", req.path()); - return Either::Left(ok(req.into_response( + return Box::pin(ok(req.into_response( HttpResponse::Found() .insert_header((header::LOCATION, redirect_to)) .body("") @@ -114,9 +114,9 @@ impl Service for FilesService { let (req, _) = req.into_parts(); let res = named_file.into_response(&req); - Either::Left(ok(ServiceResponse::new(req, res))) + Box::pin(ok(ServiceResponse::new(req, res))) } - Err(e) => self.handle_err(e, req), + Err(err) => self.handle_err(err, req), } } else if self.show_index { let dir = Directory::new(self.directory.clone(), path); @@ -124,12 +124,12 @@ impl Service for FilesService { let (req, _) = req.into_parts(); let x = (self.renderer)(&dir, &req); - match x { - Ok(resp) => Either::Left(ok(resp)), - Err(e) => Either::Left(ok(ServiceResponse::from_err(e, req))), - } + Box::pin(match x { + Ok(resp) => ok(resp), + Err(err) => ok(ServiceResponse::from_err(err, req)), + }) } else { - Either::Left(ok(ServiceResponse::from_err( + Box::pin(ok(ServiceResponse::from_err( FilesError::IsDirectory, req.into_parts().0, ))) @@ -145,9 +145,9 @@ impl Service for FilesService { let (req, _) = req.into_parts(); let res = named_file.into_response(&req); - Either::Left(ok(ServiceResponse::new(req, res))) + Box::pin(ok(ServiceResponse::new(req, res))) } - Err(e) => self.handle_err(e, req), + Err(err) => self.handle_err(err, req), } } } diff --git a/actix-http-test/Cargo.toml b/actix-http-test/Cargo.toml index 0e7d57fc..31a83edd 100644 --- a/actix-http-test/Cargo.toml +++ b/actix-http-test/Cargo.toml @@ -32,7 +32,7 @@ openssl = ["tls-openssl", "awc/openssl"] actix-service = "2.0.0-beta.4" actix-codec = "0.4.0-beta.1" actix-tls = "3.0.0-beta.5" -actix-utils = "3.0.0-beta.2" +actix-utils = "3.0.0-beta.4" actix-rt = "2.2" actix-server = "2.0.0-beta.3" awc = { version = "3.0.0-beta.3", default-features = false } diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index 2c71031a..3a7b4f02 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -9,8 +9,12 @@ ### Changed * `client::Connector` type now only have one generic type for `actix_service::Service`. [#2063] +### Removed +* `ResponseError` impl for `actix_utils::timeout::TimeoutError`. [#2127] + [#2063]: https://github.com/actix/actix-web/pull/2063 [#2081]: https://github.com/actix/actix-web/pull/2081 +[#2127]: https://github.com/actix/actix-web/pull/2127 ## 3.0.0-beta.4 - 2021-03-08 diff --git a/actix-http/Cargo.toml b/actix-http/Cargo.toml index 679e8c99..d9af75aa 100644 --- a/actix-http/Cargo.toml +++ b/actix-http/Cargo.toml @@ -46,7 +46,7 @@ trust-dns = ["trust-dns-resolver"] [dependencies] actix-service = "2.0.0-beta.4" actix-codec = "0.4.0-beta.1" -actix-utils = "3.0.0-beta.2" +actix-utils = "3.0.0-beta.4" actix-rt = "2.2" actix-tls = "3.0.0-beta.5" @@ -65,11 +65,13 @@ http = "0.2.2" httparse = "1.3" itoa = "0.4" language-tags = "0.2" +local-channel = "0.1" once_cell = "1.5" log = "0.4" mime = "0.3" percent-encoding = "2.1" pin-project = "1.0.0" +pin-project-lite = "0.2" rand = "0.8" regex = "1.3" serde = "1.0" diff --git a/actix-http/examples/hello-world.rs b/actix-http/examples/hello-world.rs index a84e9aac..a99ddae4 100644 --- a/actix-http/examples/hello-world.rs +++ b/actix-http/examples/hello-world.rs @@ -2,7 +2,7 @@ use std::{env, io}; use actix_http::{HttpService, Response}; use actix_server::Server; -use futures_util::future; +use actix_utils::future; use http::header::HeaderValue; use log::info; diff --git a/actix-http/src/body/mod.rs b/actix-http/src/body/mod.rs index a4d6ba2b..fa43e1b0 100644 --- a/actix-http/src/body/mod.rs +++ b/actix-http/src/body/mod.rs @@ -20,8 +20,9 @@ mod tests { use std::pin::Pin; use actix_rt::pin; + use actix_utils::future::poll_fn; use bytes::{Bytes, BytesMut}; - use futures_util::{future::poll_fn, stream}; + use futures_util::stream; use super::*; diff --git a/actix-http/src/client/h1proto.rs b/actix-http/src/client/h1proto.rs index 8fb08b0c..fa4469d3 100644 --- a/actix-http/src/client/h1proto.rs +++ b/actix-http/src/client/h1proto.rs @@ -5,10 +5,11 @@ use std::{ }; use actix_codec::Framed; +use actix_utils::future::poll_fn; use bytes::buf::BufMut; use bytes::{Bytes, BytesMut}; use futures_core::{ready, Stream}; -use futures_util::{future::poll_fn, SinkExt as _}; +use futures_util::SinkExt as _; use crate::error::PayloadError; use crate::h1; diff --git a/actix-http/src/client/h2proto.rs b/actix-http/src/client/h2proto.rs index 437b9ae7..8cb2e252 100644 --- a/actix-http/src/client/h2proto.rs +++ b/actix-http/src/client/h2proto.rs @@ -1,7 +1,7 @@ use std::future::Future; +use actix_utils::future::poll_fn; use bytes::Bytes; -use futures_util::future::poll_fn; use h2::{ client::{Builder, Connection, SendRequest}, SendStream, diff --git a/actix-http/src/error.rs b/actix-http/src/error.rs index 1354e998..0178be80 100644 --- a/actix-http/src/error.rs +++ b/actix-http/src/error.rs @@ -6,9 +6,6 @@ use std::str::Utf8Error; use std::string::FromUtf8Error; use std::{fmt, io, result}; -use actix_codec::{Decoder, Encoder}; -use actix_utils::dispatcher::DispatcherError as FramedDispatcherError; -use actix_utils::timeout::TimeoutError; use bytes::BytesMut; use derive_more::{Display, From}; use http::uri::InvalidUri; @@ -148,19 +145,6 @@ impl From for Error { } } -/// Inspects the underlying enum and returns an appropriate status code. -/// -/// If the variant is [`TimeoutError::Service`], the error code of the service is returned. -/// Otherwise, [`StatusCode::GATEWAY_TIMEOUT`] is returned. -impl ResponseError for TimeoutError { - fn status_code(&self) -> StatusCode { - match self { - TimeoutError::Service(e) => e.status_code(), - TimeoutError::Timeout => StatusCode::GATEWAY_TIMEOUT, - } - } -} - #[derive(Debug, Display)] #[display(fmt = "UnknownError")] struct UnitError; @@ -469,14 +453,6 @@ impl ResponseError for ContentTypeError { } } -impl + Decoder, I> ResponseError for FramedDispatcherError -where - E: fmt::Debug + fmt::Display, - >::Error: fmt::Debug, - ::Error: fmt::Debug, -{ -} - /// Helper type that can wrap any error and generate custom response. /// /// In following example any `io::Error` will be converted into "BAD REQUEST" diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index e5989e5e..bf036569 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -951,7 +951,8 @@ mod tests { use std::str; use actix_service::fn_service; - use futures_util::future::{lazy, ready, Ready}; + use actix_utils::future::{ready, Ready}; + use futures_util::future::lazy; use super::*; use crate::{ diff --git a/actix-http/src/h1/expect.rs b/actix-http/src/h1/expect.rs index 5015069b..bb8e28e9 100644 --- a/actix-http/src/h1/expect.rs +++ b/actix-http/src/h1/expect.rs @@ -1,5 +1,5 @@ use actix_service::{Service, ServiceFactory}; -use futures_util::future::{ready, Ready}; +use actix_utils::future::{ready, Ready}; use crate::error::Error; use crate::request::Request; diff --git a/actix-http/src/h1/payload.rs b/actix-http/src/h1/payload.rs index 32275ac6..e72493fa 100644 --- a/actix-http/src/h1/payload.rs +++ b/actix-http/src/h1/payload.rs @@ -263,7 +263,7 @@ impl Inner { #[cfg(test)] mod tests { use super::*; - use futures_util::future::poll_fn; + use actix_utils::future::poll_fn; #[actix_rt::test] async fn test_unread_data() { diff --git a/actix-http/src/h1/service.rs b/actix-http/src/h1/service.rs index f915bfa4..a98f6bd0 100644 --- a/actix-http/src/h1/service.rs +++ b/actix-http/src/h1/service.rs @@ -6,8 +6,8 @@ use std::{fmt, net}; use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_rt::net::TcpStream; use actix_service::{pipeline_factory, IntoServiceFactory, Service, ServiceFactory}; +use actix_utils::future::ready; use futures_core::future::LocalBoxFuture; -use futures_util::future::ready; use crate::body::MessageBody; use crate::config::ServiceConfig; diff --git a/actix-http/src/h2/service.rs b/actix-http/src/h2/service.rs index db0b580b..8f202e75 100644 --- a/actix-http/src/h2/service.rs +++ b/actix-http/src/h2/service.rs @@ -10,9 +10,9 @@ use actix_service::{ fn_factory, fn_service, pipeline_factory, IntoServiceFactory, Service, ServiceFactory, }; +use actix_utils::future::ready; use bytes::Bytes; use futures_core::{future::LocalBoxFuture, ready}; -use futures_util::future::ready; use h2::server::{handshake, Handshake}; use log::error; diff --git a/actix-http/src/ws/dispatcher.rs b/actix-http/src/ws/dispatcher.rs index 7be7cf63..57685113 100644 --- a/actix-http/src/ws/dispatcher.rs +++ b/actix-http/src/ws/dispatcher.rs @@ -4,7 +4,6 @@ use std::task::{Context, Poll}; use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_service::{IntoService, Service}; -use actix_utils::dispatcher::{Dispatcher as InnerDispatcher, DispatcherError}; use super::{Codec, Frame, Message}; @@ -15,7 +14,7 @@ where T: AsyncRead + AsyncWrite, { #[pin] - inner: InnerDispatcher, + inner: inner::Dispatcher, } impl Dispatcher @@ -27,13 +26,13 @@ where { pub fn new>(io: T, service: F) -> Self { Dispatcher { - inner: InnerDispatcher::new(Framed::new(io, Codec::new()), service), + inner: inner::Dispatcher::new(Framed::new(io, Codec::new()), service), } } pub fn with>(framed: Framed, service: F) -> Self { Dispatcher { - inner: InnerDispatcher::new(framed, service), + inner: inner::Dispatcher::new(framed, service), } } } @@ -45,9 +44,393 @@ where S::Future: 'static, S::Error: 'static, { - type Output = Result<(), DispatcherError>; + type Output = Result<(), inner::DispatcherError>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { self.project().inner.poll(cx) } } + +/// Framed dispatcher service and related utilities. +mod inner { + // allow dead code since this mod was ripped from actix-utils + #![allow(dead_code)] + + use core::{ + fmt, + future::Future, + mem, + pin::Pin, + task::{Context, Poll}, + }; + + use actix_service::{IntoService, Service}; + use futures_core::stream::Stream; + use local_channel::mpsc; + use log::debug; + use pin_project_lite::pin_project; + + use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed}; + + use crate::ResponseError; + + /// Framed transport errors + pub enum DispatcherError + where + U: Encoder + Decoder, + { + /// Inner service error. + Service(E), + + /// Frame encoding error. + Encoder(>::Error), + + /// Frame decoding error. + Decoder(::Error), + } + + impl From for DispatcherError + where + U: Encoder + Decoder, + { + fn from(err: E) -> Self { + DispatcherError::Service(err) + } + } + + impl fmt::Debug for DispatcherError + where + E: fmt::Debug, + U: Encoder + Decoder, + >::Error: fmt::Debug, + ::Error: fmt::Debug, + { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + DispatcherError::Service(ref e) => { + write!(fmt, "DispatcherError::Service({:?})", e) + } + DispatcherError::Encoder(ref e) => { + write!(fmt, "DispatcherError::Encoder({:?})", e) + } + DispatcherError::Decoder(ref e) => { + write!(fmt, "DispatcherError::Decoder({:?})", e) + } + } + } + } + + impl fmt::Display for DispatcherError + where + E: fmt::Display, + U: Encoder + Decoder, + >::Error: fmt::Debug, + ::Error: fmt::Debug, + { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + DispatcherError::Service(ref e) => write!(fmt, "{}", e), + DispatcherError::Encoder(ref e) => write!(fmt, "{:?}", e), + DispatcherError::Decoder(ref e) => write!(fmt, "{:?}", e), + } + } + } + + impl ResponseError for DispatcherError + where + E: fmt::Debug + fmt::Display, + U: Encoder + Decoder, + >::Error: fmt::Debug, + ::Error: fmt::Debug, + { + } + + /// Message type wrapper for signalling end of message stream. + pub enum Message { + /// Message item. + Item(T), + + /// Signal from service to flush all messages and stop processing. + Close, + } + + pin_project! { + /// A future that reads frames from a [`Framed`] object and passes them to a [`Service`]. + pub struct Dispatcher + where + S: Service<::Item, Response = I>, + S::Error: 'static, + S::Future: 'static, + T: AsyncRead, + T: AsyncWrite, + U: Encoder, + U: Decoder, + I: 'static, + >::Error: fmt::Debug, + { + service: S, + state: State, + #[pin] + framed: Framed, + rx: mpsc::Receiver, S::Error>>, + tx: mpsc::Sender, S::Error>>, + } + } + + enum State + where + S: Service<::Item>, + U: Encoder + Decoder, + { + Processing, + Error(DispatcherError), + FramedError(DispatcherError), + FlushAndStop, + Stopping, + } + + impl State + where + S: Service<::Item>, + U: Encoder + Decoder, + { + fn take_error(&mut self) -> DispatcherError { + match mem::replace(self, State::Processing) { + State::Error(err) => err, + _ => panic!(), + } + } + + fn take_framed_error(&mut self) -> DispatcherError { + match mem::replace(self, State::Processing) { + State::FramedError(err) => err, + _ => panic!(), + } + } + } + + impl Dispatcher + where + S: Service<::Item, Response = I>, + S::Error: 'static, + S::Future: 'static, + T: AsyncRead + AsyncWrite, + U: Decoder + Encoder, + I: 'static, + ::Error: fmt::Debug, + >::Error: fmt::Debug, + { + /// Create new `Dispatcher`. + pub fn new(framed: Framed, service: F) -> Self + where + F: IntoService::Item>, + { + let (tx, rx) = mpsc::channel(); + Dispatcher { + framed, + rx, + tx, + service: service.into_service(), + state: State::Processing, + } + } + + /// Construct new `Dispatcher` instance with customer `mpsc::Receiver` + pub fn with_rx( + framed: Framed, + service: F, + rx: mpsc::Receiver, S::Error>>, + ) -> Self + where + F: IntoService::Item>, + { + let tx = rx.sender(); + Dispatcher { + framed, + rx, + tx, + service: service.into_service(), + state: State::Processing, + } + } + + /// Get sender handle. + pub fn tx(&self) -> mpsc::Sender, S::Error>> { + self.tx.clone() + } + + /// Get reference to a service wrapped by `Dispatcher` instance. + pub fn service(&self) -> &S { + &self.service + } + + /// Get mutable reference to a service wrapped by `Dispatcher` instance. + pub fn service_mut(&mut self) -> &mut S { + &mut self.service + } + + /// Get reference to a framed instance wrapped by `Dispatcher` instance. + pub fn framed(&self) -> &Framed { + &self.framed + } + + /// Get mutable reference to a framed instance wrapped by `Dispatcher` instance. + pub fn framed_mut(&mut self) -> &mut Framed { + &mut self.framed + } + + /// Read from framed object. + fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> bool + where + S: Service<::Item, Response = I>, + S::Error: 'static, + S::Future: 'static, + T: AsyncRead + AsyncWrite, + U: Decoder + Encoder, + I: 'static, + >::Error: fmt::Debug, + { + loop { + let this = self.as_mut().project(); + match this.service.poll_ready(cx) { + Poll::Ready(Ok(_)) => { + let item = match this.framed.next_item(cx) { + Poll::Ready(Some(Ok(el))) => el, + Poll::Ready(Some(Err(err))) => { + *this.state = + State::FramedError(DispatcherError::Decoder(err)); + return true; + } + Poll::Pending => return false, + Poll::Ready(None) => { + *this.state = State::Stopping; + return true; + } + }; + + let tx = this.tx.clone(); + let fut = this.service.call(item); + actix_rt::spawn(async move { + let item = fut.await; + let _ = tx.send(item.map(Message::Item)); + }); + } + Poll::Pending => return false, + Poll::Ready(Err(err)) => { + *this.state = State::Error(DispatcherError::Service(err)); + return true; + } + } + } + } + + /// Write to framed object. + fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> bool + where + S: Service<::Item, Response = I>, + S::Error: 'static, + S::Future: 'static, + T: AsyncRead + AsyncWrite, + U: Decoder + Encoder, + I: 'static, + >::Error: fmt::Debug, + { + loop { + let mut this = self.as_mut().project(); + while !this.framed.is_write_buf_full() { + match Pin::new(&mut this.rx).poll_next(cx) { + Poll::Ready(Some(Ok(Message::Item(msg)))) => { + if let Err(err) = this.framed.as_mut().write(msg) { + *this.state = + State::FramedError(DispatcherError::Encoder(err)); + return true; + } + } + Poll::Ready(Some(Ok(Message::Close))) => { + *this.state = State::FlushAndStop; + return true; + } + Poll::Ready(Some(Err(err))) => { + *this.state = State::Error(DispatcherError::Service(err)); + return true; + } + Poll::Ready(None) | Poll::Pending => break, + } + } + + if !this.framed.is_write_buf_empty() { + match this.framed.flush(cx) { + Poll::Pending => break, + Poll::Ready(Ok(_)) => {} + Poll::Ready(Err(err)) => { + debug!("Error sending data: {:?}", err); + *this.state = + State::FramedError(DispatcherError::Encoder(err)); + return true; + } + } + } else { + break; + } + } + + false + } + } + + impl Future for Dispatcher + where + S: Service<::Item, Response = I>, + S::Error: 'static, + S::Future: 'static, + T: AsyncRead + AsyncWrite, + U: Decoder + Encoder, + I: 'static, + >::Error: fmt::Debug, + ::Error: fmt::Debug, + { + type Output = Result<(), DispatcherError>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + loop { + let this = self.as_mut().project(); + + return match this.state { + State::Processing => { + if self.as_mut().poll_read(cx) || self.as_mut().poll_write(cx) { + continue; + } else { + Poll::Pending + } + } + State::Error(_) => { + // flush write buffer + if !this.framed.is_write_buf_empty() + && this.framed.flush(cx).is_pending() + { + return Poll::Pending; + } + Poll::Ready(Err(this.state.take_error())) + } + State::FlushAndStop => { + if !this.framed.is_write_buf_empty() { + this.framed.flush(cx).map(|res| { + if let Err(err) = res { + debug!("Error sending data: {:?}", err); + } + + Ok(()) + }) + } else { + Poll::Ready(Ok(())) + } + } + State::FramedError(_) => { + Poll::Ready(Err(this.state.take_framed_error())) + } + State::Stopping => Poll::Ready(Ok(())), + }; + } + } + } +} diff --git a/actix-http/tests/test_client.rs b/actix-http/tests/test_client.rs index 758e3974..b5f8d54b 100644 --- a/actix-http/tests/test_client.rs +++ b/actix-http/tests/test_client.rs @@ -3,11 +3,9 @@ use actix_http::{ }; use actix_http_test::test_server; use actix_service::ServiceFactoryExt; +use actix_utils::future; use bytes::Bytes; -use futures_util::{ - future::{self, ok}, - StreamExt as _, -}; +use futures_util::StreamExt as _; const STR: &str = "Hello World Hello World Hello World Hello World Hello World \ Hello World Hello World Hello World Hello World Hello World \ @@ -63,7 +61,7 @@ async fn test_h1_v2() { async fn test_connection_close() { let srv = test_server(move || { HttpService::build() - .finish(|_| ok::<_, ()>(Response::Ok().body(STR))) + .finish(|_| future::ok::<_, ()>(Response::Ok().body(STR))) .tcp() .map(|_| ()) }) @@ -79,9 +77,9 @@ async fn test_with_query_parameter() { HttpService::build() .finish(|req: Request| { if req.uri().query().unwrap().contains("qp=") { - ok::<_, ()>(Response::Ok().finish()) + future::ok::<_, ()>(Response::Ok().finish()) } else { - ok::<_, ()>(Response::BadRequest().finish()) + future::ok::<_, ()>(Response::BadRequest().finish()) } }) .tcp() diff --git a/actix-http/tests/test_openssl.rs b/actix-http/tests/test_openssl.rs index 49a68a60..c9cfa7d1 100644 --- a/actix-http/tests/test_openssl.rs +++ b/actix-http/tests/test_openssl.rs @@ -11,12 +11,10 @@ use actix_http::HttpMessage; use actix_http::{body, Error, HttpService, Request, Response}; use actix_http_test::test_server; use actix_service::{fn_service, ServiceFactoryExt}; +use actix_utils::future::{err, ok, ready}; use bytes::{Bytes, BytesMut}; use futures_core::Stream; -use futures_util::{ - future::{err, ok, ready}, - stream::{once, StreamExt as _}, -}; +use futures_util::stream::{once, StreamExt as _}; use openssl::{ pkey::PKey, ssl::{SslAcceptor, SslMethod}, diff --git a/actix-http/tests/test_rustls.rs b/actix-http/tests/test_rustls.rs index 7a3cb147..5b8ba658 100644 --- a/actix-http/tests/test_rustls.rs +++ b/actix-http/tests/test_rustls.rs @@ -8,10 +8,10 @@ use actix_http::http::{Method, StatusCode, Version}; use actix_http::{body, error, Error, HttpService, Request, Response}; use actix_http_test::test_server; use actix_service::{fn_factory_with_config, fn_service}; +use actix_utils::future::{err, ok}; use bytes::{Bytes, BytesMut}; use futures_core::Stream; -use futures_util::future::{self, err, ok}; use futures_util::stream::{once, StreamExt as _}; use rustls::{ internal::pemfile::{certs, pkcs8_private_keys}, @@ -51,7 +51,7 @@ fn tls_config() -> RustlsServerConfig { async fn test_h1() -> io::Result<()> { let srv = test_server(move || { HttpService::build() - .h1(|_| future::ok::<_, Error>(Response::Ok().finish())) + .h1(|_| ok::<_, Error>(Response::Ok().finish())) .rustls(tls_config()) }) .await; @@ -65,7 +65,7 @@ async fn test_h1() -> io::Result<()> { async fn test_h2() -> io::Result<()> { let srv = test_server(move || { HttpService::build() - .h2(|_| future::ok::<_, Error>(Response::Ok().finish())) + .h2(|_| ok::<_, Error>(Response::Ok().finish())) .rustls(tls_config()) }) .await; @@ -82,7 +82,7 @@ async fn test_h1_1() -> io::Result<()> { .h1(|req: Request| { assert!(req.peer_addr().is_some()); assert_eq!(req.version(), Version::HTTP_11); - future::ok::<_, Error>(Response::Ok().finish()) + ok::<_, Error>(Response::Ok().finish()) }) .rustls(tls_config()) }) @@ -100,7 +100,7 @@ async fn test_h2_1() -> io::Result<()> { .finish(|req: Request| { assert!(req.peer_addr().is_some()); assert_eq!(req.version(), Version::HTTP_2); - future::ok::<_, Error>(Response::Ok().finish()) + ok::<_, Error>(Response::Ok().finish()) }) .rustls(tls_config()) }) @@ -144,7 +144,7 @@ async fn test_h2_content_length() { StatusCode::OK, StatusCode::NOT_FOUND, ]; - future::ok::<_, ()>(Response::new(statuses[indx])) + ok::<_, ()>(Response::new(statuses[indx])) }) .rustls(tls_config()) }) @@ -213,7 +213,7 @@ async fn test_h2_headers() { TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST ", )); } - future::ok::<_, ()>(config.body(data.clone())) + ok::<_, ()>(config.body(data.clone())) }) .rustls(tls_config()) }).await; @@ -252,7 +252,7 @@ const STR: &str = "Hello World Hello World Hello World Hello World Hello World \ async fn test_h2_body2() { let mut srv = test_server(move || { HttpService::build() - .h2(|_| future::ok::<_, ()>(Response::Ok().body(STR))) + .h2(|_| ok::<_, ()>(Response::Ok().body(STR))) .rustls(tls_config()) }) .await; diff --git a/actix-http/tests/test_server.rs b/actix-http/tests/test_server.rs index 6d145400..9084a597 100644 --- a/actix-http/tests/test_server.rs +++ b/actix-http/tests/test_server.rs @@ -5,9 +5,10 @@ use std::{net, thread}; use actix_http_test::test_server; use actix_rt::time::sleep; use actix_service::fn_service; +use actix_utils::future::{err, ok, ready}; use bytes::Bytes; -use futures_util::future::{self, err, ok, ready, FutureExt}; use futures_util::stream::{once, StreamExt as _}; +use futures_util::FutureExt as _; use regex::Regex; use actix_http::HttpMessage; @@ -24,7 +25,7 @@ async fn test_h1() { .client_disconnect(1000) .h1(|req: Request| { assert!(req.peer_addr().is_some()); - future::ok::<_, ()>(Response::Ok().finish()) + ok::<_, ()>(Response::Ok().finish()) }) .tcp() }) @@ -44,7 +45,7 @@ async fn test_h1_2() { .finish(|req: Request| { assert!(req.peer_addr().is_some()); assert_eq!(req.version(), http::Version::HTTP_11); - future::ok::<_, ()>(Response::Ok().finish()) + ok::<_, ()>(Response::Ok().finish()) }) .tcp() }) @@ -65,7 +66,7 @@ async fn test_expect_continue() { err(error::ErrorPreconditionFailed("error")) } })) - .finish(|_| future::ok::<_, ()>(Response::Ok().finish())) + .finish(|_| ok::<_, ()>(Response::Ok().finish())) .tcp() }) .await; @@ -96,7 +97,7 @@ async fn test_expect_continue_h1() { } }) })) - .h1(fn_service(|_| future::ok::<_, ()>(Response::Ok().finish()))) + .h1(fn_service(|_| ok::<_, ()>(Response::Ok().finish()))) .tcp() }) .await; @@ -175,7 +176,7 @@ async fn test_slow_request() { let srv = test_server(|| { HttpService::build() .client_timeout(100) - .finish(|_| future::ok::<_, ()>(Response::Ok().finish())) + .finish(|_| ok::<_, ()>(Response::Ok().finish())) .tcp() }) .await; @@ -191,7 +192,7 @@ async fn test_slow_request() { async fn test_http1_malformed_request() { let srv = test_server(|| { HttpService::build() - .h1(|_| future::ok::<_, ()>(Response::Ok().finish())) + .h1(|_| ok::<_, ()>(Response::Ok().finish())) .tcp() }) .await; @@ -207,7 +208,7 @@ async fn test_http1_malformed_request() { async fn test_http1_keepalive() { let srv = test_server(|| { HttpService::build() - .h1(|_| future::ok::<_, ()>(Response::Ok().finish())) + .h1(|_| ok::<_, ()>(Response::Ok().finish())) .tcp() }) .await; @@ -229,7 +230,7 @@ async fn test_http1_keepalive_timeout() { let srv = test_server(|| { HttpService::build() .keep_alive(1) - .h1(|_| future::ok::<_, ()>(Response::Ok().finish())) + .h1(|_| ok::<_, ()>(Response::Ok().finish())) .tcp() }) .await; @@ -250,7 +251,7 @@ async fn test_http1_keepalive_timeout() { async fn test_http1_keepalive_close() { let srv = test_server(|| { HttpService::build() - .h1(|_| future::ok::<_, ()>(Response::Ok().finish())) + .h1(|_| ok::<_, ()>(Response::Ok().finish())) .tcp() }) .await; @@ -271,7 +272,7 @@ async fn test_http1_keepalive_close() { async fn test_http10_keepalive_default_close() { let srv = test_server(|| { HttpService::build() - .h1(|_| future::ok::<_, ()>(Response::Ok().finish())) + .h1(|_| ok::<_, ()>(Response::Ok().finish())) .tcp() }) .await; @@ -291,7 +292,7 @@ async fn test_http10_keepalive_default_close() { async fn test_http10_keepalive() { let srv = test_server(|| { HttpService::build() - .h1(|_| future::ok::<_, ()>(Response::Ok().finish())) + .h1(|_| ok::<_, ()>(Response::Ok().finish())) .tcp() }) .await; @@ -319,7 +320,7 @@ async fn test_http1_keepalive_disabled() { let srv = test_server(|| { HttpService::build() .keep_alive(KeepAlive::Disabled) - .h1(|_| future::ok::<_, ()>(Response::Ok().finish())) + .h1(|_| ok::<_, ()>(Response::Ok().finish())) .tcp() }) .await; @@ -354,7 +355,7 @@ async fn test_content_length() { StatusCode::OK, StatusCode::NOT_FOUND, ]; - future::ok::<_, ()>(Response::new(statuses[indx])) + ok::<_, ()>(Response::new(statuses[indx])) }) .tcp() }) @@ -409,7 +410,7 @@ async fn test_h1_headers() { TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST ", )); } - future::ok::<_, ()>(builder.body(data.clone())) + ok::<_, ()>(builder.body(data.clone())) }).tcp() }).await; @@ -645,7 +646,7 @@ async fn test_h1_response_http_error_handling() { async fn test_h1_service_error() { let mut srv = test_server(|| { HttpService::build() - .h1(|_| future::err::(error::ErrorBadRequest("error"))) + .h1(|_| err::(error::ErrorBadRequest("error"))) .tcp() }) .await; @@ -667,7 +668,7 @@ async fn test_h1_on_connect() { }) .h1(|req: Request| { assert!(req.extensions().contains::()); - future::ok::<_, ()>(Response::Ok().finish()) + ok::<_, ()>(Response::Ok().finish()) }) .tcp() }) diff --git a/actix-http/tests/test_ws.rs b/actix-http/tests/test_ws.rs index 51238215..9a2e5771 100644 --- a/actix-http/tests/test_ws.rs +++ b/actix-http/tests/test_ws.rs @@ -9,11 +9,12 @@ use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_http::{body, h1, ws, Error, HttpService, Request, Response}; use actix_http_test::test_server; use actix_service::{fn_factory, Service}; -use actix_utils::dispatcher::Dispatcher; +use actix_utils::future; use bytes::Bytes; -use futures_util::future; use futures_util::{SinkExt as _, StreamExt as _}; +use crate::ws::Dispatcher; + struct WsService(Arc, Cell)>>); impl WsService { @@ -58,7 +59,7 @@ where .await .unwrap(); - Dispatcher::new(framed.replace_codec(ws::Codec::new()), service) + Dispatcher::with(framed.replace_codec(ws::Codec::new()), service) .await .map_err(|_| panic!()) }; diff --git a/actix-multipart/Cargo.toml b/actix-multipart/Cargo.toml index 607e9084..cb00a0e7 100644 --- a/actix-multipart/Cargo.toml +++ b/actix-multipart/Cargo.toml @@ -17,12 +17,14 @@ path = "src/lib.rs" [dependencies] actix-web = { version = "4.0.0-beta.4", default-features = false } -actix-utils = "3.0.0-beta.2" +actix-utils = "3.0.0-beta.4" bytes = "1" derive_more = "0.99.5" -httparse = "1.3" +futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] } futures-util = { version = "0.3.7", default-features = false, features = ["alloc"] } +httparse = "1.3" +local-waker = "0.1" log = "0.4" mime = "0.3" twoway = "0.2" diff --git a/actix-multipart/src/extractor.rs b/actix-multipart/src/extractor.rs index bffbe8a1..c87f8cc2 100644 --- a/actix-multipart/src/extractor.rs +++ b/actix-multipart/src/extractor.rs @@ -1,21 +1,22 @@ //! Multipart payload support + +use actix_utils::future::{ready, Ready}; use actix_web::{dev::Payload, Error, FromRequest, HttpRequest}; -use futures_util::future::{ok, Ready}; use crate::server::Multipart; -/// Get request's payload as multipart stream +/// Get request's payload as multipart stream. /// /// Content-type: multipart/form-data; /// /// ## Server example /// /// ``` -/// use futures_util::stream::{Stream, StreamExt}; /// use actix_web::{web, HttpResponse, Error}; -/// use actix_multipart as mp; +/// use actix_multipart::Multipart; +/// use futures_util::stream::StreamExt as _; /// -/// async fn index(mut payload: mp::Multipart) -> Result { +/// async fn index(mut payload: Multipart) -> Result { /// // iterate over multipart stream /// while let Some(item) = payload.next().await { /// let mut field = item?; @@ -25,9 +26,9 @@ use crate::server::Multipart; /// println!("-- CHUNK: \n{:?}", std::str::from_utf8(&chunk?)); /// } /// } +/// /// Ok(HttpResponse::Ok().into()) /// } -/// # fn main() {} /// ``` impl FromRequest for Multipart { type Error = Error; @@ -36,9 +37,9 @@ impl FromRequest for Multipart { #[inline] fn from_request(req: &HttpRequest, payload: &mut Payload) -> Self::Future { - ok(match Multipart::boundary(req.headers()) { + ready(Ok(match Multipart::boundary(req.headers()) { Ok(boundary) => Multipart::from_boundary(boundary, payload.take()), Err(err) => Multipart::from_error(err), - }) + })) } } diff --git a/actix-multipart/src/server.rs b/actix-multipart/src/server.rs index d9ff3d57..b7d25153 100644 --- a/actix-multipart/src/server.rs +++ b/actix-multipart/src/server.rs @@ -1,4 +1,4 @@ -//! Multipart payload support +//! Multipart response payload support. use std::cell::{Cell, RefCell, RefMut}; use std::convert::TryFrom; @@ -8,12 +8,12 @@ use std::rc::Rc; use std::task::{Context, Poll}; use std::{cmp, fmt}; -use bytes::{Bytes, BytesMut}; -use futures_util::stream::{LocalBoxStream, Stream, StreamExt}; - -use actix_utils::task::LocalWaker; use actix_web::error::{ParseError, PayloadError}; use actix_web::http::header::{self, ContentDisposition, HeaderMap, HeaderName, HeaderValue}; +use bytes::{Bytes, BytesMut}; +use futures_core::stream::{LocalBoxStream, Stream}; +use futures_util::stream::StreamExt as _; +use local_waker::LocalWaker; use crate::error::MultipartError; diff --git a/actix-web-actors/tests/test_ws.rs b/actix-web-actors/tests/test_ws.rs index 7999beed..4ffedb2e 100644 --- a/actix-web-actors/tests/test_ws.rs +++ b/actix-web-actors/tests/test_ws.rs @@ -5,7 +5,7 @@ use actix_web::{ }; use actix_web_actors::*; use bytes::Bytes; -use futures_util::{SinkExt, StreamExt}; +use futures_util::{SinkExt as _, StreamExt as _}; struct Ws; diff --git a/actix-web-codegen/Cargo.toml b/actix-web-codegen/Cargo.toml index fdfb9f6b..a513b820 100644 --- a/actix-web-codegen/Cargo.toml +++ b/actix-web-codegen/Cargo.toml @@ -21,6 +21,7 @@ proc-macro2 = "1" [dev-dependencies] actix-rt = "2.2" actix-web = "4.0.0-beta.4" -futures-util = { version = "0.3.7", default-features = false } +actix-utils = "3.0.0-beta.4" +futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] } trybuild = "1" rustversion = "1" diff --git a/actix-web-codegen/tests/test_macro.rs b/actix-web-codegen/tests/test_macro.rs index 0cbb64ba..4e06e15e 100644 --- a/actix-web-codegen/tests/test_macro.rs +++ b/actix-web-codegen/tests/test_macro.rs @@ -1,11 +1,12 @@ use std::future::Future; use std::task::{Context, Poll}; +use actix_utils::future; use actix_web::dev::{Service, ServiceRequest, ServiceResponse, Transform}; use actix_web::http::header::{HeaderName, HeaderValue}; use actix_web::{http, test, web::Path, App, Error, HttpResponse, Responder}; use actix_web_codegen::{connect, delete, get, head, options, patch, post, put, route, trace}; -use futures_util::future::{self, LocalBoxFuture}; +use futures_core::future::LocalBoxFuture; // Make sure that we can name function as 'config' #[get("/config")] diff --git a/awc/Cargo.toml b/awc/Cargo.toml index cc684160..d0e781e5 100644 --- a/awc/Cargo.toml +++ b/awc/Cargo.toml @@ -69,7 +69,7 @@ tls-rustls = { version = "0.19.0", package = "rustls", optional = true, features actix-web = { version = "4.0.0-beta.4", features = ["openssl"] } actix-http = { version = "3.0.0-beta.4", features = ["openssl"] } actix-http-test = { version = "3.0.0-beta.3", features = ["openssl"] } -actix-utils = "3.0.0-beta.1" +actix-utils = "3.0.0-beta.4" actix-server = "2.0.0-beta.3" actix-tls = { version = "3.0.0-beta.5", features = ["openssl", "rustls"] } diff --git a/awc/src/ws.rs b/awc/src/ws.rs index df25b728..8458d3e3 100644 --- a/awc/src/ws.rs +++ b/awc/src/ws.rs @@ -6,7 +6,7 @@ //! //! ```no_run //! use awc::{Client, ws}; -//! use futures_util::{sink::SinkExt, stream::StreamExt}; +//! use futures_util::{sink::SinkExt as _, stream::StreamExt as _}; //! //! #[actix_rt::main] //! async fn main() { diff --git a/awc/tests/test_client.rs b/awc/tests/test_client.rs index 50d2b5ea..9682bc25 100644 --- a/awc/tests/test_client.rs +++ b/awc/tests/test_client.rs @@ -5,12 +5,13 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::time::Duration; +use actix_utils::future::ok; use brotli2::write::BrotliEncoder; use bytes::Bytes; use flate2::read::GzDecoder; use flate2::write::GzEncoder; use flate2::Compression; -use futures_util::{future::ok, stream}; +use futures_util::stream; use rand::Rng; use actix_http::{ @@ -159,7 +160,7 @@ async fn test_timeout_override() { #[actix_rt::test] async fn test_response_timeout() { - use futures_util::stream::{once, StreamExt}; + use futures_util::stream::{once, StreamExt as _}; let srv = test::start(|| { App::new().service(web::resource("/").route(web::to(|| async { diff --git a/awc/tests/test_rustls_client.rs b/awc/tests/test_rustls_client.rs index 464edfe8..080eaf79 100644 --- a/awc/tests/test_rustls_client.rs +++ b/awc/tests/test_rustls_client.rs @@ -13,8 +13,8 @@ use std::{ use actix_http::HttpService; use actix_http_test::test_server; use actix_service::{map_config, pipeline_factory, ServiceFactoryExt}; +use actix_utils::future::ok; use actix_web::{dev::AppConfig, http::Version, web, App, HttpResponse}; -use futures_util::future::ok; use rustls::internal::pemfile::{certs, pkcs8_private_keys}; use rustls::{ClientConfig, NoClientAuth, ServerConfig}; diff --git a/awc/tests/test_ssl_client.rs b/awc/tests/test_ssl_client.rs index 3079aaf5..50222340 100644 --- a/awc/tests/test_ssl_client.rs +++ b/awc/tests/test_ssl_client.rs @@ -8,9 +8,9 @@ use std::sync::Arc; use actix_http::HttpService; use actix_http_test::test_server; use actix_service::{map_config, pipeline_factory, ServiceFactoryExt}; +use actix_utils::future::ok; use actix_web::http::Version; use actix_web::{dev::AppConfig, web, App, HttpResponse}; -use futures_util::future::ok; use openssl::{ pkey::PKey, ssl::{SslAcceptor, SslConnector, SslMethod, SslVerifyMode}, diff --git a/awc/tests/test_ws.rs b/awc/tests/test_ws.rs index 1b3f780d..3f19ac4e 100644 --- a/awc/tests/test_ws.rs +++ b/awc/tests/test_ws.rs @@ -3,9 +3,9 @@ use std::io; use actix_codec::Framed; use actix_http::{body::BodySize, h1, ws, Error, HttpService, Request, Response}; use actix_http_test::test_server; +use actix_utils::future::ok; use bytes::Bytes; -use futures_util::future::ok; -use futures_util::{SinkExt, StreamExt}; +use futures_util::{SinkExt as _, StreamExt as _}; async fn ws_service(req: ws::Frame) -> Result { match req { diff --git a/benches/responder.rs b/benches/responder.rs index 8cfdbd3e..0dfc8cd1 100644 --- a/benches/responder.rs +++ b/benches/responder.rs @@ -1,12 +1,12 @@ -use std::future::Future; -use std::time::Instant; +use std::{future::Future, time::Instant}; use actix_http::Response; +use actix_utils::future::{ready, Ready}; use actix_web::http::StatusCode; use actix_web::test::TestRequest; use actix_web::{error, Error, HttpRequest, HttpResponse, Responder}; use criterion::{criterion_group, criterion_main, Criterion}; -use futures_util::future::{ready, Either, Ready}; +use futures_util::future::{join_all, Either}; // responder simulate the old responder trait. trait FutureResponder { @@ -79,7 +79,7 @@ fn future_responder(c: &mut Criterion) { .await }); - let futs = futures_util::future::join_all(futs); + let futs = join_all(futs); let start = Instant::now(); diff --git a/docs/graphs/net-only.dot b/docs/graphs/net-only.dot index 84227cdb..bee0185a 100644 --- a/docs/graphs/net-only.dot +++ b/docs/graphs/net-only.dot @@ -1,19 +1,37 @@ digraph { + rankdir=TB + subgraph cluster_net { - label="actix/actix-net"; + label="actix-net" "actix-codec" "actix-macros" "actix-rt" "actix-server" "actix-service" "actix-tls" "actix-tracing" "actix-utils" "actix-router" - "local-channel" "local-waker" + } + + subgraph cluster_other { + label="other actix owned crates" + { rank=same; "local-channel" "local-waker" "bytestring" } } - "actix-codec" -> { "actix-rt" "actix-service" "local-channel" "tokio" } + subgraph cluster_tokio { + label="tokio" + "tokio" "tokio-util" + } + + "actix-codec" -> { "tokio" } + "actix-codec" -> { "tokio-util" }[color=red] "actix-utils" -> { "local-waker" } "actix-tracing" -> { "actix-service" } "actix-tls" -> { "actix-service" "actix-codec" "actix-utils" "actix-rt" } - "actix-server" -> { "actix-service" "actix-rt" "actix-codec" "actix-utils" "tokio" } + "actix-tls" -> { "tokio-util" }[color="#009900"] + "actix-server" -> { "actix-service" "actix-rt" "actix-utils" "tokio" } "actix-rt" -> { "actix-macros" "tokio" } + "actix-router" -> { "bytestring" } "local-channel" -> { "local-waker" } - "tokio" [fontcolor = darkgreen] + // invisible edges to force nicer layout + edge [style=invis] + "actix-macros" -> "tokio" + "actix-service" -> "bytestring" + "actix-macros" -> "bytestring" } diff --git a/src/app.rs b/src/app.rs index f2c6bce8..357d45ee 100644 --- a/src/app.rs +++ b/src/app.rs @@ -10,7 +10,7 @@ use actix_service::boxed::{self, BoxServiceFactory}; use actix_service::{ apply, apply_fn_factory, IntoServiceFactory, ServiceFactory, ServiceFactoryExt, Transform, }; -use futures_util::future::FutureExt; +use futures_util::future::FutureExt as _; use crate::app_service::{AppEntry, AppInit, AppRoutingFactory}; use crate::config::ServiceConfig; @@ -465,8 +465,8 @@ where #[cfg(test)] mod tests { use actix_service::Service; + use actix_utils::future::{err, ok}; use bytes::Bytes; - use futures_util::future::{err, ok}; use super::*; use crate::http::{header, HeaderValue, Method, StatusCode}; diff --git a/src/data.rs b/src/data.rs index 56ecdb8a..3bc54a46 100644 --- a/src/data.rs +++ b/src/data.rs @@ -4,7 +4,8 @@ use std::sync::Arc; use actix_http::error::{Error, ErrorInternalServerError}; use actix_http::Extensions; -use futures_util::future::{err, ok, LocalBoxFuture, Ready}; +use actix_utils::future::{err, ok, Ready}; +use futures_core::future::LocalBoxFuture; use serde::Serialize; use crate::dev::Payload; @@ -147,10 +148,10 @@ impl DataFactory for Data { #[cfg(test)] mod tests { - use actix_service::Service; use std::sync::atomic::{AtomicUsize, Ordering}; use super::*; + use crate::dev::Service; use crate::http::StatusCode; use crate::test::{self, init_service, TestRequest}; use crate::{web, App, HttpResponse}; diff --git a/src/extract.rs b/src/extract.rs index 8851481e..80f2384a 100644 --- a/src/extract.rs +++ b/src/extract.rs @@ -6,10 +6,8 @@ use std::{ task::{Context, Poll}, }; -use futures_util::{ - future::{ready, Ready}, - ready, -}; +use actix_utils::future::{ready, Ready}; +use futures_core::ready; use crate::{dev::Payload, Error, HttpRequest}; diff --git a/src/handler.rs b/src/handler.rs index 7e3c5f47..e005a96a 100644 --- a/src/handler.rs +++ b/src/handler.rs @@ -5,8 +5,8 @@ use std::task::{Context, Poll}; use actix_http::{Error, Response}; use actix_service::{Service, ServiceFactory}; -use futures_util::future::{ready, Ready}; -use futures_util::ready; +use actix_utils::future::{ready, Ready}; +use futures_core::ready; use pin_project::pin_project; use crate::extract::FromRequest; diff --git a/src/middleware/compress.rs b/src/middleware/compress.rs index a397bccd..6a56e6de 100644 --- a/src/middleware/compress.rs +++ b/src/middleware/compress.rs @@ -16,8 +16,8 @@ use actix_http::{ Error, }; use actix_service::{Service, Transform}; +use actix_utils::future::{ok, Ready}; use futures_core::ready; -use futures_util::future::{ok, Ready}; use pin_project::pin_project; use crate::{ diff --git a/src/middleware/condition.rs b/src/middleware/condition.rs index dd599a0c..d1ba7ee4 100644 --- a/src/middleware/condition.rs +++ b/src/middleware/condition.rs @@ -3,7 +3,9 @@ use std::task::{Context, Poll}; use actix_service::{Service, Transform}; -use futures_util::future::{Either, FutureExt, LocalBoxFuture}; +use actix_utils::future::Either; +use futures_core::future::LocalBoxFuture; +use futures_util::future::FutureExt as _; /// Middleware for conditionally enabling other middleware. /// @@ -85,8 +87,8 @@ where fn call(&self, req: Req) -> Self::Future { match self { - ConditionMiddleware::Enable(service) => Either::Left(service.call(req)), - ConditionMiddleware::Disable(service) => Either::Right(service.call(req)), + ConditionMiddleware::Enable(service) => Either::left(service.call(req)), + ConditionMiddleware::Disable(service) => Either::right(service.call(req)), } } } @@ -94,7 +96,7 @@ where #[cfg(test)] mod tests { use actix_service::IntoService; - use futures_util::future::ok; + use actix_utils::future::ok; use super::*; use crate::{ diff --git a/src/middleware/default_headers.rs b/src/middleware/default_headers.rs index 12d70ab2..d8a947aa 100644 --- a/src/middleware/default_headers.rs +++ b/src/middleware/default_headers.rs @@ -9,10 +9,8 @@ use std::{ task::{Context, Poll}, }; -use futures_util::{ - future::{ready, Ready}, - ready, -}; +use actix_utils::future::{ready, Ready}; +use futures_core::ready; use crate::{ dev::{Service, Transform}, @@ -188,7 +186,7 @@ where #[cfg(test)] mod tests { use actix_service::IntoService; - use futures_util::future::ok; + use actix_utils::future::ok; use super::*; use crate::{ diff --git a/src/middleware/err_handlers.rs b/src/middleware/err_handlers.rs index fddd87a9..88834f8c 100644 --- a/src/middleware/err_handlers.rs +++ b/src/middleware/err_handlers.rs @@ -175,7 +175,8 @@ where #[cfg(test)] mod tests { use actix_service::IntoService; - use futures_util::future::{ok, FutureExt}; + use actix_utils::future::ok; + use futures_util::future::FutureExt as _; use super::*; use crate::http::{header::CONTENT_TYPE, HeaderValue, StatusCode}; diff --git a/src/middleware/logger.rs b/src/middleware/logger.rs index 8f539175..3fd37211 100644 --- a/src/middleware/logger.rs +++ b/src/middleware/logger.rs @@ -13,8 +13,9 @@ use std::{ }; use actix_service::{Service, Transform}; +use actix_utils::future::{ok, Ready}; use bytes::Bytes; -use futures_util::future::{ok, Ready}; +use futures_core::ready; use log::{debug, warn}; use regex::{Regex, RegexSet}; use time::OffsetDateTime; @@ -269,7 +270,7 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); - let res = match futures_util::ready!(this.fut.poll(cx)) { + let res = match ready!(this.fut.poll(cx)) { Ok(res) => res, Err(e) => return Poll::Ready(Err(e)), }; @@ -588,7 +589,7 @@ impl<'a> fmt::Display for FormatDisplay<'a> { #[cfg(test)] mod tests { use actix_service::{IntoService, Service, Transform}; - use futures_util::future::ok; + use actix_utils::future::ok; use super::*; use crate::http::{header, StatusCode}; diff --git a/src/middleware/normalize.rs b/src/middleware/normalize.rs index 2a97a047..ec6c2a34 100644 --- a/src/middleware/normalize.rs +++ b/src/middleware/normalize.rs @@ -2,8 +2,8 @@ use actix_http::http::{PathAndQuery, Uri}; use actix_service::{Service, Transform}; +use actix_utils::future::{ready, Ready}; use bytes::Bytes; -use futures_util::future::{ready, Ready}; use regex::Regex; use crate::{ diff --git a/src/request.rs b/src/request.rs index 15c97345..3fdbb13e 100644 --- a/src/request.rs +++ b/src/request.rs @@ -5,7 +5,7 @@ use std::{fmt, net}; use actix_http::http::{HeaderMap, Method, Uri, Version}; use actix_http::{Error, Extensions, HttpMessage, Message, Payload, RequestHead}; use actix_router::{Path, Url}; -use futures_util::future::{ok, Ready}; +use actix_utils::future::{ok, Ready}; use smallvec::SmallVec; use crate::app_service::AppInitServiceState; diff --git a/src/request_data.rs b/src/request_data.rs index fc711d01..60471cbf 100644 --- a/src/request_data.rs +++ b/src/request_data.rs @@ -1,7 +1,7 @@ use std::{any::type_name, ops::Deref}; use actix_http::error::{Error, ErrorInternalServerError}; -use futures_util::future; +use actix_utils::future::{err, ok, Ready}; use crate::{dev::Payload, FromRequest, HttpRequest}; @@ -67,11 +67,11 @@ impl Deref for ReqData { impl FromRequest for ReqData { type Config = (); type Error = Error; - type Future = future::Ready>; + type Future = Ready>; fn from_request(req: &HttpRequest, _: &mut Payload) -> Self::Future { if let Some(st) = req.extensions().get::() { - future::ok(ReqData(st.clone())) + ok(ReqData(st.clone())) } else { log::debug!( "Failed to construct App-level ReqData extractor. \ @@ -79,7 +79,7 @@ impl FromRequest for ReqData { req.path(), type_name::(), ); - future::err(ErrorInternalServerError( + err(ErrorInternalServerError( "Missing expected request extension data", )) } diff --git a/src/resource.rs b/src/resource.rs index 8f356c76..e868bb54 100644 --- a/src/resource.rs +++ b/src/resource.rs @@ -519,7 +519,7 @@ mod tests { use actix_rt::time::sleep; use actix_service::Service; - use futures_util::future::ok; + use actix_utils::future::ok; use crate::http::{header, HeaderValue, Method, StatusCode}; use crate::middleware::DefaultHeaders; diff --git a/src/scope.rs b/src/scope.rs index 693d6860..3be6adb0 100644 --- a/src/scope.rs +++ b/src/scope.rs @@ -575,8 +575,8 @@ impl ServiceFactory for ScopeEndpoint { #[cfg(test)] mod tests { use actix_service::Service; + use actix_utils::future::ok; use bytes::Bytes; - use futures_util::future::ok; use crate::dev::{Body, ResponseBody}; use crate::http::{header, HeaderValue, Method, StatusCode}; diff --git a/src/service.rs b/src/service.rs index 32f152f7..570d88e7 100644 --- a/src/service.rs +++ b/src/service.rs @@ -602,7 +602,7 @@ mod tests { use crate::test::{init_service, TestRequest}; use crate::{guard, http, web, App, HttpResponse}; use actix_service::Service; - use futures_util::future::ok; + use actix_utils::future::ok; #[actix_rt::test] async fn test_service() { diff --git a/src/test.rs b/src/test.rs index 18bf89cd..8ba01944 100644 --- a/src/test.rs +++ b/src/test.rs @@ -15,12 +15,12 @@ use actix_http::{ws, Extensions, HttpService, Request}; use actix_router::{Path, ResourceDef, Url}; use actix_rt::{time::sleep, System}; use actix_service::{map_config, IntoService, IntoServiceFactory, Service, ServiceFactory}; +use actix_utils::future::ok; use awc::error::PayloadError; use awc::{Client, ClientRequest, ClientResponse, Connector}; use bytes::{Bytes, BytesMut}; use futures_core::Stream; -use futures_util::future::ok; -use futures_util::StreamExt; +use futures_util::StreamExt as _; use serde::de::DeserializeOwned; use serde::Serialize; use socket2::{Domain, Protocol, Socket, Type}; diff --git a/src/types/either.rs b/src/types/either.rs index bbab48de..210495e4 100644 --- a/src/types/either.rs +++ b/src/types/either.rs @@ -1,7 +1,8 @@ //! For either helper, see [`Either`]. use bytes::Bytes; -use futures_util::{future::LocalBoxFuture, FutureExt, TryFutureExt}; +use futures_core::future::LocalBoxFuture; +use futures_util::{FutureExt as _, TryFutureExt as _}; use crate::{ dev, diff --git a/src/types/form.rs b/src/types/form.rs index 57a742e3..0985bd94 100644 --- a/src/types/form.rs +++ b/src/types/form.rs @@ -12,10 +12,8 @@ use std::{ use actix_http::Payload; use bytes::BytesMut; use encoding_rs::{Encoding, UTF_8}; -use futures_util::{ - future::{FutureExt, LocalBoxFuture}, - StreamExt, -}; +use futures_core::future::LocalBoxFuture; +use futures_util::{FutureExt as _, StreamExt as _}; use serde::{de::DeserializeOwned, Serialize}; #[cfg(feature = "compress")] diff --git a/src/types/json.rs b/src/types/json.rs index d8ce3cb7..068dfeb2 100644 --- a/src/types/json.rs +++ b/src/types/json.rs @@ -11,7 +11,7 @@ use std::{ }; use bytes::BytesMut; -use futures_util::{ready, stream::Stream}; +use futures_core::{ready, stream::Stream as _}; use serde::{de::DeserializeOwned, Serialize}; use actix_http::Payload; diff --git a/src/types/path.rs b/src/types/path.rs index 294df6cf..90ee5296 100644 --- a/src/types/path.rs +++ b/src/types/path.rs @@ -4,7 +4,7 @@ use std::{fmt, ops, sync::Arc}; use actix_http::error::{Error, ErrorNotFound}; use actix_router::PathDeserializer; -use futures_util::future::{ready, Ready}; +use actix_utils::future::{ready, Ready}; use serde::de; use crate::{dev::Payload, error::PathError, FromRequest, HttpRequest}; diff --git a/src/types/payload.rs b/src/types/payload.rs index 781347b8..f8880085 100644 --- a/src/types/payload.rs +++ b/src/types/payload.rs @@ -8,13 +8,10 @@ use std::{ }; use actix_http::error::{ErrorBadRequest, PayloadError}; +use actix_utils::future::{ready, Either, Ready}; use bytes::{Bytes, BytesMut}; use encoding_rs::{Encoding, UTF_8}; -use futures_core::stream::Stream; -use futures_util::{ - future::{ready, Either, ErrInto, Ready, TryFutureExt as _}, - ready, -}; +use futures_core::{ready, stream::Stream}; use mime::Mime; use crate::{dev, http::header, web, Error, FromRequest, HttpMessage, HttpRequest}; @@ -26,7 +23,7 @@ use crate::{dev, http::header, web, Error, FromRequest, HttpMessage, HttpRequest /// # Examples /// ``` /// use std::future::Future; -/// use futures_util::stream::{Stream, StreamExt}; +/// use futures_util::stream::StreamExt as _; /// use actix_web::{post, web}; /// /// // `body: web::Payload` parameter extracts raw payload stream from request @@ -91,7 +88,7 @@ impl FromRequest for Payload { impl FromRequest for Bytes { type Config = PayloadConfig; type Error = Error; - type Future = Either, Ready>>; + type Future = Either>>; #[inline] fn from_request(req: &HttpRequest, payload: &mut dev::Payload) -> Self::Future { @@ -99,12 +96,25 @@ impl FromRequest for Bytes { let cfg = PayloadConfig::from_req(req); if let Err(err) = cfg.check_mimetype(req) { - return Either::Right(ready(Err(err))); + return Either::right(ready(Err(err))); } - let limit = cfg.limit; - let fut = HttpMessageBody::new(req, payload).limit(limit); - Either::Left(fut.err_into()) + Either::left(BytesExtractFut { + body_fut: HttpMessageBody::new(req, payload).limit(cfg.limit), + }) + } +} + +/// Future for `Bytes` extractor. +pub struct BytesExtractFut { + body_fut: HttpMessageBody, +} + +impl<'a> Future for BytesExtractFut { + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Pin::new(&mut self.body_fut).poll(cx).map_err(Into::into) } } @@ -135,21 +145,22 @@ impl FromRequest for String { // check content-type if let Err(err) = cfg.check_mimetype(req) { - return Either::Right(ready(Err(err))); + return Either::right(ready(Err(err))); } // check charset let encoding = match req.encoding() { Ok(enc) => enc, - Err(err) => return Either::Right(ready(Err(err.into()))), + Err(err) => return Either::right(ready(Err(err.into()))), }; let limit = cfg.limit; let body_fut = HttpMessageBody::new(req, payload).limit(limit); - Either::Left(StringExtractFut { body_fut, encoding }) + Either::left(StringExtractFut { body_fut, encoding }) } } +/// Future for `String` extractor. pub struct StringExtractFut { body_fut: HttpMessageBody, encoding: &'static Encoding, diff --git a/src/types/query.rs b/src/types/query.rs index 79af3258..b6c025ef 100644 --- a/src/types/query.rs +++ b/src/types/query.rs @@ -2,7 +2,7 @@ use std::{fmt, ops, sync::Arc}; -use futures_util::future::{err, ok, Ready}; +use actix_utils::future::{err, ok, Ready}; use serde::de; use crate::{dev::Payload, error::QueryPayloadError, Error, FromRequest, HttpRequest}; diff --git a/src/types/readlines.rs b/src/types/readlines.rs index b8bdcc50..6c456e21 100644 --- a/src/types/readlines.rs +++ b/src/types/readlines.rs @@ -177,7 +177,7 @@ where #[cfg(test)] mod tests { - use futures_util::stream::StreamExt; + use futures_util::stream::StreamExt as _; use super::*; use crate::test::TestRequest; diff --git a/tests/test_server.rs b/tests/test_server.rs index b35af657..db9fe37d 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -20,7 +20,7 @@ use flate2::{ write::{GzEncoder, ZlibDecoder, ZlibEncoder}, Compression, }; -use futures_util::ready; +use futures_core::ready; #[cfg(feature = "openssl")] use openssl::{ pkey::PKey,