From ab6efd2421e82e8edfb82f108c61ae996bd4510f Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 12 Dec 2017 17:21:00 -0800 Subject: [PATCH] handle http connections in different threads --- Cargo.toml | 2 + examples/tls/src/main.rs | 4 +- guide/src/qs_3.md | 2 +- src/lib.rs | 2 + src/server.rs | 497 ++++++++++++++++++++++++++++++--------- tests/test_server.rs | 4 +- 6 files changed, 394 insertions(+), 117 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 7af676c5b..014f7701b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,6 +46,7 @@ regex = "0.2" sha1 = "0.2" url = "1.5" libc = "0.2" +socket2 = "0.2" serde = "1.0" serde_json = "1.0" flate2 = "0.2" @@ -53,6 +54,7 @@ brotli2 = "^0.3.2" percent-encoding = "1.0" smallvec = "0.6" bitflags = "1.0" +num_cpus = "1.0" cookie = { version="0.10", features=["percent-encode", "secure"] } # tokio diff --git a/examples/tls/src/main.rs b/examples/tls/src/main.rs index d40719e56..78720c0c9 100644 --- a/examples/tls/src/main.rs +++ b/examples/tls/src/main.rs @@ -30,7 +30,7 @@ fn main() { let pkcs12 = Pkcs12::from_der(&pkcs12).unwrap().parse("12345").unwrap(); HttpServer::new( - Application::new("/") + || Application::new() // enable logger .middleware(middlewares::Logger::default()) // register simple handler, handle all methods @@ -42,7 +42,7 @@ fn main() { .header("LOCATION", "/index.html") .body(Body::Empty) }))) - .serve_tls::<_, ()>("127.0.0.1:8443", pkcs12).unwrap(); + .serve_tls::<_, ()>("127.0.0.1:8443", &pkcs12).unwrap(); println!("Started http server: 127.0.0.1:8443"); let _ = sys.run(); diff --git a/guide/src/qs_3.md b/guide/src/qs_3.md index b6a83cf1a..51e82d493 100644 --- a/guide/src/qs_3.md +++ b/guide/src/qs_3.md @@ -42,7 +42,7 @@ Multiple applications could be served with one server: use actix_web::*; fn main() { - HttpServer::::new(|| vec![ + HttpServer::::new(|| vec![ Application::new() .prefix("/app1") .resource("/", |r| r.f(|r| httpcodes::HTTPOk)), diff --git a/src/lib.rs b/src/lib.rs index 8d1ba6bec..3fb4c8a59 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -34,6 +34,8 @@ extern crate flate2; extern crate brotli2; extern crate percent_encoding; extern crate smallvec; +extern crate num_cpus; +extern crate socket2; extern crate actix; extern crate h2 as http2; diff --git a/src/server.rs b/src/server.rs index a5862c34e..5d03d4f9f 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,24 +1,27 @@ -use std::{io, net}; +use std::{io, net, thread}; use std::rc::Rc; -use std::net::SocketAddr; +use std::sync::Arc; use std::marker::PhantomData; use actix::dev::*; use futures::Stream; +use futures::sync::mpsc; use tokio_io::{AsyncRead, AsyncWrite}; -use tokio_core::net::{TcpListener, TcpStream}; +use tokio_core::net::TcpStream; +use num_cpus; +use socket2::{Socket, Domain, Type}; #[cfg(feature="tls")] -use futures::Future; +use futures::{future, Future}; #[cfg(feature="tls")] use native_tls::TlsAcceptor; #[cfg(feature="tls")] use tokio_tls::{TlsStream, TlsAcceptorExt}; #[cfg(feature="alpn")] -use futures::Future; +use futures::{future, Future}; #[cfg(feature="alpn")] -use openssl::ssl::{SslMethod, SslAcceptorBuilder}; +use openssl::ssl::{SslMethod, SslAcceptor, SslAcceptorBuilder}; #[cfg(feature="alpn")] use openssl::pkcs12::ParsedPkcs12; #[cfg(feature="alpn")] @@ -29,7 +32,7 @@ use channel::{HttpChannel, HttpHandler, IntoHttpHandler}; /// Various server settings #[derive(Debug, Clone)] pub struct ServerSettings { - addr: Option, + addr: Option, secure: bool, host: String, } @@ -46,7 +49,7 @@ impl Default for ServerSettings { impl ServerSettings { /// Crate server settings instance - fn new(addr: Option, secure: bool) -> Self { + fn new(addr: Option, secure: bool) -> Self { let host = if let Some(ref addr) = addr { format!("{}", addr) } else { @@ -60,7 +63,7 @@ impl ServerSettings { } /// Returns the socket address of the local half of this TCP connection - pub fn local_addr(&self) -> Option { + pub fn local_addr(&self) -> Option { self.addr } @@ -82,47 +85,70 @@ impl ServerSettings { /// `A` - peer address /// /// `H` - request handler -pub struct HttpServer { +pub struct HttpServer + where H: 'static +{ h: Rc>, io: PhantomData, addr: PhantomData, + threads: usize, + factory: Arc U + Send + Sync>, + workers: Vec>>, } -impl Actor for HttpServer { +impl Actor for HttpServer { type Context = Context; } -impl HttpServer where H: HttpHandler +impl HttpServer + where H: HttpHandler, + U: IntoIterator + 'static, + V: IntoHttpHandler, { /// Create new http server with vec of http handlers - pub fn new>(factory: F) -> Self - where F: Fn() -> U + Send, - V: IntoHttpHandler + pub fn new(factory: F) -> Self + where F: Sync + Send + 'static + Fn() -> U, { - let apps: Vec<_> = factory().into_iter().map(|h| h.into_handler()).collect(); - - HttpServer{ h: Rc::new(apps), + HttpServer{ h: Rc::new(Vec::new()), io: PhantomData, - addr: PhantomData } + addr: PhantomData, + threads: num_cpus::get(), + factory: Arc::new(factory), + workers: Vec::new(), + } + } + + /// Set number of workers to start. + /// + /// By default http server uses number of available logical cpu as threads count. + pub fn threads(mut self, num: usize) -> Self { + self.threads = num; + self } } -impl HttpServer +impl HttpServer where T: AsyncRead + AsyncWrite + 'static, A: 'static, H: HttpHandler, + U: IntoIterator + 'static, + V: IntoHttpHandler, { - /// Start listening for incomming connections from stream. + /// Start listening for incomming connections from a stream. + /// + /// This method uses only one thread for handling incoming connections. pub fn serve_incoming(mut self, stream: S, secure: bool) -> io::Result where Self: ActorAddress, S: Stream + 'static { // set server settings - let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap(); + let addr: net::SocketAddr = "127.0.0.1:8080".parse().unwrap(); let settings = ServerSettings::new(Some(addr), secure); - for h in Rc::get_mut(&mut self.h).unwrap().iter_mut() { - h.server_settings(settings.clone()); + let mut apps: Vec<_> = (*self.factory)().into_iter().map(|h| h.into_handler()).collect(); + for app in &mut apps { + app.server_settings(settings.clone()); } + self.h = Rc::new(apps); // start server Ok(HttpServer::create(move |ctx| { @@ -133,134 +159,226 @@ impl HttpServer } fn bind(&self, addr: S) - -> io::Result> + -> io::Result> { let mut err = None; - let mut addrs = Vec::new(); + let mut sockets = Vec::new(); if let Ok(iter) = addr.to_socket_addrs() { for addr in iter { - match TcpListener::bind(&addr, Arbiter::handle()) { - Ok(tcp) => addrs.push((addr, tcp)), - Err(e) => err = Some(e), + match addr { + net::SocketAddr::V4(a) => { + let socket = Socket::new(Domain::ipv4(), Type::stream(), None)?; + match socket.bind(&a.into()) { + Ok(_) => { + socket.listen(1024) + .expect("failed to set socket backlog"); + socket.set_reuse_address(true) + .expect("failed to set socket reuse address"); + sockets.push((addr, socket)); + }, + Err(e) => err = Some(e), + } + } + net::SocketAddr::V6(a) => { + let socket = Socket::new(Domain::ipv6(), Type::stream(), None)?; + match socket.bind(&a.into()) { + Ok(_) => { + socket.listen(1024) + .expect("failed to set socket backlog"); + socket.set_reuse_address(true) + .expect("failed to set socket reuse address"); + sockets.push((addr, socket)) + } + Err(e) => err = Some(e), + } + } } } } - if addrs.is_empty() { + + if sockets.is_empty() { if let Some(e) = err.take() { Err(e) } else { Err(io::Error::new(io::ErrorKind::Other, "Can not bind to address.")) } } else { - Ok(addrs) + Ok(sockets) } } + + fn start_workers(&mut self, settings: &ServerSettings) + -> Vec>> + { + // start workers + let mut workers = Vec::new(); + for _ in 0..self.threads { + let s = settings.clone(); + let (tx, rx) = mpsc::unbounded::>(); + + let factory = Arc::clone(&self.factory); + let addr = Arbiter::start(move |ctx: &mut Context<_>| { + let mut apps: Vec<_> = (*factory)() + .into_iter().map(|h| h.into_handler()).collect(); + for app in &mut apps { + app.server_settings(s.clone()); + } + ctx.add_stream(rx); + Worker{h: Rc::new(apps)} + }); + workers.push(tx); + self.workers.push(addr); + } + info!("Starting {} http workers", self.threads); + workers + } } -impl HttpServer { - +impl HttpServer + where U: IntoIterator + 'static, + V: IntoHttpHandler, +{ /// Start listening for incomming connections. /// /// This methods converts address to list of `SocketAddr` /// then binds to all available addresses. + /// It also starts number of http handler workers in seperate threads. + /// For each address this method starts separate thread which does `accept()` in a loop. pub fn serve(mut self, addr: S) -> io::Result where Self: ActorAddress, S: net::ToSocketAddrs, { let addrs = self.bind(addr)?; - - // set server settings let settings = ServerSettings::new(Some(addrs[0].0), false); - for h in Rc::get_mut(&mut self.h).unwrap().iter_mut() { - h.server_settings(settings.clone()); + let workers = self.start_workers(&settings); + + // start acceptors threads + for (addr, sock) in addrs { + let wrks = workers.clone(); + let _ = thread::Builder::new().name(format!("Accept on {}", addr)).spawn(move || { + let mut next = 0; + loop { + match sock.accept() { + Ok((socket, addr)) => { + let addr = if let Some(addr) = addr.as_inet() { + net::SocketAddr::V4(addr) + } else { + net::SocketAddr::V6(addr.as_inet6().unwrap()) + }; + let msg = IoStream{ + io: socket.into_tcp_stream(), peer: Some(addr), http2: false}; + wrks[next].unbounded_send(msg).expect("worker thread died"); + next = (next + 1) % wrks.len(); + } + Err(err) => error!("Error accepting connection: {:?}", err), + } + } + }); + info!("Starting http server on {}", addr); } - // start server - Ok(HttpServer::create(move |ctx| { - for (addr, tcp) in addrs { - info!("Starting http server on {}", addr); - - ctx.add_stream(tcp.incoming().map( - move |(t, a)| IoStream{io: t, peer: Some(a), http2: false})); - } - self - })) + // start http server actor + Ok(HttpServer::create(|_| {self})) } } #[cfg(feature="tls")] -impl HttpServer, net::SocketAddr, H> { - +impl HttpServer, net::SocketAddr, H, U> + where U: IntoIterator + 'static, + V: IntoHttpHandler, +{ /// Start listening for incomming tls connections. /// /// This methods converts address to list of `SocketAddr` /// then binds to all available addresses. - pub fn serve_tls(mut self, addr: S, pkcs12: ::Pkcs12) -> io::Result + pub fn serve_tls(self, addr: S, pkcs12: ::Pkcs12) -> io::Result where Self: ActorAddress, S: net::ToSocketAddrs, { let addrs = self.bind(addr)?; - - // set server settings - let settings = ServerSettings::new(Some(addrs[0].0), true); - for h in Rc::get_mut(&mut self.h).unwrap().iter_mut() { - h.server_settings(settings.clone()); - } - + let settings = ServerSettings::new(Some(addrs[0].0), false); let acceptor = match TlsAcceptor::builder(pkcs12) { Ok(builder) => { match builder.build() { - Ok(acceptor) => Rc::new(acceptor), + Ok(acceptor) => acceptor, Err(err) => return Err(io::Error::new(io::ErrorKind::Other, err)) } } Err(err) => return Err(io::Error::new(io::ErrorKind::Other, err)) }; - // start server - Ok(HttpServer::create(move |ctx| { - for (srv, tcp) in addrs { - info!("Starting tls http server on {}", srv); + // start workers + let mut workers = Vec::new(); + for _ in 0..self.threads { + let s = settings.clone(); + let (tx, rx) = mpsc::unbounded::>(); - let acc = acceptor.clone(); - ctx.add_stream(tcp.incoming().and_then(move |(stream, addr)| { - TlsAcceptorExt::accept_async(acc.as_ref(), stream) - .map(move |t| IoStream{io: t, peer: Some(addr), http2: false}) - .map_err(|err| { - trace!("Error during handling tls connection: {}", err); - io::Error::new(io::ErrorKind::Other, err) - }) - })); - } - self - })) + let acc = acceptor.clone(); + let factory = Arc::clone(&self.factory); + let _addr = Arbiter::start(move |ctx: &mut Context<_>| { + let mut apps: Vec<_> = (*factory)() + .into_iter().map(|h| h.into_handler()).collect(); + for app in &mut apps { + app.server_settings(s.clone()); + } + ctx.add_stream(rx); + TlsWorker{h: Rc::new(apps), acceptor: acc} + }); + workers.push(tx); + // self.workers.push(addr); + } + info!("Starting {} http workers", self.threads); + + // start acceptors threads + for (addr, sock) in addrs { + let wrks = workers.clone(); + let _ = thread::Builder::new().name(format!("Accept on {}", addr)).spawn(move || { + let mut next = 0; + loop { + match sock.accept() { + Ok((socket, addr)) => { + let addr = if let Some(addr) = addr.as_inet() { + net::SocketAddr::V4(addr) + } else { + net::SocketAddr::V6(addr.as_inet6().unwrap()) + }; + let msg = IoStream{ + io: socket.into_tcp_stream(), peer: Some(addr), http2: false}; + wrks[next].unbounded_send(msg).expect("worker thread died"); + next = (next + 1) % wrks.len(); + } + Err(err) => error!("Error accepting connection: {:?}", err), + } + } + }); + info!("Starting tls http server on {}", addr); + } + + // start http server actor + Ok(HttpServer::create(|_| {self})) } } #[cfg(feature="alpn")] -impl HttpServer, net::SocketAddr, H> { - +impl HttpServer, net::SocketAddr, H, U> + where U: IntoIterator + 'static, + V: IntoHttpHandler, +{ /// Start listening for incomming tls connections. /// /// This methods converts address to list of `SocketAddr` /// then binds to all available addresses. - pub fn serve_tls(mut self, addr: S, identity: ParsedPkcs12) -> io::Result + pub fn serve_tls(self, addr: S, identity: &ParsedPkcs12) -> io::Result where Self: ActorAddress, S: net::ToSocketAddrs, { let addrs = self.bind(addr)?; - - // set server settings - let settings = ServerSettings::new(Some(addrs[0].0), true); - for h in Rc::get_mut(&mut self.h).unwrap().iter_mut() { - h.server_settings(settings.clone()); - } - + let settings = ServerSettings::new(Some(addrs[0].0), false); let acceptor = match SslAcceptorBuilder::mozilla_intermediate( SslMethod::tls(), &identity.pkey, &identity.cert, &identity.chain) { Ok(mut builder) => { - match builder.builder_mut().set_alpn_protocols(&[b"h2", b"http/1.1"]) { + match builder.set_alpn_protocols(&[b"h2", b"http/1.1"]) { Ok(_) => builder.build(), Err(err) => return Err(io::Error::new(io::ErrorKind::Other, err)), } @@ -268,55 +386,80 @@ impl HttpServer, net::SocketAddr, H> { Err(err) => return Err(io::Error::new(io::ErrorKind::Other, err)) }; - Ok(HttpServer::create(move |ctx| { - for (srv, tcp) in addrs { - info!("Starting tls http server on {}", srv); + // start workers + let mut workers = Vec::new(); + for _ in 0..self.threads { + let s = settings.clone(); + let (tx, rx) = mpsc::unbounded::>(); - let acc = acceptor.clone(); - ctx.add_stream(tcp.incoming().and_then(move |(stream, addr)| { - SslAcceptorExt::accept_async(&acc, stream) - .map(move |stream| { - let http2 = if let Some(p) = - stream.get_ref().ssl().selected_alpn_protocol() - { - p.len() == 2 && &p == b"h2" + let acc = acceptor.clone(); + let factory = Arc::clone(&self.factory); + let _addr = Arbiter::start(move |ctx: &mut Context<_>| { + let mut apps: Vec<_> = (*factory)() + .into_iter().map(|h| h.into_handler()).collect(); + for app in &mut apps { + app.server_settings(s.clone()); + } + ctx.add_stream(rx); + AlpnWorker{h: Rc::new(apps), acceptor: acc} + }); + workers.push(tx); + // self.workers.push(addr); + } + info!("Starting {} http workers", self.threads); + + // start acceptors threads + for (addr, sock) in addrs { + let wrks = workers.clone(); + let _ = thread::Builder::new().name(format!("Accept on {}", addr)).spawn(move || { + let mut next = 0; + loop { + match sock.accept() { + Ok((socket, addr)) => { + let addr = if let Some(addr) = addr.as_inet() { + net::SocketAddr::V4(addr) } else { - false + net::SocketAddr::V6(addr.as_inet6().unwrap()) }; - IoStream{io: stream, peer: Some(addr), http2: http2} - }) - .map_err(|err| { - trace!("Error during handling tls connection: {}", err); - io::Error::new(io::ErrorKind::Other, err) - }) - })); - } - self - })) + let msg = IoStream{ + io: socket.into_tcp_stream(), peer: Some(addr), http2: false}; + wrks[next].unbounded_send(msg).expect("worker thread died"); + next = (next + 1) % wrks.len(); + } + Err(err) => error!("Error accepting connection: {:?}", err), + } + } + }); + info!("Starting tls http server on {}", addr); + } + + // start http server actor + Ok(HttpServer::create(|_| {self})) } } struct IoStream { io: T, - peer: Option, + peer: Option, http2: bool, } impl ResponseType for IoStream - where T: AsyncRead + AsyncWrite + 'static { type Item = (); type Error = (); } -impl StreamHandler, io::Error> for HttpServer +impl StreamHandler, io::Error> for HttpServer where T: AsyncRead + AsyncWrite + 'static, H: HttpHandler + 'static, + U: 'static, A: 'static {} -impl Handler, io::Error> for HttpServer +impl Handler, io::Error> for HttpServer where T: AsyncRead + AsyncWrite + 'static, H: HttpHandler + 'static, + U: 'static, A: 'static, { fn error(&mut self, err: io::Error, _: &mut Context) { @@ -331,3 +474,135 @@ impl Handler, io::Error> for HttpServer Self::empty() } } + + +/// Http workers +/// +/// Worker accepts Socket objects via unbounded channel and start requests processing. +struct Worker { + h: Rc>, +} + +impl Actor for Worker { + type Context = Context; +} + +impl StreamHandler> for Worker + where H: HttpHandler + 'static {} + +impl Handler> for Worker + where H: HttpHandler + 'static, +{ + fn handle(&mut self, msg: IoStream, _: &mut Context) + -> Response> + { + let io = TcpStream::from_stream(msg.io, Arbiter::handle()) + .expect("failed to associate TCP stream"); + + Arbiter::handle().spawn( + HttpChannel::new(Rc::clone(&self.h), io, msg.peer, msg.http2)); + Self::empty() + } +} + +/// Tls http workers +/// +/// Worker accepts Socket objects via unbounded channel and start requests processing. +#[cfg(feature="tls")] +struct TlsWorker { + h: Rc>, + acceptor: TlsAcceptor, +} + +#[cfg(feature="tls")] +impl Actor for TlsWorker { + type Context = Context; +} + +#[cfg(feature="tls")] +impl StreamHandler> for TlsWorker + where H: HttpHandler + 'static {} + +#[cfg(feature="tls")] +impl Handler> for TlsWorker + where H: HttpHandler + 'static, +{ + fn handle(&mut self, msg: IoStream, _: &mut Context) + -> Response> + { + let IoStream { io, peer, http2 } = msg; + let io = TcpStream::from_stream(io, Arbiter::handle()) + .expect("failed to associate TCP stream"); + + let h = Rc::clone(&self.h); + + Arbiter::handle().spawn( + TlsAcceptorExt::accept_async(&self.acceptor, io).then(move |res| { + match res { + Ok(io) => Arbiter::handle().spawn( + HttpChannel::new(h, io, peer, http2)), + Err(err) => + trace!("Error during handling tls connection: {}", err), + }; + future::result(Ok(())) + }) + ); + + Self::empty() + } +} + +/// Tls http workers with alpn support +/// +/// Worker accepts Socket objects via unbounded channel and start requests processing. +#[cfg(feature="alpn")] +struct AlpnWorker { + h: Rc>, + acceptor: SslAcceptor, +} + +#[cfg(feature="alpn")] +impl Actor for AlpnWorker { + type Context = Context; +} + +#[cfg(feature="alpn")] +impl StreamHandler> for AlpnWorker + where H: HttpHandler + 'static {} + +#[cfg(feature="alpn")] +impl Handler> for AlpnWorker + where H: HttpHandler + 'static, +{ + fn handle(&mut self, msg: IoStream, _: &mut Context) + -> Response> + { + let IoStream { io, peer, .. } = msg; + let io = TcpStream::from_stream(io, Arbiter::handle()) + .expect("failed to associate TCP stream"); + + let h = Rc::clone(&self.h); + + Arbiter::handle().spawn( + SslAcceptorExt::accept_async(&self.acceptor, io).then(move |res| { + match res { + Ok(io) => { + let http2 = if let Some(p) = io.get_ref().ssl().selected_alpn_protocol() + { + p.len() == 2 && &p == b"h2" + } else { + false + }; + Arbiter::handle().spawn( + HttpChannel::new(h, io, peer, http2)); + }, + Err(err) => + trace!("Error during handling tls connection: {}", err), + }; + future::result(Ok(())) + }) + ); + + Self::empty() + } +} diff --git a/tests/test_server.rs b/tests/test_server.rs index 4a657bccd..b3b58b3b7 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -4,7 +4,6 @@ extern crate tokio_core; extern crate reqwest; use std::{net, thread}; -use std::rc::Rc; use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; use tokio_core::net::TcpListener; @@ -52,7 +51,6 @@ struct MiddlewareTest { start: Arc, response: Arc, finish: Arc, - test: Rc, } impl middlewares::Middleware for MiddlewareTest { @@ -89,7 +87,7 @@ fn test_middlewares() { move || vec![Application::new() .middleware(MiddlewareTest{start: act_num1.clone(), response: act_num2.clone(), - finish: act_num3.clone(), test: Rc::new(1)}) + finish: act_num3.clone()}) .resource("/", |r| r.method(Method::GET).h(httpcodes::HTTPOk))]) .serve::<_, ()>("127.0.0.1:58904").unwrap(); sys.run();