mirror of
https://github.com/fafhrd91/actix-web
synced 2025-07-09 20:36:14 +02:00
Compare commits
6 Commits
multipart-
...
http-v0.1.
Author | SHA1 | Date | |
---|---|---|---|
9702b2d88e | |||
d2b0afd859 | |||
5f6a1a8249 | |||
5d531989e7 | |||
3532602299 | |||
48bee55087 |
12
CHANGES.md
12
CHANGES.md
@ -1,5 +1,17 @@
|
|||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
|
### Added
|
||||||
|
|
||||||
|
* Add helper functions for reading response body `test::read_body()`
|
||||||
|
|
||||||
|
* Added support for `remainder match` (i.e "/path/{tail}*")
|
||||||
|
|
||||||
|
|
||||||
|
### Changed
|
||||||
|
|
||||||
|
* `.to_async()` handler can return `Responder` type #792
|
||||||
|
|
||||||
|
|
||||||
## [1.0.0-beta.1] - 2019-04-20
|
## [1.0.0-beta.1] - 2019-04-20
|
||||||
|
|
||||||
### Added
|
### Added
|
||||||
|
@ -68,10 +68,10 @@ rust-tls = ["rustls", "actix-server/rust-tls"]
|
|||||||
actix-codec = "0.1.2"
|
actix-codec = "0.1.2"
|
||||||
actix-service = "0.3.6"
|
actix-service = "0.3.6"
|
||||||
actix-utils = "0.3.4"
|
actix-utils = "0.3.4"
|
||||||
actix-router = "0.1.2"
|
actix-router = "0.1.3"
|
||||||
actix-rt = "0.2.2"
|
actix-rt = "0.2.2"
|
||||||
actix-web-codegen = "0.1.0-beta.1"
|
actix-web-codegen = "0.1.0-beta.1"
|
||||||
actix-http = { version = "0.1.1", features=["fail"] }
|
actix-http = { version = "0.1.2", features=["fail"] }
|
||||||
actix-server = "0.4.3"
|
actix-server = "0.4.3"
|
||||||
actix-server-config = "0.1.1"
|
actix-server-config = "0.1.1"
|
||||||
actix-threadpool = "0.1.0"
|
actix-threadpool = "0.1.0"
|
||||||
@ -98,7 +98,7 @@ openssl = { version="0.10", optional = true }
|
|||||||
rustls = { version = "^0.15", optional = true }
|
rustls = { version = "^0.15", optional = true }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
actix-http = { version = "0.1.1", features=["ssl", "brotli", "flate2-zlib"] }
|
actix-http = { version = "0.1.2", features=["ssl", "brotli", "flate2-zlib"] }
|
||||||
actix-http-test = { version = "0.1.0", features=["ssl"] }
|
actix-http-test = { version = "0.1.0", features=["ssl"] }
|
||||||
actix-files = { version = "0.1.0-beta.1" }
|
actix-files = { version = "0.1.0-beta.1" }
|
||||||
rand = "0.6"
|
rand = "0.6"
|
||||||
|
@ -1,5 +1,21 @@
|
|||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
|
## [0.1.3] - 2019-04-23
|
||||||
|
|
||||||
|
### Fixed
|
||||||
|
|
||||||
|
* Fix http client pool management
|
||||||
|
|
||||||
|
* Fix http client wait queue management #794
|
||||||
|
|
||||||
|
|
||||||
|
## [0.1.2] - 2019-04-23
|
||||||
|
|
||||||
|
### Fixed
|
||||||
|
|
||||||
|
* Fix BorrowMutError panic in client connector #793
|
||||||
|
|
||||||
|
|
||||||
## [0.1.1] - 2019-04-19
|
## [0.1.1] - 2019-04-19
|
||||||
|
|
||||||
### Changes
|
### Changes
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "actix-http"
|
name = "actix-http"
|
||||||
version = "0.1.1"
|
version = "0.1.3"
|
||||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||||
description = "Actix http primitives"
|
description = "Actix http primitives"
|
||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
|
@ -114,7 +114,8 @@ where
|
|||||||
Request = TcpConnect<Uri>,
|
Request = TcpConnect<Uri>,
|
||||||
Response = TcpConnection<Uri, U>,
|
Response = TcpConnection<Uri, U>,
|
||||||
Error = actix_connect::ConnectError,
|
Error = actix_connect::ConnectError,
|
||||||
> + Clone,
|
> + Clone
|
||||||
|
+ 'static,
|
||||||
{
|
{
|
||||||
/// Connection timeout, i.e. max time to connect to remote host including dns name resolution.
|
/// Connection timeout, i.e. max time to connect to remote host including dns name resolution.
|
||||||
/// Set to 1 second by default.
|
/// Set to 1 second by default.
|
||||||
@ -284,7 +285,9 @@ mod connect_impl {
|
|||||||
pub(crate) struct InnerConnector<T, Io>
|
pub(crate) struct InnerConnector<T, Io>
|
||||||
where
|
where
|
||||||
Io: AsyncRead + AsyncWrite + 'static,
|
Io: AsyncRead + AsyncWrite + 'static,
|
||||||
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>,
|
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>
|
||||||
|
+ Clone
|
||||||
|
+ 'static,
|
||||||
{
|
{
|
||||||
pub(crate) tcp_pool: ConnectionPool<T, Io>,
|
pub(crate) tcp_pool: ConnectionPool<T, Io>,
|
||||||
}
|
}
|
||||||
@ -293,7 +296,8 @@ mod connect_impl {
|
|||||||
where
|
where
|
||||||
Io: AsyncRead + AsyncWrite + 'static,
|
Io: AsyncRead + AsyncWrite + 'static,
|
||||||
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>
|
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>
|
||||||
+ Clone,
|
+ Clone
|
||||||
|
+ 'static,
|
||||||
{
|
{
|
||||||
fn clone(&self) -> Self {
|
fn clone(&self) -> Self {
|
||||||
InnerConnector {
|
InnerConnector {
|
||||||
@ -305,7 +309,9 @@ mod connect_impl {
|
|||||||
impl<T, Io> Service for InnerConnector<T, Io>
|
impl<T, Io> Service for InnerConnector<T, Io>
|
||||||
where
|
where
|
||||||
Io: AsyncRead + AsyncWrite + 'static,
|
Io: AsyncRead + AsyncWrite + 'static,
|
||||||
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>,
|
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>
|
||||||
|
+ Clone
|
||||||
|
+ 'static,
|
||||||
{
|
{
|
||||||
type Request = Connect;
|
type Request = Connect;
|
||||||
type Response = IoConnection<Io>;
|
type Response = IoConnection<Io>;
|
||||||
@ -356,9 +362,11 @@ mod connect_impl {
|
|||||||
Io1: AsyncRead + AsyncWrite + 'static,
|
Io1: AsyncRead + AsyncWrite + 'static,
|
||||||
Io2: AsyncRead + AsyncWrite + 'static,
|
Io2: AsyncRead + AsyncWrite + 'static,
|
||||||
T1: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>
|
T1: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>
|
||||||
+ Clone,
|
+ Clone
|
||||||
|
+ 'static,
|
||||||
T2: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>
|
T2: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>
|
||||||
+ Clone,
|
+ Clone
|
||||||
|
+ 'static,
|
||||||
{
|
{
|
||||||
fn clone(&self) -> Self {
|
fn clone(&self) -> Self {
|
||||||
InnerConnector {
|
InnerConnector {
|
||||||
@ -372,8 +380,12 @@ mod connect_impl {
|
|||||||
where
|
where
|
||||||
Io1: AsyncRead + AsyncWrite + 'static,
|
Io1: AsyncRead + AsyncWrite + 'static,
|
||||||
Io2: AsyncRead + AsyncWrite + 'static,
|
Io2: AsyncRead + AsyncWrite + 'static,
|
||||||
T1: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>,
|
T1: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>
|
||||||
T2: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>,
|
+ Clone
|
||||||
|
+ 'static,
|
||||||
|
T2: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>
|
||||||
|
+ Clone
|
||||||
|
+ 'static,
|
||||||
{
|
{
|
||||||
type Request = Connect;
|
type Request = Connect;
|
||||||
type Response = EitherConnection<Io1, Io2>;
|
type Response = EitherConnection<Io1, Io2>;
|
||||||
@ -409,7 +421,9 @@ mod connect_impl {
|
|||||||
pub(crate) struct InnerConnectorResponseA<T, Io1, Io2>
|
pub(crate) struct InnerConnectorResponseA<T, Io1, Io2>
|
||||||
where
|
where
|
||||||
Io1: AsyncRead + AsyncWrite + 'static,
|
Io1: AsyncRead + AsyncWrite + 'static,
|
||||||
T: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>,
|
T: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>
|
||||||
|
+ Clone
|
||||||
|
+ 'static,
|
||||||
{
|
{
|
||||||
fut: <ConnectionPool<T, Io1> as Service>::Future,
|
fut: <ConnectionPool<T, Io1> as Service>::Future,
|
||||||
_t: PhantomData<Io2>,
|
_t: PhantomData<Io2>,
|
||||||
@ -417,7 +431,9 @@ mod connect_impl {
|
|||||||
|
|
||||||
impl<T, Io1, Io2> Future for InnerConnectorResponseA<T, Io1, Io2>
|
impl<T, Io1, Io2> Future for InnerConnectorResponseA<T, Io1, Io2>
|
||||||
where
|
where
|
||||||
T: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>,
|
T: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>
|
||||||
|
+ Clone
|
||||||
|
+ 'static,
|
||||||
Io1: AsyncRead + AsyncWrite + 'static,
|
Io1: AsyncRead + AsyncWrite + 'static,
|
||||||
Io2: AsyncRead + AsyncWrite + 'static,
|
Io2: AsyncRead + AsyncWrite + 'static,
|
||||||
{
|
{
|
||||||
@ -435,7 +451,9 @@ mod connect_impl {
|
|||||||
pub(crate) struct InnerConnectorResponseB<T, Io1, Io2>
|
pub(crate) struct InnerConnectorResponseB<T, Io1, Io2>
|
||||||
where
|
where
|
||||||
Io2: AsyncRead + AsyncWrite + 'static,
|
Io2: AsyncRead + AsyncWrite + 'static,
|
||||||
T: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>,
|
T: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>
|
||||||
|
+ Clone
|
||||||
|
+ 'static,
|
||||||
{
|
{
|
||||||
fut: <ConnectionPool<T, Io2> as Service>::Future,
|
fut: <ConnectionPool<T, Io2> as Service>::Future,
|
||||||
_t: PhantomData<Io1>,
|
_t: PhantomData<Io1>,
|
||||||
@ -443,7 +461,9 @@ mod connect_impl {
|
|||||||
|
|
||||||
impl<T, Io1, Io2> Future for InnerConnectorResponseB<T, Io1, Io2>
|
impl<T, Io1, Io2> Future for InnerConnectorResponseB<T, Io1, Io2>
|
||||||
where
|
where
|
||||||
T: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>,
|
T: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>
|
||||||
|
+ Clone
|
||||||
|
+ 'static,
|
||||||
Io1: AsyncRead + AsyncWrite + 'static,
|
Io1: AsyncRead + AsyncWrite + 'static,
|
||||||
Io2: AsyncRead + AsyncWrite + 'static,
|
Io2: AsyncRead + AsyncWrite + 'static,
|
||||||
{
|
{
|
||||||
|
@ -274,7 +274,7 @@ impl<Io: ConnectionLifetime> Stream for PlStream<Io> {
|
|||||||
Ok(Async::Ready(Some(chunk)))
|
Ok(Async::Ready(Some(chunk)))
|
||||||
} else {
|
} else {
|
||||||
let framed = self.framed.take().unwrap();
|
let framed = self.framed.take().unwrap();
|
||||||
let force_close = framed.get_codec().keepalive();
|
let force_close = !framed.get_codec().keepalive();
|
||||||
release_connection(framed, force_close);
|
release_connection(framed, force_close);
|
||||||
Ok(Async::Ready(None))
|
Ok(Async::Ready(None))
|
||||||
}
|
}
|
||||||
|
@ -49,7 +49,9 @@ pub(crate) struct ConnectionPool<T, Io: AsyncRead + AsyncWrite + 'static>(
|
|||||||
impl<T, Io> ConnectionPool<T, Io>
|
impl<T, Io> ConnectionPool<T, Io>
|
||||||
where
|
where
|
||||||
Io: AsyncRead + AsyncWrite + 'static,
|
Io: AsyncRead + AsyncWrite + 'static,
|
||||||
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>,
|
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>
|
||||||
|
+ Clone
|
||||||
|
+ 'static,
|
||||||
{
|
{
|
||||||
pub(crate) fn new(
|
pub(crate) fn new(
|
||||||
connector: T,
|
connector: T,
|
||||||
@ -69,7 +71,7 @@ where
|
|||||||
waiters: Slab::new(),
|
waiters: Slab::new(),
|
||||||
waiters_queue: IndexSet::new(),
|
waiters_queue: IndexSet::new(),
|
||||||
available: HashMap::new(),
|
available: HashMap::new(),
|
||||||
task: AtomicTask::new(),
|
task: None,
|
||||||
})),
|
})),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
@ -88,7 +90,9 @@ where
|
|||||||
impl<T, Io> Service for ConnectionPool<T, Io>
|
impl<T, Io> Service for ConnectionPool<T, Io>
|
||||||
where
|
where
|
||||||
Io: AsyncRead + AsyncWrite + 'static,
|
Io: AsyncRead + AsyncWrite + 'static,
|
||||||
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>,
|
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>
|
||||||
|
+ Clone
|
||||||
|
+ 'static,
|
||||||
{
|
{
|
||||||
type Request = Connect;
|
type Request = Connect;
|
||||||
type Response = IoConnection<Io>;
|
type Response = IoConnection<Io>;
|
||||||
@ -113,15 +117,35 @@ where
|
|||||||
match self.1.as_ref().borrow_mut().acquire(&key) {
|
match self.1.as_ref().borrow_mut().acquire(&key) {
|
||||||
Acquire::Acquired(io, created) => {
|
Acquire::Acquired(io, created) => {
|
||||||
// use existing connection
|
// use existing connection
|
||||||
Either::A(ok(IoConnection::new(
|
return Either::A(ok(IoConnection::new(
|
||||||
io,
|
io,
|
||||||
created,
|
created,
|
||||||
Some(Acquired(key, Some(self.1.clone()))),
|
Some(Acquired(key, Some(self.1.clone()))),
|
||||||
)))
|
)));
|
||||||
}
|
}
|
||||||
Acquire::NotAvailable => {
|
Acquire::Available => {
|
||||||
|
// open new connection
|
||||||
|
return Either::B(Either::B(OpenConnection::new(
|
||||||
|
key,
|
||||||
|
self.1.clone(),
|
||||||
|
self.0.call(req),
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
_ => (),
|
||||||
|
}
|
||||||
|
|
||||||
// connection is not available, wait
|
// connection is not available, wait
|
||||||
let (rx, token) = self.1.as_ref().borrow_mut().wait_for(req);
|
let (rx, token, support) = self.1.as_ref().borrow_mut().wait_for(req);
|
||||||
|
|
||||||
|
// start support future
|
||||||
|
if !support {
|
||||||
|
self.1.as_ref().borrow_mut().task = Some(AtomicTask::new());
|
||||||
|
tokio_current_thread::spawn(ConnectorPoolSupport {
|
||||||
|
connector: self.0.clone(),
|
||||||
|
inner: self.1.clone(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
Either::B(Either::A(WaitForConnection {
|
Either::B(Either::A(WaitForConnection {
|
||||||
rx,
|
rx,
|
||||||
key,
|
key,
|
||||||
@ -129,16 +153,6 @@ where
|
|||||||
inner: Some(self.1.clone()),
|
inner: Some(self.1.clone()),
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
Acquire::Available => {
|
|
||||||
// open new connection
|
|
||||||
Either::B(Either::B(OpenConnection::new(
|
|
||||||
key,
|
|
||||||
self.1.clone(),
|
|
||||||
self.0.call(req),
|
|
||||||
)))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[doc(hidden)]
|
#[doc(hidden)]
|
||||||
@ -245,7 +259,7 @@ where
|
|||||||
Ok(Async::Ready(IoConnection::new(
|
Ok(Async::Ready(IoConnection::new(
|
||||||
ConnectionType::H2(snd),
|
ConnectionType::H2(snd),
|
||||||
Instant::now(),
|
Instant::now(),
|
||||||
Some(Acquired(self.key.clone(), self.inner.clone())),
|
Some(Acquired(self.key.clone(), self.inner.take())),
|
||||||
)))
|
)))
|
||||||
}
|
}
|
||||||
Ok(Async::NotReady) => Ok(Async::NotReady),
|
Ok(Async::NotReady) => Ok(Async::NotReady),
|
||||||
@ -256,12 +270,11 @@ where
|
|||||||
match self.fut.poll() {
|
match self.fut.poll() {
|
||||||
Err(err) => Err(err),
|
Err(err) => Err(err),
|
||||||
Ok(Async::Ready((io, proto))) => {
|
Ok(Async::Ready((io, proto))) => {
|
||||||
let _ = self.inner.take();
|
|
||||||
if proto == Protocol::Http1 {
|
if proto == Protocol::Http1 {
|
||||||
Ok(Async::Ready(IoConnection::new(
|
Ok(Async::Ready(IoConnection::new(
|
||||||
ConnectionType::H1(io),
|
ConnectionType::H1(io),
|
||||||
Instant::now(),
|
Instant::now(),
|
||||||
Some(Acquired(self.key.clone(), self.inner.clone())),
|
Some(Acquired(self.key.clone(), self.inner.take())),
|
||||||
)))
|
)))
|
||||||
} else {
|
} else {
|
||||||
self.h2 = Some(handshake(io));
|
self.h2 = Some(handshake(io));
|
||||||
@ -279,7 +292,6 @@ enum Acquire<T> {
|
|||||||
NotAvailable,
|
NotAvailable,
|
||||||
}
|
}
|
||||||
|
|
||||||
// #[derive(Debug)]
|
|
||||||
struct AvailableConnection<Io> {
|
struct AvailableConnection<Io> {
|
||||||
io: ConnectionType<Io>,
|
io: ConnectionType<Io>,
|
||||||
used: Instant,
|
used: Instant,
|
||||||
@ -298,7 +310,7 @@ pub(crate) struct Inner<Io> {
|
|||||||
oneshot::Sender<Result<IoConnection<Io>, ConnectError>>,
|
oneshot::Sender<Result<IoConnection<Io>, ConnectError>>,
|
||||||
)>,
|
)>,
|
||||||
waiters_queue: IndexSet<(Key, usize)>,
|
waiters_queue: IndexSet<(Key, usize)>,
|
||||||
task: AtomicTask,
|
task: Option<AtomicTask>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Io> Inner<Io> {
|
impl<Io> Inner<Io> {
|
||||||
@ -314,18 +326,6 @@ impl<Io> Inner<Io> {
|
|||||||
self.waiters.remove(token);
|
self.waiters.remove(token);
|
||||||
self.waiters_queue.remove(&(key.clone(), token));
|
self.waiters_queue.remove(&(key.clone(), token));
|
||||||
}
|
}
|
||||||
|
|
||||||
fn release_conn(&mut self, key: &Key, io: ConnectionType<Io>, created: Instant) {
|
|
||||||
self.acquired -= 1;
|
|
||||||
self.available
|
|
||||||
.entry(key.clone())
|
|
||||||
.or_insert_with(VecDeque::new)
|
|
||||||
.push_back(AvailableConnection {
|
|
||||||
io,
|
|
||||||
created,
|
|
||||||
used: Instant::now(),
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Io> Inner<Io>
|
impl<Io> Inner<Io>
|
||||||
@ -339,6 +339,7 @@ where
|
|||||||
) -> (
|
) -> (
|
||||||
oneshot::Receiver<Result<IoConnection<Io>, ConnectError>>,
|
oneshot::Receiver<Result<IoConnection<Io>, ConnectError>>,
|
||||||
usize,
|
usize,
|
||||||
|
bool,
|
||||||
) {
|
) {
|
||||||
let (tx, rx) = oneshot::channel();
|
let (tx, rx) = oneshot::channel();
|
||||||
|
|
||||||
@ -346,8 +347,9 @@ where
|
|||||||
let entry = self.waiters.vacant_entry();
|
let entry = self.waiters.vacant_entry();
|
||||||
let token = entry.key();
|
let token = entry.key();
|
||||||
entry.insert((connect, tx));
|
entry.insert((connect, tx));
|
||||||
assert!(!self.waiters_queue.insert((key, token)));
|
assert!(self.waiters_queue.insert((key, token)));
|
||||||
(rx, token)
|
|
||||||
|
(rx, token, self.task.is_some())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn acquire(&mut self, key: &Key) -> Acquire<Io> {
|
fn acquire(&mut self, key: &Key) -> Acquire<Io> {
|
||||||
@ -400,6 +402,19 @@ where
|
|||||||
Acquire::Available
|
Acquire::Available
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn release_conn(&mut self, key: &Key, io: ConnectionType<Io>, created: Instant) {
|
||||||
|
self.acquired -= 1;
|
||||||
|
self.available
|
||||||
|
.entry(key.clone())
|
||||||
|
.or_insert_with(VecDeque::new)
|
||||||
|
.push_back(AvailableConnection {
|
||||||
|
io,
|
||||||
|
created,
|
||||||
|
used: Instant::now(),
|
||||||
|
});
|
||||||
|
self.check_availibility();
|
||||||
|
}
|
||||||
|
|
||||||
fn release_close(&mut self, io: ConnectionType<Io>) {
|
fn release_close(&mut self, io: ConnectionType<Io>) {
|
||||||
self.acquired -= 1;
|
self.acquired -= 1;
|
||||||
if let Some(timeout) = self.disconnect_timeout {
|
if let Some(timeout) = self.disconnect_timeout {
|
||||||
@ -407,11 +422,12 @@ where
|
|||||||
tokio_current_thread::spawn(CloseConnection::new(io, timeout))
|
tokio_current_thread::spawn(CloseConnection::new(io, timeout))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
self.check_availibility();
|
||||||
}
|
}
|
||||||
|
|
||||||
fn check_availibility(&self) {
|
fn check_availibility(&self) {
|
||||||
if !self.waiters_queue.is_empty() && self.acquired < self.limit {
|
if !self.waiters_queue.is_empty() && self.acquired < self.limit {
|
||||||
self.task.notify()
|
self.task.as_ref().map(|t| t.notify());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -451,6 +467,147 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct ConnectorPoolSupport<T, Io>
|
||||||
|
where
|
||||||
|
Io: AsyncRead + AsyncWrite + 'static,
|
||||||
|
{
|
||||||
|
connector: T,
|
||||||
|
inner: Rc<RefCell<Inner<Io>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, Io> Future for ConnectorPoolSupport<T, Io>
|
||||||
|
where
|
||||||
|
Io: AsyncRead + AsyncWrite + 'static,
|
||||||
|
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>,
|
||||||
|
T::Future: 'static,
|
||||||
|
{
|
||||||
|
type Item = ();
|
||||||
|
type Error = ();
|
||||||
|
|
||||||
|
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||||
|
let mut inner = self.inner.as_ref().borrow_mut();
|
||||||
|
inner.task.as_ref().unwrap().register();
|
||||||
|
|
||||||
|
// check waiters
|
||||||
|
loop {
|
||||||
|
let (key, token) = {
|
||||||
|
if let Some((key, token)) = inner.waiters_queue.get_index(0) {
|
||||||
|
(key.clone(), *token)
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
match inner.acquire(&key) {
|
||||||
|
Acquire::NotAvailable => break,
|
||||||
|
Acquire::Acquired(io, created) => {
|
||||||
|
let (_, tx) = inner.waiters.remove(token);
|
||||||
|
if let Err(conn) = tx.send(Ok(IoConnection::new(
|
||||||
|
io,
|
||||||
|
created,
|
||||||
|
Some(Acquired(key.clone(), Some(self.inner.clone()))),
|
||||||
|
))) {
|
||||||
|
let (io, created) = conn.unwrap().into_inner();
|
||||||
|
inner.release_conn(&key, io, created);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Acquire::Available => {
|
||||||
|
let (connect, tx) = inner.waiters.remove(token);
|
||||||
|
OpenWaitingConnection::spawn(
|
||||||
|
key.clone(),
|
||||||
|
tx,
|
||||||
|
self.inner.clone(),
|
||||||
|
self.connector.call(connect),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let _ = inner.waiters_queue.swap_remove_index(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(Async::NotReady)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct OpenWaitingConnection<F, Io>
|
||||||
|
where
|
||||||
|
Io: AsyncRead + AsyncWrite + 'static,
|
||||||
|
{
|
||||||
|
fut: F,
|
||||||
|
key: Key,
|
||||||
|
h2: Option<Handshake<Io, Bytes>>,
|
||||||
|
rx: Option<oneshot::Sender<Result<IoConnection<Io>, ConnectError>>>,
|
||||||
|
inner: Option<Rc<RefCell<Inner<Io>>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<F, Io> OpenWaitingConnection<F, Io>
|
||||||
|
where
|
||||||
|
F: Future<Item = (Io, Protocol), Error = ConnectError> + 'static,
|
||||||
|
Io: AsyncRead + AsyncWrite + 'static,
|
||||||
|
{
|
||||||
|
fn spawn(
|
||||||
|
key: Key,
|
||||||
|
rx: oneshot::Sender<Result<IoConnection<Io>, ConnectError>>,
|
||||||
|
inner: Rc<RefCell<Inner<Io>>>,
|
||||||
|
fut: F,
|
||||||
|
) {
|
||||||
|
tokio_current_thread::spawn(OpenWaitingConnection {
|
||||||
|
key,
|
||||||
|
fut,
|
||||||
|
h2: None,
|
||||||
|
rx: Some(rx),
|
||||||
|
inner: Some(inner),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<F, Io> Drop for OpenWaitingConnection<F, Io>
|
||||||
|
where
|
||||||
|
Io: AsyncRead + AsyncWrite + 'static,
|
||||||
|
{
|
||||||
|
fn drop(&mut self) {
|
||||||
|
if let Some(inner) = self.inner.take() {
|
||||||
|
let mut inner = inner.as_ref().borrow_mut();
|
||||||
|
inner.release();
|
||||||
|
inner.check_availibility();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<F, Io> Future for OpenWaitingConnection<F, Io>
|
||||||
|
where
|
||||||
|
F: Future<Item = (Io, Protocol), Error = ConnectError>,
|
||||||
|
Io: AsyncRead + AsyncWrite,
|
||||||
|
{
|
||||||
|
type Item = ();
|
||||||
|
type Error = ();
|
||||||
|
|
||||||
|
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||||
|
match self.fut.poll() {
|
||||||
|
Err(err) => {
|
||||||
|
let _ = self.inner.take();
|
||||||
|
if let Some(rx) = self.rx.take() {
|
||||||
|
let _ = rx.send(Err(err));
|
||||||
|
}
|
||||||
|
Err(())
|
||||||
|
}
|
||||||
|
Ok(Async::Ready((io, proto))) => {
|
||||||
|
if proto == Protocol::Http1 {
|
||||||
|
let rx = self.rx.take().unwrap();
|
||||||
|
let _ = rx.send(Ok(IoConnection::new(
|
||||||
|
ConnectionType::H1(io),
|
||||||
|
Instant::now(),
|
||||||
|
Some(Acquired(self.key.clone(), self.inner.take())),
|
||||||
|
)));
|
||||||
|
Ok(Async::Ready(()))
|
||||||
|
} else {
|
||||||
|
self.h2 = Some(handshake(io));
|
||||||
|
self.poll()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(Async::NotReady) => Ok(Async::NotReady),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) struct Acquired<T>(Key, Option<Rc<RefCell<Inner<T>>>>);
|
pub(crate) struct Acquired<T>(Key, Option<Rc<RefCell<Inner<T>>>>);
|
||||||
|
|
||||||
impl<T> Acquired<T>
|
impl<T> Acquired<T>
|
||||||
|
@ -38,7 +38,7 @@ flate2-rust = ["actix-http/flate2-rust"]
|
|||||||
[dependencies]
|
[dependencies]
|
||||||
actix-codec = "0.1.2"
|
actix-codec = "0.1.2"
|
||||||
actix-service = "0.3.6"
|
actix-service = "0.3.6"
|
||||||
actix-http = "0.1.1"
|
actix-http = "0.1.2"
|
||||||
base64 = "0.10.1"
|
base64 = "0.10.1"
|
||||||
bytes = "0.4"
|
bytes = "0.4"
|
||||||
derive_more = "0.14"
|
derive_more = "0.14"
|
||||||
@ -55,8 +55,8 @@ openssl = { version="0.10", optional = true }
|
|||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
actix-rt = "0.2.2"
|
actix-rt = "0.2.2"
|
||||||
actix-web = { version = "1.0.0-alpha.6", features=["ssl"] }
|
actix-web = { version = "1.0.0-beta.1", features=["ssl"] }
|
||||||
actix-http = { version = "0.1.1", features=["ssl"] }
|
actix-http = { version = "0.1.2", features=["ssl"] }
|
||||||
actix-http-test = { version = "0.1.0", features=["ssl"] }
|
actix-http-test = { version = "0.1.0", features=["ssl"] }
|
||||||
actix-utils = "0.3.4"
|
actix-utils = "0.3.4"
|
||||||
actix-server = { version = "0.4.3", features=["ssl"] }
|
actix-server = { version = "0.4.3", features=["ssl"] }
|
||||||
|
@ -1,5 +1,7 @@
|
|||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::io::{Read, Write};
|
use std::io::{Read, Write};
|
||||||
|
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||||
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use brotli2::write::BrotliEncoder;
|
use brotli2::write::BrotliEncoder;
|
||||||
@ -7,12 +9,14 @@ use bytes::Bytes;
|
|||||||
use flate2::read::GzDecoder;
|
use flate2::read::GzDecoder;
|
||||||
use flate2::write::GzEncoder;
|
use flate2::write::GzEncoder;
|
||||||
use flate2::Compression;
|
use flate2::Compression;
|
||||||
use futures::future::Future;
|
use futures::Future;
|
||||||
use rand::Rng;
|
use rand::Rng;
|
||||||
|
|
||||||
|
use actix_codec::{AsyncRead, AsyncWrite};
|
||||||
use actix_http::HttpService;
|
use actix_http::HttpService;
|
||||||
use actix_http_test::TestServer;
|
use actix_http_test::TestServer;
|
||||||
use actix_web::http::Cookie;
|
use actix_service::{fn_service, NewService};
|
||||||
|
use actix_web::http::{Cookie, Version};
|
||||||
use actix_web::middleware::{BodyEncoding, Compress};
|
use actix_web::middleware::{BodyEncoding, Compress};
|
||||||
use actix_web::{http::header, web, App, Error, HttpMessage, HttpRequest, HttpResponse};
|
use actix_web::{http::header, web, App, Error, HttpMessage, HttpRequest, HttpResponse};
|
||||||
use awc::error::SendRequestError;
|
use awc::error::SendRequestError;
|
||||||
@ -39,6 +43,30 @@ const STR: &str = "Hello World Hello World Hello World Hello World Hello World \
|
|||||||
Hello World Hello World Hello World Hello World Hello World \
|
Hello World Hello World Hello World Hello World Hello World \
|
||||||
Hello World Hello World Hello World Hello World Hello World";
|
Hello World Hello World Hello World Hello World Hello World";
|
||||||
|
|
||||||
|
#[cfg(feature = "ssl")]
|
||||||
|
fn ssl_acceptor<T: AsyncRead + AsyncWrite>(
|
||||||
|
) -> std::io::Result<actix_server::ssl::OpensslAcceptor<T, ()>> {
|
||||||
|
use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod};
|
||||||
|
// load ssl keys
|
||||||
|
let mut builder = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap();
|
||||||
|
builder
|
||||||
|
.set_private_key_file("../tests/key.pem", SslFiletype::PEM)
|
||||||
|
.unwrap();
|
||||||
|
builder
|
||||||
|
.set_certificate_chain_file("../tests/cert.pem")
|
||||||
|
.unwrap();
|
||||||
|
builder.set_alpn_select_callback(|_, protos| {
|
||||||
|
const H2: &[u8] = b"\x02h2";
|
||||||
|
if protos.windows(3).any(|window| window == H2) {
|
||||||
|
Ok(b"h2")
|
||||||
|
} else {
|
||||||
|
Err(openssl::ssl::AlpnError::NOACK)
|
||||||
|
}
|
||||||
|
});
|
||||||
|
builder.set_alpn_protos(b"\x02h2")?;
|
||||||
|
Ok(actix_server::ssl::OpensslAcceptor::new(builder.build()))
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_simple() {
|
fn test_simple() {
|
||||||
let mut srv =
|
let mut srv =
|
||||||
@ -144,17 +172,249 @@ fn test_timeout_override() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_connection_close() {
|
fn test_connection_reuse() {
|
||||||
let mut srv = TestServer::new(|| {
|
let num = Arc::new(AtomicUsize::new(0));
|
||||||
HttpService::new(
|
let num2 = num.clone();
|
||||||
App::new().service(web::resource("/").to(|| HttpResponse::Ok())),
|
|
||||||
|
let mut srv = TestServer::new(move || {
|
||||||
|
let num2 = num2.clone();
|
||||||
|
fn_service(move |io| {
|
||||||
|
num2.fetch_add(1, Ordering::Relaxed);
|
||||||
|
Ok(io)
|
||||||
|
})
|
||||||
|
.and_then(HttpService::new(
|
||||||
|
App::new().service(web::resource("/").route(web::to(|| HttpResponse::Ok()))),
|
||||||
|
))
|
||||||
|
});
|
||||||
|
|
||||||
|
let client = awc::Client::default();
|
||||||
|
|
||||||
|
// req 1
|
||||||
|
let request = client.get(srv.url("/")).send();
|
||||||
|
let response = srv.block_on(request).unwrap();
|
||||||
|
assert!(response.status().is_success());
|
||||||
|
|
||||||
|
// req 2
|
||||||
|
let req = client.post(srv.url("/"));
|
||||||
|
let response = srv.block_on_fn(move || req.send()).unwrap();
|
||||||
|
assert!(response.status().is_success());
|
||||||
|
|
||||||
|
// one connection
|
||||||
|
assert_eq!(num.load(Ordering::Relaxed), 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "ssl")]
|
||||||
|
#[test]
|
||||||
|
fn test_connection_reuse_h2() {
|
||||||
|
let openssl = ssl_acceptor().unwrap();
|
||||||
|
let num = Arc::new(AtomicUsize::new(0));
|
||||||
|
let num2 = num.clone();
|
||||||
|
|
||||||
|
let mut srv = TestServer::new(move || {
|
||||||
|
let num2 = num2.clone();
|
||||||
|
fn_service(move |io| {
|
||||||
|
num2.fetch_add(1, Ordering::Relaxed);
|
||||||
|
Ok(io)
|
||||||
|
})
|
||||||
|
.and_then(
|
||||||
|
openssl
|
||||||
|
.clone()
|
||||||
|
.map_err(|e| println!("Openssl error: {}", e)),
|
||||||
|
)
|
||||||
|
.and_then(
|
||||||
|
HttpService::build()
|
||||||
|
.h2(App::new()
|
||||||
|
.service(web::resource("/").route(web::to(|| HttpResponse::Ok()))))
|
||||||
|
.map_err(|_| ()),
|
||||||
)
|
)
|
||||||
});
|
});
|
||||||
|
|
||||||
let res = srv
|
// disable ssl verification
|
||||||
.block_on(awc::Client::new().get(srv.url("/")).force_close().send())
|
use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
|
||||||
.unwrap();
|
|
||||||
assert!(res.status().is_success());
|
let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
|
||||||
|
builder.set_verify(SslVerifyMode::NONE);
|
||||||
|
let _ = builder
|
||||||
|
.set_alpn_protos(b"\x02h2\x08http/1.1")
|
||||||
|
.map_err(|e| log::error!("Can not set alpn protocol: {:?}", e));
|
||||||
|
|
||||||
|
let client = awc::Client::build()
|
||||||
|
.connector(awc::Connector::new().ssl(builder.build()).finish())
|
||||||
|
.finish();
|
||||||
|
|
||||||
|
// req 1
|
||||||
|
let request = client.get(srv.surl("/")).send();
|
||||||
|
let response = srv.block_on(request).unwrap();
|
||||||
|
assert!(response.status().is_success());
|
||||||
|
|
||||||
|
// req 2
|
||||||
|
let req = client.post(srv.surl("/"));
|
||||||
|
let response = srv.block_on_fn(move || req.send()).unwrap();
|
||||||
|
assert!(response.status().is_success());
|
||||||
|
assert_eq!(response.version(), Version::HTTP_2);
|
||||||
|
|
||||||
|
// one connection
|
||||||
|
assert_eq!(num.load(Ordering::Relaxed), 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_connection_force_close() {
|
||||||
|
let num = Arc::new(AtomicUsize::new(0));
|
||||||
|
let num2 = num.clone();
|
||||||
|
|
||||||
|
let mut srv = TestServer::new(move || {
|
||||||
|
let num2 = num2.clone();
|
||||||
|
fn_service(move |io| {
|
||||||
|
num2.fetch_add(1, Ordering::Relaxed);
|
||||||
|
Ok(io)
|
||||||
|
})
|
||||||
|
.and_then(HttpService::new(
|
||||||
|
App::new().service(web::resource("/").route(web::to(|| HttpResponse::Ok()))),
|
||||||
|
))
|
||||||
|
});
|
||||||
|
|
||||||
|
let client = awc::Client::default();
|
||||||
|
|
||||||
|
// req 1
|
||||||
|
let request = client.get(srv.url("/")).force_close().send();
|
||||||
|
let response = srv.block_on(request).unwrap();
|
||||||
|
assert!(response.status().is_success());
|
||||||
|
|
||||||
|
// req 2
|
||||||
|
let req = client.post(srv.url("/")).force_close();
|
||||||
|
let response = srv.block_on_fn(move || req.send()).unwrap();
|
||||||
|
assert!(response.status().is_success());
|
||||||
|
|
||||||
|
// two connection
|
||||||
|
assert_eq!(num.load(Ordering::Relaxed), 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_connection_server_close() {
|
||||||
|
let num = Arc::new(AtomicUsize::new(0));
|
||||||
|
let num2 = num.clone();
|
||||||
|
|
||||||
|
let mut srv = TestServer::new(move || {
|
||||||
|
let num2 = num2.clone();
|
||||||
|
fn_service(move |io| {
|
||||||
|
num2.fetch_add(1, Ordering::Relaxed);
|
||||||
|
Ok(io)
|
||||||
|
})
|
||||||
|
.and_then(HttpService::new(
|
||||||
|
App::new().service(
|
||||||
|
web::resource("/")
|
||||||
|
.route(web::to(|| HttpResponse::Ok().force_close().finish())),
|
||||||
|
),
|
||||||
|
))
|
||||||
|
});
|
||||||
|
|
||||||
|
let client = awc::Client::default();
|
||||||
|
|
||||||
|
// req 1
|
||||||
|
let request = client.get(srv.url("/")).send();
|
||||||
|
let response = srv.block_on(request).unwrap();
|
||||||
|
assert!(response.status().is_success());
|
||||||
|
|
||||||
|
// req 2
|
||||||
|
let req = client.post(srv.url("/"));
|
||||||
|
let response = srv.block_on_fn(move || req.send()).unwrap();
|
||||||
|
assert!(response.status().is_success());
|
||||||
|
|
||||||
|
// two connection
|
||||||
|
assert_eq!(num.load(Ordering::Relaxed), 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_connection_wait_queue() {
|
||||||
|
let num = Arc::new(AtomicUsize::new(0));
|
||||||
|
let num2 = num.clone();
|
||||||
|
|
||||||
|
let mut srv = TestServer::new(move || {
|
||||||
|
let num2 = num2.clone();
|
||||||
|
fn_service(move |io| {
|
||||||
|
num2.fetch_add(1, Ordering::Relaxed);
|
||||||
|
Ok(io)
|
||||||
|
})
|
||||||
|
.and_then(HttpService::new(App::new().service(
|
||||||
|
web::resource("/").route(web::to(|| HttpResponse::Ok().body(STR))),
|
||||||
|
)))
|
||||||
|
});
|
||||||
|
|
||||||
|
let client = awc::Client::build()
|
||||||
|
.connector(awc::Connector::new().limit(1).finish())
|
||||||
|
.finish();
|
||||||
|
|
||||||
|
// req 1
|
||||||
|
let request = client.get(srv.url("/")).send();
|
||||||
|
let mut response = srv.block_on(request).unwrap();
|
||||||
|
assert!(response.status().is_success());
|
||||||
|
|
||||||
|
// req 2
|
||||||
|
let req2 = client.post(srv.url("/"));
|
||||||
|
let req2_fut = srv.execute(move || {
|
||||||
|
let mut fut = req2.send();
|
||||||
|
assert!(fut.poll().unwrap().is_not_ready());
|
||||||
|
fut
|
||||||
|
});
|
||||||
|
|
||||||
|
// read response 1
|
||||||
|
let bytes = srv.block_on(response.body()).unwrap();
|
||||||
|
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
|
||||||
|
|
||||||
|
// req 2
|
||||||
|
let response = srv.block_on(req2_fut).unwrap();
|
||||||
|
assert!(response.status().is_success());
|
||||||
|
|
||||||
|
// two connection
|
||||||
|
assert_eq!(num.load(Ordering::Relaxed), 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_connection_wait_queue_force_close() {
|
||||||
|
let num = Arc::new(AtomicUsize::new(0));
|
||||||
|
let num2 = num.clone();
|
||||||
|
|
||||||
|
let mut srv = TestServer::new(move || {
|
||||||
|
let num2 = num2.clone();
|
||||||
|
fn_service(move |io| {
|
||||||
|
num2.fetch_add(1, Ordering::Relaxed);
|
||||||
|
Ok(io)
|
||||||
|
})
|
||||||
|
.and_then(HttpService::new(
|
||||||
|
App::new().service(
|
||||||
|
web::resource("/")
|
||||||
|
.route(web::to(|| HttpResponse::Ok().force_close().body(STR))),
|
||||||
|
),
|
||||||
|
))
|
||||||
|
});
|
||||||
|
|
||||||
|
let client = awc::Client::build()
|
||||||
|
.connector(awc::Connector::new().limit(1).finish())
|
||||||
|
.finish();
|
||||||
|
|
||||||
|
// req 1
|
||||||
|
let request = client.get(srv.url("/")).send();
|
||||||
|
let mut response = srv.block_on(request).unwrap();
|
||||||
|
assert!(response.status().is_success());
|
||||||
|
|
||||||
|
// req 2
|
||||||
|
let req2 = client.post(srv.url("/"));
|
||||||
|
let req2_fut = srv.execute(move || {
|
||||||
|
let mut fut = req2.send();
|
||||||
|
assert!(fut.poll().unwrap().is_not_ready());
|
||||||
|
fut
|
||||||
|
});
|
||||||
|
|
||||||
|
// read response 1
|
||||||
|
let bytes = srv.block_on(response.body()).unwrap();
|
||||||
|
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
|
||||||
|
|
||||||
|
// req 2
|
||||||
|
let response = srv.block_on(req2_fut).unwrap();
|
||||||
|
assert!(response.status().is_success());
|
||||||
|
|
||||||
|
// two connection
|
||||||
|
assert_eq!(num.load(Ordering::Relaxed), 2);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -124,7 +124,7 @@ where
|
|||||||
pub trait AsyncFactory<T, R>: Clone + 'static
|
pub trait AsyncFactory<T, R>: Clone + 'static
|
||||||
where
|
where
|
||||||
R: IntoFuture,
|
R: IntoFuture,
|
||||||
R::Item: Into<Response>,
|
R::Item: Responder,
|
||||||
R::Error: Into<Error>,
|
R::Error: Into<Error>,
|
||||||
{
|
{
|
||||||
fn call(&self, param: T) -> R;
|
fn call(&self, param: T) -> R;
|
||||||
@ -134,7 +134,7 @@ impl<F, R> AsyncFactory<(), R> for F
|
|||||||
where
|
where
|
||||||
F: Fn() -> R + Clone + 'static,
|
F: Fn() -> R + Clone + 'static,
|
||||||
R: IntoFuture,
|
R: IntoFuture,
|
||||||
R::Item: Into<Response>,
|
R::Item: Responder,
|
||||||
R::Error: Into<Error>,
|
R::Error: Into<Error>,
|
||||||
{
|
{
|
||||||
fn call(&self, _: ()) -> R {
|
fn call(&self, _: ()) -> R {
|
||||||
@ -147,7 +147,7 @@ pub struct AsyncHandler<F, T, R>
|
|||||||
where
|
where
|
||||||
F: AsyncFactory<T, R>,
|
F: AsyncFactory<T, R>,
|
||||||
R: IntoFuture,
|
R: IntoFuture,
|
||||||
R::Item: Into<Response>,
|
R::Item: Responder,
|
||||||
R::Error: Into<Error>,
|
R::Error: Into<Error>,
|
||||||
{
|
{
|
||||||
hnd: F,
|
hnd: F,
|
||||||
@ -158,7 +158,7 @@ impl<F, T, R> AsyncHandler<F, T, R>
|
|||||||
where
|
where
|
||||||
F: AsyncFactory<T, R>,
|
F: AsyncFactory<T, R>,
|
||||||
R: IntoFuture,
|
R: IntoFuture,
|
||||||
R::Item: Into<Response>,
|
R::Item: Responder,
|
||||||
R::Error: Into<Error>,
|
R::Error: Into<Error>,
|
||||||
{
|
{
|
||||||
pub fn new(hnd: F) -> Self {
|
pub fn new(hnd: F) -> Self {
|
||||||
@ -173,7 +173,7 @@ impl<F, T, R> Clone for AsyncHandler<F, T, R>
|
|||||||
where
|
where
|
||||||
F: AsyncFactory<T, R>,
|
F: AsyncFactory<T, R>,
|
||||||
R: IntoFuture,
|
R: IntoFuture,
|
||||||
R::Item: Into<Response>,
|
R::Item: Responder,
|
||||||
R::Error: Into<Error>,
|
R::Error: Into<Error>,
|
||||||
{
|
{
|
||||||
fn clone(&self) -> Self {
|
fn clone(&self) -> Self {
|
||||||
@ -188,7 +188,7 @@ impl<F, T, R> Service for AsyncHandler<F, T, R>
|
|||||||
where
|
where
|
||||||
F: AsyncFactory<T, R>,
|
F: AsyncFactory<T, R>,
|
||||||
R: IntoFuture,
|
R: IntoFuture,
|
||||||
R::Item: Into<Response>,
|
R::Item: Responder,
|
||||||
R::Error: Into<Error>,
|
R::Error: Into<Error>,
|
||||||
{
|
{
|
||||||
type Request = (T, HttpRequest);
|
type Request = (T, HttpRequest);
|
||||||
@ -203,31 +203,38 @@ where
|
|||||||
fn call(&mut self, (param, req): (T, HttpRequest)) -> Self::Future {
|
fn call(&mut self, (param, req): (T, HttpRequest)) -> Self::Future {
|
||||||
AsyncHandlerServiceResponse {
|
AsyncHandlerServiceResponse {
|
||||||
fut: self.hnd.call(param).into_future(),
|
fut: self.hnd.call(param).into_future(),
|
||||||
|
fut2: None,
|
||||||
req: Some(req),
|
req: Some(req),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[doc(hidden)]
|
#[doc(hidden)]
|
||||||
pub struct AsyncHandlerServiceResponse<T> {
|
pub struct AsyncHandlerServiceResponse<T>
|
||||||
|
where
|
||||||
|
T: Future,
|
||||||
|
T::Item: Responder,
|
||||||
|
{
|
||||||
fut: T,
|
fut: T,
|
||||||
|
fut2: Option<<<T::Item as Responder>::Future as IntoFuture>::Future>,
|
||||||
req: Option<HttpRequest>,
|
req: Option<HttpRequest>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> Future for AsyncHandlerServiceResponse<T>
|
impl<T> Future for AsyncHandlerServiceResponse<T>
|
||||||
where
|
where
|
||||||
T: Future,
|
T: Future,
|
||||||
T::Item: Into<Response>,
|
T::Item: Responder,
|
||||||
T::Error: Into<Error>,
|
T::Error: Into<Error>,
|
||||||
{
|
{
|
||||||
type Item = ServiceResponse;
|
type Item = ServiceResponse;
|
||||||
type Error = Void;
|
type Error = Void;
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||||
match self.fut.poll() {
|
if let Some(ref mut fut) = self.fut2 {
|
||||||
|
return match fut.poll() {
|
||||||
Ok(Async::Ready(res)) => Ok(Async::Ready(ServiceResponse::new(
|
Ok(Async::Ready(res)) => Ok(Async::Ready(ServiceResponse::new(
|
||||||
self.req.take().unwrap(),
|
self.req.take().unwrap(),
|
||||||
res.into(),
|
res,
|
||||||
))),
|
))),
|
||||||
Ok(Async::NotReady) => Ok(Async::NotReady),
|
Ok(Async::NotReady) => Ok(Async::NotReady),
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
@ -237,6 +244,23 @@ where
|
|||||||
res,
|
res,
|
||||||
)))
|
)))
|
||||||
}
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
match self.fut.poll() {
|
||||||
|
Ok(Async::Ready(res)) => {
|
||||||
|
self.fut2 =
|
||||||
|
Some(res.respond_to(self.req.as_ref().unwrap()).into_future());
|
||||||
|
return self.poll();
|
||||||
|
}
|
||||||
|
Ok(Async::NotReady) => Ok(Async::NotReady),
|
||||||
|
Err(e) => {
|
||||||
|
let res: Response = e.into().into();
|
||||||
|
Ok(Async::Ready(ServiceResponse::new(
|
||||||
|
self.req.take().unwrap(),
|
||||||
|
res,
|
||||||
|
)))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -357,7 +381,7 @@ macro_rules! factory_tuple ({ $(($n:tt, $T:ident)),+} => {
|
|||||||
impl<Func, $($T,)+ Res> AsyncFactory<($($T,)+), Res> for Func
|
impl<Func, $($T,)+ Res> AsyncFactory<($($T,)+), Res> for Func
|
||||||
where Func: Fn($($T,)+) -> Res + Clone + 'static,
|
where Func: Fn($($T,)+) -> Res + Clone + 'static,
|
||||||
Res: IntoFuture,
|
Res: IntoFuture,
|
||||||
Res::Item: Into<Response>,
|
Res::Item: Responder,
|
||||||
Res::Error: Into<Error>,
|
Res::Error: Into<Error>,
|
||||||
{
|
{
|
||||||
fn call(&self, param: ($($T,)+)) -> Res {
|
fn call(&self, param: ($($T,)+)) -> Res {
|
||||||
|
@ -217,7 +217,7 @@ where
|
|||||||
F: AsyncFactory<I, R>,
|
F: AsyncFactory<I, R>,
|
||||||
I: FromRequest + 'static,
|
I: FromRequest + 'static,
|
||||||
R: IntoFuture + 'static,
|
R: IntoFuture + 'static,
|
||||||
R::Item: Into<Response>,
|
R::Item: Responder,
|
||||||
R::Error: Into<Error>,
|
R::Error: Into<Error>,
|
||||||
{
|
{
|
||||||
self.routes.push(Route::new().to_async(handler));
|
self.routes.push(Route::new().to_async(handler));
|
||||||
|
35
src/route.rs
35
src/route.rs
@ -1,7 +1,7 @@
|
|||||||
use std::cell::RefCell;
|
use std::cell::RefCell;
|
||||||
use std::rc::Rc;
|
use std::rc::Rc;
|
||||||
|
|
||||||
use actix_http::{http::Method, Error, Extensions, Response};
|
use actix_http::{http::Method, Error, Extensions};
|
||||||
use actix_service::{NewService, Service};
|
use actix_service::{NewService, Service};
|
||||||
use futures::future::{ok, Either, FutureResult};
|
use futures::future::{ok, Either, FutureResult};
|
||||||
use futures::{Async, Future, IntoFuture, Poll};
|
use futures::{Async, Future, IntoFuture, Poll};
|
||||||
@ -278,7 +278,7 @@ impl Route {
|
|||||||
F: AsyncFactory<T, R>,
|
F: AsyncFactory<T, R>,
|
||||||
T: FromRequest + 'static,
|
T: FromRequest + 'static,
|
||||||
R: IntoFuture + 'static,
|
R: IntoFuture + 'static,
|
||||||
R::Item: Into<Response>,
|
R::Item: Responder,
|
||||||
R::Error: Into<Error>,
|
R::Error: Into<Error>,
|
||||||
{
|
{
|
||||||
self.service = Box::new(RouteNewService::new(Extract::new(
|
self.service = Box::new(RouteNewService::new(Extract::new(
|
||||||
@ -418,18 +418,25 @@ where
|
|||||||
mod tests {
|
mod tests {
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use bytes::Bytes;
|
||||||
use futures::Future;
|
use futures::Future;
|
||||||
|
use serde_derive::Serialize;
|
||||||
use tokio_timer::sleep;
|
use tokio_timer::sleep;
|
||||||
|
|
||||||
use crate::http::{Method, StatusCode};
|
use crate::http::{Method, StatusCode};
|
||||||
use crate::test::{call_service, init_service, TestRequest};
|
use crate::test::{call_service, init_service, read_body, TestRequest};
|
||||||
use crate::{error, web, App, HttpResponse};
|
use crate::{error, web, App, HttpResponse};
|
||||||
|
|
||||||
|
#[derive(Serialize, PartialEq, Debug)]
|
||||||
|
struct MyObject {
|
||||||
|
name: String,
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_route() {
|
fn test_route() {
|
||||||
let mut srv =
|
let mut srv = init_service(
|
||||||
init_service(
|
App::new()
|
||||||
App::new().service(
|
.service(
|
||||||
web::resource("/test")
|
web::resource("/test")
|
||||||
.route(web::get().to(|| HttpResponse::Ok()))
|
.route(web::get().to(|| HttpResponse::Ok()))
|
||||||
.route(web::put().to(|| {
|
.route(web::put().to(|| {
|
||||||
@ -444,7 +451,14 @@ mod tests {
|
|||||||
Err::<HttpResponse, _>(error::ErrorBadRequest("err"))
|
Err::<HttpResponse, _>(error::ErrorBadRequest("err"))
|
||||||
})
|
})
|
||||||
})),
|
})),
|
||||||
),
|
)
|
||||||
|
.service(web::resource("/json").route(web::get().to_async(|| {
|
||||||
|
sleep(Duration::from_millis(25)).then(|_| {
|
||||||
|
Ok::<_, crate::Error>(web::Json(MyObject {
|
||||||
|
name: "test".to_string(),
|
||||||
|
}))
|
||||||
|
})
|
||||||
|
}))),
|
||||||
);
|
);
|
||||||
|
|
||||||
let req = TestRequest::with_uri("/test")
|
let req = TestRequest::with_uri("/test")
|
||||||
@ -476,5 +490,12 @@ mod tests {
|
|||||||
.to_request();
|
.to_request();
|
||||||
let resp = call_service(&mut srv, req);
|
let resp = call_service(&mut srv, req);
|
||||||
assert_eq!(resp.status(), StatusCode::METHOD_NOT_ALLOWED);
|
assert_eq!(resp.status(), StatusCode::METHOD_NOT_ALLOWED);
|
||||||
|
|
||||||
|
let req = TestRequest::with_uri("/json").to_request();
|
||||||
|
let resp = call_service(&mut srv, req);
|
||||||
|
assert_eq!(resp.status(), StatusCode::OK);
|
||||||
|
|
||||||
|
let body = read_body(resp);
|
||||||
|
assert_eq!(body, Bytes::from_static(b"{\"name\":\"test\"}"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
40
src/test.rs
40
src/test.rs
@ -193,6 +193,46 @@ where
|
|||||||
.unwrap_or_else(|_| panic!("read_response failed at block_on unwrap"))
|
.unwrap_or_else(|_| panic!("read_response failed at block_on unwrap"))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Helper function that returns a response body of a ServiceResponse.
|
||||||
|
/// This function blocks the current thread until futures complete.
|
||||||
|
///
|
||||||
|
/// ```rust
|
||||||
|
/// use actix_web::{test, web, App, HttpResponse, http::header};
|
||||||
|
/// use bytes::Bytes;
|
||||||
|
///
|
||||||
|
/// #[test]
|
||||||
|
/// fn test_index() {
|
||||||
|
/// let mut app = test::init_service(
|
||||||
|
/// App::new().service(
|
||||||
|
/// web::resource("/index.html")
|
||||||
|
/// .route(web::post().to(
|
||||||
|
/// || HttpResponse::Ok().body("welcome!")))));
|
||||||
|
///
|
||||||
|
/// let req = test::TestRequest::post()
|
||||||
|
/// .uri("/index.html")
|
||||||
|
/// .header(header::CONTENT_TYPE, "application/json")
|
||||||
|
/// .to_request();
|
||||||
|
///
|
||||||
|
/// let resp = call_service(&mut srv, req);
|
||||||
|
/// let result = test::read_body(resp);
|
||||||
|
/// assert_eq!(result, Bytes::from_static(b"welcome!"));
|
||||||
|
/// }
|
||||||
|
/// ```
|
||||||
|
pub fn read_body<B>(mut res: ServiceResponse<B>) -> Bytes
|
||||||
|
where
|
||||||
|
B: MessageBody,
|
||||||
|
{
|
||||||
|
block_on(run_on(move || {
|
||||||
|
res.take_body()
|
||||||
|
.fold(BytesMut::new(), move |mut body, chunk| {
|
||||||
|
body.extend_from_slice(&chunk);
|
||||||
|
Ok::<_, Error>(body)
|
||||||
|
})
|
||||||
|
.map(|body: BytesMut| body.freeze())
|
||||||
|
}))
|
||||||
|
.unwrap_or_else(|_| panic!("read_response failed at block_on unwrap"))
|
||||||
|
}
|
||||||
|
|
||||||
/// Helper function that returns a deserialized response body of a TestRequest
|
/// Helper function that returns a deserialized response body of a TestRequest
|
||||||
/// This function blocks the current thread until futures complete.
|
/// This function blocks the current thread until futures complete.
|
||||||
///
|
///
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
//! Essentials helper functions and types for application registration.
|
//! Essentials helper functions and types for application registration.
|
||||||
use actix_http::{http::Method, Response};
|
use actix_http::http::Method;
|
||||||
use futures::{Future, IntoFuture};
|
use futures::{Future, IntoFuture};
|
||||||
|
|
||||||
pub use actix_http::Response as HttpResponse;
|
pub use actix_http::Response as HttpResponse;
|
||||||
@ -268,7 +268,7 @@ where
|
|||||||
F: AsyncFactory<I, R>,
|
F: AsyncFactory<I, R>,
|
||||||
I: FromRequest + 'static,
|
I: FromRequest + 'static,
|
||||||
R: IntoFuture + 'static,
|
R: IntoFuture + 'static,
|
||||||
R::Item: Into<Response>,
|
R::Item: Responder,
|
||||||
R::Error: Into<Error>,
|
R::Error: Into<Error>,
|
||||||
{
|
{
|
||||||
Route::new().to_async(handler)
|
Route::new().to_async(handler)
|
||||||
|
@ -124,6 +124,7 @@ impl TestServer {
|
|||||||
|e| log::error!("Can not set alpn protocol: {:?}", e),
|
|e| log::error!("Can not set alpn protocol: {:?}", e),
|
||||||
);
|
);
|
||||||
Connector::new()
|
Connector::new()
|
||||||
|
.conn_lifetime(time::Duration::from_secs(0))
|
||||||
.timeout(time::Duration::from_millis(500))
|
.timeout(time::Duration::from_millis(500))
|
||||||
.ssl(builder.build())
|
.ssl(builder.build())
|
||||||
.finish()
|
.finish()
|
||||||
@ -131,6 +132,7 @@ impl TestServer {
|
|||||||
#[cfg(not(feature = "ssl"))]
|
#[cfg(not(feature = "ssl"))]
|
||||||
{
|
{
|
||||||
Connector::new()
|
Connector::new()
|
||||||
|
.conn_lifetime(time::Duration::from_secs(0))
|
||||||
.timeout(time::Duration::from_millis(500))
|
.timeout(time::Duration::from_millis(500))
|
||||||
.finish()
|
.finish()
|
||||||
}
|
}
|
||||||
@ -163,6 +165,15 @@ impl TestServerRuntime {
|
|||||||
self.rt.block_on(fut)
|
self.rt.block_on(fut)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Execute future on current core
|
||||||
|
pub fn block_on_fn<F, R>(&mut self, f: F) -> Result<R::Item, R::Error>
|
||||||
|
where
|
||||||
|
F: FnOnce() -> R,
|
||||||
|
R: Future,
|
||||||
|
{
|
||||||
|
self.rt.block_on(lazy(|| f()))
|
||||||
|
}
|
||||||
|
|
||||||
/// Execute function on current core
|
/// Execute function on current core
|
||||||
pub fn execute<F, R>(&mut self, fut: F) -> R
|
pub fn execute<F, R>(&mut self, fut: F) -> R
|
||||||
where
|
where
|
||||||
|
Reference in New Issue
Block a user