diff --git a/Cargo.toml b/Cargo.toml index e6190d3f1..43c286f71 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,7 +46,6 @@ regex = "0.2" sha1 = "0.2" url = "1.5" libc = "0.2" -socket2 = "0.2" serde = "1.0" serde_json = "1.0" flate2 = "0.2" @@ -57,7 +56,9 @@ bitflags = "1.0" num_cpus = "1.0" cookie = { version="0.10", features=["percent-encode", "secure"] } -# tokio +# io +mio = "0.6" +net2 = "0.2" bytes = "0.4" futures = "0.1" tokio-io = "0.1" diff --git a/src/lib.rs b/src/lib.rs index 2689bc111..ec2dafa84 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -50,6 +50,8 @@ extern crate bitflags; extern crate futures; extern crate tokio_io; extern crate tokio_core; +extern crate mio; +extern crate net2; extern crate failure; #[macro_use] extern crate failure_derive; @@ -69,7 +71,6 @@ extern crate brotli2; extern crate percent_encoding; extern crate smallvec; extern crate num_cpus; -extern crate socket2; extern crate actix; extern crate h2 as http2; diff --git a/src/server.rs b/src/server.rs index 399115076..082fb5dec 100644 --- a/src/server.rs +++ b/src/server.rs @@ -10,9 +10,11 @@ use actix::dev::*; use futures::Stream; use futures::sync::mpsc; use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_core::reactor::Handle; use tokio_core::net::TcpStream; +use mio; use num_cpus; -use socket2::{Socket, Domain, Type}; +use net2::{TcpBuilder, TcpStreamExt}; #[cfg(feature="tls")] use futures::{future, Future}; @@ -103,7 +105,7 @@ pub struct HttpServer keep_alive: Option, factory: Arc U + Send + Sync>, workers: Vec>>, - sockets: HashMap, + sockets: HashMap, } impl Actor for HttpServer { @@ -160,6 +162,8 @@ impl HttpServer /// attempting to connect. It should only affect servers under significant load. /// /// Generally set in the 64-2048 range. Default value is 2048. + /// + /// This method should be called before `bind()` method call. pub fn backlog(mut self, num: i32) -> Self { self.backlog = num; self @@ -202,34 +206,22 @@ impl HttpServer let mut succ = false; if let Ok(iter) = addr.to_socket_addrs() { for addr in iter { - let socket = match addr { - net::SocketAddr::V4(a) => { - let socket = Socket::new(Domain::ipv4(), Type::stream(), None)?; - match socket.bind(&a.into()) { - Ok(_) => socket, - Err(e) => { - err = Some(e); - continue; - } - } - } - net::SocketAddr::V6(a) => { - let socket = Socket::new(Domain::ipv6(), Type::stream(), None)?; - match socket.bind(&a.into()) { - Ok(_) => socket, - Err(e) => { - err = Some(e); - continue - } - } - } + let builder = match addr { + net::SocketAddr::V4(_) => TcpBuilder::new_v4()?, + net::SocketAddr::V6(_) => TcpBuilder::new_v6()?, }; - succ = true; - socket.listen(self.backlog) - .expect("failed to set socket backlog"); - socket.set_reuse_address(true) - .expect("failed to set socket reuse address"); - self.sockets.insert(addr, socket); + match builder.bind(addr) { + Ok(builder) => match builder.reuse_address(true) { + Ok(builder) => { + succ = true; + let lst = builder.listen(self.backlog) + .expect("failed to set socket backlog"); + self.sockets.insert(lst.local_addr().unwrap(), lst); + }, + Err(e) => err = Some(e) + }, + Err(e) => err = Some(e), + } } } @@ -245,13 +237,13 @@ impl HttpServer } fn start_workers(&mut self, settings: &ServerSettings, handler: &StreamHandlerType) - -> Vec>> + -> Vec>> { // start workers let mut workers = Vec::new(); for _ in 0..self.threads { let s = settings.clone(); - let (tx, rx) = mpsc::unbounded::>(); + let (tx, rx) = mpsc::unbounded::>(); let h = handler.clone(); let ka = self.keep_alive; @@ -309,7 +301,8 @@ impl HttpServer if self.sockets.is_empty() { panic!("HttpServer::bind() has to be called befor start()"); } else { - let addrs: Vec<(net::SocketAddr, Socket)> = self.sockets.drain().collect(); + let addrs: Vec<(net::SocketAddr, net::TcpListener)> = + self.sockets.drain().collect(); let settings = ServerSettings::new(Some(addrs[0].0), &self.host, false); let workers = self.start_workers(&settings, &StreamHandlerType::Normal); @@ -413,7 +406,8 @@ impl HttpServer where S: Stream + 'static { if !self.sockets.is_empty() { - let addrs: Vec<(net::SocketAddr, Socket)> = self.sockets.drain().collect(); + let addrs: Vec<(net::SocketAddr, net::TcpListener)> = + self.sockets.drain().collect(); let settings = ServerSettings::new(Some(addrs[0].0), &self.host, false); let workers = self.start_workers(&settings, &StreamHandlerType::Normal); @@ -484,6 +478,7 @@ impl Handler, io::Error> for HttpServer /// Worker accepts Socket objects via unbounded channel and start requests processing. struct Worker { h: Rc>, + hnd: Handle, handler: StreamHandlerType, } @@ -528,6 +523,7 @@ impl Worker { fn new(h: Vec, handler: StreamHandlerType, keep_alive: Option) -> Worker { Worker { h: Rc::new(WorkerSettings::new(h, keep_alive)), + hnd: Arbiter::handle().clone(), handler: handler, } } @@ -546,21 +542,21 @@ impl Actor for Worker { } } -impl StreamHandler> for Worker +impl StreamHandler> for Worker where H: HttpHandler + 'static {} -impl Handler> for Worker +impl Handler> for Worker where H: HttpHandler + 'static, { - fn handle(&mut self, msg: IoStream, _: &mut Context) - -> Response> + fn handle(&mut self, msg: IoStream, _: &mut Context) + -> Response> { if !self.h.keep_alive_enabled() && msg.io.set_keepalive(Some(Duration::new(75, 0))).is_err() { error!("Can not set socket keep-alive option"); } - self.handler.handle(Rc::clone(&self.h), msg); + self.handler.handle(Rc::clone(&self.h), &self.hnd, msg); Self::empty() } } @@ -576,25 +572,27 @@ enum StreamHandlerType { impl StreamHandlerType { - fn handle(&mut self, h: Rc>, msg: IoStream) { + fn handle(&mut self, + h: Rc>, + hnd: &Handle, + msg: IoStream) { match *self { StreamHandlerType::Normal => { - let io = TcpStream::from_stream(msg.io.into_tcp_stream(), Arbiter::handle()) + let io = TcpStream::from_stream(msg.io, hnd) .expect("failed to associate TCP stream"); - Arbiter::handle().spawn(HttpChannel::new(h, io, msg.peer, msg.http2)); + hnd.spawn(HttpChannel::new(h, io, msg.peer, msg.http2)); } #[cfg(feature="tls")] StreamHandlerType::Tls(ref acceptor) => { let IoStream { io, peer, http2 } = msg; - let io = TcpStream::from_stream(io.into_tcp_stream(), Arbiter::handle()) + let io = TcpStream::from_stream(io, hnd) .expect("failed to associate TCP stream"); Arbiter::handle().spawn( TlsAcceptorExt::accept_async(acceptor, io).then(move |res| { match res { - Ok(io) => Arbiter::handle().spawn( - HttpChannel::new(h, io, peer, http2)), + Ok(io) => hnd.spawn(HttpChannel::new(h, io, peer, http2)), Err(err) => trace!("Error during handling tls connection: {}", err), }; @@ -605,10 +603,10 @@ impl StreamHandlerType { #[cfg(feature="alpn")] StreamHandlerType::Alpn(ref acceptor) => { let IoStream { io, peer, .. } = msg; - let io = TcpStream::from_stream(io.into_tcp_stream(), Arbiter::handle()) + let io = TcpStream::from_stream(io, hnd) .expect("failed to associate TCP stream"); - Arbiter::handle().spawn( + hnd.spawn( SslAcceptorExt::accept_async(acceptor, io).then(move |res| { match res { Ok(io) => { @@ -631,24 +629,57 @@ impl StreamHandlerType { } } -fn start_accept_thread(sock: Socket, addr: net::SocketAddr, - workers: Vec>>) { - // start acceptors thread +fn start_accept_thread(sock: net::TcpListener, addr: net::SocketAddr, + workers: Vec>>) { + // start accept thread let _ = thread::Builder::new().name(format!("Accept on {}", addr)).spawn(move || { let mut next = 0; + let server = mio::net::TcpListener::from_listener(sock, &addr) + .expect("Can not create mio::net::TcpListener"); + const SERVER: mio::Token = mio::Token(0); + + // Create a poll instance + let poll = match mio::Poll::new() { + Ok(poll) => poll, + Err(err) => panic!("Can not create mio::Poll: {}", err), + }; + + // Start listening for incoming connections + if let Err(err) = poll.register(&server, SERVER, + mio::Ready::readable(), mio::PollOpt::edge()) { + panic!("Can not register io: {}", err); + } + + // Create storage for events + let mut events = mio::Events::with_capacity(128); + loop { - match sock.accept() { - Ok((socket, addr)) => { - let addr = if let Some(addr) = addr.as_inet() { - net::SocketAddr::V4(addr) - } else { - net::SocketAddr::V6(addr.as_inet6().unwrap()) - }; - let msg = IoStream{io: socket, peer: Some(addr), http2: false}; - workers[next].unbounded_send(msg).expect("worker thread died"); - next = (next + 1) % workers.len(); + if let Err(err) = poll.poll(&mut events, None) { + panic!("Poll error: {}", err); + } + + for event in events.iter() { + match event.token() { + SERVER => { + loop { + match server.accept_std() { + Ok((sock, addr)) => { + let msg = IoStream{io: sock, peer: Some(addr), http2: false}; + workers[next] + .unbounded_send(msg).expect("worker thread died"); + next = (next + 1) % workers.len(); + }, + Err(err) => if err.kind() == io::ErrorKind::WouldBlock { + break + } else { + error!("Error accepting connection: {:?}", err); + return + } + } + } + } + _ => unreachable!(), } - Err(err) => error!("Error accepting connection: {:?}", err), } } }); diff --git a/src/test.rs b/src/test.rs index 333f216b9..1ff954e76 100644 --- a/src/test.rs +++ b/src/test.rs @@ -11,9 +11,9 @@ use cookie::Cookie; use http::{Uri, Method, Version, HeaderMap, HttpTryFrom}; use http::header::{HeaderName, HeaderValue}; use futures::Future; -use socket2::{Socket, Domain, Type}; use tokio_core::net::TcpListener; use tokio_core::reactor::Core; +use net2::TcpBuilder; use error::Error; use server::HttpServer; @@ -139,10 +139,10 @@ impl TestServer { /// Get firat available unused address pub fn unused_addr() -> net::SocketAddr { let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap(); - let socket = Socket::new(Domain::ipv4(), Type::stream(), None).unwrap(); - socket.bind(&addr.into()).unwrap(); - socket.set_reuse_address(true).unwrap(); - let tcp = socket.into_tcp_listener(); + let socket = TcpBuilder::new_v4().unwrap(); + socket.bind(&addr).unwrap(); + socket.reuse_address(true).unwrap(); + let tcp = socket.to_tcp_listener().unwrap(); tcp.local_addr().unwrap() } diff --git a/tests/test_server.rs b/tests/test_server.rs index 7605847ca..6884d30f1 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -4,7 +4,7 @@ extern crate tokio_core; extern crate reqwest; use std::thread; -use std::sync::Arc; +use std::sync::{Arc, mpsc}; use std::sync::atomic::{AtomicUsize, Ordering}; use actix_web::*; @@ -12,16 +12,20 @@ use actix::System; #[test] fn test_start() { - let addr = test::TestServer::unused_addr(); - let srv_addr = addr.clone(); + let _ = test::TestServer::unused_addr(); + let (tx, rx) = mpsc::channel(); + thread::spawn(move || { let sys = System::new("test"); let srv = HttpServer::new( || vec![Application::new() .resource("/", |r| r.method(Method::GET).h(httpcodes::HTTPOk))]); - srv.bind(srv_addr).unwrap().start(); + let srv = srv.bind("127.0.0.1:0").unwrap(); + let _ = tx.send(srv.addrs()[0].clone()); + srv.start(); sys.run(); }); + let addr = rx.recv().unwrap(); assert!(reqwest::get(&format!("http://{}/", addr)).unwrap().status().is_success()); }