1
0
mirror of https://github.com/actix/actix-extras.git synced 2025-01-22 23:05:56 +01:00

make actix-http compile with std::future

This commit is contained in:
Nikolay Kim 2019-11-15 15:54:11 +06:00
parent 5cb2d500d1
commit 8cba1170e6
28 changed files with 1176 additions and 822 deletions

View File

@ -28,19 +28,20 @@ path = "src/lib.rs"
[workspace]
members = [
".",
"awc",
"actix-http",
"actix-cors",
"actix-files",
"actix-framed",
"actix-session",
"actix-identity",
"actix-multipart",
"actix-web-actors",
"actix-web-codegen",
"test-server",
# ".",
# "awc",
# #"actix-http",
# "actix-cors",
# "actix-files",
# "actix-framed",
# "actix-session",
# "actix-identity",
# "actix-multipart",
# "actix-web-actors",
# "actix-web-codegen",
# "test-server",
]
exclude = ["actix-http"]
[features]
default = ["brotli", "flate2-zlib", "client", "fail"]
@ -122,12 +123,23 @@ opt-level = 3
codegen-units = 1
[patch.crates-io]
actix-web = { path = "." }
actix-http = { path = "actix-http" }
actix-http-test = { path = "test-server" }
actix-web-codegen = { path = "actix-web-codegen" }
actix-web-actors = { path = "actix-web-actors" }
actix-session = { path = "actix-session" }
actix-files = { path = "actix-files" }
actix-multipart = { path = "actix-multipart" }
awc = { path = "awc" }
# actix-web = { path = "." }
# actix-http = { path = "actix-http" }
# actix-http-test = { path = "test-server" }
# actix-web-codegen = { path = "actix-web-codegen" }
# actix-web-actors = { path = "actix-web-actors" }
# actix-session = { path = "actix-session" }
# actix-files = { path = "actix-files" }
# actix-multipart = { path = "actix-multipart" }
# awc = { path = "awc" }
actix-codec = { path = "../actix-net/actix-codec" }
actix-connect = { path = "../actix-net/actix-connect" }
actix-ioframe = { path = "../actix-net/actix-ioframe" }
actix-rt = { path = "../actix-net/actix-rt" }
actix-server = { path = "../actix-net/actix-server" }
actix-server-config = { path = "../actix-net/actix-server-config" }
actix-service = { path = "../actix-net/actix-service" }
actix-testing = { path = "../actix-net/actix-testing" }
actix-threadpool = { path = "../actix-net/actix-threadpool" }
actix-utils = { path = "../actix-net/actix-utils" }

View File

@ -1,6 +1,6 @@
[package]
name = "actix-http"
version = "0.2.11"
version = "0.3.0-alpha.1"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix http primitives"
readme = "README.md"
@ -13,10 +13,11 @@ categories = ["network-programming", "asynchronous",
"web-programming::websocket"]
license = "MIT/Apache-2.0"
edition = "2018"
workspace = ".."
# workspace = ".."
[package.metadata.docs.rs]
features = ["ssl", "fail", "brotli", "flate2-zlib", "secure-cookies"]
features = ["openssl", "fail", "brotli", "flate2-zlib", "secure-cookies"]
[lib]
name = "actix_http"
@ -26,10 +27,10 @@ path = "src/lib.rs"
default = []
# openssl
ssl = ["openssl", "actix-connect/ssl"]
openssl = ["open-ssl", "actix-connect/openssl"]
# rustls support
rust-tls = ["rustls", "webpki-roots", "actix-connect/rust-tls"]
rustls = ["rust-tls", "webpki-roots", "actix-connect/rustls"]
# brotli encoding, requires c compiler
brotli = ["brotli2"]
@ -47,23 +48,24 @@ fail = ["failure"]
secure-cookies = ["ring"]
[dependencies]
actix-service = "0.4.1"
actix-codec = "0.1.2"
actix-connect = "0.2.4"
actix-utils = "0.4.4"
actix-server-config = "0.1.2"
actix-threadpool = "0.1.1"
actix-service = "1.0.0-alpha.1"
actix-codec = "0.2.0-alpha.1"
actix-connect = "1.0.0-alpha.1"
actix-utils = "0.5.0-alpha.1"
actix-server-config = "0.3.0-alpha.1"
actix-threadpool = "0.2.0-alpha.1"
base64 = "0.10"
bitflags = "1.0"
bytes = "0.4"
copyless = "0.1.4"
chrono = "0.4.6"
derive_more = "0.15.0"
either = "1.5.2"
encoding_rs = "0.8"
futures = "0.1.25"
futures = "0.3.1"
hashbrown = "0.6.3"
h2 = "0.1.16"
h2 = "0.2.0-alpha.3"
http = "0.1.17"
httparse = "1.3"
indexmap = "1.2"
@ -80,13 +82,16 @@ sha1 = "0.6"
slab = "0.4"
serde_urlencoded = "0.6.1"
time = "0.1.42"
tokio-tcp = "0.1.3"
tokio-timer = "0.2.8"
tokio-current-thread = "0.1"
trust-dns-resolver = { version="0.11.1", default-features = false }
tokio = "=0.2.0-alpha.6"
tokio-io = "=0.2.0-alpha.6"
tokio-net = "=0.2.0-alpha.6"
tokio-timer = "0.3.0-alpha.6"
tokio-executor = "=0.2.0-alpha.6"
trust-dns-resolver = { version="0.18.0-alpha.1", default-features = false }
# for secure cookie
ring = { version = "0.14.6", optional = true }
ring = { version = "0.16.9", optional = true }
# compression
brotli2 = { version="0.3.2", optional = true }
@ -94,17 +99,25 @@ flate2 = { version="1.0.7", optional = true, default-features = false }
# optional deps
failure = { version = "0.1.5", optional = true }
openssl = { version="0.10", optional = true }
rustls = { version = "0.15.2", optional = true }
webpki-roots = { version = "0.16", optional = true }
chrono = "0.4.6"
open-ssl = { version="0.10", package="openssl", optional = true }
rust-tls = { version = "0.16.0", package="rustls", optional = true }
webpki-roots = { version = "0.18", optional = true }
[dev-dependencies]
actix-rt = "0.2.2"
actix-server = { version = "0.6.0", features=["ssl", "rust-tls"] }
actix-connect = { version = "0.2.0", features=["ssl"] }
actix-http-test = { version = "0.2.4", features=["ssl"] }
actix-rt = "1.0.0-alpha.1"
actix-server = { version = "0.8.0-alpha.1", features=["openssl"] }
actix-connect = { version = "1.0.0-alpha.1", features=["openssl"] }
#actix-http-test = { version = "0.2.4", features=["ssl"] }
env_logger = "0.6"
serde_derive = "1.0"
openssl = { version="0.10" }
tokio-tcp = "0.1"
open-ssl = { version="0.10", package="openssl" }
[patch.crates-io]
actix-codec = { path = "../../actix-net/actix-codec" }
actix-connect = { path = "../../actix-net/actix-connect" }
actix-rt = { path = "../../actix-net/actix-rt" }
actix-server = { path = "../../actix-net/actix-server" }
actix-server-config = { path = "../../actix-net/actix-server-config" }
actix-service = { path = "../../actix-net/actix-service" }
actix-threadpool = { path = "../../actix-net/actix-threadpool" }
actix-utils = { path = "../../actix-net/actix-utils" }

View File

@ -1,8 +1,10 @@
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{fmt, mem};
use bytes::{Bytes, BytesMut};
use futures::{Async, Poll, Stream};
use futures::Stream;
use crate::error::Error;
@ -29,10 +31,10 @@ impl BodySize {
}
/// Type that provides this trait can be streamed to a peer.
pub trait MessageBody {
pub trait MessageBody: Unpin {
fn size(&self) -> BodySize;
fn poll_next(&mut self) -> Poll<Option<Bytes>, Error>;
fn poll_next(&mut self, cx: &mut Context) -> Poll<Option<Result<Bytes, Error>>>;
}
impl MessageBody for () {
@ -40,8 +42,8 @@ impl MessageBody for () {
BodySize::Empty
}
fn poll_next(&mut self) -> Poll<Option<Bytes>, Error> {
Ok(Async::Ready(None))
fn poll_next(&mut self, _: &mut Context) -> Poll<Option<Result<Bytes, Error>>> {
Poll::Ready(None)
}
}
@ -50,8 +52,8 @@ impl<T: MessageBody> MessageBody for Box<T> {
self.as_ref().size()
}
fn poll_next(&mut self) -> Poll<Option<Bytes>, Error> {
self.as_mut().poll_next()
fn poll_next(&mut self, cx: &mut Context) -> Poll<Option<Result<Bytes, Error>>> {
self.as_mut().poll_next(cx)
}
}
@ -93,20 +95,19 @@ impl<B: MessageBody> MessageBody for ResponseBody<B> {
}
}
fn poll_next(&mut self) -> Poll<Option<Bytes>, Error> {
fn poll_next(&mut self, cx: &mut Context) -> Poll<Option<Result<Bytes, Error>>> {
match self {
ResponseBody::Body(ref mut body) => body.poll_next(),
ResponseBody::Other(ref mut body) => body.poll_next(),
ResponseBody::Body(ref mut body) => body.poll_next(cx),
ResponseBody::Other(ref mut body) => body.poll_next(cx),
}
}
}
impl<B: MessageBody> Stream for ResponseBody<B> {
type Item = Bytes;
type Error = Error;
type Item = Result<Bytes, Error>;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
self.poll_next()
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
self.get_mut().poll_next(cx)
}
}
@ -144,19 +145,19 @@ impl MessageBody for Body {
}
}
fn poll_next(&mut self) -> Poll<Option<Bytes>, Error> {
fn poll_next(&mut self, cx: &mut Context) -> Poll<Option<Result<Bytes, Error>>> {
match self {
Body::None => Ok(Async::Ready(None)),
Body::Empty => Ok(Async::Ready(None)),
Body::None => Poll::Ready(None),
Body::Empty => Poll::Ready(None),
Body::Bytes(ref mut bin) => {
let len = bin.len();
if len == 0 {
Ok(Async::Ready(None))
Poll::Ready(None)
} else {
Ok(Async::Ready(Some(mem::replace(bin, Bytes::new()))))
Poll::Ready(Some(Ok(mem::replace(bin, Bytes::new()))))
}
}
Body::Message(ref mut body) => body.poll_next(),
Body::Message(ref mut body) => body.poll_next(cx),
}
}
}
@ -242,7 +243,7 @@ impl From<serde_json::Value> for Body {
impl<S> From<SizedStream<S>> for Body
where
S: Stream<Item = Bytes, Error = Error> + 'static,
S: Stream<Item = Result<Bytes, Error>> + Unpin + 'static,
{
fn from(s: SizedStream<S>) -> Body {
Body::from_message(s)
@ -251,8 +252,8 @@ where
impl<S, E> From<BodyStream<S, E>> for Body
where
S: Stream<Item = Bytes, Error = E> + 'static,
E: Into<Error> + 'static,
S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
E: Into<Error> + Unpin + 'static,
{
fn from(s: BodyStream<S, E>) -> Body {
Body::from_message(s)
@ -264,11 +265,11 @@ impl MessageBody for Bytes {
BodySize::Sized(self.len())
}
fn poll_next(&mut self) -> Poll<Option<Bytes>, Error> {
fn poll_next(&mut self, _: &mut Context) -> Poll<Option<Result<Bytes, Error>>> {
if self.is_empty() {
Ok(Async::Ready(None))
Poll::Ready(None)
} else {
Ok(Async::Ready(Some(mem::replace(self, Bytes::new()))))
Poll::Ready(Some(Ok(mem::replace(self, Bytes::new()))))
}
}
}
@ -278,13 +279,11 @@ impl MessageBody for BytesMut {
BodySize::Sized(self.len())
}
fn poll_next(&mut self) -> Poll<Option<Bytes>, Error> {
fn poll_next(&mut self, _: &mut Context) -> Poll<Option<Result<Bytes, Error>>> {
if self.is_empty() {
Ok(Async::Ready(None))
Poll::Ready(None)
} else {
Ok(Async::Ready(Some(
mem::replace(self, BytesMut::new()).freeze(),
)))
Poll::Ready(Some(Ok(mem::replace(self, BytesMut::new()).freeze())))
}
}
}
@ -294,11 +293,11 @@ impl MessageBody for &'static str {
BodySize::Sized(self.len())
}
fn poll_next(&mut self) -> Poll<Option<Bytes>, Error> {
fn poll_next(&mut self, _: &mut Context) -> Poll<Option<Result<Bytes, Error>>> {
if self.is_empty() {
Ok(Async::Ready(None))
Poll::Ready(None)
} else {
Ok(Async::Ready(Some(Bytes::from_static(
Poll::Ready(Some(Ok(Bytes::from_static(
mem::replace(self, "").as_ref(),
))))
}
@ -310,13 +309,11 @@ impl MessageBody for &'static [u8] {
BodySize::Sized(self.len())
}
fn poll_next(&mut self) -> Poll<Option<Bytes>, Error> {
fn poll_next(&mut self, _: &mut Context) -> Poll<Option<Result<Bytes, Error>>> {
if self.is_empty() {
Ok(Async::Ready(None))
Poll::Ready(None)
} else {
Ok(Async::Ready(Some(Bytes::from_static(mem::replace(
self, b"",
)))))
Poll::Ready(Some(Ok(Bytes::from_static(mem::replace(self, b"")))))
}
}
}
@ -326,14 +323,11 @@ impl MessageBody for Vec<u8> {
BodySize::Sized(self.len())
}
fn poll_next(&mut self) -> Poll<Option<Bytes>, Error> {
fn poll_next(&mut self, _: &mut Context) -> Poll<Option<Result<Bytes, Error>>> {
if self.is_empty() {
Ok(Async::Ready(None))
Poll::Ready(None)
} else {
Ok(Async::Ready(Some(Bytes::from(mem::replace(
self,
Vec::new(),
)))))
Poll::Ready(Some(Ok(Bytes::from(mem::replace(self, Vec::new())))))
}
}
}
@ -343,11 +337,11 @@ impl MessageBody for String {
BodySize::Sized(self.len())
}
fn poll_next(&mut self) -> Poll<Option<Bytes>, Error> {
fn poll_next(&mut self, _: &mut Context) -> Poll<Option<Result<Bytes, Error>>> {
if self.is_empty() {
Ok(Async::Ready(None))
Poll::Ready(None)
} else {
Ok(Async::Ready(Some(Bytes::from(
Poll::Ready(Some(Ok(Bytes::from(
mem::replace(self, String::new()).into_bytes(),
))))
}
@ -363,7 +357,7 @@ pub struct BodyStream<S, E> {
impl<S, E> BodyStream<S, E>
where
S: Stream<Item = Bytes, Error = E>,
S: Stream<Item = Result<Bytes, E>>,
E: Into<Error>,
{
pub fn new(stream: S) -> Self {
@ -376,15 +370,17 @@ where
impl<S, E> MessageBody for BodyStream<S, E>
where
S: Stream<Item = Bytes, Error = E>,
E: Into<Error>,
S: Stream<Item = Result<Bytes, E>> + Unpin,
E: Into<Error> + Unpin,
{
fn size(&self) -> BodySize {
BodySize::Stream
}
fn poll_next(&mut self) -> Poll<Option<Bytes>, Error> {
self.stream.poll().map_err(std::convert::Into::into)
fn poll_next(&mut self, cx: &mut Context) -> Poll<Option<Result<Bytes, Error>>> {
Pin::new(&mut self.stream)
.poll_next(cx)
.map(|res| res.map(|res| res.map_err(std::convert::Into::into)))
}
}
@ -397,7 +393,7 @@ pub struct SizedStream<S> {
impl<S> SizedStream<S>
where
S: Stream<Item = Bytes, Error = Error>,
S: Stream<Item = Result<Bytes, Error>>,
{
pub fn new(size: u64, stream: S) -> Self {
SizedStream { size, stream }
@ -406,14 +402,14 @@ where
impl<S> MessageBody for SizedStream<S>
where
S: Stream<Item = Bytes, Error = Error>,
S: Stream<Item = Result<Bytes, Error>> + Unpin,
{
fn size(&self) -> BodySize {
BodySize::Sized64(self.size)
}
fn poll_next(&mut self) -> Poll<Option<Bytes>, Error> {
self.stream.poll()
fn poll_next(&mut self, cx: &mut Context) -> Poll<Option<Result<Bytes, Error>>> {
Pin::new(&mut self.stream).poll_next(cx)
}
}

View File

@ -4,7 +4,7 @@ use std::rc::Rc;
use actix_codec::Framed;
use actix_server_config::ServerConfig as SrvConfig;
use actix_service::{IntoNewService, NewService, Service};
use actix_service::{IntoServiceFactory, Service, ServiceFactory};
use crate::body::MessageBody;
use crate::config::{KeepAlive, ServiceConfig};
@ -32,9 +32,12 @@ pub struct HttpServiceBuilder<T, S, X = ExpectHandler, U = UpgradeHandler<T>> {
impl<T, S> HttpServiceBuilder<T, S, ExpectHandler, UpgradeHandler<T>>
where
S: NewService<Config = SrvConfig, Request = Request>,
S::Error: Into<Error>,
S: ServiceFactory<Config = SrvConfig, Request = Request>,
S::Error: Into<Error> + Unpin + 'static,
S::InitError: fmt::Debug,
S::Future: Unpin,
S::Service: Unpin,
<S::Service as Service>::Future: Unpin + 'static,
{
/// Create instance of `ServiceConfigBuilder`
pub fn new() -> Self {
@ -52,19 +55,28 @@ where
impl<T, S, X, U> HttpServiceBuilder<T, S, X, U>
where
S: NewService<Config = SrvConfig, Request = Request>,
S::Error: Into<Error>,
S: ServiceFactory<Config = SrvConfig, Request = Request>,
S::Error: Into<Error> + Unpin + 'static,
S::InitError: fmt::Debug,
X: NewService<Config = SrvConfig, Request = Request, Response = Request>,
S::Future: Unpin,
S::Service: Unpin,
<S::Service as Service>::Future: Unpin + 'static,
X: ServiceFactory<Config = SrvConfig, Request = Request, Response = Request>,
X::Error: Into<Error>,
X::InitError: fmt::Debug,
U: NewService<
X::Future: Unpin,
X::Service: Unpin,
<X::Service as Service>::Future: Unpin + 'static,
U: ServiceFactory<
Config = SrvConfig,
Request = (Request, Framed<T, Codec>),
Response = (),
>,
U::Error: fmt::Display,
U::InitError: fmt::Debug,
U::Future: Unpin,
U::Service: Unpin,
<U::Service as Service>::Future: Unpin + 'static,
{
/// Set server keep-alive setting.
///
@ -108,16 +120,19 @@ where
/// request will be forwarded to main service.
pub fn expect<F, X1>(self, expect: F) -> HttpServiceBuilder<T, S, X1, U>
where
F: IntoNewService<X1>,
X1: NewService<Config = SrvConfig, Request = Request, Response = Request>,
F: IntoServiceFactory<X1>,
X1: ServiceFactory<Config = SrvConfig, Request = Request, Response = Request>,
X1::Error: Into<Error>,
X1::InitError: fmt::Debug,
X1::Future: Unpin,
X1::Service: Unpin,
<X1::Service as Service>::Future: Unpin + 'static,
{
HttpServiceBuilder {
keep_alive: self.keep_alive,
client_timeout: self.client_timeout,
client_disconnect: self.client_disconnect,
expect: expect.into_new_service(),
expect: expect.into_factory(),
upgrade: self.upgrade,
on_connect: self.on_connect,
_t: PhantomData,
@ -130,21 +145,24 @@ where
/// and this service get called with original request and framed object.
pub fn upgrade<F, U1>(self, upgrade: F) -> HttpServiceBuilder<T, S, X, U1>
where
F: IntoNewService<U1>,
U1: NewService<
F: IntoServiceFactory<U1>,
U1: ServiceFactory<
Config = SrvConfig,
Request = (Request, Framed<T, Codec>),
Response = (),
>,
U1::Error: fmt::Display,
U1::InitError: fmt::Debug,
U1::Future: Unpin,
U1::Service: Unpin,
<U1::Service as Service>::Future: Unpin + 'static,
{
HttpServiceBuilder {
keep_alive: self.keep_alive,
client_timeout: self.client_timeout,
client_disconnect: self.client_disconnect,
expect: self.expect,
upgrade: Some(upgrade.into_new_service()),
upgrade: Some(upgrade.into_factory()),
on_connect: self.on_connect,
_t: PhantomData,
}
@ -167,17 +185,21 @@ where
pub fn h1<F, P, B>(self, service: F) -> H1Service<T, P, S, B, X, U>
where
B: MessageBody + 'static,
F: IntoNewService<S>,
S::Error: Into<Error>,
F: IntoServiceFactory<S>,
S::Future: Unpin,
S::Error: Into<Error> + Unpin + 'static,
S::InitError: fmt::Debug,
S::Response: Into<Response<B>>,
S::Response: Into<Response<B>> + Unpin + 'static,
S::Service: Unpin,
<S::Service as Service>::Future: Unpin + 'static,
P: Unpin,
{
let cfg = ServiceConfig::new(
self.keep_alive,
self.client_timeout,
self.client_disconnect,
);
H1Service::with_config(cfg, service.into_new_service())
H1Service::with_config(cfg, service.into_factory())
.expect(self.expect)
.upgrade(self.upgrade)
.on_connect(self.on_connect)
@ -187,37 +209,42 @@ where
pub fn h2<F, P, B>(self, service: F) -> H2Service<T, P, S, B>
where
B: MessageBody + 'static,
F: IntoNewService<S>,
S::Error: Into<Error>,
F: IntoServiceFactory<S>,
S::Error: Into<Error> + Unpin + 'static,
S::InitError: fmt::Debug,
S::Response: Into<Response<B>>,
<S::Service as Service>::Future: 'static,
S::Response: Into<Response<B>> + Unpin + 'static,
S::Future: Unpin,
S::Service: Unpin,
<S::Service as Service>::Future: Unpin + 'static,
P: Unpin,
{
let cfg = ServiceConfig::new(
self.keep_alive,
self.client_timeout,
self.client_disconnect,
);
H2Service::with_config(cfg, service.into_new_service())
.on_connect(self.on_connect)
H2Service::with_config(cfg, service.into_factory()).on_connect(self.on_connect)
}
/// Finish service configuration and create `HttpService` instance.
pub fn finish<F, P, B>(self, service: F) -> HttpService<T, P, S, B, X, U>
where
B: MessageBody + 'static,
F: IntoNewService<S>,
S::Error: Into<Error>,
F: IntoServiceFactory<S>,
S::Error: Into<Error> + Unpin + 'static,
S::InitError: fmt::Debug,
S::Response: Into<Response<B>>,
<S::Service as Service>::Future: 'static,
S::Response: Into<Response<B>> + Unpin + 'static,
S::Future: Unpin,
S::Service: Unpin,
<S::Service as Service>::Future: Unpin + 'static,
P: Unpin,
{
let cfg = ServiceConfig::new(
self.keep_alive,
self.client_timeout,
self.client_disconnect,
);
HttpService::with_config(cfg, service.into_new_service())
HttpService::with_config(cfg, service.into_factory())
.expect(self.expect)
.upgrade(self.upgrade)
.on_connect(self.on_connect)

View File

@ -1,15 +1,18 @@
use std::fmt;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use actix_codec::{AsyncRead, AsyncWrite};
use actix_connect::{
default_connector, Connect as TcpConnect, Connection as TcpConnection,
};
use actix_service::{apply_fn, Service, ServiceExt};
use actix_service::{apply_fn, Service};
use actix_utils::timeout::{TimeoutError, TimeoutService};
use http::Uri;
use tokio_tcp::TcpStream;
use tokio_net::tcp::TcpStream;
use super::connection::Connection;
use super::error::ConnectError;
@ -212,7 +215,7 @@ where
pub fn finish(
self,
) -> impl Service<Request = Connect, Response = impl Connection, Error = ConnectError>
+ Clone {
+ Clone {
#[cfg(not(any(feature = "ssl", feature = "rust-tls")))]
{
let connector = TimeoutService::new(

View File

@ -1,10 +1,13 @@
use std::future::Future;
use std::io::Write;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{io, time};
use actix_codec::{AsyncRead, AsyncWrite, Framed};
use bytes::{BufMut, Bytes, BytesMut};
use futures::future::{ok, Either};
use futures::{Async, Future, Poll, Sink, Stream};
use futures::{Sink, Stream};
use crate::error::PayloadError;
use crate::h1;

View File

@ -1,9 +1,11 @@
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time;
use actix_codec::{AsyncRead, AsyncWrite};
use bytes::Bytes;
use futures::future::{err, Either};
use futures::{Async, Future, Poll};
use h2::{client::SendRequest, SendStream};
use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, TRANSFER_ENCODING};
use http::{request::Request, HttpTryFrom, Method, Version};

View File

@ -1,22 +1,24 @@
use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use actix_codec::{AsyncRead, AsyncWrite};
use actix_service::Service;
use actix_utils::oneshot;
use actix_utils::task::LocalWaker;
use bytes::Bytes;
use futures::future::{err, ok, Either, FutureResult};
use futures::task::AtomicTask;
use futures::unsync::oneshot;
use futures::{Async, Future, Poll};
use h2::client::{handshake, Handshake};
use hashbrown::HashMap;
use http::uri::Authority;
use indexmap::IndexSet;
use slab::Slab;
use tokio_timer::{sleep, Delay};
use tokio_timer::{delay_for, Delay};
use super::connection::{ConnectionType, IoConnection};
use super::error::ConnectError;
@ -140,7 +142,7 @@ where
// start support future
if !support {
self.1.as_ref().borrow_mut().task = Some(AtomicTask::new());
tokio_current_thread::spawn(ConnectorPoolSupport {
tokio_executor::current_thread::spawn(ConnectorPoolSupport {
connector: self.0.clone(),
inner: self.1.clone(),
})
@ -255,7 +257,7 @@ where
if let Some(ref mut h2) = self.h2 {
return match h2.poll() {
Ok(Async::Ready((snd, connection))) => {
tokio_current_thread::spawn(connection.map_err(|_| ()));
tokio_executor::current_thread::spawn(connection.map_err(|_| ()));
Ok(Async::Ready(IoConnection::new(
ConnectionType::H2(snd),
Instant::now(),
@ -373,7 +375,7 @@ where
{
if let Some(timeout) = self.disconnect_timeout {
if let ConnectionType::H1(io) = conn.io {
tokio_current_thread::spawn(CloseConnection::new(
tokio_executor::current_thread::spawn(CloseConnection::new(
io, timeout,
))
}
@ -387,7 +389,7 @@ where
Ok(n) if n > 0 => {
if let Some(timeout) = self.disconnect_timeout {
if let ConnectionType::H1(io) = io {
tokio_current_thread::spawn(
tokio_executor::current_thread::spawn(
CloseConnection::new(io, timeout),
)
}
@ -421,7 +423,7 @@ where
self.acquired -= 1;
if let Some(timeout) = self.disconnect_timeout {
if let ConnectionType::H1(io) = io {
tokio_current_thread::spawn(CloseConnection::new(io, timeout))
tokio_executor::current_thread::spawn(CloseConnection::new(io, timeout))
}
}
self.check_availibility();
@ -448,7 +450,7 @@ where
fn new(io: T, timeout: Duration) -> Self {
CloseConnection {
io,
timeout: sleep(timeout),
timeout: delay_for(timeout),
}
}
}
@ -558,7 +560,7 @@ where
inner: Rc<RefCell<Inner<Io>>>,
fut: F,
) {
tokio_current_thread::spawn(OpenWaitingConnection {
tokio_executor::current_thread::spawn(OpenWaitingConnection {
key,
fut,
h2: None,
@ -593,7 +595,7 @@ where
if let Some(ref mut h2) = self.h2 {
return match h2.poll() {
Ok(Async::Ready((snd, connection))) => {
tokio_current_thread::spawn(connection.map_err(|_| ()));
tokio_executor::current_thread::spawn(connection.map_err(|_| ()));
let rx = self.rx.take().unwrap();
let _ = rx.send(Ok(IoConnection::new(
ConnectionType::H2(snd),

View File

@ -1,8 +1,8 @@
use std::cell::UnsafeCell;
use std::rc::Rc;
use std::task::{Context, Poll};
use actix_service::Service;
use futures::Poll;
#[doc(hidden)]
/// Service that allows to turn non-clone service to a service with `Clone` impl
@ -32,8 +32,8 @@ where
type Error = T::Error;
type Future = T::Future;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
unsafe { &mut *self.0.as_ref().get() }.poll_ready()
fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
unsafe { &mut *self.0.as_ref().get() }.poll_ready(cx)
}
fn call(&mut self, req: T::Request) -> Self::Future {

View File

@ -5,9 +5,9 @@ use std::rc::Rc;
use std::time::{Duration, Instant};
use bytes::BytesMut;
use futures::{future, Future};
use futures::{future, Future, FutureExt};
use time;
use tokio_timer::{sleep, Delay};
use tokio_timer::{delay, delay_for, Delay};
// "Sun, 06 Nov 1994 08:49:37 GMT".len()
const DATE_VALUE_LENGTH: usize = 29;
@ -104,10 +104,10 @@ impl ServiceConfig {
#[inline]
/// Client timeout for first request.
pub fn client_timer(&self) -> Option<Delay> {
let delay = self.0.client_timeout;
if delay != 0 {
Some(Delay::new(
self.0.timer.now() + Duration::from_millis(delay),
let delay_time = self.0.client_timeout;
if delay_time != 0 {
Some(delay(
self.0.timer.now() + Duration::from_millis(delay_time),
))
} else {
None
@ -138,7 +138,7 @@ impl ServiceConfig {
/// Return keep-alive timer delay is configured.
pub fn keep_alive_timer(&self) -> Option<Delay> {
if let Some(ka) = self.0.keep_alive {
Some(Delay::new(self.0.timer.now() + ka))
Some(delay(self.0.timer.now() + ka))
} else {
None
}
@ -242,12 +242,12 @@ impl DateService {
// periodic date update
let s = self.clone();
tokio_current_thread::spawn(sleep(Duration::from_millis(500)).then(
move |_| {
tokio_executor::current_thread::spawn(
delay_for(Duration::from_millis(500)).then(move |_| {
s.0.reset();
future::ok(())
},
));
future::ready(())
}),
);
}
}

View File

@ -1,4 +1,7 @@
use std::future::Future;
use std::io::{self, Write};
use std::pin::Pin;
use std::task::{Context, Poll};
use actix_threadpool::{run, CpuFuture};
#[cfg(feature = "brotli")]
@ -6,7 +9,7 @@ use brotli2::write::BrotliDecoder;
use bytes::Bytes;
#[cfg(any(feature = "flate2-zlib", feature = "flate2-rust"))]
use flate2::write::{GzDecoder, ZlibDecoder};
use futures::{try_ready, Async, Future, Poll, Stream};
use futures::{ready, Stream};
use super::Writer;
use crate::error::PayloadError;
@ -18,12 +21,12 @@ pub struct Decoder<S> {
decoder: Option<ContentDecoder>,
stream: S,
eof: bool,
fut: Option<CpuFuture<(Option<Bytes>, ContentDecoder), io::Error>>,
fut: Option<CpuFuture<Result<(Option<Bytes>, ContentDecoder), io::Error>>>,
}
impl<S> Decoder<S>
where
S: Stream<Item = Bytes, Error = PayloadError>,
S: Stream<Item = Result<Bytes, PayloadError>>,
{
/// Construct a decoder.
#[inline]
@ -71,34 +74,41 @@ where
impl<S> Stream for Decoder<S>
where
S: Stream<Item = Bytes, Error = PayloadError>,
S: Stream<Item = Result<Bytes, PayloadError>> + Unpin,
{
type Item = Bytes;
type Error = PayloadError;
type Item = Result<Bytes, PayloadError>;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<Self::Item>> {
loop {
if let Some(ref mut fut) = self.fut {
let (chunk, decoder) = try_ready!(fut.poll());
let (chunk, decoder) = match ready!(Pin::new(fut).poll(cx)) {
Ok(Ok(item)) => item,
Ok(Err(e)) => return Poll::Ready(Some(Err(e.into()))),
Err(e) => return Poll::Ready(Some(Err(e.into()))),
};
self.decoder = Some(decoder);
self.fut.take();
if let Some(chunk) = chunk {
return Ok(Async::Ready(Some(chunk)));
return Poll::Ready(Some(Ok(chunk)));
}
}
if self.eof {
return Ok(Async::Ready(None));
return Poll::Ready(None);
}
match self.stream.poll()? {
Async::Ready(Some(chunk)) => {
match Pin::new(&mut self.stream).poll_next(cx) {
Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err))),
Poll::Ready(Some(Ok(chunk))) => {
if let Some(mut decoder) = self.decoder.take() {
if chunk.len() < INPLACE {
let chunk = decoder.feed_data(chunk)?;
self.decoder = Some(decoder);
if let Some(chunk) = chunk {
return Ok(Async::Ready(Some(chunk)));
return Poll::Ready(Some(Ok(chunk)));
}
} else {
self.fut = Some(run(move || {
@ -108,21 +118,25 @@ where
}
continue;
} else {
return Ok(Async::Ready(Some(chunk)));
return Poll::Ready(Some(Ok(chunk)));
}
}
Async::Ready(None) => {
Poll::Ready(None) => {
self.eof = true;
return if let Some(mut decoder) = self.decoder.take() {
Ok(Async::Ready(decoder.feed_eof()?))
match decoder.feed_eof() {
Ok(Some(res)) => Poll::Ready(Some(Ok(res))),
Ok(None) => Poll::Ready(None),
Err(err) => Poll::Ready(Some(Err(err.into()))),
}
} else {
Ok(Async::Ready(None))
Poll::Ready(None)
};
}
Async::NotReady => break,
Poll::Pending => break,
}
}
Ok(Async::NotReady)
Poll::Pending
}
}

View File

@ -1,5 +1,8 @@
//! Stream encoder
use std::future::Future;
use std::io::{self, Write};
use std::pin::Pin;
use std::task::{Context, Poll};
use actix_threadpool::{run, CpuFuture};
#[cfg(feature = "brotli")]
@ -7,7 +10,6 @@ use brotli2::write::BrotliEncoder;
use bytes::Bytes;
#[cfg(any(feature = "flate2-zlib", feature = "flate2-rust"))]
use flate2::write::{GzEncoder, ZlibEncoder};
use futures::{Async, Future, Poll};
use crate::body::{Body, BodySize, MessageBody, ResponseBody};
use crate::http::header::{ContentEncoding, CONTENT_ENCODING};
@ -22,7 +24,7 @@ pub struct Encoder<B> {
eof: bool,
body: EncoderBody<B>,
encoder: Option<ContentEncoder>,
fut: Option<CpuFuture<ContentEncoder, io::Error>>,
fut: Option<CpuFuture<Result<ContentEncoder, io::Error>>>,
}
impl<B: MessageBody> Encoder<B> {
@ -94,43 +96,46 @@ impl<B: MessageBody> MessageBody for Encoder<B> {
}
}
fn poll_next(&mut self) -> Poll<Option<Bytes>, Error> {
fn poll_next(&mut self, cx: &mut Context) -> Poll<Option<Result<Bytes, Error>>> {
loop {
if self.eof {
return Ok(Async::Ready(None));
return Poll::Ready(None);
}
if let Some(ref mut fut) = self.fut {
let mut encoder = futures::try_ready!(fut.poll());
let mut encoder = match futures::ready!(Pin::new(fut).poll(cx)) {
Ok(Ok(item)) => item,
Ok(Err(e)) => return Poll::Ready(Some(Err(e.into()))),
Err(e) => return Poll::Ready(Some(Err(e.into()))),
};
let chunk = encoder.take();
self.encoder = Some(encoder);
self.fut.take();
if !chunk.is_empty() {
return Ok(Async::Ready(Some(chunk)));
return Poll::Ready(Some(Ok(chunk)));
}
}
let result = match self.body {
EncoderBody::Bytes(ref mut b) => {
if b.is_empty() {
Async::Ready(None)
Poll::Ready(None)
} else {
Async::Ready(Some(std::mem::replace(b, Bytes::new())))
Poll::Ready(Some(Ok(std::mem::replace(b, Bytes::new()))))
}
}
EncoderBody::Stream(ref mut b) => b.poll_next()?,
EncoderBody::BoxedStream(ref mut b) => b.poll_next()?,
EncoderBody::Stream(ref mut b) => b.poll_next(cx),
EncoderBody::BoxedStream(ref mut b) => b.poll_next(cx),
};
match result {
Async::NotReady => return Ok(Async::NotReady),
Async::Ready(Some(chunk)) => {
Poll::Ready(Some(Ok(chunk))) => {
if let Some(mut encoder) = self.encoder.take() {
if chunk.len() < INPLACE {
encoder.write(&chunk)?;
let chunk = encoder.take();
self.encoder = Some(encoder);
if !chunk.is_empty() {
return Ok(Async::Ready(Some(chunk)));
return Poll::Ready(Some(Ok(chunk)));
}
} else {
self.fut = Some(run(move || {
@ -139,22 +144,23 @@ impl<B: MessageBody> MessageBody for Encoder<B> {
}));
}
} else {
return Ok(Async::Ready(Some(chunk)));
return Poll::Ready(Some(Ok(chunk)));
}
}
Async::Ready(None) => {
Poll::Ready(None) => {
if let Some(encoder) = self.encoder.take() {
let chunk = encoder.finish()?;
if chunk.is_empty() {
return Ok(Async::Ready(None));
return Poll::Ready(None);
} else {
self.eof = true;
return Ok(Async::Ready(Some(chunk)));
return Poll::Ready(Some(Ok(chunk)));
}
} else {
return Ok(Async::Ready(None));
return Poll::Ready(None);
}
}
val => return val,
}
}
}

View File

@ -6,11 +6,10 @@ use std::str::Utf8Error;
use std::string::FromUtf8Error;
use std::{fmt, io, result};
pub use actix_threadpool::BlockingError;
use actix_utils::timeout::TimeoutError;
use bytes::BytesMut;
use derive_more::{Display, From};
use futures::Canceled;
use futures::channel::oneshot::Canceled;
use http::uri::InvalidUri;
use http::{header, Error as HttpError, StatusCode};
use httparse;
@ -197,8 +196,8 @@ impl ResponseError for DeError {
}
}
/// `InternalServerError` for `BlockingError`
impl<E: fmt::Debug> ResponseError for BlockingError<E> {}
/// `InternalServerError` for `Canceled`
impl ResponseError for Canceled {}
/// Return `BAD_REQUEST` for `Utf8Error`
impl ResponseError for Utf8Error {
@ -236,9 +235,6 @@ impl ResponseError for header::InvalidHeaderValueBytes {
}
}
/// `InternalServerError` for `futures::Canceled`
impl ResponseError for Canceled {}
/// A set of errors that can occur during parsing HTTP streams
#[derive(Debug, Display)]
pub enum ParseError {
@ -365,15 +361,12 @@ impl From<io::Error> for PayloadError {
}
}
impl From<BlockingError<io::Error>> for PayloadError {
fn from(err: BlockingError<io::Error>) -> Self {
match err {
BlockingError::Error(e) => PayloadError::Io(e),
BlockingError::Canceled => PayloadError::Io(io::Error::new(
io::ErrorKind::Other,
"Thread pool is gone",
)),
}
impl From<Canceled> for PayloadError {
fn from(_: Canceled) -> Self {
PayloadError::Io(io::Error::new(
io::ErrorKind::Other,
"Operation is canceled",
))
}
}
@ -390,12 +383,12 @@ impl ResponseError for PayloadError {
}
}
/// Return `BadRequest` for `cookie::ParseError`
impl ResponseError for crate::cookie::ParseError {
fn error_response(&self) -> Response {
Response::new(StatusCode::BAD_REQUEST)
}
}
// /// Return `BadRequest` for `cookie::ParseError`
// impl ResponseError for crate::cookie::ParseError {
// fn error_response(&self) -> Response {
// Response::new(StatusCode::BAD_REQUEST)
// }
// }
#[derive(Debug, Display, From)]
/// A set of errors that can occur during dispatching http requests

View File

@ -1,10 +1,12 @@
use std::future::Future;
use std::io;
use std::marker::PhantomData;
use std::mem::MaybeUninit;
use std::pin::Pin;
use std::task::{Context, Poll};
use actix_codec::Decoder;
use bytes::{Bytes, BytesMut};
use futures::{Async, Poll};
use http::header::{HeaderName, HeaderValue};
use http::{header, HttpTryFrom, Method, StatusCode, Uri, Version};
use httparse;
@ -442,9 +444,10 @@ impl Decoder for PayloadDecoder {
loop {
let mut buf = None;
// advances the chunked state
*state = match state.step(src, size, &mut buf)? {
Async::NotReady => return Ok(None),
Async::Ready(state) => state,
*state = match state.step(src, size, &mut buf) {
Poll::Pending => return Ok(None),
Poll::Ready(Ok(state)) => state,
Poll::Ready(Err(e)) => return Err(e),
};
if *state == ChunkedState::End {
trace!("End of chunked stream");
@ -476,7 +479,7 @@ macro_rules! byte (
$rdr.split_to(1);
b
} else {
return Ok(Async::NotReady)
return Poll::Pending
}
})
);
@ -487,7 +490,7 @@ impl ChunkedState {
body: &mut BytesMut,
size: &mut u64,
buf: &mut Option<Bytes>,
) -> Poll<ChunkedState, io::Error> {
) -> Poll<Result<ChunkedState, io::Error>> {
use self::ChunkedState::*;
match *self {
Size => ChunkedState::read_size(body, size),
@ -499,10 +502,14 @@ impl ChunkedState {
BodyLf => ChunkedState::read_body_lf(body),
EndCr => ChunkedState::read_end_cr(body),
EndLf => ChunkedState::read_end_lf(body),
End => Ok(Async::Ready(ChunkedState::End)),
End => Poll::Ready(Ok(ChunkedState::End)),
}
}
fn read_size(rdr: &mut BytesMut, size: &mut u64) -> Poll<ChunkedState, io::Error> {
fn read_size(
rdr: &mut BytesMut,
size: &mut u64,
) -> Poll<Result<ChunkedState, io::Error>> {
let radix = 16;
match byte!(rdr) {
b @ b'0'..=b'9' => {
@ -517,48 +524,49 @@ impl ChunkedState {
*size *= radix;
*size += u64::from(b + 10 - b'A');
}
b'\t' | b' ' => return Ok(Async::Ready(ChunkedState::SizeLws)),
b';' => return Ok(Async::Ready(ChunkedState::Extension)),
b'\r' => return Ok(Async::Ready(ChunkedState::SizeLf)),
b'\t' | b' ' => return Poll::Ready(Ok(ChunkedState::SizeLws)),
b';' => return Poll::Ready(Ok(ChunkedState::Extension)),
b'\r' => return Poll::Ready(Ok(ChunkedState::SizeLf)),
_ => {
return Err(io::Error::new(
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::InvalidInput,
"Invalid chunk size line: Invalid Size",
));
)));
}
}
Ok(Async::Ready(ChunkedState::Size))
Poll::Ready(Ok(ChunkedState::Size))
}
fn read_size_lws(rdr: &mut BytesMut) -> Poll<ChunkedState, io::Error> {
fn read_size_lws(rdr: &mut BytesMut) -> Poll<Result<ChunkedState, io::Error>> {
trace!("read_size_lws");
match byte!(rdr) {
// LWS can follow the chunk size, but no more digits can come
b'\t' | b' ' => Ok(Async::Ready(ChunkedState::SizeLws)),
b';' => Ok(Async::Ready(ChunkedState::Extension)),
b'\r' => Ok(Async::Ready(ChunkedState::SizeLf)),
_ => Err(io::Error::new(
b'\t' | b' ' => Poll::Ready(Ok(ChunkedState::SizeLws)),
b';' => Poll::Ready(Ok(ChunkedState::Extension)),
b'\r' => Poll::Ready(Ok(ChunkedState::SizeLf)),
_ => Poll::Ready(Err(io::Error::new(
io::ErrorKind::InvalidInput,
"Invalid chunk size linear white space",
)),
))),
}
}
fn read_extension(rdr: &mut BytesMut) -> Poll<ChunkedState, io::Error> {
fn read_extension(rdr: &mut BytesMut) -> Poll<Result<ChunkedState, io::Error>> {
match byte!(rdr) {
b'\r' => Ok(Async::Ready(ChunkedState::SizeLf)),
_ => Ok(Async::Ready(ChunkedState::Extension)), // no supported extensions
b'\r' => Poll::Ready(Ok(ChunkedState::SizeLf)),
_ => Poll::Ready(Ok(ChunkedState::Extension)), // no supported extensions
}
}
fn read_size_lf(
rdr: &mut BytesMut,
size: &mut u64,
) -> Poll<ChunkedState, io::Error> {
) -> Poll<Result<ChunkedState, io::Error>> {
match byte!(rdr) {
b'\n' if *size > 0 => Ok(Async::Ready(ChunkedState::Body)),
b'\n' if *size == 0 => Ok(Async::Ready(ChunkedState::EndCr)),
_ => Err(io::Error::new(
b'\n' if *size > 0 => Poll::Ready(Ok(ChunkedState::Body)),
b'\n' if *size == 0 => Poll::Ready(Ok(ChunkedState::EndCr)),
_ => Poll::Ready(Err(io::Error::new(
io::ErrorKind::InvalidInput,
"Invalid chunk size LF",
)),
))),
}
}
@ -566,12 +574,12 @@ impl ChunkedState {
rdr: &mut BytesMut,
rem: &mut u64,
buf: &mut Option<Bytes>,
) -> Poll<ChunkedState, io::Error> {
) -> Poll<Result<ChunkedState, io::Error>> {
trace!("Chunked read, remaining={:?}", rem);
let len = rdr.len() as u64;
if len == 0 {
Ok(Async::Ready(ChunkedState::Body))
Poll::Ready(Ok(ChunkedState::Body))
} else {
let slice;
if *rem > len {
@ -583,47 +591,47 @@ impl ChunkedState {
}
*buf = Some(slice);
if *rem > 0 {
Ok(Async::Ready(ChunkedState::Body))
Poll::Ready(Ok(ChunkedState::Body))
} else {
Ok(Async::Ready(ChunkedState::BodyCr))
Poll::Ready(Ok(ChunkedState::BodyCr))
}
}
}
fn read_body_cr(rdr: &mut BytesMut) -> Poll<ChunkedState, io::Error> {
fn read_body_cr(rdr: &mut BytesMut) -> Poll<Result<ChunkedState, io::Error>> {
match byte!(rdr) {
b'\r' => Ok(Async::Ready(ChunkedState::BodyLf)),
_ => Err(io::Error::new(
b'\r' => Poll::Ready(Ok(ChunkedState::BodyLf)),
_ => Poll::Ready(Err(io::Error::new(
io::ErrorKind::InvalidInput,
"Invalid chunk body CR",
)),
))),
}
}
fn read_body_lf(rdr: &mut BytesMut) -> Poll<ChunkedState, io::Error> {
fn read_body_lf(rdr: &mut BytesMut) -> Poll<Result<ChunkedState, io::Error>> {
match byte!(rdr) {
b'\n' => Ok(Async::Ready(ChunkedState::Size)),
_ => Err(io::Error::new(
b'\n' => Poll::Ready(Ok(ChunkedState::Size)),
_ => Poll::Ready(Err(io::Error::new(
io::ErrorKind::InvalidInput,
"Invalid chunk body LF",
)),
))),
}
}
fn read_end_cr(rdr: &mut BytesMut) -> Poll<ChunkedState, io::Error> {
fn read_end_cr(rdr: &mut BytesMut) -> Poll<Result<ChunkedState, io::Error>> {
match byte!(rdr) {
b'\r' => Ok(Async::Ready(ChunkedState::EndLf)),
_ => Err(io::Error::new(
b'\r' => Poll::Ready(Ok(ChunkedState::EndLf)),
_ => Poll::Ready(Err(io::Error::new(
io::ErrorKind::InvalidInput,
"Invalid chunk end CR",
)),
))),
}
}
fn read_end_lf(rdr: &mut BytesMut) -> Poll<ChunkedState, io::Error> {
fn read_end_lf(rdr: &mut BytesMut) -> Poll<Result<ChunkedState, io::Error>> {
match byte!(rdr) {
b'\n' => Ok(Async::Ready(ChunkedState::End)),
_ => Err(io::Error::new(
b'\n' => Poll::Ready(Ok(ChunkedState::End)),
_ => Poll::Ready(Err(io::Error::new(
io::ErrorKind::InvalidInput,
"Invalid chunk end LF",
)),
))),
}
}
}

View File

@ -1,15 +1,17 @@
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Instant;
use std::{fmt, io, net};
use std::{fmt, io, io::Write, net};
use actix_codec::{Decoder, Encoder, Framed, FramedParts};
use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed, FramedParts};
use actix_server_config::IoStream;
use actix_service::Service;
use bitflags::bitflags;
use bytes::{BufMut, BytesMut};
use futures::{Async, Future, Poll};
use log::{error, trace};
use tokio_timer::Delay;
use tokio_timer::{delay, Delay};
use crate::body::{Body, BodySize, MessageBody, ResponseBody};
use crate::cloneable::CloneableService;
@ -46,11 +48,14 @@ pub struct Dispatcher<T, S, B, X, U>
where
S: Service<Request = Request>,
S::Error: Into<Error>,
S::Future: Unpin,
B: MessageBody,
X: Service<Request = Request, Response = Request>,
X::Error: Into<Error>,
X::Future: Unpin,
U: Service<Request = (Request, Framed<T, Codec>), Response = ()>,
U::Error: fmt::Display,
U::Future: Unpin,
{
inner: DispatcherState<T, S, B, X, U>,
}
@ -59,11 +64,14 @@ enum DispatcherState<T, S, B, X, U>
where
S: Service<Request = Request>,
S::Error: Into<Error>,
S::Future: Unpin,
B: MessageBody,
X: Service<Request = Request, Response = Request>,
X::Error: Into<Error>,
X::Future: Unpin,
U: Service<Request = (Request, Framed<T, Codec>), Response = ()>,
U::Error: fmt::Display,
U::Future: Unpin,
{
Normal(InnerDispatcher<T, S, B, X, U>),
Upgrade(U::Future),
@ -74,11 +82,14 @@ struct InnerDispatcher<T, S, B, X, U>
where
S: Service<Request = Request>,
S::Error: Into<Error>,
S::Future: Unpin,
B: MessageBody,
X: Service<Request = Request, Response = Request>,
X::Error: Into<Error>,
X::Future: Unpin,
U: Service<Request = (Request, Framed<T, Codec>), Response = ()>,
U::Error: fmt::Display,
U::Future: Unpin,
{
service: CloneableService<S>,
expect: CloneableService<X>,
@ -170,11 +181,14 @@ where
S: Service<Request = Request>,
S::Error: Into<Error>,
S::Response: Into<Response<B>>,
S::Future: Unpin,
B: MessageBody,
X: Service<Request = Request, Response = Request>,
X::Error: Into<Error>,
X::Future: Unpin,
U: Service<Request = (Request, Framed<T, Codec>), Response = ()>,
U::Error: fmt::Display,
U::Future: Unpin,
{
/// Create http/1 dispatcher.
pub(crate) fn new(
@ -255,20 +269,23 @@ where
S: Service<Request = Request>,
S::Error: Into<Error>,
S::Response: Into<Response<B>>,
S::Future: Unpin,
B: MessageBody,
X: Service<Request = Request, Response = Request>,
X::Error: Into<Error>,
X::Future: Unpin,
U: Service<Request = (Request, Framed<T, Codec>), Response = ()>,
U::Error: fmt::Display,
U::Future: Unpin,
{
fn can_read(&self) -> bool {
fn can_read(&self, cx: &mut Context) -> bool {
if self
.flags
.intersects(Flags::READ_DISCONNECT | Flags::UPGRADE)
{
false
} else if let Some(ref info) = self.payload {
info.need_read() == PayloadStatus::Read
info.need_read(cx) == PayloadStatus::Read
} else {
true
}
@ -287,7 +304,7 @@ where
///
/// true - got whouldblock
/// false - didnt get whouldblock
fn poll_flush(&mut self) -> Result<bool, DispatchError> {
fn poll_flush(&mut self, cx: &mut Context) -> Result<bool, DispatchError> {
if self.write_buf.is_empty() {
return Ok(false);
}
@ -295,23 +312,23 @@ where
let len = self.write_buf.len();
let mut written = 0;
while written < len {
match self.io.write(&self.write_buf[written..]) {
Ok(0) => {
match Pin::new(&mut self.io).poll_write(cx, &self.write_buf[written..]) {
Poll::Ready(Ok(0)) => {
return Err(DispatchError::Io(io::Error::new(
io::ErrorKind::WriteZero,
"",
)));
}
Ok(n) => {
Poll::Ready(Ok(n)) => {
written += n;
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
Poll::Pending => {
if written > 0 {
let _ = self.write_buf.split_to(written);
}
return Ok(true);
}
Err(err) => return Err(DispatchError::Io(err)),
Poll::Ready(Err(err)) => return Err(DispatchError::Io(err)),
}
}
if written > 0 {
@ -350,12 +367,15 @@ where
.extend_from_slice(b"HTTP/1.1 100 Continue\r\n\r\n");
}
fn poll_response(&mut self) -> Result<PollResponse, DispatchError> {
fn poll_response(
&mut self,
cx: &mut Context,
) -> Result<PollResponse, DispatchError> {
loop {
let state = match self.state {
State::None => match self.messages.pop_front() {
Some(DispatcherMessage::Item(req)) => {
Some(self.handle_request(req)?)
Some(self.handle_request(req, cx)?)
}
Some(DispatcherMessage::Error(res)) => {
Some(self.send_response(res, ResponseBody::Other(Body::Empty))?)
@ -365,54 +385,54 @@ where
}
None => None,
},
State::ExpectCall(ref mut fut) => match fut.poll() {
Ok(Async::Ready(req)) => {
State::ExpectCall(ref mut fut) => match Pin::new(fut).poll(cx) {
Poll::Ready(Ok(req)) => {
self.send_continue();
self.state = State::ServiceCall(self.service.call(req));
continue;
}
Ok(Async::NotReady) => None,
Err(e) => {
Poll::Ready(Err(e)) => {
let res: Response = e.into().into();
let (res, body) = res.replace_body(());
Some(self.send_response(res, body.into_body())?)
}
Poll::Pending => None,
},
State::ServiceCall(ref mut fut) => match fut.poll() {
Ok(Async::Ready(res)) => {
State::ServiceCall(ref mut fut) => match Pin::new(fut).poll(cx) {
Poll::Ready(Ok(res)) => {
let (res, body) = res.into().replace_body(());
self.state = self.send_response(res, body)?;
continue;
}
Ok(Async::NotReady) => None,
Err(e) => {
Poll::Ready(Err(e)) => {
let res: Response = e.into().into();
let (res, body) = res.replace_body(());
Some(self.send_response(res, body.into_body())?)
}
Poll::Pending => None,
},
State::SendPayload(ref mut stream) => {
loop {
if self.write_buf.len() < HW_BUFFER_SIZE {
match stream
.poll_next()
.map_err(|_| DispatchError::Unknown)?
{
Async::Ready(Some(item)) => {
match stream.poll_next(cx) {
Poll::Ready(Some(Ok(item))) => {
self.codec.encode(
Message::Chunk(Some(item)),
&mut self.write_buf,
)?;
continue;
}
Async::Ready(None) => {
Poll::Ready(None) => {
self.codec.encode(
Message::Chunk(None),
&mut self.write_buf,
)?;
self.state = State::None;
}
Async::NotReady => return Ok(PollResponse::DoNothing),
Poll::Ready(Some(Err(_))) => {
return Err(DispatchError::Unknown)
}
Poll::Pending => return Ok(PollResponse::DoNothing),
}
} else {
return Ok(PollResponse::DrainWriteBuf);
@ -433,7 +453,7 @@ where
// if read-backpressure is enabled and we consumed some data.
// we may read more data and retry
if self.state.is_call() {
if self.poll_request()? {
if self.poll_request(cx)? {
continue;
}
} else if !self.messages.is_empty() {
@ -446,17 +466,21 @@ where
Ok(PollResponse::DoNothing)
}
fn handle_request(&mut self, req: Request) -> Result<State<S, B, X>, DispatchError> {
fn handle_request(
&mut self,
req: Request,
cx: &mut Context,
) -> Result<State<S, B, X>, DispatchError> {
// Handle `EXPECT: 100-Continue` header
let req = if req.head().expect() {
let mut task = self.expect.call(req);
match task.poll() {
Ok(Async::Ready(req)) => {
match Pin::new(&mut task).poll(cx) {
Poll::Ready(Ok(req)) => {
self.send_continue();
req
}
Ok(Async::NotReady) => return Ok(State::ExpectCall(task)),
Err(e) => {
Poll::Pending => return Ok(State::ExpectCall(task)),
Poll::Ready(Err(e)) => {
let e = e.into();
let res: Response = e.into();
let (res, body) = res.replace_body(());
@ -469,13 +493,13 @@ where
// Call service
let mut task = self.service.call(req);
match task.poll() {
Ok(Async::Ready(res)) => {
match Pin::new(&mut task).poll(cx) {
Poll::Ready(Ok(res)) => {
let (res, body) = res.into().replace_body(());
self.send_response(res, body)
}
Ok(Async::NotReady) => Ok(State::ServiceCall(task)),
Err(e) => {
Poll::Pending => Ok(State::ServiceCall(task)),
Poll::Ready(Err(e)) => {
let res: Response = e.into().into();
let (res, body) = res.replace_body(());
self.send_response(res, body.into_body())
@ -484,9 +508,12 @@ where
}
/// Process one incoming requests
pub(self) fn poll_request(&mut self) -> Result<bool, DispatchError> {
pub(self) fn poll_request(
&mut self,
cx: &mut Context,
) -> Result<bool, DispatchError> {
// limit a mount of non processed requests
if self.messages.len() >= MAX_PIPELINED_MESSAGES || !self.can_read() {
if self.messages.len() >= MAX_PIPELINED_MESSAGES || !self.can_read(cx) {
return Ok(false);
}
@ -521,7 +548,7 @@ where
// handle request early
if self.state.is_empty() {
self.state = self.handle_request(req)?;
self.state = self.handle_request(req, cx)?;
} else {
self.messages.push_back(DispatcherMessage::Item(req));
}
@ -587,12 +614,12 @@ where
}
/// keep-alive timer
fn poll_keepalive(&mut self) -> Result<(), DispatchError> {
fn poll_keepalive(&mut self, cx: &mut Context) -> Result<(), DispatchError> {
if self.ka_timer.is_none() {
// shutdown timeout
if self.flags.contains(Flags::SHUTDOWN) {
if let Some(interval) = self.codec.config().client_disconnect_timer() {
self.ka_timer = Some(Delay::new(interval));
self.ka_timer = Some(delay(interval));
} else {
self.flags.insert(Flags::READ_DISCONNECT);
if let Some(mut payload) = self.payload.take() {
@ -605,11 +632,8 @@ where
}
}
match self.ka_timer.as_mut().unwrap().poll().map_err(|e| {
error!("Timer error {:?}", e);
DispatchError::Unknown
})? {
Async::Ready(_) => {
match Pin::new(&mut self.ka_timer.as_mut().unwrap()).poll(cx) {
Poll::Ready(()) => {
// if we get timeout during shutdown, drop connection
if self.flags.contains(Flags::SHUTDOWN) {
return Err(DispatchError::DisconnectTimeout);
@ -624,9 +648,9 @@ where
if let Some(deadline) =
self.codec.config().client_disconnect_timer()
{
if let Some(timer) = self.ka_timer.as_mut() {
if let Some(mut timer) = self.ka_timer.as_mut() {
timer.reset(deadline);
let _ = timer.poll();
let _ = Pin::new(&mut timer).poll(cx);
}
} else {
// no shutdown timeout, drop socket
@ -650,17 +674,17 @@ where
} else if let Some(deadline) =
self.codec.config().keep_alive_expire()
{
if let Some(timer) = self.ka_timer.as_mut() {
if let Some(mut timer) = self.ka_timer.as_mut() {
timer.reset(deadline);
let _ = timer.poll();
let _ = Pin::new(&mut timer).poll(cx);
}
}
} else if let Some(timer) = self.ka_timer.as_mut() {
} else if let Some(mut timer) = self.ka_timer.as_mut() {
timer.reset(self.ka_expire);
let _ = timer.poll();
let _ = Pin::new(&mut timer).poll(cx);
}
}
Async::NotReady => (),
Poll::Pending => (),
}
Ok(())
@ -673,33 +697,37 @@ where
S: Service<Request = Request>,
S::Error: Into<Error>,
S::Response: Into<Response<B>>,
S::Future: Unpin,
B: MessageBody,
X: Service<Request = Request, Response = Request>,
X::Error: Into<Error>,
X::Future: Unpin,
U: Service<Request = (Request, Framed<T, Codec>), Response = ()>,
U::Error: fmt::Display,
U::Future: Unpin,
{
type Item = ();
type Error = DispatchError;
type Output = Result<(), DispatchError>;
#[inline]
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
match self.inner {
DispatcherState::Normal(ref mut inner) => {
inner.poll_keepalive()?;
inner.poll_keepalive(cx)?;
if inner.flags.contains(Flags::SHUTDOWN) {
if inner.flags.contains(Flags::WRITE_DISCONNECT) {
Ok(Async::Ready(()))
Poll::Ready(Ok(()))
} else {
// flush buffer
inner.poll_flush()?;
inner.poll_flush(cx)?;
if !inner.write_buf.is_empty() {
Ok(Async::NotReady)
Poll::Pending
} else {
match inner.io.shutdown()? {
Async::Ready(_) => Ok(Async::Ready(())),
Async::NotReady => Ok(Async::NotReady),
match Pin::new(&mut inner.io).poll_shutdown(cx) {
Poll::Ready(res) => {
Poll::Ready(res.map_err(DispatchError::from))
}
Poll::Pending => Poll::Pending,
}
}
}
@ -707,12 +735,12 @@ where
// read socket into a buf
let should_disconnect =
if !inner.flags.contains(Flags::READ_DISCONNECT) {
read_available(&mut inner.io, &mut inner.read_buf)?
read_available(cx, &mut inner.io, &mut inner.read_buf)?
} else {
None
};
inner.poll_request()?;
inner.poll_request(cx)?;
if let Some(true) = should_disconnect {
inner.flags.insert(Flags::READ_DISCONNECT);
if let Some(mut payload) = inner.payload.take() {
@ -724,7 +752,7 @@ where
if inner.write_buf.remaining_mut() < LW_BUFFER_SIZE {
inner.write_buf.reserve(HW_BUFFER_SIZE);
}
let result = inner.poll_response()?;
let result = inner.poll_response(cx)?;
let drain = result == PollResponse::DrainWriteBuf;
// switch to upgrade handler
@ -742,7 +770,7 @@ where
self.inner = DispatcherState::Upgrade(
inner.upgrade.unwrap().call((req, framed)),
);
return self.poll();
return self.poll(cx);
} else {
panic!()
}
@ -751,14 +779,14 @@ where
// we didnt get WouldBlock from write operation,
// so data get written to kernel completely (OSX)
// and we have to write again otherwise response can get stuck
if inner.poll_flush()? || !drain {
if inner.poll_flush(cx)? || !drain {
break;
}
}
// client is gone
if inner.flags.contains(Flags::WRITE_DISCONNECT) {
return Ok(Async::Ready(()));
return Poll::Ready(Ok(()));
}
let is_empty = inner.state.is_empty();
@ -771,38 +799,44 @@ where
// keep-alive and stream errors
if is_empty && inner.write_buf.is_empty() {
if let Some(err) = inner.error.take() {
Err(err)
Poll::Ready(Err(err))
}
// disconnect if keep-alive is not enabled
else if inner.flags.contains(Flags::STARTED)
&& !inner.flags.intersects(Flags::KEEPALIVE)
{
inner.flags.insert(Flags::SHUTDOWN);
self.poll()
self.poll(cx)
}
// disconnect if shutdown
else if inner.flags.contains(Flags::SHUTDOWN) {
self.poll()
self.poll(cx)
} else {
Ok(Async::NotReady)
Poll::Pending
}
} else {
Ok(Async::NotReady)
Poll::Pending
}
}
}
DispatcherState::Upgrade(ref mut fut) => fut.poll().map_err(|e| {
error!("Upgrade handler error: {}", e);
DispatchError::Upgrade
}),
DispatcherState::Upgrade(ref mut fut) => {
Pin::new(fut).poll(cx).map_err(|e| {
error!("Upgrade handler error: {}", e);
DispatchError::Upgrade
})
}
DispatcherState::None => panic!(),
}
}
}
fn read_available<T>(io: &mut T, buf: &mut BytesMut) -> Result<Option<bool>, io::Error>
fn read_available<T>(
cx: &mut Context,
io: &mut T,
buf: &mut BytesMut,
) -> Result<Option<bool>, io::Error>
where
T: io::Read,
T: AsyncRead + Unpin,
{
let mut read_some = false;
loop {
@ -810,19 +844,18 @@ where
buf.reserve(HW_BUFFER_SIZE);
}
let read = unsafe { io.read(buf.bytes_mut()) };
match read {
Ok(n) => {
match read(cx, io, buf) {
Poll::Pending => {
return if read_some { Ok(Some(false)) } else { Ok(None) };
}
Poll::Ready(Ok(n)) => {
if n == 0 {
return Ok(Some(true));
} else {
read_some = true;
unsafe {
buf.advance_mut(n);
}
}
}
Err(e) => {
Poll::Ready(Err(e)) => {
return if e.kind() == io::ErrorKind::WouldBlock {
if read_some {
Ok(Some(false))
@ -833,12 +866,23 @@ where
Ok(Some(true))
} else {
Err(e)
};
}
}
}
}
}
fn read<T>(
cx: &mut Context,
io: &mut T,
buf: &mut BytesMut,
) -> Poll<Result<usize, io::Error>>
where
T: AsyncRead + Unpin,
{
Pin::new(io).poll_read_buf(cx, buf)
}
#[cfg(test)]
mod tests {
use actix_service::IntoService;

View File

@ -1,21 +1,24 @@
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use actix_server_config::ServerConfig;
use actix_service::{NewService, Service};
use futures::future::{ok, FutureResult};
use futures::{Async, Poll};
use actix_service::{Service, ServiceFactory};
use futures::future::{ok, Ready};
use crate::error::Error;
use crate::request::Request;
pub struct ExpectHandler;
impl NewService for ExpectHandler {
impl ServiceFactory for ExpectHandler {
type Config = ServerConfig;
type Request = Request;
type Response = Request;
type Error = Error;
type Service = ExpectHandler;
type InitError = Error;
type Future = FutureResult<Self::Service, Self::InitError>;
type Future = Ready<Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: &ServerConfig) -> Self::Future {
ok(ExpectHandler)
@ -26,10 +29,10 @@ impl Service for ExpectHandler {
type Request = Request;
type Response = Request;
type Error = Error;
type Future = FutureResult<Self::Response, Self::Error>;
type Future = Ready<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
fn poll_ready(&mut self, _: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: Request) -> Self::Future {

View File

@ -1,12 +1,14 @@
//! Payload stream
use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::rc::{Rc, Weak};
use std::task::{Context, Poll};
use actix_utils::task::LocalWaker;
use bytes::Bytes;
use futures::task::current as current_task;
use futures::task::Task;
use futures::{Async, Poll, Stream};
use futures::Stream;
use crate::error::PayloadError;
@ -77,15 +79,24 @@ impl Payload {
pub fn unread_data(&mut self, data: Bytes) {
self.inner.borrow_mut().unread_data(data);
}
#[inline]
pub fn readany(
&mut self,
cx: &mut Context,
) -> Poll<Option<Result<Bytes, PayloadError>>> {
self.inner.borrow_mut().readany(cx)
}
}
impl Stream for Payload {
type Item = Bytes;
type Error = PayloadError;
type Item = Result<Bytes, PayloadError>;
#[inline]
fn poll(&mut self) -> Poll<Option<Bytes>, PayloadError> {
self.inner.borrow_mut().readany()
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<Result<Bytes, PayloadError>>> {
self.inner.borrow_mut().readany(cx)
}
}
@ -117,19 +128,14 @@ impl PayloadSender {
}
#[inline]
pub fn need_read(&self) -> PayloadStatus {
pub fn need_read(&self, cx: &mut Context) -> PayloadStatus {
// we check need_read only if Payload (other side) is alive,
// otherwise always return true (consume payload)
if let Some(shared) = self.inner.upgrade() {
if shared.borrow().need_read {
PayloadStatus::Read
} else {
#[cfg(not(test))]
{
if shared.borrow_mut().io_task.is_none() {
shared.borrow_mut().io_task = Some(current_task());
}
}
shared.borrow_mut().io_task.register(cx.waker());
PayloadStatus::Pause
}
} else {
@ -145,8 +151,8 @@ struct Inner {
err: Option<PayloadError>,
need_read: bool,
items: VecDeque<Bytes>,
task: Option<Task>,
io_task: Option<Task>,
task: LocalWaker,
io_task: LocalWaker,
}
impl Inner {
@ -157,8 +163,8 @@ impl Inner {
err: None,
items: VecDeque::new(),
need_read: true,
task: None,
io_task: None,
task: LocalWaker::new(),
io_task: LocalWaker::new(),
}
}
@ -178,7 +184,7 @@ impl Inner {
self.items.push_back(data);
self.need_read = self.len < MAX_BUFFER_SIZE;
if let Some(task) = self.task.take() {
task.notify()
task.wake()
}
}
@ -187,34 +193,28 @@ impl Inner {
self.len
}
fn readany(&mut self) -> Poll<Option<Bytes>, PayloadError> {
fn readany(
&mut self,
cx: &mut Context,
) -> Poll<Option<Result<Bytes, PayloadError>>> {
if let Some(data) = self.items.pop_front() {
self.len -= data.len();
self.need_read = self.len < MAX_BUFFER_SIZE;
if self.need_read && self.task.is_none() && !self.eof {
self.task = Some(current_task());
if self.need_read && !self.eof {
self.task.register(cx.waker());
}
if let Some(task) = self.io_task.take() {
task.notify()
}
Ok(Async::Ready(Some(data)))
self.io_task.wake();
Poll::Ready(Some(Ok(data)))
} else if let Some(err) = self.err.take() {
Err(err)
Poll::Ready(Some(Err(err)))
} else if self.eof {
Ok(Async::Ready(None))
Poll::Ready(None)
} else {
self.need_read = true;
#[cfg(not(test))]
{
if self.task.is_none() {
self.task = Some(current_task());
}
if let Some(task) = self.io_task.take() {
task.notify()
}
}
Ok(Async::NotReady)
self.task.register(cx.waker());
self.io_task.wake();
Poll::Pending
}
}

View File

@ -1,12 +1,15 @@
use std::fmt;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
use actix_codec::Framed;
use actix_server_config::{Io, IoStream, ServerConfig as SrvConfig};
use actix_service::{IntoNewService, NewService, Service};
use futures::future::{ok, FutureResult};
use futures::{try_ready, Async, Future, IntoFuture, Poll, Stream};
use actix_service::{IntoServiceFactory, Service, ServiceFactory};
use futures::future::{ok, Ready};
use futures::{ready, Stream};
use crate::body::MessageBody;
use crate::cloneable::CloneableService;
@ -20,7 +23,7 @@ use super::codec::Codec;
use super::dispatcher::Dispatcher;
use super::{ExpectHandler, Message, UpgradeHandler};
/// `NewService` implementation for HTTP1 transport
/// `ServiceFactory` implementation for HTTP1 transport
pub struct H1Service<T, P, S, B, X = ExpectHandler, U = UpgradeHandler<T>> {
srv: S,
cfg: ServiceConfig,
@ -32,19 +35,23 @@ pub struct H1Service<T, P, S, B, X = ExpectHandler, U = UpgradeHandler<T>> {
impl<T, P, S, B> H1Service<T, P, S, B>
where
S: NewService<Config = SrvConfig, Request = Request>,
S: ServiceFactory<Config = SrvConfig, Request = Request>,
S::Error: Into<Error>,
S::InitError: fmt::Debug,
S::Response: Into<Response<B>>,
S::Future: Unpin,
S::Service: Unpin,
<S::Service as Service>::Future: Unpin,
B: MessageBody,
P: Unpin,
{
/// Create new `HttpService` instance with default config.
pub fn new<F: IntoNewService<S>>(service: F) -> Self {
pub fn new<F: IntoServiceFactory<S>>(service: F) -> Self {
let cfg = ServiceConfig::new(KeepAlive::Timeout(5), 5000, 0);
H1Service {
cfg,
srv: service.into_new_service(),
srv: service.into_factory(),
expect: ExpectHandler,
upgrade: None,
on_connect: None,
@ -53,10 +60,13 @@ where
}
/// Create new `HttpService` instance with config.
pub fn with_config<F: IntoNewService<S>>(cfg: ServiceConfig, service: F) -> Self {
pub fn with_config<F: IntoServiceFactory<S>>(
cfg: ServiceConfig,
service: F,
) -> Self {
H1Service {
cfg,
srv: service.into_new_service(),
srv: service.into_factory(),
expect: ExpectHandler,
upgrade: None,
on_connect: None,
@ -67,17 +77,24 @@ where
impl<T, P, S, B, X, U> H1Service<T, P, S, B, X, U>
where
S: NewService<Config = SrvConfig, Request = Request>,
S: ServiceFactory<Config = SrvConfig, Request = Request>,
S::Error: Into<Error>,
S::Response: Into<Response<B>>,
S::InitError: fmt::Debug,
S::Future: Unpin,
S::Service: Unpin,
<S::Service as Service>::Future: Unpin,
B: MessageBody,
P: Unpin,
{
pub fn expect<X1>(self, expect: X1) -> H1Service<T, P, S, B, X1, U>
where
X1: NewService<Request = Request, Response = Request>,
X1: ServiceFactory<Request = Request, Response = Request>,
X1::Error: Into<Error>,
X1::InitError: fmt::Debug,
X1::Future: Unpin,
X1::Service: Unpin,
<X1::Service as Service>::Future: Unpin,
{
H1Service {
expect,
@ -91,9 +108,12 @@ where
pub fn upgrade<U1>(self, upgrade: Option<U1>) -> H1Service<T, P, S, B, X, U1>
where
U1: NewService<Request = (Request, Framed<T, Codec>), Response = ()>,
U1: ServiceFactory<Request = (Request, Framed<T, Codec>), Response = ()>,
U1::Error: fmt::Display,
U1::InitError: fmt::Debug,
U1::Future: Unpin,
U1::Service: Unpin,
<U1::Service as Service>::Future: Unpin,
{
H1Service {
upgrade,
@ -115,24 +135,35 @@ where
}
}
impl<T, P, S, B, X, U> NewService for H1Service<T, P, S, B, X, U>
impl<T, P, S, B, X, U> ServiceFactory for H1Service<T, P, S, B, X, U>
where
T: IoStream,
S: NewService<Config = SrvConfig, Request = Request>,
S: ServiceFactory<Config = SrvConfig, Request = Request>,
S::Service: Unpin,
S::Error: Into<Error>,
S::Response: Into<Response<B>>,
S::InitError: fmt::Debug,
S::Future: Unpin,
S::Service: Unpin,
<S::Service as Service>::Future: Unpin,
B: MessageBody,
X: NewService<Config = SrvConfig, Request = Request, Response = Request>,
X: ServiceFactory<Config = SrvConfig, Request = Request, Response = Request>,
X::Error: Into<Error>,
X::InitError: fmt::Debug,
U: NewService<
X::Future: Unpin,
X::Service: Unpin,
<X::Service as Service>::Future: Unpin,
U: ServiceFactory<
Config = SrvConfig,
Request = (Request, Framed<T, Codec>),
Response = (),
>,
U::Error: fmt::Display,
U::InitError: fmt::Debug,
U::Future: Unpin,
U::Service: Unpin,
<U::Service as Service>::Future: Unpin,
P: Unpin,
{
type Config = SrvConfig;
type Request = Io<T, P>;
@ -144,7 +175,7 @@ where
fn new_service(&self, cfg: &SrvConfig) -> Self::Future {
H1ServiceResponse {
fut: self.srv.new_service(cfg).into_future(),
fut: self.srv.new_service(cfg),
fut_ex: Some(self.expect.new_service(cfg)),
fut_upg: self.upgrade.as_ref().map(|f| f.new_service(cfg)),
expect: None,
@ -159,15 +190,25 @@ where
#[doc(hidden)]
pub struct H1ServiceResponse<T, P, S, B, X, U>
where
S: NewService<Request = Request>,
S: ServiceFactory<Request = Request>,
S::Error: Into<Error>,
S::InitError: fmt::Debug,
X: NewService<Request = Request, Response = Request>,
S::Future: Unpin,
S::Service: Unpin,
<S::Service as Service>::Future: Unpin,
X: ServiceFactory<Request = Request, Response = Request>,
X::Error: Into<Error>,
X::InitError: fmt::Debug,
U: NewService<Request = (Request, Framed<T, Codec>), Response = ()>,
X::Future: Unpin,
X::Service: Unpin,
<X::Service as Service>::Future: Unpin,
U: ServiceFactory<Request = (Request, Framed<T, Codec>), Response = ()>,
U::Error: fmt::Display,
U::InitError: fmt::Debug,
U::Future: Unpin,
U::Service: Unpin,
<U::Service as Service>::Future: Unpin,
P: Unpin,
{
fut: S::Future,
fut_ex: Option<X::Future>,
@ -182,49 +223,63 @@ where
impl<T, P, S, B, X, U> Future for H1ServiceResponse<T, P, S, B, X, U>
where
T: IoStream,
S: NewService<Request = Request>,
S: ServiceFactory<Request = Request>,
S::Error: Into<Error>,
S::Response: Into<Response<B>>,
S::InitError: fmt::Debug,
S::Future: Unpin,
S::Service: Unpin,
<S::Service as Service>::Future: Unpin,
B: MessageBody,
X: NewService<Request = Request, Response = Request>,
X: ServiceFactory<Request = Request, Response = Request>,
X::Error: Into<Error>,
X::InitError: fmt::Debug,
U: NewService<Request = (Request, Framed<T, Codec>), Response = ()>,
X::Future: Unpin,
X::Service: Unpin,
<X::Service as Service>::Future: Unpin,
U: ServiceFactory<Request = (Request, Framed<T, Codec>), Response = ()>,
U::Error: fmt::Display,
U::InitError: fmt::Debug,
U::Future: Unpin,
U::Service: Unpin,
<U::Service as Service>::Future: Unpin,
P: Unpin,
{
type Item = H1ServiceHandler<T, P, S::Service, B, X::Service, U::Service>;
type Error = ();
type Output =
Result<H1ServiceHandler<T, P, S::Service, B, X::Service, U::Service>, ()>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Some(ref mut fut) = self.fut_ex {
let expect = try_ready!(fut
.poll()
.map_err(|e| log::error!("Init http service error: {:?}", e)));
self.expect = Some(expect);
self.fut_ex.take();
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = self.get_mut();
if let Some(ref mut fut) = this.fut_ex {
let expect = ready!(Pin::new(fut)
.poll(cx)
.map_err(|e| log::error!("Init http service error: {:?}", e)))?;
this.expect = Some(expect);
this.fut_ex.take();
}
if let Some(ref mut fut) = self.fut_upg {
let upgrade = try_ready!(fut
.poll()
.map_err(|e| log::error!("Init http service error: {:?}", e)));
self.upgrade = Some(upgrade);
self.fut_ex.take();
if let Some(ref mut fut) = this.fut_upg {
let upgrade = ready!(Pin::new(fut)
.poll(cx)
.map_err(|e| log::error!("Init http service error: {:?}", e)))?;
this.upgrade = Some(upgrade);
this.fut_ex.take();
}
let service = try_ready!(self
.fut
.poll()
let result = ready!(Pin::new(&mut this.fut)
.poll(cx)
.map_err(|e| log::error!("Init http service error: {:?}", e)));
Ok(Async::Ready(H1ServiceHandler::new(
self.cfg.take().unwrap(),
service,
self.expect.take().unwrap(),
self.upgrade.take(),
self.on_connect.clone(),
)))
Poll::Ready(result.map(|service| {
H1ServiceHandler::new(
this.cfg.take().unwrap(),
service,
this.expect.take().unwrap(),
this.upgrade.take(),
this.on_connect.clone(),
)
}))
}
}
@ -240,14 +295,18 @@ pub struct H1ServiceHandler<T, P, S, B, X, U> {
impl<T, P, S, B, X, U> H1ServiceHandler<T, P, S, B, X, U>
where
S: Service<Request = Request>,
S: Service<Request = Request> + Unpin,
S::Error: Into<Error>,
S::Response: Into<Response<B>>,
S::Future: Unpin,
B: MessageBody,
X: Service<Request = Request, Response = Request>,
X: Service<Request = Request, Response = Request> + Unpin,
X::Future: Unpin,
X::Error: Into<Error>,
U: Service<Request = (Request, Framed<T, Codec>), Response = ()>,
U: Service<Request = (Request, Framed<T, Codec>), Response = ()> + Unpin,
U::Future: Unpin,
U::Error: fmt::Display,
P: Unpin,
{
fn new(
cfg: ServiceConfig,
@ -270,24 +329,28 @@ where
impl<T, P, S, B, X, U> Service for H1ServiceHandler<T, P, S, B, X, U>
where
T: IoStream,
S: Service<Request = Request>,
S: Service<Request = Request> + Unpin,
S::Error: Into<Error>,
S::Response: Into<Response<B>>,
S::Future: Unpin,
B: MessageBody,
X: Service<Request = Request, Response = Request>,
X: Service<Request = Request, Response = Request> + Unpin,
X::Error: Into<Error>,
U: Service<Request = (Request, Framed<T, Codec>), Response = ()>,
X::Future: Unpin,
U: Service<Request = (Request, Framed<T, Codec>), Response = ()> + Unpin,
U::Error: fmt::Display,
U::Future: Unpin,
P: Unpin,
{
type Request = Io<T, P>;
type Response = ();
type Error = DispatchError;
type Future = Dispatcher<T, S, B, X, U>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
let ready = self
.expect
.poll_ready()
.poll_ready(cx)
.map_err(|e| {
let e = e.into();
log::error!("Http service readiness error: {:?}", e);
@ -297,7 +360,7 @@ where
let ready = self
.srv
.poll_ready()
.poll_ready(cx)
.map_err(|e| {
let e = e.into();
log::error!("Http service readiness error: {:?}", e);
@ -307,9 +370,9 @@ where
&& ready;
if ready {
Ok(Async::Ready(()))
Poll::Ready(Ok(()))
} else {
Ok(Async::NotReady)
Poll::Pending
}
}
@ -333,7 +396,7 @@ where
}
}
/// `NewService` implementation for `OneRequestService` service
/// `ServiceFactory` implementation for `OneRequestService` service
#[derive(Default)]
pub struct OneRequest<T, P> {
config: ServiceConfig,
@ -353,7 +416,7 @@ where
}
}
impl<T, P> NewService for OneRequest<T, P>
impl<T, P> ServiceFactory for OneRequest<T, P>
where
T: IoStream,
{
@ -363,7 +426,7 @@ where
type Error = ParseError;
type InitError = ();
type Service = OneRequestService<T, P>;
type Future = FutureResult<Self::Service, Self::InitError>;
type Future = Ready<Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: &SrvConfig) -> Self::Future {
ok(OneRequestService {
@ -389,8 +452,8 @@ where
type Error = ParseError;
type Future = OneRequestServiceResponse<T>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
fn poll_ready(&mut self, _: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: Self::Request) -> Self::Future {
@ -415,19 +478,19 @@ impl<T> Future for OneRequestServiceResponse<T>
where
T: IoStream,
{
type Item = (Request, Framed<T, Codec>);
type Error = ParseError;
type Output = Result<(Request, Framed<T, Codec>), ParseError>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.framed.as_mut().unwrap().poll()? {
Async::Ready(Some(req)) => match req {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
match self.framed.as_mut().unwrap().next_item(cx) {
Poll::Ready(Some(Ok(req))) => match req {
Message::Item(req) => {
Ok(Async::Ready((req, self.framed.take().unwrap())))
Poll::Ready(Ok((req, self.framed.take().unwrap())))
}
Message::Chunk(_) => unreachable!("Something is wrong"),
},
Async::Ready(None) => Err(ParseError::Incomplete),
Async::NotReady => Ok(Async::NotReady),
Poll::Ready(Some(Err(err))) => Poll::Ready(Err(err)),
Poll::Ready(None) => Poll::Ready(Err(ParseError::Incomplete)),
Poll::Pending => Poll::Pending,
}
}
}

View File

@ -1,10 +1,12 @@
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use actix_codec::Framed;
use actix_server_config::ServerConfig;
use actix_service::{NewService, Service};
use futures::future::FutureResult;
use futures::{Async, Poll};
use actix_service::{Service, ServiceFactory};
use futures::future::Ready;
use crate::error::Error;
use crate::h1::Codec;
@ -12,14 +14,14 @@ use crate::request::Request;
pub struct UpgradeHandler<T>(PhantomData<T>);
impl<T> NewService for UpgradeHandler<T> {
impl<T> ServiceFactory for UpgradeHandler<T> {
type Config = ServerConfig;
type Request = (Request, Framed<T, Codec>);
type Response = ();
type Error = Error;
type Service = UpgradeHandler<T>;
type InitError = Error;
type Future = FutureResult<Self::Service, Self::InitError>;
type Future = Ready<Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: &ServerConfig) -> Self::Future {
unimplemented!()
@ -30,10 +32,10 @@ impl<T> Service for UpgradeHandler<T> {
type Request = (Request, Framed<T, Codec>);
type Response = ();
type Error = Error;
type Future = FutureResult<Self::Response, Self::Error>;
type Future = Ready<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
fn poll_ready(&mut self, _: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, _: Self::Request) -> Self::Future {

View File

@ -1,5 +1,9 @@
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use actix_codec::{AsyncRead, AsyncWrite, Framed};
use futures::{Async, Future, Poll, Sink};
use futures::Sink;
use crate::body::{BodySize, MessageBody, ResponseBody};
use crate::error::Error;
@ -30,63 +34,64 @@ where
impl<T, B> Future for SendResponse<T, B>
where
T: AsyncRead + AsyncWrite,
T: AsyncRead + AsyncWrite + Unpin,
B: MessageBody,
{
type Item = Framed<T, Codec>;
type Error = Error;
type Output = Result<Framed<T, Codec>, Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = self.get_mut();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
let mut body_ready = self.body.is_some();
let framed = self.framed.as_mut().unwrap();
let mut body_ready = this.body.is_some();
let framed = this.framed.as_mut().unwrap();
// send body
if self.res.is_none() && self.body.is_some() {
while body_ready && self.body.is_some() && !framed.is_write_buf_full() {
match self.body.as_mut().unwrap().poll_next()? {
Async::Ready(item) => {
if this.res.is_none() && this.body.is_some() {
while body_ready && this.body.is_some() && !framed.is_write_buf_full() {
match this.body.as_mut().unwrap().poll_next(cx)? {
Poll::Ready(item) => {
// body is done
if item.is_none() {
let _ = self.body.take();
let _ = this.body.take();
}
framed.force_send(Message::Chunk(item))?;
}
Async::NotReady => body_ready = false,
Poll::Pending => body_ready = false,
}
}
}
// flush write buffer
if !framed.is_write_buf_empty() {
match framed.poll_complete()? {
Async::Ready(_) => {
match framed.flush(cx)? {
Poll::Ready(_) => {
if body_ready {
continue;
} else {
return Ok(Async::NotReady);
return Poll::Pending;
}
}
Async::NotReady => return Ok(Async::NotReady),
Poll::Pending => return Poll::Pending,
}
}
// send response
if let Some(res) = self.res.take() {
if let Some(res) = this.res.take() {
framed.force_send(res)?;
continue;
}
if self.body.is_some() {
if this.body.is_some() {
if body_ready {
continue;
} else {
return Ok(Async::NotReady);
return Poll::Pending;
}
} else {
break;
}
}
Ok(Async::Ready(self.framed.take().unwrap()))
Poll::Ready(Ok(this.framed.take().unwrap()))
}
}

View File

@ -1,5 +1,8 @@
use std::collections::VecDeque;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Instant;
use std::{fmt, mem, net};
@ -8,7 +11,7 @@ use actix_server_config::IoStream;
use actix_service::Service;
use bitflags::bitflags;
use bytes::{Bytes, BytesMut};
use futures::{try_ready, Async, Future, Poll, Sink, Stream};
use futures::{ready, Sink, Stream};
use h2::server::{Connection, SendResponse};
use h2::{RecvStream, SendStream};
use http::header::{
@ -43,13 +46,24 @@ pub struct Dispatcher<T: IoStream, S: Service<Request = Request>, B: MessageBody
_t: PhantomData<B>,
}
impl<T, S, B> Unpin for Dispatcher<T, S, B>
where
T: IoStream,
S: Service<Request = Request>,
S::Error: Into<Error> + Unpin + 'static,
S::Future: Unpin + 'static,
S::Response: Into<Response<B>> + Unpin + 'static,
B: MessageBody + 'static,
{
}
impl<T, S, B> Dispatcher<T, S, B>
where
T: IoStream,
S: Service<Request = Request>,
S::Error: Into<Error>,
S::Future: 'static,
S::Response: Into<Response<B>>,
S::Error: Into<Error> + Unpin + 'static,
S::Future: Unpin + 'static,
S::Response: Into<Response<B>> + Unpin + 'static,
B: MessageBody + 'static,
{
pub(crate) fn new(
@ -93,61 +107,75 @@ impl<T, S, B> Future for Dispatcher<T, S, B>
where
T: IoStream,
S: Service<Request = Request>,
S::Error: Into<Error>,
S::Future: 'static,
S::Response: Into<Response<B>>,
S::Error: Into<Error> + Unpin + 'static,
S::Future: Unpin + 'static,
S::Response: Into<Response<B>> + Unpin + 'static,
B: MessageBody + 'static,
{
type Item = ();
type Error = DispatchError;
type Output = Result<(), DispatchError>;
#[inline]
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = self.get_mut();
loop {
match self.connection.poll()? {
Async::Ready(None) => return Ok(Async::Ready(())),
Async::Ready(Some((req, res))) => {
match Pin::new(&mut this.connection).poll_accept(cx) {
Poll::Ready(None) => return Poll::Ready(Ok(())),
Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err.into())),
Poll::Ready(Some(Ok((req, res)))) => {
// update keep-alive expire
if self.ka_timer.is_some() {
if let Some(expire) = self.config.keep_alive_expire() {
self.ka_expire = expire;
if this.ka_timer.is_some() {
if let Some(expire) = this.config.keep_alive_expire() {
this.ka_expire = expire;
}
}
let (parts, body) = req.into_parts();
let mut req = Request::with_payload(body.into());
// let b: () = body;
let mut req = Request::with_payload(Payload::<
crate::payload::PayloadStream,
>::H2(
crate::h2::Payload::new(body)
));
let head = &mut req.head_mut();
head.uri = parts.uri;
head.method = parts.method;
head.version = parts.version;
head.headers = parts.headers.into();
head.peer_addr = self.peer_addr;
head.peer_addr = this.peer_addr;
// set on_connect data
if let Some(ref on_connect) = self.on_connect {
if let Some(ref on_connect) = this.on_connect {
on_connect.set(&mut req.extensions_mut());
}
tokio_current_thread::spawn(ServiceResponse::<S::Future, B> {
state: ServiceResponseState::ServiceCall(
self.service.call(req),
Some(res),
),
config: self.config.clone(),
buffer: None,
})
// tokio_executor::current_thread::spawn(ServiceResponse::<
// S::Future,
// S::Response,
// S::Error,
// B,
// > {
// state: ServiceResponseState::ServiceCall(
// this.service.call(req),
// Some(res),
// ),
// config: this.config.clone(),
// buffer: None,
// _t: PhantomData,
// });
}
Async::NotReady => return Ok(Async::NotReady),
Poll::Pending => return Poll::Pending,
}
}
}
}
struct ServiceResponse<F, B> {
struct ServiceResponse<F, I, E, B> {
state: ServiceResponseState<F, B>,
config: ServiceConfig,
buffer: Option<Bytes>,
_t: PhantomData<(I, E)>,
}
enum ServiceResponseState<F, B> {
@ -155,11 +183,11 @@ enum ServiceResponseState<F, B> {
SendPayload(SendStream<Bytes>, ResponseBody<B>),
}
impl<F, B> ServiceResponse<F, B>
impl<F, I, E, B> ServiceResponse<F, I, E, B>
where
F: Future,
F::Error: Into<Error>,
F::Item: Into<Response<B>>,
F: Future<Output = Result<I, E>> + Unpin,
E: Into<Error> + Unpin + 'static,
I: Into<Response<B>> + Unpin + 'static,
B: MessageBody + 'static,
{
fn prepare_response(
@ -223,109 +251,116 @@ where
}
}
impl<F, B> Future for ServiceResponse<F, B>
impl<F, I, E, B> Future for ServiceResponse<F, I, E, B>
where
F: Future,
F::Error: Into<Error>,
F::Item: Into<Response<B>>,
F: Future<Output = Result<I, E>> + Unpin,
E: Into<Error> + Unpin + 'static,
I: Into<Response<B>> + Unpin + 'static,
B: MessageBody + 'static,
{
type Item = ();
type Error = ();
type Output = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.state {
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = self.get_mut();
match this.state {
ServiceResponseState::ServiceCall(ref mut call, ref mut send) => {
match call.poll() {
Ok(Async::Ready(res)) => {
match Pin::new(call).poll(cx) {
Poll::Ready(Ok(res)) => {
let (res, body) = res.into().replace_body(());
let mut send = send.take().unwrap();
let mut size = body.size();
let h2_res = self.prepare_response(res.head(), &mut size);
let h2_res = this.prepare_response(res.head(), &mut size);
let stream =
send.send_response(h2_res, size.is_eof()).map_err(|e| {
let stream = match send.send_response(h2_res, size.is_eof()) {
Err(e) => {
trace!("Error sending h2 response: {:?}", e);
})?;
return Poll::Ready(());
}
Ok(stream) => stream,
};
if size.is_eof() {
Ok(Async::Ready(()))
Poll::Ready(())
} else {
self.state = ServiceResponseState::SendPayload(stream, body);
self.poll()
this.state = ServiceResponseState::SendPayload(stream, body);
Pin::new(this).poll(cx)
}
}
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(e) => {
Poll::Pending => Poll::Pending,
Poll::Ready(Err(e)) => {
let res: Response = e.into().into();
let (res, body) = res.replace_body(());
let mut send = send.take().unwrap();
let mut size = body.size();
let h2_res = self.prepare_response(res.head(), &mut size);
let h2_res = this.prepare_response(res.head(), &mut size);
let stream =
send.send_response(h2_res, size.is_eof()).map_err(|e| {
let stream = match send.send_response(h2_res, size.is_eof()) {
Err(e) => {
trace!("Error sending h2 response: {:?}", e);
})?;
return Poll::Ready(());
}
Ok(stream) => stream,
};
if size.is_eof() {
Ok(Async::Ready(()))
Poll::Ready(())
} else {
self.state = ServiceResponseState::SendPayload(
this.state = ServiceResponseState::SendPayload(
stream,
body.into_body(),
);
self.poll()
Pin::new(this).poll(cx)
}
}
}
}
ServiceResponseState::SendPayload(ref mut stream, ref mut body) => loop {
loop {
if let Some(ref mut buffer) = self.buffer {
match stream.poll_capacity().map_err(|e| warn!("{:?}", e))? {
Async::NotReady => return Ok(Async::NotReady),
Async::Ready(None) => return Ok(Async::Ready(())),
Async::Ready(Some(cap)) => {
if let Some(ref mut buffer) = this.buffer {
match stream.poll_capacity(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => return Poll::Ready(()),
Poll::Ready(Some(Ok(cap))) => {
let len = buffer.len();
let bytes = buffer.split_to(std::cmp::min(cap, len));
if let Err(e) = stream.send_data(bytes, false) {
warn!("{:?}", e);
return Err(());
return Poll::Ready(());
} else if !buffer.is_empty() {
let cap = std::cmp::min(buffer.len(), CHUNK_SIZE);
stream.reserve_capacity(cap);
} else {
self.buffer.take();
this.buffer.take();
}
}
Poll::Ready(Some(Err(e))) => {
warn!("{:?}", e);
return Poll::Ready(());
}
}
} else {
match body.poll_next() {
Ok(Async::NotReady) => {
return Ok(Async::NotReady);
}
Ok(Async::Ready(None)) => {
match body.poll_next(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => {
if let Err(e) = stream.send_data(Bytes::new(), true) {
warn!("{:?}", e);
return Err(());
} else {
return Ok(Async::Ready(()));
}
return Poll::Ready(());
}
Ok(Async::Ready(Some(chunk))) => {
Poll::Ready(Some(Ok(chunk))) => {
stream.reserve_capacity(std::cmp::min(
chunk.len(),
CHUNK_SIZE,
));
self.buffer = Some(chunk);
this.buffer = Some(chunk);
}
Err(e) => {
Poll::Ready(Some(Err(e))) => {
error!("Response payload stream error: {:?}", e);
return Err(());
return Poll::Ready(());
}
}
}

View File

@ -1,9 +1,11 @@
#![allow(dead_code, unused_imports)]
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use bytes::Bytes;
use futures::{Async, Poll, Stream};
use futures::Stream;
use h2::RecvStream;
mod dispatcher;
@ -25,22 +27,23 @@ impl Payload {
}
impl Stream for Payload {
type Item = Bytes;
type Error = PayloadError;
type Item = Result<Bytes, PayloadError>;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self.pl.poll() {
Ok(Async::Ready(Some(chunk))) => {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
match Pin::new(&mut this.pl).poll_data(cx) {
Poll::Ready(Some(Ok(chunk))) => {
let len = chunk.len();
if let Err(err) = self.pl.release_capacity().release_capacity(len) {
Err(err.into())
if let Err(err) = this.pl.release_capacity().release_capacity(len) {
Poll::Ready(Some(Err(err.into())))
} else {
Ok(Async::Ready(Some(chunk)))
Poll::Ready(Some(Ok(chunk)))
}
}
Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(err) => Err(err.into()),
Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err.into()))),
Poll::Pending => Poll::Pending,
Poll::Ready(None) => Poll::Ready(None),
}
}
}

View File

@ -1,13 +1,16 @@
use std::fmt::Debug;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{io, net, rc};
use actix_codec::{AsyncRead, AsyncWrite, Framed};
use actix_server_config::{Io, IoStream, ServerConfig as SrvConfig};
use actix_service::{IntoNewService, NewService, Service};
use actix_service::{IntoServiceFactory, Service, ServiceFactory};
use bytes::Bytes;
use futures::future::{ok, FutureResult};
use futures::{try_ready, Async, Future, IntoFuture, Poll, Stream};
use futures::future::{ok, Ready};
use futures::{ready, Stream};
use h2::server::{self, Connection, Handshake};
use h2::RecvStream;
use log::error;
@ -23,7 +26,7 @@ use crate::response::Response;
use super::dispatcher::Dispatcher;
/// `NewService` implementation for HTTP2 transport
/// `ServiceFactory` implementation for HTTP2 transport
pub struct H2Service<T, P, S, B> {
srv: S,
cfg: ServiceConfig,
@ -33,30 +36,35 @@ pub struct H2Service<T, P, S, B> {
impl<T, P, S, B> H2Service<T, P, S, B>
where
S: NewService<Config = SrvConfig, Request = Request>,
S::Error: Into<Error>,
S::Response: Into<Response<B>>,
<S::Service as Service>::Future: 'static,
S: ServiceFactory<Config = SrvConfig, Request = Request>,
S::Error: Into<Error> + Unpin + 'static,
S::Response: Into<Response<B>> + Unpin + 'static,
S::Future: Unpin,
<S::Service as Service>::Future: Unpin + 'static,
B: MessageBody + 'static,
P: Unpin,
{
/// Create new `HttpService` instance.
pub fn new<F: IntoNewService<S>>(service: F) -> Self {
pub fn new<F: IntoServiceFactory<S>>(service: F) -> Self {
let cfg = ServiceConfig::new(KeepAlive::Timeout(5), 5000, 0);
H2Service {
cfg,
on_connect: None,
srv: service.into_new_service(),
srv: service.into_factory(),
_t: PhantomData,
}
}
/// Create new `HttpService` instance with config.
pub fn with_config<F: IntoNewService<S>>(cfg: ServiceConfig, service: F) -> Self {
pub fn with_config<F: IntoServiceFactory<S>>(
cfg: ServiceConfig,
service: F,
) -> Self {
H2Service {
cfg,
on_connect: None,
srv: service.into_new_service(),
srv: service.into_factory(),
_t: PhantomData,
}
}
@ -71,14 +79,16 @@ where
}
}
impl<T, P, S, B> NewService for H2Service<T, P, S, B>
impl<T, P, S, B> ServiceFactory for H2Service<T, P, S, B>
where
T: IoStream,
S: NewService<Config = SrvConfig, Request = Request>,
S::Error: Into<Error>,
S::Response: Into<Response<B>>,
<S::Service as Service>::Future: 'static,
S: ServiceFactory<Config = SrvConfig, Request = Request>,
S::Error: Into<Error> + Unpin + 'static,
S::Response: Into<Response<B>> + Unpin + 'static,
S::Future: Unpin,
<S::Service as Service>::Future: Unpin + 'static,
B: MessageBody + 'static,
P: Unpin,
{
type Config = SrvConfig;
type Request = Io<T, P>;
@ -90,7 +100,7 @@ where
fn new_service(&self, cfg: &SrvConfig) -> Self::Future {
H2ServiceResponse {
fut: self.srv.new_service(cfg).into_future(),
fut: self.srv.new_service(cfg),
cfg: Some(self.cfg.clone()),
on_connect: self.on_connect.clone(),
_t: PhantomData,
@ -99,8 +109,8 @@ where
}
#[doc(hidden)]
pub struct H2ServiceResponse<T, P, S: NewService, B> {
fut: <S::Future as IntoFuture>::Future,
pub struct H2ServiceResponse<T, P, S: ServiceFactory, B> {
fut: S::Future,
cfg: Option<ServiceConfig>,
on_connect: Option<rc::Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>,
_t: PhantomData<(T, P, B)>,
@ -109,22 +119,26 @@ pub struct H2ServiceResponse<T, P, S: NewService, B> {
impl<T, P, S, B> Future for H2ServiceResponse<T, P, S, B>
where
T: IoStream,
S: NewService<Config = SrvConfig, Request = Request>,
S::Error: Into<Error>,
S::Response: Into<Response<B>>,
<S::Service as Service>::Future: 'static,
S: ServiceFactory<Config = SrvConfig, Request = Request>,
S::Error: Into<Error> + Unpin + 'static,
S::Response: Into<Response<B>> + Unpin + 'static,
S::Future: Unpin,
<S::Service as Service>::Future: Unpin + 'static,
B: MessageBody + 'static,
P: Unpin,
{
type Item = H2ServiceHandler<T, P, S::Service, B>;
type Error = S::InitError;
type Output = Result<H2ServiceHandler<T, P, S::Service, B>, S::InitError>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let service = try_ready!(self.fut.poll());
Ok(Async::Ready(H2ServiceHandler::new(
self.cfg.take().unwrap(),
self.on_connect.clone(),
service,
)))
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = self.get_mut();
Poll::Ready(ready!(Pin::new(&mut this.fut).poll(cx)).map(|service| {
H2ServiceHandler::new(
this.cfg.take().unwrap(),
this.on_connect.clone(),
service,
)
}))
}
}
@ -139,10 +153,11 @@ pub struct H2ServiceHandler<T, P, S, B> {
impl<T, P, S, B> H2ServiceHandler<T, P, S, B>
where
S: Service<Request = Request>,
S::Error: Into<Error>,
S::Future: 'static,
S::Response: Into<Response<B>>,
S::Error: Into<Error> + Unpin + 'static,
S::Future: Unpin + 'static,
S::Response: Into<Response<B>> + Unpin + 'static,
B: MessageBody + 'static,
P: Unpin,
{
fn new(
cfg: ServiceConfig,
@ -162,18 +177,19 @@ impl<T, P, S, B> Service for H2ServiceHandler<T, P, S, B>
where
T: IoStream,
S: Service<Request = Request>,
S::Error: Into<Error>,
S::Future: 'static,
S::Response: Into<Response<B>>,
S::Error: Into<Error> + Unpin + 'static,
S::Future: Unpin + 'static,
S::Response: Into<Response<B>> + Unpin + 'static,
B: MessageBody + 'static,
P: Unpin,
{
type Request = Io<T, P>;
type Response = ();
type Error = DispatchError;
type Future = H2ServiceHandlerResponse<T, S, B>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.srv.poll_ready().map_err(|e| {
fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
self.srv.poll_ready(cx).map_err(|e| {
let e = e.into();
error!("Service readiness error: {:?}", e);
DispatchError::Service(e)
@ -219,9 +235,9 @@ pub struct H2ServiceHandlerResponse<T, S, B>
where
T: IoStream,
S: Service<Request = Request>,
S::Error: Into<Error>,
S::Future: 'static,
S::Response: Into<Response<B>>,
S::Error: Into<Error> + Unpin + 'static,
S::Future: Unpin + 'static,
S::Response: Into<Response<B>> + Unpin + 'static,
B: MessageBody + 'static,
{
state: State<T, S, B>,
@ -231,25 +247,24 @@ impl<T, S, B> Future for H2ServiceHandlerResponse<T, S, B>
where
T: IoStream,
S: Service<Request = Request>,
S::Error: Into<Error>,
S::Future: 'static,
S::Response: Into<Response<B>>,
S::Error: Into<Error> + Unpin + 'static,
S::Future: Unpin + 'static,
S::Response: Into<Response<B>> + Unpin + 'static,
B: MessageBody,
{
type Item = ();
type Error = DispatchError;
type Output = Result<(), DispatchError>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
match self.state {
State::Incoming(ref mut disp) => disp.poll(),
State::Incoming(ref mut disp) => Pin::new(disp).poll(cx),
State::Handshake(
ref mut srv,
ref mut config,
ref peer_addr,
ref mut on_connect,
ref mut handshake,
) => match handshake.poll() {
Ok(Async::Ready(conn)) => {
) => match Pin::new(handshake).poll(cx) {
Poll::Ready(Ok(conn)) => {
self.state = State::Incoming(Dispatcher::new(
srv.take().unwrap(),
conn,
@ -258,13 +273,13 @@ where
None,
*peer_addr,
));
self.poll()
self.poll(cx)
}
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(err) => {
Poll::Ready(Err(err)) => {
trace!("H2 handshake error: {}", err);
Err(err.into())
Poll::Ready(Err(err.into()))
}
Poll::Pending => Poll::Pending,
},
}
}

View File

@ -4,7 +4,8 @@
clippy::too_many_arguments,
clippy::new_without_default,
clippy::borrow_interior_mutable_const,
clippy::write_with_newline
clippy::write_with_newline,
unused_imports
)]
#[macro_use]
@ -12,7 +13,7 @@ extern crate log;
pub mod body;
mod builder;
pub mod client;
// pub mod client;
mod cloneable;
mod config;
pub mod encoding;
@ -31,8 +32,8 @@ pub mod cookie;
pub mod error;
pub mod h1;
pub mod h2;
pub mod test;
pub mod ws;
// pub mod test;
// pub mod ws;
pub use self::builder::HttpServiceBuilder;
pub use self::config::{KeepAlive, ServiceConfig};

View File

@ -1,11 +1,15 @@
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use bytes::Bytes;
use futures::{Async, Poll, Stream};
use futures::Stream;
use h2::RecvStream;
use crate::error::PayloadError;
/// Type represent boxed payload
pub type PayloadStream = Box<dyn Stream<Item = Bytes, Error = PayloadError>>;
pub type PayloadStream = Pin<Box<dyn Stream<Item = Result<Bytes, PayloadError>>>>;
/// Type represent streaming payload
pub enum Payload<S = PayloadStream> {
@ -48,18 +52,17 @@ impl<S> Payload<S> {
impl<S> Stream for Payload<S>
where
S: Stream<Item = Bytes, Error = PayloadError>,
S: Stream<Item = Result<Bytes, PayloadError>> + Unpin,
{
type Item = Bytes;
type Error = PayloadError;
type Item = Result<Bytes, PayloadError>;
#[inline]
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self {
Payload::None => Ok(Async::Ready(None)),
Payload::H1(ref mut pl) => pl.poll(),
Payload::H2(ref mut pl) => pl.poll(),
Payload::Stream(ref mut pl) => pl.poll(),
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
match self.get_mut() {
Payload::None => Poll::Ready(None),
Payload::H1(ref mut pl) => pl.readany(cx),
Payload::H2(ref mut pl) => Pin::new(pl).poll_next(cx),
Payload::Stream(ref mut pl) => Pin::new(pl).poll_next(cx),
}
}
}

View File

@ -4,7 +4,7 @@ use std::io::Write;
use std::{fmt, str};
use bytes::{BufMut, Bytes, BytesMut};
use futures::future::{ok, FutureResult, IntoFuture};
use futures::future::{ok, Ready};
use futures::Stream;
use serde::Serialize;
use serde_json;
@ -280,15 +280,15 @@ impl<B: MessageBody> fmt::Debug for Response<B> {
}
}
impl IntoFuture for Response {
type Item = Response;
type Error = Error;
type Future = FutureResult<Response, Error>;
// impl IntoFuture for Response {
// type Item = Response;
// type Error = Error;
// type Future = FutureResult<Response, Error>;
fn into_future(self) -> Self::Future {
ok(self)
}
}
// fn into_future(self) -> Self::Future {
// ok(self)
// }
// }
pub struct CookieIter<'a> {
iter: header::GetAll<'a>,
@ -635,8 +635,8 @@ impl ResponseBuilder {
/// `ResponseBuilder` can not be used after this call.
pub fn streaming<S, E>(&mut self, stream: S) -> Response
where
S: Stream<Item = Bytes, Error = E> + 'static,
E: Into<Error> + 'static,
S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
E: Into<Error> + Unpin + 'static,
{
self.body(Body::from_message(BodyStream::new(stream)))
}
@ -757,15 +757,15 @@ impl<'a> From<&'a ResponseHead> for ResponseBuilder {
}
}
impl IntoFuture for ResponseBuilder {
type Item = Response;
type Error = Error;
type Future = FutureResult<Response, Error>;
// impl IntoFuture for ResponseBuilder {
// type Item = Response;
// type Error = Error;
// type Future = FutureResult<Response, Error>;
fn into_future(mut self) -> Self::Future {
ok(self.finish())
}
}
// fn into_future(mut self) -> Self::Future {
// ok(self.finish())
// }
// }
impl fmt::Debug for ResponseBuilder {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {

View File

@ -1,13 +1,15 @@
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{fmt, io, net, rc};
use actix_codec::{AsyncRead, AsyncWrite, Framed};
use actix_server_config::{
Io as ServerIo, IoStream, Protocol, ServerConfig as SrvConfig,
};
use actix_service::{IntoNewService, NewService, Service};
use actix_service::{IntoServiceFactory, Service, ServiceFactory};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use futures::{try_ready, Async, Future, IntoFuture, Poll};
use futures::{ready, Future};
use h2::server::{self, Handshake};
use crate::body::MessageBody;
@ -20,7 +22,7 @@ use crate::request::Request;
use crate::response::Response;
use crate::{h1, h2::Dispatcher};
/// `NewService` HTTP1.1/HTTP2 transport implementation
/// `ServiceFactory` HTTP1.1/HTTP2 transport implementation
pub struct HttpService<T, P, S, B, X = h1::ExpectHandler, U = h1::UpgradeHandler<T>> {
srv: S,
cfg: ServiceConfig,
@ -32,11 +34,13 @@ pub struct HttpService<T, P, S, B, X = h1::ExpectHandler, U = h1::UpgradeHandler
impl<T, S, B> HttpService<T, (), S, B>
where
S: NewService<Config = SrvConfig, Request = Request>,
S::Error: Into<Error>,
S: ServiceFactory<Config = SrvConfig, Request = Request>,
S::Error: Into<Error> + Unpin + 'static,
S::InitError: fmt::Debug,
S::Response: Into<Response<B>>,
<S::Service as Service>::Future: 'static,
S::Response: Into<Response<B>> + Unpin + 'static,
S::Future: Unpin,
S::Service: Unpin,
<S::Service as Service>::Future: Unpin + 'static,
B: MessageBody + 'static,
{
/// Create builder for `HttpService` instance.
@ -47,20 +51,23 @@ where
impl<T, P, S, B> HttpService<T, P, S, B>
where
S: NewService<Config = SrvConfig, Request = Request>,
S::Error: Into<Error>,
S: ServiceFactory<Config = SrvConfig, Request = Request>,
S::Error: Into<Error> + Unpin + 'static,
S::InitError: fmt::Debug,
S::Response: Into<Response<B>>,
<S::Service as Service>::Future: 'static,
S::Response: Into<Response<B>> + Unpin + 'static,
S::Future: Unpin,
S::Service: Unpin,
<S::Service as Service>::Future: Unpin + 'static,
B: MessageBody + 'static,
P: Unpin,
{
/// Create new `HttpService` instance.
pub fn new<F: IntoNewService<S>>(service: F) -> Self {
pub fn new<F: IntoServiceFactory<S>>(service: F) -> Self {
let cfg = ServiceConfig::new(KeepAlive::Timeout(5), 5000, 0);
HttpService {
cfg,
srv: service.into_new_service(),
srv: service.into_factory(),
expect: h1::ExpectHandler,
upgrade: None,
on_connect: None,
@ -69,13 +76,13 @@ where
}
/// Create new `HttpService` instance with config.
pub(crate) fn with_config<F: IntoNewService<S>>(
pub(crate) fn with_config<F: IntoServiceFactory<S>>(
cfg: ServiceConfig,
service: F,
) -> Self {
HttpService {
cfg,
srv: service.into_new_service(),
srv: service.into_factory(),
expect: h1::ExpectHandler,
upgrade: None,
on_connect: None,
@ -86,11 +93,15 @@ where
impl<T, P, S, B, X, U> HttpService<T, P, S, B, X, U>
where
S: NewService<Config = SrvConfig, Request = Request>,
S::Error: Into<Error>,
S: ServiceFactory<Config = SrvConfig, Request = Request>,
S::Error: Into<Error> + Unpin + 'static,
S::InitError: fmt::Debug,
S::Response: Into<Response<B>>,
S::Response: Into<Response<B>> + Unpin + 'static,
S::Future: Unpin,
S::Service: Unpin,
<S::Service as Service>::Future: Unpin + 'static,
B: MessageBody,
P: Unpin,
{
/// Provide service for `EXPECT: 100-Continue` support.
///
@ -99,9 +110,12 @@ where
/// request will be forwarded to main service.
pub fn expect<X1>(self, expect: X1) -> HttpService<T, P, S, B, X1, U>
where
X1: NewService<Config = SrvConfig, Request = Request, Response = Request>,
X1: ServiceFactory<Config = SrvConfig, Request = Request, Response = Request>,
X1::Error: Into<Error>,
X1::InitError: fmt::Debug,
X1::Future: Unpin,
X1::Service: Unpin,
<X1::Service as Service>::Future: Unpin + 'static,
{
HttpService {
expect,
@ -119,13 +133,16 @@ where
/// and this service get called with original request and framed object.
pub fn upgrade<U1>(self, upgrade: Option<U1>) -> HttpService<T, P, S, B, X, U1>
where
U1: NewService<
U1: ServiceFactory<
Config = SrvConfig,
Request = (Request, Framed<T, h1::Codec>),
Response = (),
>,
U1::Error: fmt::Display,
U1::InitError: fmt::Debug,
U1::Future: Unpin,
U1::Service: Unpin,
<U1::Service as Service>::Future: Unpin + 'static,
{
HttpService {
upgrade,
@ -147,25 +164,35 @@ where
}
}
impl<T, P, S, B, X, U> NewService for HttpService<T, P, S, B, X, U>
impl<T, P, S, B, X, U> ServiceFactory for HttpService<T, P, S, B, X, U>
where
T: IoStream,
S: NewService<Config = SrvConfig, Request = Request>,
S::Error: Into<Error>,
T: IoStream + Unpin,
S: ServiceFactory<Config = SrvConfig, Request = Request>,
S::Service: Unpin,
S::Error: Into<Error> + Unpin + 'static,
S::InitError: fmt::Debug,
S::Response: Into<Response<B>>,
<S::Service as Service>::Future: 'static,
S::Response: Into<Response<B>> + Unpin + 'static,
S::Future: Unpin,
S::Service: Unpin,
<S::Service as Service>::Future: Unpin + 'static,
B: MessageBody + 'static,
X: NewService<Config = SrvConfig, Request = Request, Response = Request>,
X: ServiceFactory<Config = SrvConfig, Request = Request, Response = Request>,
X::Error: Into<Error>,
X::InitError: fmt::Debug,
U: NewService<
X::Future: Unpin,
X::Service: Unpin,
<X::Service as Service>::Future: Unpin + 'static,
U: ServiceFactory<
Config = SrvConfig,
Request = (Request, Framed<T, h1::Codec>),
Response = (),
>,
U::Error: fmt::Display,
U::InitError: fmt::Debug,
U::Future: Unpin,
U::Service: Unpin,
<U::Service as Service>::Future: Unpin + 'static,
P: Unpin,
{
type Config = SrvConfig;
type Request = ServerIo<T, P>;
@ -177,7 +204,7 @@ where
fn new_service(&self, cfg: &SrvConfig) -> Self::Future {
HttpServiceResponse {
fut: self.srv.new_service(cfg).into_future(),
fut: self.srv.new_service(cfg),
fut_ex: Some(self.expect.new_service(cfg)),
fut_upg: self.upgrade.as_ref().map(|f| f.new_service(cfg)),
expect: None,
@ -190,7 +217,14 @@ where
}
#[doc(hidden)]
pub struct HttpServiceResponse<T, P, S: NewService, B, X: NewService, U: NewService> {
pub struct HttpServiceResponse<
T,
P,
S: ServiceFactory,
B,
X: ServiceFactory,
U: ServiceFactory,
> {
fut: S::Future,
fut_ex: Option<X::Future>,
fut_upg: Option<U::Future>,
@ -204,50 +238,62 @@ pub struct HttpServiceResponse<T, P, S: NewService, B, X: NewService, U: NewServ
impl<T, P, S, B, X, U> Future for HttpServiceResponse<T, P, S, B, X, U>
where
T: IoStream,
S: NewService<Request = Request>,
S::Error: Into<Error>,
S: ServiceFactory<Request = Request>,
S::Error: Into<Error> + Unpin + 'static,
S::InitError: fmt::Debug,
S::Response: Into<Response<B>>,
<S::Service as Service>::Future: 'static,
S::Response: Into<Response<B>> + Unpin + 'static,
S::Future: Unpin,
S::Service: Unpin,
<S::Service as Service>::Future: Unpin + 'static,
B: MessageBody + 'static,
X: NewService<Request = Request, Response = Request>,
X: ServiceFactory<Request = Request, Response = Request>,
X::Error: Into<Error>,
X::InitError: fmt::Debug,
U: NewService<Request = (Request, Framed<T, h1::Codec>), Response = ()>,
X::Future: Unpin,
X::Service: Unpin,
<X::Service as Service>::Future: Unpin + 'static,
U: ServiceFactory<Request = (Request, Framed<T, h1::Codec>), Response = ()>,
U::Error: fmt::Display,
U::InitError: fmt::Debug,
U::Future: Unpin,
U::Service: Unpin,
<U::Service as Service>::Future: Unpin + 'static,
P: Unpin,
{
type Item = HttpServiceHandler<T, P, S::Service, B, X::Service, U::Service>;
type Error = ();
type Output =
Result<HttpServiceHandler<T, P, S::Service, B, X::Service, U::Service>, ()>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Some(ref mut fut) = self.fut_ex {
let expect = try_ready!(fut
.poll()
.map_err(|e| log::error!("Init http service error: {:?}", e)));
self.expect = Some(expect);
self.fut_ex.take();
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = self.get_mut();
if let Some(ref mut fut) = this.fut_ex {
let expect = ready!(Pin::new(fut)
.poll(cx)
.map_err(|e| log::error!("Init http service error: {:?}", e)))?;
this.expect = Some(expect);
this.fut_ex.take();
}
if let Some(ref mut fut) = self.fut_upg {
let upgrade = try_ready!(fut
.poll()
.map_err(|e| log::error!("Init http service error: {:?}", e)));
self.upgrade = Some(upgrade);
self.fut_ex.take();
if let Some(ref mut fut) = this.fut_upg {
let upgrade = ready!(Pin::new(fut)
.poll(cx)
.map_err(|e| log::error!("Init http service error: {:?}", e)))?;
this.upgrade = Some(upgrade);
this.fut_ex.take();
}
let service = try_ready!(self
.fut
.poll()
let result = ready!(Pin::new(&mut this.fut)
.poll(cx)
.map_err(|e| log::error!("Init http service error: {:?}", e)));
Ok(Async::Ready(HttpServiceHandler::new(
self.cfg.take().unwrap(),
service,
self.expect.take().unwrap(),
self.upgrade.take(),
self.on_connect.clone(),
)))
Poll::Ready(result.map(|service| {
HttpServiceHandler::new(
this.cfg.take().unwrap(),
service,
this.expect.take().unwrap(),
this.upgrade.take(),
this.on_connect.clone(),
)
}))
}
}
@ -263,15 +309,19 @@ pub struct HttpServiceHandler<T, P, S, B, X, U> {
impl<T, P, S, B, X, U> HttpServiceHandler<T, P, S, B, X, U>
where
S: Service<Request = Request>,
S::Error: Into<Error>,
S: Service<Request = Request> + Unpin,
S::Error: Into<Error> + Unpin + 'static,
S::Future: 'static,
S::Response: Into<Response<B>>,
S::Response: Into<Response<B>> + Unpin + 'static,
S::Future: Unpin,
B: MessageBody + 'static,
X: Service<Request = Request, Response = Request>,
X: Service<Request = Request, Response = Request> + Unpin,
X::Future: Unpin,
X::Error: Into<Error>,
U: Service<Request = (Request, Framed<T, h1::Codec>), Response = ()>,
U: Service<Request = (Request, Framed<T, h1::Codec>), Response = ()> + Unpin,
U::Future: Unpin,
U::Error: fmt::Display,
P: Unpin,
{
fn new(
cfg: ServiceConfig,
@ -293,26 +343,29 @@ where
impl<T, P, S, B, X, U> Service for HttpServiceHandler<T, P, S, B, X, U>
where
T: IoStream,
S: Service<Request = Request>,
S::Error: Into<Error>,
S::Future: 'static,
S::Response: Into<Response<B>>,
T: IoStream + Unpin,
S: Service<Request = Request> + Unpin,
S::Error: Into<Error> + Unpin + 'static,
S::Future: Unpin + 'static,
S::Response: Into<Response<B>> + Unpin + 'static,
B: MessageBody + 'static,
X: Service<Request = Request, Response = Request>,
X: Service<Request = Request, Response = Request> + Unpin,
X::Error: Into<Error>,
U: Service<Request = (Request, Framed<T, h1::Codec>), Response = ()>,
X::Future: Unpin,
U: Service<Request = (Request, Framed<T, h1::Codec>), Response = ()> + Unpin,
U::Error: fmt::Display,
U::Future: Unpin,
P: Unpin,
{
type Request = ServerIo<T, P>;
type Response = ();
type Error = DispatchError;
type Future = HttpServiceHandlerResponse<T, S, B, X, U>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
let ready = self
.expect
.poll_ready()
.poll_ready(cx)
.map_err(|e| {
let e = e.into();
log::error!("Http service readiness error: {:?}", e);
@ -322,7 +375,7 @@ where
let ready = self
.srv
.poll_ready()
.poll_ready(cx)
.map_err(|e| {
let e = e.into();
log::error!("Http service readiness error: {:?}", e);
@ -332,9 +385,9 @@ where
&& ready;
if ready {
Ok(Async::Ready(()))
Poll::Ready(Ok(()))
} else {
Ok(Async::NotReady)
Poll::Pending
}
}
@ -391,15 +444,17 @@ where
enum State<T, S, B, X, U>
where
S: Service<Request = Request>,
S::Future: 'static,
S: Service<Request = Request> + Unpin,
S::Future: Unpin + 'static,
S::Error: Into<Error>,
T: IoStream,
T: IoStream + Unpin,
B: MessageBody,
X: Service<Request = Request, Response = Request>,
X: Service<Request = Request, Response = Request> + Unpin,
X::Error: Into<Error>,
U: Service<Request = (Request, Framed<T, h1::Codec>), Response = ()>,
X::Future: Unpin,
U: Service<Request = (Request, Framed<T, h1::Codec>), Response = ()> + Unpin,
U::Error: fmt::Display,
U::Future: Unpin,
{
H1(h1::Dispatcher<T, S, B, X, U>),
H2(Dispatcher<Io<T>, S, B>),
@ -427,16 +482,18 @@ where
pub struct HttpServiceHandlerResponse<T, S, B, X, U>
where
T: IoStream,
S: Service<Request = Request>,
S::Error: Into<Error>,
S::Future: 'static,
S::Response: Into<Response<B>>,
T: IoStream + Unpin,
S: Service<Request = Request> + Unpin,
S::Error: Into<Error> + Unpin + 'static,
S::Future: Unpin + 'static,
S::Response: Into<Response<B>> + Unpin + 'static,
B: MessageBody + 'static,
X: Service<Request = Request, Response = Request>,
X: Service<Request = Request, Response = Request> + Unpin,
X::Error: Into<Error>,
U: Service<Request = (Request, Framed<T, h1::Codec>), Response = ()>,
X::Future: Unpin,
U: Service<Request = (Request, Framed<T, h1::Codec>), Response = ()> + Unpin,
U::Error: fmt::Display,
U::Future: Unpin,
{
state: State<T, S, B, X, U>,
}
@ -445,32 +502,33 @@ const HTTP2_PREFACE: [u8; 14] = *b"PRI * HTTP/2.0";
impl<T, S, B, X, U> Future for HttpServiceHandlerResponse<T, S, B, X, U>
where
T: IoStream,
S: Service<Request = Request>,
S::Error: Into<Error>,
S::Future: 'static,
S::Response: Into<Response<B>>,
T: IoStream + Unpin,
S: Service<Request = Request> + Unpin,
S::Error: Into<Error> + Unpin + 'static,
S::Future: Unpin + 'static,
S::Response: Into<Response<B>> + Unpin + 'static,
B: MessageBody,
X: Service<Request = Request, Response = Request>,
X: Service<Request = Request, Response = Request> + Unpin,
X::Future: Unpin,
X::Error: Into<Error>,
U: Service<Request = (Request, Framed<T, h1::Codec>), Response = ()>,
U: Service<Request = (Request, Framed<T, h1::Codec>), Response = ()> + Unpin,
U::Future: Unpin,
U::Error: fmt::Display,
{
type Item = ();
type Error = DispatchError;
type Output = Result<(), DispatchError>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
match self.state {
State::H1(ref mut disp) => disp.poll(),
State::H2(ref mut disp) => disp.poll(),
State::H1(ref mut disp) => Pin::new(disp).poll(cx),
State::H2(ref mut disp) => Pin::new(disp).poll(cx),
State::Unknown(ref mut data) => {
if let Some(ref mut item) = data {
loop {
// Safety - we only write to the returned slice.
let b = unsafe { item.1.bytes_mut() };
let n = try_ready!(item.0.poll_read(b));
let n = ready!(Pin::new(&mut item.0).poll_read(cx, b))?;
if n == 0 {
return Ok(Async::Ready(()));
return Poll::Ready(Ok(()));
}
// Safety - we know that 'n' bytes have
// been initialized via the contract of
@ -511,17 +569,17 @@ where
on_connect,
))
}
self.poll()
self.poll(cx)
}
State::Handshake(ref mut data) => {
let conn = if let Some(ref mut item) = data {
match item.0.poll() {
Ok(Async::Ready(conn)) => conn,
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(err) => {
match Pin::new(&mut item.0).poll(cx) {
Poll::Ready(Ok(conn)) => conn,
Poll::Ready(Err(err)) => {
trace!("H2 handshake error: {}", err);
return Err(err.into());
return Poll::Ready(Err(err.into()));
}
Poll::Pending => return Poll::Pending,
}
} else {
panic!()
@ -530,7 +588,7 @@ where
self.state = State::H2(Dispatcher::new(
srv, conn, on_connect, cfg, None, peer_addr,
));
self.poll()
self.poll(cx)
}
}
}
@ -542,6 +600,8 @@ struct Io<T> {
inner: T,
}
impl<T> Unpin for Io<T> {}
impl<T: io::Read> io::Read for Io<T> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if let Some(mut bytes) = self.unread.take() {
@ -567,22 +627,62 @@ impl<T: io::Write> io::Write for Io<T> {
}
}
impl<T: AsyncRead> AsyncRead for Io<T> {
impl<T: AsyncRead + Unpin> AsyncRead for Io<T> {
// unsafe fn initializer(&self) -> io::Initializer {
// self.get_mut().inner.initializer()
// }
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
self.inner.prepare_uninitialized_buffer(buf)
}
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.get_mut().inner).poll_read(cx, buf)
}
// fn poll_read_vectored(
// self: Pin<&mut Self>,
// cx: &mut Context<'_>,
// bufs: &mut [io::IoSliceMut<'_>],
// ) -> Poll<io::Result<usize>> {
// self.get_mut().inner.poll_read_vectored(cx, bufs)
// }
}
impl<T: AsyncWrite> AsyncWrite for Io<T> {
fn shutdown(&mut self) -> Poll<(), io::Error> {
self.inner.shutdown()
impl<T: AsyncWrite + Unpin> tokio_io::AsyncWrite for Io<T> {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.get_mut().inner).poll_write(cx, buf)
}
fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
self.inner.write_buf(buf)
// fn poll_write_vectored(
// self: Pin<&mut Self>,
// cx: &mut Context<'_>,
// bufs: &[io::IoSlice<'_>],
// ) -> Poll<io::Result<usize>> {
// self.get_mut().inner.poll_write_vectored(cx, bufs)
// }
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.get_mut().inner).poll_flush(cx)
}
fn poll_shutdown(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<()>> {
Pin::new(&mut self.get_mut().inner).poll_shutdown(cx)
}
}
impl<T: IoStream> IoStream for Io<T> {
impl<T: IoStream> actix_server_config::IoStream for Io<T> {
#[inline]
fn peer_addr(&self) -> Option<net::SocketAddr> {
self.inner.peer_addr()

View File

@ -1,12 +1,13 @@
//! Test Various helpers for Actix applications to use during testing.
use std::fmt::Write as FmtWrite;
use std::io;
use std::pin::Pin;
use std::str::FromStr;
use std::task::{Context, Poll};
use actix_codec::{AsyncRead, AsyncWrite};
use actix_server_config::IoStream;
use bytes::{Buf, Bytes, BytesMut};
use futures::{Async, Poll};
use http::header::{self, HeaderName, HeaderValue};
use http::{HttpTryFrom, Method, Uri, Version};
use percent_encoding::percent_encode;
@ -244,16 +245,16 @@ impl io::Write for TestBuffer {
}
}
impl AsyncRead for TestBuffer {}
// impl AsyncRead for TestBuffer {}
impl AsyncWrite for TestBuffer {
fn shutdown(&mut self) -> Poll<(), io::Error> {
Ok(Async::Ready(()))
}
fn write_buf<B: Buf>(&mut self, _: &mut B) -> Poll<usize, io::Error> {
Ok(Async::NotReady)
}
}
// impl AsyncWrite for TestBuffer {
// fn shutdown(&mut self) -> Poll<(), io::Error> {
// Ok(Async::Ready(()))
// }
// fn write_buf<B: Buf>(&mut self, _: &mut B) -> Poll<usize, io::Error> {
// Ok(Async::NotReady)
// }
// }
impl IoStream for TestBuffer {
fn set_nodelay(&mut self, _nodelay: bool) -> io::Result<()> {