From 81f8da03ae9cb0092ca0b093daeff393120b640d Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Wed, 13 Dec 2017 12:47:07 -0800 Subject: [PATCH] refactor http workers --- src/encoding.rs | 14 +-- src/h1writer.rs | 4 +- src/server.rs | 306 ++++++++++++++++-------------------------------- 3 files changed, 106 insertions(+), 218 deletions(-) diff --git a/src/encoding.rs b/src/encoding.rs index e4af9592..489dcf25 100644 --- a/src/encoding.rs +++ b/src/encoding.rs @@ -379,8 +379,7 @@ impl PayloadEncoder { error!("Chunked transfer is enabled but body is set to Empty"); } resp.headers_mut().insert(CONTENT_LENGTH, HeaderValue::from_static("0")); - resp.headers_mut().remove(TRANSFER_ENCODING); - TransferEncoding::length(0) + TransferEncoding::eof() }, Body::Binary(ref mut bytes) => { if compression { @@ -410,8 +409,7 @@ impl PayloadEncoder { resp.headers_mut().insert( CONTENT_LENGTH, HeaderValue::from_str(&bytes.len().to_string()).unwrap()); - resp.headers_mut().remove(TRANSFER_ENCODING); - TransferEncoding::length(bytes.len() as u64) + TransferEncoding::eof() } } Body::Streaming(_) | Body::StreamingContext => { @@ -555,7 +553,7 @@ impl ContentEncoder { } } - #[inline] + #[inline(always)] pub fn write_eof(&mut self) -> Result<(), io::Error> { let encoder = mem::replace(self, ContentEncoder::Identity(TransferEncoding::eof())); @@ -594,7 +592,7 @@ impl ContentEncoder { } } - #[inline] + #[inline(always)] pub fn write(&mut self, data: &[u8]) -> Result<(), io::Error> { match *self { ContentEncoder::Br(ref mut encoder) => { @@ -694,7 +692,7 @@ impl TransferEncoding { } /// Encode message. Return `EOF` state of encoder - #[inline] + #[inline(always)] pub fn encode(&mut self, msg: &[u8]) -> bool { match self.kind { TransferEncodingKind::Eof => { @@ -732,7 +730,7 @@ impl TransferEncoding { } /// Encode eof. Return `EOF` state of encoder - #[inline] + #[inline(always)] pub fn encode_eof(&mut self) { match self.kind { TransferEncodingKind::Eof | TransferEncodingKind::Length(_) => (), diff --git a/src/h1writer.rs b/src/h1writer.rs index ec3c6c31..186cdf13 100644 --- a/src/h1writer.rs +++ b/src/h1writer.rs @@ -145,9 +145,9 @@ impl Writer for H1Writer { { let buffer = self.encoder.get_mut(); if let Body::Binary(ref bytes) = *msg.body() { - buffer.reserve(100 + msg.headers().len() * AVERAGE_HEADER_SIZE + bytes.len()); + buffer.reserve(130 + msg.headers().len() * AVERAGE_HEADER_SIZE + bytes.len()); } else { - buffer.reserve(100 + msg.headers().len() * AVERAGE_HEADER_SIZE); + buffer.reserve(130 + msg.headers().len() * AVERAGE_HEADER_SIZE); } match version { diff --git a/src/server.rs b/src/server.rs index 3e81e542..433ee5af 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,6 +1,7 @@ use std::{io, net, thread}; use std::rc::Rc; use std::sync::Arc; +//use std::time::Duration; use std::marker::PhantomData; use actix::dev::*; @@ -207,7 +208,7 @@ impl HttpServer } } - fn start_workers(&mut self, settings: &ServerSettings) + fn start_workers(&mut self, settings: &ServerSettings, handler: &StreamHandlerType) -> Vec>> { // start workers @@ -216,6 +217,7 @@ impl HttpServer let s = settings.clone(); let (tx, rx) = mpsc::unbounded::>(); + let h = handler.clone(); let factory = Arc::clone(&self.factory); let addr = Arbiter::start(move |ctx: &mut Context<_>| { let mut apps: Vec<_> = (*factory)() @@ -224,7 +226,7 @@ impl HttpServer app.server_settings(s.clone()); } ctx.add_stream(rx); - Worker{h: Rc::new(apps)} + Worker{h: Rc::new(apps), handler: h} }); workers.push(tx); self.workers.push(addr); @@ -250,32 +252,12 @@ impl HttpServer { let addrs = self.bind(addr)?; let settings = ServerSettings::new(Some(addrs[0].0), false); - let workers = self.start_workers(&settings); + let workers = self.start_workers(&settings, &StreamHandlerType::Normal); // start acceptors threads for (addr, sock) in addrs { - let wrks = workers.clone(); - let _ = thread::Builder::new().name(format!("Accept on {}", addr)).spawn(move || { - let mut next = 0; - loop { - match sock.accept() { - Ok((socket, addr)) => { - let addr = if let Some(addr) = addr.as_inet() { - net::SocketAddr::V4(addr) - } else { - net::SocketAddr::V6(addr.as_inet6().unwrap()) - }; - let msg = IoStream{ - io: socket.into_tcp_stream(), peer: Some(addr), http2: false}; - println!("next: {}", next); - wrks[next].unbounded_send(msg).expect("worker thread died"); - next = (next + 1) % wrks.len(); - } - Err(err) => error!("Error accepting connection: {:?}", err), - } - } - }); info!("Starting http server on {}", addr); + start_accept_thread(sock, addr, workers.clone()); } // start http server actor @@ -292,7 +274,7 @@ impl HttpServer, net::SocketAddr, H, /// /// This methods converts address to list of `SocketAddr` /// then binds to all available addresses. - pub fn serve_tls(self, addr: S, pkcs12: ::Pkcs12) -> io::Result + pub fn serve_tls(mut self, addr: S, pkcs12: ::Pkcs12) -> io::Result where Self: ActorAddress, S: net::ToSocketAddrs, { @@ -307,52 +289,12 @@ impl HttpServer, net::SocketAddr, H, } Err(err) => return Err(io::Error::new(io::ErrorKind::Other, err)) }; - - // start workers - let mut workers = Vec::new(); - for _ in 0..self.threads { - let s = settings.clone(); - let (tx, rx) = mpsc::unbounded::>(); - - let acc = acceptor.clone(); - let factory = Arc::clone(&self.factory); - let _addr = Arbiter::start(move |ctx: &mut Context<_>| { - let mut apps: Vec<_> = (*factory)() - .into_iter().map(|h| h.into_handler()).collect(); - for app in &mut apps { - app.server_settings(s.clone()); - } - ctx.add_stream(rx); - TlsWorker{h: Rc::new(apps), acceptor: acc} - }); - workers.push(tx); - // self.workers.push(addr); - } - info!("Starting {} http workers", self.threads); + let workers = self.start_workers(&settings, &StreamHandlerType::Tls(acceptor)); // start acceptors threads for (addr, sock) in addrs { - let wrks = workers.clone(); - let _ = thread::Builder::new().name(format!("Accept on {}", addr)).spawn(move || { - let mut next = 0; - loop { - match sock.accept() { - Ok((socket, addr)) => { - let addr = if let Some(addr) = addr.as_inet() { - net::SocketAddr::V4(addr) - } else { - net::SocketAddr::V6(addr.as_inet6().unwrap()) - }; - let msg = IoStream{ - io: socket.into_tcp_stream(), peer: Some(addr), http2: false}; - wrks[next].unbounded_send(msg).expect("worker thread died"); - next = (next + 1) % wrks.len(); - } - Err(err) => error!("Error accepting connection: {:?}", err), - } - } - }); info!("Starting tls http server on {}", addr); + start_accept_thread(sock, addr, workers.clone()); } // start http server actor @@ -369,7 +311,7 @@ impl HttpServer, net::SocketAddr, H, /// /// This methods converts address to list of `SocketAddr` /// then binds to all available addresses. - pub fn serve_tls(self, addr: S, identity: &ParsedPkcs12) -> io::Result + pub fn serve_tls(mut self, addr: S, identity: &ParsedPkcs12) -> io::Result where Self: ActorAddress, S: net::ToSocketAddrs, { @@ -386,52 +328,12 @@ impl HttpServer, net::SocketAddr, H, }, Err(err) => return Err(io::Error::new(io::ErrorKind::Other, err)) }; - - // start workers - let mut workers = Vec::new(); - for _ in 0..self.threads { - let s = settings.clone(); - let (tx, rx) = mpsc::unbounded::>(); - - let acc = acceptor.clone(); - let factory = Arc::clone(&self.factory); - let _addr = Arbiter::start(move |ctx: &mut Context<_>| { - let mut apps: Vec<_> = (*factory)() - .into_iter().map(|h| h.into_handler()).collect(); - for app in &mut apps { - app.server_settings(s.clone()); - } - ctx.add_stream(rx); - AlpnWorker{h: Rc::new(apps), acceptor: acc} - }); - workers.push(tx); - // self.workers.push(addr); - } - info!("Starting {} http workers", self.threads); + let workers = self.start_workers(&settings, &StreamHandlerType::Alpn(acceptor)); // start acceptors threads for (addr, sock) in addrs { - let wrks = workers.clone(); - let _ = thread::Builder::new().name(format!("Accept on {}", addr)).spawn(move || { - let mut next = 0; - loop { - match sock.accept() { - Ok((socket, addr)) => { - let addr = if let Some(addr) = addr.as_inet() { - net::SocketAddr::V4(addr) - } else { - net::SocketAddr::V6(addr.as_inet6().unwrap()) - }; - let msg = IoStream{ - io: socket.into_tcp_stream(), peer: Some(addr), http2: false}; - wrks[next].unbounded_send(msg).expect("worker thread died"); - next = (next + 1) % wrks.len(); - } - Err(err) => error!("Error accepting connection: {:?}", err), - } - } - }); info!("Starting tls http server on {}", addr); + start_accept_thread(sock, addr, workers.clone()); } // start http server actor @@ -482,10 +384,15 @@ impl Handler, io::Error> for HttpServer /// Worker accepts Socket objects via unbounded channel and start requests processing. struct Worker { h: Rc>, + handler: StreamHandlerType, } impl Actor for Worker { type Context = Context; + + fn started(&mut self, ctx: &mut Self::Context) { + + } } impl StreamHandler> for Worker @@ -497,113 +404,96 @@ impl Handler> for Worker fn handle(&mut self, msg: IoStream, _: &mut Context) -> Response> { - let io = TcpStream::from_stream(msg.io, Arbiter::handle()) - .expect("failed to associate TCP stream"); - - Arbiter::handle().spawn( - HttpChannel::new(Rc::clone(&self.h), io, msg.peer, msg.http2)); + self.handler.handle(Rc::clone(&self.h), msg); Self::empty() } } -/// Tls http workers -/// -/// Worker accepts Socket objects via unbounded channel and start requests processing. -#[cfg(feature="tls")] -struct TlsWorker { - h: Rc>, - acceptor: TlsAcceptor, +#[derive(Clone)] +enum StreamHandlerType { + Normal, + #[cfg(feature="tls")] + Tls(TlsAcceptor), + #[cfg(feature="alpn")] + Alpn(SslAcceptor), } -#[cfg(feature="tls")] -impl Actor for TlsWorker { - type Context = Context; -} +impl StreamHandlerType { + fn handle(&mut self, h: Rc>, msg: IoStream) { + match *self { + StreamHandlerType::Normal => { + let io = TcpStream::from_stream(msg.io, Arbiter::handle()) + .expect("failed to associate TCP stream"); -#[cfg(feature="tls")] -impl StreamHandler> for TlsWorker - where H: HttpHandler + 'static {} + Arbiter::handle().spawn(HttpChannel::new(h, io, msg.peer, msg.http2)); + } + #[cfg(feature="tls")] + StreamHandlerType::Tls(ref acceptor) => { + let IoStream { io, peer, http2 } = msg; + let io = TcpStream::from_stream(io, Arbiter::handle()) + .expect("failed to associate TCP stream"); -#[cfg(feature="tls")] -impl Handler> for TlsWorker - where H: HttpHandler + 'static, -{ - fn handle(&mut self, msg: IoStream, _: &mut Context) - -> Response> - { - let IoStream { io, peer, http2 } = msg; - let io = TcpStream::from_stream(io, Arbiter::handle()) - .expect("failed to associate TCP stream"); - - let h = Rc::clone(&self.h); - - Arbiter::handle().spawn( - TlsAcceptorExt::accept_async(&self.acceptor, io).then(move |res| { - match res { - Ok(io) => Arbiter::handle().spawn( - HttpChannel::new(h, io, peer, http2)), - Err(err) => - trace!("Error during handling tls connection: {}", err), - }; - future::result(Ok(())) - }) - ); - - Self::empty() - } -} - -/// Tls http workers with alpn support -/// -/// Worker accepts Socket objects via unbounded channel and start requests processing. -#[cfg(feature="alpn")] -struct AlpnWorker { - h: Rc>, - acceptor: SslAcceptor, -} - -#[cfg(feature="alpn")] -impl Actor for AlpnWorker { - type Context = Context; -} - -#[cfg(feature="alpn")] -impl StreamHandler> for AlpnWorker - where H: HttpHandler + 'static {} - -#[cfg(feature="alpn")] -impl Handler> for AlpnWorker - where H: HttpHandler + 'static, -{ - fn handle(&mut self, msg: IoStream, _: &mut Context) - -> Response> - { - let IoStream { io, peer, .. } = msg; - let io = TcpStream::from_stream(io, Arbiter::handle()) - .expect("failed to associate TCP stream"); - - let h = Rc::clone(&self.h); - - Arbiter::handle().spawn( - SslAcceptorExt::accept_async(&self.acceptor, io).then(move |res| { - match res { - Ok(io) => { - let http2 = if let Some(p) = io.get_ref().ssl().selected_alpn_protocol() - { - p.len() == 2 && &p == b"h2" - } else { - false + Arbiter::handle().spawn( + TlsAcceptorExt::accept_async(acceptor, io).then(move |res| { + match res { + Ok(io) => Arbiter::handle().spawn( + HttpChannel::new(h, io, peer, http2)), + Err(err) => + trace!("Error during handling tls connection: {}", err), }; - Arbiter::handle().spawn( - HttpChannel::new(h, io, peer, http2)); - }, - Err(err) => - trace!("Error during handling tls connection: {}", err), - }; - future::result(Ok(())) - }) - ); + future::result(Ok(())) + }) + ); + } + #[cfg(feature="alpn")] + StreamHandlerType::Alpn(ref acceptor) => { + let IoStream { io, peer, .. } = msg; + let io = TcpStream::from_stream(io, Arbiter::handle()) + .expect("failed to associate TCP stream"); - Self::empty() + Arbiter::handle().spawn( + SslAcceptorExt::accept_async(acceptor, io).then(move |res| { + match res { + Ok(io) => { + let http2 = if let Some(p) = io.get_ref().ssl().selected_alpn_protocol() + { + p.len() == 2 && &p == b"h2" + } else { + false + }; + Arbiter::handle().spawn(HttpChannel::new(h, io, peer, http2)); + }, + Err(err) => + trace!("Error during handling tls connection: {}", err), + }; + future::result(Ok(())) + }) + ); + } + } } } + +fn start_accept_thread(sock: Socket, addr: net::SocketAddr, + workers: Vec>>) { + // start acceptors thread + let _ = thread::Builder::new().name(format!("Accept on {}", addr)).spawn(move || { + let mut next = 0; + loop { + match sock.accept() { + Ok((socket, addr)) => { + let addr = if let Some(addr) = addr.as_inet() { + net::SocketAddr::V4(addr) + } else { + net::SocketAddr::V6(addr.as_inet6().unwrap()) + }; + let msg = IoStream{ + io: socket.into_tcp_stream(), peer: Some(addr), http2: false}; + workers[next].unbounded_send(msg).expect("worker thread died"); + next = (next + 1) % workers.len(); + } + Err(err) => error!("Error accepting connection: {:?}", err), + } + } + }); +}