1
0
mirror of https://github.com/fafhrd91/actix-web synced 2024-11-30 18:44:35 +01:00

migrate awc and test-server to std::future

This commit is contained in:
Nikolay Kim 2019-11-19 09:55:17 +06:00
parent a6a2d2f444
commit 5ab29b2e62
12 changed files with 376 additions and 397 deletions

View File

@ -41,7 +41,7 @@ members = [
# "actix-web-codegen", # "actix-web-codegen",
# "test-server", # "test-server",
] ]
exclude = ["actix-http"] exclude = ["awc", "actix-http", "test-server"]
[features] [features]
default = ["brotli", "flate2-zlib", "client", "fail"] default = ["brotli", "flate2-zlib", "client", "fail"]

View File

@ -253,7 +253,7 @@ where
impl<S, E> From<BodyStream<S, E>> for Body impl<S, E> From<BodyStream<S, E>> for Body
where where
S: Stream<Item = Result<Bytes, E>> + Unpin + 'static, S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
E: Into<Error> + Unpin + 'static, E: Into<Error> + 'static,
{ {
fn from(s: BodyStream<S, E>) -> Body { fn from(s: BodyStream<S, E>) -> Body {
Body::from_message(s) Body::from_message(s)
@ -368,10 +368,17 @@ where
} }
} }
impl<S, E> Unpin for BodyStream<S, E>
where
S: Stream<Item = Result<Bytes, E>> + Unpin,
E: Into<Error>,
{
}
impl<S, E> MessageBody for BodyStream<S, E> impl<S, E> MessageBody for BodyStream<S, E>
where where
S: Stream<Item = Result<Bytes, E>> + Unpin, S: Stream<Item = Result<Bytes, E>> + Unpin,
E: Into<Error> + Unpin, E: Into<Error>,
{ {
fn size(&self) -> BodySize { fn size(&self) -> BodySize {
BodySize::Stream BodySize::Stream

View File

@ -22,7 +22,7 @@ pub(crate) enum ConnectionType<Io> {
} }
pub trait Connection { pub trait Connection {
type Io: AsyncRead + AsyncWrite; type Io: AsyncRead + AsyncWrite + Unpin;
type Future: Future<Output = Result<(ResponseHead, Payload), SendRequestError>>; type Future: Future<Output = Result<(ResponseHead, Payload), SendRequestError>>;
fn protocol(&self) -> Protocol; fn protocol(&self) -> Protocol;

View File

@ -1,6 +1,6 @@
[package] [package]
name = "awc" name = "awc"
version = "0.2.8" version = "0.3.0-alpha.1"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix http client." description = "Actix http client."
readme = "README.md" readme = "README.md"
@ -14,23 +14,23 @@ categories = ["network-programming", "asynchronous",
license = "MIT/Apache-2.0" license = "MIT/Apache-2.0"
exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"] exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"]
edition = "2018" edition = "2018"
workspace = ".." # workspace = ".."
[lib] [lib]
name = "awc" name = "awc"
path = "src/lib.rs" path = "src/lib.rs"
[package.metadata.docs.rs] [package.metadata.docs.rs]
features = ["ssl", "brotli", "flate2-zlib"] features = ["openssl", "brotli", "flate2-zlib"]
[features] [features]
default = ["brotli", "flate2-zlib"] default = ["brotli", "flate2-zlib"]
# openssl # openssl
ssl = ["openssl", "actix-http/ssl"] openssl = ["open-ssl", "actix-http/openssl"]
# rustls # rustls
rust-tls = ["rustls", "actix-http/rust-tls"] rustls = ["rust-tls", "actix-http/rustls"]
# brotli encoding, requires c compiler # brotli encoding, requires c compiler
brotli = ["actix-http/brotli"] brotli = ["actix-http/brotli"]
@ -42,13 +42,14 @@ flate2-zlib = ["actix-http/flate2-zlib"]
flate2-rust = ["actix-http/flate2-rust"] flate2-rust = ["actix-http/flate2-rust"]
[dependencies] [dependencies]
actix-codec = "0.1.2" actix-codec = "0.2.0-alpha.1"
actix-service = "0.4.1" actix-service = "1.0.0-alpha.1"
actix-http = "0.2.11" actix-http = "0.3.0-alpha.1"
base64 = "0.10.1" base64 = "0.10.1"
bytes = "0.4" bytes = "0.4"
derive_more = "0.15.0" derive_more = "0.15.0"
futures = "0.1.25" futures = "0.3.1"
log =" 0.4" log =" 0.4"
mime = "0.3" mime = "0.3"
percent-encoding = "2.1" percent-encoding = "2.1"
@ -56,21 +57,33 @@ rand = "0.7"
serde = "1.0" serde = "1.0"
serde_json = "1.0" serde_json = "1.0"
serde_urlencoded = "0.6.1" serde_urlencoded = "0.6.1"
tokio-timer = "0.2.8" tokio-timer = "0.3.0-alpha.6"
openssl = { version="0.10", optional = true } open-ssl = { version="0.10", package="openssl", optional = true }
rustls = { version = "0.15.2", optional = true } rust-tls = { version = "0.16.0", package="rustls", optional = true }
[dev-dependencies] [dev-dependencies]
actix-rt = "0.2.2" actix-rt = "1.0.0-alpha.1"
actix-web = { version = "1.0.8", features=["ssl"] } #actix-web = { version = "1.0.8", features=["ssl"] }
actix-http = { version = "0.2.11", features=["ssl"] } actix-http = { version = "0.3.0-alpha.1", features=["openssl"] }
actix-http-test = { version = "0.2.0", features=["ssl"] } #actix-http-test = { version = "0.2.0", features=["ssl"] }
actix-utils = "0.4.1" actix-utils = "0.5.0-alpha.1"
actix-server = { version = "0.6.0", features=["ssl", "rust-tls"] } actix-server = { version = "0.8.0-alpha.1", features=["openssl", "rustls"] }
brotli2 = { version="0.3.2" } brotli2 = { version="0.3.2" }
flate2 = { version="1.0.2" } flate2 = { version="1.0.2" }
env_logger = "0.6" env_logger = "0.6"
rand = "0.7" rand = "0.7"
tokio-tcp = "0.1" tokio-tcp = "0.1"
webpki = "0.19" webpki = { version = "0.21" }
rustls = { version = "0.15.2", features = ["dangerous_configuration"] } rus-tls = { version = "0.16.0", package="rustls", features = ["dangerous_configuration"] }
[patch.crates-io]
actix-http = { path = "../actix-http" }
actix-codec = { path = "../../actix-net/actix-codec" }
actix-connect = { path = "../../actix-net/actix-connect" }
actix-rt = { path = "../../actix-net/actix-rt" }
actix-server = { path = "../../actix-net/actix-server" }
actix-server-config = { path = "../../actix-net/actix-server-config" }
actix-service = { path = "../../actix-net/actix-service" }
actix-threadpool = { path = "../../actix-net/actix-threadpool" }
actix-utils = { path = "../../actix-net/actix-utils" }

View File

@ -1,4 +1,6 @@
use std::pin::Pin;
use std::rc::Rc; use std::rc::Rc;
use std::task::{Context, Poll};
use std::{fmt, io, net}; use std::{fmt, io, net};
use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_codec::{AsyncRead, AsyncWrite, Framed};
@ -10,7 +12,7 @@ use actix_http::h1::ClientCodec;
use actix_http::http::HeaderMap; use actix_http::http::HeaderMap;
use actix_http::{RequestHead, RequestHeadType, ResponseHead}; use actix_http::{RequestHead, RequestHeadType, ResponseHead};
use actix_service::Service; use actix_service::Service;
use futures::{Future, Poll}; use futures::future::{FutureExt, LocalBoxFuture};
use crate::response::ClientResponse; use crate::response::ClientResponse;
@ -22,7 +24,7 @@ pub(crate) trait Connect {
head: RequestHead, head: RequestHead,
body: Body, body: Body,
addr: Option<net::SocketAddr>, addr: Option<net::SocketAddr>,
) -> Box<dyn Future<Item = ClientResponse, Error = SendRequestError>>; ) -> LocalBoxFuture<'static, Result<ClientResponse, SendRequestError>>;
fn send_request_extra( fn send_request_extra(
&mut self, &mut self,
@ -30,18 +32,16 @@ pub(crate) trait Connect {
extra_headers: Option<HeaderMap>, extra_headers: Option<HeaderMap>,
body: Body, body: Body,
addr: Option<net::SocketAddr>, addr: Option<net::SocketAddr>,
) -> Box<dyn Future<Item = ClientResponse, Error = SendRequestError>>; ) -> LocalBoxFuture<'static, Result<ClientResponse, SendRequestError>>;
/// Send request, returns Response and Framed /// Send request, returns Response and Framed
fn open_tunnel( fn open_tunnel(
&mut self, &mut self,
head: RequestHead, head: RequestHead,
addr: Option<net::SocketAddr>, addr: Option<net::SocketAddr>,
) -> Box< ) -> LocalBoxFuture<
dyn Future< 'static,
Item = (ResponseHead, Framed<BoxedSocket, ClientCodec>), Result<(ResponseHead, Framed<BoxedSocket, ClientCodec>), SendRequestError>,
Error = SendRequestError,
>,
>; >;
/// Send request and extra headers, returns Response and Framed /// Send request and extra headers, returns Response and Framed
@ -50,11 +50,9 @@ pub(crate) trait Connect {
head: Rc<RequestHead>, head: Rc<RequestHead>,
extra_headers: Option<HeaderMap>, extra_headers: Option<HeaderMap>,
addr: Option<net::SocketAddr>, addr: Option<net::SocketAddr>,
) -> Box< ) -> LocalBoxFuture<
dyn Future< 'static,
Item = (ResponseHead, Framed<BoxedSocket, ClientCodec>), Result<(ResponseHead, Framed<BoxedSocket, ClientCodec>), SendRequestError>,
Error = SendRequestError,
>,
>; >;
} }
@ -72,21 +70,23 @@ where
head: RequestHead, head: RequestHead,
body: Body, body: Body,
addr: Option<net::SocketAddr>, addr: Option<net::SocketAddr>,
) -> Box<dyn Future<Item = ClientResponse, Error = SendRequestError>> { ) -> LocalBoxFuture<'static, Result<ClientResponse, SendRequestError>> {
Box::new(
self.0
// connect to the host // connect to the host
.call(ClientConnect { let fut = self.0.call(ClientConnect {
uri: head.uri.clone(), uri: head.uri.clone(),
addr, addr,
}) });
.from_err()
async move {
let connection = fut.await?;
// send request // send request
.and_then(move |connection| { connection
connection.send_request(RequestHeadType::from(head), body) .send_request(RequestHeadType::from(head), body)
}) .await
.map(|(head, payload)| ClientResponse::new(head, payload)), .map(|(head, payload)| ClientResponse::new(head, payload))
) }
.boxed_local()
} }
fn send_request_extra( fn send_request_extra(
@ -95,51 +95,51 @@ where
extra_headers: Option<HeaderMap>, extra_headers: Option<HeaderMap>,
body: Body, body: Body,
addr: Option<net::SocketAddr>, addr: Option<net::SocketAddr>,
) -> Box<dyn Future<Item = ClientResponse, Error = SendRequestError>> { ) -> LocalBoxFuture<'static, Result<ClientResponse, SendRequestError>> {
Box::new(
self.0
// connect to the host // connect to the host
.call(ClientConnect { let fut = self.0.call(ClientConnect {
uri: head.uri.clone(), uri: head.uri.clone(),
addr, addr,
}) });
.from_err()
async move {
let connection = fut.await?;
// send request // send request
.and_then(move |connection| { let (head, payload) = connection
connection
.send_request(RequestHeadType::Rc(head, extra_headers), body) .send_request(RequestHeadType::Rc(head, extra_headers), body)
}) .await?;
.map(|(head, payload)| ClientResponse::new(head, payload)),
) Ok(ClientResponse::new(head, payload))
}
.boxed_local()
} }
fn open_tunnel( fn open_tunnel(
&mut self, &mut self,
head: RequestHead, head: RequestHead,
addr: Option<net::SocketAddr>, addr: Option<net::SocketAddr>,
) -> Box< ) -> LocalBoxFuture<
dyn Future< 'static,
Item = (ResponseHead, Framed<BoxedSocket, ClientCodec>), Result<(ResponseHead, Framed<BoxedSocket, ClientCodec>), SendRequestError>,
Error = SendRequestError,
>,
> { > {
Box::new(
self.0
// connect to the host // connect to the host
.call(ClientConnect { let fut = self.0.call(ClientConnect {
uri: head.uri.clone(), uri: head.uri.clone(),
addr, addr,
}) });
.from_err()
async move {
let connection = fut.await?;
// send request // send request
.and_then(move |connection| { let (head, framed) =
connection.open_tunnel(RequestHeadType::from(head)) connection.open_tunnel(RequestHeadType::from(head)).await?;
})
.map(|(head, framed)| {
let framed = framed.map_io(|io| BoxedSocket(Box::new(Socket(io)))); let framed = framed.map_io(|io| BoxedSocket(Box::new(Socket(io))));
(head, framed) Ok((head, framed))
}), }
) .boxed_local()
} }
fn open_tunnel_extra( fn open_tunnel_extra(
@ -147,48 +147,47 @@ where
head: Rc<RequestHead>, head: Rc<RequestHead>,
extra_headers: Option<HeaderMap>, extra_headers: Option<HeaderMap>,
addr: Option<net::SocketAddr>, addr: Option<net::SocketAddr>,
) -> Box< ) -> LocalBoxFuture<
dyn Future< 'static,
Item = (ResponseHead, Framed<BoxedSocket, ClientCodec>), Result<(ResponseHead, Framed<BoxedSocket, ClientCodec>), SendRequestError>,
Error = SendRequestError,
>,
> { > {
Box::new(
self.0
// connect to the host // connect to the host
.call(ClientConnect { let fut = self.0.call(ClientConnect {
uri: head.uri.clone(), uri: head.uri.clone(),
addr, addr,
}) });
.from_err()
async move {
let connection = fut.await?;
// send request // send request
.and_then(move |connection| { let (head, framed) = connection
connection.open_tunnel(RequestHeadType::Rc(head, extra_headers)) .open_tunnel(RequestHeadType::Rc(head, extra_headers))
}) .await?;
.map(|(head, framed)| {
let framed = framed.map_io(|io| BoxedSocket(Box::new(Socket(io)))); let framed = framed.map_io(|io| BoxedSocket(Box::new(Socket(io))));
(head, framed) Ok((head, framed))
}), }
) .boxed_local()
} }
} }
trait AsyncSocket { trait AsyncSocket {
fn as_read(&self) -> &dyn AsyncRead; fn as_read(&self) -> &(dyn AsyncRead + Unpin);
fn as_read_mut(&mut self) -> &mut dyn AsyncRead; fn as_read_mut(&mut self) -> &mut (dyn AsyncRead + Unpin);
fn as_write(&mut self) -> &mut dyn AsyncWrite; fn as_write(&mut self) -> &mut (dyn AsyncWrite + Unpin);
} }
struct Socket<T: AsyncRead + AsyncWrite>(T); struct Socket<T: AsyncRead + AsyncWrite + Unpin>(T);
impl<T: AsyncRead + AsyncWrite> AsyncSocket for Socket<T> { impl<T: AsyncRead + AsyncWrite + Unpin> AsyncSocket for Socket<T> {
fn as_read(&self) -> &dyn AsyncRead { fn as_read(&self) -> &(dyn AsyncRead + Unpin) {
&self.0 &self.0
} }
fn as_read_mut(&mut self) -> &mut dyn AsyncRead { fn as_read_mut(&mut self) -> &mut (dyn AsyncRead + Unpin) {
&mut self.0 &mut self.0
} }
fn as_write(&mut self) -> &mut dyn AsyncWrite { fn as_write(&mut self) -> &mut (dyn AsyncWrite + Unpin) {
&mut self.0 &mut self.0
} }
} }
@ -201,30 +200,37 @@ impl fmt::Debug for BoxedSocket {
} }
} }
impl io::Read for BoxedSocket {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.0.as_read_mut().read(buf)
}
}
impl AsyncRead for BoxedSocket { impl AsyncRead for BoxedSocket {
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
self.0.as_read().prepare_uninitialized_buffer(buf) self.0.as_read().prepare_uninitialized_buffer(buf)
} }
}
impl io::Write for BoxedSocket { fn poll_read(
fn write(&mut self, buf: &[u8]) -> io::Result<usize> { self: Pin<&mut Self>,
self.0.as_write().write(buf) cx: &mut Context<'_>,
} buf: &mut [u8],
) -> Poll<io::Result<usize>> {
fn flush(&mut self) -> io::Result<()> { Pin::new(self.get_mut().0.as_read_mut()).poll_read(cx, buf)
self.0.as_write().flush()
} }
} }
impl AsyncWrite for BoxedSocket { impl AsyncWrite for BoxedSocket {
fn shutdown(&mut self) -> Poll<(), io::Error> { fn poll_write(
self.0.as_write().shutdown() self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Pin::new(self.get_mut().0.as_write()).poll_write(cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(self.get_mut().0.as_write()).poll_flush(cx)
}
fn poll_shutdown(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<()>> {
Pin::new(self.get_mut().0.as_write()).poll_shutdown(cx)
} }
} }

View File

@ -82,7 +82,7 @@ impl FrozenClientRequest {
/// Send a streaming body. /// Send a streaming body.
pub fn send_stream<S, E>(&self, stream: S) -> SendClientRequest pub fn send_stream<S, E>(&self, stream: S) -> SendClientRequest
where where
S: Stream<Item = Bytes, Error = E> + 'static, S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
E: Into<Error> + 'static, E: Into<Error> + 'static,
{ {
RequestSender::Rc(self.head.clone(), None).send_stream( RequestSender::Rc(self.head.clone(), None).send_stream(
@ -203,7 +203,7 @@ impl FrozenSendBuilder {
/// Complete request construction and send a streaming body. /// Complete request construction and send a streaming body.
pub fn send_stream<S, E>(self, stream: S) -> SendClientRequest pub fn send_stream<S, E>(self, stream: S) -> SendClientRequest
where where
S: Stream<Item = Bytes, Error = E> + 'static, S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
E: Into<Error> + 'static, E: Into<Error> + 'static,
{ {
if let Some(e) = self.err { if let Some(e) = self.err {

View File

@ -478,7 +478,7 @@ impl ClientRequest {
/// Set an streaming body and generate `ClientRequest`. /// Set an streaming body and generate `ClientRequest`.
pub fn send_stream<S, E>(self, stream: S) -> SendClientRequest pub fn send_stream<S, E>(self, stream: S) -> SendClientRequest
where where
S: Stream<Item = Bytes, Error = E> + 'static, S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
E: Into<Error> + 'static, E: Into<Error> + 'static,
{ {
let slf = match self.prep_for_sending() { let slf = match self.prep_for_sending() {

View File

@ -1,9 +1,11 @@
use std::cell::{Ref, RefMut}; use std::cell::{Ref, RefMut};
use std::fmt; use std::fmt;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use futures::{Async, Future, Poll, Stream}; use futures::{ready, Future, Stream};
use actix_http::cookie::Cookie; use actix_http::cookie::Cookie;
use actix_http::error::{CookieParseError, PayloadError}; use actix_http::error::{CookieParseError, PayloadError};
@ -104,7 +106,7 @@ impl<S> ClientResponse<S> {
impl<S> ClientResponse<S> impl<S> ClientResponse<S>
where where
S: Stream<Item = Bytes, Error = PayloadError>, S: Stream<Item = Result<Bytes, PayloadError>>,
{ {
/// Loads http response's body. /// Loads http response's body.
pub fn body(&mut self) -> MessageBody<S> { pub fn body(&mut self) -> MessageBody<S> {
@ -125,13 +127,12 @@ where
impl<S> Stream for ClientResponse<S> impl<S> Stream for ClientResponse<S>
where where
S: Stream<Item = Bytes, Error = PayloadError>, S: Stream<Item = Result<Bytes, PayloadError>> + Unpin,
{ {
type Item = Bytes; type Item = Result<Bytes, PayloadError>;
type Error = PayloadError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
self.payload.poll() Pin::new(&mut self.get_mut().payload).poll_next(cx)
} }
} }
@ -155,7 +156,7 @@ pub struct MessageBody<S> {
impl<S> MessageBody<S> impl<S> MessageBody<S>
where where
S: Stream<Item = Bytes, Error = PayloadError>, S: Stream<Item = Result<Bytes, PayloadError>>,
{ {
/// Create `MessageBody` for request. /// Create `MessageBody` for request.
pub fn new(res: &mut ClientResponse<S>) -> MessageBody<S> { pub fn new(res: &mut ClientResponse<S>) -> MessageBody<S> {
@ -198,23 +199,24 @@ where
impl<S> Future for MessageBody<S> impl<S> Future for MessageBody<S>
where where
S: Stream<Item = Bytes, Error = PayloadError>, S: Stream<Item = Result<Bytes, PayloadError>> + Unpin,
{ {
type Item = Bytes; type Output = Result<Bytes, PayloadError>;
type Error = PayloadError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
if let Some(err) = self.err.take() { let this = self.get_mut();
return Err(err);
if let Some(err) = this.err.take() {
return Poll::Ready(Err(err));
} }
if let Some(len) = self.length.take() { if let Some(len) = this.length.take() {
if len > self.fut.as_ref().unwrap().limit { if len > this.fut.as_ref().unwrap().limit {
return Err(PayloadError::Overflow); return Poll::Ready(Err(PayloadError::Overflow));
} }
} }
self.fut.as_mut().unwrap().poll() Pin::new(&mut this.fut.as_mut().unwrap()).poll(cx)
} }
} }
@ -233,7 +235,7 @@ pub struct JsonBody<S, U> {
impl<S, U> JsonBody<S, U> impl<S, U> JsonBody<S, U>
where where
S: Stream<Item = Bytes, Error = PayloadError>, S: Stream<Item = Result<Bytes, PayloadError>>,
U: DeserializeOwned, U: DeserializeOwned,
{ {
/// Create `JsonBody` for request. /// Create `JsonBody` for request.
@ -279,27 +281,35 @@ where
} }
} }
impl<T, U> Future for JsonBody<T, U> impl<T, U> Unpin for JsonBody<T, U>
where where
T: Stream<Item = Bytes, Error = PayloadError>, T: Stream<Item = Result<Bytes, PayloadError>> + Unpin,
U: DeserializeOwned, U: DeserializeOwned,
{ {
type Item = U; }
type Error = JsonPayloadError;
fn poll(&mut self) -> Poll<U, JsonPayloadError> { impl<T, U> Future for JsonBody<T, U>
where
T: Stream<Item = Result<Bytes, PayloadError>> + Unpin,
U: DeserializeOwned,
{
type Output = Result<U, JsonPayloadError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
if let Some(err) = self.err.take() { if let Some(err) = self.err.take() {
return Err(err); return Poll::Ready(Err(err));
} }
if let Some(len) = self.length.take() { if let Some(len) = self.length.take() {
if len > self.fut.as_ref().unwrap().limit { if len > self.fut.as_ref().unwrap().limit {
return Err(JsonPayloadError::Payload(PayloadError::Overflow)); return Poll::Ready(Err(JsonPayloadError::Payload(
PayloadError::Overflow,
)));
} }
} }
let body = futures::try_ready!(self.fut.as_mut().unwrap().poll()); let body = ready!(Pin::new(&mut self.get_mut().fut.as_mut().unwrap()).poll(cx))?;
Ok(Async::Ready(serde_json::from_slice::<U>(&body)?)) Poll::Ready(serde_json::from_slice::<U>(&body).map_err(JsonPayloadError::from))
} }
} }
@ -321,24 +331,25 @@ impl<S> ReadBody<S> {
impl<S> Future for ReadBody<S> impl<S> Future for ReadBody<S>
where where
S: Stream<Item = Bytes, Error = PayloadError>, S: Stream<Item = Result<Bytes, PayloadError>> + Unpin,
{ {
type Item = Bytes; type Output = Result<Bytes, PayloadError>;
type Error = PayloadError;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = self.get_mut();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop { loop {
return match self.stream.poll()? { return match Pin::new(&mut this.stream).poll_next(cx)? {
Async::Ready(Some(chunk)) => { Poll::Ready(Some(chunk)) => {
if (self.buf.len() + chunk.len()) > self.limit { if (this.buf.len() + chunk.len()) > this.limit {
Err(PayloadError::Overflow) Poll::Ready(Err(PayloadError::Overflow))
} else { } else {
self.buf.extend_from_slice(&chunk); this.buf.extend_from_slice(&chunk);
continue; continue;
} }
} }
Async::Ready(None) => Ok(Async::Ready(self.buf.take().freeze())), Poll::Ready(None) => Poll::Ready(Ok(this.buf.take().freeze())),
Async::NotReady => Ok(Async::NotReady), Poll::Pending => Poll::Pending,
}; };
} }
} }

View File

@ -1,13 +1,15 @@
use std::net; use std::net;
use std::pin::Pin;
use std::rc::Rc; use std::rc::Rc;
use std::time::{Duration, Instant}; use std::task::{Context, Poll};
use std::time::Duration;
use bytes::Bytes; use bytes::Bytes;
use derive_more::From; use derive_more::From;
use futures::{try_ready, Async, Future, Poll, Stream}; use futures::{future::LocalBoxFuture, ready, Future, Stream};
use serde::Serialize; use serde::Serialize;
use serde_json; use serde_json;
use tokio_timer::Delay; use tokio_timer::{delay_for, Delay};
use actix_http::body::{Body, BodyStream}; use actix_http::body::{Body, BodyStream};
use actix_http::encoding::Decoder; use actix_http::encoding::Decoder;
@ -47,7 +49,7 @@ impl Into<SendRequestError> for PrepForSendingError {
#[must_use = "futures do nothing unless polled"] #[must_use = "futures do nothing unless polled"]
pub enum SendClientRequest { pub enum SendClientRequest {
Fut( Fut(
Box<dyn Future<Item = ClientResponse, Error = SendRequestError>>, LocalBoxFuture<'static, Result<ClientResponse, SendRequestError>>,
Option<Delay>, Option<Delay>,
bool, bool,
), ),
@ -56,41 +58,51 @@ pub enum SendClientRequest {
impl SendClientRequest { impl SendClientRequest {
pub(crate) fn new( pub(crate) fn new(
send: Box<dyn Future<Item = ClientResponse, Error = SendRequestError>>, send: LocalBoxFuture<'static, Result<ClientResponse, SendRequestError>>,
response_decompress: bool, response_decompress: bool,
timeout: Option<Duration>, timeout: Option<Duration>,
) -> SendClientRequest { ) -> SendClientRequest {
let delay = timeout.map(|t| Delay::new(Instant::now() + t)); let delay = timeout.map(|t| delay_for(t));
SendClientRequest::Fut(send, delay, response_decompress) SendClientRequest::Fut(send, delay, response_decompress)
} }
} }
impl Future for SendClientRequest { impl Future for SendClientRequest {
type Item = ClientResponse<Decoder<Payload<PayloadStream>>>; type Output =
type Error = SendRequestError; Result<ClientResponse<Decoder<Payload<PayloadStream>>>, SendRequestError>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
match self { let this = self.get_mut();
match this {
SendClientRequest::Fut(send, delay, response_decompress) => { SendClientRequest::Fut(send, delay, response_decompress) => {
if delay.is_some() { if delay.is_some() {
match delay.poll() { match Pin::new(delay.as_mut().unwrap()).poll(cx) {
Ok(Async::NotReady) => (), Poll::Pending => (),
_ => return Err(SendRequestError::Timeout), _ => return Poll::Ready(Err(SendRequestError::Timeout)),
} }
} }
let res = try_ready!(send.poll()).map_body(|head, payload| { let res = ready!(Pin::new(send).poll(cx)).map(|res| {
res.map_body(|head, payload| {
if *response_decompress { if *response_decompress {
Payload::Stream(Decoder::from_headers(payload, &head.headers)) Payload::Stream(Decoder::from_headers(
payload,
&head.headers,
))
} else { } else {
Payload::Stream(Decoder::new(payload, ContentEncoding::Identity)) Payload::Stream(Decoder::new(
payload,
ContentEncoding::Identity,
))
} }
})
}); });
Ok(Async::Ready(res)) Poll::Ready(res)
} }
SendClientRequest::Err(ref mut e) => match e.take() { SendClientRequest::Err(ref mut e) => match e.take() {
Some(e) => Err(e), Some(e) => Poll::Ready(Err(e)),
None => panic!("Attempting to call completed future"), None => panic!("Attempting to call completed future"),
}, },
} }
@ -223,7 +235,7 @@ impl RequestSender {
stream: S, stream: S,
) -> SendClientRequest ) -> SendClientRequest
where where
S: Stream<Item = Bytes, Error = E> + 'static, S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
E: Into<Error> + 'static, E: Into<Error> + 'static,
{ {
self.send_body( self.send_body(

View File

@ -7,7 +7,6 @@ use std::{fmt, str};
use actix_codec::Framed; use actix_codec::Framed;
use actix_http::cookie::{Cookie, CookieJar}; use actix_http::cookie::{Cookie, CookieJar};
use actix_http::{ws, Payload, RequestHead}; use actix_http::{ws, Payload, RequestHead};
use futures::future::{err, Either, Future};
use percent_encoding::percent_encode; use percent_encoding::percent_encode;
use tokio_timer::Timeout; use tokio_timer::Timeout;
@ -210,27 +209,26 @@ impl WebsocketsRequest {
} }
/// Complete request construction and connect to a websockets server. /// Complete request construction and connect to a websockets server.
pub fn connect( pub async fn connect(
mut self, mut self,
) -> impl Future<Item = (ClientResponse, Framed<BoxedSocket, Codec>), Error = WsClientError> ) -> Result<(ClientResponse, Framed<BoxedSocket, Codec>), WsClientError> {
{
if let Some(e) = self.err.take() { if let Some(e) = self.err.take() {
return Either::A(err(e.into())); return Err(e.into());
} }
// validate uri // validate uri
let uri = &self.head.uri; let uri = &self.head.uri;
if uri.host().is_none() { if uri.host().is_none() {
return Either::A(err(InvalidUrl::MissingHost.into())); return Err(InvalidUrl::MissingHost.into());
} else if uri.scheme_part().is_none() { } else if uri.scheme_part().is_none() {
return Either::A(err(InvalidUrl::MissingScheme.into())); return Err(InvalidUrl::MissingScheme.into());
} else if let Some(scheme) = uri.scheme_part() { } else if let Some(scheme) = uri.scheme_part() {
match scheme.as_str() { match scheme.as_str() {
"http" | "ws" | "https" | "wss" => (), "http" | "ws" | "https" | "wss" => (),
_ => return Either::A(err(InvalidUrl::UnknownScheme.into())), _ => return Err(InvalidUrl::UnknownScheme.into()),
} }
} else { } else {
return Either::A(err(InvalidUrl::UnknownScheme.into())); return Err(InvalidUrl::UnknownScheme.into());
} }
if !self.head.headers.contains_key(header::HOST) { if !self.head.headers.contains_key(header::HOST) {
@ -294,13 +292,23 @@ impl WebsocketsRequest {
.config .config
.connector .connector
.borrow_mut() .borrow_mut()
.open_tunnel(head, self.addr) .open_tunnel(head, self.addr);
.from_err()
.and_then(move |(head, framed)| { // set request timeout
let (head, framed) = if let Some(timeout) = self.config.timeout {
Timeout::new(fut, timeout)
.await
.map_err(|_| SendRequestError::Timeout.into())
.and_then(|res| res)?
} else {
fut.await?
};
// verify response // verify response
if head.status != StatusCode::SWITCHING_PROTOCOLS { if head.status != StatusCode::SWITCHING_PROTOCOLS {
return Err(WsClientError::InvalidResponseStatus(head.status)); return Err(WsClientError::InvalidResponseStatus(head.status));
} }
// Check for "UPGRADE" to websocket header // Check for "UPGRADE" to websocket header
let has_hdr = if let Some(hdr) = head.headers.get(&header::UPGRADE) { let has_hdr = if let Some(hdr) = head.headers.get(&header::UPGRADE) {
if let Ok(s) = hdr.to_str() { if let Ok(s) = hdr.to_str() {
@ -315,20 +323,17 @@ impl WebsocketsRequest {
log::trace!("Invalid upgrade header"); log::trace!("Invalid upgrade header");
return Err(WsClientError::InvalidUpgradeHeader); return Err(WsClientError::InvalidUpgradeHeader);
} }
// Check for "CONNECTION" header // Check for "CONNECTION" header
if let Some(conn) = head.headers.get(&header::CONNECTION) { if let Some(conn) = head.headers.get(&header::CONNECTION) {
if let Ok(s) = conn.to_str() { if let Ok(s) = conn.to_str() {
if !s.to_ascii_lowercase().contains("upgrade") { if !s.to_ascii_lowercase().contains("upgrade") {
log::trace!("Invalid connection header: {}", s); log::trace!("Invalid connection header: {}", s);
return Err(WsClientError::InvalidConnectionHeader( return Err(WsClientError::InvalidConnectionHeader(conn.clone()));
conn.clone(),
));
} }
} else { } else {
log::trace!("Invalid connection header: {:?}", conn); log::trace!("Invalid connection header: {:?}", conn);
return Err(WsClientError::InvalidConnectionHeader( return Err(WsClientError::InvalidConnectionHeader(conn.clone()));
conn.clone(),
));
} }
} else { } else {
log::trace!("Missing connection header"); log::trace!("Missing connection header");
@ -364,20 +369,6 @@ impl WebsocketsRequest {
} }
}), }),
)) ))
});
// set request timeout
if let Some(timeout) = self.config.timeout {
Either::B(Either::A(Timeout::new(fut, timeout).map_err(|e| {
if let Some(e) = e.into_inner() {
e
} else {
SendRequestError::Timeout.into()
}
})))
} else {
Either::B(Either::B(fut))
}
} }
} }

View File

@ -14,7 +14,7 @@ categories = ["network-programming", "asynchronous",
license = "MIT/Apache-2.0" license = "MIT/Apache-2.0"
exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"] exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"]
edition = "2018" edition = "2018"
workspace = ".." # workspace = ".."
[package.metadata.docs.rs] [package.metadata.docs.rs]
features = [] features = []
@ -27,20 +27,22 @@ path = "src/lib.rs"
default = [] default = []
# openssl # openssl
ssl = ["openssl", "actix-server/ssl", "awc/ssl"] openssl = ["open-ssl", "actix-server/openssl", "awc/openssl"]
[dependencies] [dependencies]
actix-codec = "0.1.2" actix-service = "1.0.0-alpha.1"
actix-rt = "0.2.2" actix-codec = "0.2.0-alpha.1"
actix-service = "0.4.1" actix-connect = "1.0.0-alpha.1"
actix-server = "0.6.0" actix-utils = "0.5.0-alpha.1"
actix-utils = "0.4.1" actix-rt = "1.0.0-alpha.1"
awc = "0.2.6" actix-server = "0.8.0-alpha.1"
actix-connect = "0.2.2" actix-server-config = "0.3.0-alpha.1"
actix-testing = "0.3.0-alpha.1"
awc = "0.3.0-alpha.1"
base64 = "0.10" base64 = "0.10"
bytes = "0.4" bytes = "0.4"
futures = "0.1" futures = "0.3.1"
http = "0.1.8" http = "0.1.8"
log = "0.4" log = "0.4"
env_logger = "0.6" env_logger = "0.6"
@ -51,10 +53,25 @@ sha1 = "0.6"
slab = "0.4" slab = "0.4"
serde_urlencoded = "0.6.1" serde_urlencoded = "0.6.1"
time = "0.1" time = "0.1"
tokio-tcp = "0.1" tokio-net = "0.2.0-alpha.6"
tokio-timer = "0.2" tokio-timer = "0.3.0-alpha.6"
openssl = { version="0.10", optional = true }
open-ssl = { version="0.10", package="openssl", optional = true }
[dev-dependencies] [dev-dependencies]
actix-web = "1.0.7" #actix-web = "1.0.7"
actix-http = "0.2.9" actix-http = "0.3.0-alpha.1"
[patch.crates-io]
actix-http = { path = "../actix-http" }
awc = { path = "../awc" }
actix-codec = { path = "../../actix-net/actix-codec" }
actix-connect = { path = "../../actix-net/actix-connect" }
actix-rt = { path = "../../actix-net/actix-rt" }
actix-server = { path = "../../actix-net/actix-server" }
actix-server-config = { path = "../../actix-net/actix-server-config" }
actix-service = { path = "../../actix-net/actix-service" }
actix-testing = { path = "../../actix-net/actix-testing" }
actix-threadpool = { path = "../../actix-net/actix-threadpool" }
actix-utils = { path = "../../actix-net/actix-utils" }

View File

@ -1,73 +1,18 @@
//! Various helpers for Actix applications to use during testing. //! Various helpers for Actix applications to use during testing.
use std::cell::RefCell;
use std::sync::mpsc; use std::sync::mpsc;
use std::{net, thread, time}; use std::{net, thread, time};
use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_codec::{AsyncRead, AsyncWrite, Framed};
use actix_rt::{Runtime, System}; use actix_rt::{System};
use actix_server::{Server, StreamServiceFactory}; use actix_server::{Server, ServiceFactory};
use awc::{error::PayloadError, ws, Client, ClientRequest, ClientResponse, Connector}; use awc::{error::PayloadError, ws, Client, ClientRequest, ClientResponse, Connector};
use bytes::Bytes; use bytes::Bytes;
use futures::future::lazy; use futures::{Stream, future::lazy};
use futures::{Future, IntoFuture, Stream};
use http::Method; use http::Method;
use net2::TcpBuilder; use net2::TcpBuilder;
use tokio_tcp::TcpStream; use tokio_net::tcp::TcpStream;
thread_local! { pub use actix_testing::*;
static RT: RefCell<Inner> = {
RefCell::new(Inner(Some(Runtime::new().unwrap())))
};
}
struct Inner(Option<Runtime>);
impl Inner {
fn get_mut(&mut self) -> &mut Runtime {
self.0.as_mut().unwrap()
}
}
impl Drop for Inner {
fn drop(&mut self) {
std::mem::forget(self.0.take().unwrap())
}
}
/// Runs the provided future, blocking the current thread until the future
/// completes.
///
/// This function can be used to synchronously block the current thread
/// until the provided `future` has resolved either successfully or with an
/// error. The result of the future is then returned from this function
/// call.
///
/// Note that this function is intended to be used only for testing purpose.
/// This function panics on nested call.
pub fn block_on<F>(f: F) -> Result<F::Item, F::Error>
where
F: IntoFuture,
{
RT.with(move |rt| rt.borrow_mut().get_mut().block_on(f.into_future()))
}
/// Runs the provided function, blocking the current thread until the resul
/// future completes.
///
/// This function can be used to synchronously block the current thread
/// until the provided `future` has resolved either successfully or with an
/// error. The result of the future is then returned from this function
/// call.
///
/// Note that this function is intended to be used only for testing purpose.
/// This function panics on nested call.
pub fn block_fn<F, R>(f: F) -> Result<R::Item, R::Error>
where
F: FnOnce() -> R,
R: IntoFuture,
{
RT.with(move |rt| rt.borrow_mut().get_mut().block_on(lazy(f)))
}
/// The `TestServer` type. /// The `TestServer` type.
/// ///
@ -110,7 +55,7 @@ pub struct TestServerRuntime {
impl TestServer { impl TestServer {
#[allow(clippy::new_ret_no_self)] #[allow(clippy::new_ret_no_self)]
/// Start new test server with application factory /// Start new test server with application factory
pub fn new<F: StreamServiceFactory<TcpStream>>(factory: F) -> TestServerRuntime { pub fn new<F: ServiceFactory<TcpStream>>(factory: F) -> TestServerRuntime {
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
// run server in separate thread // run server in separate thread
@ -131,7 +76,7 @@ impl TestServer {
let (system, addr) = rx.recv().unwrap(); let (system, addr) = rx.recv().unwrap();
let client = block_on(lazy(move || { let client = block_on(lazy(move |_| {
let connector = { let connector = {
#[cfg(feature = "ssl")] #[cfg(feature = "ssl")]
{ {
@ -161,9 +106,9 @@ impl TestServer {
})) }))
.unwrap(); .unwrap();
block_on(lazy( block_on(lazy(|_| {
|| Ok::<_, ()>(actix_connect::start_default_resolver()), Ok::<_, ()>(actix_connect::start_default_resolver())
)) }))
.unwrap(); .unwrap();
TestServerRuntime { TestServerRuntime {
@ -185,31 +130,6 @@ impl TestServer {
} }
impl TestServerRuntime { impl TestServerRuntime {
/// Execute future on current core
pub fn block_on<F, I, E>(&mut self, fut: F) -> Result<I, E>
where
F: Future<Item = I, Error = E>,
{
block_on(fut)
}
/// Execute future on current core
pub fn block_on_fn<F, R>(&mut self, f: F) -> Result<R::Item, R::Error>
where
F: FnOnce() -> R,
R: Future,
{
block_on(lazy(f))
}
/// Execute function on current core
pub fn execute<F, R>(&mut self, fut: F) -> R
where
F: FnOnce() -> R,
{
block_on(lazy(|| Ok::<_, ()>(fut()))).unwrap()
}
/// Construct test server url /// Construct test server url
pub fn addr(&self) -> net::SocketAddr { pub fn addr(&self) -> net::SocketAddr {
self.addr self.addr
@ -313,9 +233,9 @@ impl TestServerRuntime {
mut response: ClientResponse<S>, mut response: ClientResponse<S>,
) -> Result<Bytes, PayloadError> ) -> Result<Bytes, PayloadError>
where where
S: Stream<Item = Bytes, Error = PayloadError> + 'static, S: Stream<Item = Result<Bytes, PayloadError>> + Unpin + 'static,
{ {
self.block_on(response.body().limit(10_485_760)) block_on(response.body().limit(10_485_760))
} }
/// Connect to websocket server at a given path /// Connect to websocket server at a given path
@ -326,7 +246,9 @@ impl TestServerRuntime {
{ {
let url = self.url(path); let url = self.url(path);
let connect = self.client.ws(url).connect(); let connect = self.client.ws(url).connect();
block_on(lazy(move || connect.map(|(_, framed)| framed))) block_on(async move {
connect.await.map(|(_, framed)| framed)
})
} }
/// Connect to a websocket server /// Connect to a websocket server