1
0
mirror of https://github.com/fafhrd91/actix-web synced 2025-07-05 18:35:22 +02:00

Compare commits

..

23 Commits

Author SHA1 Message Date
3b3dbb4f40 add raw services support 2019-04-24 15:29:15 -07:00
7300002226 grammar fixes (#796) 2019-04-24 13:21:42 -07:00
5426413cb6 update dependencies 2019-04-24 13:00:30 -07:00
2bc937f6c3 prepare release 2019-04-24 12:50:44 -07:00
60fa0d5427 Store visit and login timestamp in the identity cookie (#502)
This allows to verify time of login or last visit and therfore limiting
the danger of leaked cookies.
2019-04-24 12:49:56 -07:00
f429d3319f Read until eof for http/1.0 responses #771 2019-04-24 11:57:40 -07:00
2e19f572ee add tests for camel case headers rendering 2019-04-24 11:27:57 -07:00
64f603b076 Support to set header names of ClientRequest as Camel-Case (#713)
* Support to set header names of `ClientRequest` as Camel-Case

This is the case for supporting to request for servers which don't
perfectly implement the `RFC 7230`. It is important for an app
which uses `ClientRequest` as core part.

* Add field `upper_camel_case_headers` to `ClientRequest`.

* Add function `set_upper_camel_case_headers` to `ClientRequest`
  and `ClientRequestBuilder` to set field `upper_camel_case_headers`.

* Add trait `client::writer::UpperCamelCaseHeader` for
  `http::header::HeaderName`, let it can be converted to Camel-Case
  then writed to buffer.

* Add test `test_client::test_upper_camel_case_headers`.

* Support upper Camel-Case headers

* [actix-http] Add field `upper_camel_case_headers` for `RequestHead`
* [actix-http] Add code for `MessageType` to support upper camel case
* [awc] Add functions for `ClientRequest` to set upper camel case

* Use `Flags::CAMEL_CASE` for upper camel case of headers
2019-04-24 10:48:49 -07:00
679d1cd513 allow to override responder's status code and headers 2019-04-24 10:25:46 -07:00
42644dac3f prepare actix-http-test release 2019-04-24 07:31:33 -07:00
898ef57080 Fix async web::Data factory handling 2019-04-23 21:21:49 -07:00
9702b2d88e add client h2 reuse test 2019-04-23 15:06:30 -07:00
d2b0afd859 Fix http client pool and wait queue management 2019-04-23 14:57:03 -07:00
5f6a1a8249 update version 2019-04-23 09:45:39 -07:00
5d531989e7 Fix BorrowMutError panic in client connector #793 2019-04-23 09:42:19 -07:00
3532602299 Added support for remainder match (i.e /path/{tail}*) 2019-04-22 21:22:17 -07:00
48bee55087 .to_async() handler can return Responder type #792 2019-04-22 14:22:08 -07:00
d00c9bb844 do not consume boundary 2019-04-21 16:14:09 -07:00
895e409d57 Optimize multipart handling #634, #769 2019-04-21 15:41:01 -07:00
f0789aad05 update dep versions 2019-04-21 09:03:46 -07:00
7e480ab2f7 beta.1 release 2019-04-20 21:16:51 -07:00
891f857547 update changes 2019-04-20 11:18:04 -07:00
01b1350dcc update versions 2019-04-19 18:16:01 -07:00
41 changed files with 1980 additions and 252 deletions

View File

@ -1,6 +1,27 @@
# Changes
## [1.0.0-beta.1] - 2019-04-xx
## [1.0.0-beta.2] - 2019-04-24
### Added
* Add raw services support via `web::service()`
* Add helper functions for reading response body `test::read_body()`
* Add support for `remainder match` (i.e "/path/{tail}*")
* Extend `Responder` trait, allow to override status code and headers.
### Changed
* `.to_async()` handler can return `Responder` type #792
### Fixed
* Fix async web::Data factory handling
## [1.0.0-beta.1] - 2019-04-20
### Added
@ -9,6 +30,8 @@
* Add `.peer_addr()` #744
* Add `NormalizePath` middleware
### Changed
* Rename `RouterConfig` to `ServiceConfig`

View File

@ -1,6 +1,6 @@
[package]
name = "actix-web"
version = "1.0.0-beta.1"
version = "1.0.0-beta.2"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix web is a simple, pragmatic and extremely fast web framework for Rust."
readme = "README.md"
@ -68,14 +68,15 @@ rust-tls = ["rustls", "actix-server/rust-tls"]
actix-codec = "0.1.2"
actix-service = "0.3.6"
actix-utils = "0.3.4"
actix-router = "0.1.2"
actix-router = "0.1.3"
actix-rt = "0.2.2"
actix-web-codegen = "0.1.0-alpha.6"
actix-http = { version = "0.1.0", features=["fail"] }
actix-web-codegen = "0.1.0-beta.1"
actix-http = { version = "0.1.4", features=["fail"] }
actix-server = "0.4.3"
actix-server-config = "0.1.1"
actix-threadpool = "0.1.0"
awc = { version = "0.1.0", optional = true }
actix = { version = "0.8.1", features=["http"], optional = true }
awc = { version = "0.1.1", optional = true }
bytes = "0.4"
derive_more = "0.14"
@ -98,9 +99,9 @@ openssl = { version="0.10", optional = true }
rustls = { version = "^0.15", optional = true }
[dev-dependencies]
actix-http = { version = "0.1.0", features=["ssl", "brotli", "flate2-zlib"] }
actix-http-test = { version = "0.1.0", features=["ssl"] }
actix-files = { version = "0.1.0-alpha.6" }
actix-http = { version = "0.1.4", features=["ssl", "brotli", "flate2-zlib"] }
actix-http-test = { version = "0.1.1", features=["ssl"] }
actix-files = { version = "0.1.0-beta.1" }
rand = "0.6"
env_logger = "0.6"
serde_derive = "1.0"

View File

@ -1,5 +1,9 @@
# Changes
## [0.1.0-beta.1] - 2019-04-20
* Update actix-web to beta.1
## [0.1.0-alpha.6] - 2019-04-14
* Update actix-web to alpha6

View File

@ -1,6 +1,6 @@
[package]
name = "actix-files"
version = "0.1.0-alpha.6"
version = "0.1.0-beta.1"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Static files support for actix web."
readme = "README.md"
@ -18,7 +18,7 @@ name = "actix_files"
path = "src/lib.rs"
[dependencies]
actix-web = "1.0.0-alpha.6"
actix-web = "1.0.0-beta.1"
actix-service = "0.3.4"
bitflags = "1"
bytes = "0.4"
@ -31,4 +31,4 @@ percent-encoding = "1.0"
v_htmlescape = "0.4"
[dev-dependencies]
actix-web = { version = "1.0.0-alpha.6", features=["ssl"] }
actix-web = { version = "1.0.0-beta.1", features=["ssl"] }

View File

@ -1,5 +1,32 @@
# Changes
## [0.1.4] - 2019-04-24
### Added
* Allow to render h1 request headers in `Camel-Case`
### Fixed
* Read until eof for http/1.0 responses #771
## [0.1.3] - 2019-04-23
### Fixed
* Fix http client pool management
* Fix http client wait queue management #794
## [0.1.2] - 2019-04-23
### Fixed
* Fix BorrowMutError panic in client connector #793
## [0.1.1] - 2019-04-19
### Changes

View File

@ -1,6 +1,6 @@
[package]
name = "actix-http"
version = "0.1.1"
version = "0.1.4"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix http primitives"
readme = "README.md"

View File

@ -114,7 +114,8 @@ where
Request = TcpConnect<Uri>,
Response = TcpConnection<Uri, U>,
Error = actix_connect::ConnectError,
> + Clone,
> + Clone
+ 'static,
{
/// Connection timeout, i.e. max time to connect to remote host including dns name resolution.
/// Set to 1 second by default.
@ -284,7 +285,9 @@ mod connect_impl {
pub(crate) struct InnerConnector<T, Io>
where
Io: AsyncRead + AsyncWrite + 'static,
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>,
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>
+ Clone
+ 'static,
{
pub(crate) tcp_pool: ConnectionPool<T, Io>,
}
@ -293,7 +296,8 @@ mod connect_impl {
where
Io: AsyncRead + AsyncWrite + 'static,
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>
+ Clone,
+ Clone
+ 'static,
{
fn clone(&self) -> Self {
InnerConnector {
@ -305,7 +309,9 @@ mod connect_impl {
impl<T, Io> Service for InnerConnector<T, Io>
where
Io: AsyncRead + AsyncWrite + 'static,
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>,
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>
+ Clone
+ 'static,
{
type Request = Connect;
type Response = IoConnection<Io>;
@ -356,9 +362,11 @@ mod connect_impl {
Io1: AsyncRead + AsyncWrite + 'static,
Io2: AsyncRead + AsyncWrite + 'static,
T1: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>
+ Clone,
+ Clone
+ 'static,
T2: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>
+ Clone,
+ Clone
+ 'static,
{
fn clone(&self) -> Self {
InnerConnector {
@ -372,8 +380,12 @@ mod connect_impl {
where
Io1: AsyncRead + AsyncWrite + 'static,
Io2: AsyncRead + AsyncWrite + 'static,
T1: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>,
T2: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>,
T1: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>
+ Clone
+ 'static,
T2: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>
+ Clone
+ 'static,
{
type Request = Connect;
type Response = EitherConnection<Io1, Io2>;
@ -409,7 +421,9 @@ mod connect_impl {
pub(crate) struct InnerConnectorResponseA<T, Io1, Io2>
where
Io1: AsyncRead + AsyncWrite + 'static,
T: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>,
T: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>
+ Clone
+ 'static,
{
fut: <ConnectionPool<T, Io1> as Service>::Future,
_t: PhantomData<Io2>,
@ -417,7 +431,9 @@ mod connect_impl {
impl<T, Io1, Io2> Future for InnerConnectorResponseA<T, Io1, Io2>
where
T: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>,
T: Service<Request = Connect, Response = (Io1, Protocol), Error = ConnectError>
+ Clone
+ 'static,
Io1: AsyncRead + AsyncWrite + 'static,
Io2: AsyncRead + AsyncWrite + 'static,
{
@ -435,7 +451,9 @@ mod connect_impl {
pub(crate) struct InnerConnectorResponseB<T, Io1, Io2>
where
Io2: AsyncRead + AsyncWrite + 'static,
T: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>,
T: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>
+ Clone
+ 'static,
{
fut: <ConnectionPool<T, Io2> as Service>::Future,
_t: PhantomData<Io1>,
@ -443,7 +461,9 @@ mod connect_impl {
impl<T, Io1, Io2> Future for InnerConnectorResponseB<T, Io1, Io2>
where
T: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>,
T: Service<Request = Connect, Response = (Io2, Protocol), Error = ConnectError>
+ Clone
+ 'static,
Io1: AsyncRead + AsyncWrite + 'static,
Io2: AsyncRead + AsyncWrite + 'static,
{

View File

@ -274,7 +274,7 @@ impl<Io: ConnectionLifetime> Stream for PlStream<Io> {
Ok(Async::Ready(Some(chunk)))
} else {
let framed = self.framed.take().unwrap();
let force_close = framed.get_codec().keepalive();
let force_close = !framed.get_codec().keepalive();
release_connection(framed, force_close);
Ok(Async::Ready(None))
}

View File

@ -49,7 +49,9 @@ pub(crate) struct ConnectionPool<T, Io: AsyncRead + AsyncWrite + 'static>(
impl<T, Io> ConnectionPool<T, Io>
where
Io: AsyncRead + AsyncWrite + 'static,
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>,
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>
+ Clone
+ 'static,
{
pub(crate) fn new(
connector: T,
@ -69,7 +71,7 @@ where
waiters: Slab::new(),
waiters_queue: IndexSet::new(),
available: HashMap::new(),
task: AtomicTask::new(),
task: None,
})),
)
}
@ -88,7 +90,9 @@ where
impl<T, Io> Service for ConnectionPool<T, Io>
where
Io: AsyncRead + AsyncWrite + 'static,
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>,
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>
+ Clone
+ 'static,
{
type Request = Connect;
type Response = IoConnection<Io>;
@ -113,31 +117,41 @@ where
match self.1.as_ref().borrow_mut().acquire(&key) {
Acquire::Acquired(io, created) => {
// use existing connection
Either::A(ok(IoConnection::new(
return Either::A(ok(IoConnection::new(
io,
created,
Some(Acquired(key, Some(self.1.clone()))),
)))
}
Acquire::NotAvailable => {
// connection is not available, wait
let (rx, token) = self.1.as_ref().borrow_mut().wait_for(req);
Either::B(Either::A(WaitForConnection {
rx,
key,
token,
inner: Some(self.1.clone()),
}))
)));
}
Acquire::Available => {
// open new connection
Either::B(Either::B(OpenConnection::new(
return Either::B(Either::B(OpenConnection::new(
key,
self.1.clone(),
self.0.call(req),
)))
)));
}
_ => (),
}
// connection is not available, wait
let (rx, token, support) = self.1.as_ref().borrow_mut().wait_for(req);
// start support future
if !support {
self.1.as_ref().borrow_mut().task = Some(AtomicTask::new());
tokio_current_thread::spawn(ConnectorPoolSupport {
connector: self.0.clone(),
inner: self.1.clone(),
})
}
Either::B(Either::A(WaitForConnection {
rx,
key,
token,
inner: Some(self.1.clone()),
}))
}
}
@ -245,7 +259,7 @@ where
Ok(Async::Ready(IoConnection::new(
ConnectionType::H2(snd),
Instant::now(),
Some(Acquired(self.key.clone(), self.inner.clone())),
Some(Acquired(self.key.clone(), self.inner.take())),
)))
}
Ok(Async::NotReady) => Ok(Async::NotReady),
@ -256,12 +270,11 @@ where
match self.fut.poll() {
Err(err) => Err(err),
Ok(Async::Ready((io, proto))) => {
let _ = self.inner.take();
if proto == Protocol::Http1 {
Ok(Async::Ready(IoConnection::new(
ConnectionType::H1(io),
Instant::now(),
Some(Acquired(self.key.clone(), self.inner.clone())),
Some(Acquired(self.key.clone(), self.inner.take())),
)))
} else {
self.h2 = Some(handshake(io));
@ -279,7 +292,6 @@ enum Acquire<T> {
NotAvailable,
}
// #[derive(Debug)]
struct AvailableConnection<Io> {
io: ConnectionType<Io>,
used: Instant,
@ -298,7 +310,7 @@ pub(crate) struct Inner<Io> {
oneshot::Sender<Result<IoConnection<Io>, ConnectError>>,
)>,
waiters_queue: IndexSet<(Key, usize)>,
task: AtomicTask,
task: Option<AtomicTask>,
}
impl<Io> Inner<Io> {
@ -314,18 +326,6 @@ impl<Io> Inner<Io> {
self.waiters.remove(token);
self.waiters_queue.remove(&(key.clone(), token));
}
fn release_conn(&mut self, key: &Key, io: ConnectionType<Io>, created: Instant) {
self.acquired -= 1;
self.available
.entry(key.clone())
.or_insert_with(VecDeque::new)
.push_back(AvailableConnection {
io,
created,
used: Instant::now(),
});
}
}
impl<Io> Inner<Io>
@ -339,6 +339,7 @@ where
) -> (
oneshot::Receiver<Result<IoConnection<Io>, ConnectError>>,
usize,
bool,
) {
let (tx, rx) = oneshot::channel();
@ -346,8 +347,9 @@ where
let entry = self.waiters.vacant_entry();
let token = entry.key();
entry.insert((connect, tx));
assert!(!self.waiters_queue.insert((key, token)));
(rx, token)
assert!(self.waiters_queue.insert((key, token)));
(rx, token, self.task.is_some())
}
fn acquire(&mut self, key: &Key) -> Acquire<Io> {
@ -400,6 +402,19 @@ where
Acquire::Available
}
fn release_conn(&mut self, key: &Key, io: ConnectionType<Io>, created: Instant) {
self.acquired -= 1;
self.available
.entry(key.clone())
.or_insert_with(VecDeque::new)
.push_back(AvailableConnection {
io,
created,
used: Instant::now(),
});
self.check_availibility();
}
fn release_close(&mut self, io: ConnectionType<Io>) {
self.acquired -= 1;
if let Some(timeout) = self.disconnect_timeout {
@ -407,11 +422,12 @@ where
tokio_current_thread::spawn(CloseConnection::new(io, timeout))
}
}
self.check_availibility();
}
fn check_availibility(&self) {
if !self.waiters_queue.is_empty() && self.acquired < self.limit {
self.task.notify()
self.task.as_ref().map(|t| t.notify());
}
}
}
@ -451,6 +467,147 @@ where
}
}
struct ConnectorPoolSupport<T, Io>
where
Io: AsyncRead + AsyncWrite + 'static,
{
connector: T,
inner: Rc<RefCell<Inner<Io>>>,
}
impl<T, Io> Future for ConnectorPoolSupport<T, Io>
where
Io: AsyncRead + AsyncWrite + 'static,
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>,
T::Future: 'static,
{
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let mut inner = self.inner.as_ref().borrow_mut();
inner.task.as_ref().unwrap().register();
// check waiters
loop {
let (key, token) = {
if let Some((key, token)) = inner.waiters_queue.get_index(0) {
(key.clone(), *token)
} else {
break;
}
};
match inner.acquire(&key) {
Acquire::NotAvailable => break,
Acquire::Acquired(io, created) => {
let (_, tx) = inner.waiters.remove(token);
if let Err(conn) = tx.send(Ok(IoConnection::new(
io,
created,
Some(Acquired(key.clone(), Some(self.inner.clone()))),
))) {
let (io, created) = conn.unwrap().into_inner();
inner.release_conn(&key, io, created);
}
}
Acquire::Available => {
let (connect, tx) = inner.waiters.remove(token);
OpenWaitingConnection::spawn(
key.clone(),
tx,
self.inner.clone(),
self.connector.call(connect),
);
}
}
let _ = inner.waiters_queue.swap_remove_index(0);
}
Ok(Async::NotReady)
}
}
struct OpenWaitingConnection<F, Io>
where
Io: AsyncRead + AsyncWrite + 'static,
{
fut: F,
key: Key,
h2: Option<Handshake<Io, Bytes>>,
rx: Option<oneshot::Sender<Result<IoConnection<Io>, ConnectError>>>,
inner: Option<Rc<RefCell<Inner<Io>>>>,
}
impl<F, Io> OpenWaitingConnection<F, Io>
where
F: Future<Item = (Io, Protocol), Error = ConnectError> + 'static,
Io: AsyncRead + AsyncWrite + 'static,
{
fn spawn(
key: Key,
rx: oneshot::Sender<Result<IoConnection<Io>, ConnectError>>,
inner: Rc<RefCell<Inner<Io>>>,
fut: F,
) {
tokio_current_thread::spawn(OpenWaitingConnection {
key,
fut,
h2: None,
rx: Some(rx),
inner: Some(inner),
})
}
}
impl<F, Io> Drop for OpenWaitingConnection<F, Io>
where
Io: AsyncRead + AsyncWrite + 'static,
{
fn drop(&mut self) {
if let Some(inner) = self.inner.take() {
let mut inner = inner.as_ref().borrow_mut();
inner.release();
inner.check_availibility();
}
}
}
impl<F, Io> Future for OpenWaitingConnection<F, Io>
where
F: Future<Item = (Io, Protocol), Error = ConnectError>,
Io: AsyncRead + AsyncWrite,
{
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.fut.poll() {
Err(err) => {
let _ = self.inner.take();
if let Some(rx) = self.rx.take() {
let _ = rx.send(Err(err));
}
Err(())
}
Ok(Async::Ready((io, proto))) => {
if proto == Protocol::Http1 {
let rx = self.rx.take().unwrap();
let _ = rx.send(Ok(IoConnection::new(
ConnectionType::H1(io),
Instant::now(),
Some(Acquired(self.key.clone(), self.inner.take())),
)));
Ok(Async::Ready(()))
} else {
self.h2 = Some(handshake(io));
self.poll()
}
}
Ok(Async::NotReady) => Ok(Async::NotReady),
}
}
}
pub(crate) struct Acquired<T>(Key, Option<Rc<RefCell<Inner<T>>>>);
impl<T> Acquired<T>

View File

@ -300,7 +300,13 @@ impl MessageType for ResponseHead {
error!("MAX_BUFFER_SIZE unprocessed data reached, closing");
return Err(ParseError::TooLarge);
} else {
PayloadType::None
// for HTTP/1.0 read to eof and close connection
if msg.version == Version::HTTP_10 {
msg.set_connection_type(ConnectionType::Close);
PayloadType::Payload(PayloadDecoder::eof())
} else {
PayloadType::None
}
};
Ok(Some((msg, decoder)))
@ -331,7 +337,7 @@ impl HeaderIndex {
}
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
/// Http payload item
pub enum PayloadItem {
Chunk(Bytes),
@ -1191,4 +1197,16 @@ mod tests {
let msg = pl.decode(&mut buf).unwrap().unwrap();
assert!(msg.eof());
}
#[test]
fn test_response_http10_read_until_eof() {
let mut buf = BytesMut::from(&"HTTP/1.0 200 Ok\r\n\r\ntest data"[..]);
let mut reader = MessageDecoder::<ResponseHead>::default();
let (_msg, pl) = reader.decode(&mut buf).unwrap().unwrap();
let mut pl = pl.unwrap();
let chunk = pl.decode(&mut buf).unwrap().unwrap();
assert_eq!(chunk, PayloadItem::Chunk(Bytes::from_static(b"test data")));
}
}

View File

@ -43,6 +43,10 @@ pub(crate) trait MessageType: Sized {
fn headers(&self) -> &HeaderMap;
fn camel_case(&self) -> bool {
false
}
fn chunked(&self) -> bool;
fn encode_status(&mut self, dst: &mut BytesMut) -> io::Result<()>;
@ -57,6 +61,7 @@ pub(crate) trait MessageType: Sized {
) -> io::Result<()> {
let chunked = self.chunked();
let mut skip_len = length != BodySize::Stream;
let camel_case = self.camel_case();
// Content length
if let Some(status) = self.status() {
@ -74,18 +79,30 @@ pub(crate) trait MessageType: Sized {
match length {
BodySize::Stream => {
if chunked {
dst.put_slice(b"\r\ntransfer-encoding: chunked\r\n")
if camel_case {
dst.put_slice(b"\r\nTransfer-Encoding: chunked\r\n")
} else {
dst.put_slice(b"\r\nTransfer-Encoding: chunked\r\n")
}
} else {
skip_len = false;
dst.put_slice(b"\r\n");
}
}
BodySize::Empty => {
dst.put_slice(b"\r\ncontent-length: 0\r\n");
if camel_case {
dst.put_slice(b"\r\nContent-Length: 0\r\n");
} else {
dst.put_slice(b"\r\ncontent-length: 0\r\n");
}
}
BodySize::Sized(len) => helpers::write_content_length(len, dst),
BodySize::Sized64(len) => {
dst.put_slice(b"\r\ncontent-length: ");
if camel_case {
dst.put_slice(b"\r\nContent-Length: ");
} else {
dst.put_slice(b"\r\ncontent-length: ");
}
write!(dst.writer(), "{}\r\n", len)?;
}
BodySize::None => dst.put_slice(b"\r\n"),
@ -95,10 +112,18 @@ pub(crate) trait MessageType: Sized {
match ctype {
ConnectionType::Upgrade => dst.put_slice(b"connection: upgrade\r\n"),
ConnectionType::KeepAlive if version < Version::HTTP_11 => {
dst.put_slice(b"connection: keep-alive\r\n")
if camel_case {
dst.put_slice(b"Connection: keep-alive\r\n")
} else {
dst.put_slice(b"connection: keep-alive\r\n")
}
}
ConnectionType::Close if version >= Version::HTTP_11 => {
dst.put_slice(b"connection: close\r\n")
if camel_case {
dst.put_slice(b"Connection: close\r\n")
} else {
dst.put_slice(b"connection: close\r\n")
}
}
_ => (),
}
@ -133,7 +158,12 @@ pub(crate) trait MessageType: Sized {
buf = &mut *(dst.bytes_mut() as *mut _);
}
}
buf[pos..pos + k.len()].copy_from_slice(k);
// use upper Camel-Case
if camel_case {
write_camel_case(k, &mut buf[pos..pos + k.len()]);
} else {
buf[pos..pos + k.len()].copy_from_slice(k);
}
pos += k.len();
buf[pos..pos + 2].copy_from_slice(b": ");
pos += 2;
@ -158,7 +188,12 @@ pub(crate) trait MessageType: Sized {
buf = &mut *(dst.bytes_mut() as *mut _);
}
}
buf[pos..pos + k.len()].copy_from_slice(k);
// use upper Camel-Case
if camel_case {
write_camel_case(k, &mut buf[pos..pos + k.len()]);
} else {
buf[pos..pos + k.len()].copy_from_slice(k);
}
pos += k.len();
buf[pos..pos + 2].copy_from_slice(b": ");
pos += 2;
@ -221,6 +256,10 @@ impl MessageType for RequestHead {
self.chunked()
}
fn camel_case(&self) -> bool {
RequestHead::camel_case_headers(self)
}
fn headers(&self) -> &HeaderMap {
&self.headers
}
@ -418,11 +457,41 @@ impl<'a> io::Write for Writer<'a> {
}
}
fn write_camel_case(value: &[u8], buffer: &mut [u8]) {
let mut index = 0;
let key = value;
let mut key_iter = key.iter();
if let Some(c) = key_iter.next() {
if *c >= b'a' && *c <= b'z' {
buffer[index] = *c ^ b' ';
index += 1;
}
} else {
return;
}
while let Some(c) = key_iter.next() {
buffer[index] = *c;
index += 1;
if *c == b'-' {
if let Some(c) = key_iter.next() {
if *c >= b'a' && *c <= b'z' {
buffer[index] = *c ^ b' ';
index += 1;
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use bytes::Bytes;
use super::*;
use crate::http::header::{HeaderValue, CONTENT_TYPE};
#[test]
fn test_chunked_te() {
let mut bytes = BytesMut::new();
@ -436,4 +505,64 @@ mod tests {
Bytes::from_static(b"4\r\ntest\r\n0\r\n\r\n")
);
}
#[test]
fn test_camel_case() {
let mut bytes = BytesMut::with_capacity(2048);
let mut head = RequestHead::default();
head.set_camel_case_headers(true);
head.headers.insert(DATE, HeaderValue::from_static("date"));
head.headers
.insert(CONTENT_TYPE, HeaderValue::from_static("plain/text"));
let _ = head.encode_headers(
&mut bytes,
Version::HTTP_11,
BodySize::Empty,
ConnectionType::Close,
&ServiceConfig::default(),
);
assert_eq!(
bytes.take().freeze(),
Bytes::from_static(b"\r\nContent-Length: 0\r\nConnection: close\r\nDate: date\r\nContent-Type: plain/text\r\n\r\n")
);
let _ = head.encode_headers(
&mut bytes,
Version::HTTP_11,
BodySize::Stream,
ConnectionType::KeepAlive,
&ServiceConfig::default(),
);
assert_eq!(
bytes.take().freeze(),
Bytes::from_static(b"\r\nTransfer-Encoding: chunked\r\nDate: date\r\nContent-Type: plain/text\r\n\r\n")
);
let _ = head.encode_headers(
&mut bytes,
Version::HTTP_11,
BodySize::Sized64(100),
ConnectionType::KeepAlive,
&ServiceConfig::default(),
);
assert_eq!(
bytes.take().freeze(),
Bytes::from_static(b"\r\nContent-Length: 100\r\nDate: date\r\nContent-Type: plain/text\r\n\r\n")
);
head.headers
.append(CONTENT_TYPE, HeaderValue::from_static("xml"));
let _ = head.encode_headers(
&mut bytes,
Version::HTTP_11,
BodySize::Stream,
ConnectionType::KeepAlive,
&ServiceConfig::default(),
);
assert_eq!(
bytes.take().freeze(),
Bytes::from_static(b"\r\nTransfer-Encoding: chunked\r\nDate: date\r\nContent-Type: xml\r\nContent-Type: plain/text\r\n\r\n")
);
}
}

View File

@ -27,6 +27,7 @@ bitflags! {
const UPGRADE = 0b0000_0100;
const EXPECT = 0b0000_1000;
const NO_CHUNKING = 0b0001_0000;
const CAMEL_CASE = 0b0010_0000;
}
}
@ -97,6 +98,23 @@ impl RequestHead {
&mut self.headers
}
/// Is to uppercase headers with Camel-Case.
/// Befault is `false`
#[inline]
pub fn camel_case_headers(&self) -> bool {
self.flags.contains(Flags::CAMEL_CASE)
}
/// Set `true` to send headers which are uppercased with Camel-Case.
#[inline]
pub fn set_camel_case_headers(&mut self, val: bool) {
if val {
self.flags.insert(Flags::CAMEL_CASE);
} else {
self.flags.remove(Flags::CAMEL_CASE);
}
}
#[inline]
/// Set connection type of the message
pub fn set_connection_type(&mut self, ctype: ConnectionType) {

View File

@ -59,9 +59,7 @@ fn test_connection_close() {
.finish(|_| ok::<_, ()>(Response::Ok().body(STR)))
.map(|_| ())
});
println!("REQ: {:?}", srv.get("/").force_close());
let response = srv.block_on(srv.get("/").force_close().send()).unwrap();
println!("RES: {:?}", response);
assert!(response.status().is_success());
}

View File

@ -1,7 +1,9 @@
# Changes
## [0.1.0-alpha.1] - 2019-04-xx
## [0.1.0-beta.1] - 2019-04-21
* Do not support nested multipart
* Split multipart support to separate crate
* Optimize multipart handling #634, #769

View File

@ -1,6 +1,6 @@
[package]
name = "actix-multipart"
version = "0.1.0-alpha.1"
version = "0.1.0-beta.1"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Multipart support for actix web framework."
readme = "README.md"
@ -18,8 +18,8 @@ name = "actix_multipart"
path = "src/lib.rs"
[dependencies]
actix-web = "1.0.0-alpha.6"
actix-service = "0.3.4"
actix-web = "1.0.0-beta.1"
actix-service = "0.3.6"
bytes = "0.4"
derive_more = "0.14"
httparse = "1.3"
@ -31,4 +31,4 @@ twoway = "0.2"
[dev-dependencies]
actix-rt = "0.2.2"
actix-http = "0.1.0"
actix-http = "0.1.1"

View File

@ -168,7 +168,7 @@ impl InnerMultipart {
match payload.readline() {
None => {
if payload.eof {
Err(MultipartError::Incomplete)
Ok(Some(true))
} else {
Ok(None)
}
@ -201,8 +201,7 @@ impl InnerMultipart {
match payload.readline() {
Some(chunk) => {
if chunk.is_empty() {
//ValueError("Could not find starting boundary %r"
//% (self._boundary))
return Err(MultipartError::Boundary);
}
if chunk.len() < boundary.len() {
continue;
@ -505,47 +504,72 @@ impl InnerField {
payload: &mut PayloadBuffer,
boundary: &str,
) -> Poll<Option<Bytes>, MultipartError> {
match payload.read_until(b"\r") {
None => {
if payload.eof {
Err(MultipartError::Incomplete)
let mut pos = 0;
let len = payload.buf.len();
if len == 0 {
return Ok(Async::NotReady);
}
// check boundary
if len > 4 && payload.buf[0] == b'\r' {
let b_len = if &payload.buf[..2] == b"\r\n" && &payload.buf[2..4] == b"--" {
Some(4)
} else if &payload.buf[1..3] == b"--" {
Some(3)
} else {
None
};
if let Some(b_len) = b_len {
let b_size = boundary.len() + b_len;
if len < b_size {
return Ok(Async::NotReady);
} else {
Ok(Async::NotReady)
if &payload.buf[b_len..b_size] == boundary.as_bytes() {
// found boundary
return Ok(Async::Ready(None));
} else {
pos = b_size;
}
}
}
Some(mut chunk) => {
if chunk.len() == 1 {
payload.unprocessed(chunk);
match payload.read_exact(boundary.len() + 4) {
None => {
if payload.eof {
Err(MultipartError::Incomplete)
} else {
Ok(Async::NotReady)
}
}
Some(mut chunk) => {
if &chunk[..2] == b"\r\n"
&& &chunk[2..4] == b"--"
&& &chunk[4..] == boundary.as_bytes()
{
payload.unprocessed(chunk);
Ok(Async::Ready(None))
} else {
// \r might be part of data stream
let ch = chunk.split_to(1);
payload.unprocessed(chunk);
Ok(Async::Ready(Some(ch)))
}
}
}
loop {
return if let Some(idx) = twoway::find_bytes(&payload.buf[pos..], b"\r") {
let cur = pos + idx;
// check if we have enough data for boundary detection
if cur + 4 > len {
if cur > 0 {
Ok(Async::Ready(Some(payload.buf.split_to(cur).freeze())))
} else {
Ok(Async::NotReady)
}
} else {
let to = chunk.len() - 1;
let ch = chunk.split_to(to);
payload.unprocessed(chunk);
Ok(Async::Ready(Some(ch)))
// check boundary
if (&payload.buf[cur..cur + 2] == b"\r\n"
&& &payload.buf[cur + 2..cur + 4] == b"--")
|| (&payload.buf[cur..cur + 1] == b"\r"
&& &payload.buf[cur + 1..cur + 3] == b"--")
{
if cur != 0 {
// return buffer
Ok(Async::Ready(Some(payload.buf.split_to(cur).freeze())))
} else {
pos = cur + 1;
continue;
}
} else {
// not boundary
pos = cur + 1;
continue;
}
}
}
} else {
return Ok(Async::Ready(Some(payload.buf.take().freeze())));
};
}
}
@ -555,26 +579,27 @@ impl InnerField {
}
let result = if let Some(payload) = self.payload.as_ref().unwrap().get_mut(s) {
let res = if let Some(ref mut len) = self.length {
InnerField::read_len(payload, len)?
} else {
InnerField::read_stream(payload, &self.boundary)?
};
if !self.eof {
let res = if let Some(ref mut len) = self.length {
InnerField::read_len(payload, len)?
} else {
InnerField::read_stream(payload, &self.boundary)?
};
match res {
Async::NotReady => Async::NotReady,
Async::Ready(Some(bytes)) => Async::Ready(Some(bytes)),
Async::Ready(None) => {
self.eof = true;
match payload.readline() {
None => Async::Ready(None),
Some(line) => {
if line.as_ref() != b"\r\n" {
log::warn!("multipart field did not read all the data or it is malformed");
}
Async::Ready(None)
}
match res {
Async::NotReady => return Ok(Async::NotReady),
Async::Ready(Some(bytes)) => return Ok(Async::Ready(Some(bytes))),
Async::Ready(None) => self.eof = true,
}
}
match payload.readline() {
None => Async::Ready(None),
Some(line) => {
if line.as_ref() != b"\r\n" {
log::warn!("multipart field did not read all the data or it is malformed");
}
Async::Ready(None)
}
}
} else {
@ -704,7 +729,7 @@ impl PayloadBuffer {
}
/// Read exact number of bytes
#[inline]
#[cfg(test)]
fn read_exact(&mut self, size: usize) -> Option<Bytes> {
if size <= self.buf.len() {
Some(self.buf.split_to(size).freeze())
@ -875,6 +900,78 @@ mod tests {
});
}
#[test]
fn test_stream() {
run_on(|| {
let (sender, payload) = create_stream();
let bytes = Bytes::from(
"testasdadsad\r\n\
--abbc761f78ff4d7cb7573b5a23f96ef0\r\n\
Content-Disposition: form-data; name=\"file\"; filename=\"fn.txt\"\r\n\
Content-Type: text/plain; charset=utf-8\r\n\r\n\
test\r\n\
--abbc761f78ff4d7cb7573b5a23f96ef0\r\n\
Content-Type: text/plain; charset=utf-8\r\n\r\n\
data\r\n\
--abbc761f78ff4d7cb7573b5a23f96ef0--\r\n",
);
sender.unbounded_send(Ok(bytes)).unwrap();
let mut headers = HeaderMap::new();
headers.insert(
header::CONTENT_TYPE,
header::HeaderValue::from_static(
"multipart/mixed; boundary=\"abbc761f78ff4d7cb7573b5a23f96ef0\"",
),
);
let mut multipart = Multipart::new(&headers, payload);
match multipart.poll().unwrap() {
Async::Ready(Some(mut field)) => {
let cd = field.content_disposition().unwrap();
assert_eq!(cd.disposition, DispositionType::FormData);
assert_eq!(cd.parameters[0], DispositionParam::Name("file".into()));
assert_eq!(field.content_type().type_(), mime::TEXT);
assert_eq!(field.content_type().subtype(), mime::PLAIN);
match field.poll().unwrap() {
Async::Ready(Some(chunk)) => assert_eq!(chunk, "test"),
_ => unreachable!(),
}
match field.poll().unwrap() {
Async::Ready(None) => (),
_ => unreachable!(),
}
}
_ => unreachable!(),
}
match multipart.poll().unwrap() {
Async::Ready(Some(mut field)) => {
assert_eq!(field.content_type().type_(), mime::TEXT);
assert_eq!(field.content_type().subtype(), mime::PLAIN);
match field.poll() {
Ok(Async::Ready(Some(chunk))) => assert_eq!(chunk, "data"),
_ => unreachable!(),
}
match field.poll() {
Ok(Async::Ready(None)) => (),
_ => unreachable!(),
}
}
_ => unreachable!(),
}
match multipart.poll().unwrap() {
Async::Ready(None) => (),
_ => unreachable!(),
}
});
}
#[test]
fn test_basic() {
run_on(|| {

View File

@ -1,5 +1,8 @@
# Changes
## [0.1.0-beta.1] - 2019-04-20
* Update actix-web to beta.1
* `CookieSession::max_age()` accepts value in seconds

View File

@ -1,6 +1,6 @@
[package]
name = "actix-session"
version = "0.1.0-alpha.6"
version = "0.1.0-beta.1"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Session for actix web framework."
readme = "README.md"
@ -24,12 +24,12 @@ default = ["cookie-session"]
cookie-session = ["actix-web/secure-cookies"]
[dependencies]
actix-web = "1.0.0-alpha.6"
actix-web = "1.0.0-beta.1"
actix-service = "0.3.4"
bytes = "0.4"
derive_more = "0.14"
futures = "0.1.25"
hashbrown = "0.2.0"
hashbrown = "0.2.2"
serde = "1.0"
serde_json = "1.0"
time = "0.1.42"

View File

@ -19,8 +19,8 @@ path = "src/lib.rs"
[dependencies]
actix = "0.8.0"
actix-web = "1.0.0-alpha.5"
actix-http = "0.1.0"
actix-web = "1.0.0-beta.1"
actix-http = "0.1.1"
actix-codec = "0.1.2"
bytes = "0.4"
futures = "0.1.25"

View File

@ -1,5 +1,9 @@
# Changes
## [0.1.0-beta.1] - 2019-04-20
* Gen code for actix-web 1.0.0-beta.1
## [0.1.0-alpha.6] - 2019-04-14
* Gen code for actix-web 1.0.0-alpha.6

View File

@ -1,6 +1,6 @@
[package]
name = "actix-web-codegen"
version = "0.1.0-alpha.6"
version = "0.1.0-beta.1"
description = "Actix web proc macros"
readme = "README.md"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
@ -17,6 +17,6 @@ syn = { version = "0.15", features = ["full", "parsing"] }
[dev-dependencies]
actix-web = { version = "1.0.0-alpha.6" }
actix-http = { version = "0.1.0", features=["ssl"] }
actix-http = { version = "0.1.1", features=["ssl"] }
actix-http-test = { version = "0.1.0", features=["ssl"] }
futures = { version = "0.1" }

View File

@ -1,5 +1,10 @@
# Changes
### Added
* Allow to send headers in `Camel-Case` form.
## [0.1.1] - 2019-04-19
### Added

View File

@ -38,7 +38,7 @@ flate2-rust = ["actix-http/flate2-rust"]
[dependencies]
actix-codec = "0.1.2"
actix-service = "0.3.6"
actix-http = "0.1.1"
actix-http = "0.1.4"
base64 = "0.10.1"
bytes = "0.4"
derive_more = "0.14"
@ -55,9 +55,9 @@ openssl = { version="0.10", optional = true }
[dev-dependencies]
actix-rt = "0.2.2"
actix-web = { version = "1.0.0-alpha.6", features=["ssl"] }
actix-http = { version = "0.1.1", features=["ssl"] }
actix-http-test = { version = "0.1.0", features=["ssl"] }
actix-web = { version = "1.0.0-beta.1", features=["ssl"] }
actix-http = { version = "0.1.4", features=["ssl"] }
actix-http-test = { version = "0.1.1", features=["ssl"] }
actix-utils = "0.3.4"
actix-server = { version = "0.4.3", features=["ssl"] }
brotli2 = { version="0.3.2" }

View File

@ -235,6 +235,13 @@ impl ClientRequest {
self
}
/// Send headers in `Camel-Case` form.
#[inline]
pub fn camel_case(mut self) -> Self {
self.head.set_camel_case_headers(true);
self
}
/// Force close connection instead of returning it back to connections pool.
/// This setting affect only http/1 connections.
#[inline]

View File

@ -1,5 +1,7 @@
use std::collections::HashMap;
use std::io::{Read, Write};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use brotli2::write::BrotliEncoder;
@ -7,12 +9,14 @@ use bytes::Bytes;
use flate2::read::GzDecoder;
use flate2::write::GzEncoder;
use flate2::Compression;
use futures::future::Future;
use futures::Future;
use rand::Rng;
use actix_codec::{AsyncRead, AsyncWrite};
use actix_http::HttpService;
use actix_http_test::TestServer;
use actix_web::http::Cookie;
use actix_service::{fn_service, NewService};
use actix_web::http::{Cookie, Version};
use actix_web::middleware::{BodyEncoding, Compress};
use actix_web::{http::header, web, App, Error, HttpMessage, HttpRequest, HttpResponse};
use awc::error::SendRequestError;
@ -39,6 +43,30 @@ const STR: &str = "Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World";
#[cfg(feature = "ssl")]
fn ssl_acceptor<T: AsyncRead + AsyncWrite>(
) -> std::io::Result<actix_server::ssl::OpensslAcceptor<T, ()>> {
use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod};
// load ssl keys
let mut builder = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap();
builder
.set_private_key_file("../tests/key.pem", SslFiletype::PEM)
.unwrap();
builder
.set_certificate_chain_file("../tests/cert.pem")
.unwrap();
builder.set_alpn_select_callback(|_, protos| {
const H2: &[u8] = b"\x02h2";
if protos.windows(3).any(|window| window == H2) {
Ok(b"h2")
} else {
Err(openssl::ssl::AlpnError::NOACK)
}
});
builder.set_alpn_protos(b"\x02h2")?;
Ok(actix_server::ssl::OpensslAcceptor::new(builder.build()))
}
#[test]
fn test_simple() {
let mut srv =
@ -62,6 +90,10 @@ fn test_simple() {
// read response
let bytes = srv.block_on(response.body()).unwrap();
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
// camel case
let response = srv.block_on(srv.post("/").camel_case().send()).unwrap();
assert!(response.status().is_success());
}
#[test]
@ -144,17 +176,249 @@ fn test_timeout_override() {
}
#[test]
fn test_connection_close() {
let mut srv = TestServer::new(|| {
HttpService::new(
App::new().service(web::resource("/").to(|| HttpResponse::Ok())),
fn test_connection_reuse() {
let num = Arc::new(AtomicUsize::new(0));
let num2 = num.clone();
let mut srv = TestServer::new(move || {
let num2 = num2.clone();
fn_service(move |io| {
num2.fetch_add(1, Ordering::Relaxed);
Ok(io)
})
.and_then(HttpService::new(
App::new().service(web::resource("/").route(web::to(|| HttpResponse::Ok()))),
))
});
let client = awc::Client::default();
// req 1
let request = client.get(srv.url("/")).send();
let response = srv.block_on(request).unwrap();
assert!(response.status().is_success());
// req 2
let req = client.post(srv.url("/"));
let response = srv.block_on_fn(move || req.send()).unwrap();
assert!(response.status().is_success());
// one connection
assert_eq!(num.load(Ordering::Relaxed), 1);
}
#[cfg(feature = "ssl")]
#[test]
fn test_connection_reuse_h2() {
let openssl = ssl_acceptor().unwrap();
let num = Arc::new(AtomicUsize::new(0));
let num2 = num.clone();
let mut srv = TestServer::new(move || {
let num2 = num2.clone();
fn_service(move |io| {
num2.fetch_add(1, Ordering::Relaxed);
Ok(io)
})
.and_then(
openssl
.clone()
.map_err(|e| println!("Openssl error: {}", e)),
)
.and_then(
HttpService::build()
.h2(App::new()
.service(web::resource("/").route(web::to(|| HttpResponse::Ok()))))
.map_err(|_| ()),
)
});
let res = srv
.block_on(awc::Client::new().get(srv.url("/")).force_close().send())
.unwrap();
assert!(res.status().is_success());
// disable ssl verification
use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
builder.set_verify(SslVerifyMode::NONE);
let _ = builder
.set_alpn_protos(b"\x02h2\x08http/1.1")
.map_err(|e| log::error!("Can not set alpn protocol: {:?}", e));
let client = awc::Client::build()
.connector(awc::Connector::new().ssl(builder.build()).finish())
.finish();
// req 1
let request = client.get(srv.surl("/")).send();
let response = srv.block_on(request).unwrap();
assert!(response.status().is_success());
// req 2
let req = client.post(srv.surl("/"));
let response = srv.block_on_fn(move || req.send()).unwrap();
assert!(response.status().is_success());
assert_eq!(response.version(), Version::HTTP_2);
// one connection
assert_eq!(num.load(Ordering::Relaxed), 1);
}
#[test]
fn test_connection_force_close() {
let num = Arc::new(AtomicUsize::new(0));
let num2 = num.clone();
let mut srv = TestServer::new(move || {
let num2 = num2.clone();
fn_service(move |io| {
num2.fetch_add(1, Ordering::Relaxed);
Ok(io)
})
.and_then(HttpService::new(
App::new().service(web::resource("/").route(web::to(|| HttpResponse::Ok()))),
))
});
let client = awc::Client::default();
// req 1
let request = client.get(srv.url("/")).force_close().send();
let response = srv.block_on(request).unwrap();
assert!(response.status().is_success());
// req 2
let req = client.post(srv.url("/")).force_close();
let response = srv.block_on_fn(move || req.send()).unwrap();
assert!(response.status().is_success());
// two connection
assert_eq!(num.load(Ordering::Relaxed), 2);
}
#[test]
fn test_connection_server_close() {
let num = Arc::new(AtomicUsize::new(0));
let num2 = num.clone();
let mut srv = TestServer::new(move || {
let num2 = num2.clone();
fn_service(move |io| {
num2.fetch_add(1, Ordering::Relaxed);
Ok(io)
})
.and_then(HttpService::new(
App::new().service(
web::resource("/")
.route(web::to(|| HttpResponse::Ok().force_close().finish())),
),
))
});
let client = awc::Client::default();
// req 1
let request = client.get(srv.url("/")).send();
let response = srv.block_on(request).unwrap();
assert!(response.status().is_success());
// req 2
let req = client.post(srv.url("/"));
let response = srv.block_on_fn(move || req.send()).unwrap();
assert!(response.status().is_success());
// two connection
assert_eq!(num.load(Ordering::Relaxed), 2);
}
#[test]
fn test_connection_wait_queue() {
let num = Arc::new(AtomicUsize::new(0));
let num2 = num.clone();
let mut srv = TestServer::new(move || {
let num2 = num2.clone();
fn_service(move |io| {
num2.fetch_add(1, Ordering::Relaxed);
Ok(io)
})
.and_then(HttpService::new(App::new().service(
web::resource("/").route(web::to(|| HttpResponse::Ok().body(STR))),
)))
});
let client = awc::Client::build()
.connector(awc::Connector::new().limit(1).finish())
.finish();
// req 1
let request = client.get(srv.url("/")).send();
let mut response = srv.block_on(request).unwrap();
assert!(response.status().is_success());
// req 2
let req2 = client.post(srv.url("/"));
let req2_fut = srv.execute(move || {
let mut fut = req2.send();
assert!(fut.poll().unwrap().is_not_ready());
fut
});
// read response 1
let bytes = srv.block_on(response.body()).unwrap();
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
// req 2
let response = srv.block_on(req2_fut).unwrap();
assert!(response.status().is_success());
// two connection
assert_eq!(num.load(Ordering::Relaxed), 1);
}
#[test]
fn test_connection_wait_queue_force_close() {
let num = Arc::new(AtomicUsize::new(0));
let num2 = num.clone();
let mut srv = TestServer::new(move || {
let num2 = num2.clone();
fn_service(move |io| {
num2.fetch_add(1, Ordering::Relaxed);
Ok(io)
})
.and_then(HttpService::new(
App::new().service(
web::resource("/")
.route(web::to(|| HttpResponse::Ok().force_close().body(STR))),
),
))
});
let client = awc::Client::build()
.connector(awc::Connector::new().limit(1).finish())
.finish();
// req 1
let request = client.get(srv.url("/")).send();
let mut response = srv.block_on(request).unwrap();
assert!(response.status().is_success());
// req 2
let req2 = client.post(srv.url("/"));
let req2_fut = srv.execute(move || {
let mut fut = req2.send();
assert!(fut.poll().unwrap().is_not_ready());
fut
});
// read response 1
let bytes = srv.block_on(response.body()).unwrap();
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
// req 2
let response = srv.block_on(req2_fut).unwrap();
assert!(response.status().is_success());
// two connection
assert_eq!(num.load(Ordering::Relaxed), 2);
}
#[test]

View File

@ -431,13 +431,14 @@ where
#[cfg(test)]
mod tests {
use actix_service::Service;
use bytes::Bytes;
use futures::{Future, IntoFuture};
use super::*;
use crate::http::{header, HeaderValue, Method, StatusCode};
use crate::service::{ServiceRequest, ServiceResponse};
use crate::test::{block_on, call_service, init_service, TestRequest};
use crate::{web, Error, HttpResponse};
use crate::test::{block_on, call_service, init_service, read_body, TestRequest};
use crate::{web, Error, HttpRequest, HttpResponse};
#[test]
fn test_default_resource() {
@ -598,4 +599,26 @@ mod tests {
HeaderValue::from_static("0001")
);
}
#[test]
fn test_external_resource() {
let mut srv = init_service(
App::new()
.external_resource("youtube", "https://youtube.com/watch/{video_id}")
.route(
"/test",
web::get().to(|req: HttpRequest| {
HttpResponse::Ok().body(format!(
"{}",
req.url_for("youtube", &["12345"]).unwrap()
))
}),
),
);
let req = TestRequest::with_uri("/test").to_request();
let resp = call_service(&mut srv, req);
assert_eq!(resp.status(), StatusCode::OK);
let body = read_body(resp);
assert_eq!(body, Bytes::from_static(b"https://youtube.com/watch/12345"));
}
}

View File

@ -162,7 +162,7 @@ where
}
}
if self.endpoint.is_some() {
if self.endpoint.is_some() && self.data.is_empty() {
Ok(Async::Ready(AppInitService {
service: self.endpoint.take().unwrap(),
rmap: self.rmap.clone(),

View File

@ -253,11 +253,14 @@ impl ServiceConfig {
#[cfg(test)]
mod tests {
use actix_service::Service;
use bytes::Bytes;
use futures::Future;
use tokio_timer::sleep;
use super::*;
use crate::http::{Method, StatusCode};
use crate::test::{block_on, call_service, init_service, TestRequest};
use crate::{web, App, HttpResponse};
use crate::test::{block_on, call_service, init_service, read_body, TestRequest};
use crate::{web, App, HttpRequest, HttpResponse};
#[test]
fn test_data() {
@ -277,7 +280,12 @@ mod tests {
#[test]
fn test_data_factory() {
let cfg = |cfg: &mut ServiceConfig| {
cfg.data_factory(|| Ok::<_, ()>(10usize));
cfg.data_factory(|| {
sleep(std::time::Duration::from_millis(50)).then(|_| {
println!("READY");
Ok::<_, ()>(10usize)
})
});
};
let mut srv =
@ -301,6 +309,33 @@ mod tests {
assert_eq!(resp.status(), StatusCode::INTERNAL_SERVER_ERROR);
}
#[test]
fn test_external_resource() {
let mut srv = init_service(
App::new()
.configure(|cfg| {
cfg.external_resource(
"youtube",
"https://youtube.com/watch/{video_id}",
);
})
.route(
"/test",
web::get().to(|req: HttpRequest| {
HttpResponse::Ok().body(format!(
"{}",
req.url_for("youtube", &["12345"]).unwrap()
))
}),
),
);
let req = TestRequest::with_uri("/test").to_request();
let resp = call_service(&mut srv, req);
assert_eq!(resp.status(), StatusCode::OK);
let body = read_body(resp);
assert_eq!(body, Bytes::from_static(b"https://youtube.com/watch/12345"));
}
#[test]
fn test_service() {
let mut srv = init_service(App::new().configure(|cfg| {

View File

@ -124,7 +124,7 @@ where
pub trait AsyncFactory<T, R>: Clone + 'static
where
R: IntoFuture,
R::Item: Into<Response>,
R::Item: Responder,
R::Error: Into<Error>,
{
fn call(&self, param: T) -> R;
@ -134,7 +134,7 @@ impl<F, R> AsyncFactory<(), R> for F
where
F: Fn() -> R + Clone + 'static,
R: IntoFuture,
R::Item: Into<Response>,
R::Item: Responder,
R::Error: Into<Error>,
{
fn call(&self, _: ()) -> R {
@ -147,7 +147,7 @@ pub struct AsyncHandler<F, T, R>
where
F: AsyncFactory<T, R>,
R: IntoFuture,
R::Item: Into<Response>,
R::Item: Responder,
R::Error: Into<Error>,
{
hnd: F,
@ -158,7 +158,7 @@ impl<F, T, R> AsyncHandler<F, T, R>
where
F: AsyncFactory<T, R>,
R: IntoFuture,
R::Item: Into<Response>,
R::Item: Responder,
R::Error: Into<Error>,
{
pub fn new(hnd: F) -> Self {
@ -173,7 +173,7 @@ impl<F, T, R> Clone for AsyncHandler<F, T, R>
where
F: AsyncFactory<T, R>,
R: IntoFuture,
R::Item: Into<Response>,
R::Item: Responder,
R::Error: Into<Error>,
{
fn clone(&self) -> Self {
@ -188,7 +188,7 @@ impl<F, T, R> Service for AsyncHandler<F, T, R>
where
F: AsyncFactory<T, R>,
R: IntoFuture,
R::Item: Into<Response>,
R::Item: Responder,
R::Error: Into<Error>,
{
type Request = (T, HttpRequest);
@ -203,32 +203,56 @@ where
fn call(&mut self, (param, req): (T, HttpRequest)) -> Self::Future {
AsyncHandlerServiceResponse {
fut: self.hnd.call(param).into_future(),
fut2: None,
req: Some(req),
}
}
}
#[doc(hidden)]
pub struct AsyncHandlerServiceResponse<T> {
pub struct AsyncHandlerServiceResponse<T>
where
T: Future,
T::Item: Responder,
{
fut: T,
fut2: Option<<<T::Item as Responder>::Future as IntoFuture>::Future>,
req: Option<HttpRequest>,
}
impl<T> Future for AsyncHandlerServiceResponse<T>
where
T: Future,
T::Item: Into<Response>,
T::Item: Responder,
T::Error: Into<Error>,
{
type Item = ServiceResponse;
type Error = Void;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Some(ref mut fut) = self.fut2 {
return match fut.poll() {
Ok(Async::Ready(res)) => Ok(Async::Ready(ServiceResponse::new(
self.req.take().unwrap(),
res,
))),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(e) => {
let res: Response = e.into().into();
Ok(Async::Ready(ServiceResponse::new(
self.req.take().unwrap(),
res,
)))
}
};
}
match self.fut.poll() {
Ok(Async::Ready(res)) => Ok(Async::Ready(ServiceResponse::new(
self.req.take().unwrap(),
res.into(),
))),
Ok(Async::Ready(res)) => {
self.fut2 =
Some(res.respond_to(self.req.as_ref().unwrap()).into_future());
return self.poll();
}
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(e) => {
let res: Response = e.into().into();
@ -357,7 +381,7 @@ macro_rules! factory_tuple ({ $(($n:tt, $T:ident)),+} => {
impl<Func, $($T,)+ Res> AsyncFactory<($($T,)+), Res> for Func
where Func: Fn($($T,)+) -> Res + Clone + 'static,
Res: IntoFuture,
Res::Item: Into<Response>,
Res::Item: Responder,
Res::Error: Into<Error>,
{
fn call(&self, param: ($($T,)+)) -> Res {

View File

@ -136,7 +136,9 @@ pub mod dev {
pub use crate::config::{AppConfig, AppService};
pub use crate::info::ConnectionInfo;
pub use crate::rmap::ResourceMap;
pub use crate::service::{HttpServiceFactory, ServiceRequest, ServiceResponse};
pub use crate::service::{
HttpServiceFactory, ServiceRequest, ServiceResponse, WebService,
};
pub use crate::types::form::UrlEncoded;
pub use crate::types::json::JsonBody;
pub use crate::types::readlines::Readlines;

View File

@ -49,10 +49,12 @@
//! ```
use std::cell::RefCell;
use std::rc::Rc;
use std::time::SystemTime;
use actix_service::{Service, Transform};
use futures::future::{ok, Either, FutureResult};
use futures::{Future, IntoFuture, Poll};
use serde::{Deserialize, Serialize};
use time::Duration;
use crate::cookie::{Cookie, CookieJar, Key, SameSite};
@ -284,84 +286,149 @@ where
struct CookieIdentityInner {
key: Key,
key_v2: Key,
name: String,
path: String,
domain: Option<String>,
secure: bool,
max_age: Option<Duration>,
same_site: Option<SameSite>,
visit_deadline: Option<Duration>,
login_deadline: Option<Duration>,
}
#[derive(Deserialize, Serialize, Debug)]
struct CookieValue {
identity: String,
#[serde(skip_serializing_if = "Option::is_none")]
login_timestamp: Option<SystemTime>,
#[serde(skip_serializing_if = "Option::is_none")]
visit_timestamp: Option<SystemTime>,
}
#[derive(Debug)]
struct CookieIdentityExtention {
login_timestamp: Option<SystemTime>,
}
impl CookieIdentityInner {
fn new(key: &[u8]) -> CookieIdentityInner {
let key_v2: Vec<u8> =
key.iter().chain([1, 0, 0, 0].iter()).map(|e| *e).collect();
CookieIdentityInner {
key: Key::from_master(key),
key_v2: Key::from_master(&key_v2),
name: "actix-identity".to_owned(),
path: "/".to_owned(),
domain: None,
secure: true,
max_age: None,
same_site: None,
visit_deadline: None,
login_deadline: None,
}
}
fn set_cookie<B>(
&self,
resp: &mut ServiceResponse<B>,
id: Option<String>,
value: Option<CookieValue>,
) -> Result<()> {
let some = id.is_some();
{
let id = id.unwrap_or_else(String::new);
let mut cookie = Cookie::new(self.name.clone(), id);
cookie.set_path(self.path.clone());
cookie.set_secure(self.secure);
cookie.set_http_only(true);
if let Some(ref domain) = self.domain {
cookie.set_domain(domain.clone());
}
if let Some(max_age) = self.max_age {
cookie.set_max_age(max_age);
}
if let Some(same_site) = self.same_site {
cookie.set_same_site(same_site);
}
let mut jar = CookieJar::new();
if some {
jar.private(&self.key).add(cookie);
let add_cookie = value.is_some();
let val = value.map(|val| {
if !self.legacy_supported() {
serde_json::to_string(&val)
} else {
jar.add_original(cookie.clone());
jar.private(&self.key).remove(cookie);
Ok(val.identity)
}
});
let mut cookie =
Cookie::new(self.name.clone(), val.unwrap_or_else(|| Ok(String::new()))?);
cookie.set_path(self.path.clone());
cookie.set_secure(self.secure);
cookie.set_http_only(true);
for cookie in jar.delta() {
let val = HeaderValue::from_str(&cookie.to_string())?;
resp.headers_mut().append(header::SET_COOKIE, val);
}
if let Some(ref domain) = self.domain {
cookie.set_domain(domain.clone());
}
if let Some(max_age) = self.max_age {
cookie.set_max_age(max_age);
}
if let Some(same_site) = self.same_site {
cookie.set_same_site(same_site);
}
let mut jar = CookieJar::new();
let key = if self.legacy_supported() {
&self.key
} else {
&self.key_v2
};
if add_cookie {
jar.private(&key).add(cookie);
} else {
jar.add_original(cookie.clone());
jar.private(&key).remove(cookie);
}
for cookie in jar.delta() {
let val = HeaderValue::from_str(&cookie.to_string())?;
resp.headers_mut().append(header::SET_COOKIE, val);
}
Ok(())
}
fn load(&self, req: &ServiceRequest) -> Option<String> {
if let Ok(cookies) = req.cookies() {
for cookie in cookies.iter() {
if cookie.name() == self.name {
let mut jar = CookieJar::new();
jar.add_original(cookie.clone());
fn load(&self, req: &ServiceRequest) -> Option<CookieValue> {
let cookie = req.cookie(&self.name)?;
let mut jar = CookieJar::new();
jar.add_original(cookie.clone());
let res = if self.legacy_supported() {
jar.private(&self.key).get(&self.name).map(|n| CookieValue {
identity: n.value().to_string(),
login_timestamp: None,
visit_timestamp: None,
})
} else {
None
};
res.or_else(|| {
jar.private(&self.key_v2)
.get(&self.name)
.and_then(|c| self.parse(c))
})
}
let cookie_opt = jar.private(&self.key).get(&self.name);
if let Some(cookie) = cookie_opt {
return Some(cookie.value().into());
}
}
fn parse(&self, cookie: Cookie) -> Option<CookieValue> {
let value: CookieValue = serde_json::from_str(cookie.value()).ok()?;
let now = SystemTime::now();
if let Some(visit_deadline) = self.visit_deadline {
if now.duration_since(value.visit_timestamp?).ok()?
> visit_deadline.to_std().ok()?
{
return None;
}
}
None
if let Some(login_deadline) = self.login_deadline {
if now.duration_since(value.login_timestamp?).ok()?
> login_deadline.to_std().ok()?
{
return None;
}
}
Some(value)
}
fn legacy_supported(&self) -> bool {
self.visit_deadline.is_none() && self.login_deadline.is_none()
}
fn always_update_cookie(&self) -> bool {
self.visit_deadline.is_some()
}
fn requires_oob_data(&self) -> bool {
self.login_deadline.is_some()
}
}
@ -443,6 +510,18 @@ impl CookieIdentityPolicy {
Rc::get_mut(&mut self.0).unwrap().same_site = Some(same_site);
self
}
/// Accepts only users whose cookie has been seen before the given deadline
pub fn visit_deadline(mut self, value: Duration) -> CookieIdentityPolicy {
Rc::get_mut(&mut self.0).unwrap().visit_deadline = Some(value);
self
}
/// Accepts only users which has been authenticated before the given deadline
pub fn login_deadline(mut self, value: Duration) -> CookieIdentityPolicy {
Rc::get_mut(&mut self.0).unwrap().login_deadline = Some(value);
self
}
}
impl IdentityPolicy for CookieIdentityPolicy {
@ -450,7 +529,19 @@ impl IdentityPolicy for CookieIdentityPolicy {
type ResponseFuture = Result<(), Error>;
fn from_request(&self, req: &mut ServiceRequest) -> Self::Future {
Ok(self.0.load(req))
Ok(self.0.load(req).map(
|CookieValue {
identity,
login_timestamp,
..
}| {
if self.0.requires_oob_data() {
req.extensions_mut()
.insert(CookieIdentityExtention { login_timestamp });
}
identity
},
))
}
fn to_response<B>(
@ -459,9 +550,36 @@ impl IdentityPolicy for CookieIdentityPolicy {
changed: bool,
res: &mut ServiceResponse<B>,
) -> Self::ResponseFuture {
if changed {
let _ = self.0.set_cookie(res, id);
}
let _ = if changed {
let login_timestamp = SystemTime::now();
self.0.set_cookie(
res,
id.map(|identity| CookieValue {
identity,
login_timestamp: self.0.login_deadline.map(|_| login_timestamp),
visit_timestamp: self.0.visit_deadline.map(|_| login_timestamp),
}),
)
} else if self.0.always_update_cookie() && id.is_some() {
let visit_timestamp = SystemTime::now();
let mut login_timestamp = None;
if self.0.requires_oob_data() {
let CookieIdentityExtention {
login_timestamp: lt,
} = res.request().extensions_mut().remove().unwrap();
login_timestamp = lt;
}
self.0.set_cookie(
res,
Some(CookieValue {
identity: id.unwrap(),
login_timestamp,
visit_timestamp: self.0.visit_deadline.map(|_| visit_timestamp),
}),
)
} else {
Ok(())
};
Ok(())
}
}
@ -473,14 +591,20 @@ mod tests {
use crate::test::{self, TestRequest};
use crate::{web, App, HttpResponse};
use std::borrow::Borrow;
const COOKIE_KEY_MASTER: [u8; 32] = [0; 32];
const COOKIE_NAME: &'static str = "actix_auth";
const COOKIE_LOGIN: &'static str = "test";
#[test]
fn test_identity() {
let mut srv = test::init_service(
App::new()
.wrap(IdentityService::new(
CookieIdentityPolicy::new(&[0; 32])
CookieIdentityPolicy::new(&COOKIE_KEY_MASTER)
.domain("www.rust-lang.org")
.name("actix_auth")
.name(COOKIE_NAME)
.path("/")
.secure(true),
))
@ -492,7 +616,7 @@ mod tests {
}
}))
.service(web::resource("/login").to(|id: Identity| {
id.remember("test".to_string());
id.remember(COOKIE_LOGIN.to_string());
HttpResponse::Ok()
}))
.service(web::resource("/logout").to(|id: Identity| {
@ -537,9 +661,9 @@ mod tests {
let mut srv = test::init_service(
App::new()
.wrap(IdentityService::new(
CookieIdentityPolicy::new(&[0; 32])
CookieIdentityPolicy::new(&COOKIE_KEY_MASTER)
.domain("www.rust-lang.org")
.name("actix_auth")
.name(COOKIE_NAME)
.path("/")
.max_age_time(duration)
.secure(true),
@ -563,9 +687,9 @@ mod tests {
let mut srv = test::init_service(
App::new()
.wrap(IdentityService::new(
CookieIdentityPolicy::new(&[0; 32])
CookieIdentityPolicy::new(&COOKIE_KEY_MASTER)
.domain("www.rust-lang.org")
.name("actix_auth")
.name(COOKIE_NAME)
.path("/")
.max_age(seconds)
.secure(true),
@ -582,4 +706,328 @@ mod tests {
let c = resp.response().cookies().next().unwrap().to_owned();
assert_eq!(Duration::seconds(seconds as i64), c.max_age().unwrap());
}
fn create_identity_server<
F: Fn(CookieIdentityPolicy) -> CookieIdentityPolicy + Sync + Send + Clone + 'static,
>(
f: F,
) -> impl actix_service::Service<
Request = actix_http::Request,
Response = ServiceResponse<actix_http::body::Body>,
Error = actix_http::Error,
> {
test::init_service(
App::new()
.wrap(IdentityService::new(f(CookieIdentityPolicy::new(
&COOKIE_KEY_MASTER,
)
.secure(false)
.name(COOKIE_NAME))))
.service(web::resource("/").to(|id: Identity| {
let identity = id.identity();
if identity.is_none() {
id.remember(COOKIE_LOGIN.to_string())
}
web::Json(identity)
})),
)
}
fn legacy_login_cookie(identity: &'static str) -> Cookie<'static> {
let mut jar = CookieJar::new();
jar.private(&Key::from_master(&COOKIE_KEY_MASTER))
.add(Cookie::new(COOKIE_NAME, identity));
jar.get(COOKIE_NAME).unwrap().clone()
}
fn login_cookie(
identity: &'static str,
login_timestamp: Option<SystemTime>,
visit_timestamp: Option<SystemTime>,
) -> Cookie<'static> {
let mut jar = CookieJar::new();
let key: Vec<u8> = COOKIE_KEY_MASTER
.iter()
.chain([1, 0, 0, 0].iter())
.map(|e| *e)
.collect();
jar.private(&Key::from_master(&key)).add(Cookie::new(
COOKIE_NAME,
serde_json::to_string(&CookieValue {
identity: identity.to_string(),
login_timestamp,
visit_timestamp,
})
.unwrap(),
));
jar.get(COOKIE_NAME).unwrap().clone()
}
fn assert_logged_in(response: &mut ServiceResponse, identity: Option<&str>) {
use bytes::BytesMut;
use futures::Stream;
let bytes =
test::block_on(response.take_body().fold(BytesMut::new(), |mut b, c| {
b.extend(c);
Ok::<_, Error>(b)
}))
.unwrap();
let resp: Option<String> = serde_json::from_slice(&bytes[..]).unwrap();
assert_eq!(resp.as_ref().map(|s| s.borrow()), identity);
}
fn assert_legacy_login_cookie(response: &mut ServiceResponse, identity: &str) {
let mut cookies = CookieJar::new();
for cookie in response.headers().get_all(header::SET_COOKIE) {
cookies.add(Cookie::parse(cookie.to_str().unwrap().to_string()).unwrap());
}
let cookie = cookies
.private(&Key::from_master(&COOKIE_KEY_MASTER))
.get(COOKIE_NAME)
.unwrap();
assert_eq!(cookie.value(), identity);
}
enum LoginTimestampCheck {
NoTimestamp,
NewTimestamp,
OldTimestamp(SystemTime),
}
enum VisitTimeStampCheck {
NoTimestamp,
NewTimestamp,
}
fn assert_login_cookie(
response: &mut ServiceResponse,
identity: &str,
login_timestamp: LoginTimestampCheck,
visit_timestamp: VisitTimeStampCheck,
) {
let mut cookies = CookieJar::new();
for cookie in response.headers().get_all(header::SET_COOKIE) {
cookies.add(Cookie::parse(cookie.to_str().unwrap().to_string()).unwrap());
}
let key: Vec<u8> = COOKIE_KEY_MASTER
.iter()
.chain([1, 0, 0, 0].iter())
.map(|e| *e)
.collect();
let cookie = cookies
.private(&Key::from_master(&key))
.get(COOKIE_NAME)
.unwrap();
let cv: CookieValue = serde_json::from_str(cookie.value()).unwrap();
assert_eq!(cv.identity, identity);
let now = SystemTime::now();
let t30sec_ago = now - Duration::seconds(30).to_std().unwrap();
match login_timestamp {
LoginTimestampCheck::NoTimestamp => assert_eq!(cv.login_timestamp, None),
LoginTimestampCheck::NewTimestamp => assert!(
t30sec_ago <= cv.login_timestamp.unwrap()
&& cv.login_timestamp.unwrap() <= now
),
LoginTimestampCheck::OldTimestamp(old_timestamp) => {
assert_eq!(cv.login_timestamp, Some(old_timestamp))
}
}
match visit_timestamp {
VisitTimeStampCheck::NoTimestamp => assert_eq!(cv.visit_timestamp, None),
VisitTimeStampCheck::NewTimestamp => assert!(
t30sec_ago <= cv.visit_timestamp.unwrap()
&& cv.visit_timestamp.unwrap() <= now
),
}
}
fn assert_no_login_cookie(response: &mut ServiceResponse) {
let mut cookies = CookieJar::new();
for cookie in response.headers().get_all(header::SET_COOKIE) {
cookies.add(Cookie::parse(cookie.to_str().unwrap().to_string()).unwrap());
}
assert!(cookies.get(COOKIE_NAME).is_none());
}
#[test]
fn test_identity_legacy_cookie_is_set() {
let mut srv = create_identity_server(|c| c);
let mut resp =
test::call_service(&mut srv, TestRequest::with_uri("/").to_request());
assert_logged_in(&mut resp, None);
assert_legacy_login_cookie(&mut resp, COOKIE_LOGIN);
}
#[test]
fn test_identity_legacy_cookie_works() {
let mut srv = create_identity_server(|c| c);
let cookie = legacy_login_cookie(COOKIE_LOGIN);
let mut resp = test::call_service(
&mut srv,
TestRequest::with_uri("/")
.cookie(cookie.clone())
.to_request(),
);
assert_logged_in(&mut resp, Some(COOKIE_LOGIN));
assert_no_login_cookie(&mut resp);
}
#[test]
fn test_identity_legacy_cookie_rejected_if_visit_timestamp_needed() {
let mut srv = create_identity_server(|c| c.visit_deadline(Duration::days(90)));
let cookie = legacy_login_cookie(COOKIE_LOGIN);
let mut resp = test::call_service(
&mut srv,
TestRequest::with_uri("/")
.cookie(cookie.clone())
.to_request(),
);
assert_logged_in(&mut resp, None);
assert_login_cookie(
&mut resp,
COOKIE_LOGIN,
LoginTimestampCheck::NoTimestamp,
VisitTimeStampCheck::NewTimestamp,
);
}
#[test]
fn test_identity_legacy_cookie_rejected_if_login_timestamp_needed() {
let mut srv = create_identity_server(|c| c.login_deadline(Duration::days(90)));
let cookie = legacy_login_cookie(COOKIE_LOGIN);
let mut resp = test::call_service(
&mut srv,
TestRequest::with_uri("/")
.cookie(cookie.clone())
.to_request(),
);
assert_logged_in(&mut resp, None);
assert_login_cookie(
&mut resp,
COOKIE_LOGIN,
LoginTimestampCheck::NewTimestamp,
VisitTimeStampCheck::NoTimestamp,
);
}
#[test]
fn test_identity_cookie_rejected_if_login_timestamp_needed() {
let mut srv = create_identity_server(|c| c.login_deadline(Duration::days(90)));
let cookie = login_cookie(COOKIE_LOGIN, None, Some(SystemTime::now()));
let mut resp = test::call_service(
&mut srv,
TestRequest::with_uri("/")
.cookie(cookie.clone())
.to_request(),
);
assert_logged_in(&mut resp, None);
assert_login_cookie(
&mut resp,
COOKIE_LOGIN,
LoginTimestampCheck::NewTimestamp,
VisitTimeStampCheck::NoTimestamp,
);
}
#[test]
fn test_identity_cookie_rejected_if_visit_timestamp_needed() {
let mut srv = create_identity_server(|c| c.visit_deadline(Duration::days(90)));
let cookie = login_cookie(COOKIE_LOGIN, Some(SystemTime::now()), None);
let mut resp = test::call_service(
&mut srv,
TestRequest::with_uri("/")
.cookie(cookie.clone())
.to_request(),
);
assert_logged_in(&mut resp, None);
assert_login_cookie(
&mut resp,
COOKIE_LOGIN,
LoginTimestampCheck::NoTimestamp,
VisitTimeStampCheck::NewTimestamp,
);
}
#[test]
fn test_identity_cookie_rejected_if_login_timestamp_too_old() {
let mut srv = create_identity_server(|c| c.login_deadline(Duration::days(90)));
let cookie = login_cookie(
COOKIE_LOGIN,
Some(SystemTime::now() - Duration::days(180).to_std().unwrap()),
None,
);
let mut resp = test::call_service(
&mut srv,
TestRequest::with_uri("/")
.cookie(cookie.clone())
.to_request(),
);
assert_logged_in(&mut resp, None);
assert_login_cookie(
&mut resp,
COOKIE_LOGIN,
LoginTimestampCheck::NewTimestamp,
VisitTimeStampCheck::NoTimestamp,
);
}
#[test]
fn test_identity_cookie_rejected_if_visit_timestamp_too_old() {
let mut srv = create_identity_server(|c| c.visit_deadline(Duration::days(90)));
let cookie = login_cookie(
COOKIE_LOGIN,
None,
Some(SystemTime::now() - Duration::days(180).to_std().unwrap()),
);
let mut resp = test::call_service(
&mut srv,
TestRequest::with_uri("/")
.cookie(cookie.clone())
.to_request(),
);
assert_logged_in(&mut resp, None);
assert_login_cookie(
&mut resp,
COOKIE_LOGIN,
LoginTimestampCheck::NoTimestamp,
VisitTimeStampCheck::NewTimestamp,
);
}
#[test]
fn test_identity_cookie_not_updated_on_login_deadline() {
let mut srv = create_identity_server(|c| c.login_deadline(Duration::days(90)));
let cookie = login_cookie(COOKIE_LOGIN, Some(SystemTime::now()), None);
let mut resp = test::call_service(
&mut srv,
TestRequest::with_uri("/")
.cookie(cookie.clone())
.to_request(),
);
assert_logged_in(&mut resp, Some(COOKIE_LOGIN));
assert_no_login_cookie(&mut resp);
}
#[test]
fn test_identity_cookie_updated_on_visit_deadline() {
let mut srv = create_identity_server(|c| {
c.visit_deadline(Duration::days(90))
.login_deadline(Duration::days(90))
});
let timestamp = SystemTime::now() - Duration::days(1).to_std().unwrap();
let cookie = login_cookie(COOKIE_LOGIN, Some(timestamp), Some(timestamp));
let mut resp = test::call_service(
&mut srv,
TestRequest::with_uri("/")
.cookie(cookie.clone())
.to_request(),
);
assert_logged_in(&mut resp, Some(COOKIE_LOGIN));
assert_login_cookie(
&mut resp,
COOKIE_LOGIN,
LoginTimestampCheck::OldTimestamp(timestamp),
VisitTimeStampCheck::NewTimestamp,
);
}
}

View File

@ -6,10 +6,11 @@ pub mod cors;
mod defaultheaders;
pub mod errhandlers;
mod logger;
pub mod normalize;
mod normalize;
pub use self::defaultheaders::DefaultHeaders;
pub use self::logger::Logger;
pub use self::normalize::NormalizePath;
#[cfg(feature = "secure-cookies")]
pub mod identity;

View File

@ -217,7 +217,7 @@ where
F: AsyncFactory<I, R>,
I: FromRequest + 'static,
R: IntoFuture + 'static,
R::Item: Into<Response>,
R::Item: Responder,
R::Error: Into<Error>,
{
self.routes.push(Route::new().to_async(handler));

View File

@ -1,8 +1,12 @@
use actix_http::error::InternalError;
use actix_http::{http::StatusCode, Error, Response, ResponseBuilder};
use actix_http::http::{
header::IntoHeaderValue, Error as HttpError, HeaderMap, HeaderName, HttpTryFrom,
StatusCode,
};
use actix_http::{Error, Response, ResponseBuilder};
use bytes::{Bytes, BytesMut};
use futures::future::{err, ok, Either as EitherFuture, FutureResult};
use futures::{Future, IntoFuture, Poll};
use futures::{try_ready, Async, Future, IntoFuture, Poll};
use crate::request::HttpRequest;
@ -18,6 +22,51 @@ pub trait Responder {
/// Convert itself to `AsyncResult` or `Error`.
fn respond_to(self, req: &HttpRequest) -> Self::Future;
/// Override a status code for a Responder.
///
/// ```rust
/// use actix_web::{HttpRequest, Responder, http::StatusCode};
///
/// fn index(req: HttpRequest) -> impl Responder {
/// "Welcome!".with_status(StatusCode::OK)
/// }
/// # fn main() {}
/// ```
fn with_status(self, status: StatusCode) -> CustomResponder<Self>
where
Self: Sized,
{
CustomResponder::new(self).with_status(status)
}
/// Add header to the Responder's response.
///
/// ```rust
/// use actix_web::{web, HttpRequest, Responder};
/// use serde::Serialize;
///
/// #[derive(Serialize)]
/// struct MyObj {
/// name: String,
/// }
///
/// fn index(req: HttpRequest) -> impl Responder {
/// web::Json(
/// MyObj{name: "Name".to_string()}
/// )
/// .with_header("x-version", "1.2.3")
/// }
/// # fn main() {}
/// ```
fn with_header<K, V>(self, key: K, value: V) -> CustomResponder<Self>
where
Self: Sized,
HeaderName: HttpTryFrom<K>,
V: IntoHeaderValue,
{
CustomResponder::new(self).with_header(key, value)
}
}
impl Responder for Response {
@ -154,6 +203,117 @@ impl Responder for BytesMut {
}
}
/// Allows to override status code and headers for a responder.
pub struct CustomResponder<T> {
responder: T,
status: Option<StatusCode>,
headers: Option<HeaderMap>,
error: Option<HttpError>,
}
impl<T: Responder> CustomResponder<T> {
fn new(responder: T) -> Self {
CustomResponder {
responder,
status: None,
headers: None,
error: None,
}
}
/// Override a status code for the Responder's response.
///
/// ```rust
/// use actix_web::{HttpRequest, Responder, http::StatusCode};
///
/// fn index(req: HttpRequest) -> impl Responder {
/// "Welcome!".with_status(StatusCode::OK)
/// }
/// # fn main() {}
/// ```
pub fn with_status(mut self, status: StatusCode) -> Self {
self.status = Some(status);
self
}
/// Add header to the Responder's response.
///
/// ```rust
/// use actix_web::{web, HttpRequest, Responder};
/// use serde::Serialize;
///
/// #[derive(Serialize)]
/// struct MyObj {
/// name: String,
/// }
///
/// fn index(req: HttpRequest) -> impl Responder {
/// web::Json(
/// MyObj{name: "Name".to_string()}
/// )
/// .with_header("x-version", "1.2.3")
/// }
/// # fn main() {}
/// ```
pub fn with_header<K, V>(mut self, key: K, value: V) -> Self
where
HeaderName: HttpTryFrom<K>,
V: IntoHeaderValue,
{
if self.headers.is_none() {
self.headers = Some(HeaderMap::new());
}
match HeaderName::try_from(key) {
Ok(key) => match value.try_into() {
Ok(value) => {
self.headers.as_mut().unwrap().append(key, value);
}
Err(e) => self.error = Some(e.into()),
},
Err(e) => self.error = Some(e.into()),
};
self
}
}
impl<T: Responder> Responder for CustomResponder<T> {
type Error = T::Error;
type Future = CustomResponderFut<T>;
fn respond_to(self, req: &HttpRequest) -> Self::Future {
CustomResponderFut {
fut: self.responder.respond_to(req).into_future(),
status: self.status,
headers: self.headers,
}
}
}
pub struct CustomResponderFut<T: Responder> {
fut: <T::Future as IntoFuture>::Future,
status: Option<StatusCode>,
headers: Option<HeaderMap>,
}
impl<T: Responder> Future for CustomResponderFut<T> {
type Item = Response;
type Error = T::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let mut res = try_ready!(self.fut.poll());
if let Some(status) = self.status {
*res.status_mut() = status;
}
if let Some(ref headers) = self.headers {
for (k, v) in headers {
res.headers_mut().insert(k.clone(), v.clone());
}
}
Ok(Async::Ready(res))
}
}
/// Combines two different responder types into a single type
///
/// ```rust
@ -435,4 +595,33 @@ pub(crate) mod tests {
);
assert!(res.is_err());
}
#[test]
fn test_custom_responder() {
let req = TestRequest::default().to_http_request();
let res = block_on(
"test"
.to_string()
.with_status(StatusCode::BAD_REQUEST)
.respond_to(&req),
)
.unwrap();
assert_eq!(res.status(), StatusCode::BAD_REQUEST);
assert_eq!(res.body().bin_ref(), b"test");
let res = block_on(
"test"
.to_string()
.with_header("content-type", "json")
.respond_to(&req),
)
.unwrap();
assert_eq!(res.status(), StatusCode::OK);
assert_eq!(res.body().bin_ref(), b"test");
assert_eq!(
res.headers().get(CONTENT_TYPE).unwrap(),
HeaderValue::from_static("json")
);
}
}

View File

@ -1,7 +1,7 @@
use std::cell::RefCell;
use std::rc::Rc;
use actix_http::{http::Method, Error, Extensions, Response};
use actix_http::{http::Method, Error, Extensions};
use actix_service::{NewService, Service};
use futures::future::{ok, Either, FutureResult};
use futures::{Async, Future, IntoFuture, Poll};
@ -278,7 +278,7 @@ impl Route {
F: AsyncFactory<T, R>,
T: FromRequest + 'static,
R: IntoFuture + 'static,
R::Item: Into<Response>,
R::Item: Responder,
R::Error: Into<Error>,
{
self.service = Box::new(RouteNewService::new(Extract::new(
@ -418,18 +418,25 @@ where
mod tests {
use std::time::Duration;
use bytes::Bytes;
use futures::Future;
use serde_derive::Serialize;
use tokio_timer::sleep;
use crate::http::{Method, StatusCode};
use crate::test::{call_service, init_service, TestRequest};
use crate::test::{call_service, init_service, read_body, TestRequest};
use crate::{error, web, App, HttpResponse};
#[derive(Serialize, PartialEq, Debug)]
struct MyObject {
name: String,
}
#[test]
fn test_route() {
let mut srv =
init_service(
App::new().service(
let mut srv = init_service(
App::new()
.service(
web::resource("/test")
.route(web::get().to(|| HttpResponse::Ok()))
.route(web::put().to(|| {
@ -444,8 +451,15 @@ mod tests {
Err::<HttpResponse, _>(error::ErrorBadRequest("err"))
})
})),
),
);
)
.service(web::resource("/json").route(web::get().to_async(|| {
sleep(Duration::from_millis(25)).then(|_| {
Ok::<_, crate::Error>(web::Json(MyObject {
name: "test".to_string(),
}))
})
}))),
);
let req = TestRequest::with_uri("/test")
.method(Method::GET)
@ -476,5 +490,12 @@ mod tests {
.to_request();
let resp = call_service(&mut srv, req);
assert_eq!(resp.status(), StatusCode::METHOD_NOT_ALLOWED);
let req = TestRequest::with_uri("/json").to_request();
let resp = call_service(&mut srv, req);
assert_eq!(resp.status(), StatusCode::OK);
let body = read_body(resp);
assert_eq!(body, Bytes::from_static(b"{\"name\":\"test\"}"));
}
}

View File

@ -7,11 +7,14 @@ use actix_http::{
Error, Extensions, HttpMessage, Payload, PayloadStream, RequestHead, Response,
ResponseHead,
};
use actix_router::{Path, Resource, Url};
use actix_router::{Path, Resource, ResourceDef, Url};
use actix_service::{IntoNewService, NewService};
use futures::future::{ok, FutureResult, IntoFuture};
use crate::config::{AppConfig, AppService};
use crate::data::Data;
use crate::dev::insert_slash;
use crate::guard::Guard;
use crate::info::ConnectionInfo;
use crate::request::HttpRequest;
@ -380,10 +383,136 @@ impl<B: MessageBody> fmt::Debug for ServiceResponse<B> {
}
}
pub struct WebService {
rdef: String,
name: Option<String>,
guards: Vec<Box<Guard>>,
}
impl WebService {
/// Create new `WebService` instance.
pub fn new(path: &str) -> Self {
WebService {
rdef: path.to_string(),
name: None,
guards: Vec::new(),
}
}
/// Set service name.
///
/// Name is used for url generation.
pub fn name(mut self, name: &str) -> Self {
self.name = Some(name.to_string());
self
}
/// Add match guard to a web service.
///
/// ```rust
/// use actix_web::{web, guard, dev, App, HttpResponse};
///
/// fn index(req: dev::ServiceRequest) -> dev::ServiceResponse {
/// req.into_response(HttpResponse::Ok().finish())
/// }
///
/// fn main() {
/// let app = App::new()
/// .service(
/// web::service("/app")
/// .guard(guard::Header("content-type", "text/plain"))
/// .finish(index)
/// );
/// }
/// ```
pub fn guard<G: Guard + 'static>(mut self, guard: G) -> Self {
self.guards.push(Box::new(guard));
self
}
/// Set a service factory implementation and generate web service.
pub fn finish<T, F>(self, service: F) -> impl HttpServiceFactory
where
F: IntoNewService<T>,
T: NewService<
Request = ServiceRequest,
Response = ServiceResponse,
Error = Error,
InitError = (),
> + 'static,
{
WebServiceImpl {
srv: service.into_new_service(),
rdef: self.rdef,
name: self.name,
guards: self.guards,
}
}
}
struct WebServiceImpl<T> {
srv: T,
rdef: String,
name: Option<String>,
guards: Vec<Box<Guard>>,
}
impl<T> HttpServiceFactory for WebServiceImpl<T>
where
T: NewService<
Request = ServiceRequest,
Response = ServiceResponse,
Error = Error,
InitError = (),
> + 'static,
{
fn register(mut self, config: &mut AppService) {
let guards = if self.guards.is_empty() {
None
} else {
Some(std::mem::replace(&mut self.guards, Vec::new()))
};
let mut rdef = if config.is_root() || !self.rdef.is_empty() {
ResourceDef::new(&insert_slash(&self.rdef))
} else {
ResourceDef::new(&self.rdef)
};
if let Some(ref name) = self.name {
*rdef.name_mut() = name.clone();
}
config.register_service(rdef, guards, self.srv, None)
}
}
#[cfg(test)]
mod tests {
use crate::test::TestRequest;
use crate::HttpResponse;
use super::*;
use crate::test::{call_service, init_service, TestRequest};
use crate::{guard, http, web, App, HttpResponse};
#[test]
fn test_service() {
let mut srv = init_service(
App::new().service(web::service("/test").name("test").finish(
|req: ServiceRequest| req.into_response(HttpResponse::Ok().finish()),
)),
);
let req = TestRequest::with_uri("/test").to_request();
let resp = call_service(&mut srv, req);
assert_eq!(resp.status(), http::StatusCode::OK);
let mut srv = init_service(
App::new().service(web::service("/test").guard(guard::Get()).finish(
|req: ServiceRequest| req.into_response(HttpResponse::Ok().finish()),
)),
);
let req = TestRequest::with_uri("/test")
.method(http::Method::PUT)
.to_request();
let resp = call_service(&mut srv, req);
assert_eq!(resp.status(), http::StatusCode::NOT_FOUND);
}
#[test]
fn test_fmt_debug() {

View File

@ -193,6 +193,46 @@ where
.unwrap_or_else(|_| panic!("read_response failed at block_on unwrap"))
}
/// Helper function that returns a response body of a ServiceResponse.
/// This function blocks the current thread until futures complete.
///
/// ```rust
/// use actix_web::{test, web, App, HttpResponse, http::header};
/// use bytes::Bytes;
///
/// #[test]
/// fn test_index() {
/// let mut app = test::init_service(
/// App::new().service(
/// web::resource("/index.html")
/// .route(web::post().to(
/// || HttpResponse::Ok().body("welcome!")))));
///
/// let req = test::TestRequest::post()
/// .uri("/index.html")
/// .header(header::CONTENT_TYPE, "application/json")
/// .to_request();
///
/// let resp = call_service(&mut srv, req);
/// let result = test::read_body(resp);
/// assert_eq!(result, Bytes::from_static(b"welcome!"));
/// }
/// ```
pub fn read_body<B>(mut res: ServiceResponse<B>) -> Bytes
where
B: MessageBody,
{
block_on(run_on(move || {
res.take_body()
.fold(BytesMut::new(), move |mut body, chunk| {
body.extend_from_slice(&chunk);
Ok::<_, Error>(body)
})
.map(|body: BytesMut| body.freeze())
}))
.unwrap_or_else(|_| panic!("read_response failed at block_on unwrap"))
}
/// Helper function that returns a deserialized response body of a TestRequest
/// This function blocks the current thread until futures complete.
///

View File

@ -1,5 +1,5 @@
//! Essentials helper functions and types for application registration.
use actix_http::{http::Method, Response};
use actix_http::http::Method;
use futures::{Future, IntoFuture};
pub use actix_http::Response as HttpResponse;
@ -12,6 +12,7 @@ use crate::resource::Resource;
use crate::responder::Responder;
use crate::route::Route;
use crate::scope::Scope;
use crate::service::WebService;
pub use crate::config::ServiceConfig;
pub use crate::data::{Data, RouteData};
@ -268,12 +269,34 @@ where
F: AsyncFactory<I, R>,
I: FromRequest + 'static,
R: IntoFuture + 'static,
R::Item: Into<Response>,
R::Item: Responder,
R::Error: Into<Error>,
{
Route::new().to_async(handler)
}
/// Create raw service for a specific path.
///
/// ```rust
/// # extern crate actix_web;
/// use actix_web::{dev, web, guard, App, HttpResponse};
///
/// fn my_service(req: dev::ServiceRequest) -> dev::ServiceResponse {
/// req.into_response(HttpResponse::Ok().finish())
/// }
///
/// fn main() {
/// let app = App::new().service(
/// web::service("/users/*")
/// .guard(guard::Header("content-type", "text/plain"))
/// .finish(my_service)
/// );
/// }
/// ```
pub fn service(path: &str) -> WebService {
WebService::new(path)
}
/// Execute blocking function on a thread pool, returns future that resolves
/// to result of the function execution.
pub fn block<F, I, E>(f: F) -> impl Future<Item = I, Error = BlockingError<E>>

View File

@ -1,5 +1,10 @@
# Changes
## [0.1.1] - 2019-04-24
* Always make new connection for http client
## [0.1.0] - 2019-04-16
* No changes

View File

@ -1,6 +1,6 @@
[package]
name = "actix-http-test"
version = "0.1.0"
version = "0.1.1"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix http test server"
readme = "README.md"
@ -35,7 +35,7 @@ actix-rt = "0.2.2"
actix-service = "0.3.6"
actix-server = "0.4.3"
actix-utils = "0.3.5"
awc = "0.1.0"
awc = "0.1.1"
base64 = "0.10"
bytes = "0.4"
@ -55,5 +55,5 @@ tokio-timer = "0.2"
openssl = { version="0.10", optional = true }
[dev-dependencies]
actix-web = "1.0.0-alpha.5"
actix-http = "0.1.0"
actix-web = "1.0.0-beta.1"
actix-http = "0.1.2"

View File

@ -124,6 +124,7 @@ impl TestServer {
|e| log::error!("Can not set alpn protocol: {:?}", e),
);
Connector::new()
.conn_lifetime(time::Duration::from_secs(0))
.timeout(time::Duration::from_millis(500))
.ssl(builder.build())
.finish()
@ -131,6 +132,7 @@ impl TestServer {
#[cfg(not(feature = "ssl"))]
{
Connector::new()
.conn_lifetime(time::Duration::from_secs(0))
.timeout(time::Duration::from_millis(500))
.finish()
}
@ -163,6 +165,15 @@ impl TestServerRuntime {
self.rt.block_on(fut)
}
/// Execute future on current core
pub fn block_on_fn<F, R>(&mut self, f: F) -> Result<R::Item, R::Error>
where
F: FnOnce() -> R,
R: Future,
{
self.rt.block_on(lazy(|| f()))
}
/// Execute function on current core
pub fn execute<F, R>(&mut self, fut: F) -> R
where