1
0
mirror of https://github.com/fafhrd91/actix-web synced 2025-07-03 09:36:36 +02:00

Compare commits

...

4 Commits

15 changed files with 589 additions and 64 deletions

View File

@ -12,6 +12,11 @@
* `.to_async()` handler can return `Responder` type #792
### Fixed
* Fix async web::Data factory handling
## [1.0.0-beta.1] - 2019-04-20
### Added

View File

@ -71,7 +71,7 @@ actix-utils = "0.3.4"
actix-router = "0.1.3"
actix-rt = "0.2.2"
actix-web-codegen = "0.1.0-beta.1"
actix-http = { version = "0.1.1", features=["fail"] }
actix-http = { version = "0.1.2", features=["fail"] }
actix-server = "0.4.3"
actix-server-config = "0.1.1"
actix-threadpool = "0.1.0"
@ -98,8 +98,8 @@ openssl = { version="0.10", optional = true }
rustls = { version = "^0.15", optional = true }
[dev-dependencies]
actix-http = { version = "0.1.1", features=["ssl", "brotli", "flate2-zlib"] }
actix-http-test = { version = "0.1.0", features=["ssl"] }
actix-http = { version = "0.1.2", features=["ssl", "brotli", "flate2-zlib"] }
actix-http-test = { version = "0.1.1", features=["ssl"] }
actix-files = { version = "0.1.0-beta.1" }
rand = "0.6"
env_logger = "0.6"

View File

@ -1,5 +1,14 @@
# Changes
## [0.1.3] - 2019-04-23
### Fixed
* Fix http client pool management
* Fix http client wait queue management #794
## [0.1.2] - 2019-04-23
### Fixed

View File

@ -1,6 +1,6 @@
[package]
name = "actix-http"
version = "0.1.2"
version = "0.1.3"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix http primitives"
readme = "README.md"

View File

@ -114,7 +114,8 @@ where
Request = TcpConnect<Uri>,
Response = TcpConnection<Uri, U>,
Error = actix_connect::ConnectError,
> + Clone,
> + Clone
+ 'static,
{
/// Connection timeout, i.e. max time to connect to remote host including dns name resolution.
/// Set to 1 second by default.
@ -284,7 +285,9 @@ mod connect_impl {
pub(crate) struct InnerConnector<T, Io>
where
Io: AsyncRead + AsyncWrite + 'static,
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>,
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>
+ Clone
+ 'static,
{
pub(crate) tcp_pool: ConnectionPool<T, Io>,
}
@ -293,7 +296,8 @@ mod connect_impl {
where
Io: AsyncRead + AsyncWrite + 'static,
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>
+ Clone,
+ Clone
+ 'static,
{
fn clone(&self) -> Self {
InnerConnector {
@ -305,7 +309,9 @@ mod connect_impl {
impl<T, Io> Service for InnerConnector<T, Io>
where
Io: AsyncRead + AsyncWrite + 'static,
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>,
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>
+ Clone
+ 'static,
{
type Request = Connect;
type Response = IoConnection<Io>;
@ -356,9 +362,11 @@ mod connect_impl {
Io1: AsyncRead + AsyncWrite + 'static,
Io2: AsyncRead + AsyncWrite + 'static,
T1: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>
+ Clone,
+ Clone
+ 'static,
T2: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>
+ Clone,
+ Clone
+ 'static,
{
fn clone(&self) -> Self {
InnerConnector {
@ -372,8 +380,12 @@ mod connect_impl {
where
Io1: AsyncRead + AsyncWrite + 'static,
Io2: AsyncRead + AsyncWrite + 'static,
T1: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>,
T2: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>,
T1: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>
+ Clone
+ 'static,
T2: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>
+ Clone
+ 'static,
{
type Request = Connect;
type Response = EitherConnection<Io1, Io2>;
@ -409,7 +421,9 @@ mod connect_impl {
pub(crate) struct InnerConnectorResponseA<T, Io1, Io2>
where
Io1: AsyncRead + AsyncWrite + 'static,
T: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>,
T: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>
+ Clone
+ 'static,
{
fut: <ConnectionPool<T, Io1> as Service>::Future,
_t: PhantomData<Io2>,
@ -417,7 +431,9 @@ mod connect_impl {
impl<T, Io1, Io2> Future for InnerConnectorResponseA<T, Io1, Io2>
where
T: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>,
T: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>
+ Clone
+ 'static,
Io1: AsyncRead + AsyncWrite + 'static,
Io2: AsyncRead + AsyncWrite + 'static,
{
@ -435,7 +451,9 @@ mod connect_impl {
pub(crate) struct InnerConnectorResponseB<T, Io1, Io2>
where
Io2: AsyncRead + AsyncWrite + 'static,
T: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>,
T: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>
+ Clone
+ 'static,
{
fut: <ConnectionPool<T, Io2> as Service>::Future,
_t: PhantomData<Io1>,
@ -443,7 +461,9 @@ mod connect_impl {
impl<T, Io1, Io2> Future for InnerConnectorResponseB<T, Io1, Io2>
where
T: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>,
T: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>
+ Clone
+ 'static,
Io1: AsyncRead + AsyncWrite + 'static,
Io2: AsyncRead + AsyncWrite + 'static,
{

View File

@ -274,7 +274,7 @@ impl<Io: ConnectionLifetime> Stream for PlStream<Io> {
Ok(Async::Ready(Some(chunk)))
} else {
let framed = self.framed.take().unwrap();
let force_close = framed.get_codec().keepalive();
let force_close = !framed.get_codec().keepalive();
release_connection(framed, force_close);
Ok(Async::Ready(None))
}

View File

@ -49,7 +49,9 @@ pub(crate) struct ConnectionPool<T, Io: AsyncRead + AsyncWrite + 'static>(
impl<T, Io> ConnectionPool<T, Io>
where
Io: AsyncRead + AsyncWrite + 'static,
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>,
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>
+ Clone
+ 'static,
{
pub(crate) fn new(
connector: T,
@ -69,7 +71,7 @@ where
waiters: Slab::new(),
waiters_queue: IndexSet::new(),
available: HashMap::new(),
task: AtomicTask::new(),
task: None,
})),
)
}
@ -88,7 +90,9 @@ where
impl<T, Io> Service for ConnectionPool<T, Io>
where
Io: AsyncRead + AsyncWrite + 'static,
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>,
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>
+ Clone
+ 'static,
{
type Request = Connect;
type Response = IoConnection<Io>;
@ -131,7 +135,17 @@ where
}
// connection is not available, wait
let (rx, token) = self.1.as_ref().borrow_mut().wait_for(req);
let (rx, token, support) = self.1.as_ref().borrow_mut().wait_for(req);
// start support future
if !support {
self.1.as_ref().borrow_mut().task = Some(AtomicTask::new());
tokio_current_thread::spawn(ConnectorPoolSupport {
connector: self.0.clone(),
inner: self.1.clone(),
})
}
Either::B(Either::A(WaitForConnection {
rx,
key,
@ -245,7 +259,7 @@ where
Ok(Async::Ready(IoConnection::new(
ConnectionType::H2(snd),
Instant::now(),
Some(Acquired(self.key.clone(), self.inner.clone())),
Some(Acquired(self.key.clone(), self.inner.take())),
)))
}
Ok(Async::NotReady) => Ok(Async::NotReady),
@ -256,12 +270,11 @@ where
match self.fut.poll() {
Err(err) => Err(err),
Ok(Async::Ready((io, proto))) => {
let _ = self.inner.take();
if proto == Protocol::Http1 {
Ok(Async::Ready(IoConnection::new(
ConnectionType::H1(io),
Instant::now(),
Some(Acquired(self.key.clone(), self.inner.clone())),
Some(Acquired(self.key.clone(), self.inner.take())),
)))
} else {
self.h2 = Some(handshake(io));
@ -279,7 +292,6 @@ enum Acquire<T> {
NotAvailable,
}
// #[derive(Debug)]
struct AvailableConnection<Io> {
io: ConnectionType<Io>,
used: Instant,
@ -298,7 +310,7 @@ pub(crate) struct Inner<Io> {
oneshot::Sender<Result<IoConnection<Io>, ConnectError>>,
)>,
waiters_queue: IndexSet<(Key, usize)>,
task: AtomicTask,
task: Option<AtomicTask>,
}
impl<Io> Inner<Io> {
@ -314,18 +326,6 @@ impl<Io> Inner<Io> {
self.waiters.remove(token);
self.waiters_queue.remove(&(key.clone(), token));
}
fn release_conn(&mut self, key: &Key, io: ConnectionType<Io>, created: Instant) {
self.acquired -= 1;
self.available
.entry(key.clone())
.or_insert_with(VecDeque::new)
.push_back(AvailableConnection {
io,
created,
used: Instant::now(),
});
}
}
impl<Io> Inner<Io>
@ -339,6 +339,7 @@ where
) -> (
oneshot::Receiver<Result<IoConnection<Io>, ConnectError>>,
usize,
bool,
) {
let (tx, rx) = oneshot::channel();
@ -346,8 +347,9 @@ where
let entry = self.waiters.vacant_entry();
let token = entry.key();
entry.insert((connect, tx));
assert!(!self.waiters_queue.insert((key, token)));
(rx, token)
assert!(self.waiters_queue.insert((key, token)));
(rx, token, self.task.is_some())
}
fn acquire(&mut self, key: &Key) -> Acquire<Io> {
@ -400,6 +402,19 @@ where
Acquire::Available
}
fn release_conn(&mut self, key: &Key, io: ConnectionType<Io>, created: Instant) {
self.acquired -= 1;
self.available
.entry(key.clone())
.or_insert_with(VecDeque::new)
.push_back(AvailableConnection {
io,
created,
used: Instant::now(),
});
self.check_availibility();
}
fn release_close(&mut self, io: ConnectionType<Io>) {
self.acquired -= 1;
if let Some(timeout) = self.disconnect_timeout {
@ -407,11 +422,12 @@ where
tokio_current_thread::spawn(CloseConnection::new(io, timeout))
}
}
self.check_availibility();
}
fn check_availibility(&self) {
if !self.waiters_queue.is_empty() && self.acquired < self.limit {
self.task.notify()
self.task.as_ref().map(|t| t.notify());
}
}
}
@ -451,6 +467,147 @@ where
}
}
struct ConnectorPoolSupport<T, Io>
where
Io: AsyncRead + AsyncWrite + 'static,
{
connector: T,
inner: Rc<RefCell<Inner<Io>>>,
}
impl<T, Io> Future for ConnectorPoolSupport<T, Io>
where
Io: AsyncRead + AsyncWrite + 'static,
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>,
T::Future: 'static,
{
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let mut inner = self.inner.as_ref().borrow_mut();
inner.task.as_ref().unwrap().register();
// check waiters
loop {
let (key, token) = {
if let Some((key, token)) = inner.waiters_queue.get_index(0) {
(key.clone(), *token)
} else {
break;
}
};
match inner.acquire(&key) {
Acquire::NotAvailable => break,
Acquire::Acquired(io, created) => {
let (_, tx) = inner.waiters.remove(token);
if let Err(conn) = tx.send(Ok(IoConnection::new(
io,
created,
Some(Acquired(key.clone(), Some(self.inner.clone()))),
))) {
let (io, created) = conn.unwrap().into_inner();
inner.release_conn(&key, io, created);
}
}
Acquire::Available => {
let (connect, tx) = inner.waiters.remove(token);
OpenWaitingConnection::spawn(
key.clone(),
tx,
self.inner.clone(),
self.connector.call(connect),
);
}
}
let _ = inner.waiters_queue.swap_remove_index(0);
}
Ok(Async::NotReady)
}
}
struct OpenWaitingConnection<F, Io>
where
Io: AsyncRead + AsyncWrite + 'static,
{
fut: F,
key: Key,
h2: Option<Handshake<Io, Bytes>>,
rx: Option<oneshot::Sender<Result<IoConnection<Io>, ConnectError>>>,
inner: Option<Rc<RefCell<Inner<Io>>>>,
}
impl<F, Io> OpenWaitingConnection<F, Io>
where
F: Future<Item = (Io, Protocol), Error = ConnectError> + 'static,
Io: AsyncRead + AsyncWrite + 'static,
{
fn spawn(
key: Key,
rx: oneshot::Sender<Result<IoConnection<Io>, ConnectError>>,
inner: Rc<RefCell<Inner<Io>>>,
fut: F,
) {
tokio_current_thread::spawn(OpenWaitingConnection {
key,
fut,
h2: None,
rx: Some(rx),
inner: Some(inner),
})
}
}
impl<F, Io> Drop for OpenWaitingConnection<F, Io>
where
Io: AsyncRead + AsyncWrite + 'static,
{
fn drop(&mut self) {
if let Some(inner) = self.inner.take() {
let mut inner = inner.as_ref().borrow_mut();
inner.release();
inner.check_availibility();
}
}
}
impl<F, Io> Future for OpenWaitingConnection<F, Io>
where
F: Future<Item = (Io, Protocol), Error = ConnectError>,
Io: AsyncRead + AsyncWrite,
{
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.fut.poll() {
Err(err) => {
let _ = self.inner.take();
if let Some(rx) = self.rx.take() {
let _ = rx.send(Err(err));
}
Err(())
}
Ok(Async::Ready((io, proto))) => {
if proto == Protocol::Http1 {
let rx = self.rx.take().unwrap();
let _ = rx.send(Ok(IoConnection::new(
ConnectionType::H1(io),
Instant::now(),
Some(Acquired(self.key.clone(), self.inner.take())),
)));
Ok(Async::Ready(()))
} else {
self.h2 = Some(handshake(io));
self.poll()
}
}
Ok(Async::NotReady) => Ok(Async::NotReady),
}
}
}
pub(crate) struct Acquired<T>(Key, Option<Rc<RefCell<Inner<T>>>>);
impl<T> Acquired<T>

View File

@ -38,7 +38,7 @@ flate2-rust = ["actix-http/flate2-rust"]
[dependencies]
actix-codec = "0.1.2"
actix-service = "0.3.6"
actix-http = "0.1.1"
actix-http = "0.1.2"
base64 = "0.10.1"
bytes = "0.4"
derive_more = "0.14"
@ -55,8 +55,8 @@ openssl = { version="0.10", optional = true }
[dev-dependencies]
actix-rt = "0.2.2"
actix-web = { version = "1.0.0-alpha.6", features=["ssl"] }
actix-http = { version = "0.1.1", features=["ssl"] }
actix-web = { version = "1.0.0-beta.1", features=["ssl"] }
actix-http = { version = "0.1.2", features=["ssl"] }
actix-http-test = { version = "0.1.0", features=["ssl"] }
actix-utils = "0.3.4"
actix-server = { version = "0.4.3", features=["ssl"] }

View File

@ -1,5 +1,7 @@
use std::collections::HashMap;
use std::io::{Read, Write};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use brotli2::write::BrotliEncoder;
@ -7,12 +9,14 @@ use bytes::Bytes;
use flate2::read::GzDecoder;
use flate2::write::GzEncoder;
use flate2::Compression;
use futures::future::Future;
use futures::Future;
use rand::Rng;
use actix_codec::{AsyncRead, AsyncWrite};
use actix_http::HttpService;
use actix_http_test::TestServer;
use actix_web::http::Cookie;
use actix_service::{fn_service, NewService};
use actix_web::http::{Cookie, Version};
use actix_web::middleware::{BodyEncoding, Compress};
use actix_web::{http::header, web, App, Error, HttpMessage, HttpRequest, HttpResponse};
use awc::error::SendRequestError;
@ -39,6 +43,30 @@ const STR: &str = "Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World";
#[cfg(feature = "ssl")]
fn ssl_acceptor<T: AsyncRead + AsyncWrite>(
) -> std::io::Result<actix_server::ssl::OpensslAcceptor<T, ()>> {
use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod};
// load ssl keys
let mut builder = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap();
builder
.set_private_key_file("../tests/key.pem", SslFiletype::PEM)
.unwrap();
builder
.set_certificate_chain_file("../tests/cert.pem")
.unwrap();
builder.set_alpn_select_callback(|_, protos| {
const H2: &[u8] = b"\x02h2";
if protos.windows(3).any(|window| window == H2) {
Ok(b"h2")
} else {
Err(openssl::ssl::AlpnError::NOACK)
}
});
builder.set_alpn_protos(b"\x02h2")?;
Ok(actix_server::ssl::OpensslAcceptor::new(builder.build()))
}
#[test]
fn test_simple() {
let mut srv =
@ -144,17 +172,249 @@ fn test_timeout_override() {
}
#[test]
fn test_connection_close() {
let mut srv = TestServer::new(|| {
HttpService::new(
App::new().service(web::resource("/").to(|| HttpResponse::Ok())),
fn test_connection_reuse() {
let num = Arc::new(AtomicUsize::new(0));
let num2 = num.clone();
let mut srv = TestServer::new(move || {
let num2 = num2.clone();
fn_service(move |io| {
num2.fetch_add(1, Ordering::Relaxed);
Ok(io)
})
.and_then(HttpService::new(
App::new().service(web::resource("/").route(web::to(|| HttpResponse::Ok()))),
))
});
let client = awc::Client::default();
// req 1
let request = client.get(srv.url("/")).send();
let response = srv.block_on(request).unwrap();
assert!(response.status().is_success());
// req 2
let req = client.post(srv.url("/"));
let response = srv.block_on_fn(move || req.send()).unwrap();
assert!(response.status().is_success());
// one connection
assert_eq!(num.load(Ordering::Relaxed), 1);
}
#[cfg(feature = "ssl")]
#[test]
fn test_connection_reuse_h2() {
let openssl = ssl_acceptor().unwrap();
let num = Arc::new(AtomicUsize::new(0));
let num2 = num.clone();
let mut srv = TestServer::new(move || {
let num2 = num2.clone();
fn_service(move |io| {
num2.fetch_add(1, Ordering::Relaxed);
Ok(io)
})
.and_then(
openssl
.clone()
.map_err(|e| println!("Openssl error: {}", e)),
)
.and_then(
HttpService::build()
.h2(App::new()
.service(web::resource("/").route(web::to(|| HttpResponse::Ok()))))
.map_err(|_| ()),
)
});
let res = srv
.block_on(awc::Client::new().get(srv.url("/")).force_close().send())
.unwrap();
assert!(res.status().is_success());
// disable ssl verification
use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
builder.set_verify(SslVerifyMode::NONE);
let _ = builder
.set_alpn_protos(b"\x02h2\x08http/1.1")
.map_err(|e| log::error!("Can not set alpn protocol: {:?}", e));
let client = awc::Client::build()
.connector(awc::Connector::new().ssl(builder.build()).finish())
.finish();
// req 1
let request = client.get(srv.surl("/")).send();
let response = srv.block_on(request).unwrap();
assert!(response.status().is_success());
// req 2
let req = client.post(srv.surl("/"));
let response = srv.block_on_fn(move || req.send()).unwrap();
assert!(response.status().is_success());
assert_eq!(response.version(), Version::HTTP_2);
// one connection
assert_eq!(num.load(Ordering::Relaxed), 1);
}
#[test]
fn test_connection_force_close() {
let num = Arc::new(AtomicUsize::new(0));
let num2 = num.clone();
let mut srv = TestServer::new(move || {
let num2 = num2.clone();
fn_service(move |io| {
num2.fetch_add(1, Ordering::Relaxed);
Ok(io)
})
.and_then(HttpService::new(
App::new().service(web::resource("/").route(web::to(|| HttpResponse::Ok()))),
))
});
let client = awc::Client::default();
// req 1
let request = client.get(srv.url("/")).force_close().send();
let response = srv.block_on(request).unwrap();
assert!(response.status().is_success());
// req 2
let req = client.post(srv.url("/")).force_close();
let response = srv.block_on_fn(move || req.send()).unwrap();
assert!(response.status().is_success());
// two connection
assert_eq!(num.load(Ordering::Relaxed), 2);
}
#[test]
fn test_connection_server_close() {
let num = Arc::new(AtomicUsize::new(0));
let num2 = num.clone();
let mut srv = TestServer::new(move || {
let num2 = num2.clone();
fn_service(move |io| {
num2.fetch_add(1, Ordering::Relaxed);
Ok(io)
})
.and_then(HttpService::new(
App::new().service(
web::resource("/")
.route(web::to(|| HttpResponse::Ok().force_close().finish())),
),
))
});
let client = awc::Client::default();
// req 1
let request = client.get(srv.url("/")).send();
let response = srv.block_on(request).unwrap();
assert!(response.status().is_success());
// req 2
let req = client.post(srv.url("/"));
let response = srv.block_on_fn(move || req.send()).unwrap();
assert!(response.status().is_success());
// two connection
assert_eq!(num.load(Ordering::Relaxed), 2);
}
#[test]
fn test_connection_wait_queue() {
let num = Arc::new(AtomicUsize::new(0));
let num2 = num.clone();
let mut srv = TestServer::new(move || {
let num2 = num2.clone();
fn_service(move |io| {
num2.fetch_add(1, Ordering::Relaxed);
Ok(io)
})
.and_then(HttpService::new(App::new().service(
web::resource("/").route(web::to(|| HttpResponse::Ok().body(STR))),
)))
});
let client = awc::Client::build()
.connector(awc::Connector::new().limit(1).finish())
.finish();
// req 1
let request = client.get(srv.url("/")).send();
let mut response = srv.block_on(request).unwrap();
assert!(response.status().is_success());
// req 2
let req2 = client.post(srv.url("/"));
let req2_fut = srv.execute(move || {
let mut fut = req2.send();
assert!(fut.poll().unwrap().is_not_ready());
fut
});
// read response 1
let bytes = srv.block_on(response.body()).unwrap();
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
// req 2
let response = srv.block_on(req2_fut).unwrap();
assert!(response.status().is_success());
// two connection
assert_eq!(num.load(Ordering::Relaxed), 1);
}
#[test]
fn test_connection_wait_queue_force_close() {
let num = Arc::new(AtomicUsize::new(0));
let num2 = num.clone();
let mut srv = TestServer::new(move || {
let num2 = num2.clone();
fn_service(move |io| {
num2.fetch_add(1, Ordering::Relaxed);
Ok(io)
})
.and_then(HttpService::new(
App::new().service(
web::resource("/")
.route(web::to(|| HttpResponse::Ok().force_close().body(STR))),
),
))
});
let client = awc::Client::build()
.connector(awc::Connector::new().limit(1).finish())
.finish();
// req 1
let request = client.get(srv.url("/")).send();
let mut response = srv.block_on(request).unwrap();
assert!(response.status().is_success());
// req 2
let req2 = client.post(srv.url("/"));
let req2_fut = srv.execute(move || {
let mut fut = req2.send();
assert!(fut.poll().unwrap().is_not_ready());
fut
});
// read response 1
let bytes = srv.block_on(response.body()).unwrap();
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
// req 2
let response = srv.block_on(req2_fut).unwrap();
assert!(response.status().is_success());
// two connection
assert_eq!(num.load(Ordering::Relaxed), 2);
}
#[test]

View File

@ -431,13 +431,14 @@ where
#[cfg(test)]
mod tests {
use actix_service::Service;
use bytes::Bytes;
use futures::{Future, IntoFuture};
use super::*;
use crate::http::{header, HeaderValue, Method, StatusCode};
use crate::service::{ServiceRequest, ServiceResponse};
use crate::test::{block_on, call_service, init_service, TestRequest};
use crate::{web, Error, HttpResponse};
use crate::test::{block_on, call_service, init_service, read_body, TestRequest};
use crate::{web, Error, HttpRequest, HttpResponse};
#[test]
fn test_default_resource() {
@ -598,4 +599,26 @@ mod tests {
HeaderValue::from_static("0001")
);
}
#[test]
fn test_external_resource() {
let mut srv = init_service(
App::new()
.external_resource("youtube", "https://youtube.com/watch/{video_id}")
.route(
"/test",
web::get().to(|req: HttpRequest| {
HttpResponse::Ok().body(format!(
"{}",
req.url_for("youtube", &["12345"]).unwrap()
))
}),
),
);
let req = TestRequest::with_uri("/test").to_request();
let resp = call_service(&mut srv, req);
assert_eq!(resp.status(), StatusCode::OK);
let body = read_body(resp);
assert_eq!(body, Bytes::from_static(b"https://youtube.com/watch/12345"));
}
}

View File

@ -162,7 +162,7 @@ where
}
}
if self.endpoint.is_some() {
if self.endpoint.is_some() && self.data.is_empty() {
Ok(Async::Ready(AppInitService {
service: self.endpoint.take().unwrap(),
rmap: self.rmap.clone(),

View File

@ -253,11 +253,14 @@ impl ServiceConfig {
#[cfg(test)]
mod tests {
use actix_service::Service;
use bytes::Bytes;
use futures::Future;
use tokio_timer::sleep;
use super::*;
use crate::http::{Method, StatusCode};
use crate::test::{block_on, call_service, init_service, TestRequest};
use crate::{web, App, HttpResponse};
use crate::test::{block_on, call_service, init_service, read_body, TestRequest};
use crate::{web, App, HttpRequest, HttpResponse};
#[test]
fn test_data() {
@ -277,7 +280,12 @@ mod tests {
#[test]
fn test_data_factory() {
let cfg = |cfg: &mut ServiceConfig| {
cfg.data_factory(|| Ok::<_, ()>(10usize));
cfg.data_factory(|| {
sleep(std::time::Duration::from_millis(50)).then(|_| {
println!("READY");
Ok::<_, ()>(10usize)
})
});
};
let mut srv =
@ -301,6 +309,33 @@ mod tests {
assert_eq!(resp.status(), StatusCode::INTERNAL_SERVER_ERROR);
}
#[test]
fn test_external_resource() {
let mut srv = init_service(
App::new()
.configure(|cfg| {
cfg.external_resource(
"youtube",
"https://youtube.com/watch/{video_id}",
);
})
.route(
"/test",
web::get().to(|req: HttpRequest| {
HttpResponse::Ok().body(format!(
"{}",
req.url_for("youtube", &["12345"]).unwrap()
))
}),
),
);
let req = TestRequest::with_uri("/test").to_request();
let resp = call_service(&mut srv, req);
assert_eq!(resp.status(), StatusCode::OK);
let body = read_body(resp);
assert_eq!(body, Bytes::from_static(b"https://youtube.com/watch/12345"));
}
#[test]
fn test_service() {
let mut srv = init_service(App::new().configure(|cfg| {

View File

@ -1,5 +1,10 @@
# Changes
## [0.1.1] - 2019-04-24
* Always make new connection for http client
## [0.1.0] - 2019-04-16
* No changes

View File

@ -1,6 +1,6 @@
[package]
name = "actix-http-test"
version = "0.1.0"
version = "0.1.1"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix http test server"
readme = "README.md"
@ -35,7 +35,7 @@ actix-rt = "0.2.2"
actix-service = "0.3.6"
actix-server = "0.4.3"
actix-utils = "0.3.5"
awc = "0.1.0"
awc = "0.1.1"
base64 = "0.10"
bytes = "0.4"
@ -55,5 +55,5 @@ tokio-timer = "0.2"
openssl = { version="0.10", optional = true }
[dev-dependencies]
actix-web = "1.0.0-alpha.5"
actix-http = "0.1.0"
actix-web = "1.0.0-beta.1"
actix-http = "0.1.2"

View File

@ -124,6 +124,7 @@ impl TestServer {
|e| log::error!("Can not set alpn protocol: {:?}", e),
);
Connector::new()
.conn_lifetime(time::Duration::from_secs(0))
.timeout(time::Duration::from_millis(500))
.ssl(builder.build())
.finish()
@ -131,6 +132,7 @@ impl TestServer {
#[cfg(not(feature = "ssl"))]
{
Connector::new()
.conn_lifetime(time::Duration::from_secs(0))
.timeout(time::Duration::from_millis(500))
.finish()
}
@ -163,6 +165,15 @@ impl TestServerRuntime {
self.rt.block_on(fut)
}
/// Execute future on current core
pub fn block_on_fn<F, R>(&mut self, f: F) -> Result<R::Item, R::Error>
where
F: FnOnce() -> R,
R: Future,
{
self.rt.block_on(lazy(|| f()))
}
/// Execute function on current core
pub fn execute<F, R>(&mut self, fut: F) -> R
where