diff --git a/Cargo.toml b/Cargo.toml index 54e4b2374..ab812d1b0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,19 +28,20 @@ path = "src/lib.rs" [workspace] members = [ - ".", - "awc", - "actix-http", - "actix-cors", - "actix-files", - "actix-framed", - "actix-session", - "actix-identity", - "actix-multipart", - "actix-web-actors", - "actix-web-codegen", - "test-server", +# ".", +# "awc", +# #"actix-http", +# "actix-cors", +# "actix-files", +# "actix-framed", +# "actix-session", +# "actix-identity", +# "actix-multipart", +# "actix-web-actors", +# "actix-web-codegen", +# "test-server", ] +exclude = ["actix-http"] [features] default = ["brotli", "flate2-zlib", "client", "fail"] @@ -122,12 +123,23 @@ opt-level = 3 codegen-units = 1 [patch.crates-io] -actix-web = { path = "." } -actix-http = { path = "actix-http" } -actix-http-test = { path = "test-server" } -actix-web-codegen = { path = "actix-web-codegen" } -actix-web-actors = { path = "actix-web-actors" } -actix-session = { path = "actix-session" } -actix-files = { path = "actix-files" } -actix-multipart = { path = "actix-multipart" } -awc = { path = "awc" } +# actix-web = { path = "." } +# actix-http = { path = "actix-http" } +# actix-http-test = { path = "test-server" } +# actix-web-codegen = { path = "actix-web-codegen" } +# actix-web-actors = { path = "actix-web-actors" } +# actix-session = { path = "actix-session" } +# actix-files = { path = "actix-files" } +# actix-multipart = { path = "actix-multipart" } +# awc = { path = "awc" } + +actix-codec = { path = "../actix-net/actix-codec" } +actix-connect = { path = "../actix-net/actix-connect" } +actix-ioframe = { path = "../actix-net/actix-ioframe" } +actix-rt = { path = "../actix-net/actix-rt" } +actix-server = { path = "../actix-net/actix-server" } +actix-server-config = { path = "../actix-net/actix-server-config" } +actix-service = { path = "../actix-net/actix-service" } +actix-testing = { path = "../actix-net/actix-testing" } +actix-threadpool = { path = "../actix-net/actix-threadpool" } +actix-utils = { path = "../actix-net/actix-utils" } diff --git a/actix-http/Cargo.toml b/actix-http/Cargo.toml index ee0ded597..1cc5e43a1 100644 --- a/actix-http/Cargo.toml +++ b/actix-http/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-http" -version = "0.2.11" +version = "0.3.0-alpha.1" authors = ["Nikolay Kim "] description = "Actix http primitives" readme = "README.md" @@ -13,10 +13,11 @@ categories = ["network-programming", "asynchronous", "web-programming::websocket"] license = "MIT/Apache-2.0" edition = "2018" -workspace = ".." + +# workspace = ".." [package.metadata.docs.rs] -features = ["ssl", "fail", "brotli", "flate2-zlib", "secure-cookies"] +features = ["openssl", "fail", "brotli", "flate2-zlib", "secure-cookies"] [lib] name = "actix_http" @@ -26,10 +27,10 @@ path = "src/lib.rs" default = [] # openssl -ssl = ["openssl", "actix-connect/ssl"] +openssl = ["open-ssl", "actix-connect/openssl"] # rustls support -rust-tls = ["rustls", "webpki-roots", "actix-connect/rust-tls"] +rustls = ["rust-tls", "webpki-roots", "actix-connect/rustls"] # brotli encoding, requires c compiler brotli = ["brotli2"] @@ -47,23 +48,24 @@ fail = ["failure"] secure-cookies = ["ring"] [dependencies] -actix-service = "0.4.1" -actix-codec = "0.1.2" -actix-connect = "0.2.4" -actix-utils = "0.4.4" -actix-server-config = "0.1.2" -actix-threadpool = "0.1.1" +actix-service = "1.0.0-alpha.1" +actix-codec = "0.2.0-alpha.1" +actix-connect = "1.0.0-alpha.1" +actix-utils = "0.5.0-alpha.1" +actix-server-config = "0.3.0-alpha.1" +actix-threadpool = "0.2.0-alpha.1" base64 = "0.10" bitflags = "1.0" bytes = "0.4" copyless = "0.1.4" +chrono = "0.4.6" derive_more = "0.15.0" either = "1.5.2" encoding_rs = "0.8" -futures = "0.1.25" +futures = "0.3.1" hashbrown = "0.6.3" -h2 = "0.1.16" +h2 = "0.2.0-alpha.3" http = "0.1.17" httparse = "1.3" indexmap = "1.2" @@ -80,13 +82,16 @@ sha1 = "0.6" slab = "0.4" serde_urlencoded = "0.6.1" time = "0.1.42" -tokio-tcp = "0.1.3" -tokio-timer = "0.2.8" -tokio-current-thread = "0.1" -trust-dns-resolver = { version="0.11.1", default-features = false } + +tokio = "=0.2.0-alpha.6" +tokio-io = "=0.2.0-alpha.6" +tokio-net = "=0.2.0-alpha.6" +tokio-timer = "0.3.0-alpha.6" +tokio-executor = "=0.2.0-alpha.6" +trust-dns-resolver = { version="0.18.0-alpha.1", default-features = false } # for secure cookie -ring = { version = "0.14.6", optional = true } +ring = { version = "0.16.9", optional = true } # compression brotli2 = { version="0.3.2", optional = true } @@ -94,17 +99,25 @@ flate2 = { version="1.0.7", optional = true, default-features = false } # optional deps failure = { version = "0.1.5", optional = true } -openssl = { version="0.10", optional = true } -rustls = { version = "0.15.2", optional = true } -webpki-roots = { version = "0.16", optional = true } -chrono = "0.4.6" +open-ssl = { version="0.10", package="openssl", optional = true } +rust-tls = { version = "0.16.0", package="rustls", optional = true } +webpki-roots = { version = "0.18", optional = true } [dev-dependencies] -actix-rt = "0.2.2" -actix-server = { version = "0.6.0", features=["ssl", "rust-tls"] } -actix-connect = { version = "0.2.0", features=["ssl"] } -actix-http-test = { version = "0.2.4", features=["ssl"] } +actix-rt = "1.0.0-alpha.1" +actix-server = { version = "0.8.0-alpha.1", features=["openssl"] } +actix-connect = { version = "1.0.0-alpha.1", features=["openssl"] } +#actix-http-test = { version = "0.2.4", features=["ssl"] } env_logger = "0.6" serde_derive = "1.0" -openssl = { version="0.10" } -tokio-tcp = "0.1" +open-ssl = { version="0.10", package="openssl" } + +[patch.crates-io] +actix-codec = { path = "../../actix-net/actix-codec" } +actix-connect = { path = "../../actix-net/actix-connect" } +actix-rt = { path = "../../actix-net/actix-rt" } +actix-server = { path = "../../actix-net/actix-server" } +actix-server-config = { path = "../../actix-net/actix-server-config" } +actix-service = { path = "../../actix-net/actix-service" } +actix-threadpool = { path = "../../actix-net/actix-threadpool" } +actix-utils = { path = "../../actix-net/actix-utils" } diff --git a/actix-http/src/body.rs b/actix-http/src/body.rs index b761738e1..7b86bfb14 100644 --- a/actix-http/src/body.rs +++ b/actix-http/src/body.rs @@ -1,8 +1,10 @@ use std::marker::PhantomData; +use std::pin::Pin; +use std::task::{Context, Poll}; use std::{fmt, mem}; use bytes::{Bytes, BytesMut}; -use futures::{Async, Poll, Stream}; +use futures::Stream; use crate::error::Error; @@ -29,10 +31,10 @@ impl BodySize { } /// Type that provides this trait can be streamed to a peer. -pub trait MessageBody { +pub trait MessageBody: Unpin { fn size(&self) -> BodySize; - fn poll_next(&mut self) -> Poll, Error>; + fn poll_next(&mut self, cx: &mut Context) -> Poll>>; } impl MessageBody for () { @@ -40,8 +42,8 @@ impl MessageBody for () { BodySize::Empty } - fn poll_next(&mut self) -> Poll, Error> { - Ok(Async::Ready(None)) + fn poll_next(&mut self, _: &mut Context) -> Poll>> { + Poll::Ready(None) } } @@ -50,8 +52,8 @@ impl MessageBody for Box { self.as_ref().size() } - fn poll_next(&mut self) -> Poll, Error> { - self.as_mut().poll_next() + fn poll_next(&mut self, cx: &mut Context) -> Poll>> { + self.as_mut().poll_next(cx) } } @@ -93,20 +95,19 @@ impl MessageBody for ResponseBody { } } - fn poll_next(&mut self) -> Poll, Error> { + fn poll_next(&mut self, cx: &mut Context) -> Poll>> { match self { - ResponseBody::Body(ref mut body) => body.poll_next(), - ResponseBody::Other(ref mut body) => body.poll_next(), + ResponseBody::Body(ref mut body) => body.poll_next(cx), + ResponseBody::Other(ref mut body) => body.poll_next(cx), } } } impl Stream for ResponseBody { - type Item = Bytes; - type Error = Error; + type Item = Result; - fn poll(&mut self) -> Poll, Self::Error> { - self.poll_next() + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + self.get_mut().poll_next(cx) } } @@ -144,19 +145,19 @@ impl MessageBody for Body { } } - fn poll_next(&mut self) -> Poll, Error> { + fn poll_next(&mut self, cx: &mut Context) -> Poll>> { match self { - Body::None => Ok(Async::Ready(None)), - Body::Empty => Ok(Async::Ready(None)), + Body::None => Poll::Ready(None), + Body::Empty => Poll::Ready(None), Body::Bytes(ref mut bin) => { let len = bin.len(); if len == 0 { - Ok(Async::Ready(None)) + Poll::Ready(None) } else { - Ok(Async::Ready(Some(mem::replace(bin, Bytes::new())))) + Poll::Ready(Some(Ok(mem::replace(bin, Bytes::new())))) } } - Body::Message(ref mut body) => body.poll_next(), + Body::Message(ref mut body) => body.poll_next(cx), } } } @@ -242,7 +243,7 @@ impl From for Body { impl From> for Body where - S: Stream + 'static, + S: Stream> + Unpin + 'static, { fn from(s: SizedStream) -> Body { Body::from_message(s) @@ -251,8 +252,8 @@ where impl From> for Body where - S: Stream + 'static, - E: Into + 'static, + S: Stream> + Unpin + 'static, + E: Into + Unpin + 'static, { fn from(s: BodyStream) -> Body { Body::from_message(s) @@ -264,11 +265,11 @@ impl MessageBody for Bytes { BodySize::Sized(self.len()) } - fn poll_next(&mut self) -> Poll, Error> { + fn poll_next(&mut self, _: &mut Context) -> Poll>> { if self.is_empty() { - Ok(Async::Ready(None)) + Poll::Ready(None) } else { - Ok(Async::Ready(Some(mem::replace(self, Bytes::new())))) + Poll::Ready(Some(Ok(mem::replace(self, Bytes::new())))) } } } @@ -278,13 +279,11 @@ impl MessageBody for BytesMut { BodySize::Sized(self.len()) } - fn poll_next(&mut self) -> Poll, Error> { + fn poll_next(&mut self, _: &mut Context) -> Poll>> { if self.is_empty() { - Ok(Async::Ready(None)) + Poll::Ready(None) } else { - Ok(Async::Ready(Some( - mem::replace(self, BytesMut::new()).freeze(), - ))) + Poll::Ready(Some(Ok(mem::replace(self, BytesMut::new()).freeze()))) } } } @@ -294,11 +293,11 @@ impl MessageBody for &'static str { BodySize::Sized(self.len()) } - fn poll_next(&mut self) -> Poll, Error> { + fn poll_next(&mut self, _: &mut Context) -> Poll>> { if self.is_empty() { - Ok(Async::Ready(None)) + Poll::Ready(None) } else { - Ok(Async::Ready(Some(Bytes::from_static( + Poll::Ready(Some(Ok(Bytes::from_static( mem::replace(self, "").as_ref(), )))) } @@ -310,13 +309,11 @@ impl MessageBody for &'static [u8] { BodySize::Sized(self.len()) } - fn poll_next(&mut self) -> Poll, Error> { + fn poll_next(&mut self, _: &mut Context) -> Poll>> { if self.is_empty() { - Ok(Async::Ready(None)) + Poll::Ready(None) } else { - Ok(Async::Ready(Some(Bytes::from_static(mem::replace( - self, b"", - ))))) + Poll::Ready(Some(Ok(Bytes::from_static(mem::replace(self, b""))))) } } } @@ -326,14 +323,11 @@ impl MessageBody for Vec { BodySize::Sized(self.len()) } - fn poll_next(&mut self) -> Poll, Error> { + fn poll_next(&mut self, _: &mut Context) -> Poll>> { if self.is_empty() { - Ok(Async::Ready(None)) + Poll::Ready(None) } else { - Ok(Async::Ready(Some(Bytes::from(mem::replace( - self, - Vec::new(), - ))))) + Poll::Ready(Some(Ok(Bytes::from(mem::replace(self, Vec::new()))))) } } } @@ -343,11 +337,11 @@ impl MessageBody for String { BodySize::Sized(self.len()) } - fn poll_next(&mut self) -> Poll, Error> { + fn poll_next(&mut self, _: &mut Context) -> Poll>> { if self.is_empty() { - Ok(Async::Ready(None)) + Poll::Ready(None) } else { - Ok(Async::Ready(Some(Bytes::from( + Poll::Ready(Some(Ok(Bytes::from( mem::replace(self, String::new()).into_bytes(), )))) } @@ -363,7 +357,7 @@ pub struct BodyStream { impl BodyStream where - S: Stream, + S: Stream>, E: Into, { pub fn new(stream: S) -> Self { @@ -376,15 +370,17 @@ where impl MessageBody for BodyStream where - S: Stream, - E: Into, + S: Stream> + Unpin, + E: Into + Unpin, { fn size(&self) -> BodySize { BodySize::Stream } - fn poll_next(&mut self) -> Poll, Error> { - self.stream.poll().map_err(std::convert::Into::into) + fn poll_next(&mut self, cx: &mut Context) -> Poll>> { + Pin::new(&mut self.stream) + .poll_next(cx) + .map(|res| res.map(|res| res.map_err(std::convert::Into::into))) } } @@ -397,7 +393,7 @@ pub struct SizedStream { impl SizedStream where - S: Stream, + S: Stream>, { pub fn new(size: u64, stream: S) -> Self { SizedStream { size, stream } @@ -406,14 +402,14 @@ where impl MessageBody for SizedStream where - S: Stream, + S: Stream> + Unpin, { fn size(&self) -> BodySize { BodySize::Sized64(self.size) } - fn poll_next(&mut self) -> Poll, Error> { - self.stream.poll() + fn poll_next(&mut self, cx: &mut Context) -> Poll>> { + Pin::new(&mut self.stream).poll_next(cx) } } diff --git a/actix-http/src/builder.rs b/actix-http/src/builder.rs index cd23b7265..8997d720c 100644 --- a/actix-http/src/builder.rs +++ b/actix-http/src/builder.rs @@ -4,7 +4,7 @@ use std::rc::Rc; use actix_codec::Framed; use actix_server_config::ServerConfig as SrvConfig; -use actix_service::{IntoNewService, NewService, Service}; +use actix_service::{IntoServiceFactory, Service, ServiceFactory}; use crate::body::MessageBody; use crate::config::{KeepAlive, ServiceConfig}; @@ -32,9 +32,12 @@ pub struct HttpServiceBuilder> { impl HttpServiceBuilder> where - S: NewService, - S::Error: Into, + S: ServiceFactory, + S::Error: Into + Unpin + 'static, S::InitError: fmt::Debug, + S::Future: Unpin, + S::Service: Unpin, + ::Future: Unpin + 'static, { /// Create instance of `ServiceConfigBuilder` pub fn new() -> Self { @@ -52,19 +55,28 @@ where impl HttpServiceBuilder where - S: NewService, - S::Error: Into, + S: ServiceFactory, + S::Error: Into + Unpin + 'static, S::InitError: fmt::Debug, - X: NewService, + S::Future: Unpin, + S::Service: Unpin, + ::Future: Unpin + 'static, + X: ServiceFactory, X::Error: Into, X::InitError: fmt::Debug, - U: NewService< + X::Future: Unpin, + X::Service: Unpin, + ::Future: Unpin + 'static, + U: ServiceFactory< Config = SrvConfig, Request = (Request, Framed), Response = (), >, U::Error: fmt::Display, U::InitError: fmt::Debug, + U::Future: Unpin, + U::Service: Unpin, + ::Future: Unpin + 'static, { /// Set server keep-alive setting. /// @@ -108,16 +120,19 @@ where /// request will be forwarded to main service. pub fn expect(self, expect: F) -> HttpServiceBuilder where - F: IntoNewService, - X1: NewService, + F: IntoServiceFactory, + X1: ServiceFactory, X1::Error: Into, X1::InitError: fmt::Debug, + X1::Future: Unpin, + X1::Service: Unpin, + ::Future: Unpin + 'static, { HttpServiceBuilder { keep_alive: self.keep_alive, client_timeout: self.client_timeout, client_disconnect: self.client_disconnect, - expect: expect.into_new_service(), + expect: expect.into_factory(), upgrade: self.upgrade, on_connect: self.on_connect, _t: PhantomData, @@ -130,21 +145,24 @@ where /// and this service get called with original request and framed object. pub fn upgrade(self, upgrade: F) -> HttpServiceBuilder where - F: IntoNewService, - U1: NewService< + F: IntoServiceFactory, + U1: ServiceFactory< Config = SrvConfig, Request = (Request, Framed), Response = (), >, U1::Error: fmt::Display, U1::InitError: fmt::Debug, + U1::Future: Unpin, + U1::Service: Unpin, + ::Future: Unpin + 'static, { HttpServiceBuilder { keep_alive: self.keep_alive, client_timeout: self.client_timeout, client_disconnect: self.client_disconnect, expect: self.expect, - upgrade: Some(upgrade.into_new_service()), + upgrade: Some(upgrade.into_factory()), on_connect: self.on_connect, _t: PhantomData, } @@ -167,17 +185,21 @@ where pub fn h1(self, service: F) -> H1Service where B: MessageBody + 'static, - F: IntoNewService, - S::Error: Into, + F: IntoServiceFactory, + S::Future: Unpin, + S::Error: Into + Unpin + 'static, S::InitError: fmt::Debug, - S::Response: Into>, + S::Response: Into> + Unpin + 'static, + S::Service: Unpin, + ::Future: Unpin + 'static, + P: Unpin, { let cfg = ServiceConfig::new( self.keep_alive, self.client_timeout, self.client_disconnect, ); - H1Service::with_config(cfg, service.into_new_service()) + H1Service::with_config(cfg, service.into_factory()) .expect(self.expect) .upgrade(self.upgrade) .on_connect(self.on_connect) @@ -187,37 +209,42 @@ where pub fn h2(self, service: F) -> H2Service where B: MessageBody + 'static, - F: IntoNewService, - S::Error: Into, + F: IntoServiceFactory, + S::Error: Into + Unpin + 'static, S::InitError: fmt::Debug, - S::Response: Into>, - ::Future: 'static, + S::Response: Into> + Unpin + 'static, + S::Future: Unpin, + S::Service: Unpin, + ::Future: Unpin + 'static, + P: Unpin, { let cfg = ServiceConfig::new( self.keep_alive, self.client_timeout, self.client_disconnect, ); - H2Service::with_config(cfg, service.into_new_service()) - .on_connect(self.on_connect) + H2Service::with_config(cfg, service.into_factory()).on_connect(self.on_connect) } /// Finish service configuration and create `HttpService` instance. pub fn finish(self, service: F) -> HttpService where B: MessageBody + 'static, - F: IntoNewService, - S::Error: Into, + F: IntoServiceFactory, + S::Error: Into + Unpin + 'static, S::InitError: fmt::Debug, - S::Response: Into>, - ::Future: 'static, + S::Response: Into> + Unpin + 'static, + S::Future: Unpin, + S::Service: Unpin, + ::Future: Unpin + 'static, + P: Unpin, { let cfg = ServiceConfig::new( self.keep_alive, self.client_timeout, self.client_disconnect, ); - HttpService::with_config(cfg, service.into_new_service()) + HttpService::with_config(cfg, service.into_factory()) .expect(self.expect) .upgrade(self.upgrade) .on_connect(self.on_connect) diff --git a/actix-http/src/client/connector.rs b/actix-http/src/client/connector.rs index 98e8618c3..4ae28ba68 100644 --- a/actix-http/src/client/connector.rs +++ b/actix-http/src/client/connector.rs @@ -1,15 +1,18 @@ use std::fmt; +use std::future::Future; use std::marker::PhantomData; +use std::pin::Pin; +use std::task::{Context, Poll}; use std::time::Duration; use actix_codec::{AsyncRead, AsyncWrite}; use actix_connect::{ default_connector, Connect as TcpConnect, Connection as TcpConnection, }; -use actix_service::{apply_fn, Service, ServiceExt}; +use actix_service::{apply_fn, Service}; use actix_utils::timeout::{TimeoutError, TimeoutService}; use http::Uri; -use tokio_tcp::TcpStream; +use tokio_net::tcp::TcpStream; use super::connection::Connection; use super::error::ConnectError; @@ -212,7 +215,7 @@ where pub fn finish( self, ) -> impl Service - + Clone { + + Clone { #[cfg(not(any(feature = "ssl", feature = "rust-tls")))] { let connector = TimeoutService::new( diff --git a/actix-http/src/client/h1proto.rs b/actix-http/src/client/h1proto.rs index b078c6a67..14984253b 100644 --- a/actix-http/src/client/h1proto.rs +++ b/actix-http/src/client/h1proto.rs @@ -1,10 +1,13 @@ +use std::future::Future; use std::io::Write; +use std::pin::Pin; +use std::task::{Context, Poll}; use std::{io, time}; use actix_codec::{AsyncRead, AsyncWrite, Framed}; use bytes::{BufMut, Bytes, BytesMut}; use futures::future::{ok, Either}; -use futures::{Async, Future, Poll, Sink, Stream}; +use futures::{Sink, Stream}; use crate::error::PayloadError; use crate::h1; diff --git a/actix-http/src/client/h2proto.rs b/actix-http/src/client/h2proto.rs index 5744a1547..50d74fe1b 100644 --- a/actix-http/src/client/h2proto.rs +++ b/actix-http/src/client/h2proto.rs @@ -1,9 +1,11 @@ +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; use std::time; use actix_codec::{AsyncRead, AsyncWrite}; use bytes::Bytes; use futures::future::{err, Either}; -use futures::{Async, Future, Poll}; use h2::{client::SendRequest, SendStream}; use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, TRANSFER_ENCODING}; use http::{request::Request, HttpTryFrom, Method, Version}; diff --git a/actix-http/src/client/pool.rs b/actix-http/src/client/pool.rs index a3522ff8a..4d02e0a13 100644 --- a/actix-http/src/client/pool.rs +++ b/actix-http/src/client/pool.rs @@ -1,22 +1,24 @@ use std::cell::RefCell; use std::collections::VecDeque; +use std::future::Future; use std::io; +use std::pin::Pin; use std::rc::Rc; +use std::task::{Context, Poll}; use std::time::{Duration, Instant}; use actix_codec::{AsyncRead, AsyncWrite}; use actix_service::Service; +use actix_utils::oneshot; +use actix_utils::task::LocalWaker; use bytes::Bytes; use futures::future::{err, ok, Either, FutureResult}; -use futures::task::AtomicTask; -use futures::unsync::oneshot; -use futures::{Async, Future, Poll}; use h2::client::{handshake, Handshake}; use hashbrown::HashMap; use http::uri::Authority; use indexmap::IndexSet; use slab::Slab; -use tokio_timer::{sleep, Delay}; +use tokio_timer::{delay_for, Delay}; use super::connection::{ConnectionType, IoConnection}; use super::error::ConnectError; @@ -140,7 +142,7 @@ where // start support future if !support { self.1.as_ref().borrow_mut().task = Some(AtomicTask::new()); - tokio_current_thread::spawn(ConnectorPoolSupport { + tokio_executor::current_thread::spawn(ConnectorPoolSupport { connector: self.0.clone(), inner: self.1.clone(), }) @@ -255,7 +257,7 @@ where if let Some(ref mut h2) = self.h2 { return match h2.poll() { Ok(Async::Ready((snd, connection))) => { - tokio_current_thread::spawn(connection.map_err(|_| ())); + tokio_executor::current_thread::spawn(connection.map_err(|_| ())); Ok(Async::Ready(IoConnection::new( ConnectionType::H2(snd), Instant::now(), @@ -373,7 +375,7 @@ where { if let Some(timeout) = self.disconnect_timeout { if let ConnectionType::H1(io) = conn.io { - tokio_current_thread::spawn(CloseConnection::new( + tokio_executor::current_thread::spawn(CloseConnection::new( io, timeout, )) } @@ -387,7 +389,7 @@ where Ok(n) if n > 0 => { if let Some(timeout) = self.disconnect_timeout { if let ConnectionType::H1(io) = io { - tokio_current_thread::spawn( + tokio_executor::current_thread::spawn( CloseConnection::new(io, timeout), ) } @@ -421,7 +423,7 @@ where self.acquired -= 1; if let Some(timeout) = self.disconnect_timeout { if let ConnectionType::H1(io) = io { - tokio_current_thread::spawn(CloseConnection::new(io, timeout)) + tokio_executor::current_thread::spawn(CloseConnection::new(io, timeout)) } } self.check_availibility(); @@ -448,7 +450,7 @@ where fn new(io: T, timeout: Duration) -> Self { CloseConnection { io, - timeout: sleep(timeout), + timeout: delay_for(timeout), } } } @@ -558,7 +560,7 @@ where inner: Rc>>, fut: F, ) { - tokio_current_thread::spawn(OpenWaitingConnection { + tokio_executor::current_thread::spawn(OpenWaitingConnection { key, fut, h2: None, @@ -593,7 +595,7 @@ where if let Some(ref mut h2) = self.h2 { return match h2.poll() { Ok(Async::Ready((snd, connection))) => { - tokio_current_thread::spawn(connection.map_err(|_| ())); + tokio_executor::current_thread::spawn(connection.map_err(|_| ())); let rx = self.rx.take().unwrap(); let _ = rx.send(Ok(IoConnection::new( ConnectionType::H2(snd), diff --git a/actix-http/src/cloneable.rs b/actix-http/src/cloneable.rs index ffc1d0611..18869c66d 100644 --- a/actix-http/src/cloneable.rs +++ b/actix-http/src/cloneable.rs @@ -1,8 +1,8 @@ use std::cell::UnsafeCell; use std::rc::Rc; +use std::task::{Context, Poll}; use actix_service::Service; -use futures::Poll; #[doc(hidden)] /// Service that allows to turn non-clone service to a service with `Clone` impl @@ -32,8 +32,8 @@ where type Error = T::Error; type Future = T::Future; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - unsafe { &mut *self.0.as_ref().get() }.poll_ready() + fn poll_ready(&mut self, cx: &mut Context) -> Poll> { + unsafe { &mut *self.0.as_ref().get() }.poll_ready(cx) } fn call(&mut self, req: T::Request) -> Self::Future { diff --git a/actix-http/src/config.rs b/actix-http/src/config.rs index bdfecef30..a2dab8f04 100644 --- a/actix-http/src/config.rs +++ b/actix-http/src/config.rs @@ -5,9 +5,9 @@ use std::rc::Rc; use std::time::{Duration, Instant}; use bytes::BytesMut; -use futures::{future, Future}; +use futures::{future, Future, FutureExt}; use time; -use tokio_timer::{sleep, Delay}; +use tokio_timer::{delay, delay_for, Delay}; // "Sun, 06 Nov 1994 08:49:37 GMT".len() const DATE_VALUE_LENGTH: usize = 29; @@ -104,10 +104,10 @@ impl ServiceConfig { #[inline] /// Client timeout for first request. pub fn client_timer(&self) -> Option { - let delay = self.0.client_timeout; - if delay != 0 { - Some(Delay::new( - self.0.timer.now() + Duration::from_millis(delay), + let delay_time = self.0.client_timeout; + if delay_time != 0 { + Some(delay( + self.0.timer.now() + Duration::from_millis(delay_time), )) } else { None @@ -138,7 +138,7 @@ impl ServiceConfig { /// Return keep-alive timer delay is configured. pub fn keep_alive_timer(&self) -> Option { if let Some(ka) = self.0.keep_alive { - Some(Delay::new(self.0.timer.now() + ka)) + Some(delay(self.0.timer.now() + ka)) } else { None } @@ -242,12 +242,12 @@ impl DateService { // periodic date update let s = self.clone(); - tokio_current_thread::spawn(sleep(Duration::from_millis(500)).then( - move |_| { + tokio_executor::current_thread::spawn( + delay_for(Duration::from_millis(500)).then(move |_| { s.0.reset(); - future::ok(()) - }, - )); + future::ready(()) + }), + ); } } diff --git a/actix-http/src/encoding/decoder.rs b/actix-http/src/encoding/decoder.rs index 4b56a1b62..1e51e8b56 100644 --- a/actix-http/src/encoding/decoder.rs +++ b/actix-http/src/encoding/decoder.rs @@ -1,4 +1,7 @@ +use std::future::Future; use std::io::{self, Write}; +use std::pin::Pin; +use std::task::{Context, Poll}; use actix_threadpool::{run, CpuFuture}; #[cfg(feature = "brotli")] @@ -6,7 +9,7 @@ use brotli2::write::BrotliDecoder; use bytes::Bytes; #[cfg(any(feature = "flate2-zlib", feature = "flate2-rust"))] use flate2::write::{GzDecoder, ZlibDecoder}; -use futures::{try_ready, Async, Future, Poll, Stream}; +use futures::{ready, Stream}; use super::Writer; use crate::error::PayloadError; @@ -18,12 +21,12 @@ pub struct Decoder { decoder: Option, stream: S, eof: bool, - fut: Option, ContentDecoder), io::Error>>, + fut: Option, ContentDecoder), io::Error>>>, } impl Decoder where - S: Stream, + S: Stream>, { /// Construct a decoder. #[inline] @@ -71,34 +74,41 @@ where impl Stream for Decoder where - S: Stream, + S: Stream> + Unpin, { - type Item = Bytes; - type Error = PayloadError; + type Item = Result; - fn poll(&mut self) -> Poll, Self::Error> { + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context, + ) -> Poll> { loop { if let Some(ref mut fut) = self.fut { - let (chunk, decoder) = try_ready!(fut.poll()); + let (chunk, decoder) = match ready!(Pin::new(fut).poll(cx)) { + Ok(Ok(item)) => item, + Ok(Err(e)) => return Poll::Ready(Some(Err(e.into()))), + Err(e) => return Poll::Ready(Some(Err(e.into()))), + }; self.decoder = Some(decoder); self.fut.take(); if let Some(chunk) = chunk { - return Ok(Async::Ready(Some(chunk))); + return Poll::Ready(Some(Ok(chunk))); } } if self.eof { - return Ok(Async::Ready(None)); + return Poll::Ready(None); } - match self.stream.poll()? { - Async::Ready(Some(chunk)) => { + match Pin::new(&mut self.stream).poll_next(cx) { + Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err))), + Poll::Ready(Some(Ok(chunk))) => { if let Some(mut decoder) = self.decoder.take() { if chunk.len() < INPLACE { let chunk = decoder.feed_data(chunk)?; self.decoder = Some(decoder); if let Some(chunk) = chunk { - return Ok(Async::Ready(Some(chunk))); + return Poll::Ready(Some(Ok(chunk))); } } else { self.fut = Some(run(move || { @@ -108,21 +118,25 @@ where } continue; } else { - return Ok(Async::Ready(Some(chunk))); + return Poll::Ready(Some(Ok(chunk))); } } - Async::Ready(None) => { + Poll::Ready(None) => { self.eof = true; return if let Some(mut decoder) = self.decoder.take() { - Ok(Async::Ready(decoder.feed_eof()?)) + match decoder.feed_eof() { + Ok(Some(res)) => Poll::Ready(Some(Ok(res))), + Ok(None) => Poll::Ready(None), + Err(err) => Poll::Ready(Some(Err(err.into()))), + } } else { - Ok(Async::Ready(None)) + Poll::Ready(None) }; } - Async::NotReady => break, + Poll::Pending => break, } } - Ok(Async::NotReady) + Poll::Pending } } diff --git a/actix-http/src/encoding/encoder.rs b/actix-http/src/encoding/encoder.rs index 58d8a2d9e..295d99a2a 100644 --- a/actix-http/src/encoding/encoder.rs +++ b/actix-http/src/encoding/encoder.rs @@ -1,5 +1,8 @@ //! Stream encoder +use std::future::Future; use std::io::{self, Write}; +use std::pin::Pin; +use std::task::{Context, Poll}; use actix_threadpool::{run, CpuFuture}; #[cfg(feature = "brotli")] @@ -7,7 +10,6 @@ use brotli2::write::BrotliEncoder; use bytes::Bytes; #[cfg(any(feature = "flate2-zlib", feature = "flate2-rust"))] use flate2::write::{GzEncoder, ZlibEncoder}; -use futures::{Async, Future, Poll}; use crate::body::{Body, BodySize, MessageBody, ResponseBody}; use crate::http::header::{ContentEncoding, CONTENT_ENCODING}; @@ -22,7 +24,7 @@ pub struct Encoder { eof: bool, body: EncoderBody, encoder: Option, - fut: Option>, + fut: Option>>, } impl Encoder { @@ -94,43 +96,46 @@ impl MessageBody for Encoder { } } - fn poll_next(&mut self) -> Poll, Error> { + fn poll_next(&mut self, cx: &mut Context) -> Poll>> { loop { if self.eof { - return Ok(Async::Ready(None)); + return Poll::Ready(None); } if let Some(ref mut fut) = self.fut { - let mut encoder = futures::try_ready!(fut.poll()); + let mut encoder = match futures::ready!(Pin::new(fut).poll(cx)) { + Ok(Ok(item)) => item, + Ok(Err(e)) => return Poll::Ready(Some(Err(e.into()))), + Err(e) => return Poll::Ready(Some(Err(e.into()))), + }; let chunk = encoder.take(); self.encoder = Some(encoder); self.fut.take(); if !chunk.is_empty() { - return Ok(Async::Ready(Some(chunk))); + return Poll::Ready(Some(Ok(chunk))); } } let result = match self.body { EncoderBody::Bytes(ref mut b) => { if b.is_empty() { - Async::Ready(None) + Poll::Ready(None) } else { - Async::Ready(Some(std::mem::replace(b, Bytes::new()))) + Poll::Ready(Some(Ok(std::mem::replace(b, Bytes::new())))) } } - EncoderBody::Stream(ref mut b) => b.poll_next()?, - EncoderBody::BoxedStream(ref mut b) => b.poll_next()?, + EncoderBody::Stream(ref mut b) => b.poll_next(cx), + EncoderBody::BoxedStream(ref mut b) => b.poll_next(cx), }; match result { - Async::NotReady => return Ok(Async::NotReady), - Async::Ready(Some(chunk)) => { + Poll::Ready(Some(Ok(chunk))) => { if let Some(mut encoder) = self.encoder.take() { if chunk.len() < INPLACE { encoder.write(&chunk)?; let chunk = encoder.take(); self.encoder = Some(encoder); if !chunk.is_empty() { - return Ok(Async::Ready(Some(chunk))); + return Poll::Ready(Some(Ok(chunk))); } } else { self.fut = Some(run(move || { @@ -139,22 +144,23 @@ impl MessageBody for Encoder { })); } } else { - return Ok(Async::Ready(Some(chunk))); + return Poll::Ready(Some(Ok(chunk))); } } - Async::Ready(None) => { + Poll::Ready(None) => { if let Some(encoder) = self.encoder.take() { let chunk = encoder.finish()?; if chunk.is_empty() { - return Ok(Async::Ready(None)); + return Poll::Ready(None); } else { self.eof = true; - return Ok(Async::Ready(Some(chunk))); + return Poll::Ready(Some(Ok(chunk))); } } else { - return Ok(Async::Ready(None)); + return Poll::Ready(None); } } + val => return val, } } } diff --git a/actix-http/src/error.rs b/actix-http/src/error.rs index cd9613d21..82027dbe3 100644 --- a/actix-http/src/error.rs +++ b/actix-http/src/error.rs @@ -6,11 +6,10 @@ use std::str::Utf8Error; use std::string::FromUtf8Error; use std::{fmt, io, result}; -pub use actix_threadpool::BlockingError; use actix_utils::timeout::TimeoutError; use bytes::BytesMut; use derive_more::{Display, From}; -use futures::Canceled; +use futures::channel::oneshot::Canceled; use http::uri::InvalidUri; use http::{header, Error as HttpError, StatusCode}; use httparse; @@ -197,8 +196,8 @@ impl ResponseError for DeError { } } -/// `InternalServerError` for `BlockingError` -impl ResponseError for BlockingError {} +/// `InternalServerError` for `Canceled` +impl ResponseError for Canceled {} /// Return `BAD_REQUEST` for `Utf8Error` impl ResponseError for Utf8Error { @@ -236,9 +235,6 @@ impl ResponseError for header::InvalidHeaderValueBytes { } } -/// `InternalServerError` for `futures::Canceled` -impl ResponseError for Canceled {} - /// A set of errors that can occur during parsing HTTP streams #[derive(Debug, Display)] pub enum ParseError { @@ -365,15 +361,12 @@ impl From for PayloadError { } } -impl From> for PayloadError { - fn from(err: BlockingError) -> Self { - match err { - BlockingError::Error(e) => PayloadError::Io(e), - BlockingError::Canceled => PayloadError::Io(io::Error::new( - io::ErrorKind::Other, - "Thread pool is gone", - )), - } +impl From for PayloadError { + fn from(_: Canceled) -> Self { + PayloadError::Io(io::Error::new( + io::ErrorKind::Other, + "Operation is canceled", + )) } } @@ -390,12 +383,12 @@ impl ResponseError for PayloadError { } } -/// Return `BadRequest` for `cookie::ParseError` -impl ResponseError for crate::cookie::ParseError { - fn error_response(&self) -> Response { - Response::new(StatusCode::BAD_REQUEST) - } -} +// /// Return `BadRequest` for `cookie::ParseError` +// impl ResponseError for crate::cookie::ParseError { +// fn error_response(&self) -> Response { +// Response::new(StatusCode::BAD_REQUEST) +// } +// } #[derive(Debug, Display, From)] /// A set of errors that can occur during dispatching http requests diff --git a/actix-http/src/h1/decoder.rs b/actix-http/src/h1/decoder.rs index ce113a145..272270ca1 100644 --- a/actix-http/src/h1/decoder.rs +++ b/actix-http/src/h1/decoder.rs @@ -1,10 +1,12 @@ +use std::future::Future; use std::io; use std::marker::PhantomData; use std::mem::MaybeUninit; +use std::pin::Pin; +use std::task::{Context, Poll}; use actix_codec::Decoder; use bytes::{Bytes, BytesMut}; -use futures::{Async, Poll}; use http::header::{HeaderName, HeaderValue}; use http::{header, HttpTryFrom, Method, StatusCode, Uri, Version}; use httparse; @@ -442,9 +444,10 @@ impl Decoder for PayloadDecoder { loop { let mut buf = None; // advances the chunked state - *state = match state.step(src, size, &mut buf)? { - Async::NotReady => return Ok(None), - Async::Ready(state) => state, + *state = match state.step(src, size, &mut buf) { + Poll::Pending => return Ok(None), + Poll::Ready(Ok(state)) => state, + Poll::Ready(Err(e)) => return Err(e), }; if *state == ChunkedState::End { trace!("End of chunked stream"); @@ -476,7 +479,7 @@ macro_rules! byte ( $rdr.split_to(1); b } else { - return Ok(Async::NotReady) + return Poll::Pending } }) ); @@ -487,7 +490,7 @@ impl ChunkedState { body: &mut BytesMut, size: &mut u64, buf: &mut Option, - ) -> Poll { + ) -> Poll> { use self::ChunkedState::*; match *self { Size => ChunkedState::read_size(body, size), @@ -499,10 +502,14 @@ impl ChunkedState { BodyLf => ChunkedState::read_body_lf(body), EndCr => ChunkedState::read_end_cr(body), EndLf => ChunkedState::read_end_lf(body), - End => Ok(Async::Ready(ChunkedState::End)), + End => Poll::Ready(Ok(ChunkedState::End)), } } - fn read_size(rdr: &mut BytesMut, size: &mut u64) -> Poll { + + fn read_size( + rdr: &mut BytesMut, + size: &mut u64, + ) -> Poll> { let radix = 16; match byte!(rdr) { b @ b'0'..=b'9' => { @@ -517,48 +524,49 @@ impl ChunkedState { *size *= radix; *size += u64::from(b + 10 - b'A'); } - b'\t' | b' ' => return Ok(Async::Ready(ChunkedState::SizeLws)), - b';' => return Ok(Async::Ready(ChunkedState::Extension)), - b'\r' => return Ok(Async::Ready(ChunkedState::SizeLf)), + b'\t' | b' ' => return Poll::Ready(Ok(ChunkedState::SizeLws)), + b';' => return Poll::Ready(Ok(ChunkedState::Extension)), + b'\r' => return Poll::Ready(Ok(ChunkedState::SizeLf)), _ => { - return Err(io::Error::new( + return Poll::Ready(Err(io::Error::new( io::ErrorKind::InvalidInput, "Invalid chunk size line: Invalid Size", - )); + ))); } } - Ok(Async::Ready(ChunkedState::Size)) + Poll::Ready(Ok(ChunkedState::Size)) } - fn read_size_lws(rdr: &mut BytesMut) -> Poll { + + fn read_size_lws(rdr: &mut BytesMut) -> Poll> { trace!("read_size_lws"); match byte!(rdr) { // LWS can follow the chunk size, but no more digits can come - b'\t' | b' ' => Ok(Async::Ready(ChunkedState::SizeLws)), - b';' => Ok(Async::Ready(ChunkedState::Extension)), - b'\r' => Ok(Async::Ready(ChunkedState::SizeLf)), - _ => Err(io::Error::new( + b'\t' | b' ' => Poll::Ready(Ok(ChunkedState::SizeLws)), + b';' => Poll::Ready(Ok(ChunkedState::Extension)), + b'\r' => Poll::Ready(Ok(ChunkedState::SizeLf)), + _ => Poll::Ready(Err(io::Error::new( io::ErrorKind::InvalidInput, "Invalid chunk size linear white space", - )), + ))), } } - fn read_extension(rdr: &mut BytesMut) -> Poll { + fn read_extension(rdr: &mut BytesMut) -> Poll> { match byte!(rdr) { - b'\r' => Ok(Async::Ready(ChunkedState::SizeLf)), - _ => Ok(Async::Ready(ChunkedState::Extension)), // no supported extensions + b'\r' => Poll::Ready(Ok(ChunkedState::SizeLf)), + _ => Poll::Ready(Ok(ChunkedState::Extension)), // no supported extensions } } fn read_size_lf( rdr: &mut BytesMut, size: &mut u64, - ) -> Poll { + ) -> Poll> { match byte!(rdr) { - b'\n' if *size > 0 => Ok(Async::Ready(ChunkedState::Body)), - b'\n' if *size == 0 => Ok(Async::Ready(ChunkedState::EndCr)), - _ => Err(io::Error::new( + b'\n' if *size > 0 => Poll::Ready(Ok(ChunkedState::Body)), + b'\n' if *size == 0 => Poll::Ready(Ok(ChunkedState::EndCr)), + _ => Poll::Ready(Err(io::Error::new( io::ErrorKind::InvalidInput, "Invalid chunk size LF", - )), + ))), } } @@ -566,12 +574,12 @@ impl ChunkedState { rdr: &mut BytesMut, rem: &mut u64, buf: &mut Option, - ) -> Poll { + ) -> Poll> { trace!("Chunked read, remaining={:?}", rem); let len = rdr.len() as u64; if len == 0 { - Ok(Async::Ready(ChunkedState::Body)) + Poll::Ready(Ok(ChunkedState::Body)) } else { let slice; if *rem > len { @@ -583,47 +591,47 @@ impl ChunkedState { } *buf = Some(slice); if *rem > 0 { - Ok(Async::Ready(ChunkedState::Body)) + Poll::Ready(Ok(ChunkedState::Body)) } else { - Ok(Async::Ready(ChunkedState::BodyCr)) + Poll::Ready(Ok(ChunkedState::BodyCr)) } } } - fn read_body_cr(rdr: &mut BytesMut) -> Poll { + fn read_body_cr(rdr: &mut BytesMut) -> Poll> { match byte!(rdr) { - b'\r' => Ok(Async::Ready(ChunkedState::BodyLf)), - _ => Err(io::Error::new( + b'\r' => Poll::Ready(Ok(ChunkedState::BodyLf)), + _ => Poll::Ready(Err(io::Error::new( io::ErrorKind::InvalidInput, "Invalid chunk body CR", - )), + ))), } } - fn read_body_lf(rdr: &mut BytesMut) -> Poll { + fn read_body_lf(rdr: &mut BytesMut) -> Poll> { match byte!(rdr) { - b'\n' => Ok(Async::Ready(ChunkedState::Size)), - _ => Err(io::Error::new( + b'\n' => Poll::Ready(Ok(ChunkedState::Size)), + _ => Poll::Ready(Err(io::Error::new( io::ErrorKind::InvalidInput, "Invalid chunk body LF", - )), + ))), } } - fn read_end_cr(rdr: &mut BytesMut) -> Poll { + fn read_end_cr(rdr: &mut BytesMut) -> Poll> { match byte!(rdr) { - b'\r' => Ok(Async::Ready(ChunkedState::EndLf)), - _ => Err(io::Error::new( + b'\r' => Poll::Ready(Ok(ChunkedState::EndLf)), + _ => Poll::Ready(Err(io::Error::new( io::ErrorKind::InvalidInput, "Invalid chunk end CR", - )), + ))), } } - fn read_end_lf(rdr: &mut BytesMut) -> Poll { + fn read_end_lf(rdr: &mut BytesMut) -> Poll> { match byte!(rdr) { - b'\n' => Ok(Async::Ready(ChunkedState::End)), - _ => Err(io::Error::new( + b'\n' => Poll::Ready(Ok(ChunkedState::End)), + _ => Poll::Ready(Err(io::Error::new( io::ErrorKind::InvalidInput, "Invalid chunk end LF", - )), + ))), } } } diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index c82eb4ac8..16e36447d 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -1,15 +1,17 @@ use std::collections::VecDeque; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; use std::time::Instant; -use std::{fmt, io, net}; +use std::{fmt, io, io::Write, net}; -use actix_codec::{Decoder, Encoder, Framed, FramedParts}; +use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed, FramedParts}; use actix_server_config::IoStream; use actix_service::Service; use bitflags::bitflags; use bytes::{BufMut, BytesMut}; -use futures::{Async, Future, Poll}; use log::{error, trace}; -use tokio_timer::Delay; +use tokio_timer::{delay, Delay}; use crate::body::{Body, BodySize, MessageBody, ResponseBody}; use crate::cloneable::CloneableService; @@ -46,11 +48,14 @@ pub struct Dispatcher where S: Service, S::Error: Into, + S::Future: Unpin, B: MessageBody, X: Service, X::Error: Into, + X::Future: Unpin, U: Service), Response = ()>, U::Error: fmt::Display, + U::Future: Unpin, { inner: DispatcherState, } @@ -59,11 +64,14 @@ enum DispatcherState where S: Service, S::Error: Into, + S::Future: Unpin, B: MessageBody, X: Service, X::Error: Into, + X::Future: Unpin, U: Service), Response = ()>, U::Error: fmt::Display, + U::Future: Unpin, { Normal(InnerDispatcher), Upgrade(U::Future), @@ -74,11 +82,14 @@ struct InnerDispatcher where S: Service, S::Error: Into, + S::Future: Unpin, B: MessageBody, X: Service, X::Error: Into, + X::Future: Unpin, U: Service), Response = ()>, U::Error: fmt::Display, + U::Future: Unpin, { service: CloneableService, expect: CloneableService, @@ -170,11 +181,14 @@ where S: Service, S::Error: Into, S::Response: Into>, + S::Future: Unpin, B: MessageBody, X: Service, X::Error: Into, + X::Future: Unpin, U: Service), Response = ()>, U::Error: fmt::Display, + U::Future: Unpin, { /// Create http/1 dispatcher. pub(crate) fn new( @@ -255,20 +269,23 @@ where S: Service, S::Error: Into, S::Response: Into>, + S::Future: Unpin, B: MessageBody, X: Service, X::Error: Into, + X::Future: Unpin, U: Service), Response = ()>, U::Error: fmt::Display, + U::Future: Unpin, { - fn can_read(&self) -> bool { + fn can_read(&self, cx: &mut Context) -> bool { if self .flags .intersects(Flags::READ_DISCONNECT | Flags::UPGRADE) { false } else if let Some(ref info) = self.payload { - info.need_read() == PayloadStatus::Read + info.need_read(cx) == PayloadStatus::Read } else { true } @@ -287,7 +304,7 @@ where /// /// true - got whouldblock /// false - didnt get whouldblock - fn poll_flush(&mut self) -> Result { + fn poll_flush(&mut self, cx: &mut Context) -> Result { if self.write_buf.is_empty() { return Ok(false); } @@ -295,23 +312,23 @@ where let len = self.write_buf.len(); let mut written = 0; while written < len { - match self.io.write(&self.write_buf[written..]) { - Ok(0) => { + match Pin::new(&mut self.io).poll_write(cx, &self.write_buf[written..]) { + Poll::Ready(Ok(0)) => { return Err(DispatchError::Io(io::Error::new( io::ErrorKind::WriteZero, "", ))); } - Ok(n) => { + Poll::Ready(Ok(n)) => { written += n; } - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + Poll::Pending => { if written > 0 { let _ = self.write_buf.split_to(written); } return Ok(true); } - Err(err) => return Err(DispatchError::Io(err)), + Poll::Ready(Err(err)) => return Err(DispatchError::Io(err)), } } if written > 0 { @@ -350,12 +367,15 @@ where .extend_from_slice(b"HTTP/1.1 100 Continue\r\n\r\n"); } - fn poll_response(&mut self) -> Result { + fn poll_response( + &mut self, + cx: &mut Context, + ) -> Result { loop { let state = match self.state { State::None => match self.messages.pop_front() { Some(DispatcherMessage::Item(req)) => { - Some(self.handle_request(req)?) + Some(self.handle_request(req, cx)?) } Some(DispatcherMessage::Error(res)) => { Some(self.send_response(res, ResponseBody::Other(Body::Empty))?) @@ -365,54 +385,54 @@ where } None => None, }, - State::ExpectCall(ref mut fut) => match fut.poll() { - Ok(Async::Ready(req)) => { + State::ExpectCall(ref mut fut) => match Pin::new(fut).poll(cx) { + Poll::Ready(Ok(req)) => { self.send_continue(); self.state = State::ServiceCall(self.service.call(req)); continue; } - Ok(Async::NotReady) => None, - Err(e) => { + Poll::Ready(Err(e)) => { let res: Response = e.into().into(); let (res, body) = res.replace_body(()); Some(self.send_response(res, body.into_body())?) } + Poll::Pending => None, }, - State::ServiceCall(ref mut fut) => match fut.poll() { - Ok(Async::Ready(res)) => { + State::ServiceCall(ref mut fut) => match Pin::new(fut).poll(cx) { + Poll::Ready(Ok(res)) => { let (res, body) = res.into().replace_body(()); self.state = self.send_response(res, body)?; continue; } - Ok(Async::NotReady) => None, - Err(e) => { + Poll::Ready(Err(e)) => { let res: Response = e.into().into(); let (res, body) = res.replace_body(()); Some(self.send_response(res, body.into_body())?) } + Poll::Pending => None, }, State::SendPayload(ref mut stream) => { loop { if self.write_buf.len() < HW_BUFFER_SIZE { - match stream - .poll_next() - .map_err(|_| DispatchError::Unknown)? - { - Async::Ready(Some(item)) => { + match stream.poll_next(cx) { + Poll::Ready(Some(Ok(item))) => { self.codec.encode( Message::Chunk(Some(item)), &mut self.write_buf, )?; continue; } - Async::Ready(None) => { + Poll::Ready(None) => { self.codec.encode( Message::Chunk(None), &mut self.write_buf, )?; self.state = State::None; } - Async::NotReady => return Ok(PollResponse::DoNothing), + Poll::Ready(Some(Err(_))) => { + return Err(DispatchError::Unknown) + } + Poll::Pending => return Ok(PollResponse::DoNothing), } } else { return Ok(PollResponse::DrainWriteBuf); @@ -433,7 +453,7 @@ where // if read-backpressure is enabled and we consumed some data. // we may read more data and retry if self.state.is_call() { - if self.poll_request()? { + if self.poll_request(cx)? { continue; } } else if !self.messages.is_empty() { @@ -446,17 +466,21 @@ where Ok(PollResponse::DoNothing) } - fn handle_request(&mut self, req: Request) -> Result, DispatchError> { + fn handle_request( + &mut self, + req: Request, + cx: &mut Context, + ) -> Result, DispatchError> { // Handle `EXPECT: 100-Continue` header let req = if req.head().expect() { let mut task = self.expect.call(req); - match task.poll() { - Ok(Async::Ready(req)) => { + match Pin::new(&mut task).poll(cx) { + Poll::Ready(Ok(req)) => { self.send_continue(); req } - Ok(Async::NotReady) => return Ok(State::ExpectCall(task)), - Err(e) => { + Poll::Pending => return Ok(State::ExpectCall(task)), + Poll::Ready(Err(e)) => { let e = e.into(); let res: Response = e.into(); let (res, body) = res.replace_body(()); @@ -469,13 +493,13 @@ where // Call service let mut task = self.service.call(req); - match task.poll() { - Ok(Async::Ready(res)) => { + match Pin::new(&mut task).poll(cx) { + Poll::Ready(Ok(res)) => { let (res, body) = res.into().replace_body(()); self.send_response(res, body) } - Ok(Async::NotReady) => Ok(State::ServiceCall(task)), - Err(e) => { + Poll::Pending => Ok(State::ServiceCall(task)), + Poll::Ready(Err(e)) => { let res: Response = e.into().into(); let (res, body) = res.replace_body(()); self.send_response(res, body.into_body()) @@ -484,9 +508,12 @@ where } /// Process one incoming requests - pub(self) fn poll_request(&mut self) -> Result { + pub(self) fn poll_request( + &mut self, + cx: &mut Context, + ) -> Result { // limit a mount of non processed requests - if self.messages.len() >= MAX_PIPELINED_MESSAGES || !self.can_read() { + if self.messages.len() >= MAX_PIPELINED_MESSAGES || !self.can_read(cx) { return Ok(false); } @@ -521,7 +548,7 @@ where // handle request early if self.state.is_empty() { - self.state = self.handle_request(req)?; + self.state = self.handle_request(req, cx)?; } else { self.messages.push_back(DispatcherMessage::Item(req)); } @@ -587,12 +614,12 @@ where } /// keep-alive timer - fn poll_keepalive(&mut self) -> Result<(), DispatchError> { + fn poll_keepalive(&mut self, cx: &mut Context) -> Result<(), DispatchError> { if self.ka_timer.is_none() { // shutdown timeout if self.flags.contains(Flags::SHUTDOWN) { if let Some(interval) = self.codec.config().client_disconnect_timer() { - self.ka_timer = Some(Delay::new(interval)); + self.ka_timer = Some(delay(interval)); } else { self.flags.insert(Flags::READ_DISCONNECT); if let Some(mut payload) = self.payload.take() { @@ -605,11 +632,8 @@ where } } - match self.ka_timer.as_mut().unwrap().poll().map_err(|e| { - error!("Timer error {:?}", e); - DispatchError::Unknown - })? { - Async::Ready(_) => { + match Pin::new(&mut self.ka_timer.as_mut().unwrap()).poll(cx) { + Poll::Ready(()) => { // if we get timeout during shutdown, drop connection if self.flags.contains(Flags::SHUTDOWN) { return Err(DispatchError::DisconnectTimeout); @@ -624,9 +648,9 @@ where if let Some(deadline) = self.codec.config().client_disconnect_timer() { - if let Some(timer) = self.ka_timer.as_mut() { + if let Some(mut timer) = self.ka_timer.as_mut() { timer.reset(deadline); - let _ = timer.poll(); + let _ = Pin::new(&mut timer).poll(cx); } } else { // no shutdown timeout, drop socket @@ -650,17 +674,17 @@ where } else if let Some(deadline) = self.codec.config().keep_alive_expire() { - if let Some(timer) = self.ka_timer.as_mut() { + if let Some(mut timer) = self.ka_timer.as_mut() { timer.reset(deadline); - let _ = timer.poll(); + let _ = Pin::new(&mut timer).poll(cx); } } - } else if let Some(timer) = self.ka_timer.as_mut() { + } else if let Some(mut timer) = self.ka_timer.as_mut() { timer.reset(self.ka_expire); - let _ = timer.poll(); + let _ = Pin::new(&mut timer).poll(cx); } } - Async::NotReady => (), + Poll::Pending => (), } Ok(()) @@ -673,33 +697,37 @@ where S: Service, S::Error: Into, S::Response: Into>, + S::Future: Unpin, B: MessageBody, X: Service, X::Error: Into, + X::Future: Unpin, U: Service), Response = ()>, U::Error: fmt::Display, + U::Future: Unpin, { - type Item = (); - type Error = DispatchError; + type Output = Result<(), DispatchError>; #[inline] - fn poll(&mut self) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { match self.inner { DispatcherState::Normal(ref mut inner) => { - inner.poll_keepalive()?; + inner.poll_keepalive(cx)?; if inner.flags.contains(Flags::SHUTDOWN) { if inner.flags.contains(Flags::WRITE_DISCONNECT) { - Ok(Async::Ready(())) + Poll::Ready(Ok(())) } else { // flush buffer - inner.poll_flush()?; + inner.poll_flush(cx)?; if !inner.write_buf.is_empty() { - Ok(Async::NotReady) + Poll::Pending } else { - match inner.io.shutdown()? { - Async::Ready(_) => Ok(Async::Ready(())), - Async::NotReady => Ok(Async::NotReady), + match Pin::new(&mut inner.io).poll_shutdown(cx) { + Poll::Ready(res) => { + Poll::Ready(res.map_err(DispatchError::from)) + } + Poll::Pending => Poll::Pending, } } } @@ -707,12 +735,12 @@ where // read socket into a buf let should_disconnect = if !inner.flags.contains(Flags::READ_DISCONNECT) { - read_available(&mut inner.io, &mut inner.read_buf)? + read_available(cx, &mut inner.io, &mut inner.read_buf)? } else { None }; - inner.poll_request()?; + inner.poll_request(cx)?; if let Some(true) = should_disconnect { inner.flags.insert(Flags::READ_DISCONNECT); if let Some(mut payload) = inner.payload.take() { @@ -724,7 +752,7 @@ where if inner.write_buf.remaining_mut() < LW_BUFFER_SIZE { inner.write_buf.reserve(HW_BUFFER_SIZE); } - let result = inner.poll_response()?; + let result = inner.poll_response(cx)?; let drain = result == PollResponse::DrainWriteBuf; // switch to upgrade handler @@ -742,7 +770,7 @@ where self.inner = DispatcherState::Upgrade( inner.upgrade.unwrap().call((req, framed)), ); - return self.poll(); + return self.poll(cx); } else { panic!() } @@ -751,14 +779,14 @@ where // we didnt get WouldBlock from write operation, // so data get written to kernel completely (OSX) // and we have to write again otherwise response can get stuck - if inner.poll_flush()? || !drain { + if inner.poll_flush(cx)? || !drain { break; } } // client is gone if inner.flags.contains(Flags::WRITE_DISCONNECT) { - return Ok(Async::Ready(())); + return Poll::Ready(Ok(())); } let is_empty = inner.state.is_empty(); @@ -771,38 +799,44 @@ where // keep-alive and stream errors if is_empty && inner.write_buf.is_empty() { if let Some(err) = inner.error.take() { - Err(err) + Poll::Ready(Err(err)) } // disconnect if keep-alive is not enabled else if inner.flags.contains(Flags::STARTED) && !inner.flags.intersects(Flags::KEEPALIVE) { inner.flags.insert(Flags::SHUTDOWN); - self.poll() + self.poll(cx) } // disconnect if shutdown else if inner.flags.contains(Flags::SHUTDOWN) { - self.poll() + self.poll(cx) } else { - Ok(Async::NotReady) + Poll::Pending } } else { - Ok(Async::NotReady) + Poll::Pending } } } - DispatcherState::Upgrade(ref mut fut) => fut.poll().map_err(|e| { - error!("Upgrade handler error: {}", e); - DispatchError::Upgrade - }), + DispatcherState::Upgrade(ref mut fut) => { + Pin::new(fut).poll(cx).map_err(|e| { + error!("Upgrade handler error: {}", e); + DispatchError::Upgrade + }) + } DispatcherState::None => panic!(), } } } -fn read_available(io: &mut T, buf: &mut BytesMut) -> Result, io::Error> +fn read_available( + cx: &mut Context, + io: &mut T, + buf: &mut BytesMut, +) -> Result, io::Error> where - T: io::Read, + T: AsyncRead + Unpin, { let mut read_some = false; loop { @@ -810,19 +844,18 @@ where buf.reserve(HW_BUFFER_SIZE); } - let read = unsafe { io.read(buf.bytes_mut()) }; - match read { - Ok(n) => { + match read(cx, io, buf) { + Poll::Pending => { + return if read_some { Ok(Some(false)) } else { Ok(None) }; + } + Poll::Ready(Ok(n)) => { if n == 0 { return Ok(Some(true)); } else { read_some = true; - unsafe { - buf.advance_mut(n); - } } } - Err(e) => { + Poll::Ready(Err(e)) => { return if e.kind() == io::ErrorKind::WouldBlock { if read_some { Ok(Some(false)) @@ -833,12 +866,23 @@ where Ok(Some(true)) } else { Err(e) - }; + } } } } } +fn read( + cx: &mut Context, + io: &mut T, + buf: &mut BytesMut, +) -> Poll> +where + T: AsyncRead + Unpin, +{ + Pin::new(io).poll_read_buf(cx, buf) +} + #[cfg(test)] mod tests { use actix_service::IntoService; diff --git a/actix-http/src/h1/expect.rs b/actix-http/src/h1/expect.rs index 32b6bd9c4..79831eae1 100644 --- a/actix-http/src/h1/expect.rs +++ b/actix-http/src/h1/expect.rs @@ -1,21 +1,24 @@ +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + use actix_server_config::ServerConfig; -use actix_service::{NewService, Service}; -use futures::future::{ok, FutureResult}; -use futures::{Async, Poll}; +use actix_service::{Service, ServiceFactory}; +use futures::future::{ok, Ready}; use crate::error::Error; use crate::request::Request; pub struct ExpectHandler; -impl NewService for ExpectHandler { +impl ServiceFactory for ExpectHandler { type Config = ServerConfig; type Request = Request; type Response = Request; type Error = Error; type Service = ExpectHandler; type InitError = Error; - type Future = FutureResult; + type Future = Ready>; fn new_service(&self, _: &ServerConfig) -> Self::Future { ok(ExpectHandler) @@ -26,10 +29,10 @@ impl Service for ExpectHandler { type Request = Request; type Response = Request; type Error = Error; - type Future = FutureResult; + type Future = Ready>; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) + fn poll_ready(&mut self, _: &mut Context) -> Poll> { + Poll::Ready(Ok(())) } fn call(&mut self, req: Request) -> Self::Future { diff --git a/actix-http/src/h1/payload.rs b/actix-http/src/h1/payload.rs index 28acb64bb..036138f98 100644 --- a/actix-http/src/h1/payload.rs +++ b/actix-http/src/h1/payload.rs @@ -1,12 +1,14 @@ //! Payload stream use std::cell::RefCell; use std::collections::VecDeque; +use std::future::Future; +use std::pin::Pin; use std::rc::{Rc, Weak}; +use std::task::{Context, Poll}; +use actix_utils::task::LocalWaker; use bytes::Bytes; -use futures::task::current as current_task; -use futures::task::Task; -use futures::{Async, Poll, Stream}; +use futures::Stream; use crate::error::PayloadError; @@ -77,15 +79,24 @@ impl Payload { pub fn unread_data(&mut self, data: Bytes) { self.inner.borrow_mut().unread_data(data); } + + #[inline] + pub fn readany( + &mut self, + cx: &mut Context, + ) -> Poll>> { + self.inner.borrow_mut().readany(cx) + } } impl Stream for Payload { - type Item = Bytes; - type Error = PayloadError; + type Item = Result; - #[inline] - fn poll(&mut self) -> Poll, PayloadError> { - self.inner.borrow_mut().readany() + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context, + ) -> Poll>> { + self.inner.borrow_mut().readany(cx) } } @@ -117,19 +128,14 @@ impl PayloadSender { } #[inline] - pub fn need_read(&self) -> PayloadStatus { + pub fn need_read(&self, cx: &mut Context) -> PayloadStatus { // we check need_read only if Payload (other side) is alive, // otherwise always return true (consume payload) if let Some(shared) = self.inner.upgrade() { if shared.borrow().need_read { PayloadStatus::Read } else { - #[cfg(not(test))] - { - if shared.borrow_mut().io_task.is_none() { - shared.borrow_mut().io_task = Some(current_task()); - } - } + shared.borrow_mut().io_task.register(cx.waker()); PayloadStatus::Pause } } else { @@ -145,8 +151,8 @@ struct Inner { err: Option, need_read: bool, items: VecDeque, - task: Option, - io_task: Option, + task: LocalWaker, + io_task: LocalWaker, } impl Inner { @@ -157,8 +163,8 @@ impl Inner { err: None, items: VecDeque::new(), need_read: true, - task: None, - io_task: None, + task: LocalWaker::new(), + io_task: LocalWaker::new(), } } @@ -178,7 +184,7 @@ impl Inner { self.items.push_back(data); self.need_read = self.len < MAX_BUFFER_SIZE; if let Some(task) = self.task.take() { - task.notify() + task.wake() } } @@ -187,34 +193,28 @@ impl Inner { self.len } - fn readany(&mut self) -> Poll, PayloadError> { + fn readany( + &mut self, + cx: &mut Context, + ) -> Poll>> { if let Some(data) = self.items.pop_front() { self.len -= data.len(); self.need_read = self.len < MAX_BUFFER_SIZE; - if self.need_read && self.task.is_none() && !self.eof { - self.task = Some(current_task()); + if self.need_read && !self.eof { + self.task.register(cx.waker()); } - if let Some(task) = self.io_task.take() { - task.notify() - } - Ok(Async::Ready(Some(data))) + self.io_task.wake(); + Poll::Ready(Some(Ok(data))) } else if let Some(err) = self.err.take() { - Err(err) + Poll::Ready(Some(Err(err))) } else if self.eof { - Ok(Async::Ready(None)) + Poll::Ready(None) } else { self.need_read = true; - #[cfg(not(test))] - { - if self.task.is_none() { - self.task = Some(current_task()); - } - if let Some(task) = self.io_task.take() { - task.notify() - } - } - Ok(Async::NotReady) + self.task.register(cx.waker()); + self.io_task.wake(); + Poll::Pending } } diff --git a/actix-http/src/h1/service.rs b/actix-http/src/h1/service.rs index 89bf08e9b..95596af76 100644 --- a/actix-http/src/h1/service.rs +++ b/actix-http/src/h1/service.rs @@ -1,12 +1,15 @@ use std::fmt; +use std::future::Future; use std::marker::PhantomData; +use std::pin::Pin; use std::rc::Rc; +use std::task::{Context, Poll}; use actix_codec::Framed; use actix_server_config::{Io, IoStream, ServerConfig as SrvConfig}; -use actix_service::{IntoNewService, NewService, Service}; -use futures::future::{ok, FutureResult}; -use futures::{try_ready, Async, Future, IntoFuture, Poll, Stream}; +use actix_service::{IntoServiceFactory, Service, ServiceFactory}; +use futures::future::{ok, Ready}; +use futures::{ready, Stream}; use crate::body::MessageBody; use crate::cloneable::CloneableService; @@ -20,7 +23,7 @@ use super::codec::Codec; use super::dispatcher::Dispatcher; use super::{ExpectHandler, Message, UpgradeHandler}; -/// `NewService` implementation for HTTP1 transport +/// `ServiceFactory` implementation for HTTP1 transport pub struct H1Service> { srv: S, cfg: ServiceConfig, @@ -32,19 +35,23 @@ pub struct H1Service> { impl H1Service where - S: NewService, + S: ServiceFactory, S::Error: Into, S::InitError: fmt::Debug, S::Response: Into>, + S::Future: Unpin, + S::Service: Unpin, + ::Future: Unpin, B: MessageBody, + P: Unpin, { /// Create new `HttpService` instance with default config. - pub fn new>(service: F) -> Self { + pub fn new>(service: F) -> Self { let cfg = ServiceConfig::new(KeepAlive::Timeout(5), 5000, 0); H1Service { cfg, - srv: service.into_new_service(), + srv: service.into_factory(), expect: ExpectHandler, upgrade: None, on_connect: None, @@ -53,10 +60,13 @@ where } /// Create new `HttpService` instance with config. - pub fn with_config>(cfg: ServiceConfig, service: F) -> Self { + pub fn with_config>( + cfg: ServiceConfig, + service: F, + ) -> Self { H1Service { cfg, - srv: service.into_new_service(), + srv: service.into_factory(), expect: ExpectHandler, upgrade: None, on_connect: None, @@ -67,17 +77,24 @@ where impl H1Service where - S: NewService, + S: ServiceFactory, S::Error: Into, S::Response: Into>, S::InitError: fmt::Debug, + S::Future: Unpin, + S::Service: Unpin, + ::Future: Unpin, B: MessageBody, + P: Unpin, { pub fn expect(self, expect: X1) -> H1Service where - X1: NewService, + X1: ServiceFactory, X1::Error: Into, X1::InitError: fmt::Debug, + X1::Future: Unpin, + X1::Service: Unpin, + ::Future: Unpin, { H1Service { expect, @@ -91,9 +108,12 @@ where pub fn upgrade(self, upgrade: Option) -> H1Service where - U1: NewService), Response = ()>, + U1: ServiceFactory), Response = ()>, U1::Error: fmt::Display, U1::InitError: fmt::Debug, + U1::Future: Unpin, + U1::Service: Unpin, + ::Future: Unpin, { H1Service { upgrade, @@ -115,24 +135,35 @@ where } } -impl NewService for H1Service +impl ServiceFactory for H1Service where T: IoStream, - S: NewService, + S: ServiceFactory, + S::Service: Unpin, S::Error: Into, S::Response: Into>, S::InitError: fmt::Debug, + S::Future: Unpin, + S::Service: Unpin, + ::Future: Unpin, B: MessageBody, - X: NewService, + X: ServiceFactory, X::Error: Into, X::InitError: fmt::Debug, - U: NewService< + X::Future: Unpin, + X::Service: Unpin, + ::Future: Unpin, + U: ServiceFactory< Config = SrvConfig, Request = (Request, Framed), Response = (), >, U::Error: fmt::Display, U::InitError: fmt::Debug, + U::Future: Unpin, + U::Service: Unpin, + ::Future: Unpin, + P: Unpin, { type Config = SrvConfig; type Request = Io; @@ -144,7 +175,7 @@ where fn new_service(&self, cfg: &SrvConfig) -> Self::Future { H1ServiceResponse { - fut: self.srv.new_service(cfg).into_future(), + fut: self.srv.new_service(cfg), fut_ex: Some(self.expect.new_service(cfg)), fut_upg: self.upgrade.as_ref().map(|f| f.new_service(cfg)), expect: None, @@ -159,15 +190,25 @@ where #[doc(hidden)] pub struct H1ServiceResponse where - S: NewService, + S: ServiceFactory, S::Error: Into, S::InitError: fmt::Debug, - X: NewService, + S::Future: Unpin, + S::Service: Unpin, + ::Future: Unpin, + X: ServiceFactory, X::Error: Into, X::InitError: fmt::Debug, - U: NewService), Response = ()>, + X::Future: Unpin, + X::Service: Unpin, + ::Future: Unpin, + U: ServiceFactory), Response = ()>, U::Error: fmt::Display, U::InitError: fmt::Debug, + U::Future: Unpin, + U::Service: Unpin, + ::Future: Unpin, + P: Unpin, { fut: S::Future, fut_ex: Option, @@ -182,49 +223,63 @@ where impl Future for H1ServiceResponse where T: IoStream, - S: NewService, + S: ServiceFactory, S::Error: Into, S::Response: Into>, S::InitError: fmt::Debug, + S::Future: Unpin, + S::Service: Unpin, + ::Future: Unpin, B: MessageBody, - X: NewService, + X: ServiceFactory, X::Error: Into, X::InitError: fmt::Debug, - U: NewService), Response = ()>, + X::Future: Unpin, + X::Service: Unpin, + ::Future: Unpin, + U: ServiceFactory), Response = ()>, U::Error: fmt::Display, U::InitError: fmt::Debug, + U::Future: Unpin, + U::Service: Unpin, + ::Future: Unpin, + P: Unpin, { - type Item = H1ServiceHandler; - type Error = (); + type Output = + Result, ()>; - fn poll(&mut self) -> Poll { - if let Some(ref mut fut) = self.fut_ex { - let expect = try_ready!(fut - .poll() - .map_err(|e| log::error!("Init http service error: {:?}", e))); - self.expect = Some(expect); - self.fut_ex.take(); + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let this = self.get_mut(); + + if let Some(ref mut fut) = this.fut_ex { + let expect = ready!(Pin::new(fut) + .poll(cx) + .map_err(|e| log::error!("Init http service error: {:?}", e)))?; + this.expect = Some(expect); + this.fut_ex.take(); } - if let Some(ref mut fut) = self.fut_upg { - let upgrade = try_ready!(fut - .poll() - .map_err(|e| log::error!("Init http service error: {:?}", e))); - self.upgrade = Some(upgrade); - self.fut_ex.take(); + if let Some(ref mut fut) = this.fut_upg { + let upgrade = ready!(Pin::new(fut) + .poll(cx) + .map_err(|e| log::error!("Init http service error: {:?}", e)))?; + this.upgrade = Some(upgrade); + this.fut_ex.take(); } - let service = try_ready!(self - .fut - .poll() + let result = ready!(Pin::new(&mut this.fut) + .poll(cx) .map_err(|e| log::error!("Init http service error: {:?}", e))); - Ok(Async::Ready(H1ServiceHandler::new( - self.cfg.take().unwrap(), - service, - self.expect.take().unwrap(), - self.upgrade.take(), - self.on_connect.clone(), - ))) + + Poll::Ready(result.map(|service| { + H1ServiceHandler::new( + this.cfg.take().unwrap(), + service, + this.expect.take().unwrap(), + this.upgrade.take(), + this.on_connect.clone(), + ) + })) } } @@ -240,14 +295,18 @@ pub struct H1ServiceHandler { impl H1ServiceHandler where - S: Service, + S: Service + Unpin, S::Error: Into, S::Response: Into>, + S::Future: Unpin, B: MessageBody, - X: Service, + X: Service + Unpin, + X::Future: Unpin, X::Error: Into, - U: Service), Response = ()>, + U: Service), Response = ()> + Unpin, + U::Future: Unpin, U::Error: fmt::Display, + P: Unpin, { fn new( cfg: ServiceConfig, @@ -270,24 +329,28 @@ where impl Service for H1ServiceHandler where T: IoStream, - S: Service, + S: Service + Unpin, S::Error: Into, S::Response: Into>, + S::Future: Unpin, B: MessageBody, - X: Service, + X: Service + Unpin, X::Error: Into, - U: Service), Response = ()>, + X::Future: Unpin, + U: Service), Response = ()> + Unpin, U::Error: fmt::Display, + U::Future: Unpin, + P: Unpin, { type Request = Io; type Response = (); type Error = DispatchError; type Future = Dispatcher; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { + fn poll_ready(&mut self, cx: &mut Context) -> Poll> { let ready = self .expect - .poll_ready() + .poll_ready(cx) .map_err(|e| { let e = e.into(); log::error!("Http service readiness error: {:?}", e); @@ -297,7 +360,7 @@ where let ready = self .srv - .poll_ready() + .poll_ready(cx) .map_err(|e| { let e = e.into(); log::error!("Http service readiness error: {:?}", e); @@ -307,9 +370,9 @@ where && ready; if ready { - Ok(Async::Ready(())) + Poll::Ready(Ok(())) } else { - Ok(Async::NotReady) + Poll::Pending } } @@ -333,7 +396,7 @@ where } } -/// `NewService` implementation for `OneRequestService` service +/// `ServiceFactory` implementation for `OneRequestService` service #[derive(Default)] pub struct OneRequest { config: ServiceConfig, @@ -353,7 +416,7 @@ where } } -impl NewService for OneRequest +impl ServiceFactory for OneRequest where T: IoStream, { @@ -363,7 +426,7 @@ where type Error = ParseError; type InitError = (); type Service = OneRequestService; - type Future = FutureResult; + type Future = Ready>; fn new_service(&self, _: &SrvConfig) -> Self::Future { ok(OneRequestService { @@ -389,8 +452,8 @@ where type Error = ParseError; type Future = OneRequestServiceResponse; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) + fn poll_ready(&mut self, _: &mut Context) -> Poll> { + Poll::Ready(Ok(())) } fn call(&mut self, req: Self::Request) -> Self::Future { @@ -415,19 +478,19 @@ impl Future for OneRequestServiceResponse where T: IoStream, { - type Item = (Request, Framed); - type Error = ParseError; + type Output = Result<(Request, Framed), ParseError>; - fn poll(&mut self) -> Poll { - match self.framed.as_mut().unwrap().poll()? { - Async::Ready(Some(req)) => match req { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + match self.framed.as_mut().unwrap().next_item(cx) { + Poll::Ready(Some(Ok(req))) => match req { Message::Item(req) => { - Ok(Async::Ready((req, self.framed.take().unwrap()))) + Poll::Ready(Ok((req, self.framed.take().unwrap()))) } Message::Chunk(_) => unreachable!("Something is wrong"), }, - Async::Ready(None) => Err(ParseError::Incomplete), - Async::NotReady => Ok(Async::NotReady), + Poll::Ready(Some(Err(err))) => Poll::Ready(Err(err)), + Poll::Ready(None) => Poll::Ready(Err(ParseError::Incomplete)), + Poll::Pending => Poll::Pending, } } } diff --git a/actix-http/src/h1/upgrade.rs b/actix-http/src/h1/upgrade.rs index 0278f23e5..43ab53d01 100644 --- a/actix-http/src/h1/upgrade.rs +++ b/actix-http/src/h1/upgrade.rs @@ -1,10 +1,12 @@ +use std::future::Future; use std::marker::PhantomData; +use std::pin::Pin; +use std::task::{Context, Poll}; use actix_codec::Framed; use actix_server_config::ServerConfig; -use actix_service::{NewService, Service}; -use futures::future::FutureResult; -use futures::{Async, Poll}; +use actix_service::{Service, ServiceFactory}; +use futures::future::Ready; use crate::error::Error; use crate::h1::Codec; @@ -12,14 +14,14 @@ use crate::request::Request; pub struct UpgradeHandler(PhantomData); -impl NewService for UpgradeHandler { +impl ServiceFactory for UpgradeHandler { type Config = ServerConfig; type Request = (Request, Framed); type Response = (); type Error = Error; type Service = UpgradeHandler; type InitError = Error; - type Future = FutureResult; + type Future = Ready>; fn new_service(&self, _: &ServerConfig) -> Self::Future { unimplemented!() @@ -30,10 +32,10 @@ impl Service for UpgradeHandler { type Request = (Request, Framed); type Response = (); type Error = Error; - type Future = FutureResult; + type Future = Ready>; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) + fn poll_ready(&mut self, _: &mut Context) -> Poll> { + Poll::Ready(Ok(())) } fn call(&mut self, _: Self::Request) -> Self::Future { diff --git a/actix-http/src/h1/utils.rs b/actix-http/src/h1/utils.rs index fdc4cf0bc..bc6914d3b 100644 --- a/actix-http/src/h1/utils.rs +++ b/actix-http/src/h1/utils.rs @@ -1,5 +1,9 @@ +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + use actix_codec::{AsyncRead, AsyncWrite, Framed}; -use futures::{Async, Future, Poll, Sink}; +use futures::Sink; use crate::body::{BodySize, MessageBody, ResponseBody}; use crate::error::Error; @@ -30,63 +34,64 @@ where impl Future for SendResponse where - T: AsyncRead + AsyncWrite, + T: AsyncRead + AsyncWrite + Unpin, B: MessageBody, { - type Item = Framed; - type Error = Error; + type Output = Result, Error>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let this = self.get_mut(); - fn poll(&mut self) -> Poll { loop { - let mut body_ready = self.body.is_some(); - let framed = self.framed.as_mut().unwrap(); + let mut body_ready = this.body.is_some(); + let framed = this.framed.as_mut().unwrap(); // send body - if self.res.is_none() && self.body.is_some() { - while body_ready && self.body.is_some() && !framed.is_write_buf_full() { - match self.body.as_mut().unwrap().poll_next()? { - Async::Ready(item) => { + if this.res.is_none() && this.body.is_some() { + while body_ready && this.body.is_some() && !framed.is_write_buf_full() { + match this.body.as_mut().unwrap().poll_next(cx)? { + Poll::Ready(item) => { // body is done if item.is_none() { - let _ = self.body.take(); + let _ = this.body.take(); } framed.force_send(Message::Chunk(item))?; } - Async::NotReady => body_ready = false, + Poll::Pending => body_ready = false, } } } // flush write buffer if !framed.is_write_buf_empty() { - match framed.poll_complete()? { - Async::Ready(_) => { + match framed.flush(cx)? { + Poll::Ready(_) => { if body_ready { continue; } else { - return Ok(Async::NotReady); + return Poll::Pending; } } - Async::NotReady => return Ok(Async::NotReady), + Poll::Pending => return Poll::Pending, } } // send response - if let Some(res) = self.res.take() { + if let Some(res) = this.res.take() { framed.force_send(res)?; continue; } - if self.body.is_some() { + if this.body.is_some() { if body_ready { continue; } else { - return Ok(Async::NotReady); + return Poll::Pending; } } else { break; } } - Ok(Async::Ready(self.framed.take().unwrap())) + Poll::Ready(Ok(this.framed.take().unwrap())) } } diff --git a/actix-http/src/h2/dispatcher.rs b/actix-http/src/h2/dispatcher.rs index 888f9065e..2a44c83f9 100644 --- a/actix-http/src/h2/dispatcher.rs +++ b/actix-http/src/h2/dispatcher.rs @@ -1,5 +1,8 @@ use std::collections::VecDeque; +use std::future::Future; use std::marker::PhantomData; +use std::pin::Pin; +use std::task::{Context, Poll}; use std::time::Instant; use std::{fmt, mem, net}; @@ -8,7 +11,7 @@ use actix_server_config::IoStream; use actix_service::Service; use bitflags::bitflags; use bytes::{Bytes, BytesMut}; -use futures::{try_ready, Async, Future, Poll, Sink, Stream}; +use futures::{ready, Sink, Stream}; use h2::server::{Connection, SendResponse}; use h2::{RecvStream, SendStream}; use http::header::{ @@ -43,13 +46,24 @@ pub struct Dispatcher, B: MessageBody _t: PhantomData, } +impl Unpin for Dispatcher +where + T: IoStream, + S: Service, + S::Error: Into + Unpin + 'static, + S::Future: Unpin + 'static, + S::Response: Into> + Unpin + 'static, + B: MessageBody + 'static, +{ +} + impl Dispatcher where T: IoStream, S: Service, - S::Error: Into, - S::Future: 'static, - S::Response: Into>, + S::Error: Into + Unpin + 'static, + S::Future: Unpin + 'static, + S::Response: Into> + Unpin + 'static, B: MessageBody + 'static, { pub(crate) fn new( @@ -93,61 +107,75 @@ impl Future for Dispatcher where T: IoStream, S: Service, - S::Error: Into, - S::Future: 'static, - S::Response: Into>, + S::Error: Into + Unpin + 'static, + S::Future: Unpin + 'static, + S::Response: Into> + Unpin + 'static, B: MessageBody + 'static, { - type Item = (); - type Error = DispatchError; + type Output = Result<(), DispatchError>; #[inline] - fn poll(&mut self) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let this = self.get_mut(); + loop { - match self.connection.poll()? { - Async::Ready(None) => return Ok(Async::Ready(())), - Async::Ready(Some((req, res))) => { + match Pin::new(&mut this.connection).poll_accept(cx) { + Poll::Ready(None) => return Poll::Ready(Ok(())), + Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err.into())), + Poll::Ready(Some(Ok((req, res)))) => { // update keep-alive expire - if self.ka_timer.is_some() { - if let Some(expire) = self.config.keep_alive_expire() { - self.ka_expire = expire; + if this.ka_timer.is_some() { + if let Some(expire) = this.config.keep_alive_expire() { + this.ka_expire = expire; } } let (parts, body) = req.into_parts(); - let mut req = Request::with_payload(body.into()); + // let b: () = body; + let mut req = Request::with_payload(Payload::< + crate::payload::PayloadStream, + >::H2( + crate::h2::Payload::new(body) + )); let head = &mut req.head_mut(); head.uri = parts.uri; head.method = parts.method; head.version = parts.version; head.headers = parts.headers.into(); - head.peer_addr = self.peer_addr; + head.peer_addr = this.peer_addr; // set on_connect data - if let Some(ref on_connect) = self.on_connect { + if let Some(ref on_connect) = this.on_connect { on_connect.set(&mut req.extensions_mut()); } - tokio_current_thread::spawn(ServiceResponse:: { - state: ServiceResponseState::ServiceCall( - self.service.call(req), - Some(res), - ), - config: self.config.clone(), - buffer: None, - }) + // tokio_executor::current_thread::spawn(ServiceResponse::< + // S::Future, + // S::Response, + // S::Error, + // B, + // > { + // state: ServiceResponseState::ServiceCall( + // this.service.call(req), + // Some(res), + // ), + // config: this.config.clone(), + // buffer: None, + // _t: PhantomData, + // }); } - Async::NotReady => return Ok(Async::NotReady), + Poll::Pending => return Poll::Pending, } } } } -struct ServiceResponse { +struct ServiceResponse { state: ServiceResponseState, config: ServiceConfig, buffer: Option, + _t: PhantomData<(I, E)>, } enum ServiceResponseState { @@ -155,11 +183,11 @@ enum ServiceResponseState { SendPayload(SendStream, ResponseBody), } -impl ServiceResponse +impl ServiceResponse where - F: Future, - F::Error: Into, - F::Item: Into>, + F: Future> + Unpin, + E: Into + Unpin + 'static, + I: Into> + Unpin + 'static, B: MessageBody + 'static, { fn prepare_response( @@ -223,109 +251,116 @@ where } } -impl Future for ServiceResponse +impl Future for ServiceResponse where - F: Future, - F::Error: Into, - F::Item: Into>, + F: Future> + Unpin, + E: Into + Unpin + 'static, + I: Into> + Unpin + 'static, B: MessageBody + 'static, { - type Item = (); - type Error = (); + type Output = (); - fn poll(&mut self) -> Poll { - match self.state { + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let this = self.get_mut(); + + match this.state { ServiceResponseState::ServiceCall(ref mut call, ref mut send) => { - match call.poll() { - Ok(Async::Ready(res)) => { + match Pin::new(call).poll(cx) { + Poll::Ready(Ok(res)) => { let (res, body) = res.into().replace_body(()); let mut send = send.take().unwrap(); let mut size = body.size(); - let h2_res = self.prepare_response(res.head(), &mut size); + let h2_res = this.prepare_response(res.head(), &mut size); - let stream = - send.send_response(h2_res, size.is_eof()).map_err(|e| { + let stream = match send.send_response(h2_res, size.is_eof()) { + Err(e) => { trace!("Error sending h2 response: {:?}", e); - })?; + return Poll::Ready(()); + } + Ok(stream) => stream, + }; if size.is_eof() { - Ok(Async::Ready(())) + Poll::Ready(()) } else { - self.state = ServiceResponseState::SendPayload(stream, body); - self.poll() + this.state = ServiceResponseState::SendPayload(stream, body); + Pin::new(this).poll(cx) } } - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(e) => { + Poll::Pending => Poll::Pending, + Poll::Ready(Err(e)) => { let res: Response = e.into().into(); let (res, body) = res.replace_body(()); let mut send = send.take().unwrap(); let mut size = body.size(); - let h2_res = self.prepare_response(res.head(), &mut size); + let h2_res = this.prepare_response(res.head(), &mut size); - let stream = - send.send_response(h2_res, size.is_eof()).map_err(|e| { + let stream = match send.send_response(h2_res, size.is_eof()) { + Err(e) => { trace!("Error sending h2 response: {:?}", e); - })?; + return Poll::Ready(()); + } + Ok(stream) => stream, + }; if size.is_eof() { - Ok(Async::Ready(())) + Poll::Ready(()) } else { - self.state = ServiceResponseState::SendPayload( + this.state = ServiceResponseState::SendPayload( stream, body.into_body(), ); - self.poll() + Pin::new(this).poll(cx) } } } } ServiceResponseState::SendPayload(ref mut stream, ref mut body) => loop { loop { - if let Some(ref mut buffer) = self.buffer { - match stream.poll_capacity().map_err(|e| warn!("{:?}", e))? { - Async::NotReady => return Ok(Async::NotReady), - Async::Ready(None) => return Ok(Async::Ready(())), - Async::Ready(Some(cap)) => { + if let Some(ref mut buffer) = this.buffer { + match stream.poll_capacity(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(None) => return Poll::Ready(()), + Poll::Ready(Some(Ok(cap))) => { let len = buffer.len(); let bytes = buffer.split_to(std::cmp::min(cap, len)); if let Err(e) = stream.send_data(bytes, false) { warn!("{:?}", e); - return Err(()); + return Poll::Ready(()); } else if !buffer.is_empty() { let cap = std::cmp::min(buffer.len(), CHUNK_SIZE); stream.reserve_capacity(cap); } else { - self.buffer.take(); + this.buffer.take(); } } + Poll::Ready(Some(Err(e))) => { + warn!("{:?}", e); + return Poll::Ready(()); + } } } else { - match body.poll_next() { - Ok(Async::NotReady) => { - return Ok(Async::NotReady); - } - Ok(Async::Ready(None)) => { + match body.poll_next(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(None) => { if let Err(e) = stream.send_data(Bytes::new(), true) { warn!("{:?}", e); - return Err(()); - } else { - return Ok(Async::Ready(())); } + return Poll::Ready(()); } - Ok(Async::Ready(Some(chunk))) => { + Poll::Ready(Some(Ok(chunk))) => { stream.reserve_capacity(std::cmp::min( chunk.len(), CHUNK_SIZE, )); - self.buffer = Some(chunk); + this.buffer = Some(chunk); } - Err(e) => { + Poll::Ready(Some(Err(e))) => { error!("Response payload stream error: {:?}", e); - return Err(()); + return Poll::Ready(()); } } } diff --git a/actix-http/src/h2/mod.rs b/actix-http/src/h2/mod.rs index c5972123f..9c902f18c 100644 --- a/actix-http/src/h2/mod.rs +++ b/actix-http/src/h2/mod.rs @@ -1,9 +1,11 @@ #![allow(dead_code, unused_imports)] - use std::fmt; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; use bytes::Bytes; -use futures::{Async, Poll, Stream}; +use futures::Stream; use h2::RecvStream; mod dispatcher; @@ -25,22 +27,23 @@ impl Payload { } impl Stream for Payload { - type Item = Bytes; - type Error = PayloadError; + type Item = Result; - fn poll(&mut self) -> Poll, Self::Error> { - match self.pl.poll() { - Ok(Async::Ready(Some(chunk))) => { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let this = self.get_mut(); + + match Pin::new(&mut this.pl).poll_data(cx) { + Poll::Ready(Some(Ok(chunk))) => { let len = chunk.len(); - if let Err(err) = self.pl.release_capacity().release_capacity(len) { - Err(err.into()) + if let Err(err) = this.pl.release_capacity().release_capacity(len) { + Poll::Ready(Some(Err(err.into()))) } else { - Ok(Async::Ready(Some(chunk))) + Poll::Ready(Some(Ok(chunk))) } } - Ok(Async::Ready(None)) => Ok(Async::Ready(None)), - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(err) => Err(err.into()), + Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err.into()))), + Poll::Pending => Poll::Pending, + Poll::Ready(None) => Poll::Ready(None), } } } diff --git a/actix-http/src/h2/service.rs b/actix-http/src/h2/service.rs index e894cf660..559c99308 100644 --- a/actix-http/src/h2/service.rs +++ b/actix-http/src/h2/service.rs @@ -1,13 +1,16 @@ use std::fmt::Debug; +use std::future::Future; use std::marker::PhantomData; +use std::pin::Pin; +use std::task::{Context, Poll}; use std::{io, net, rc}; use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_server_config::{Io, IoStream, ServerConfig as SrvConfig}; -use actix_service::{IntoNewService, NewService, Service}; +use actix_service::{IntoServiceFactory, Service, ServiceFactory}; use bytes::Bytes; -use futures::future::{ok, FutureResult}; -use futures::{try_ready, Async, Future, IntoFuture, Poll, Stream}; +use futures::future::{ok, Ready}; +use futures::{ready, Stream}; use h2::server::{self, Connection, Handshake}; use h2::RecvStream; use log::error; @@ -23,7 +26,7 @@ use crate::response::Response; use super::dispatcher::Dispatcher; -/// `NewService` implementation for HTTP2 transport +/// `ServiceFactory` implementation for HTTP2 transport pub struct H2Service { srv: S, cfg: ServiceConfig, @@ -33,30 +36,35 @@ pub struct H2Service { impl H2Service where - S: NewService, - S::Error: Into, - S::Response: Into>, - ::Future: 'static, + S: ServiceFactory, + S::Error: Into + Unpin + 'static, + S::Response: Into> + Unpin + 'static, + S::Future: Unpin, + ::Future: Unpin + 'static, B: MessageBody + 'static, + P: Unpin, { /// Create new `HttpService` instance. - pub fn new>(service: F) -> Self { + pub fn new>(service: F) -> Self { let cfg = ServiceConfig::new(KeepAlive::Timeout(5), 5000, 0); H2Service { cfg, on_connect: None, - srv: service.into_new_service(), + srv: service.into_factory(), _t: PhantomData, } } /// Create new `HttpService` instance with config. - pub fn with_config>(cfg: ServiceConfig, service: F) -> Self { + pub fn with_config>( + cfg: ServiceConfig, + service: F, + ) -> Self { H2Service { cfg, on_connect: None, - srv: service.into_new_service(), + srv: service.into_factory(), _t: PhantomData, } } @@ -71,14 +79,16 @@ where } } -impl NewService for H2Service +impl ServiceFactory for H2Service where T: IoStream, - S: NewService, - S::Error: Into, - S::Response: Into>, - ::Future: 'static, + S: ServiceFactory, + S::Error: Into + Unpin + 'static, + S::Response: Into> + Unpin + 'static, + S::Future: Unpin, + ::Future: Unpin + 'static, B: MessageBody + 'static, + P: Unpin, { type Config = SrvConfig; type Request = Io; @@ -90,7 +100,7 @@ where fn new_service(&self, cfg: &SrvConfig) -> Self::Future { H2ServiceResponse { - fut: self.srv.new_service(cfg).into_future(), + fut: self.srv.new_service(cfg), cfg: Some(self.cfg.clone()), on_connect: self.on_connect.clone(), _t: PhantomData, @@ -99,8 +109,8 @@ where } #[doc(hidden)] -pub struct H2ServiceResponse { - fut: ::Future, +pub struct H2ServiceResponse { + fut: S::Future, cfg: Option, on_connect: Option Box>>, _t: PhantomData<(T, P, B)>, @@ -109,22 +119,26 @@ pub struct H2ServiceResponse { impl Future for H2ServiceResponse where T: IoStream, - S: NewService, - S::Error: Into, - S::Response: Into>, - ::Future: 'static, + S: ServiceFactory, + S::Error: Into + Unpin + 'static, + S::Response: Into> + Unpin + 'static, + S::Future: Unpin, + ::Future: Unpin + 'static, B: MessageBody + 'static, + P: Unpin, { - type Item = H2ServiceHandler; - type Error = S::InitError; + type Output = Result, S::InitError>; - fn poll(&mut self) -> Poll { - let service = try_ready!(self.fut.poll()); - Ok(Async::Ready(H2ServiceHandler::new( - self.cfg.take().unwrap(), - self.on_connect.clone(), - service, - ))) + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let this = self.get_mut(); + + Poll::Ready(ready!(Pin::new(&mut this.fut).poll(cx)).map(|service| { + H2ServiceHandler::new( + this.cfg.take().unwrap(), + this.on_connect.clone(), + service, + ) + })) } } @@ -139,10 +153,11 @@ pub struct H2ServiceHandler { impl H2ServiceHandler where S: Service, - S::Error: Into, - S::Future: 'static, - S::Response: Into>, + S::Error: Into + Unpin + 'static, + S::Future: Unpin + 'static, + S::Response: Into> + Unpin + 'static, B: MessageBody + 'static, + P: Unpin, { fn new( cfg: ServiceConfig, @@ -162,18 +177,19 @@ impl Service for H2ServiceHandler where T: IoStream, S: Service, - S::Error: Into, - S::Future: 'static, - S::Response: Into>, + S::Error: Into + Unpin + 'static, + S::Future: Unpin + 'static, + S::Response: Into> + Unpin + 'static, B: MessageBody + 'static, + P: Unpin, { type Request = Io; type Response = (); type Error = DispatchError; type Future = H2ServiceHandlerResponse; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.srv.poll_ready().map_err(|e| { + fn poll_ready(&mut self, cx: &mut Context) -> Poll> { + self.srv.poll_ready(cx).map_err(|e| { let e = e.into(); error!("Service readiness error: {:?}", e); DispatchError::Service(e) @@ -219,9 +235,9 @@ pub struct H2ServiceHandlerResponse where T: IoStream, S: Service, - S::Error: Into, - S::Future: 'static, - S::Response: Into>, + S::Error: Into + Unpin + 'static, + S::Future: Unpin + 'static, + S::Response: Into> + Unpin + 'static, B: MessageBody + 'static, { state: State, @@ -231,25 +247,24 @@ impl Future for H2ServiceHandlerResponse where T: IoStream, S: Service, - S::Error: Into, - S::Future: 'static, - S::Response: Into>, + S::Error: Into + Unpin + 'static, + S::Future: Unpin + 'static, + S::Response: Into> + Unpin + 'static, B: MessageBody, { - type Item = (); - type Error = DispatchError; + type Output = Result<(), DispatchError>; - fn poll(&mut self) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { match self.state { - State::Incoming(ref mut disp) => disp.poll(), + State::Incoming(ref mut disp) => Pin::new(disp).poll(cx), State::Handshake( ref mut srv, ref mut config, ref peer_addr, ref mut on_connect, ref mut handshake, - ) => match handshake.poll() { - Ok(Async::Ready(conn)) => { + ) => match Pin::new(handshake).poll(cx) { + Poll::Ready(Ok(conn)) => { self.state = State::Incoming(Dispatcher::new( srv.take().unwrap(), conn, @@ -258,13 +273,13 @@ where None, *peer_addr, )); - self.poll() + self.poll(cx) } - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(err) => { + Poll::Ready(Err(err)) => { trace!("H2 handshake error: {}", err); - Err(err.into()) + Poll::Ready(Err(err.into())) } + Poll::Pending => Poll::Pending, }, } } diff --git a/actix-http/src/lib.rs b/actix-http/src/lib.rs index b57fdddce..cf528aeec 100644 --- a/actix-http/src/lib.rs +++ b/actix-http/src/lib.rs @@ -4,7 +4,8 @@ clippy::too_many_arguments, clippy::new_without_default, clippy::borrow_interior_mutable_const, - clippy::write_with_newline + clippy::write_with_newline, + unused_imports )] #[macro_use] @@ -12,7 +13,7 @@ extern crate log; pub mod body; mod builder; -pub mod client; +// pub mod client; mod cloneable; mod config; pub mod encoding; @@ -31,8 +32,8 @@ pub mod cookie; pub mod error; pub mod h1; pub mod h2; -pub mod test; -pub mod ws; +// pub mod test; +// pub mod ws; pub use self::builder::HttpServiceBuilder; pub use self::config::{KeepAlive, ServiceConfig}; diff --git a/actix-http/src/payload.rs b/actix-http/src/payload.rs index 0ce209705..f2cc6414f 100644 --- a/actix-http/src/payload.rs +++ b/actix-http/src/payload.rs @@ -1,11 +1,15 @@ +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + use bytes::Bytes; -use futures::{Async, Poll, Stream}; +use futures::Stream; use h2::RecvStream; use crate::error::PayloadError; /// Type represent boxed payload -pub type PayloadStream = Box>; +pub type PayloadStream = Pin>>>; /// Type represent streaming payload pub enum Payload { @@ -48,18 +52,17 @@ impl Payload { impl Stream for Payload where - S: Stream, + S: Stream> + Unpin, { - type Item = Bytes; - type Error = PayloadError; + type Item = Result; #[inline] - fn poll(&mut self) -> Poll, Self::Error> { - match self { - Payload::None => Ok(Async::Ready(None)), - Payload::H1(ref mut pl) => pl.poll(), - Payload::H2(ref mut pl) => pl.poll(), - Payload::Stream(ref mut pl) => pl.poll(), + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + match self.get_mut() { + Payload::None => Poll::Ready(None), + Payload::H1(ref mut pl) => pl.readany(cx), + Payload::H2(ref mut pl) => Pin::new(pl).poll_next(cx), + Payload::Stream(ref mut pl) => Pin::new(pl).poll_next(cx), } } } diff --git a/actix-http/src/response.rs b/actix-http/src/response.rs index a1541b53e..5b3d17cb4 100644 --- a/actix-http/src/response.rs +++ b/actix-http/src/response.rs @@ -4,7 +4,7 @@ use std::io::Write; use std::{fmt, str}; use bytes::{BufMut, Bytes, BytesMut}; -use futures::future::{ok, FutureResult, IntoFuture}; +use futures::future::{ok, Ready}; use futures::Stream; use serde::Serialize; use serde_json; @@ -280,15 +280,15 @@ impl fmt::Debug for Response { } } -impl IntoFuture for Response { - type Item = Response; - type Error = Error; - type Future = FutureResult; +// impl IntoFuture for Response { +// type Item = Response; +// type Error = Error; +// type Future = FutureResult; - fn into_future(self) -> Self::Future { - ok(self) - } -} +// fn into_future(self) -> Self::Future { +// ok(self) +// } +// } pub struct CookieIter<'a> { iter: header::GetAll<'a>, @@ -635,8 +635,8 @@ impl ResponseBuilder { /// `ResponseBuilder` can not be used after this call. pub fn streaming(&mut self, stream: S) -> Response where - S: Stream + 'static, - E: Into + 'static, + S: Stream> + Unpin + 'static, + E: Into + Unpin + 'static, { self.body(Body::from_message(BodyStream::new(stream))) } @@ -757,15 +757,15 @@ impl<'a> From<&'a ResponseHead> for ResponseBuilder { } } -impl IntoFuture for ResponseBuilder { - type Item = Response; - type Error = Error; - type Future = FutureResult; +// impl IntoFuture for ResponseBuilder { +// type Item = Response; +// type Error = Error; +// type Future = FutureResult; - fn into_future(mut self) -> Self::Future { - ok(self.finish()) - } -} +// fn into_future(mut self) -> Self::Future { +// ok(self.finish()) +// } +// } impl fmt::Debug for ResponseBuilder { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { diff --git a/actix-http/src/service.rs b/actix-http/src/service.rs index 09b8077b3..65a0c7bd4 100644 --- a/actix-http/src/service.rs +++ b/actix-http/src/service.rs @@ -1,13 +1,15 @@ use std::marker::PhantomData; +use std::pin::Pin; +use std::task::{Context, Poll}; use std::{fmt, io, net, rc}; use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_server_config::{ Io as ServerIo, IoStream, Protocol, ServerConfig as SrvConfig, }; -use actix_service::{IntoNewService, NewService, Service}; +use actix_service::{IntoServiceFactory, Service, ServiceFactory}; use bytes::{Buf, BufMut, Bytes, BytesMut}; -use futures::{try_ready, Async, Future, IntoFuture, Poll}; +use futures::{ready, Future}; use h2::server::{self, Handshake}; use crate::body::MessageBody; @@ -20,7 +22,7 @@ use crate::request::Request; use crate::response::Response; use crate::{h1, h2::Dispatcher}; -/// `NewService` HTTP1.1/HTTP2 transport implementation +/// `ServiceFactory` HTTP1.1/HTTP2 transport implementation pub struct HttpService> { srv: S, cfg: ServiceConfig, @@ -32,11 +34,13 @@ pub struct HttpService HttpService where - S: NewService, - S::Error: Into, + S: ServiceFactory, + S::Error: Into + Unpin + 'static, S::InitError: fmt::Debug, - S::Response: Into>, - ::Future: 'static, + S::Response: Into> + Unpin + 'static, + S::Future: Unpin, + S::Service: Unpin, + ::Future: Unpin + 'static, B: MessageBody + 'static, { /// Create builder for `HttpService` instance. @@ -47,20 +51,23 @@ where impl HttpService where - S: NewService, - S::Error: Into, + S: ServiceFactory, + S::Error: Into + Unpin + 'static, S::InitError: fmt::Debug, - S::Response: Into>, - ::Future: 'static, + S::Response: Into> + Unpin + 'static, + S::Future: Unpin, + S::Service: Unpin, + ::Future: Unpin + 'static, B: MessageBody + 'static, + P: Unpin, { /// Create new `HttpService` instance. - pub fn new>(service: F) -> Self { + pub fn new>(service: F) -> Self { let cfg = ServiceConfig::new(KeepAlive::Timeout(5), 5000, 0); HttpService { cfg, - srv: service.into_new_service(), + srv: service.into_factory(), expect: h1::ExpectHandler, upgrade: None, on_connect: None, @@ -69,13 +76,13 @@ where } /// Create new `HttpService` instance with config. - pub(crate) fn with_config>( + pub(crate) fn with_config>( cfg: ServiceConfig, service: F, ) -> Self { HttpService { cfg, - srv: service.into_new_service(), + srv: service.into_factory(), expect: h1::ExpectHandler, upgrade: None, on_connect: None, @@ -86,11 +93,15 @@ where impl HttpService where - S: NewService, - S::Error: Into, + S: ServiceFactory, + S::Error: Into + Unpin + 'static, S::InitError: fmt::Debug, - S::Response: Into>, + S::Response: Into> + Unpin + 'static, + S::Future: Unpin, + S::Service: Unpin, + ::Future: Unpin + 'static, B: MessageBody, + P: Unpin, { /// Provide service for `EXPECT: 100-Continue` support. /// @@ -99,9 +110,12 @@ where /// request will be forwarded to main service. pub fn expect(self, expect: X1) -> HttpService where - X1: NewService, + X1: ServiceFactory, X1::Error: Into, X1::InitError: fmt::Debug, + X1::Future: Unpin, + X1::Service: Unpin, + ::Future: Unpin + 'static, { HttpService { expect, @@ -119,13 +133,16 @@ where /// and this service get called with original request and framed object. pub fn upgrade(self, upgrade: Option) -> HttpService where - U1: NewService< + U1: ServiceFactory< Config = SrvConfig, Request = (Request, Framed), Response = (), >, U1::Error: fmt::Display, U1::InitError: fmt::Debug, + U1::Future: Unpin, + U1::Service: Unpin, + ::Future: Unpin + 'static, { HttpService { upgrade, @@ -147,25 +164,35 @@ where } } -impl NewService for HttpService +impl ServiceFactory for HttpService where - T: IoStream, - S: NewService, - S::Error: Into, + T: IoStream + Unpin, + S: ServiceFactory, + S::Service: Unpin, + S::Error: Into + Unpin + 'static, S::InitError: fmt::Debug, - S::Response: Into>, - ::Future: 'static, + S::Response: Into> + Unpin + 'static, + S::Future: Unpin, + S::Service: Unpin, + ::Future: Unpin + 'static, B: MessageBody + 'static, - X: NewService, + X: ServiceFactory, X::Error: Into, X::InitError: fmt::Debug, - U: NewService< + X::Future: Unpin, + X::Service: Unpin, + ::Future: Unpin + 'static, + U: ServiceFactory< Config = SrvConfig, Request = (Request, Framed), Response = (), >, U::Error: fmt::Display, U::InitError: fmt::Debug, + U::Future: Unpin, + U::Service: Unpin, + ::Future: Unpin + 'static, + P: Unpin, { type Config = SrvConfig; type Request = ServerIo; @@ -177,7 +204,7 @@ where fn new_service(&self, cfg: &SrvConfig) -> Self::Future { HttpServiceResponse { - fut: self.srv.new_service(cfg).into_future(), + fut: self.srv.new_service(cfg), fut_ex: Some(self.expect.new_service(cfg)), fut_upg: self.upgrade.as_ref().map(|f| f.new_service(cfg)), expect: None, @@ -190,7 +217,14 @@ where } #[doc(hidden)] -pub struct HttpServiceResponse { +pub struct HttpServiceResponse< + T, + P, + S: ServiceFactory, + B, + X: ServiceFactory, + U: ServiceFactory, +> { fut: S::Future, fut_ex: Option, fut_upg: Option, @@ -204,50 +238,62 @@ pub struct HttpServiceResponse Future for HttpServiceResponse where T: IoStream, - S: NewService, - S::Error: Into, + S: ServiceFactory, + S::Error: Into + Unpin + 'static, S::InitError: fmt::Debug, - S::Response: Into>, - ::Future: 'static, + S::Response: Into> + Unpin + 'static, + S::Future: Unpin, + S::Service: Unpin, + ::Future: Unpin + 'static, B: MessageBody + 'static, - X: NewService, + X: ServiceFactory, X::Error: Into, X::InitError: fmt::Debug, - U: NewService), Response = ()>, + X::Future: Unpin, + X::Service: Unpin, + ::Future: Unpin + 'static, + U: ServiceFactory), Response = ()>, U::Error: fmt::Display, U::InitError: fmt::Debug, + U::Future: Unpin, + U::Service: Unpin, + ::Future: Unpin + 'static, + P: Unpin, { - type Item = HttpServiceHandler; - type Error = (); + type Output = + Result, ()>; - fn poll(&mut self) -> Poll { - if let Some(ref mut fut) = self.fut_ex { - let expect = try_ready!(fut - .poll() - .map_err(|e| log::error!("Init http service error: {:?}", e))); - self.expect = Some(expect); - self.fut_ex.take(); + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let this = self.get_mut(); + + if let Some(ref mut fut) = this.fut_ex { + let expect = ready!(Pin::new(fut) + .poll(cx) + .map_err(|e| log::error!("Init http service error: {:?}", e)))?; + this.expect = Some(expect); + this.fut_ex.take(); } - if let Some(ref mut fut) = self.fut_upg { - let upgrade = try_ready!(fut - .poll() - .map_err(|e| log::error!("Init http service error: {:?}", e))); - self.upgrade = Some(upgrade); - self.fut_ex.take(); + if let Some(ref mut fut) = this.fut_upg { + let upgrade = ready!(Pin::new(fut) + .poll(cx) + .map_err(|e| log::error!("Init http service error: {:?}", e)))?; + this.upgrade = Some(upgrade); + this.fut_ex.take(); } - let service = try_ready!(self - .fut - .poll() + let result = ready!(Pin::new(&mut this.fut) + .poll(cx) .map_err(|e| log::error!("Init http service error: {:?}", e))); - Ok(Async::Ready(HttpServiceHandler::new( - self.cfg.take().unwrap(), - service, - self.expect.take().unwrap(), - self.upgrade.take(), - self.on_connect.clone(), - ))) + Poll::Ready(result.map(|service| { + HttpServiceHandler::new( + this.cfg.take().unwrap(), + service, + this.expect.take().unwrap(), + this.upgrade.take(), + this.on_connect.clone(), + ) + })) } } @@ -263,15 +309,19 @@ pub struct HttpServiceHandler { impl HttpServiceHandler where - S: Service, - S::Error: Into, + S: Service + Unpin, + S::Error: Into + Unpin + 'static, S::Future: 'static, - S::Response: Into>, + S::Response: Into> + Unpin + 'static, + S::Future: Unpin, B: MessageBody + 'static, - X: Service, + X: Service + Unpin, + X::Future: Unpin, X::Error: Into, - U: Service), Response = ()>, + U: Service), Response = ()> + Unpin, + U::Future: Unpin, U::Error: fmt::Display, + P: Unpin, { fn new( cfg: ServiceConfig, @@ -293,26 +343,29 @@ where impl Service for HttpServiceHandler where - T: IoStream, - S: Service, - S::Error: Into, - S::Future: 'static, - S::Response: Into>, + T: IoStream + Unpin, + S: Service + Unpin, + S::Error: Into + Unpin + 'static, + S::Future: Unpin + 'static, + S::Response: Into> + Unpin + 'static, B: MessageBody + 'static, - X: Service, + X: Service + Unpin, X::Error: Into, - U: Service), Response = ()>, + X::Future: Unpin, + U: Service), Response = ()> + Unpin, U::Error: fmt::Display, + U::Future: Unpin, + P: Unpin, { type Request = ServerIo; type Response = (); type Error = DispatchError; type Future = HttpServiceHandlerResponse; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { + fn poll_ready(&mut self, cx: &mut Context) -> Poll> { let ready = self .expect - .poll_ready() + .poll_ready(cx) .map_err(|e| { let e = e.into(); log::error!("Http service readiness error: {:?}", e); @@ -322,7 +375,7 @@ where let ready = self .srv - .poll_ready() + .poll_ready(cx) .map_err(|e| { let e = e.into(); log::error!("Http service readiness error: {:?}", e); @@ -332,9 +385,9 @@ where && ready; if ready { - Ok(Async::Ready(())) + Poll::Ready(Ok(())) } else { - Ok(Async::NotReady) + Poll::Pending } } @@ -391,15 +444,17 @@ where enum State where - S: Service, - S::Future: 'static, + S: Service + Unpin, + S::Future: Unpin + 'static, S::Error: Into, - T: IoStream, + T: IoStream + Unpin, B: MessageBody, - X: Service, + X: Service + Unpin, X::Error: Into, - U: Service), Response = ()>, + X::Future: Unpin, + U: Service), Response = ()> + Unpin, U::Error: fmt::Display, + U::Future: Unpin, { H1(h1::Dispatcher), H2(Dispatcher, S, B>), @@ -427,16 +482,18 @@ where pub struct HttpServiceHandlerResponse where - T: IoStream, - S: Service, - S::Error: Into, - S::Future: 'static, - S::Response: Into>, + T: IoStream + Unpin, + S: Service + Unpin, + S::Error: Into + Unpin + 'static, + S::Future: Unpin + 'static, + S::Response: Into> + Unpin + 'static, B: MessageBody + 'static, - X: Service, + X: Service + Unpin, X::Error: Into, - U: Service), Response = ()>, + X::Future: Unpin, + U: Service), Response = ()> + Unpin, U::Error: fmt::Display, + U::Future: Unpin, { state: State, } @@ -445,32 +502,33 @@ const HTTP2_PREFACE: [u8; 14] = *b"PRI * HTTP/2.0"; impl Future for HttpServiceHandlerResponse where - T: IoStream, - S: Service, - S::Error: Into, - S::Future: 'static, - S::Response: Into>, + T: IoStream + Unpin, + S: Service + Unpin, + S::Error: Into + Unpin + 'static, + S::Future: Unpin + 'static, + S::Response: Into> + Unpin + 'static, B: MessageBody, - X: Service, + X: Service + Unpin, + X::Future: Unpin, X::Error: Into, - U: Service), Response = ()>, + U: Service), Response = ()> + Unpin, + U::Future: Unpin, U::Error: fmt::Display, { - type Item = (); - type Error = DispatchError; + type Output = Result<(), DispatchError>; - fn poll(&mut self) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { match self.state { - State::H1(ref mut disp) => disp.poll(), - State::H2(ref mut disp) => disp.poll(), + State::H1(ref mut disp) => Pin::new(disp).poll(cx), + State::H2(ref mut disp) => Pin::new(disp).poll(cx), State::Unknown(ref mut data) => { if let Some(ref mut item) = data { loop { // Safety - we only write to the returned slice. let b = unsafe { item.1.bytes_mut() }; - let n = try_ready!(item.0.poll_read(b)); + let n = ready!(Pin::new(&mut item.0).poll_read(cx, b))?; if n == 0 { - return Ok(Async::Ready(())); + return Poll::Ready(Ok(())); } // Safety - we know that 'n' bytes have // been initialized via the contract of @@ -511,17 +569,17 @@ where on_connect, )) } - self.poll() + self.poll(cx) } State::Handshake(ref mut data) => { let conn = if let Some(ref mut item) = data { - match item.0.poll() { - Ok(Async::Ready(conn)) => conn, - Ok(Async::NotReady) => return Ok(Async::NotReady), - Err(err) => { + match Pin::new(&mut item.0).poll(cx) { + Poll::Ready(Ok(conn)) => conn, + Poll::Ready(Err(err)) => { trace!("H2 handshake error: {}", err); - return Err(err.into()); + return Poll::Ready(Err(err.into())); } + Poll::Pending => return Poll::Pending, } } else { panic!() @@ -530,7 +588,7 @@ where self.state = State::H2(Dispatcher::new( srv, conn, on_connect, cfg, None, peer_addr, )); - self.poll() + self.poll(cx) } } } @@ -542,6 +600,8 @@ struct Io { inner: T, } +impl Unpin for Io {} + impl io::Read for Io { fn read(&mut self, buf: &mut [u8]) -> io::Result { if let Some(mut bytes) = self.unread.take() { @@ -567,22 +627,62 @@ impl io::Write for Io { } } -impl AsyncRead for Io { +impl AsyncRead for Io { + // unsafe fn initializer(&self) -> io::Initializer { + // self.get_mut().inner.initializer() + // } + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { self.inner.prepare_uninitialized_buffer(buf) } + + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + Pin::new(&mut self.get_mut().inner).poll_read(cx, buf) + } + + // fn poll_read_vectored( + // self: Pin<&mut Self>, + // cx: &mut Context<'_>, + // bufs: &mut [io::IoSliceMut<'_>], + // ) -> Poll> { + // self.get_mut().inner.poll_read_vectored(cx, bufs) + // } } -impl AsyncWrite for Io { - fn shutdown(&mut self) -> Poll<(), io::Error> { - self.inner.shutdown() +impl tokio_io::AsyncWrite for Io { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut self.get_mut().inner).poll_write(cx, buf) } - fn write_buf(&mut self, buf: &mut B) -> Poll { - self.inner.write_buf(buf) + + // fn poll_write_vectored( + // self: Pin<&mut Self>, + // cx: &mut Context<'_>, + // bufs: &[io::IoSlice<'_>], + // ) -> Poll> { + // self.get_mut().inner.poll_write_vectored(cx, bufs) + // } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.get_mut().inner).poll_flush(cx) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + Pin::new(&mut self.get_mut().inner).poll_shutdown(cx) } } -impl IoStream for Io { +impl actix_server_config::IoStream for Io { #[inline] fn peer_addr(&self) -> Option { self.inner.peer_addr() diff --git a/actix-http/src/test.rs b/actix-http/src/test.rs index ed5b81a35..817bf480d 100644 --- a/actix-http/src/test.rs +++ b/actix-http/src/test.rs @@ -1,12 +1,13 @@ //! Test Various helpers for Actix applications to use during testing. use std::fmt::Write as FmtWrite; use std::io; +use std::pin::Pin; use std::str::FromStr; +use std::task::{Context, Poll}; use actix_codec::{AsyncRead, AsyncWrite}; use actix_server_config::IoStream; use bytes::{Buf, Bytes, BytesMut}; -use futures::{Async, Poll}; use http::header::{self, HeaderName, HeaderValue}; use http::{HttpTryFrom, Method, Uri, Version}; use percent_encoding::percent_encode; @@ -244,16 +245,16 @@ impl io::Write for TestBuffer { } } -impl AsyncRead for TestBuffer {} +// impl AsyncRead for TestBuffer {} -impl AsyncWrite for TestBuffer { - fn shutdown(&mut self) -> Poll<(), io::Error> { - Ok(Async::Ready(())) - } - fn write_buf(&mut self, _: &mut B) -> Poll { - Ok(Async::NotReady) - } -} +// impl AsyncWrite for TestBuffer { +// fn shutdown(&mut self) -> Poll<(), io::Error> { +// Ok(Async::Ready(())) +// } +// fn write_buf(&mut self, _: &mut B) -> Poll { +// Ok(Async::NotReady) +// } +// } impl IoStream for TestBuffer { fn set_nodelay(&mut self, _nodelay: bool) -> io::Result<()> {