1
0
mirror of https://github.com/actix/actix-extras.git synced 2025-06-25 09:59:21 +02:00

refactor keep-alive; update guide

This commit is contained in:
Nikolay Kim
2017-12-13 21:38:47 -08:00
parent 653b431895
commit c2751efa87
9 changed files with 161 additions and 95 deletions

View File

@ -11,7 +11,7 @@ use h2;
use error::Error;
use h1writer::Writer;
use httprequest::HttpRequest;
use server::ServerSettings;
use server::{ServerSettings, WorkerSettings};
/// Low level http request handler
#[allow(unused_variables)]
@ -67,7 +67,8 @@ pub struct HttpChannel<T, H>
impl<T, H> HttpChannel<T, H>
where T: AsyncRead + AsyncWrite + 'static, H: HttpHandler + 'static
{
pub fn new(h: Rc<Vec<H>>, io: T, peer: Option<SocketAddr>, http2: bool) -> HttpChannel<T, H>
pub(crate) fn new(h: Rc<WorkerSettings<H>>,
io: T, peer: Option<SocketAddr>, http2: bool) -> HttpChannel<T, H>
{
if http2 {
HttpChannel {

View File

@ -17,12 +17,12 @@ use pipeline::Pipeline;
use encoding::PayloadType;
use channel::{HttpHandler, HttpHandlerTask};
use h1writer::H1Writer;
use server::WorkerSettings;
use httpcodes::HTTPNotFound;
use httprequest::HttpRequest;
use error::{ParseError, PayloadError, ResponseError};
use payload::{Payload, PayloadWriter, DEFAULT_BUFFER_SIZE};
const KEEPALIVE_PERIOD: u64 = 15; // seconds
const INIT_BUFFER_SIZE: usize = 8192;
const MAX_BUFFER_SIZE: usize = 131_072;
const MAX_HEADERS: usize = 100;
@ -59,7 +59,7 @@ enum Item {
pub(crate) struct Http1<T: AsyncWrite + 'static, H: 'static> {
flags: Flags,
handlers: Rc<Vec<H>>,
settings: Rc<WorkerSettings<H>>,
addr: Option<SocketAddr>,
stream: H1Writer<T>,
reader: Reader,
@ -77,9 +77,9 @@ impl<T, H> Http1<T, H>
where T: AsyncRead + AsyncWrite + 'static,
H: HttpHandler + 'static
{
pub fn new(h: Rc<Vec<H>>, stream: T, addr: Option<SocketAddr>) -> Self {
pub fn new(h: Rc<WorkerSettings<H>>, stream: T, addr: Option<SocketAddr>) -> Self {
Http1{ flags: Flags::KEEPALIVE,
handlers: h,
settings: h,
addr: addr,
stream: H1Writer::new(stream),
reader: Reader::new(),
@ -88,8 +88,8 @@ impl<T, H> Http1<T, H>
keepalive_timer: None }
}
pub fn into_inner(self) -> (Rc<Vec<H>>, T, Option<SocketAddr>, Bytes) {
(self.handlers, self.stream.into_inner(), self.addr, self.read_buf.freeze())
pub fn into_inner(self) -> (Rc<WorkerSettings<H>>, T, Option<SocketAddr>, Bytes) {
(self.settings, self.stream.into_inner(), self.addr, self.read_buf.freeze())
}
pub fn poll(&mut self) -> Poll<Http1Result, ()> {
@ -198,7 +198,7 @@ impl<T, H> Http1<T, H>
// start request processing
let mut pipe = None;
for h in self.handlers.iter() {
for h in self.settings.handlers().iter() {
req = match h.handle(req) {
Ok(t) => {
pipe = Some(t);
@ -249,19 +249,24 @@ impl<T, H> Http1<T, H>
Ok(Async::NotReady) => {
// start keep-alive timer, this is also slow request timeout
if self.tasks.is_empty() {
if self.flags.contains(Flags::KEEPALIVE) {
if self.keepalive_timer.is_none() {
trace!("Start keep-alive timer");
let mut to = Timeout::new(
Duration::new(KEEPALIVE_PERIOD, 0),
Arbiter::handle()).unwrap();
// register timeout
let _ = to.poll();
self.keepalive_timer = Some(to);
if let Some(keep_alive) = self.settings.keep_alive() {
if keep_alive > 0 && self.flags.contains(Flags::KEEPALIVE) {
if self.keepalive_timer.is_none() {
trace!("Start keep-alive timer");
let mut to = Timeout::new(
Duration::new(keep_alive as u64, 0),
Arbiter::handle()).unwrap();
// register timeout
let _ = to.poll();
self.keepalive_timer = Some(to);
}
} else {
// keep-alive disable, drop connection
return Ok(Async::Ready(Http1Result::Done))
}
} else {
// keep-alive disable, drop connection
return Ok(Async::Ready(Http1Result::Done))
// keep-alive unset, rely on operating system
return Ok(Async::NotReady)
}
}
break

View File

@ -16,6 +16,7 @@ use tokio_core::reactor::Timeout;
use pipeline::Pipeline;
use h2writer::H2Writer;
use server::WorkerSettings;
use channel::{HttpHandler, HttpHandlerTask};
use error::PayloadError;
use encoding::PayloadType;
@ -23,8 +24,6 @@ use httpcodes::HTTPNotFound;
use httprequest::HttpRequest;
use payload::{Payload, PayloadWriter};
const KEEPALIVE_PERIOD: u64 = 15; // seconds
bitflags! {
struct Flags: u8 {
const DISCONNECTED = 0b0000_0010;
@ -36,7 +35,7 @@ pub(crate) struct Http2<T, H>
where T: AsyncRead + AsyncWrite + 'static, H: 'static
{
flags: Flags,
handlers: Rc<Vec<H>>,
settings: Rc<WorkerSettings<H>>,
addr: Option<SocketAddr>,
state: State<IoWrapper<T>>,
tasks: VecDeque<Entry>,
@ -53,14 +52,14 @@ impl<T, H> Http2<T, H>
where T: AsyncRead + AsyncWrite + 'static,
H: HttpHandler + 'static
{
pub fn new(h: Rc<Vec<H>>, stream: T, addr: Option<SocketAddr>, buf: Bytes) -> Self
pub fn new(h: Rc<WorkerSettings<H>>, io: T, addr: Option<SocketAddr>, buf: Bytes) -> Self
{
Http2{ flags: Flags::empty(),
handlers: h,
settings: h,
addr: addr,
tasks: VecDeque::new(),
state: State::Handshake(
Server::handshake(IoWrapper{unread: Some(buf), inner: stream})),
Server::handshake(IoWrapper{unread: Some(buf), inner: io})),
keepalive_timer: None,
}
}
@ -151,18 +150,28 @@ impl<T, H> Http2<T, H>
self.keepalive_timer.take();
self.tasks.push_back(
Entry::new(parts, body, resp, self.addr, &self.handlers));
Entry::new(parts, body, resp, self.addr, &self.settings));
}
Ok(Async::NotReady) => {
// start keep-alive timer
if self.tasks.is_empty() && self.keepalive_timer.is_none() {
trace!("Start keep-alive timer");
let mut timeout = Timeout::new(
Duration::new(KEEPALIVE_PERIOD, 0),
Arbiter::handle()).unwrap();
// register timeout
let _ = timeout.poll();
self.keepalive_timer = Some(timeout);
if self.tasks.is_empty() {
if let Some(keep_alive) = self.settings.keep_alive() {
if keep_alive > 0 && self.keepalive_timer.is_none() {
trace!("Start keep-alive timer");
let mut timeout = Timeout::new(
Duration::new(keep_alive as u64, 0),
Arbiter::handle()).unwrap();
// register timeout
let _ = timeout.poll();
self.keepalive_timer = Some(timeout);
}
} else {
// keep-alive disable, drop connection
return Ok(Async::Ready(()))
}
} else {
// keep-alive unset, rely on operating system
return Ok(Async::NotReady)
}
}
Err(err) => {
@ -230,7 +239,7 @@ impl Entry {
recv: RecvStream,
resp: Respond<Bytes>,
addr: Option<SocketAddr>,
handlers: &Rc<Vec<H>>) -> Entry
settings: &Rc<WorkerSettings<H>>) -> Entry
where H: HttpHandler + 'static
{
// Payload and Content-Encoding
@ -247,7 +256,7 @@ impl Entry {
// start request processing
let mut task = None;
for h in handlers.iter() {
for h in settings.handlers().iter() {
req = match h.handle(req) {
Ok(t) => {
task = Some(t);

View File

@ -110,6 +110,7 @@ pub mod headers {
//! Headers implementation
pub use encoding::ContentEncoding;
pub use httpresponse::ConnectionType;
pub use cookie::Cookie;
pub use cookie::CookieBuilder;

View File

@ -90,10 +90,11 @@ impl ServerSettings {
pub struct HttpServer<T, A, H, U>
where H: 'static
{
h: Rc<Vec<H>>,
h: Option<Rc<WorkerSettings<H>>>,
io: PhantomData<T>,
addr: PhantomData<A>,
threads: usize,
keep_alive: Option<u16>,
factory: Arc<Fn() -> U + Send + Sync>,
workers: Vec<SyncAddress<Worker<H>>>,
}
@ -124,10 +125,11 @@ impl<T, A, H, U, V> HttpServer<T, A, H, U>
pub fn new<F>(factory: F) -> Self
where F: Sync + Send + 'static + Fn() -> U,
{
HttpServer{ h: Rc::new(Vec::new()),
HttpServer{ h: None,
io: PhantomData,
addr: PhantomData,
threads: num_cpus::get(),
keep_alive: None,
factory: Arc::new(factory),
workers: Vec::new(),
}
@ -141,6 +143,20 @@ impl<T, A, H, U, V> HttpServer<T, A, H, U>
self
}
/// Set server keep-alive setting.
///
/// By default keep alive is enabled.
///
/// - `Some(75)` - enable
///
/// - `Some(0)` - disable
///
/// - `None` - use `SO_KEEPALIVE` socket option
pub fn keep_alive(mut self, val: Option<u16>) -> Self {
self.keep_alive = val;
self
}
/// Start listening for incomming connections from a stream.
///
/// This method uses only one thread for handling incoming connections.
@ -155,7 +171,7 @@ impl<T, A, H, U, V> HttpServer<T, A, H, U>
for app in &mut apps {
app.server_settings(settings.clone());
}
self.h = Rc::new(apps);
self.h = Some(Rc::new(WorkerSettings{h: apps, keep_alive: self.keep_alive}));
// start server
Ok(HttpServer::create(move |ctx| {
@ -215,15 +231,16 @@ impl<T, A, H, U, V> HttpServer<T, A, H, U>
}
fn start_workers(&mut self, settings: &ServerSettings, handler: &StreamHandlerType)
-> Vec<mpsc::UnboundedSender<IoStream<net::TcpStream>>>
-> Vec<mpsc::UnboundedSender<IoStream<Socket>>>
{
// start workers
let mut workers = Vec::new();
for _ in 0..self.threads {
let s = settings.clone();
let (tx, rx) = mpsc::unbounded::<IoStream<net::TcpStream>>();
let (tx, rx) = mpsc::unbounded::<IoStream<Socket>>();
let h = handler.clone();
let ka = self.keep_alive.clone();
let factory = Arc::clone(&self.factory);
let addr = Arbiter::start(move |ctx: &mut Context<_>| {
let mut apps: Vec<_> = (*factory)()
@ -232,7 +249,7 @@ impl<T, A, H, U, V> HttpServer<T, A, H, U>
app.server_settings(s.clone());
}
ctx.add_stream(rx);
Worker{h: Rc::new(apps), handler: h}
Worker::new(apps, h, ka)
});
workers.push(tx);
self.workers.push(addr);
@ -379,7 +396,7 @@ impl<T, A, H, U> Handler<IoStream<T>, io::Error> for HttpServer<T, A, H, U>
-> Response<Self, IoStream<T>>
{
Arbiter::handle().spawn(
HttpChannel::new(Rc::clone(&self.h), msg.io, msg.peer, msg.http2));
HttpChannel::new(Rc::clone(&self.h.as_ref().unwrap()), msg.io, msg.peer, msg.http2));
Self::empty()
}
}
@ -389,11 +406,33 @@ impl<T, A, H, U> Handler<IoStream<T>, io::Error> for HttpServer<T, A, H, U>
///
/// Worker accepts Socket objects via unbounded channel and start requests processing.
struct Worker<H> {
h: Rc<Vec<H>>,
h: Rc<WorkerSettings<H>>,
handler: StreamHandlerType,
}
pub(crate) struct WorkerSettings<H> {
h: Vec<H>,
keep_alive: Option<u16>,
}
impl<H> WorkerSettings<H> {
pub fn handlers(&self) -> &Vec<H> {
&self.h
}
pub fn keep_alive(&self) -> Option<u16> {
self.keep_alive
}
}
impl<H: 'static> Worker<H> {
fn new(h: Vec<H>, handler: StreamHandlerType, keep_alive: Option<u16>) -> Worker<H> {
Worker {
h: Rc::new(WorkerSettings{h: h, keep_alive: keep_alive}),
handler: handler,
}
}
fn update_time(&self, ctx: &mut Context<Self>) {
utils::update_date();
ctx.run_later(Duration::new(1, 0), |slf, ctx| slf.update_time(ctx));
@ -408,15 +447,20 @@ impl<H: 'static> Actor for Worker<H> {
}
}
impl<H> StreamHandler<IoStream<net::TcpStream>> for Worker<H>
impl<H> StreamHandler<IoStream<Socket>> for Worker<H>
where H: HttpHandler + 'static {}
impl<H> Handler<IoStream<net::TcpStream>> for Worker<H>
impl<H> Handler<IoStream<Socket>> for Worker<H>
where H: HttpHandler + 'static,
{
fn handle(&mut self, msg: IoStream<net::TcpStream>, _: &mut Context<Self>)
-> Response<Self, IoStream<net::TcpStream>>
fn handle(&mut self, msg: IoStream<Socket>, _: &mut Context<Self>)
-> Response<Self, IoStream<Socket>>
{
if let None = self.h.keep_alive {
if msg.io.set_keepalive(Some(Duration::new(75, 0))).is_err() {
error!("Can not set socket keep-alive option");
}
}
self.handler.handle(Rc::clone(&self.h), msg);
Self::empty()
}
@ -432,10 +476,11 @@ enum StreamHandlerType {
}
impl StreamHandlerType {
fn handle<H: HttpHandler>(&mut self, h: Rc<Vec<H>>, msg: IoStream<net::TcpStream>) {
fn handle<H: HttpHandler>(&mut self, h: Rc<WorkerSettings<H>>, msg: IoStream<Socket>) {
match *self {
StreamHandlerType::Normal => {
let io = TcpStream::from_stream(msg.io, Arbiter::handle())
let io = TcpStream::from_stream(msg.io.into_tcp_stream(), Arbiter::handle())
.expect("failed to associate TCP stream");
Arbiter::handle().spawn(HttpChannel::new(h, io, msg.peer, msg.http2));
@ -443,7 +488,7 @@ impl StreamHandlerType {
#[cfg(feature="tls")]
StreamHandlerType::Tls(ref acceptor) => {
let IoStream { io, peer, http2 } = msg;
let io = TcpStream::from_stream(io, Arbiter::handle())
let io = TcpStream::from_stream(io.into_tcp_stream(), Arbiter::handle())
.expect("failed to associate TCP stream");
Arbiter::handle().spawn(
@ -461,7 +506,7 @@ impl StreamHandlerType {
#[cfg(feature="alpn")]
StreamHandlerType::Alpn(ref acceptor) => {
let IoStream { io, peer, .. } = msg;
let io = TcpStream::from_stream(io, Arbiter::handle())
let io = TcpStream::from_stream(io.into_tcp_stream(), Arbiter::handle())
.expect("failed to associate TCP stream");
Arbiter::handle().spawn(
@ -488,7 +533,7 @@ impl StreamHandlerType {
}
fn start_accept_thread(sock: Socket, addr: net::SocketAddr,
workers: Vec<mpsc::UnboundedSender<IoStream<net::TcpStream>>>) {
workers: Vec<mpsc::UnboundedSender<IoStream<Socket>>>) {
// start acceptors thread
let _ = thread::Builder::new().name(format!("Accept on {}", addr)).spawn(move || {
let mut next = 0;
@ -500,8 +545,7 @@ fn start_accept_thread(sock: Socket, addr: net::SocketAddr,
} else {
net::SocketAddr::V6(addr.as_inet6().unwrap())
};
let msg = IoStream{
io: socket.into_tcp_stream(), peer: Some(addr), http2: false};
let msg = IoStream{io: socket, peer: Some(addr), http2: false};
workers[next].unbounded_send(msg).expect("worker thread died");
next = (next + 1) % workers.len();
}