1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-28 01:32:57 +01:00

refactor http workers

This commit is contained in:
Nikolay Kim 2017-12-13 12:47:07 -08:00
parent 6b61041aec
commit 81f8da03ae
3 changed files with 106 additions and 218 deletions

View File

@ -379,8 +379,7 @@ impl PayloadEncoder {
error!("Chunked transfer is enabled but body is set to Empty"); error!("Chunked transfer is enabled but body is set to Empty");
} }
resp.headers_mut().insert(CONTENT_LENGTH, HeaderValue::from_static("0")); resp.headers_mut().insert(CONTENT_LENGTH, HeaderValue::from_static("0"));
resp.headers_mut().remove(TRANSFER_ENCODING); TransferEncoding::eof()
TransferEncoding::length(0)
}, },
Body::Binary(ref mut bytes) => { Body::Binary(ref mut bytes) => {
if compression { if compression {
@ -410,8 +409,7 @@ impl PayloadEncoder {
resp.headers_mut().insert( resp.headers_mut().insert(
CONTENT_LENGTH, CONTENT_LENGTH,
HeaderValue::from_str(&bytes.len().to_string()).unwrap()); HeaderValue::from_str(&bytes.len().to_string()).unwrap());
resp.headers_mut().remove(TRANSFER_ENCODING); TransferEncoding::eof()
TransferEncoding::length(bytes.len() as u64)
} }
} }
Body::Streaming(_) | Body::StreamingContext => { Body::Streaming(_) | Body::StreamingContext => {
@ -555,7 +553,7 @@ impl ContentEncoder {
} }
} }
#[inline] #[inline(always)]
pub fn write_eof(&mut self) -> Result<(), io::Error> { pub fn write_eof(&mut self) -> Result<(), io::Error> {
let encoder = mem::replace(self, ContentEncoder::Identity(TransferEncoding::eof())); let encoder = mem::replace(self, ContentEncoder::Identity(TransferEncoding::eof()));
@ -594,7 +592,7 @@ impl ContentEncoder {
} }
} }
#[inline] #[inline(always)]
pub fn write(&mut self, data: &[u8]) -> Result<(), io::Error> { pub fn write(&mut self, data: &[u8]) -> Result<(), io::Error> {
match *self { match *self {
ContentEncoder::Br(ref mut encoder) => { ContentEncoder::Br(ref mut encoder) => {
@ -694,7 +692,7 @@ impl TransferEncoding {
} }
/// Encode message. Return `EOF` state of encoder /// Encode message. Return `EOF` state of encoder
#[inline] #[inline(always)]
pub fn encode(&mut self, msg: &[u8]) -> bool { pub fn encode(&mut self, msg: &[u8]) -> bool {
match self.kind { match self.kind {
TransferEncodingKind::Eof => { TransferEncodingKind::Eof => {
@ -732,7 +730,7 @@ impl TransferEncoding {
} }
/// Encode eof. Return `EOF` state of encoder /// Encode eof. Return `EOF` state of encoder
#[inline] #[inline(always)]
pub fn encode_eof(&mut self) { pub fn encode_eof(&mut self) {
match self.kind { match self.kind {
TransferEncodingKind::Eof | TransferEncodingKind::Length(_) => (), TransferEncodingKind::Eof | TransferEncodingKind::Length(_) => (),

View File

@ -145,9 +145,9 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
{ {
let buffer = self.encoder.get_mut(); let buffer = self.encoder.get_mut();
if let Body::Binary(ref bytes) = *msg.body() { if let Body::Binary(ref bytes) = *msg.body() {
buffer.reserve(100 + msg.headers().len() * AVERAGE_HEADER_SIZE + bytes.len()); buffer.reserve(130 + msg.headers().len() * AVERAGE_HEADER_SIZE + bytes.len());
} else { } else {
buffer.reserve(100 + msg.headers().len() * AVERAGE_HEADER_SIZE); buffer.reserve(130 + msg.headers().len() * AVERAGE_HEADER_SIZE);
} }
match version { match version {

View File

@ -1,6 +1,7 @@
use std::{io, net, thread}; use std::{io, net, thread};
use std::rc::Rc; use std::rc::Rc;
use std::sync::Arc; use std::sync::Arc;
//use std::time::Duration;
use std::marker::PhantomData; use std::marker::PhantomData;
use actix::dev::*; use actix::dev::*;
@ -207,7 +208,7 @@ impl<T, A, H, U, V> HttpServer<T, A, H, U>
} }
} }
fn start_workers(&mut self, settings: &ServerSettings) fn start_workers(&mut self, settings: &ServerSettings, handler: &StreamHandlerType)
-> Vec<mpsc::UnboundedSender<IoStream<net::TcpStream>>> -> Vec<mpsc::UnboundedSender<IoStream<net::TcpStream>>>
{ {
// start workers // start workers
@ -216,6 +217,7 @@ impl<T, A, H, U, V> HttpServer<T, A, H, U>
let s = settings.clone(); let s = settings.clone();
let (tx, rx) = mpsc::unbounded::<IoStream<net::TcpStream>>(); let (tx, rx) = mpsc::unbounded::<IoStream<net::TcpStream>>();
let h = handler.clone();
let factory = Arc::clone(&self.factory); let factory = Arc::clone(&self.factory);
let addr = Arbiter::start(move |ctx: &mut Context<_>| { let addr = Arbiter::start(move |ctx: &mut Context<_>| {
let mut apps: Vec<_> = (*factory)() let mut apps: Vec<_> = (*factory)()
@ -224,7 +226,7 @@ impl<T, A, H, U, V> HttpServer<T, A, H, U>
app.server_settings(s.clone()); app.server_settings(s.clone());
} }
ctx.add_stream(rx); ctx.add_stream(rx);
Worker{h: Rc::new(apps)} Worker{h: Rc::new(apps), handler: h}
}); });
workers.push(tx); workers.push(tx);
self.workers.push(addr); self.workers.push(addr);
@ -250,32 +252,12 @@ impl<H: HttpHandler, U, V> HttpServer<TcpStream, net::SocketAddr, H, U>
{ {
let addrs = self.bind(addr)?; let addrs = self.bind(addr)?;
let settings = ServerSettings::new(Some(addrs[0].0), false); let settings = ServerSettings::new(Some(addrs[0].0), false);
let workers = self.start_workers(&settings); let workers = self.start_workers(&settings, &StreamHandlerType::Normal);
// start acceptors threads // start acceptors threads
for (addr, sock) in addrs { for (addr, sock) in addrs {
let wrks = workers.clone();
let _ = thread::Builder::new().name(format!("Accept on {}", addr)).spawn(move || {
let mut next = 0;
loop {
match sock.accept() {
Ok((socket, addr)) => {
let addr = if let Some(addr) = addr.as_inet() {
net::SocketAddr::V4(addr)
} else {
net::SocketAddr::V6(addr.as_inet6().unwrap())
};
let msg = IoStream{
io: socket.into_tcp_stream(), peer: Some(addr), http2: false};
println!("next: {}", next);
wrks[next].unbounded_send(msg).expect("worker thread died");
next = (next + 1) % wrks.len();
}
Err(err) => error!("Error accepting connection: {:?}", err),
}
}
});
info!("Starting http server on {}", addr); info!("Starting http server on {}", addr);
start_accept_thread(sock, addr, workers.clone());
} }
// start http server actor // start http server actor
@ -292,7 +274,7 @@ impl<H: HttpHandler, U, V> HttpServer<TlsStream<TcpStream>, net::SocketAddr, H,
/// ///
/// This methods converts address to list of `SocketAddr` /// This methods converts address to list of `SocketAddr`
/// then binds to all available addresses. /// then binds to all available addresses.
pub fn serve_tls<S, Addr>(self, addr: S, pkcs12: ::Pkcs12) -> io::Result<Addr> pub fn serve_tls<S, Addr>(mut self, addr: S, pkcs12: ::Pkcs12) -> io::Result<Addr>
where Self: ActorAddress<Self, Addr>, where Self: ActorAddress<Self, Addr>,
S: net::ToSocketAddrs, S: net::ToSocketAddrs,
{ {
@ -307,52 +289,12 @@ impl<H: HttpHandler, U, V> HttpServer<TlsStream<TcpStream>, net::SocketAddr, H,
} }
Err(err) => return Err(io::Error::new(io::ErrorKind::Other, err)) Err(err) => return Err(io::Error::new(io::ErrorKind::Other, err))
}; };
let workers = self.start_workers(&settings, &StreamHandlerType::Tls(acceptor));
// start workers
let mut workers = Vec::new();
for _ in 0..self.threads {
let s = settings.clone();
let (tx, rx) = mpsc::unbounded::<IoStream<net::TcpStream>>();
let acc = acceptor.clone();
let factory = Arc::clone(&self.factory);
let _addr = Arbiter::start(move |ctx: &mut Context<_>| {
let mut apps: Vec<_> = (*factory)()
.into_iter().map(|h| h.into_handler()).collect();
for app in &mut apps {
app.server_settings(s.clone());
}
ctx.add_stream(rx);
TlsWorker{h: Rc::new(apps), acceptor: acc}
});
workers.push(tx);
// self.workers.push(addr);
}
info!("Starting {} http workers", self.threads);
// start acceptors threads // start acceptors threads
for (addr, sock) in addrs { for (addr, sock) in addrs {
let wrks = workers.clone();
let _ = thread::Builder::new().name(format!("Accept on {}", addr)).spawn(move || {
let mut next = 0;
loop {
match sock.accept() {
Ok((socket, addr)) => {
let addr = if let Some(addr) = addr.as_inet() {
net::SocketAddr::V4(addr)
} else {
net::SocketAddr::V6(addr.as_inet6().unwrap())
};
let msg = IoStream{
io: socket.into_tcp_stream(), peer: Some(addr), http2: false};
wrks[next].unbounded_send(msg).expect("worker thread died");
next = (next + 1) % wrks.len();
}
Err(err) => error!("Error accepting connection: {:?}", err),
}
}
});
info!("Starting tls http server on {}", addr); info!("Starting tls http server on {}", addr);
start_accept_thread(sock, addr, workers.clone());
} }
// start http server actor // start http server actor
@ -369,7 +311,7 @@ impl<H: HttpHandler, U, V> HttpServer<SslStream<TcpStream>, net::SocketAddr, H,
/// ///
/// This methods converts address to list of `SocketAddr` /// This methods converts address to list of `SocketAddr`
/// then binds to all available addresses. /// then binds to all available addresses.
pub fn serve_tls<S, Addr>(self, addr: S, identity: &ParsedPkcs12) -> io::Result<Addr> pub fn serve_tls<S, Addr>(mut self, addr: S, identity: &ParsedPkcs12) -> io::Result<Addr>
where Self: ActorAddress<Self, Addr>, where Self: ActorAddress<Self, Addr>,
S: net::ToSocketAddrs, S: net::ToSocketAddrs,
{ {
@ -386,52 +328,12 @@ impl<H: HttpHandler, U, V> HttpServer<SslStream<TcpStream>, net::SocketAddr, H,
}, },
Err(err) => return Err(io::Error::new(io::ErrorKind::Other, err)) Err(err) => return Err(io::Error::new(io::ErrorKind::Other, err))
}; };
let workers = self.start_workers(&settings, &StreamHandlerType::Alpn(acceptor));
// start workers
let mut workers = Vec::new();
for _ in 0..self.threads {
let s = settings.clone();
let (tx, rx) = mpsc::unbounded::<IoStream<net::TcpStream>>();
let acc = acceptor.clone();
let factory = Arc::clone(&self.factory);
let _addr = Arbiter::start(move |ctx: &mut Context<_>| {
let mut apps: Vec<_> = (*factory)()
.into_iter().map(|h| h.into_handler()).collect();
for app in &mut apps {
app.server_settings(s.clone());
}
ctx.add_stream(rx);
AlpnWorker{h: Rc::new(apps), acceptor: acc}
});
workers.push(tx);
// self.workers.push(addr);
}
info!("Starting {} http workers", self.threads);
// start acceptors threads // start acceptors threads
for (addr, sock) in addrs { for (addr, sock) in addrs {
let wrks = workers.clone();
let _ = thread::Builder::new().name(format!("Accept on {}", addr)).spawn(move || {
let mut next = 0;
loop {
match sock.accept() {
Ok((socket, addr)) => {
let addr = if let Some(addr) = addr.as_inet() {
net::SocketAddr::V4(addr)
} else {
net::SocketAddr::V6(addr.as_inet6().unwrap())
};
let msg = IoStream{
io: socket.into_tcp_stream(), peer: Some(addr), http2: false};
wrks[next].unbounded_send(msg).expect("worker thread died");
next = (next + 1) % wrks.len();
}
Err(err) => error!("Error accepting connection: {:?}", err),
}
}
});
info!("Starting tls http server on {}", addr); info!("Starting tls http server on {}", addr);
start_accept_thread(sock, addr, workers.clone());
} }
// start http server actor // start http server actor
@ -482,10 +384,15 @@ impl<T, A, H, U> Handler<IoStream<T>, io::Error> for HttpServer<T, A, H, U>
/// Worker accepts Socket objects via unbounded channel and start requests processing. /// Worker accepts Socket objects via unbounded channel and start requests processing.
struct Worker<H> { struct Worker<H> {
h: Rc<Vec<H>>, h: Rc<Vec<H>>,
handler: StreamHandlerType,
} }
impl<H: 'static> Actor for Worker<H> { impl<H: 'static> Actor for Worker<H> {
type Context = Context<Self>; type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
}
} }
impl<H> StreamHandler<IoStream<net::TcpStream>> for Worker<H> impl<H> StreamHandler<IoStream<net::TcpStream>> for Worker<H>
@ -497,113 +404,96 @@ impl<H> Handler<IoStream<net::TcpStream>> for Worker<H>
fn handle(&mut self, msg: IoStream<net::TcpStream>, _: &mut Context<Self>) fn handle(&mut self, msg: IoStream<net::TcpStream>, _: &mut Context<Self>)
-> Response<Self, IoStream<net::TcpStream>> -> Response<Self, IoStream<net::TcpStream>>
{ {
let io = TcpStream::from_stream(msg.io, Arbiter::handle()) self.handler.handle(Rc::clone(&self.h), msg);
.expect("failed to associate TCP stream");
Arbiter::handle().spawn(
HttpChannel::new(Rc::clone(&self.h), io, msg.peer, msg.http2));
Self::empty() Self::empty()
} }
} }
/// Tls http workers #[derive(Clone)]
/// enum StreamHandlerType {
/// Worker accepts Socket objects via unbounded channel and start requests processing. Normal,
#[cfg(feature="tls")] #[cfg(feature="tls")]
struct TlsWorker<H> { Tls(TlsAcceptor),
h: Rc<Vec<H>>, #[cfg(feature="alpn")]
acceptor: TlsAcceptor, Alpn(SslAcceptor),
} }
#[cfg(feature="tls")] impl StreamHandlerType {
impl<H: 'static> Actor for TlsWorker<H> { fn handle<H: HttpHandler>(&mut self, h: Rc<Vec<H>>, msg: IoStream<net::TcpStream>) {
type Context = Context<Self>; match *self {
} StreamHandlerType::Normal => {
let io = TcpStream::from_stream(msg.io, Arbiter::handle())
.expect("failed to associate TCP stream");
#[cfg(feature="tls")] Arbiter::handle().spawn(HttpChannel::new(h, io, msg.peer, msg.http2));
impl<H> StreamHandler<IoStream<net::TcpStream>> for TlsWorker<H> }
where H: HttpHandler + 'static {} #[cfg(feature="tls")]
StreamHandlerType::Tls(ref acceptor) => {
let IoStream { io, peer, http2 } = msg;
let io = TcpStream::from_stream(io, Arbiter::handle())
.expect("failed to associate TCP stream");
#[cfg(feature="tls")] Arbiter::handle().spawn(
impl<H> Handler<IoStream<net::TcpStream>> for TlsWorker<H> TlsAcceptorExt::accept_async(acceptor, io).then(move |res| {
where H: HttpHandler + 'static, match res {
{ Ok(io) => Arbiter::handle().spawn(
fn handle(&mut self, msg: IoStream<net::TcpStream>, _: &mut Context<Self>) HttpChannel::new(h, io, peer, http2)),
-> Response<Self, IoStream<net::TcpStream>> Err(err) =>
{ trace!("Error during handling tls connection: {}", err),
let IoStream { io, peer, http2 } = msg;
let io = TcpStream::from_stream(io, Arbiter::handle())
.expect("failed to associate TCP stream");
let h = Rc::clone(&self.h);
Arbiter::handle().spawn(
TlsAcceptorExt::accept_async(&self.acceptor, io).then(move |res| {
match res {
Ok(io) => Arbiter::handle().spawn(
HttpChannel::new(h, io, peer, http2)),
Err(err) =>
trace!("Error during handling tls connection: {}", err),
};
future::result(Ok(()))
})
);
Self::empty()
}
}
/// Tls http workers with alpn support
///
/// Worker accepts Socket objects via unbounded channel and start requests processing.
#[cfg(feature="alpn")]
struct AlpnWorker<H> {
h: Rc<Vec<H>>,
acceptor: SslAcceptor,
}
#[cfg(feature="alpn")]
impl<H: 'static> Actor for AlpnWorker<H> {
type Context = Context<Self>;
}
#[cfg(feature="alpn")]
impl<H> StreamHandler<IoStream<net::TcpStream>> for AlpnWorker<H>
where H: HttpHandler + 'static {}
#[cfg(feature="alpn")]
impl<H> Handler<IoStream<net::TcpStream>> for AlpnWorker<H>
where H: HttpHandler + 'static,
{
fn handle(&mut self, msg: IoStream<net::TcpStream>, _: &mut Context<Self>)
-> Response<Self, IoStream<net::TcpStream>>
{
let IoStream { io, peer, .. } = msg;
let io = TcpStream::from_stream(io, Arbiter::handle())
.expect("failed to associate TCP stream");
let h = Rc::clone(&self.h);
Arbiter::handle().spawn(
SslAcceptorExt::accept_async(&self.acceptor, io).then(move |res| {
match res {
Ok(io) => {
let http2 = if let Some(p) = io.get_ref().ssl().selected_alpn_protocol()
{
p.len() == 2 && &p == b"h2"
} else {
false
}; };
Arbiter::handle().spawn( future::result(Ok(()))
HttpChannel::new(h, io, peer, http2)); })
}, );
Err(err) => }
trace!("Error during handling tls connection: {}", err), #[cfg(feature="alpn")]
}; StreamHandlerType::Alpn(ref acceptor) => {
future::result(Ok(())) let IoStream { io, peer, .. } = msg;
}) let io = TcpStream::from_stream(io, Arbiter::handle())
); .expect("failed to associate TCP stream");
Self::empty() Arbiter::handle().spawn(
SslAcceptorExt::accept_async(acceptor, io).then(move |res| {
match res {
Ok(io) => {
let http2 = if let Some(p) = io.get_ref().ssl().selected_alpn_protocol()
{
p.len() == 2 && &p == b"h2"
} else {
false
};
Arbiter::handle().spawn(HttpChannel::new(h, io, peer, http2));
},
Err(err) =>
trace!("Error during handling tls connection: {}", err),
};
future::result(Ok(()))
})
);
}
}
} }
} }
fn start_accept_thread(sock: Socket, addr: net::SocketAddr,
workers: Vec<mpsc::UnboundedSender<IoStream<net::TcpStream>>>) {
// start acceptors thread
let _ = thread::Builder::new().name(format!("Accept on {}", addr)).spawn(move || {
let mut next = 0;
loop {
match sock.accept() {
Ok((socket, addr)) => {
let addr = if let Some(addr) = addr.as_inet() {
net::SocketAddr::V4(addr)
} else {
net::SocketAddr::V6(addr.as_inet6().unwrap())
};
let msg = IoStream{
io: socket.into_tcp_stream(), peer: Some(addr), http2: false};
workers[next].unbounded_send(msg).expect("worker thread died");
next = (next + 1) % workers.len();
}
Err(err) => error!("Error accepting connection: {:?}", err),
}
}
});
}