diff --git a/Cargo.toml b/Cargo.toml index e8f70fd45..265e0a333 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -85,6 +85,7 @@ bytes = "0.4" byteorder = "1" futures = "0.1" futures-cpupool = "0.1" +slab = "0.4" tokio-io = "0.1" tokio-core = "0.1" diff --git a/MIGRATION.md b/MIGRATION.md index 63c4989e3..4e8cad369 100644 --- a/MIGRATION.md +++ b/MIGRATION.md @@ -3,6 +3,9 @@ * `ws::Message::Close` now includes optional close reason. `ws::CloseCode::Status` and `ws::CloseCode::Empty` have been removed. +* `HttpServer::start_ssl()` and `HttpServer::start_tls()` deprecated. + Use `HttpServer::bind_ssl()` and `HttpServer::bind_tls()` instead. + ## Migration from 0.4 to 0.5 diff --git a/src/lib.rs b/src/lib.rs index 5e5d349a4..14e2cfc4e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -97,6 +97,7 @@ extern crate mime_guess; extern crate mio; extern crate net2; extern crate rand; +extern crate slab; extern crate tokio_core; extern crate tokio_io; extern crate url; diff --git a/src/scope.rs b/src/scope.rs index 46eb9e97c..6f730e91a 100644 --- a/src/scope.rs +++ b/src/scope.rs @@ -71,7 +71,7 @@ impl Scope { /// /// ```rust /// # extern crate actix_web; - /// use actix_web::{http, App, HttpRequest, HttpResponse, Path}; + /// use actix_web::{App, HttpRequest}; /// /// struct AppState; /// diff --git a/src/server/srv.rs b/src/server/srv.rs index 57699b203..4ba263e7c 100644 --- a/src/server/srv.rs +++ b/src/server/srv.rs @@ -10,6 +10,7 @@ use futures::{Future, Sink, Stream}; use mio; use net2::TcpBuilder; use num_cpus; +use slab::Slab; use tokio_io::{AsyncRead, AsyncWrite}; #[cfg(feature = "tls")] @@ -20,7 +21,7 @@ use openssl::ssl::{AlpnError, SslAcceptorBuilder}; use super::channel::{HttpChannel, WrapperStream}; use super::settings::{ServerSettings, WorkerSettings}; -use super::worker::{Conn, StopWorker, StreamHandlerType, Worker}; +use super::worker::{Conn, SocketInfo, StopWorker, StreamHandlerType, Worker}; use super::{IntoHttpHandler, IoStream, KeepAlive}; use super::{PauseServer, ResumeServer, StopServer}; @@ -37,7 +38,7 @@ where factory: Arc Vec + Send + Sync>, #[cfg_attr(feature = "cargo-clippy", allow(type_complexity))] workers: Vec<(usize, Addr>)>, - sockets: Vec<(net::SocketAddr, net::TcpListener)>, + sockets: Vec, accept: Vec<(mio::SetReadiness, sync_mpsc::Sender)>, exit: bool, shutdown_timeout: u16, @@ -57,14 +58,8 @@ where { } -#[derive(Clone)] -struct Info { - addr: net::SocketAddr, - handler: StreamHandlerType, -} - enum ServerCommand { - WorkerDied(usize, Info), + WorkerDied(usize, Slab), } impl Actor for HttpServer @@ -74,6 +69,12 @@ where type Context = Context; } +struct Socket { + lst: net::TcpListener, + addr: net::SocketAddr, + tp: StreamHandlerType, +} + impl HttpServer where H: IntoHttpHandler + 'static, @@ -187,7 +188,7 @@ where /// Get addresses of bound sockets. pub fn addrs(&self) -> Vec { - self.sockets.iter().map(|s| s.0).collect() + self.sockets.iter().map(|s| s.addr).collect() } /// Use listener for accepting incoming connection requests @@ -195,21 +196,29 @@ where /// HttpServer does not change any configuration for TcpListener, /// it needs to be configured before passing it to listen() method. pub fn listen(mut self, lst: net::TcpListener) -> Self { - self.sockets.push((lst.local_addr().unwrap(), lst)); + let addr = lst.local_addr().unwrap(); + self.sockets.push(Socket { + addr, + lst, + tp: StreamHandlerType::Normal, + }); self } - /// The socket address to bind - /// - /// To mind multiple addresses this method can be call multiple times. - pub fn bind(mut self, addr: S) -> io::Result { + fn bind2(&mut self, addr: S) -> io::Result> { let mut err = None; let mut succ = false; + let mut sockets = Vec::new(); for addr in addr.to_socket_addrs()? { match create_tcp_listener(addr, self.backlog) { Ok(lst) => { succ = true; - self.sockets.push((lst.local_addr().unwrap(), lst)); + let addr = lst.local_addr().unwrap(); + sockets.push(Socket { + lst, + addr, + tp: StreamHandlerType::Normal, + }); } Err(e) => err = Some(e), } @@ -225,12 +234,65 @@ where )) } } else { - Ok(self) + Ok(sockets) } } + /// The socket address to bind + /// + /// To mind multiple addresses this method can be call multiple times. + pub fn bind(mut self, addr: S) -> io::Result { + let sockets = self.bind2(addr)?; + self.sockets.extend(sockets); + Ok(self) + } + + #[cfg(feature = "tls")] + /// The ssl socket address to bind + /// + /// To mind multiple addresses this method can be call multiple times. + pub fn bind_tls( + mut self, addr: S, acceptor: TlsAcceptor, + ) -> io::Result { + let sockets = self.bind2(addr)?; + self.sockets.extend(sockets.into_iter().map(|mut s| { + s.tp = StreamHandlerType::Tls(acceptor.clone()); + s + })); + Ok(self) + } + + #[cfg(feature = "alpn")] + /// Start listening for incoming tls connections. + /// + /// This method sets alpn protocols to "h2" and "http/1.1" + pub fn bind_ssl( + mut self, addr: S, mut builder: SslAcceptorBuilder, + ) -> io::Result { + // alpn support + if !self.no_http2 { + builder.set_alpn_protos(b"\x02h2\x08http/1.1")?; + builder.set_alpn_select_callback(|_, protos| { + const H2: &[u8] = b"\x02h2"; + if protos.windows(3).any(|window| window == H2) { + Ok(b"h2") + } else { + Err(AlpnError::NOACK) + } + }); + } + + let acceptor = builder.build(); + let sockets = self.bind2(addr)?; + self.sockets.extend(sockets.into_iter().map(|mut s| { + s.tp = StreamHandlerType::Alpn(acceptor.clone()); + s + })); + Ok(self) + } + fn start_workers( - &mut self, settings: &ServerSettings, handler: &StreamHandlerType, + &mut self, settings: &ServerSettings, sockets: &Slab, ) -> Vec<(usize, mpsc::UnboundedSender>)> { // start workers let mut workers = Vec::new(); @@ -238,8 +300,8 @@ where let s = settings.clone(); let (tx, rx) = mpsc::unbounded::>(); - let h = handler.clone(); let ka = self.keep_alive; + let socks = sockets.clone(); let factory = Arc::clone(&self.factory); let addr = Arbiter::start(move |ctx: &mut Context<_>| { let apps: Vec<_> = (*factory)() @@ -247,7 +309,7 @@ where .map(|h| h.into_handler(s.clone())) .collect(); ctx.add_message_stream(rx); - Worker::new(apps, h, ka) + Worker::new(apps, socks, ka) }); workers.push((idx, tx)); self.workers.push((idx, addr)); @@ -304,24 +366,32 @@ impl HttpServer { panic!("HttpServer::bind() has to be called before start()"); } else { let (tx, rx) = mpsc::unbounded(); - let addrs: Vec<(net::SocketAddr, net::TcpListener)> = - self.sockets.drain(..).collect(); - let settings = ServerSettings::new(Some(addrs[0].0), &self.host, false); - let workers = self.start_workers(&settings, &StreamHandlerType::Normal); - let info = Info { - addr: addrs[0].0, - handler: StreamHandlerType::Normal, - }; + + let mut socks = Slab::new(); + let mut addrs: Vec<(usize, Socket)> = Vec::new(); + + for socket in self.sockets.drain(..) { + let entry = socks.vacant_entry(); + let token = entry.key(); + entry.insert(SocketInfo { + addr: socket.addr, + htype: socket.tp.clone(), + }); + addrs.push((token, socket)); + } + + let settings = ServerSettings::new(Some(addrs[0].1.addr), &self.host, false); + let workers = self.start_workers(&settings, &socks); // start acceptors threads - for (addr, sock) in addrs { - info!("Starting server on http://{}", addr); + for (token, sock) in addrs { + info!("Starting server on http://{}", sock.addr); self.accept.push(start_accept_thread( + token, sock, - addr, self.backlog, tx.clone(), - info.clone(), + socks.clone(), workers.clone(), )); } @@ -373,55 +443,30 @@ impl HttpServer { } } +#[doc(hidden)] #[cfg(feature = "tls")] +#[deprecated( + since = "0.6.0", note = "please use `actix_web::HttpServer::bind_tls` instead" +)] impl HttpServer { /// Start listening for incoming tls connections. pub fn start_tls(mut self, acceptor: TlsAcceptor) -> io::Result> { - if self.sockets.is_empty() { - Err(io::Error::new( - io::ErrorKind::Other, - "No socket addresses are bound", - )) - } else { - let (tx, rx) = mpsc::unbounded(); - let addrs: Vec<(net::SocketAddr, net::TcpListener)> = - self.sockets.drain(..).collect(); - let settings = ServerSettings::new(Some(addrs[0].0), &self.host, false); - let workers = - self.start_workers(&settings, &StreamHandlerType::Tls(acceptor.clone())); - let info = Info { - addr: addrs[0].0, - handler: StreamHandlerType::Tls(acceptor), - }; - - // start acceptors threads - for (addr, sock) in addrs { - info!("Starting server on https://{}", addr); - self.accept.push(start_accept_thread( - sock, - addr, - self.backlog, - tx.clone(), - info.clone(), - workers.clone(), - )); + for sock in &mut self.sockets { + match sock.tp { + StreamHandlerType::Normal => (), + _ => continue, } - - // start http server actor - let signals = self.subscribe_to_signals(); - let addr: Addr = Actor::create(|ctx| { - ctx.add_stream(rx); - self - }); - signals.map(|signals| { - signals.do_send(signal::Subscribe(addr.clone().recipient())) - }); - Ok(addr) + sock.tp = StreamHandlerType::Tls(acceptor.clone()); } + Ok(self.start()) } } +#[doc(hidden)] #[cfg(feature = "alpn")] +#[deprecated( + since = "0.6.0", note = "please use `actix_web::HttpServer::bind_ssl` instead" +)] impl HttpServer { /// Start listening for incoming tls connections. /// @@ -429,63 +474,28 @@ impl HttpServer { pub fn start_ssl( mut self, mut builder: SslAcceptorBuilder, ) -> io::Result> { - if self.sockets.is_empty() { - Err(io::Error::new( - io::ErrorKind::Other, - "No socket addresses are bound", - )) - } else { - // alpn support - if !self.no_http2 { - builder.set_alpn_protos(b"\x02h2\x08http/1.1")?; - builder.set_alpn_select_callback(|_, protos| { - const H2: &[u8] = b"\x02h2"; - if protos.windows(3).any(|window| window == H2) { - Ok(b"h2") - } else { - Err(AlpnError::NOACK) - } - }); - } - - let (tx, rx) = mpsc::unbounded(); - let acceptor = builder.build(); - let addrs: Vec<(net::SocketAddr, net::TcpListener)> = - self.sockets.drain(..).collect(); - let settings = ServerSettings::new(Some(addrs[0].0), &self.host, false); - let workers = self.start_workers( - &settings, - &StreamHandlerType::Alpn(acceptor.clone()), - ); - let info = Info { - addr: addrs[0].0, - handler: StreamHandlerType::Alpn(acceptor), - }; - - // start acceptors threads - for (addr, sock) in addrs { - info!("Starting server on https://{}", addr); - self.accept.push(start_accept_thread( - sock, - addr, - self.backlog, - tx.clone(), - info.clone(), - workers.clone(), - )); - } - - // start http server actor - let signals = self.subscribe_to_signals(); - let addr: Addr = Actor::create(|ctx| { - ctx.add_stream(rx); - self + // alpn support + if !self.no_http2 { + builder.set_alpn_protos(b"\x02h2\x08http/1.1")?; + builder.set_alpn_select_callback(|_, protos| { + const H2: &[u8] = b"\x02h2"; + if protos.windows(3).any(|window| window == H2) { + Ok(b"h2") + } else { + Err(AlpnError::NOACK) + } }); - signals.map(|signals| { - signals.do_send(signal::Subscribe(addr.clone().recipient())) - }); - Ok(addr) } + + let acceptor = builder.build(); + for sock in &mut self.sockets { + match sock.tp { + StreamHandlerType::Normal => (), + _ => continue, + } + sock.tp = StreamHandlerType::Alpn(acceptor.clone()); + } + Ok(self.start()) } } @@ -499,32 +509,6 @@ impl HttpServer { T: AsyncRead + AsyncWrite + 'static, A: 'static, { - let (tx, rx) = mpsc::unbounded(); - - if !self.sockets.is_empty() { - let addrs: Vec<(net::SocketAddr, net::TcpListener)> = - self.sockets.drain(..).collect(); - let settings = ServerSettings::new(Some(addrs[0].0), &self.host, false); - let workers = self.start_workers(&settings, &StreamHandlerType::Normal); - let info = Info { - addr: addrs[0].0, - handler: StreamHandlerType::Normal, - }; - - // start acceptors threads - for (addr, sock) in addrs { - info!("Starting server on http://{}", addr); - self.accept.push(start_accept_thread( - sock, - addr, - self.backlog, - tx.clone(), - info.clone(), - workers.clone(), - )); - } - } - // set server settings let addr: net::SocketAddr = "127.0.0.1:8080".parse().unwrap(); let settings = ServerSettings::new(Some(addr), &self.host, secure); @@ -537,9 +521,9 @@ impl HttpServer { // start server let signals = self.subscribe_to_signals(); let addr: Addr = HttpServer::create(move |ctx| { - ctx.add_stream(rx); ctx.add_message_stream(stream.map_err(|_| ()).map(move |(t, _)| Conn { io: WrapperStream::new(t), + token: 0, peer: None, http2: false, })); @@ -585,7 +569,7 @@ impl StreamHandler for HttpServer { fn finished(&mut self, _: &mut Context) {} fn handle(&mut self, msg: ServerCommand, _: &mut Context) { match msg { - ServerCommand::WorkerDied(idx, info) => { + ServerCommand::WorkerDied(idx, socks) => { let mut found = false; for i in 0..self.workers.len() { if self.workers[i].0 == idx { @@ -610,11 +594,10 @@ impl StreamHandler for HttpServer { break; } - let h = info.handler; let ka = self.keep_alive; let factory = Arc::clone(&self.factory); let settings = - ServerSettings::new(Some(info.addr), &self.host, false); + ServerSettings::new(Some(socks[0].addr), &self.host, false); let addr = Arbiter::start(move |ctx: &mut Context<_>| { let apps: Vec<_> = (*factory)() @@ -622,7 +605,7 @@ impl StreamHandler for HttpServer { .map(|h| h.into_handler(settings.clone())) .collect(); ctx.add_message_stream(rx); - Worker::new(apps, h, ka) + Worker::new(apps, socks, ka) }); for item in &self.accept { let _ = item.1.send(Command::Worker(new_idx, tx.clone())); @@ -738,8 +721,8 @@ enum Command { } fn start_accept_thread( - sock: net::TcpListener, addr: net::SocketAddr, backlog: i32, - srv: mpsc::UnboundedSender, info: Info, + token: usize, sock: Socket, backlog: i32, srv: mpsc::UnboundedSender, + socks: Slab, mut workers: Vec<(usize, mpsc::UnboundedSender>)>, ) -> (mio::SetReadiness, sync_mpsc::Sender) { let (tx, rx) = sync_mpsc::channel(); @@ -748,13 +731,14 @@ fn start_accept_thread( // start accept thread #[cfg_attr(feature = "cargo-clippy", allow(cyclomatic_complexity))] let _ = thread::Builder::new() - .name(format!("Accept on {}", addr)) + .name(format!("Accept on {}", sock.addr)) .spawn(move || { const SRV: mio::Token = mio::Token(0); const CMD: mio::Token = mio::Token(1); + let addr = sock.addr; let mut server = Some( - mio::net::TcpListener::from_std(sock) + mio::net::TcpListener::from_std(sock.lst) .expect("Can not create mio::net::TcpListener"), ); @@ -800,9 +784,10 @@ fn start_accept_thread( SRV => if let Some(ref server) = server { loop { match server.accept_std() { - Ok((sock, addr)) => { + Ok((io, addr)) => { let mut msg = Conn { - io: sock, + io, + token, peer: Some(addr), http2: false, }; @@ -813,7 +798,7 @@ fn start_accept_thread( let _ = srv.unbounded_send( ServerCommand::WorkerDied( workers[next].0, - info.clone(), + socks.clone(), ), ); msg = err.into_inner(); diff --git a/src/server/worker.rs b/src/server/worker.rs index f10f79cb4..67f4645c0 100644 --- a/src/server/worker.rs +++ b/src/server/worker.rs @@ -1,6 +1,7 @@ use futures::unsync::oneshot; use futures::Future; use net2::TcpStreamExt; +use slab::Slab; use std::rc::Rc; use std::{net, time}; use tokio_core::net::TcpStream; @@ -29,10 +30,17 @@ use server::{HttpHandler, KeepAlive}; #[derive(Message)] pub(crate) struct Conn { pub io: T, + pub token: usize, pub peer: Option, pub http2: bool, } +#[derive(Clone)] +pub(crate) struct SocketInfo { + pub addr: net::SocketAddr, + pub htype: StreamHandlerType, +} + /// Stop worker message. Returns `true` on successful shutdown /// and `false` if some connections still alive. pub(crate) struct StopWorker { @@ -53,13 +61,13 @@ where { settings: Rc>, hnd: Handle, - handler: StreamHandlerType, + socks: Slab, tcp_ka: Option, } impl Worker { pub(crate) fn new( - h: Vec, handler: StreamHandlerType, keep_alive: KeepAlive, + h: Vec, socks: Slab, keep_alive: KeepAlive, ) -> Worker { let tcp_ka = if let KeepAlive::Tcp(val) = keep_alive { Some(time::Duration::new(val as u64, 0)) @@ -70,7 +78,7 @@ impl Worker { Worker { settings: Rc::new(WorkerSettings::new(h, keep_alive)), hnd: Arbiter::handle().clone(), - handler, + socks, tcp_ka, } } @@ -124,8 +132,11 @@ where if self.tcp_ka.is_some() && msg.io.set_keepalive(self.tcp_ka).is_err() { error!("Can not set socket keep-alive option"); } - self.handler - .handle(Rc::clone(&self.settings), &self.hnd, msg); + self.socks.get_mut(msg.token).unwrap().htype.handle( + Rc::clone(&self.settings), + &self.hnd, + msg, + ); } } @@ -177,7 +188,9 @@ impl StreamHandlerType { } #[cfg(feature = "tls")] StreamHandlerType::Tls(ref acceptor) => { - let Conn { io, peer, http2 } = msg; + let Conn { + io, peer, http2, .. + } = msg; let _ = io.set_nodelay(true); let io = TcpStream::from_stream(io, hnd) .expect("failed to associate TCP stream");