1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-30 18:34:36 +01:00

split worker code to separate module

This commit is contained in:
Nikolay Kim 2017-12-28 12:38:37 -08:00
parent d8b0ce88a5
commit 6a2bb9a473
7 changed files with 194 additions and 181 deletions

View File

@ -11,7 +11,8 @@ use h2;
use error::Error; use error::Error;
use h1writer::Writer; use h1writer::Writer;
use httprequest::HttpRequest; use httprequest::HttpRequest;
use server::{ServerSettings, WorkerSettings}; use server::ServerSettings;
use worker::WorkerSettings;
/// Low level http request handler /// Low level http request handler
#[allow(unused_variables)] #[allow(unused_variables)]

View File

@ -17,7 +17,7 @@ use pipeline::Pipeline;
use encoding::PayloadType; use encoding::PayloadType;
use channel::{HttpHandler, HttpHandlerTask}; use channel::{HttpHandler, HttpHandlerTask};
use h1writer::{Writer, H1Writer}; use h1writer::{Writer, H1Writer};
use server::WorkerSettings; use worker::WorkerSettings;
use httpcodes::HTTPNotFound; use httpcodes::HTTPNotFound;
use httprequest::HttpRequest; use httprequest::HttpRequest;
use error::{ParseError, PayloadError, ResponseError}; use error::{ParseError, PayloadError, ResponseError};
@ -888,7 +888,7 @@ mod tests {
use http::{Version, Method}; use http::{Version, Method};
use super::*; use super::*;
use application::HttpApplication; use application::HttpApplication;
use server::WorkerSettings; use worker::WorkerSettings;
struct Buffer { struct Buffer {
buf: Bytes, buf: Bytes,

View File

@ -16,7 +16,7 @@ use tokio_core::reactor::Timeout;
use pipeline::Pipeline; use pipeline::Pipeline;
use h2writer::H2Writer; use h2writer::H2Writer;
use server::WorkerSettings; use worker::WorkerSettings;
use channel::{HttpHandler, HttpHandlerTask}; use channel::{HttpHandler, HttpHandlerTask};
use error::PayloadError; use error::PayloadError;
use encoding::PayloadType; use encoding::PayloadType;

View File

@ -103,6 +103,7 @@ mod resource;
mod handler; mod handler;
mod pipeline; mod pipeline;
mod server; mod server;
mod worker;
mod channel; mod channel;
mod wsframe; mod wsframe;
mod wsproto; mod wsproto;

View File

@ -1,6 +1,5 @@
use std::{io, net, thread}; use std::{io, net, thread};
use std::rc::Rc; use std::rc::Rc;
use std::cell::{RefCell, RefMut};
use std::sync::{Arc, mpsc as sync_mpsc}; use std::sync::{Arc, mpsc as sync_mpsc};
use std::time::Duration; use std::time::Duration;
use std::marker::PhantomData; use std::marker::PhantomData;
@ -11,11 +10,10 @@ use actix::System;
use futures::Stream; use futures::Stream;
use futures::sync::mpsc; use futures::sync::mpsc;
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use tokio_core::reactor::Handle;
use tokio_core::net::TcpStream; use tokio_core::net::TcpStream;
use mio; use mio;
use num_cpus; use num_cpus;
use net2::{TcpBuilder, TcpStreamExt}; use net2::TcpBuilder;
#[cfg(feature="tls")] #[cfg(feature="tls")]
use futures::{future, Future}; use futures::{future, Future};
@ -38,6 +36,7 @@ use actix::actors::signal;
use helpers; use helpers;
use channel::{HttpChannel, HttpHandler, IntoHttpHandler}; use channel::{HttpChannel, HttpHandler, IntoHttpHandler};
use worker::{Conn, Worker, WorkerSettings, StreamHandlerType};
/// Various server settings /// Various server settings
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -248,13 +247,13 @@ impl<T, A, H, U, V> HttpServer<T, A, H, U>
} }
fn start_workers(&mut self, settings: &ServerSettings, handler: &StreamHandlerType) fn start_workers(&mut self, settings: &ServerSettings, handler: &StreamHandlerType)
-> Vec<mpsc::UnboundedSender<IoStream<net::TcpStream>>> -> Vec<mpsc::UnboundedSender<Conn<net::TcpStream>>>
{ {
// start workers // start workers
let mut workers = Vec::new(); let mut workers = Vec::new();
for _ in 0..self.threads { for _ in 0..self.threads {
let s = settings.clone(); let s = settings.clone();
let (tx, rx) = mpsc::unbounded::<IoStream<net::TcpStream>>(); let (tx, rx) = mpsc::unbounded::<Conn<net::TcpStream>>();
let h = handler.clone(); let h = handler.clone();
let ka = self.keep_alive; let ka = self.keep_alive;
@ -483,7 +482,7 @@ impl<T, A, H, U, V> HttpServer<T, A, H, U>
// start server // start server
HttpServer::create(move |ctx| { HttpServer::create(move |ctx| {
ctx.add_stream(stream.map( ctx.add_stream(stream.map(
move |(t, _)| IoStream{io: t, peer: None, http2: false})); move |(t, _)| Conn{io: t, peer: None, http2: false}));
self self
}) })
} }
@ -524,20 +523,13 @@ impl<T, A, H, U> Handler<signal::Signal> for HttpServer<T, A, H, U>
} }
} }
#[derive(Message)] impl<T, A, H, U> StreamHandler<Conn<T>, io::Error> for HttpServer<T, A, H, U>
struct IoStream<T> {
io: T,
peer: Option<net::SocketAddr>,
http2: bool,
}
impl<T, A, H, U> StreamHandler<IoStream<T>, io::Error> for HttpServer<T, A, H, U>
where T: AsyncRead + AsyncWrite + 'static, where T: AsyncRead + AsyncWrite + 'static,
H: HttpHandler + 'static, H: HttpHandler + 'static,
U: 'static, U: 'static,
A: 'static {} A: 'static {}
impl<T, A, H, U> Handler<IoStream<T>, io::Error> for HttpServer<T, A, H, U> impl<T, A, H, U> Handler<Conn<T>, io::Error> for HttpServer<T, A, H, U>
where T: AsyncRead + AsyncWrite + 'static, where T: AsyncRead + AsyncWrite + 'static,
H: HttpHandler + 'static, H: HttpHandler + 'static,
U: 'static, U: 'static,
@ -547,8 +539,7 @@ impl<T, A, H, U> Handler<IoStream<T>, io::Error> for HttpServer<T, A, H, U>
debug!("Error handling request: {}", err) debug!("Error handling request: {}", err)
} }
fn handle(&mut self, msg: IoStream<T>, _: &mut Context<Self>) fn handle(&mut self, msg: Conn<T>, _: &mut Context<Self>) -> Response<Self, Conn<T>>
-> Response<Self, IoStream<T>>
{ {
Arbiter::handle().spawn( Arbiter::handle().spawn(
HttpChannel::new(Rc::clone(self.h.as_ref().unwrap()), msg.io, msg.peer, msg.http2)); HttpChannel::new(Rc::clone(self.h.as_ref().unwrap()), msg.io, msg.peer, msg.http2));
@ -629,163 +620,6 @@ impl<T, A, H, U> Handler<StopServer> for HttpServer<T, A, H, U>
} }
} }
/// Http worker
///
/// Worker accepts Socket objects via unbounded channel and start requests processing.
struct Worker<H> {
h: Rc<WorkerSettings<H>>,
hnd: Handle,
handler: StreamHandlerType,
}
pub(crate) struct WorkerSettings<H> {
h: RefCell<Vec<H>>,
enabled: bool,
keep_alive: u64,
bytes: Rc<helpers::SharedBytesPool>,
messages: Rc<helpers::SharedMessagePool>,
}
impl<H> WorkerSettings<H> {
pub(crate) fn new(h: Vec<H>, keep_alive: Option<u64>) -> WorkerSettings<H> {
WorkerSettings {
h: RefCell::new(h),
enabled: if let Some(ka) = keep_alive { ka > 0 } else { false },
keep_alive: keep_alive.unwrap_or(0),
bytes: Rc::new(helpers::SharedBytesPool::new()),
messages: Rc::new(helpers::SharedMessagePool::new()),
}
}
pub fn handlers(&self) -> RefMut<Vec<H>> {
self.h.borrow_mut()
}
pub fn keep_alive(&self) -> u64 {
self.keep_alive
}
pub fn keep_alive_enabled(&self) -> bool {
self.enabled
}
pub fn get_shared_bytes(&self) -> helpers::SharedBytes {
helpers::SharedBytes::new(self.bytes.get_bytes(), Rc::clone(&self.bytes))
}
pub fn get_http_message(&self) -> helpers::SharedHttpMessage {
helpers::SharedHttpMessage::new(self.messages.get(), Rc::clone(&self.messages))
}
}
impl<H: 'static> Worker<H> {
fn new(h: Vec<H>, handler: StreamHandlerType, keep_alive: Option<u64>) -> Worker<H> {
Worker {
h: Rc::new(WorkerSettings::new(h, keep_alive)),
hnd: Arbiter::handle().clone(),
handler: handler,
}
}
fn update_time(&self, ctx: &mut Context<Self>) {
helpers::update_date();
ctx.run_later(Duration::new(1, 0), |slf, ctx| slf.update_time(ctx));
}
}
impl<H: 'static> Actor for Worker<H> {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
self.update_time(ctx);
}
}
impl<H> StreamHandler<IoStream<net::TcpStream>> for Worker<H>
where H: HttpHandler + 'static {}
impl<H> Handler<IoStream<net::TcpStream>> for Worker<H>
where H: HttpHandler + 'static,
{
fn handle(&mut self, msg: IoStream<net::TcpStream>, _: &mut Context<Self>)
-> Response<Self, IoStream<net::TcpStream>>
{
if !self.h.keep_alive_enabled() &&
msg.io.set_keepalive(Some(Duration::new(75, 0))).is_err()
{
error!("Can not set socket keep-alive option");
}
self.handler.handle(Rc::clone(&self.h), &self.hnd, msg);
Self::empty()
}
}
#[derive(Clone)]
enum StreamHandlerType {
Normal,
#[cfg(feature="tls")]
Tls(TlsAcceptor),
#[cfg(feature="alpn")]
Alpn(SslAcceptor),
}
impl StreamHandlerType {
fn handle<H: HttpHandler>(&mut self,
h: Rc<WorkerSettings<H>>,
hnd: &Handle,
msg: IoStream<net::TcpStream>) {
match *self {
StreamHandlerType::Normal => {
let io = TcpStream::from_stream(msg.io, hnd)
.expect("failed to associate TCP stream");
hnd.spawn(HttpChannel::new(h, io, msg.peer, msg.http2));
}
#[cfg(feature="tls")]
StreamHandlerType::Tls(ref acceptor) => {
let IoStream { io, peer, http2 } = msg;
let io = TcpStream::from_stream(io, hnd)
.expect("failed to associate TCP stream");
hnd.spawn(
TlsAcceptorExt::accept_async(acceptor, io).then(move |res| {
match res {
Ok(io) => Arbiter::handle().spawn(
HttpChannel::new(h, io, peer, http2)),
Err(err) =>
trace!("Error during handling tls connection: {}", err),
};
future::result(Ok(()))
})
);
}
#[cfg(feature="alpn")]
StreamHandlerType::Alpn(ref acceptor) => {
let IoStream { io, peer, .. } = msg;
let io = TcpStream::from_stream(io, hnd)
.expect("failed to associate TCP stream");
hnd.spawn(
SslAcceptorExt::accept_async(acceptor, io).then(move |res| {
match res {
Ok(io) => {
let http2 = if let Some(p) = io.get_ref().ssl().selected_alpn_protocol()
{
p.len() == 2 && &p == b"h2"
} else {
false
};
Arbiter::handle().spawn(HttpChannel::new(h, io, peer, http2));
},
Err(err) =>
trace!("Error during handling tls connection: {}", err),
};
future::result(Ok(()))
})
);
}
}
}
}
enum Command { enum Command {
Pause, Pause,
Resume, Resume,
@ -793,7 +627,7 @@ enum Command {
} }
fn start_accept_thread(sock: net::TcpListener, addr: net::SocketAddr, backlog: i32, fn start_accept_thread(sock: net::TcpListener, addr: net::SocketAddr, backlog: i32,
workers: Vec<mpsc::UnboundedSender<IoStream<net::TcpStream>>>) workers: Vec<mpsc::UnboundedSender<Conn<net::TcpStream>>>)
-> (mio::SetReadiness, sync_mpsc::Sender<Command>) -> (mio::SetReadiness, sync_mpsc::Sender<Command>)
{ {
let (tx, rx) = sync_mpsc::channel(); let (tx, rx) = sync_mpsc::channel();
@ -844,7 +678,7 @@ fn start_accept_thread(sock: net::TcpListener, addr: net::SocketAddr, backlog: i
loop { loop {
match server.accept_std() { match server.accept_std() {
Ok((sock, addr)) => { Ok((sock, addr)) => {
let msg = IoStream{ let msg = Conn{
io: sock, peer: Some(addr), http2: false}; io: sock, peer: Some(addr), http2: false};
workers[next].unbounded_send(msg) workers[next].unbounded_send(msg)
.expect("worker thread died"); .expect("worker thread died");

177
src/worker.rs Normal file
View File

@ -0,0 +1,177 @@
use std::{net, time};
use std::rc::Rc;
use std::cell::{RefCell, RefMut};
use tokio_core::net::TcpStream;
use tokio_core::reactor::Handle;
use net2::TcpStreamExt;
use actix::{Actor, Arbiter, AsyncContext, Context, Handler, Response, StreamHandler};
use helpers;
use channel::{HttpChannel, HttpHandler};
#[derive(Message)]
pub(crate) struct Conn<T> {
pub io: T,
pub peer: Option<net::SocketAddr>,
pub http2: bool,
}
pub(crate) struct WorkerSettings<H> {
h: RefCell<Vec<H>>,
enabled: bool,
keep_alive: u64,
bytes: Rc<helpers::SharedBytesPool>,
messages: Rc<helpers::SharedMessagePool>,
}
impl<H> WorkerSettings<H> {
pub(crate) fn new(h: Vec<H>, keep_alive: Option<u64>) -> WorkerSettings<H> {
WorkerSettings {
h: RefCell::new(h),
enabled: if let Some(ka) = keep_alive { ka > 0 } else { false },
keep_alive: keep_alive.unwrap_or(0),
bytes: Rc::new(helpers::SharedBytesPool::new()),
messages: Rc::new(helpers::SharedMessagePool::new()),
}
}
pub fn handlers(&self) -> RefMut<Vec<H>> {
self.h.borrow_mut()
}
pub fn keep_alive(&self) -> u64 {
self.keep_alive
}
pub fn keep_alive_enabled(&self) -> bool {
self.enabled
}
pub fn get_shared_bytes(&self) -> helpers::SharedBytes {
helpers::SharedBytes::new(self.bytes.get_bytes(), Rc::clone(&self.bytes))
}
pub fn get_http_message(&self) -> helpers::SharedHttpMessage {
helpers::SharedHttpMessage::new(self.messages.get(), Rc::clone(&self.messages))
}
}
/// Http worker
///
/// Worker accepts Socket objects via unbounded channel and start requests processing.
pub(crate) struct Worker<H> {
h: Rc<WorkerSettings<H>>,
hnd: Handle,
handler: StreamHandlerType,
}
impl<H: 'static> Worker<H> {
pub(crate) fn new(h: Vec<H>, handler: StreamHandlerType, keep_alive: Option<u64>)
-> Worker<H>
{
Worker {
h: Rc::new(WorkerSettings::new(h, keep_alive)),
hnd: Arbiter::handle().clone(),
handler: handler,
}
}
fn update_time(&self, ctx: &mut Context<Self>) {
helpers::update_date();
ctx.run_later(time::Duration::new(1, 0), |slf, ctx| slf.update_time(ctx));
}
}
impl<H: 'static> Actor for Worker<H> {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
self.update_time(ctx);
}
}
impl<H> StreamHandler<Conn<net::TcpStream>> for Worker<H>
where H: HttpHandler + 'static {}
impl<H> Handler<Conn<net::TcpStream>> for Worker<H>
where H: HttpHandler + 'static,
{
fn handle(&mut self, msg: Conn<net::TcpStream>, _: &mut Context<Self>)
-> Response<Self, Conn<net::TcpStream>>
{
if !self.h.keep_alive_enabled() &&
msg.io.set_keepalive(Some(time::Duration::new(75, 0))).is_err()
{
error!("Can not set socket keep-alive option");
}
self.handler.handle(Rc::clone(&self.h), &self.hnd, msg);
Self::empty()
}
}
#[derive(Clone)]
pub(crate) enum StreamHandlerType {
Normal,
#[cfg(feature="tls")]
Tls(TlsAcceptor),
#[cfg(feature="alpn")]
Alpn(SslAcceptor),
}
impl StreamHandlerType {
fn handle<H: HttpHandler>(&mut self,
h: Rc<WorkerSettings<H>>,
hnd: &Handle, msg: Conn<net::TcpStream>) {
match *self {
StreamHandlerType::Normal => {
let io = TcpStream::from_stream(msg.io, hnd)
.expect("failed to associate TCP stream");
hnd.spawn(HttpChannel::new(h, io, msg.peer, msg.http2));
}
#[cfg(feature="tls")]
StreamHandlerType::Tls(ref acceptor) => {
let Conn { io, peer, http2 } = msg;
let io = TcpStream::from_stream(io, hnd)
.expect("failed to associate TCP stream");
hnd.spawn(
TlsAcceptorExt::accept_async(acceptor, io).then(move |res| {
match res {
Ok(io) => Arbiter::handle().spawn(
HttpChannel::new(h, io, peer, http2)),
Err(err) =>
trace!("Error during handling tls connection: {}", err),
};
future::result(Ok(()))
})
);
}
#[cfg(feature="alpn")]
StreamHandlerType::Alpn(ref acceptor) => {
let Conn { io, peer, .. } = msg;
let io = TcpStream::from_stream(io, hnd)
.expect("failed to associate TCP stream");
hnd.spawn(
SslAcceptorExt::accept_async(acceptor, io).then(move |res| {
match res {
Ok(io) => {
let http2 = if let Some(p) = io.get_ref().ssl().selected_alpn_protocol()
{
p.len() == 2 && &p == b"h2"
} else {
false
};
Arbiter::handle().spawn(HttpChannel::new(h, io, peer, http2));
},
Err(err) =>
trace!("Error during handling tls connection: {}", err),
};
future::result(Ok(()))
})
);
}
}
}
}

View File

@ -24,7 +24,7 @@ fn test_start() {
.resource("/", |r| r.method(Method::GET).h(httpcodes::HTTPOk))]); .resource("/", |r| r.method(Method::GET).h(httpcodes::HTTPOk))]);
let srv = srv.bind("127.0.0.1:0").unwrap(); let srv = srv.bind("127.0.0.1:0").unwrap();
let addr = srv.addrs()[0].clone(); let addr = srv.addrs()[0];
let srv_addr = srv.start(); let srv_addr = srv.start();
let _ = tx.send((addr, srv_addr)); let _ = tx.send((addr, srv_addr));
sys.run(); sys.run();