diff --git a/build.rs b/build.rs index 3b916a95..081d2b50 100644 --- a/build.rs +++ b/build.rs @@ -15,10 +15,10 @@ fn main() { "guide/src/qs_1.md", "guide/src/qs_2.md", "guide/src/qs_3.md", + "guide/src/qs_3_5.md", "guide/src/qs_4.md", "guide/src/qs_4_5.md", "guide/src/qs_5.md", - "guide/src/qs_6.md", "guide/src/qs_7.md", "guide/src/qs_9.md", "guide/src/qs_10.md", diff --git a/guide/src/SUMMARY.md b/guide/src/SUMMARY.md index a9befac8..e260000a 100644 --- a/guide/src/SUMMARY.md +++ b/guide/src/SUMMARY.md @@ -3,9 +3,9 @@ [Quickstart](./qs_1.md) - [Getting Started](./qs_2.md) - [Application](./qs_3.md) +- [Server](./qs_3_5.md) - [Handler](./qs_4.md) - [Errors](./qs_4_5.md) -- [State](./qs_6.md) - [URL Dispatch](./qs_5.md) - [Request & Response](./qs_7.md) - [WebSockets](./qs_9.md) diff --git a/guide/src/qs_3.md b/guide/src/qs_3.md index 51e82d49..909ba292 100644 --- a/guide/src/qs_3.md +++ b/guide/src/qs_3.md @@ -56,3 +56,46 @@ fn main() { ``` All `/app1` requests route to first application, `/app2` to second and then all other to third. + +## State + +Application state is shared with all routes and resources within same application. +State could be accessed with `HttpRequest::state()` method as a read-only item +but interior mutability pattern with `RefCell` could be used to archive state mutability. +State could be accessed with `HttpContext::state()` in case of http actor. +State also available to route matching predicates and middlewares. + +Let's write simple application that uses shared state. We are going to store requests count +in the state: + +```rust +# extern crate actix; +# extern crate actix_web; +# +use actix_web::*; +use std::cell::Cell; + +// This struct represents state +struct AppState { + counter: Cell, +} + +fn index(req: HttpRequest) -> String { + let count = req.state().counter.get() + 1; // <- get count + req.state().counter.set(count); // <- store new count in state + + format!("Request number: {}", count) // <- response with count +} + +fn main() { + Application::with_state(AppState{counter: Cell::new(0)}) + .resource("/", |r| r.method(Method::GET).f(index)) + .finish(); +} +``` + +Note on application state, http server accepts application factory rather than application +instance. Http server construct application instance for each thread, so application state +must be constructed multiple times. If you want to share state between different thread +shared object should be used, like `Arc`. Application state does not need to be `Send` and `Sync` +but application factory must be `Send` + `Sync`. diff --git a/guide/src/qs_6.md b/guide/src/qs_6.md deleted file mode 100644 index f7c88946..00000000 --- a/guide/src/qs_6.md +++ /dev/null @@ -1,37 +0,0 @@ -# Application state - -Application state is shared with all routes and resources within same application. -State could be accessed with `HttpRequest::state()` method as a read-only item -but interior mutability pattern with `RefCell` could be used to archive state mutability. -State could be accessed with `HttpContext::state()` in case of http actor. -State also available to route matching predicates. State is not available -to application middlewares, middlewares receives `HttpRequest<()>` object. - -Let's write simple application that uses shared state. We are going to store requests count -in the state: - -```rust -# extern crate actix; -# extern crate actix_web; -# -use actix_web::*; -use std::cell::Cell; - -// This struct represents state -struct AppState { - counter: Cell, -} - -fn index(req: HttpRequest) -> String { - let count = req.state().counter.get() + 1; // <- get count - req.state().counter.set(count); // <- store new count in state - - format!("Request number: {}", count) // <- response with count -} - -fn main() { - Application::with_state(AppState{counter: Cell::new(0)}) - .resource("/", |r| r.method(Method::GET).f(index)) - .finish(); -} -``` diff --git a/src/channel.rs b/src/channel.rs index bf7e24a9..6503e82f 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -11,7 +11,7 @@ use h2; use error::Error; use h1writer::Writer; use httprequest::HttpRequest; -use server::ServerSettings; +use server::{ServerSettings, WorkerSettings}; /// Low level http request handler #[allow(unused_variables)] @@ -67,7 +67,8 @@ pub struct HttpChannel impl HttpChannel where T: AsyncRead + AsyncWrite + 'static, H: HttpHandler + 'static { - pub fn new(h: Rc>, io: T, peer: Option, http2: bool) -> HttpChannel + pub(crate) fn new(h: Rc>, + io: T, peer: Option, http2: bool) -> HttpChannel { if http2 { HttpChannel { diff --git a/src/h1.rs b/src/h1.rs index 3b47ca84..0f35f131 100644 --- a/src/h1.rs +++ b/src/h1.rs @@ -17,12 +17,12 @@ use pipeline::Pipeline; use encoding::PayloadType; use channel::{HttpHandler, HttpHandlerTask}; use h1writer::H1Writer; +use server::WorkerSettings; use httpcodes::HTTPNotFound; use httprequest::HttpRequest; use error::{ParseError, PayloadError, ResponseError}; use payload::{Payload, PayloadWriter, DEFAULT_BUFFER_SIZE}; -const KEEPALIVE_PERIOD: u64 = 15; // seconds const INIT_BUFFER_SIZE: usize = 8192; const MAX_BUFFER_SIZE: usize = 131_072; const MAX_HEADERS: usize = 100; @@ -59,7 +59,7 @@ enum Item { pub(crate) struct Http1 { flags: Flags, - handlers: Rc>, + settings: Rc>, addr: Option, stream: H1Writer, reader: Reader, @@ -77,9 +77,9 @@ impl Http1 where T: AsyncRead + AsyncWrite + 'static, H: HttpHandler + 'static { - pub fn new(h: Rc>, stream: T, addr: Option) -> Self { + pub fn new(h: Rc>, stream: T, addr: Option) -> Self { Http1{ flags: Flags::KEEPALIVE, - handlers: h, + settings: h, addr: addr, stream: H1Writer::new(stream), reader: Reader::new(), @@ -88,8 +88,8 @@ impl Http1 keepalive_timer: None } } - pub fn into_inner(self) -> (Rc>, T, Option, Bytes) { - (self.handlers, self.stream.into_inner(), self.addr, self.read_buf.freeze()) + pub fn into_inner(self) -> (Rc>, T, Option, Bytes) { + (self.settings, self.stream.into_inner(), self.addr, self.read_buf.freeze()) } pub fn poll(&mut self) -> Poll { @@ -198,7 +198,7 @@ impl Http1 // start request processing let mut pipe = None; - for h in self.handlers.iter() { + for h in self.settings.handlers().iter() { req = match h.handle(req) { Ok(t) => { pipe = Some(t); @@ -249,19 +249,24 @@ impl Http1 Ok(Async::NotReady) => { // start keep-alive timer, this is also slow request timeout if self.tasks.is_empty() { - if self.flags.contains(Flags::KEEPALIVE) { - if self.keepalive_timer.is_none() { - trace!("Start keep-alive timer"); - let mut to = Timeout::new( - Duration::new(KEEPALIVE_PERIOD, 0), - Arbiter::handle()).unwrap(); - // register timeout - let _ = to.poll(); - self.keepalive_timer = Some(to); + if let Some(keep_alive) = self.settings.keep_alive() { + if keep_alive > 0 && self.flags.contains(Flags::KEEPALIVE) { + if self.keepalive_timer.is_none() { + trace!("Start keep-alive timer"); + let mut to = Timeout::new( + Duration::new(keep_alive as u64, 0), + Arbiter::handle()).unwrap(); + // register timeout + let _ = to.poll(); + self.keepalive_timer = Some(to); + } + } else { + // keep-alive disable, drop connection + return Ok(Async::Ready(Http1Result::Done)) } } else { - // keep-alive disable, drop connection - return Ok(Async::Ready(Http1Result::Done)) + // keep-alive unset, rely on operating system + return Ok(Async::NotReady) } } break diff --git a/src/h2.rs b/src/h2.rs index 62568162..87566277 100644 --- a/src/h2.rs +++ b/src/h2.rs @@ -16,6 +16,7 @@ use tokio_core::reactor::Timeout; use pipeline::Pipeline; use h2writer::H2Writer; +use server::WorkerSettings; use channel::{HttpHandler, HttpHandlerTask}; use error::PayloadError; use encoding::PayloadType; @@ -23,8 +24,6 @@ use httpcodes::HTTPNotFound; use httprequest::HttpRequest; use payload::{Payload, PayloadWriter}; -const KEEPALIVE_PERIOD: u64 = 15; // seconds - bitflags! { struct Flags: u8 { const DISCONNECTED = 0b0000_0010; @@ -36,7 +35,7 @@ pub(crate) struct Http2 where T: AsyncRead + AsyncWrite + 'static, H: 'static { flags: Flags, - handlers: Rc>, + settings: Rc>, addr: Option, state: State>, tasks: VecDeque, @@ -53,14 +52,14 @@ impl Http2 where T: AsyncRead + AsyncWrite + 'static, H: HttpHandler + 'static { - pub fn new(h: Rc>, stream: T, addr: Option, buf: Bytes) -> Self + pub fn new(h: Rc>, io: T, addr: Option, buf: Bytes) -> Self { Http2{ flags: Flags::empty(), - handlers: h, + settings: h, addr: addr, tasks: VecDeque::new(), state: State::Handshake( - Server::handshake(IoWrapper{unread: Some(buf), inner: stream})), + Server::handshake(IoWrapper{unread: Some(buf), inner: io})), keepalive_timer: None, } } @@ -151,18 +150,28 @@ impl Http2 self.keepalive_timer.take(); self.tasks.push_back( - Entry::new(parts, body, resp, self.addr, &self.handlers)); + Entry::new(parts, body, resp, self.addr, &self.settings)); } Ok(Async::NotReady) => { // start keep-alive timer - if self.tasks.is_empty() && self.keepalive_timer.is_none() { - trace!("Start keep-alive timer"); - let mut timeout = Timeout::new( - Duration::new(KEEPALIVE_PERIOD, 0), - Arbiter::handle()).unwrap(); - // register timeout - let _ = timeout.poll(); - self.keepalive_timer = Some(timeout); + if self.tasks.is_empty() { + if let Some(keep_alive) = self.settings.keep_alive() { + if keep_alive > 0 && self.keepalive_timer.is_none() { + trace!("Start keep-alive timer"); + let mut timeout = Timeout::new( + Duration::new(keep_alive as u64, 0), + Arbiter::handle()).unwrap(); + // register timeout + let _ = timeout.poll(); + self.keepalive_timer = Some(timeout); + } + } else { + // keep-alive disable, drop connection + return Ok(Async::Ready(())) + } + } else { + // keep-alive unset, rely on operating system + return Ok(Async::NotReady) } } Err(err) => { @@ -230,7 +239,7 @@ impl Entry { recv: RecvStream, resp: Respond, addr: Option, - handlers: &Rc>) -> Entry + settings: &Rc>) -> Entry where H: HttpHandler + 'static { // Payload and Content-Encoding @@ -247,7 +256,7 @@ impl Entry { // start request processing let mut task = None; - for h in handlers.iter() { + for h in settings.handlers().iter() { req = match h.handle(req) { Ok(t) => { task = Some(t); diff --git a/src/lib.rs b/src/lib.rs index 8f05bc2c..7c05015e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -110,6 +110,7 @@ pub mod headers { //! Headers implementation pub use encoding::ContentEncoding; + pub use httpresponse::ConnectionType; pub use cookie::Cookie; pub use cookie::CookieBuilder; diff --git a/src/server.rs b/src/server.rs index 4e9c00e5..ee7ad90e 100644 --- a/src/server.rs +++ b/src/server.rs @@ -90,10 +90,11 @@ impl ServerSettings { pub struct HttpServer where H: 'static { - h: Rc>, + h: Option>>, io: PhantomData, addr: PhantomData, threads: usize, + keep_alive: Option, factory: Arc U + Send + Sync>, workers: Vec>>, } @@ -124,10 +125,11 @@ impl HttpServer pub fn new(factory: F) -> Self where F: Sync + Send + 'static + Fn() -> U, { - HttpServer{ h: Rc::new(Vec::new()), + HttpServer{ h: None, io: PhantomData, addr: PhantomData, threads: num_cpus::get(), + keep_alive: None, factory: Arc::new(factory), workers: Vec::new(), } @@ -141,6 +143,20 @@ impl HttpServer self } + /// Set server keep-alive setting. + /// + /// By default keep alive is enabled. + /// + /// - `Some(75)` - enable + /// + /// - `Some(0)` - disable + /// + /// - `None` - use `SO_KEEPALIVE` socket option + pub fn keep_alive(mut self, val: Option) -> Self { + self.keep_alive = val; + self + } + /// Start listening for incomming connections from a stream. /// /// This method uses only one thread for handling incoming connections. @@ -155,7 +171,7 @@ impl HttpServer for app in &mut apps { app.server_settings(settings.clone()); } - self.h = Rc::new(apps); + self.h = Some(Rc::new(WorkerSettings{h: apps, keep_alive: self.keep_alive})); // start server Ok(HttpServer::create(move |ctx| { @@ -215,15 +231,16 @@ impl HttpServer } fn start_workers(&mut self, settings: &ServerSettings, handler: &StreamHandlerType) - -> Vec>> + -> Vec>> { // start workers let mut workers = Vec::new(); for _ in 0..self.threads { let s = settings.clone(); - let (tx, rx) = mpsc::unbounded::>(); + let (tx, rx) = mpsc::unbounded::>(); let h = handler.clone(); + let ka = self.keep_alive.clone(); let factory = Arc::clone(&self.factory); let addr = Arbiter::start(move |ctx: &mut Context<_>| { let mut apps: Vec<_> = (*factory)() @@ -232,7 +249,7 @@ impl HttpServer app.server_settings(s.clone()); } ctx.add_stream(rx); - Worker{h: Rc::new(apps), handler: h} + Worker::new(apps, h, ka) }); workers.push(tx); self.workers.push(addr); @@ -379,7 +396,7 @@ impl Handler, io::Error> for HttpServer -> Response> { Arbiter::handle().spawn( - HttpChannel::new(Rc::clone(&self.h), msg.io, msg.peer, msg.http2)); + HttpChannel::new(Rc::clone(&self.h.as_ref().unwrap()), msg.io, msg.peer, msg.http2)); Self::empty() } } @@ -389,11 +406,33 @@ impl Handler, io::Error> for HttpServer /// /// Worker accepts Socket objects via unbounded channel and start requests processing. struct Worker { - h: Rc>, + h: Rc>, handler: StreamHandlerType, } +pub(crate) struct WorkerSettings { + h: Vec, + keep_alive: Option, +} + +impl WorkerSettings { + pub fn handlers(&self) -> &Vec { + &self.h + } + pub fn keep_alive(&self) -> Option { + self.keep_alive + } +} + impl Worker { + + fn new(h: Vec, handler: StreamHandlerType, keep_alive: Option) -> Worker { + Worker { + h: Rc::new(WorkerSettings{h: h, keep_alive: keep_alive}), + handler: handler, + } + } + fn update_time(&self, ctx: &mut Context) { utils::update_date(); ctx.run_later(Duration::new(1, 0), |slf, ctx| slf.update_time(ctx)); @@ -408,15 +447,20 @@ impl Actor for Worker { } } -impl StreamHandler> for Worker +impl StreamHandler> for Worker where H: HttpHandler + 'static {} -impl Handler> for Worker +impl Handler> for Worker where H: HttpHandler + 'static, { - fn handle(&mut self, msg: IoStream, _: &mut Context) - -> Response> + fn handle(&mut self, msg: IoStream, _: &mut Context) + -> Response> { + if let None = self.h.keep_alive { + if msg.io.set_keepalive(Some(Duration::new(75, 0))).is_err() { + error!("Can not set socket keep-alive option"); + } + } self.handler.handle(Rc::clone(&self.h), msg); Self::empty() } @@ -432,10 +476,11 @@ enum StreamHandlerType { } impl StreamHandlerType { - fn handle(&mut self, h: Rc>, msg: IoStream) { + + fn handle(&mut self, h: Rc>, msg: IoStream) { match *self { StreamHandlerType::Normal => { - let io = TcpStream::from_stream(msg.io, Arbiter::handle()) + let io = TcpStream::from_stream(msg.io.into_tcp_stream(), Arbiter::handle()) .expect("failed to associate TCP stream"); Arbiter::handle().spawn(HttpChannel::new(h, io, msg.peer, msg.http2)); @@ -443,7 +488,7 @@ impl StreamHandlerType { #[cfg(feature="tls")] StreamHandlerType::Tls(ref acceptor) => { let IoStream { io, peer, http2 } = msg; - let io = TcpStream::from_stream(io, Arbiter::handle()) + let io = TcpStream::from_stream(io.into_tcp_stream(), Arbiter::handle()) .expect("failed to associate TCP stream"); Arbiter::handle().spawn( @@ -461,7 +506,7 @@ impl StreamHandlerType { #[cfg(feature="alpn")] StreamHandlerType::Alpn(ref acceptor) => { let IoStream { io, peer, .. } = msg; - let io = TcpStream::from_stream(io, Arbiter::handle()) + let io = TcpStream::from_stream(io.into_tcp_stream(), Arbiter::handle()) .expect("failed to associate TCP stream"); Arbiter::handle().spawn( @@ -488,7 +533,7 @@ impl StreamHandlerType { } fn start_accept_thread(sock: Socket, addr: net::SocketAddr, - workers: Vec>>) { + workers: Vec>>) { // start acceptors thread let _ = thread::Builder::new().name(format!("Accept on {}", addr)).spawn(move || { let mut next = 0; @@ -500,8 +545,7 @@ fn start_accept_thread(sock: Socket, addr: net::SocketAddr, } else { net::SocketAddr::V6(addr.as_inet6().unwrap()) }; - let msg = IoStream{ - io: socket.into_tcp_stream(), peer: Some(addr), http2: false}; + let msg = IoStream{io: socket, peer: Some(addr), http2: false}; workers[next].unbounded_send(msg).expect("worker thread died"); next = (next + 1) % workers.len(); }