1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-12-01 02:44:37 +01:00

Merge branch 'master' into feature/sockets-vec

This commit is contained in:
Nikolay Kim 2018-03-11 16:38:17 -07:00 committed by GitHub
commit 3e276ac921
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 104 additions and 96 deletions

View File

@ -1,5 +1,14 @@
# Changes # Changes
## 0.4.7 (2018-03-11)
* Fix panic on unknown content encoding
* Fix connection get closed too early
* Fix steraming response handling for http/2
## 0.4.6 (2018-03-10) ## 0.4.6 (2018-03-10)
* Fix client cookie handling * Fix client cookie handling

View File

@ -1,8 +1,8 @@
//! Actix web diesel example //! Actix web diesel example
//! //!
//! Diesel does not support tokio, so we have to run it in separate threads. //! Diesel does not support tokio, so we have to run it in separate threads.
//! Actix supports sync actors by default, so we going to create sync actor that will //! Actix supports sync actors by default, so we going to create sync actor that use diesel.
//! use diesel. Technically sync actors are worker style actors, multiple of them //! Technically sync actors are worker style actors, multiple of them
//! can run in parallel and process messages from same queue. //! can run in parallel and process messages from same queue.
extern crate serde; extern crate serde;
extern crate serde_json; extern crate serde_json;
@ -38,6 +38,7 @@ struct State {
fn index(req: HttpRequest<State>) -> Box<Future<Item=HttpResponse, Error=Error>> { fn index(req: HttpRequest<State>) -> Box<Future<Item=HttpResponse, Error=Error>> {
let name = &req.match_info()["name"]; let name = &req.match_info()["name"];
// send async `CreateUser` message to a `DbExecutor`
req.state().db.send(CreateUser{name: name.to_owned()}) req.state().db.send(CreateUser{name: name.to_owned()})
.from_err() .from_err()
.and_then(|res| { .and_then(|res| {
@ -54,7 +55,7 @@ fn main() {
let _ = env_logger::init(); let _ = env_logger::init();
let sys = actix::System::new("diesel-example"); let sys = actix::System::new("diesel-example");
// Start db executor actors // Start 3 db executor actors
let addr = SyncArbiter::start(3, || { let addr = SyncArbiter::start(3, || {
DbExecutor(SqliteConnection::establish("test.db").unwrap()) DbExecutor(SqliteConnection::establish("test.db").unwrap())
}); });

View File

@ -238,9 +238,10 @@ Both methods could be combined. (i.e Async response with streaming body)
## Different return types (Either) ## Different return types (Either)
Sometimes you need to return different types of responses. For example Sometimes you need to return different types of responses. For example
you can do error check and return error, otherwise return async response. you can do error check and return error and return async response otherwise.
Or any result that requires two different types. Or any result that requires two different types.
For this case [*Either*](../actix_web/enum.Either.html) type can be used. For this case [*Either*](../actix_web/enum.Either.html) type can be used.
*Either* allows to combine two different responder types into a single type.
```rust ```rust
# extern crate actix_web; # extern crate actix_web;
@ -253,7 +254,7 @@ use actix_web::{Either, Error, HttpResponse, httpcodes};
type RegisterResult = Either<HttpResponse, Box<Future<Item=HttpResponse, Error=Error>>>; type RegisterResult = Either<HttpResponse, Box<Future<Item=HttpResponse, Error=Error>>>;
fn index(req: HttpRequest) -> RegisterResult { fn index(req: HttpRequest) -> RegisterResult {
if true { // <- choose variant A if is_a_variant() { // <- choose variant A
Either::A( Either::A(
httpcodes::HttpBadRequest.with_body("Bad data")) httpcodes::HttpBadRequest.with_body("Bad data"))
} else { } else {
@ -264,12 +265,12 @@ fn index(req: HttpRequest) -> RegisterResult {
.map_err(|e| e.into())).responder()) .map_err(|e| e.into())).responder())
} }
} }
# fn is_a_variant() -> bool { true }
fn main() { # fn main() {
Application::new() # Application::new()
.resource("/register", |r| r.f(index)) # .resource("/register", |r| r.f(index))
.finish(); # .finish();
} # }
``` ```
## Tokio core handle ## Tokio core handle

View File

@ -34,7 +34,7 @@ pub trait Responder {
fn respond_to(self, req: HttpRequest) -> Result<Self::Item, Self::Error>; fn respond_to(self, req: HttpRequest) -> Result<Self::Item, Self::Error>;
} }
/// Combines two different responders types into a single type /// Combines two different responder types into a single type
/// ///
/// ```rust /// ```rust
/// # extern crate actix_web; /// # extern crate actix_web;
@ -46,8 +46,9 @@ pub trait Responder {
/// ///
/// type RegisterResult = Either<HttpResponse, Box<Future<Item=HttpResponse, Error=Error>>>; /// type RegisterResult = Either<HttpResponse, Box<Future<Item=HttpResponse, Error=Error>>>;
/// ///
///
/// fn index(req: HttpRequest) -> RegisterResult { /// fn index(req: HttpRequest) -> RegisterResult {
/// if true { // <- choose variant A /// if is_a_variant() { // <- choose variant A
/// Either::A( /// Either::A(
/// httpcodes::HttpBadRequest.with_body("Bad data")) /// httpcodes::HttpBadRequest.with_body("Bad data"))
/// } else { /// } else {
@ -58,6 +59,7 @@ pub trait Responder {
/// .map_err(|e| e.into())).responder()) /// .map_err(|e| e.into())).responder())
/// } /// }
/// } /// }
/// # fn is_a_variant() -> bool { true }
/// # fn main() {} /// # fn main() {}
/// ``` /// ```
#[derive(Debug)] #[derive(Debug)]

View File

@ -165,8 +165,7 @@ impl<'a> From<&'a str> for ContentEncoding {
"br" => ContentEncoding::Br, "br" => ContentEncoding::Br,
"gzip" => ContentEncoding::Gzip, "gzip" => ContentEncoding::Gzip,
"deflate" => ContentEncoding::Deflate, "deflate" => ContentEncoding::Deflate,
"identity" => ContentEncoding::Identity, _ => ContentEncoding::Identity,
_ => ContentEncoding::Auto,
} }
} }
} }

View File

@ -466,8 +466,8 @@ impl ContentEncoder {
GzEncoder::new(transfer, Compression::default())), GzEncoder::new(transfer, Compression::default())),
ContentEncoding::Br => ContentEncoder::Br( ContentEncoding::Br => ContentEncoder::Br(
BrotliEncoder::new(transfer, 5)), BrotliEncoder::new(transfer, 5)),
ContentEncoding::Identity => ContentEncoder::Identity(transfer), ContentEncoding::Identity | ContentEncoding::Auto =>
ContentEncoding::Auto => unreachable!() ContentEncoder::Identity(transfer),
} }
} }

View File

@ -110,18 +110,6 @@ impl<T, H> Http1<T, H>
} }
} }
fn poll_completed(&mut self, shutdown: bool) -> Result<bool, ()> {
// check stream state
match self.stream.poll_completed(shutdown) {
Ok(Async::Ready(_)) => Ok(true),
Ok(Async::NotReady) => Ok(false),
Err(err) => {
debug!("Error sending data: {}", err);
Err(())
}
}
}
// TODO: refactor // TODO: refactor
pub fn poll_io(&mut self) -> Poll<bool, ()> { pub fn poll_io(&mut self) -> Poll<bool, ()> {
// read incoming data // read incoming data
@ -272,8 +260,13 @@ impl<T, H> Http1<T, H>
} }
// check stream state // check stream state
if !self.poll_completed(true)? { match self.stream.poll_completed(self.tasks.is_empty()) {
return Ok(Async::NotReady) Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(err) => {
debug!("Error sending data: {}", err);
return Err(())
}
_ => (),
} }
// deal with keep-alive // deal with keep-alive

View File

@ -26,7 +26,7 @@ use payload::{Payload, PayloadWriter, PayloadStatus};
use super::h2writer::H2Writer; use super::h2writer::H2Writer;
use super::encoding::PayloadType; use super::encoding::PayloadType;
use super::settings::WorkerSettings; use super::settings::WorkerSettings;
use super::{HttpHandler, HttpHandlerTask}; use super::{HttpHandler, HttpHandlerTask, Writer};
bitflags! { bitflags! {
struct Flags: u8 { struct Flags: u8 {
@ -109,22 +109,27 @@ impl<T, H> Http2<T, H>
loop { loop {
match item.task.poll_io(&mut item.stream) { match item.task.poll_io(&mut item.stream) {
Ok(Async::Ready(ready)) => { Ok(Async::Ready(ready)) => {
item.flags.insert(EntryFlags::EOF);
if ready { if ready {
item.flags.insert(EntryFlags::FINISHED); item.flags.insert(
EntryFlags::EOF | EntryFlags::FINISHED);
} else {
item.flags.insert(EntryFlags::EOF);
} }
not_ready = false; not_ready = false;
}, },
Ok(Async::NotReady) => { Ok(Async::NotReady) => {
if item.payload.need_read() == PayloadStatus::Read && !retry if item.payload.need_read() == PayloadStatus::Read
&& !retry
{ {
continue continue
} }
}, },
Err(err) => { Err(err) => {
error!("Unhandled error: {}", err); error!("Unhandled error: {}", err);
item.flags.insert(EntryFlags::EOF); item.flags.insert(
item.flags.insert(EntryFlags::ERROR); EntryFlags::EOF |
EntryFlags::ERROR |
EntryFlags::WRITE_DONE);
item.stream.reset(Reason::INTERNAL_ERROR); item.stream.reset(Reason::INTERNAL_ERROR);
} }
} }
@ -138,18 +143,32 @@ impl<T, H> Http2<T, H>
item.flags.insert(EntryFlags::FINISHED); item.flags.insert(EntryFlags::FINISHED);
}, },
Err(err) => { Err(err) => {
item.flags.insert(EntryFlags::ERROR); item.flags.insert(
item.flags.insert(EntryFlags::FINISHED); EntryFlags::ERROR | EntryFlags::WRITE_DONE |
EntryFlags::FINISHED);
error!("Unhandled error: {}", err); error!("Unhandled error: {}", err);
} }
} }
} }
if !item.flags.contains(EntryFlags::WRITE_DONE) {
match item.stream.poll_completed(false) {
Ok(Async::NotReady) => (),
Ok(Async::Ready(_)) => {
not_ready = false;
item.flags.insert(EntryFlags::WRITE_DONE);
}
Err(_err) => {
item.flags.insert(EntryFlags::ERROR);
}
}
}
} }
// cleanup finished tasks // cleanup finished tasks
while !self.tasks.is_empty() { while !self.tasks.is_empty() {
if self.tasks[0].flags.contains(EntryFlags::EOF) && if self.tasks[0].flags.contains(EntryFlags::EOF) &&
self.tasks[0].flags.contains(EntryFlags::FINISHED) || self.tasks[0].flags.contains(EntryFlags::WRITE_DONE) ||
self.tasks[0].flags.contains(EntryFlags::ERROR) self.tasks[0].flags.contains(EntryFlags::ERROR)
{ {
self.tasks.pop_front(); self.tasks.pop_front();
@ -251,6 +270,7 @@ bitflags! {
const REOF = 0b0000_0010; const REOF = 0b0000_0010;
const ERROR = 0b0000_0100; const ERROR = 0b0000_0100;
const FINISHED = 0b0000_1000; const FINISHED = 0b0000_1000;
const WRITE_DONE = 0b0001_0000;
} }
} }

View File

@ -24,6 +24,7 @@ bitflags! {
const STARTED = 0b0000_0001; const STARTED = 0b0000_0001;
const DISCONNECTED = 0b0000_0010; const DISCONNECTED = 0b0000_0010;
const EOF = 0b0000_0100; const EOF = 0b0000_0100;
const RESERVED = 0b0000_1000;
} }
} }
@ -56,55 +57,6 @@ impl H2Writer {
stream.send_reset(reason) stream.send_reset(reason)
} }
} }
fn write_to_stream(&mut self) -> io::Result<WriterState> {
if !self.flags.contains(Flags::STARTED) {
return Ok(WriterState::Done)
}
if let Some(ref mut stream) = self.stream {
if self.buffer.is_empty() {
if self.flags.contains(Flags::EOF) {
let _ = stream.send_data(Bytes::new(), true);
}
return Ok(WriterState::Done)
}
loop {
match stream.poll_capacity() {
Ok(Async::NotReady) => {
if self.buffer.len() > self.buffer_capacity {
return Ok(WriterState::Pause)
} else {
return Ok(WriterState::Done)
}
}
Ok(Async::Ready(None)) => {
return Ok(WriterState::Done)
}
Ok(Async::Ready(Some(cap))) => {
let len = self.buffer.len();
let bytes = self.buffer.split_to(cmp::min(cap, len));
let eof = self.buffer.is_empty() && self.flags.contains(Flags::EOF);
self.written += bytes.len() as u64;
if let Err(err) = stream.send_data(bytes.freeze(), eof) {
return Err(io::Error::new(io::ErrorKind::Other, err))
} else if !self.buffer.is_empty() {
let cap = cmp::min(self.buffer.len(), CHUNK_SIZE);
stream.reserve_capacity(cap);
} else {
return Ok(WriterState::Pause)
}
}
Err(_) => {
return Err(io::Error::new(io::ErrorKind::Other, ""))
}
}
}
}
Ok(WriterState::Done)
}
} }
impl Writer for H2Writer { impl Writer for H2Writer {
@ -172,6 +124,7 @@ impl Writer for H2Writer {
self.written = bytes.len() as u64; self.written = bytes.len() as u64;
self.encoder.write(bytes)?; self.encoder.write(bytes)?;
if let Some(ref mut stream) = self.stream { if let Some(ref mut stream) = self.stream {
self.flags.insert(Flags::RESERVED);
stream.reserve_capacity(cmp::min(self.buffer.len(), CHUNK_SIZE)); stream.reserve_capacity(cmp::min(self.buffer.len(), CHUNK_SIZE));
} }
Ok(WriterState::Pause) Ok(WriterState::Pause)
@ -195,7 +148,7 @@ impl Writer for H2Writer {
} }
} }
if self.buffer.len() > MAX_WRITE_BUFFER_SIZE { if self.buffer.len() > self.buffer_capacity {
Ok(WriterState::Pause) Ok(WriterState::Pause)
} else { } else {
Ok(WriterState::Done) Ok(WriterState::Done)
@ -217,10 +170,40 @@ impl Writer for H2Writer {
} }
fn poll_completed(&mut self, _shutdown: bool) -> Poll<(), io::Error> { fn poll_completed(&mut self, _shutdown: bool) -> Poll<(), io::Error> {
match self.write_to_stream() { if !self.flags.contains(Flags::STARTED) {
Ok(WriterState::Done) => Ok(Async::Ready(())), return Ok(Async::NotReady);
Ok(WriterState::Pause) => Ok(Async::NotReady),
Err(err) => Err(err)
} }
if let Some(ref mut stream) = self.stream {
// reserve capacity
if !self.flags.contains(Flags::RESERVED) && !self.buffer.is_empty() {
self.flags.insert(Flags::RESERVED);
stream.reserve_capacity(cmp::min(self.buffer.len(), CHUNK_SIZE));
}
loop {
match stream.poll_capacity() {
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(None)) => return Ok(Async::Ready(())),
Ok(Async::Ready(Some(cap))) => {
let len = self.buffer.len();
let bytes = self.buffer.split_to(cmp::min(cap, len));
let eof = self.buffer.is_empty() && self.flags.contains(Flags::EOF);
self.written += bytes.len() as u64;
if let Err(e) = stream.send_data(bytes.freeze(), eof) {
return Err(io::Error::new(io::ErrorKind::Other, e))
} else if !self.buffer.is_empty() {
let cap = cmp::min(self.buffer.len(), CHUNK_SIZE);
stream.reserve_capacity(cap);
} else {
return Ok(Async::NotReady)
}
}
Err(e) => return Err(io::Error::new(io::ErrorKind::Other, e)),
}
}
}
return Ok(Async::NotReady)
} }
} }