1
0
mirror of https://github.com/fafhrd91/actix-web synced 2024-11-27 17:52:56 +01:00

move high level client code to awc crate

This commit is contained in:
Nikolay Kim 2019-03-26 11:43:22 -07:00
parent 9037473e0f
commit 2c7da28ef9
17 changed files with 211 additions and 1126 deletions

View File

@ -6,11 +6,11 @@ use futures::Future;
use h2::client::SendRequest; use h2::client::SendRequest;
use crate::body::MessageBody; use crate::body::MessageBody;
use crate::message::RequestHead; use crate::message::{RequestHead, ResponseHead};
use crate::payload::Payload;
use super::error::SendRequestError; use super::error::SendRequestError;
use super::pool::Acquired; use super::pool::Acquired;
use super::response::ClientResponse;
use super::{h1proto, h2proto}; use super::{h1proto, h2proto};
pub(crate) enum ConnectionType<Io> { pub(crate) enum ConnectionType<Io> {
@ -19,7 +19,7 @@ pub(crate) enum ConnectionType<Io> {
} }
pub trait Connection { pub trait Connection {
type Future: Future<Item = ClientResponse, Error = SendRequestError>; type Future: Future<Item = (ResponseHead, Payload), Error = SendRequestError>;
/// Send request and body /// Send request and body
fn send_request<B: MessageBody + 'static>( fn send_request<B: MessageBody + 'static>(
@ -80,7 +80,7 @@ impl<T> Connection for IoConnection<T>
where where
T: AsyncRead + AsyncWrite + 'static, T: AsyncRead + AsyncWrite + 'static,
{ {
type Future = Box<Future<Item = ClientResponse, Error = SendRequestError>>; type Future = Box<Future<Item = (ResponseHead, Payload), Error = SendRequestError>>;
fn send_request<B: MessageBody + 'static>( fn send_request<B: MessageBody + 'static>(
mut self, mut self,
@ -117,7 +117,7 @@ where
A: AsyncRead + AsyncWrite + 'static, A: AsyncRead + AsyncWrite + 'static,
B: AsyncRead + AsyncWrite + 'static, B: AsyncRead + AsyncWrite + 'static,
{ {
type Future = Box<Future<Item = ClientResponse, Error = SendRequestError>>; type Future = Box<Future<Item = (ResponseHead, Payload), Error = SendRequestError>>;
fn send_request<RB: MessageBody + 'static>( fn send_request<RB: MessageBody + 'static>(
self, self,

View File

@ -2,17 +2,18 @@ use std::{io, time};
use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_codec::{AsyncRead, AsyncWrite, Framed};
use bytes::Bytes; use bytes::Bytes;
use futures::future::{err, ok, Either}; use futures::future::{ok, Either};
use futures::{Async, Future, Poll, Sink, Stream}; use futures::{Async, Future, Poll, Sink, Stream};
use crate::error::PayloadError;
use crate::h1;
use crate::message::{RequestHead, ResponseHead};
use crate::payload::{Payload, PayloadStream};
use super::connection::{ConnectionLifetime, ConnectionType, IoConnection}; use super::connection::{ConnectionLifetime, ConnectionType, IoConnection};
use super::error::{ConnectError, SendRequestError}; use super::error::{ConnectError, SendRequestError};
use super::pool::Acquired; use super::pool::Acquired;
use super::response::ClientResponse;
use crate::body::{BodyLength, MessageBody}; use crate::body::{BodyLength, MessageBody};
use crate::error::PayloadError;
use crate::h1;
use crate::message::RequestHead;
pub(crate) fn send_request<T, B>( pub(crate) fn send_request<T, B>(
io: T, io: T,
@ -20,7 +21,7 @@ pub(crate) fn send_request<T, B>(
body: B, body: B,
created: time::Instant, created: time::Instant,
pool: Option<Acquired<T>>, pool: Option<Acquired<T>>,
) -> impl Future<Item = ClientResponse, Error = SendRequestError> ) -> impl Future<Item = (ResponseHead, Payload), Error = SendRequestError>
where where
T: AsyncRead + AsyncWrite + 'static, T: AsyncRead + AsyncWrite + 'static,
B: MessageBody, B: MessageBody,
@ -50,19 +51,20 @@ where
.into_future() .into_future()
.map_err(|(e, _)| SendRequestError::from(e)) .map_err(|(e, _)| SendRequestError::from(e))
.and_then(|(item, framed)| { .and_then(|(item, framed)| {
if let Some(mut res) = item { if let Some(res) = item {
match framed.get_codec().message_type() { match framed.get_codec().message_type() {
h1::MessageType::None => { h1::MessageType::None => {
let force_close = !framed.get_codec().keepalive(); let force_close = !framed.get_codec().keepalive();
release_connection(framed, force_close) release_connection(framed, force_close);
Ok((res, Payload::None))
} }
_ => { _ => {
res.set_payload(Payload::stream(framed).into()); let pl: PayloadStream = Box::new(PlStream::new(framed));
Ok((res, pl.into()))
} }
} }
ok(res)
} else { } else {
err(ConnectError::Disconnected.into()) Err(ConnectError::Disconnected.into())
} }
}) })
}) })
@ -199,21 +201,19 @@ where
} }
} }
pub(crate) struct Payload<Io> { pub(crate) struct PlStream<Io> {
framed: Option<Framed<Io, h1::ClientPayloadCodec>>, framed: Option<Framed<Io, h1::ClientPayloadCodec>>,
} }
impl<Io: ConnectionLifetime> Payload<Io> { impl<Io: ConnectionLifetime> PlStream<Io> {
pub fn stream( fn new(framed: Framed<Io, h1::ClientCodec>) -> Self {
framed: Framed<Io, h1::ClientCodec>, PlStream {
) -> Box<Stream<Item = Bytes, Error = PayloadError>> {
Box::new(Payload {
framed: Some(framed.map_codec(|codec| codec.into_payload_codec())), framed: Some(framed.map_codec(|codec| codec.into_payload_codec())),
}) }
} }
} }
impl<Io: ConnectionLifetime> Stream for Payload<Io> { impl<Io: ConnectionLifetime> Stream for PlStream<Io> {
type Item = Bytes; type Item = Bytes;
type Error = PayloadError; type Error = PayloadError;

View File

@ -9,13 +9,12 @@ use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, TRANSFER_ENCODING};
use http::{request::Request, HttpTryFrom, Method, Version}; use http::{request::Request, HttpTryFrom, Method, Version};
use crate::body::{BodyLength, MessageBody}; use crate::body::{BodyLength, MessageBody};
use crate::message::{Message, RequestHead, ResponseHead}; use crate::message::{RequestHead, ResponseHead};
use crate::payload::Payload; use crate::payload::Payload;
use super::connection::{ConnectionType, IoConnection}; use super::connection::{ConnectionType, IoConnection};
use super::error::SendRequestError; use super::error::SendRequestError;
use super::pool::Acquired; use super::pool::Acquired;
use super::response::ClientResponse;
pub(crate) fn send_request<T, B>( pub(crate) fn send_request<T, B>(
io: SendRequest<Bytes>, io: SendRequest<Bytes>,
@ -23,7 +22,7 @@ pub(crate) fn send_request<T, B>(
body: B, body: B,
created: time::Instant, created: time::Instant,
pool: Option<Acquired<T>>, pool: Option<Acquired<T>>,
) -> impl Future<Item = ClientResponse, Error = SendRequestError> ) -> impl Future<Item = (ResponseHead, Payload), Error = SendRequestError>
where where
T: AsyncRead + AsyncWrite + 'static, T: AsyncRead + AsyncWrite + 'static,
B: MessageBody, B: MessageBody,
@ -105,12 +104,12 @@ where
let (parts, body) = resp.into_parts(); let (parts, body) = resp.into_parts();
let payload = if head_req { Payload::None } else { body.into() }; let payload = if head_req { Payload::None } else { body.into() };
let mut head: Message<ResponseHead> = Message::new(); let mut head = ResponseHead::default();
head.version = parts.version; head.version = parts.version;
head.status = parts.status; head.status = parts.status;
head.headers = parts.headers; head.headers = parts.headers;
Ok(ClientResponse { head, payload }) Ok((head, payload))
}) })
.from_err() .from_err()
} }

View File

@ -5,11 +5,7 @@ mod error;
mod h1proto; mod h1proto;
mod h2proto; mod h2proto;
mod pool; mod pool;
mod request;
mod response;
pub use self::connection::Connection; pub use self::connection::Connection;
pub use self::connector::Connector; pub use self::connector::Connector;
pub use self::error::{ConnectError, InvalidUrl, SendRequestError}; pub use self::error::{ConnectError, InvalidUrl, SendRequestError};
pub use self::request::{ClientRequest, ClientRequestBuilder};
pub use self::response::ClientResponse;

View File

@ -1,699 +0,0 @@
use std::fmt;
use std::io::Write;
use actix_service::Service;
use bytes::{BufMut, Bytes, BytesMut};
#[cfg(feature = "cookies")]
use cookie::{Cookie, CookieJar};
use futures::future::{err, Either};
use futures::{Future, Stream};
use serde::Serialize;
use serde_json;
use crate::body::{BodyStream, MessageBody};
use crate::error::Error;
use crate::header::{self, Header, IntoHeaderValue};
use crate::http::{
uri, Error as HttpError, HeaderMap, HeaderName, HeaderValue, HttpTryFrom, Method,
Uri, Version,
};
use crate::message::{ConnectionType, Head, RequestHead};
use super::connection::Connection;
use super::error::{ConnectError, InvalidUrl, SendRequestError};
use super::response::ClientResponse;
/// An HTTP Client Request
///
/// ```rust
/// use futures::future::{Future, lazy};
/// use actix_rt::System;
/// use actix_http::client;
///
/// fn main() {
/// System::new("test").block_on(lazy(|| {
/// let mut connector = client::Connector::new().service();
///
/// client::ClientRequest::get("http://www.rust-lang.org") // <- Create request builder
/// .header("User-Agent", "Actix-web")
/// .finish().unwrap()
/// .send(&mut connector) // <- Send http request
/// .map_err(|_| ())
/// .and_then(|response| { // <- server http response
/// println!("Response: {:?}", response);
/// Ok(())
/// })
/// }));
/// }
/// ```
pub struct ClientRequest<B: MessageBody = ()> {
head: RequestHead,
body: B,
}
impl ClientRequest<()> {
/// Create client request builder
pub fn build() -> ClientRequestBuilder {
ClientRequestBuilder {
head: Some(RequestHead::default()),
err: None,
#[cfg(feature = "cookies")]
cookies: None,
default_headers: true,
}
}
/// Create request builder for `GET` request
pub fn get<U>(uri: U) -> ClientRequestBuilder
where
Uri: HttpTryFrom<U>,
{
let mut builder = ClientRequest::build();
builder.method(Method::GET).uri(uri);
builder
}
/// Create request builder for `HEAD` request
pub fn head<U>(uri: U) -> ClientRequestBuilder
where
Uri: HttpTryFrom<U>,
{
let mut builder = ClientRequest::build();
builder.method(Method::HEAD).uri(uri);
builder
}
/// Create request builder for `POST` request
pub fn post<U>(uri: U) -> ClientRequestBuilder
where
Uri: HttpTryFrom<U>,
{
let mut builder = ClientRequest::build();
builder.method(Method::POST).uri(uri);
builder
}
/// Create request builder for `PUT` request
pub fn put<U>(uri: U) -> ClientRequestBuilder
where
Uri: HttpTryFrom<U>,
{
let mut builder = ClientRequest::build();
builder.method(Method::PUT).uri(uri);
builder
}
/// Create request builder for `DELETE` request
pub fn delete<U>(uri: U) -> ClientRequestBuilder
where
Uri: HttpTryFrom<U>,
{
let mut builder = ClientRequest::build();
builder.method(Method::DELETE).uri(uri);
builder
}
}
impl<B> ClientRequest<B>
where
B: MessageBody,
{
/// Create new client request
pub fn new(head: RequestHead, body: B) -> Self {
ClientRequest { head, body }
}
/// Get the request URI
#[inline]
pub fn uri(&self) -> &Uri {
&self.head.uri
}
/// Set client request URI
#[inline]
pub fn set_uri(&mut self, uri: Uri) {
self.head.uri = uri
}
/// Get the request method
#[inline]
pub fn method(&self) -> &Method {
&self.head.method
}
/// Set HTTP `Method` for the request
#[inline]
pub fn set_method(&mut self, method: Method) {
self.head.method = method
}
/// Get HTTP version for the request
#[inline]
pub fn version(&self) -> Version {
self.head.version
}
/// Set http `Version` for the request
#[inline]
pub fn set_version(&mut self, version: Version) {
self.head.version = version
}
/// Get the headers from the request
#[inline]
pub fn headers(&self) -> &HeaderMap {
&self.head.headers
}
/// Get a mutable reference to the headers
#[inline]
pub fn headers_mut(&mut self) -> &mut HeaderMap {
&mut self.head.headers
}
/// Deconstruct ClientRequest to a RequestHead and body tuple
pub fn into_parts(self) -> (RequestHead, B) {
(self.head, self.body)
}
// Send request
///
/// This method returns a future that resolves to a ClientResponse
pub fn send<T>(
self,
connector: &mut T,
) -> impl Future<Item = ClientResponse, Error = SendRequestError>
where
B: 'static,
T: Service<Request = Uri, Error = ConnectError>,
T::Response: Connection,
{
let Self { head, body } = self;
let uri = head.uri.clone();
// validate uri
if uri.host().is_none() {
Either::A(err(InvalidUrl::MissingHost.into()))
} else if uri.scheme_part().is_none() {
Either::A(err(InvalidUrl::MissingScheme.into()))
} else if let Some(scheme) = uri.scheme_part() {
match scheme.as_str() {
"http" | "ws" | "https" | "wss" => Either::B(
connector
// connect to the host
.call(uri)
.from_err()
// send request
.and_then(move |connection| connection.send_request(head, body)),
),
_ => Either::A(err(InvalidUrl::UnknownScheme.into())),
}
} else {
Either::A(err(InvalidUrl::UnknownScheme.into()))
}
}
}
impl<B> fmt::Debug for ClientRequest<B>
where
B: MessageBody,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
writeln!(
f,
"\nClientRequest {:?} {}:{}",
self.head.version, self.head.method, self.head.uri
)?;
writeln!(f, " headers:")?;
for (key, val) in self.head.headers.iter() {
writeln!(f, " {:?}: {:?}", key, val)?;
}
Ok(())
}
}
/// An HTTP Client request builder
///
/// This type can be used to construct an instance of `ClientRequest` through a
/// builder-like pattern.
pub struct ClientRequestBuilder {
head: Option<RequestHead>,
err: Option<HttpError>,
#[cfg(feature = "cookies")]
cookies: Option<CookieJar>,
default_headers: bool,
}
impl ClientRequestBuilder {
/// Set HTTP URI of request.
#[inline]
pub fn uri<U>(&mut self, uri: U) -> &mut Self
where
Uri: HttpTryFrom<U>,
{
match Uri::try_from(uri) {
Ok(uri) => {
if let Some(parts) = parts(&mut self.head, &self.err) {
parts.uri = uri;
}
}
Err(e) => self.err = Some(e.into()),
}
self
}
/// Set HTTP method of this request.
#[inline]
pub fn method(&mut self, method: Method) -> &mut Self {
if let Some(parts) = parts(&mut self.head, &self.err) {
parts.method = method;
}
self
}
/// Set HTTP method of this request.
#[inline]
pub fn get_method(&mut self) -> &Method {
let parts = self.head.as_ref().expect("cannot reuse request builder");
&parts.method
}
/// Set HTTP version of this request.
///
/// By default requests's HTTP version depends on network stream
#[inline]
pub fn version(&mut self, version: Version) -> &mut Self {
if let Some(parts) = parts(&mut self.head, &self.err) {
parts.version = version;
}
self
}
/// Set a header.
///
/// ```rust
/// # extern crate mime;
/// # extern crate actix_http;
/// #
/// use actix_http::{client, http};
///
/// fn main() {
/// let req = client::ClientRequest::build()
/// .set(http::header::Date::now())
/// .set(http::header::ContentType(mime::TEXT_HTML))
/// .finish()
/// .unwrap();
/// }
/// ```
pub fn set<H: Header>(&mut self, hdr: H) -> &mut Self {
if let Some(parts) = parts(&mut self.head, &self.err) {
match hdr.try_into() {
Ok(value) => {
parts.headers.insert(H::name(), value);
}
Err(e) => self.err = Some(e.into()),
}
}
self
}
/// Append a header.
///
/// Header gets appended to existing header.
/// To override header use `set_header()` method.
///
/// ```rust
/// # extern crate actix_http;
/// #
/// use actix_http::{client, http};
///
/// fn main() {
/// let req = client::ClientRequest::build()
/// .header("X-TEST", "value")
/// .header(http::header::CONTENT_TYPE, "application/json")
/// .finish()
/// .unwrap();
/// }
/// ```
pub fn header<K, V>(&mut self, key: K, value: V) -> &mut Self
where
HeaderName: HttpTryFrom<K>,
V: IntoHeaderValue,
{
if let Some(parts) = parts(&mut self.head, &self.err) {
match HeaderName::try_from(key) {
Ok(key) => match value.try_into() {
Ok(value) => {
parts.headers.append(key, value);
}
Err(e) => self.err = Some(e.into()),
},
Err(e) => self.err = Some(e.into()),
};
}
self
}
/// Set a header.
pub fn set_header<K, V>(&mut self, key: K, value: V) -> &mut Self
where
HeaderName: HttpTryFrom<K>,
V: IntoHeaderValue,
{
if let Some(parts) = parts(&mut self.head, &self.err) {
match HeaderName::try_from(key) {
Ok(key) => match value.try_into() {
Ok(value) => {
parts.headers.insert(key, value);
}
Err(e) => self.err = Some(e.into()),
},
Err(e) => self.err = Some(e.into()),
};
}
self
}
/// Set a header only if it is not yet set.
pub fn set_header_if_none<K, V>(&mut self, key: K, value: V) -> &mut Self
where
HeaderName: HttpTryFrom<K>,
V: IntoHeaderValue,
{
if let Some(parts) = parts(&mut self.head, &self.err) {
match HeaderName::try_from(key) {
Ok(key) => {
if !parts.headers.contains_key(&key) {
match value.try_into() {
Ok(value) => {
parts.headers.insert(key, value);
}
Err(e) => self.err = Some(e.into()),
}
}
}
Err(e) => self.err = Some(e.into()),
};
}
self
}
/// Enable connection upgrade
#[inline]
pub fn upgrade<V>(&mut self, value: V) -> &mut Self
where
V: IntoHeaderValue,
{
{
if let Some(parts) = parts(&mut self.head, &self.err) {
parts.set_connection_type(ConnectionType::Upgrade);
}
}
self.set_header(header::UPGRADE, value)
}
/// Close connection
#[inline]
pub fn close(&mut self) -> &mut Self {
if let Some(parts) = parts(&mut self.head, &self.err) {
parts.set_connection_type(ConnectionType::Close);
}
self
}
/// Set request's content type
#[inline]
pub fn content_type<V>(&mut self, value: V) -> &mut Self
where
HeaderValue: HttpTryFrom<V>,
{
if let Some(parts) = parts(&mut self.head, &self.err) {
match HeaderValue::try_from(value) {
Ok(value) => {
parts.headers.insert(header::CONTENT_TYPE, value);
}
Err(e) => self.err = Some(e.into()),
};
}
self
}
/// Set content length
#[inline]
pub fn content_length(&mut self, len: u64) -> &mut Self {
let mut wrt = BytesMut::new().writer();
let _ = write!(wrt, "{}", len);
self.header(header::CONTENT_LENGTH, wrt.get_mut().take().freeze())
}
#[cfg(feature = "cookies")]
/// Set a cookie
///
/// ```rust
/// # extern crate actix_http;
/// use actix_http::{client, http};
///
/// fn main() {
/// let req = client::ClientRequest::build()
/// .cookie(
/// http::Cookie::build("name", "value")
/// .domain("www.rust-lang.org")
/// .path("/")
/// .secure(true)
/// .http_only(true)
/// .finish(),
/// )
/// .finish()
/// .unwrap();
/// }
/// ```
pub fn cookie<'c>(&mut self, cookie: Cookie<'c>) -> &mut Self {
if self.cookies.is_none() {
let mut jar = CookieJar::new();
jar.add(cookie.into_owned());
self.cookies = Some(jar)
} else {
self.cookies.as_mut().unwrap().add(cookie.into_owned());
}
self
}
/// Do not add default request headers.
/// By default `Accept-Encoding` and `User-Agent` headers are set.
pub fn no_default_headers(&mut self) -> &mut Self {
self.default_headers = false;
self
}
/// This method calls provided closure with builder reference if
/// value is `true`.
pub fn if_true<F>(&mut self, value: bool, f: F) -> &mut Self
where
F: FnOnce(&mut ClientRequestBuilder),
{
if value {
f(self);
}
self
}
/// This method calls provided closure with builder reference if
/// value is `Some`.
pub fn if_some<T, F>(&mut self, value: Option<T>, f: F) -> &mut Self
where
F: FnOnce(T, &mut ClientRequestBuilder),
{
if let Some(val) = value {
f(val, self);
}
self
}
/// Set a body and generate `ClientRequest`.
///
/// `ClientRequestBuilder` can not be used after this call.
pub fn body<B: MessageBody>(
&mut self,
body: B,
) -> Result<ClientRequest<B>, HttpError> {
if let Some(e) = self.err.take() {
return Err(e);
}
if self.default_headers {
// enable br only for https
let https = if let Some(parts) = parts(&mut self.head, &self.err) {
parts
.uri
.scheme_part()
.map(|s| s == &uri::Scheme::HTTPS)
.unwrap_or(true)
} else {
true
};
if https {
self.set_header_if_none(header::ACCEPT_ENCODING, "br, gzip, deflate");
} else {
self.set_header_if_none(header::ACCEPT_ENCODING, "gzip, deflate");
}
// set request host header
if let Some(parts) = parts(&mut self.head, &self.err) {
if let Some(host) = parts.uri.host() {
if !parts.headers.contains_key(header::HOST) {
let mut wrt = BytesMut::with_capacity(host.len() + 5).writer();
let _ = match parts.uri.port_u16() {
None | Some(80) | Some(443) => write!(wrt, "{}", host),
Some(port) => write!(wrt, "{}:{}", host, port),
};
match wrt.get_mut().take().freeze().try_into() {
Ok(value) => {
parts.headers.insert(header::HOST, value);
}
Err(e) => self.err = Some(e.into()),
}
}
}
}
// user agent
self.set_header_if_none(
header::USER_AGENT,
concat!("actix-http/", env!("CARGO_PKG_VERSION")),
);
}
#[allow(unused_mut)]
let mut head = self.head.take().expect("cannot reuse request builder");
#[cfg(feature = "cookies")]
{
use percent_encoding::{percent_encode, USERINFO_ENCODE_SET};
use std::fmt::Write;
// set cookies
if let Some(ref mut jar) = self.cookies {
let mut cookie = String::new();
for c in jar.delta() {
let name = percent_encode(c.name().as_bytes(), USERINFO_ENCODE_SET);
let value =
percent_encode(c.value().as_bytes(), USERINFO_ENCODE_SET);
let _ = write!(&mut cookie, "; {}={}", name, value);
}
head.headers.insert(
header::COOKIE,
HeaderValue::from_str(&cookie.as_str()[2..]).unwrap(),
);
}
}
Ok(ClientRequest { head, body })
}
/// Set a JSON body and generate `ClientRequest`
///
/// `ClientRequestBuilder` can not be used after this call.
pub fn json<T: Serialize>(
&mut self,
value: T,
) -> Result<ClientRequest<String>, Error> {
let body = serde_json::to_string(&value)?;
let contains = if let Some(head) = parts(&mut self.head, &self.err) {
head.headers.contains_key(header::CONTENT_TYPE)
} else {
true
};
if !contains {
self.header(header::CONTENT_TYPE, "application/json");
}
Ok(self.body(body)?)
}
/// Set a urlencoded body and generate `ClientRequest`
///
/// `ClientRequestBuilder` can not be used after this call.
pub fn form<T: Serialize>(
&mut self,
value: T,
) -> Result<ClientRequest<String>, Error> {
let body = serde_urlencoded::to_string(&value)?;
let contains = if let Some(head) = parts(&mut self.head, &self.err) {
head.headers.contains_key(header::CONTENT_TYPE)
} else {
true
};
if !contains {
self.header(header::CONTENT_TYPE, "application/x-www-form-urlencoded");
}
Ok(self.body(body)?)
}
/// Set an streaming body and generate `ClientRequest`.
///
/// `ClientRequestBuilder` can not be used after this call.
pub fn stream<S, E>(
&mut self,
stream: S,
) -> Result<ClientRequest<impl MessageBody>, HttpError>
where
S: Stream<Item = Bytes, Error = E>,
E: Into<Error> + 'static,
{
self.body(BodyStream::new(stream))
}
/// Set an empty body and generate `ClientRequest`.
///
/// `ClientRequestBuilder` can not be used after this call.
pub fn finish(&mut self) -> Result<ClientRequest<()>, HttpError> {
self.body(())
}
/// This method construct new `ClientRequestBuilder`
pub fn take(&mut self) -> ClientRequestBuilder {
ClientRequestBuilder {
head: self.head.take(),
err: self.err.take(),
#[cfg(feature = "cookies")]
cookies: self.cookies.take(),
default_headers: self.default_headers,
}
}
}
#[inline]
fn parts<'a>(
parts: &'a mut Option<RequestHead>,
err: &Option<HttpError>,
) -> Option<&'a mut RequestHead> {
if err.is_some() {
return None;
}
parts.as_mut()
}
impl fmt::Debug for ClientRequestBuilder {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
if let Some(ref parts) = self.head {
writeln!(
f,
"\nClientRequestBuilder {:?} {}:{}",
parts.version, parts.method, parts.uri
)?;
writeln!(f, " headers:")?;
for (key, val) in parts.headers.iter() {
writeln!(f, " {:?}: {:?}", key, val)?;
}
Ok(())
} else {
write!(f, "ClientRequestBuilder(Consumed)")
}
}
}

View File

@ -1,116 +0,0 @@
use std::cell::{Ref, RefMut};
use std::fmt;
use bytes::Bytes;
use futures::{Poll, Stream};
use http::{HeaderMap, StatusCode, Version};
use crate::error::PayloadError;
use crate::extensions::Extensions;
use crate::httpmessage::HttpMessage;
use crate::message::{Head, Message, ResponseHead};
use crate::payload::{Payload, PayloadStream};
/// Client Response
pub struct ClientResponse {
pub(crate) head: Message<ResponseHead>,
pub(crate) payload: Payload,
}
impl HttpMessage for ClientResponse {
type Stream = PayloadStream;
fn headers(&self) -> &HeaderMap {
&self.head.headers
}
fn extensions(&self) -> Ref<Extensions> {
self.head.extensions()
}
fn extensions_mut(&self) -> RefMut<Extensions> {
self.head.extensions_mut()
}
fn take_payload(&mut self) -> Payload {
std::mem::replace(&mut self.payload, Payload::None)
}
}
impl ClientResponse {
/// Create new Request instance
pub fn new() -> ClientResponse {
let head: Message<ResponseHead> = Message::new();
head.extensions_mut().clear();
ClientResponse {
head,
payload: Payload::None,
}
}
#[inline]
pub(crate) fn head(&self) -> &ResponseHead {
&self.head
}
#[inline]
pub(crate) fn head_mut(&mut self) -> &mut ResponseHead {
&mut self.head
}
/// Read the Request Version.
#[inline]
pub fn version(&self) -> Version {
self.head().version
}
/// Get the status from the server.
#[inline]
pub fn status(&self) -> StatusCode {
self.head().status
}
#[inline]
/// Returns Request's headers.
pub fn headers(&self) -> &HeaderMap {
&self.head().headers
}
#[inline]
/// Returns mutable Request's headers.
pub fn headers_mut(&mut self) -> &mut HeaderMap {
&mut self.head_mut().headers
}
/// Checks if a connection should be kept alive.
#[inline]
pub fn keep_alive(&self) -> bool {
self.head().keep_alive()
}
/// Set response payload
pub fn set_payload(&mut self, payload: Payload) {
self.payload = payload;
}
}
impl Stream for ClientResponse {
type Item = Bytes;
type Error = PayloadError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
self.payload.poll()
}
}
impl fmt::Debug for ClientResponse {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
writeln!(f, "\nClientResponse {:?} {}", self.version(), self.status(),)?;
writeln!(f, " headers:")?;
for (key, val) in self.headers().iter() {
writeln!(f, " {:?}: {:?}", key, val)?;
}
Ok(())
}
}

View File

@ -13,11 +13,10 @@ use super::decoder::{PayloadDecoder, PayloadItem, PayloadType};
use super::{decoder, encoder}; use super::{decoder, encoder};
use super::{Message, MessageType}; use super::{Message, MessageType};
use crate::body::BodyLength; use crate::body::BodyLength;
use crate::client::ClientResponse;
use crate::config::ServiceConfig; use crate::config::ServiceConfig;
use crate::error::{ParseError, PayloadError}; use crate::error::{ParseError, PayloadError};
use crate::helpers; use crate::helpers;
use crate::message::{ConnectionType, Head, MessagePool, RequestHead}; use crate::message::{ConnectionType, Head, MessagePool, RequestHead, ResponseHead};
bitflags! { bitflags! {
struct Flags: u8 { struct Flags: u8 {
@ -41,7 +40,7 @@ pub struct ClientPayloadCodec {
struct ClientCodecInner { struct ClientCodecInner {
config: ServiceConfig, config: ServiceConfig,
decoder: decoder::MessageDecoder<ClientResponse>, decoder: decoder::MessageDecoder<ResponseHead>,
payload: Option<PayloadDecoder>, payload: Option<PayloadDecoder>,
version: Version, version: Version,
ctype: ConnectionType, ctype: ConnectionType,
@ -123,14 +122,14 @@ impl ClientPayloadCodec {
} }
impl Decoder for ClientCodec { impl Decoder for ClientCodec {
type Item = ClientResponse; type Item = ResponseHead;
type Error = ParseError; type Error = ParseError;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
debug_assert!(!self.inner.payload.is_some(), "Payload decoder is set"); debug_assert!(!self.inner.payload.is_some(), "Payload decoder is set");
if let Some((req, payload)) = self.inner.decoder.decode(src)? { if let Some((req, payload)) = self.inner.decoder.decode(src)? {
if let Some(ctype) = req.head().ctype { if let Some(ctype) = req.ctype {
// do not use peer's keep-alive // do not use peer's keep-alive
self.inner.ctype = if ctype == ConnectionType::KeepAlive { self.inner.ctype = if ctype == ConnectionType::KeepAlive {
self.inner.ctype self.inner.ctype

View File

@ -9,9 +9,8 @@ use http::{header, HeaderMap, HttpTryFrom, Method, StatusCode, Uri, Version};
use httparse; use httparse;
use log::{debug, error, trace}; use log::{debug, error, trace};
use crate::client::ClientResponse;
use crate::error::ParseError; use crate::error::ParseError;
use crate::message::ConnectionType; use crate::message::{ConnectionType, ResponseHead};
use crate::request::Request; use crate::request::Request;
const MAX_BUFFER_SIZE: usize = 131_072; const MAX_BUFFER_SIZE: usize = 131_072;
@ -227,13 +226,13 @@ impl MessageType for Request {
} }
} }
impl MessageType for ClientResponse { impl MessageType for ResponseHead {
fn set_connection_type(&mut self, ctype: Option<ConnectionType>) { fn set_connection_type(&mut self, ctype: Option<ConnectionType>) {
self.head.ctype = ctype; self.ctype = ctype;
} }
fn headers_mut(&mut self) -> &mut HeaderMap { fn headers_mut(&mut self) -> &mut HeaderMap {
self.headers_mut() &mut self.headers
} }
fn decode(src: &mut BytesMut) -> Result<Option<(Self, PayloadType)>, ParseError> { fn decode(src: &mut BytesMut) -> Result<Option<(Self, PayloadType)>, ParseError> {
@ -263,7 +262,7 @@ impl MessageType for ClientResponse {
} }
}; };
let mut msg = ClientResponse::new(); let mut msg = ResponseHead::default();
// convert headers // convert headers
let len = msg.set_headers(&src.split_to(len).freeze(), &headers[..h_len])?; let len = msg.set_headers(&src.split_to(len).freeze(), &headers[..h_len])?;
@ -281,8 +280,8 @@ impl MessageType for ClientResponse {
PayloadType::None PayloadType::None
}; };
msg.head.status = status; msg.status = status;
msg.head.version = ver; msg.version = ver;
Ok(Some((msg, decoder))) Ok(Some((msg, decoder)))
} }

View File

@ -553,7 +553,7 @@ mod tests {
use actix_codec::{AsyncRead, AsyncWrite}; use actix_codec::{AsyncRead, AsyncWrite};
use actix_service::IntoService; use actix_service::IntoService;
use bytes::{Buf, Bytes, BytesMut}; use bytes::{Buf, Bytes};
use futures::future::{lazy, ok}; use futures::future::{lazy, ok};
use super::*; use super::*;

View File

@ -1,64 +1,4 @@
//! Actix web is a small, pragmatic, and extremely fast web framework //! Basic http primitives for actix-net framework.
//! for Rust.
//!
//! ```rust,ignore
//! use actix_web::{server, App, Path, Responder};
//! # use std::thread;
//!
//! fn index(info: Path<(String, u32)>) -> impl Responder {
//! format!("Hello {}! id:{}", info.0, info.1)
//! }
//!
//! fn main() {
//! # thread::spawn(|| {
//! server::new(|| {
//! App::new().resource("/{name}/{id}/index.html", |r| r.with(index))
//! }).bind("127.0.0.1:8080")
//! .unwrap()
//! .run();
//! # });
//! }
//! ```
//!
//! ## Documentation & community resources
//!
//! Besides the API documentation (which you are currently looking
//! at!), several other resources are available:
//!
//! * [User Guide](https://actix.rs/docs/)
//! * [Chat on gitter](https://gitter.im/actix/actix)
//! * [GitHub repository](https://github.com/actix/actix-web)
//! * [Cargo package](https://crates.io/crates/actix-web)
//!
//! To get started navigating the API documentation you may want to
//! consider looking at the following pages:
//!
//! * [App](struct.App.html): This struct represents an actix-web
//! application and is used to configure routes and other common
//! settings.
//!
//! * [HttpServer](server/struct.HttpServer.html): This struct
//! represents an HTTP server instance and is used to instantiate and
//! configure servers.
//!
//! * [Request](struct.Request.html) and
//! [Response](struct.Response.html): These structs
//! represent HTTP requests and responses and expose various methods
//! for inspecting, creating and otherwise utilizing them.
//!
//! ## Features
//!
//! * Supported *HTTP/1.x* protocol
//! * Streaming and pipelining
//! * Keep-alive and slow requests handling
//! * `WebSockets` server/client
//! * Supported Rust version: 1.26 or later
//!
//! ## Package feature
//!
//! * `session` - enables session support, includes `ring` crate as
//! dependency
//!
#![allow( #![allow(
clippy::type_complexity, clippy::type_complexity,
clippy::new_without_default, clippy::new_without_default,

View File

@ -1042,8 +1042,6 @@ mod tests {
#[test] #[test]
#[cfg(feature = "cookies")] #[cfg(feature = "cookies")]
fn test_into_builder() { fn test_into_builder() {
use crate::httpmessage::HttpMessage;
let mut resp: Response = "test".into(); let mut resp: Response = "test".into();
assert_eq!(resp.status(), StatusCode::OK); assert_eq!(resp.status(), StatusCode::OK);

View File

@ -4,15 +4,15 @@ use std::str;
#[cfg(feature = "cookies")] #[cfg(feature = "cookies")]
use cookie::Cookie; use cookie::Cookie;
use http::header::{HeaderName, HeaderValue}; use http::header::{HeaderName, HeaderValue};
use http::{Error as HttpError, HttpTryFrom}; use http::{Error as HttpError, HttpTryFrom, Uri};
use super::ClientError; use super::ClientError;
use crate::client::{ClientRequest, ClientRequestBuilder};
use crate::header::IntoHeaderValue; use crate::header::IntoHeaderValue;
use crate::message::RequestHead;
/// `WebSocket` connection /// `WebSocket` connection
pub struct Connect { pub struct Connect {
pub(super) request: ClientRequestBuilder, pub(super) head: RequestHead,
pub(super) err: Option<ClientError>, pub(super) err: Option<ClientError>,
pub(super) http_err: Option<HttpError>, pub(super) http_err: Option<HttpError>,
pub(super) origin: Option<HeaderValue>, pub(super) origin: Option<HeaderValue>,
@ -25,7 +25,7 @@ impl Connect {
/// Create new websocket connection /// Create new websocket connection
pub fn new<S: AsRef<str>>(uri: S) -> Connect { pub fn new<S: AsRef<str>>(uri: S) -> Connect {
let mut cl = Connect { let mut cl = Connect {
request: ClientRequest::build(), head: RequestHead::default(),
err: None, err: None,
http_err: None, http_err: None,
origin: None, origin: None,
@ -33,7 +33,12 @@ impl Connect {
max_size: 65_536, max_size: 65_536,
server_mode: false, server_mode: false,
}; };
cl.request.uri(uri.as_ref());
match Uri::try_from(uri.as_ref()) {
Ok(uri) => cl.head.uri = uri,
Err(e) => cl.http_err = Some(e.into()),
}
cl cl
} }
@ -51,12 +56,12 @@ impl Connect {
self self
} }
#[cfg(feature = "cookies")] // #[cfg(feature = "cookies")]
/// Set cookie for handshake request // /// Set cookie for handshake request
pub fn cookie(mut self, cookie: Cookie) -> Self { // pub fn cookie(mut self, cookie: Cookie) -> Self {
self.request.cookie(cookie); // self.request.cookie(cookie);
self // self
} // }
/// Set request Origin /// Set request Origin
pub fn origin<V>(mut self, origin: V) -> Self pub fn origin<V>(mut self, origin: V) -> Self
@ -90,7 +95,15 @@ impl Connect {
HeaderName: HttpTryFrom<K>, HeaderName: HttpTryFrom<K>,
V: IntoHeaderValue, V: IntoHeaderValue,
{ {
self.request.header(key, value); match HeaderName::try_from(key) {
Ok(key) => match value.try_into() {
Ok(value) => {
self.head.headers.append(key, value);
}
Err(e) => self.http_err = Some(e.into()),
},
Err(e) => self.http_err = Some(e.into()),
}
self self
} }
} }

View File

@ -14,8 +14,8 @@ use rand;
use sha1::Sha1; use sha1::Sha1;
use crate::body::BodyLength; use crate::body::BodyLength;
use crate::client::ClientResponse;
use crate::h1; use crate::h1;
use crate::message::{ConnectionType, Head, ResponseHead};
use crate::ws::Codec; use crate::ws::Codec;
use super::{ClientError, Connect, Protocol}; use super::{ClientError, Connect, Protocol};
@ -89,27 +89,35 @@ where
} else { } else {
// origin // origin
if let Some(origin) = req.origin.take() { if let Some(origin) = req.origin.take() {
req.request.set_header(header::ORIGIN, origin); req.head.headers.insert(header::ORIGIN, origin);
} }
req.request.upgrade("websocket"); req.head.set_connection_type(ConnectionType::Upgrade);
req.request.set_header(header::SEC_WEBSOCKET_VERSION, "13"); req.head
.headers
.insert(header::UPGRADE, HeaderValue::from_static("websocket"));
req.head.headers.insert(
header::SEC_WEBSOCKET_VERSION,
HeaderValue::from_static("13"),
);
if let Some(protocols) = req.protocols.take() { if let Some(protocols) = req.protocols.take() {
req.request req.head.headers.insert(
.set_header(header::SEC_WEBSOCKET_PROTOCOL, protocols.as_str()); header::SEC_WEBSOCKET_PROTOCOL,
HeaderValue::try_from(protocols.as_str()).unwrap(),
);
} }
let mut request = match req.request.finish() { if let Some(e) = req.http_err {
Ok(req) => req, return Either::A(err(e.into()));
Err(e) => return Either::A(err(e.into())),
}; };
if request.uri().host().is_none() { let mut request = req.head;
if request.uri.host().is_none() {
return Either::A(err(ClientError::InvalidUrl)); return Either::A(err(ClientError::InvalidUrl));
} }
// supported protocols // supported protocols
let proto = if let Some(scheme) = request.uri().scheme_part() { let proto = if let Some(scheme) = request.uri.scheme_part() {
match Protocol::from(scheme.as_str()) { match Protocol::from(scheme.as_str()) {
Some(proto) => proto, Some(proto) => proto,
None => return Either::A(err(ClientError::InvalidUrl)), None => return Either::A(err(ClientError::InvalidUrl)),
@ -124,14 +132,14 @@ where
let sec_key: [u8; 16] = rand::random(); let sec_key: [u8; 16] = rand::random();
let key = base64::encode(&sec_key); let key = base64::encode(&sec_key);
request.headers_mut().insert( request.headers.insert(
header::SEC_WEBSOCKET_KEY, header::SEC_WEBSOCKET_KEY,
HeaderValue::try_from(key.as_str()).unwrap(), HeaderValue::try_from(key.as_str()).unwrap(),
); );
// prep connection // prep connection
let connect = TcpConnect::new(request.uri().host().unwrap().to_string()) let connect = TcpConnect::new(request.uri.host().unwrap().to_string())
.set_port(request.uri().port_u16().unwrap_or_else(|| proto.port())); .set_port(request.uri.port_u16().unwrap_or_else(|| proto.port()));
let fut = Box::new( let fut = Box::new(
self.connector self.connector
@ -141,7 +149,7 @@ where
// h1 protocol // h1 protocol
let framed = Framed::new(io, h1::ClientCodec::default()); let framed = Framed::new(io, h1::ClientCodec::default());
framed framed
.send((request.into_parts().0, BodyLength::None).into()) .send((request, BodyLength::None).into())
.map_err(ClientError::from) .map_err(ClientError::from)
.and_then(|framed| { .and_then(|framed| {
framed framed
@ -172,7 +180,7 @@ where
{ {
fut: Box< fut: Box<
Future< Future<
Item = (Option<ClientResponse>, Framed<T, h1::ClientCodec>), Item = (Option<ResponseHead>, Framed<T, h1::ClientCodec>),
Error = ClientError, Error = ClientError,
>, >,
>, >,
@ -198,11 +206,11 @@ where
}; };
// verify response // verify response
if res.status() != StatusCode::SWITCHING_PROTOCOLS { if res.status != StatusCode::SWITCHING_PROTOCOLS {
return Err(ClientError::InvalidResponseStatus(res.status())); return Err(ClientError::InvalidResponseStatus(res.status));
} }
// Check for "UPGRADE" to websocket header // Check for "UPGRADE" to websocket header
let has_hdr = if let Some(hdr) = res.headers().get(header::UPGRADE) { let has_hdr = if let Some(hdr) = res.headers.get(header::UPGRADE) {
if let Ok(s) = hdr.to_str() { if let Ok(s) = hdr.to_str() {
s.to_lowercase().contains("websocket") s.to_lowercase().contains("websocket")
} else { } else {
@ -216,7 +224,7 @@ where
return Err(ClientError::InvalidUpgradeHeader); return Err(ClientError::InvalidUpgradeHeader);
} }
// Check for "CONNECTION" header // Check for "CONNECTION" header
if let Some(conn) = res.headers().get(header::CONNECTION) { if let Some(conn) = res.headers.get(header::CONNECTION) {
if let Ok(s) = conn.to_str() { if let Ok(s) = conn.to_str() {
if !s.to_lowercase().contains("upgrade") { if !s.to_lowercase().contains("upgrade") {
trace!("Invalid connection header: {}", s); trace!("Invalid connection header: {}", s);
@ -231,7 +239,7 @@ where
return Err(ClientError::MissingConnectionHeader); return Err(ClientError::MissingConnectionHeader);
} }
if let Some(key) = res.headers().get(header::SEC_WEBSOCKET_ACCEPT) { if let Some(key) = res.headers.get(header::SEC_WEBSOCKET_ACCEPT) {
// field is constructed by concatenating /key/ // field is constructed by concatenating /key/
// with the string "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" (RFC 6455) // with the string "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" (RFC 6455)
const WS_GUID: &[u8] = b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; const WS_GUID: &[u8] = b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

View File

@ -29,7 +29,7 @@ default = ["session"]
session = ["cookie/secure"] session = ["cookie/secure"]
# openssl # openssl
ssl = ["openssl", "actix-http/ssl", "actix-server/ssl"] ssl = ["openssl", "actix-http/ssl", "actix-server/ssl", "awc/ssl"]
[dependencies] [dependencies]
actix-codec = "0.1.1" actix-codec = "0.1.1"
@ -38,6 +38,7 @@ actix-http = { path=".." }
actix-service = "0.3.4" actix-service = "0.3.4"
actix-server = "0.4.0" actix-server = "0.4.0"
actix-utils = "0.3.4" actix-utils = "0.3.4"
awc = { git = "https://github.com/actix/actix-web.git" }
base64 = "0.10" base64 = "0.10"
bytes = "0.4" bytes = "0.4"

View File

@ -3,15 +3,12 @@ use std::sync::mpsc;
use std::{net, thread, time}; use std::{net, thread, time};
use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_codec::{AsyncRead, AsyncWrite, Framed};
use actix_http::body::MessageBody; use actix_http::client::Connector;
use actix_http::client::{ use actix_http::ws;
ClientRequest, ClientRequestBuilder, ClientResponse, ConnectError, Connection,
Connector, SendRequestError,
};
use actix_http::{http::Uri, ws};
use actix_rt::{Runtime, System}; use actix_rt::{Runtime, System};
use actix_server::{Server, StreamServiceFactory}; use actix_server::{Server, StreamServiceFactory};
use actix_service::Service; use actix_service::Service;
use awc::{Client, ClientRequest};
use futures::future::{lazy, Future}; use futures::future::{lazy, Future};
use http::Method; use http::Method;
use net2::TcpBuilder; use net2::TcpBuilder;
@ -47,6 +44,7 @@ pub struct TestServer;
pub struct TestServerRuntime { pub struct TestServerRuntime {
addr: net::SocketAddr, addr: net::SocketAddr,
rt: Runtime, rt: Runtime,
client: Client,
} }
impl TestServer { impl TestServer {
@ -71,11 +69,39 @@ impl TestServer {
}); });
let (system, addr) = rx.recv().unwrap(); let (system, addr) = rx.recv().unwrap();
System::set_current(system); let mut rt = Runtime::new().unwrap();
TestServerRuntime {
addr, let client = rt
rt: Runtime::new().unwrap(), .block_on(lazy(move || {
let connector = {
#[cfg(feature = "ssl")]
{
use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
let mut builder =
SslConnector::builder(SslMethod::tls()).unwrap();
builder.set_verify(SslVerifyMode::NONE);
let _ = builder.set_alpn_protos(b"\x02h2\x08http/1.1").map_err(
|e| log::error!("Can not set alpn protocol: {:?}", e),
);
Connector::new()
.timeout(time::Duration::from_millis(500))
.ssl(builder.build())
.service()
} }
#[cfg(not(feature = "ssl"))]
{
Connector::new()
.timeout(time::Duration::from_millis(500))
.service()
}
};
Ok::<Client, ()>(Client::build().connector(connector).finish())
}))
.unwrap();
System::set_current(system);
TestServerRuntime { addr, rt, client }
} }
/// Get firat available unused address /// Get firat available unused address
@ -130,64 +156,38 @@ impl TestServerRuntime {
} }
/// Create `GET` request /// Create `GET` request
pub fn get(&self) -> ClientRequestBuilder { pub fn get(&self) -> ClientRequest {
ClientRequest::get(self.url("/").as_str()) self.client.get(self.url("/").as_str())
} }
/// Create https `GET` request /// Create https `GET` request
pub fn sget(&self) -> ClientRequestBuilder { pub fn sget(&self) -> ClientRequest {
ClientRequest::get(self.surl("/").as_str()) self.client.get(self.surl("/").as_str())
} }
/// Create `POST` request /// Create `POST` request
pub fn post(&self) -> ClientRequestBuilder { pub fn post(&self) -> ClientRequest {
ClientRequest::post(self.url("/").as_str()) self.client.post(self.url("/").as_str())
}
/// Create https `POST` request
pub fn spost(&self) -> ClientRequest {
self.client.post(self.surl("/").as_str())
} }
/// Create `HEAD` request /// Create `HEAD` request
pub fn head(&self) -> ClientRequestBuilder { pub fn head(&self) -> ClientRequest {
ClientRequest::head(self.url("/").as_str()) self.client.head(self.url("/").as_str())
}
/// Create https `HEAD` request
pub fn shead(&self) -> ClientRequest {
self.client.head(self.surl("/").as_str())
} }
/// Connect to test http server /// Connect to test http server
pub fn client(&self, meth: Method, path: &str) -> ClientRequestBuilder { pub fn request<S: AsRef<str>>(&self, method: Method, path: S) -> ClientRequest {
ClientRequest::build() self.client.request(method, path.as_ref())
.method(meth)
.uri(self.url(path).as_str())
.take()
}
fn new_connector(
) -> impl Service<Request = Uri, Response = impl Connection, Error = ConnectError> + Clone
{
#[cfg(feature = "ssl")]
{
use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
builder.set_verify(SslVerifyMode::NONE);
let _ = builder
.set_alpn_protos(b"\x02h2\x08http/1.1")
.map_err(|e| log::error!("Can not set alpn protocol: {:?}", e));
Connector::new()
.timeout(time::Duration::from_millis(500))
.ssl(builder.build())
.service()
}
#[cfg(not(feature = "ssl"))]
{
Connector::new()
.timeout(time::Duration::from_millis(500))
.service()
}
}
/// Http connector
pub fn connector(
&mut self,
) -> impl Service<Request = Uri, Response = impl Connection, Error = ConnectError> + Clone
{
self.execute(|| TestServerRuntime::new_connector())
} }
/// Stop http server /// Stop http server
@ -213,15 +213,6 @@ impl TestServerRuntime {
) -> Result<Framed<impl AsyncRead + AsyncWrite, ws::Codec>, ws::ClientError> { ) -> Result<Framed<impl AsyncRead + AsyncWrite, ws::Codec>, ws::ClientError> {
self.ws_at("/") self.ws_at("/")
} }
/// Send request and read response message
pub fn send_request<B: MessageBody + 'static>(
&mut self,
req: ClientRequest<B>,
) -> Result<ClientResponse, SendRequestError> {
let mut conn = self.connector();
self.rt.block_on(req.send(&mut conn))
}
} }
impl Drop for TestServerRuntime { impl Drop for TestServerRuntime {

View File

@ -4,7 +4,7 @@ use futures::future::{self, ok};
use futures::{Future, Stream}; use futures::{Future, Stream};
use actix_http::{ use actix_http::{
client, error::PayloadError, HttpMessage, HttpService, Request, Response, error::PayloadError, http, HttpMessage, HttpService, Request, Response,
}; };
use actix_http_test::TestServer; use actix_http_test::TestServer;
@ -48,26 +48,18 @@ fn test_h1_v2() {
.finish(|_| future::ok::<_, ()>(Response::Ok().body(STR))) .finish(|_| future::ok::<_, ()>(Response::Ok().body(STR)))
.map(|_| ()) .map(|_| ())
}); });
let mut connector = srv.connector(); let response = srv.block_on(srv.get().send()).unwrap();
let request = srv.get().finish().unwrap();
let response = srv.block_on(request.send(&mut connector)).unwrap();
assert!(response.status().is_success()); assert!(response.status().is_success());
let request = srv.get().header("x-test", "111").finish().unwrap(); let request = srv.get().header("x-test", "111").send();
let repr = format!("{:?}", request); let mut response = srv.block_on(request).unwrap();
assert!(repr.contains("ClientRequest"));
assert!(repr.contains("x-test"));
let mut response = srv.block_on(request.send(&mut connector)).unwrap();
assert!(response.status().is_success()); assert!(response.status().is_success());
// read response // read response
let bytes = srv.block_on(load_body(response.take_payload())).unwrap(); let bytes = srv.block_on(load_body(response.take_payload())).unwrap();
assert_eq!(bytes, Bytes::from_static(STR.as_ref())); assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
let request = srv.post().finish().unwrap(); let mut response = srv.block_on(srv.post().send()).unwrap();
let mut response = srv.block_on(request.send(&mut connector)).unwrap();
assert!(response.status().is_success()); assert!(response.status().is_success());
// read response // read response
@ -82,10 +74,7 @@ fn test_connection_close() {
.finish(|_| ok::<_, ()>(Response::Ok().body(STR))) .finish(|_| ok::<_, ()>(Response::Ok().body(STR)))
.map(|_| ()) .map(|_| ())
}); });
let mut connector = srv.connector(); let response = srv.block_on(srv.get().close_connection().send()).unwrap();
let request = srv.get().close().finish().unwrap();
let response = srv.block_on(request.send(&mut connector)).unwrap();
assert!(response.status().is_success()); assert!(response.status().is_success());
} }
@ -102,12 +91,8 @@ fn test_with_query_parameter() {
}) })
.map(|_| ()) .map(|_| ())
}); });
let mut connector = srv.connector();
let request = client::ClientRequest::get(srv.url("/?qp=5")) let request = srv.request(http::Method::GET, srv.url("/?qp=5")).send();
.finish() let response = srv.block_on(request).unwrap();
.unwrap();
let response = srv.block_on(request.send(&mut connector)).unwrap();
assert!(response.status().is_success()); assert!(response.status().is_success());
} }

View File

@ -13,8 +13,8 @@ use futures::stream::{once, Stream};
use actix_http::body::Body; use actix_http::body::Body;
use actix_http::error::PayloadError; use actix_http::error::PayloadError;
use actix_http::{ use actix_http::{
body, client, error, http, http::header, Error, HttpMessage as HttpMessage2, body, error, http, http::header, Error, HttpMessage as HttpMessage2, HttpService,
HttpService, KeepAlive, Request, Response, KeepAlive, Request, Response,
}; };
fn load_body<S>(stream: S) -> impl Future<Item = BytesMut, Error = PayloadError> fn load_body<S>(stream: S) -> impl Future<Item = BytesMut, Error = PayloadError>
@ -37,8 +37,7 @@ fn test_h1() {
.h1(|_| future::ok::<_, ()>(Response::Ok().finish())) .h1(|_| future::ok::<_, ()>(Response::Ok().finish()))
}); });
let req = client::ClientRequest::get(srv.url("/")).finish().unwrap(); let response = srv.block_on(srv.get().send()).unwrap();
let response = srv.send_request(req).unwrap();
assert!(response.status().is_success()); assert!(response.status().is_success());
} }
@ -56,8 +55,7 @@ fn test_h1_2() {
.map(|_| ()) .map(|_| ())
}); });
let req = client::ClientRequest::get(srv.url("/")).finish().unwrap(); let response = srv.block_on(srv.get().send()).unwrap();
let response = srv.send_request(req).unwrap();
assert!(response.status().is_success()); assert!(response.status().is_success());
} }
@ -100,8 +98,7 @@ fn test_h2() -> std::io::Result<()> {
) )
}); });
let req = client::ClientRequest::get(srv.surl("/")).finish().unwrap(); let response = srv.block_on(srv.sget().send()).unwrap();
let response = srv.send_request(req).unwrap();
assert!(response.status().is_success()); assert!(response.status().is_success());
Ok(()) Ok(())
} }
@ -124,8 +121,7 @@ fn test_h2_1() -> std::io::Result<()> {
) )
}); });
let req = client::ClientRequest::get(srv.surl("/")).finish().unwrap(); let response = srv.block_on(srv.sget().send()).unwrap();
let response = srv.send_request(req).unwrap();
assert!(response.status().is_success()); assert!(response.status().is_success());
Ok(()) Ok(())
} }
@ -149,10 +145,7 @@ fn test_h2_body() -> std::io::Result<()> {
) )
}); });
let req = client::ClientRequest::get(srv.surl("/")) let mut response = srv.block_on(srv.sget().send_body(data.clone())).unwrap();
.body(data.clone())
.unwrap();
let mut response = srv.send_request(req).unwrap();
assert!(response.status().is_success()); assert!(response.status().is_success());
let body = srv.block_on(load_body(response.take_payload())).unwrap(); let body = srv.block_on(load_body(response.take_payload())).unwrap();
@ -331,24 +324,24 @@ fn test_content_length() {
{ {
for i in 0..4 { for i in 0..4 {
let req = client::ClientRequest::get(srv.url(&format!("/{}", i))) let req = srv
.finish() .request(http::Method::GET, srv.url(&format!("/{}", i)))
.unwrap(); .send();
let response = srv.send_request(req).unwrap(); let response = srv.block_on(req).unwrap();
assert_eq!(response.headers().get(&header), None); assert_eq!(response.headers().get(&header), None);
let req = client::ClientRequest::head(srv.url(&format!("/{}", i))) let req = srv
.finish() .request(http::Method::HEAD, srv.url(&format!("/{}", i)))
.unwrap(); .send();
let response = srv.send_request(req).unwrap(); let response = srv.block_on(req).unwrap();
assert_eq!(response.headers().get(&header), None); assert_eq!(response.headers().get(&header), None);
} }
for i in 4..6 { for i in 4..6 {
let req = client::ClientRequest::get(srv.url(&format!("/{}", i))) let req = srv
.finish() .request(http::Method::GET, srv.url(&format!("/{}", i)))
.unwrap(); .send();
let response = srv.send_request(req).unwrap(); let response = srv.block_on(req).unwrap();
assert_eq!(response.headers().get(&header), Some(&value)); assert_eq!(response.headers().get(&header), Some(&value));
} }
} }
@ -389,24 +382,24 @@ fn test_h2_content_length() {
{ {
for i in 0..4 { for i in 0..4 {
let req = client::ClientRequest::get(srv.surl(&format!("/{}", i))) let req = srv
.finish() .request(http::Method::GET, srv.surl(&format!("/{}", i)))
.unwrap(); .send();
let response = srv.send_request(req).unwrap(); let response = srv.block_on(req).unwrap();
assert_eq!(response.headers().get(&header), None); assert_eq!(response.headers().get(&header), None);
let req = client::ClientRequest::head(srv.surl(&format!("/{}", i))) let req = srv
.finish() .request(http::Method::HEAD, srv.surl(&format!("/{}", i)))
.unwrap(); .send();
let response = srv.send_request(req).unwrap(); let response = srv.block_on(req).unwrap();
assert_eq!(response.headers().get(&header), None); assert_eq!(response.headers().get(&header), None);
} }
for i in 4..6 { for i in 4..6 {
let req = client::ClientRequest::get(srv.surl(&format!("/{}", i))) let req = srv
.finish() .request(http::Method::GET, srv.surl(&format!("/{}", i)))
.unwrap(); .send();
let response = srv.send_request(req).unwrap(); let response = srv.block_on(req).unwrap();
assert_eq!(response.headers().get(&header), Some(&value)); assert_eq!(response.headers().get(&header), Some(&value));
} }
} }
@ -442,11 +435,8 @@ fn test_h1_headers() {
future::ok::<_, ()>(builder.body(data.clone())) future::ok::<_, ()>(builder.body(data.clone()))
}) })
}); });
let mut connector = srv.connector();
let req = srv.get().finish().unwrap(); let mut response = srv.block_on(srv.get().send()).unwrap();
let mut response = srv.block_on(req.send(&mut connector)).unwrap();
assert!(response.status().is_success()); assert!(response.status().is_success());
// read response // read response
@ -489,10 +479,8 @@ fn test_h2_headers() {
future::ok::<_, ()>(builder.body(data.clone())) future::ok::<_, ()>(builder.body(data.clone()))
}).map_err(|_| ())) }).map_err(|_| ()))
}); });
let mut connector = srv.connector();
let req = client::ClientRequest::get(srv.surl("/")).finish().unwrap(); let mut response = srv.block_on(srv.sget().send()).unwrap();
let mut response = srv.block_on(req.send(&mut connector)).unwrap();
assert!(response.status().is_success()); assert!(response.status().is_success());
// read response // read response
@ -528,8 +516,7 @@ fn test_h1_body() {
HttpService::build().h1(|_| future::ok::<_, ()>(Response::Ok().body(STR))) HttpService::build().h1(|_| future::ok::<_, ()>(Response::Ok().body(STR)))
}); });
let req = srv.get().finish().unwrap(); let mut response = srv.block_on(srv.get().send()).unwrap();
let mut response = srv.send_request(req).unwrap();
assert!(response.status().is_success()); assert!(response.status().is_success());
// read response // read response
@ -551,8 +538,7 @@ fn test_h2_body2() {
) )
}); });
let req = srv.sget().finish().unwrap(); let mut response = srv.block_on(srv.sget().send()).unwrap();
let mut response = srv.send_request(req).unwrap();
assert!(response.status().is_success()); assert!(response.status().is_success());
// read response // read response
@ -566,8 +552,7 @@ fn test_h1_head_empty() {
HttpService::build().h1(|_| ok::<_, ()>(Response::Ok().body(STR))) HttpService::build().h1(|_| ok::<_, ()>(Response::Ok().body(STR)))
}); });
let req = client::ClientRequest::head(srv.url("/")).finish().unwrap(); let mut response = srv.block_on(srv.head().send()).unwrap();
let mut response = srv.send_request(req).unwrap();
assert!(response.status().is_success()); assert!(response.status().is_success());
{ {
@ -597,8 +582,7 @@ fn test_h2_head_empty() {
) )
}); });
let req = client::ClientRequest::head(srv.surl("/")).finish().unwrap(); let mut response = srv.block_on(srv.shead().send()).unwrap();
let mut response = srv.send_request(req).unwrap();
assert!(response.status().is_success()); assert!(response.status().is_success());
assert_eq!(response.version(), http::Version::HTTP_2); assert_eq!(response.version(), http::Version::HTTP_2);
@ -623,8 +607,7 @@ fn test_h1_head_binary() {
}) })
}); });
let req = client::ClientRequest::head(srv.url("/")).finish().unwrap(); let mut response = srv.block_on(srv.head().send()).unwrap();
let mut response = srv.send_request(req).unwrap();
assert!(response.status().is_success()); assert!(response.status().is_success());
{ {
@ -658,8 +641,7 @@ fn test_h2_head_binary() {
) )
}); });
let req = client::ClientRequest::head(srv.surl("/")).finish().unwrap(); let mut response = srv.block_on(srv.shead().send()).unwrap();
let mut response = srv.send_request(req).unwrap();
assert!(response.status().is_success()); assert!(response.status().is_success());
{ {
@ -681,8 +663,7 @@ fn test_h1_head_binary2() {
HttpService::build().h1(|_| ok::<_, ()>(Response::Ok().body(STR))) HttpService::build().h1(|_| ok::<_, ()>(Response::Ok().body(STR)))
}); });
let req = client::ClientRequest::head(srv.url("/")).finish().unwrap(); let response = srv.block_on(srv.head().send()).unwrap();
let response = srv.send_request(req).unwrap();
assert!(response.status().is_success()); assert!(response.status().is_success());
{ {
@ -708,8 +689,7 @@ fn test_h2_head_binary2() {
) )
}); });
let req = client::ClientRequest::head(srv.surl("/")).finish().unwrap(); let response = srv.block_on(srv.shead().send()).unwrap();
let response = srv.send_request(req).unwrap();
assert!(response.status().is_success()); assert!(response.status().is_success());
{ {
@ -733,8 +713,7 @@ fn test_h1_body_length() {
}) })
}); });
let req = srv.get().finish().unwrap(); let mut response = srv.block_on(srv.get().send()).unwrap();
let mut response = srv.send_request(req).unwrap();
assert!(response.status().is_success()); assert!(response.status().is_success());
// read response // read response
@ -761,8 +740,7 @@ fn test_h2_body_length() {
) )
}); });
let req = srv.sget().finish().unwrap(); let mut response = srv.block_on(srv.sget().send()).unwrap();
let mut response = srv.send_request(req).unwrap();
assert!(response.status().is_success()); assert!(response.status().is_success());
// read response // read response
@ -783,8 +761,7 @@ fn test_h1_body_chunked_explicit() {
}) })
}); });
let req = srv.get().finish().unwrap(); let mut response = srv.block_on(srv.get().send()).unwrap();
let mut response = srv.send_request(req).unwrap();
assert!(response.status().is_success()); assert!(response.status().is_success());
assert_eq!( assert_eq!(
response response
@ -825,8 +802,7 @@ fn test_h2_body_chunked_explicit() {
) )
}); });
let req = srv.sget().finish().unwrap(); let mut response = srv.block_on(srv.sget().send()).unwrap();
let mut response = srv.send_request(req).unwrap();
assert!(response.status().is_success()); assert!(response.status().is_success());
assert!(!response.headers().contains_key(header::TRANSFER_ENCODING)); assert!(!response.headers().contains_key(header::TRANSFER_ENCODING));
@ -846,8 +822,7 @@ fn test_h1_body_chunked_implicit() {
}) })
}); });
let req = srv.get().finish().unwrap(); let mut response = srv.block_on(srv.get().send()).unwrap();
let mut response = srv.send_request(req).unwrap();
assert!(response.status().is_success()); assert!(response.status().is_success());
assert_eq!( assert_eq!(
response response
@ -879,8 +854,7 @@ fn test_h1_response_http_error_handling() {
})) }))
}); });
let req = srv.get().finish().unwrap(); let mut response = srv.block_on(srv.get().send()).unwrap();
let mut response = srv.send_request(req).unwrap();
assert_eq!(response.status(), http::StatusCode::INTERNAL_SERVER_ERROR); assert_eq!(response.status(), http::StatusCode::INTERNAL_SERVER_ERROR);
// read response // read response
@ -912,8 +886,7 @@ fn test_h2_response_http_error_handling() {
) )
}); });
let req = srv.sget().finish().unwrap(); let mut response = srv.block_on(srv.sget().send()).unwrap();
let mut response = srv.send_request(req).unwrap();
assert_eq!(response.status(), http::StatusCode::INTERNAL_SERVER_ERROR); assert_eq!(response.status(), http::StatusCode::INTERNAL_SERVER_ERROR);
// read response // read response
@ -928,8 +901,7 @@ fn test_h1_service_error() {
.h1(|_| Err::<Response, Error>(error::ErrorBadRequest("error"))) .h1(|_| Err::<Response, Error>(error::ErrorBadRequest("error")))
}); });
let req = srv.get().finish().unwrap(); let mut response = srv.block_on(srv.get().send()).unwrap();
let mut response = srv.send_request(req).unwrap();
assert_eq!(response.status(), http::StatusCode::INTERNAL_SERVER_ERROR); assert_eq!(response.status(), http::StatusCode::INTERNAL_SERVER_ERROR);
// read response // read response
@ -952,8 +924,7 @@ fn test_h2_service_error() {
) )
}); });
let req = srv.sget().finish().unwrap(); let mut response = srv.block_on(srv.sget().send()).unwrap();
let mut response = srv.send_request(req).unwrap();
assert_eq!(response.status(), http::StatusCode::INTERNAL_SERVER_ERROR); assert_eq!(response.status(), http::StatusCode::INTERNAL_SERVER_ERROR);
// read response // read response