1
0
mirror of https://github.com/fafhrd91/actix-web synced 2025-07-16 22:55:47 +02:00

Compare commits

...

12 Commits

Author SHA1 Message Date
Nikolay Kim
67f383f346 typo 2018-03-11 16:53:46 -07:00
Nikolay Kim
49f5c335f6 better sleep on error 2018-03-11 16:52:20 -07:00
Nikolay Kim
692e11a584 bump version 2018-03-11 16:40:25 -07:00
Nikolay Kim
208117ca6f Merge pull request #118 from messense/feature/sockets-vec
Use Vec instead of HashMap to store sockets in HttpServer
2018-03-11 16:38:23 -07:00
Nikolay Kim
3e276ac921 Merge branch 'master' into feature/sockets-vec 2018-03-11 16:38:17 -07:00
Nikolay Kim
4af115a19c Fix steraming response handling for http/2 2018-03-11 16:37:44 -07:00
Nikolay Kim
051703eb2c Fix connection get closed too early 2018-03-11 15:37:33 -07:00
Nikolay Kim
31fbbd3168 Fix panic on unknown content encoding 2018-03-11 14:50:13 -07:00
Nikolay Kim
fee1e255ac add comments 2018-03-11 10:10:30 -07:00
Nikolay Kim
a4c933e56e update doc string 2018-03-11 09:36:54 -07:00
Nikolay Kim
9ddf5a3550 better doc string for Either 2018-03-11 09:28:22 -07:00
messense
9ab0fa604d Use Vec instead of HashMap to store sockets in HttpServer 2018-03-11 17:29:44 +08:00
11 changed files with 133 additions and 112 deletions

View File

@@ -1,5 +1,16 @@
# Changes
## 0.4.7 (2018-03-11)
* Fix panic on unknown content encoding
* Fix connection get closed too early
* Fix streaming response handling for http/2
* Better sleep on error support
## 0.4.6 (2018-03-10)
* Fix client cookie handling

View File

@@ -1,6 +1,6 @@
[package]
name = "actix-web"
version = "0.4.6"
version = "0.4.7"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix web is a simple, pragmatic, extremely fast, web framework for Rust."
readme = "README.md"

View File

@@ -1,8 +1,8 @@
//! Actix web diesel example
//!
//! Diesel does not support tokio, so we have to run it in separate threads.
//! Actix supports sync actors by default, so we going to create sync actor that will
//! use diesel. Technically sync actors are worker style actors, multiple of them
//! Actix supports sync actors by default, so we going to create sync actor that use diesel.
//! Technically sync actors are worker style actors, multiple of them
//! can run in parallel and process messages from same queue.
extern crate serde;
extern crate serde_json;
@@ -38,6 +38,7 @@ struct State {
fn index(req: HttpRequest<State>) -> Box<Future<Item=HttpResponse, Error=Error>> {
let name = &req.match_info()["name"];
// send async `CreateUser` message to a `DbExecutor`
req.state().db.send(CreateUser{name: name.to_owned()})
.from_err()
.and_then(|res| {
@@ -54,7 +55,7 @@ fn main() {
let _ = env_logger::init();
let sys = actix::System::new("diesel-example");
// Start db executor actors
// Start 3 db executor actors
let addr = SyncArbiter::start(3, || {
DbExecutor(SqliteConnection::establish("test.db").unwrap())
});

View File

@@ -238,9 +238,10 @@ Both methods could be combined. (i.e Async response with streaming body)
## Different return types (Either)
Sometimes you need to return different types of responses. For example
you can do error check and return error, otherwise return async response.
you can do error check and return error and return async response otherwise.
Or any result that requires two different types.
For this case [*Either*](../actix_web/enum.Either.html) type can be used.
*Either* allows to combine two different responder types into a single type.
```rust
# extern crate actix_web;
@@ -253,23 +254,23 @@ use actix_web::{Either, Error, HttpResponse, httpcodes};
type RegisterResult = Either<HttpResponse, Box<Future<Item=HttpResponse, Error=Error>>>;
fn index(req: HttpRequest) -> RegisterResult {
if true { // <- choose variant A
if is_a_variant() { // <- choose variant A
Either::A(
httpcodes::HttpBadRequest.with_body("Bad data"))
} else {
Either::B( // <- variant B
Either::B( // <- variant B
result(HttpResponse::Ok()
.content_type("text/html")
.body(format!("Hello!"))
.map_err(|e| e.into())).responder())
}
}
fn main() {
Application::new()
.resource("/register", |r| r.f(index))
.finish();
}
# fn is_a_variant() -> bool { true }
# fn main() {
# Application::new()
# .resource("/register", |r| r.f(index))
# .finish();
# }
```
## Tokio core handle

View File

@@ -34,7 +34,7 @@ pub trait Responder {
fn respond_to(self, req: HttpRequest) -> Result<Self::Item, Self::Error>;
}
/// Combines two different responders types into a single type
/// Combines two different responder types into a single type
///
/// ```rust
/// # extern crate actix_web;
@@ -46,18 +46,20 @@ pub trait Responder {
///
/// type RegisterResult = Either<HttpResponse, Box<Future<Item=HttpResponse, Error=Error>>>;
///
///
/// fn index(req: HttpRequest) -> RegisterResult {
/// if true { // <- choose variant A
/// if is_a_variant() { // <- choose variant A
/// Either::A(
/// httpcodes::HttpBadRequest.with_body("Bad data"))
/// } else {
/// Either::B( // <- variant B
/// Either::B( // <- variant B
/// result(HttpResponse::Ok()
/// .content_type("text/html")
/// .body(format!("Hello!"))
/// .map_err(|e| e.into())).responder())
/// }
/// }
/// # fn is_a_variant() -> bool { true }
/// # fn main() {}
/// ```
#[derive(Debug)]

View File

@@ -165,8 +165,7 @@ impl<'a> From<&'a str> for ContentEncoding {
"br" => ContentEncoding::Br,
"gzip" => ContentEncoding::Gzip,
"deflate" => ContentEncoding::Deflate,
"identity" => ContentEncoding::Identity,
_ => ContentEncoding::Auto,
_ => ContentEncoding::Identity,
}
}
}

View File

@@ -466,8 +466,8 @@ impl ContentEncoder {
GzEncoder::new(transfer, Compression::default())),
ContentEncoding::Br => ContentEncoder::Br(
BrotliEncoder::new(transfer, 5)),
ContentEncoding::Identity => ContentEncoder::Identity(transfer),
ContentEncoding::Auto => unreachable!()
ContentEncoding::Identity | ContentEncoding::Auto =>
ContentEncoder::Identity(transfer),
}
}

View File

@@ -110,18 +110,6 @@ impl<T, H> Http1<T, H>
}
}
fn poll_completed(&mut self, shutdown: bool) -> Result<bool, ()> {
// check stream state
match self.stream.poll_completed(shutdown) {
Ok(Async::Ready(_)) => Ok(true),
Ok(Async::NotReady) => Ok(false),
Err(err) => {
debug!("Error sending data: {}", err);
Err(())
}
}
}
// TODO: refactor
pub fn poll_io(&mut self) -> Poll<bool, ()> {
// read incoming data
@@ -272,8 +260,13 @@ impl<T, H> Http1<T, H>
}
// check stream state
if !self.poll_completed(true)? {
return Ok(Async::NotReady)
match self.stream.poll_completed(self.tasks.is_empty()) {
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(err) => {
debug!("Error sending data: {}", err);
return Err(())
}
_ => (),
}
// deal with keep-alive

View File

@@ -26,7 +26,7 @@ use payload::{Payload, PayloadWriter, PayloadStatus};
use super::h2writer::H2Writer;
use super::encoding::PayloadType;
use super::settings::WorkerSettings;
use super::{HttpHandler, HttpHandlerTask};
use super::{HttpHandler, HttpHandlerTask, Writer};
bitflags! {
struct Flags: u8 {
@@ -109,22 +109,27 @@ impl<T, H> Http2<T, H>
loop {
match item.task.poll_io(&mut item.stream) {
Ok(Async::Ready(ready)) => {
item.flags.insert(EntryFlags::EOF);
if ready {
item.flags.insert(EntryFlags::FINISHED);
item.flags.insert(
EntryFlags::EOF | EntryFlags::FINISHED);
} else {
item.flags.insert(EntryFlags::EOF);
}
not_ready = false;
},
Ok(Async::NotReady) => {
if item.payload.need_read() == PayloadStatus::Read && !retry
if item.payload.need_read() == PayloadStatus::Read
&& !retry
{
continue
}
},
Err(err) => {
error!("Unhandled error: {}", err);
item.flags.insert(EntryFlags::EOF);
item.flags.insert(EntryFlags::ERROR);
item.flags.insert(
EntryFlags::EOF |
EntryFlags::ERROR |
EntryFlags::WRITE_DONE);
item.stream.reset(Reason::INTERNAL_ERROR);
}
}
@@ -138,18 +143,32 @@ impl<T, H> Http2<T, H>
item.flags.insert(EntryFlags::FINISHED);
},
Err(err) => {
item.flags.insert(EntryFlags::ERROR);
item.flags.insert(EntryFlags::FINISHED);
item.flags.insert(
EntryFlags::ERROR | EntryFlags::WRITE_DONE |
EntryFlags::FINISHED);
error!("Unhandled error: {}", err);
}
}
}
if !item.flags.contains(EntryFlags::WRITE_DONE) {
match item.stream.poll_completed(false) {
Ok(Async::NotReady) => (),
Ok(Async::Ready(_)) => {
not_ready = false;
item.flags.insert(EntryFlags::WRITE_DONE);
}
Err(_err) => {
item.flags.insert(EntryFlags::ERROR);
}
}
}
}
// cleanup finished tasks
while !self.tasks.is_empty() {
if self.tasks[0].flags.contains(EntryFlags::EOF) &&
self.tasks[0].flags.contains(EntryFlags::FINISHED) ||
self.tasks[0].flags.contains(EntryFlags::WRITE_DONE) ||
self.tasks[0].flags.contains(EntryFlags::ERROR)
{
self.tasks.pop_front();
@@ -251,6 +270,7 @@ bitflags! {
const REOF = 0b0000_0010;
const ERROR = 0b0000_0100;
const FINISHED = 0b0000_1000;
const WRITE_DONE = 0b0001_0000;
}
}

View File

@@ -24,6 +24,7 @@ bitflags! {
const STARTED = 0b0000_0001;
const DISCONNECTED = 0b0000_0010;
const EOF = 0b0000_0100;
const RESERVED = 0b0000_1000;
}
}
@@ -56,55 +57,6 @@ impl H2Writer {
stream.send_reset(reason)
}
}
fn write_to_stream(&mut self) -> io::Result<WriterState> {
if !self.flags.contains(Flags::STARTED) {
return Ok(WriterState::Done)
}
if let Some(ref mut stream) = self.stream {
if self.buffer.is_empty() {
if self.flags.contains(Flags::EOF) {
let _ = stream.send_data(Bytes::new(), true);
}
return Ok(WriterState::Done)
}
loop {
match stream.poll_capacity() {
Ok(Async::NotReady) => {
if self.buffer.len() > self.buffer_capacity {
return Ok(WriterState::Pause)
} else {
return Ok(WriterState::Done)
}
}
Ok(Async::Ready(None)) => {
return Ok(WriterState::Done)
}
Ok(Async::Ready(Some(cap))) => {
let len = self.buffer.len();
let bytes = self.buffer.split_to(cmp::min(cap, len));
let eof = self.buffer.is_empty() && self.flags.contains(Flags::EOF);
self.written += bytes.len() as u64;
if let Err(err) = stream.send_data(bytes.freeze(), eof) {
return Err(io::Error::new(io::ErrorKind::Other, err))
} else if !self.buffer.is_empty() {
let cap = cmp::min(self.buffer.len(), CHUNK_SIZE);
stream.reserve_capacity(cap);
} else {
return Ok(WriterState::Pause)
}
}
Err(_) => {
return Err(io::Error::new(io::ErrorKind::Other, ""))
}
}
}
}
Ok(WriterState::Done)
}
}
impl Writer for H2Writer {
@@ -172,6 +124,7 @@ impl Writer for H2Writer {
self.written = bytes.len() as u64;
self.encoder.write(bytes)?;
if let Some(ref mut stream) = self.stream {
self.flags.insert(Flags::RESERVED);
stream.reserve_capacity(cmp::min(self.buffer.len(), CHUNK_SIZE));
}
Ok(WriterState::Pause)
@@ -195,7 +148,7 @@ impl Writer for H2Writer {
}
}
if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
if self.buffer.len() > self.buffer_capacity {
Ok(WriterState::Pause)
} else {
Ok(WriterState::Done)
@@ -217,10 +170,40 @@ impl Writer for H2Writer {
}
fn poll_completed(&mut self, _shutdown: bool) -> Poll<(), io::Error> {
match self.write_to_stream() {
Ok(WriterState::Done) => Ok(Async::Ready(())),
Ok(WriterState::Pause) => Ok(Async::NotReady),
Err(err) => Err(err)
if !self.flags.contains(Flags::STARTED) {
return Ok(Async::NotReady);
}
if let Some(ref mut stream) = self.stream {
// reserve capacity
if !self.flags.contains(Flags::RESERVED) && !self.buffer.is_empty() {
self.flags.insert(Flags::RESERVED);
stream.reserve_capacity(cmp::min(self.buffer.len(), CHUNK_SIZE));
}
loop {
match stream.poll_capacity() {
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(None)) => return Ok(Async::Ready(())),
Ok(Async::Ready(Some(cap))) => {
let len = self.buffer.len();
let bytes = self.buffer.split_to(cmp::min(cap, len));
let eof = self.buffer.is_empty() && self.flags.contains(Flags::EOF);
self.written += bytes.len() as u64;
if let Err(e) = stream.send_data(bytes.freeze(), eof) {
return Err(io::Error::new(io::ErrorKind::Other, e))
} else if !self.buffer.is_empty() {
let cap = cmp::min(self.buffer.len(), CHUNK_SIZE);
stream.reserve_capacity(cap);
} else {
return Ok(Async::NotReady)
}
}
Err(e) => return Err(io::Error::new(io::ErrorKind::Other, e)),
}
}
}
return Ok(Async::NotReady)
}
}

View File

@@ -2,7 +2,6 @@ use std::{io, net, thread};
use std::rc::Rc;
use std::sync::{Arc, mpsc as sync_mpsc};
use std::time::Duration;
use std::collections::HashMap;
use actix::prelude::*;
use actix::actors::signal;
@@ -37,7 +36,7 @@ pub struct HttpServer<H> where H: IntoHttpHandler + 'static
factory: Arc<Fn() -> Vec<H> + Send + Sync>,
#[cfg_attr(feature="cargo-clippy", allow(type_complexity))]
workers: Vec<(usize, Addr<Syn, Worker<H::Handler>>)>,
sockets: HashMap<net::SocketAddr, net::TcpListener>,
sockets: Vec<(net::SocketAddr, net::TcpListener)>,
accept: Vec<(mio::SetReadiness, sync_mpsc::Sender<Command>)>,
exit: bool,
shutdown_timeout: u16,
@@ -77,7 +76,7 @@ impl<H> HttpServer<H> where H: IntoHttpHandler + 'static
let f = move || {
(factory)().into_iter().collect()
};
HttpServer{ h: None,
threads: num_cpus::get(),
backlog: 2048,
@@ -85,7 +84,7 @@ impl<H> HttpServer<H> where H: IntoHttpHandler + 'static
keep_alive: KeepAlive::Os,
factory: Arc::new(f),
workers: Vec::new(),
sockets: HashMap::new(),
sockets: Vec::new(),
accept: Vec::new(),
exit: false,
shutdown_timeout: 30,
@@ -173,7 +172,7 @@ impl<H> HttpServer<H> where H: IntoHttpHandler + 'static
/// Get addresses of bound sockets.
pub fn addrs(&self) -> Vec<net::SocketAddr> {
self.sockets.keys().cloned().collect()
self.sockets.iter().map(|s| s.0.clone()).collect()
}
/// Use listener for accepting incoming connection requests
@@ -181,7 +180,7 @@ impl<H> HttpServer<H> where H: IntoHttpHandler + 'static
/// HttpServer does not change any configuration for TcpListener,
/// it needs to be configured before passing it to listen() method.
pub fn listen(mut self, lst: net::TcpListener) -> Self {
self.sockets.insert(lst.local_addr().unwrap(), lst);
self.sockets.push((lst.local_addr().unwrap(), lst));
self
}
@@ -195,7 +194,7 @@ impl<H> HttpServer<H> where H: IntoHttpHandler + 'static
match create_tcp_listener(addr, self.backlog) {
Ok(lst) => {
succ = true;
self.sockets.insert(lst.local_addr().unwrap(), lst);
self.sockets.push((lst.local_addr().unwrap(), lst));
},
Err(e) => err = Some(e),
}
@@ -288,7 +287,7 @@ impl<H: IntoHttpHandler> HttpServer<H>
} else {
let (tx, rx) = mpsc::unbounded();
let addrs: Vec<(net::SocketAddr, net::TcpListener)> =
self.sockets.drain().collect();
self.sockets.drain(..).collect();
let settings = ServerSettings::new(Some(addrs[0].0), &self.host, false);
let workers = self.start_workers(&settings, &StreamHandlerType::Normal);
let info = Info{addr: addrs[0].0, handler: StreamHandlerType::Normal};
@@ -357,7 +356,7 @@ impl<H: IntoHttpHandler> HttpServer<H>
Err(io::Error::new(io::ErrorKind::Other, "No socket addresses are bound"))
} else {
let (tx, rx) = mpsc::unbounded();
let addrs: Vec<(net::SocketAddr, net::TcpListener)> = self.sockets.drain().collect();
let addrs: Vec<(net::SocketAddr, net::TcpListener)> = self.sockets.drain(..).collect();
let settings = ServerSettings::new(Some(addrs[0].0), &self.host, false);
let workers = self.start_workers(
&settings, &StreamHandlerType::Tls(acceptor.clone()));
@@ -409,7 +408,7 @@ impl<H: IntoHttpHandler> HttpServer<H>
let (tx, rx) = mpsc::unbounded();
let acceptor = builder.build();
let addrs: Vec<(net::SocketAddr, net::TcpListener)> = self.sockets.drain().collect();
let addrs: Vec<(net::SocketAddr, net::TcpListener)> = self.sockets.drain(..).collect();
let settings = ServerSettings::new(Some(addrs[0].0), &self.host, false);
let workers = self.start_workers(
&settings, &StreamHandlerType::Alpn(acceptor.clone()));
@@ -451,7 +450,7 @@ impl<H: IntoHttpHandler> HttpServer<H>
if !self.sockets.is_empty() {
let addrs: Vec<(net::SocketAddr, net::TcpListener)> =
self.sockets.drain().collect();
self.sockets.drain(..).collect();
let settings = ServerSettings::new(Some(addrs[0].0), &self.host, false);
let workers = self.start_workers(&settings, &StreamHandlerType::Normal);
let info = Info{addr: addrs[0].0, handler: StreamHandlerType::Normal};
@@ -742,10 +741,9 @@ fn start_accept_thread(
break
}
},
Err(err) => {
if err.kind() != io::ErrorKind::WouldBlock {
error!("Error accepting connection: {:?}", err);
}
Err(ref e) if connection_error(e) => continue,
Err(e) => {
error!("Error accepting connection: {:?}", e);
// sleep after error
thread::sleep(sleep);
break
@@ -819,3 +817,16 @@ fn create_tcp_listener(addr: net::SocketAddr, backlog: i32) -> io::Result<net::T
builder.bind(addr)?;
Ok(builder.listen(backlog)?)
}
/// This function defines errors that are per-connection. Which basically
/// means that if we get this error from `accept()` system call it means
/// next connection might be ready to be accepted.
///
/// All other errors will incur a timeout before next `accept()` is performed.
/// The timeout is useful to handle resource exhaustion errors like ENFILE
/// and EMFILE. Otherwise, could enter into tight loop.
fn connection_error(e: &io::Error) -> bool {
e.kind() == io::ErrorKind::ConnectionRefused ||
e.kind() == io::ErrorKind::ConnectionAborted ||
e.kind() == io::ErrorKind::ConnectionReset
}