From 27d92f3a238283fdf4b1b09bec95e96363288042 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sun, 17 Dec 2017 12:35:04 -0800 Subject: [PATCH] refactor server bind and start process --- Cargo.toml | 6 +- README.md | 6 +- examples/basic.rs | 3 +- examples/state.rs | 3 +- examples/websocket.rs | 3 +- guide/src/qs_2.md | 3 +- guide/src/qs_4.md | 3 +- src/lib.rs | 2 +- src/server.rs | 206 ++++++++++++++++++++++++------------------ tests/test_server.rs | 7 +- 10 files changed, 139 insertions(+), 103 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a86dee33..5d80682d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ version = "0.3.0" authors = ["Nikolay Kim "] description = "Actix web framework" readme = "README.md" -keywords = ["http", "http2", "web", "async", "futures"] +keywords = ["http", "web", "framework", "async", "futures"] homepage = "https://github.com/actix/actix-web" repository = "https://github.com/actix/actix-web.git" documentation = "https://docs.rs/actix-web/" @@ -74,8 +74,6 @@ tokio-openssl = { version="0.1", optional = true } [dependencies.actix] version = "^0.3.1" -#path = "../actix" -#git = "https://github.com/actix/actix.git" default-features = false features = [] @@ -96,4 +94,4 @@ version_check = "0.1" [profile.release] lto = true opt-level = 3 -debug = true +# debug = true diff --git a/README.md b/README.md index 56cbff26..532c3692 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,8 @@ fn main() { HttpServer::new( || Application::new() .resource("/{name}", |r| r.f(index))) - .serve("127.0.0.1:8080"); + .bind("127.0.0.1:8080")? + .start(); } ``` @@ -34,8 +35,7 @@ fn main() { * Transparent content compression/decompression (br, gzip, deflate) * Configurable request routing * Multipart streams - * Middlewares ( - [Logger](https://actix.github.io/actix-web/guide/qs_10.html#logging), + * Middlewares ([Logger](https://actix.github.io/actix-web/guide/qs_10.html#logging), [Session](https://actix.github.io/actix-web/guide/qs_10.html#user-sessions), [DefaultHeaders](https://actix.github.io/actix-web/guide/qs_10.html#default-headers)) * Built on top of [Actix](https://github.com/actix/actix). diff --git a/examples/basic.rs b/examples/basic.rs index 22dfaba3..9c182817 100644 --- a/examples/basic.rs +++ b/examples/basic.rs @@ -93,7 +93,8 @@ fn main() { .header("LOCATION", "/index.html") .body(Body::Empty) }))) - .serve::<_, ()>("127.0.0.1:8080").unwrap(); + .bind("127.0.0.1:8080").unwrap() + .start().unwrap(); println!("Started http server: 127.0.0.1:8080"); let _ = sys.run(); diff --git a/examples/state.rs b/examples/state.rs index aef09fc2..c36ac19d 100644 --- a/examples/state.rs +++ b/examples/state.rs @@ -70,7 +70,8 @@ fn main() { r.method(Method::GET).f(|req| ws::start(req, MyWebSocket{counter: 0}))) // register simple handler, handle all methods .resource("/", |r| r.f(index))) - .serve::<_, ()>("127.0.0.1:8080").unwrap(); + .bind("127.0.0.1:8080").unwrap() + .start().unwrap(); println!("Started http server: 127.0.0.1:8080"); let _ = sys.run(); diff --git a/examples/websocket.rs b/examples/websocket.rs index cbcf91c1..8f62ef29 100644 --- a/examples/websocket.rs +++ b/examples/websocket.rs @@ -70,7 +70,8 @@ fn main() { .resource("/{tail:.*}", |r| r.h(fs::StaticFiles::new("tail", "examples/static/", true)))) // start http server on 127.0.0.1:8080 - .serve::<_, ()>("127.0.0.1:8080").unwrap(); + .bind("127.0.0.1:8080").unwrap() + .start().unwrap(); println!("Started http server: 127.0.0.1:8080"); let _ = sys.run(); diff --git a/guide/src/qs_2.md b/guide/src/qs_2.md index 0c29f527..b76855c8 100644 --- a/guide/src/qs_2.md +++ b/guide/src/qs_2.md @@ -81,7 +81,8 @@ fn main() { HttpServer::new( || Application::new() .resource("/", |r| r.f(index))) - .serve::<_, ()>("127.0.0.1:8088").unwrap(); + .bind("127.0.0.1:8088").unwrap() + .start(); println!("Started http server: 127.0.0.1:8088"); # actix::Arbiter::system().send(actix::msgs::SystemExit(0)); diff --git a/guide/src/qs_4.md b/guide/src/qs_4.md index 077c71fc..25528c45 100644 --- a/guide/src/qs_4.md +++ b/guide/src/qs_4.md @@ -82,7 +82,8 @@ fn main() { HttpServer::new( || Application::new() .resource("/", |r| r.method(Method::GET).f(index))) - .serve::<_, ()>("127.0.0.1:8088").unwrap(); + .bind("127.0.0.1:8088").unwrap() + .start(); println!("Started http server: 127.0.0.1:8088"); # actix::Arbiter::system().send(actix::msgs::SystemExit(0)); diff --git a/src/lib.rs b/src/lib.rs index d9563e1f..92ed2ea5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,7 +11,7 @@ //! HttpServer::new( //! || Application::new() //! .resource("/{name}", |r| r.f(index))) -//! .serve::<_, ()>("127.0.0.1:8080"); +//! .serve("127.0.0.1:8080"); //! } //! ``` //! diff --git a/src/server.rs b/src/server.rs index 363a268a..f787621f 100644 --- a/src/server.rs +++ b/src/server.rs @@ -3,6 +3,7 @@ use std::rc::Rc; use std::sync::Arc; use std::time::Duration; use std::marker::PhantomData; +use std::collections::HashMap; use actix::dev::*; use futures::Stream; @@ -94,9 +95,11 @@ pub struct HttpServer io: PhantomData, addr: PhantomData, threads: usize, + backlog: i32, keep_alive: Option, factory: Arc U + Send + Sync>, workers: Vec>>, + sockets: HashMap, } impl Actor for HttpServer { @@ -129,9 +132,11 @@ impl HttpServer io: PhantomData, addr: PhantomData, threads: num_cpus::get(), + backlog: 2048, keep_alive: None, factory: Arc::new(factory), workers: Vec::new(), + sockets: HashMap::new(), } } @@ -143,6 +148,18 @@ impl HttpServer self } + /// Set the maximum number of pending connections. + /// + /// This refers to the number of clients that can be waiting to be served. + /// Exceeding this number results in the client getting an error when + /// attempting to connect. It should only affect servers under significant load. + /// + /// Generally set in the 64-2048 range. Default value is 2048. + pub fn backlog(mut self, num: i32) -> Self { + self.backlog = num; + self + } + /// Set server keep-alive setting. /// /// By default keep alive is enabled. @@ -160,10 +177,22 @@ impl HttpServer /// Start listening for incomming connections from a stream. /// /// This method uses only one thread for handling incoming connections. - pub fn serve_incoming(mut self, stream: S, secure: bool) -> io::Result + pub fn start_incoming(mut self, stream: S, secure: bool) -> io::Result where Self: ActorAddress, S: Stream + 'static { + if !self.sockets.is_empty() { + let addrs: Vec<(net::SocketAddr, Socket)> = self.sockets.drain().collect(); + let settings = ServerSettings::new(Some(addrs[0].0), false); + let workers = self.start_workers(&settings, &StreamHandlerType::Normal); + + // start acceptors threads + for (addr, sock) in addrs { + info!("Starting http server on {}", addr); + start_accept_thread(sock, addr, workers.clone()); + } + } + // set server settings let addr: net::SocketAddr = "127.0.0.1:8080".parse().unwrap(); let settings = ServerSettings::new(Some(addr), secure); @@ -181,52 +210,53 @@ impl HttpServer })) } - fn bind(&self, addr: S) - -> io::Result> - { + /// The socket address to bind + /// + /// To mind multiple addresses this method can be call multiple times. + pub fn bind(mut self, addr: S) -> io::Result { let mut err = None; - let mut sockets = Vec::new(); + let mut succ = false; if let Ok(iter) = addr.to_socket_addrs() { for addr in iter { - match addr { + let socket = match addr { net::SocketAddr::V4(a) => { let socket = Socket::new(Domain::ipv4(), Type::stream(), None)?; match socket.bind(&a.into()) { - Ok(_) => { - socket.listen(1024) - .expect("failed to set socket backlog"); - socket.set_reuse_address(true) - .expect("failed to set socket reuse address"); - sockets.push((addr, socket)); - }, - Err(e) => err = Some(e), + Ok(_) => socket, + Err(e) => { + err = Some(e); + continue; + } } } net::SocketAddr::V6(a) => { let socket = Socket::new(Domain::ipv6(), Type::stream(), None)?; match socket.bind(&a.into()) { - Ok(_) => { - socket.listen(1024) - .expect("failed to set socket backlog"); - socket.set_reuse_address(true) - .expect("failed to set socket reuse address"); - sockets.push((addr, socket)) + Ok(_) => socket, + Err(e) => { + err = Some(e); + continue } - Err(e) => err = Some(e), } } - } + }; + succ = true; + socket.listen(self.backlog) + .expect("failed to set socket backlog"); + socket.set_reuse_address(true) + .expect("failed to set socket reuse address"); + self.sockets.insert(addr, socket); } } - if sockets.is_empty() { + if !succ { if let Some(e) = err.take() { Err(e) } else { Err(io::Error::new(io::ErrorKind::Other, "Can not bind to address.")) } } else { - Ok(sockets) + Ok(self) } } @@ -265,26 +295,26 @@ impl HttpServer { /// Start listening for incomming connections. /// - /// This methods converts address to list of `SocketAddr` - /// then binds to all available addresses. - /// It also starts number of http handler workers in seperate threads. + /// This method starts number of http handler workers in seperate threads. /// For each address this method starts separate thread which does `accept()` in a loop. - pub fn serve(mut self, addr: S) -> io::Result - where Self: ActorAddress, - S: net::ToSocketAddrs, + pub fn start(mut self) -> io::Result> { - let addrs = self.bind(addr)?; - let settings = ServerSettings::new(Some(addrs[0].0), false); - let workers = self.start_workers(&settings, &StreamHandlerType::Normal); + if self.sockets.is_empty() { + Err(io::Error::new(io::ErrorKind::Other, "No socket addresses are bound")) + } else { + let addrs: Vec<(net::SocketAddr, Socket)> = self.sockets.drain().collect(); + let settings = ServerSettings::new(Some(addrs[0].0), false); + let workers = self.start_workers(&settings, &StreamHandlerType::Normal); - // start acceptors threads - for (addr, sock) in addrs { - info!("Starting http server on {}", addr); - start_accept_thread(sock, addr, workers.clone()); + // start acceptors threads + for (addr, sock) in addrs { + info!("Starting http server on {}", addr); + start_accept_thread(sock, addr, workers.clone()); + } + + // start http server actor + Ok(HttpServer::create(|_| {self})) } - - // start http server actor - Ok(HttpServer::create(|_| {self})) } } @@ -294,34 +324,34 @@ impl HttpServer, net::SocketAddr, H, V: IntoHttpHandler, { /// Start listening for incomming tls connections. - /// - /// This methods converts address to list of `SocketAddr` - /// then binds to all available addresses. - pub fn serve_tls(mut self, addr: S, pkcs12: ::Pkcs12) -> io::Result + pub fn start_tls(mut self, pkcs12: ::Pkcs12) -> io::Result where Self: ActorAddress, - S: net::ToSocketAddrs, { - let addrs = self.bind(addr)?; - let settings = ServerSettings::new(Some(addrs[0].0), false); - let acceptor = match TlsAcceptor::builder(pkcs12) { - Ok(builder) => { - match builder.build() { - Ok(acceptor) => acceptor, - Err(err) => return Err(io::Error::new(io::ErrorKind::Other, err)) + if self.sockets.is_empty() { + Err(io::Error::new(io::ErrorKind::Other, "No socket addresses are bound")) + } else { + let addrs: Vec<(net::SocketAddr, Socket)> = self.sockets.drain().collect(); + let settings = ServerSettings::new(Some(addrs[0].0), false); + let acceptor = match TlsAcceptor::builder(pkcs12) { + Ok(builder) => { + match builder.build() { + Ok(acceptor) => acceptor, + Err(err) => return Err(io::Error::new(io::ErrorKind::Other, err)) + } } + Err(err) => return Err(io::Error::new(io::ErrorKind::Other, err)) + }; + let workers = self.start_workers(&settings, &StreamHandlerType::Tls(acceptor)); + + // start acceptors threads + for (addr, sock) in addrs { + info!("Starting tls http server on {}", addr); + start_accept_thread(sock, addr, workers.clone()); } - Err(err) => return Err(io::Error::new(io::ErrorKind::Other, err)) - }; - let workers = self.start_workers(&settings, &StreamHandlerType::Tls(acceptor)); - // start acceptors threads - for (addr, sock) in addrs { - info!("Starting tls http server on {}", addr); - start_accept_thread(sock, addr, workers.clone()); + // start http server actor + Ok(HttpServer::create(|_| {self})) } - - // start http server actor - Ok(HttpServer::create(|_| {self})) } } @@ -332,35 +362,37 @@ impl HttpServer, net::SocketAddr, H, { /// Start listening for incomming tls connections. /// - /// This methods converts address to list of `SocketAddr` - /// then binds to all available addresses. - pub fn serve_ssl(mut self, addr: S, identity: &ParsedPkcs12) -> io::Result + /// This method sets alpn protocols to "h2" and "http/1.1" + pub fn start_ssl(mut self, identity: &ParsedPkcs12) -> io::Result where Self: ActorAddress, - S: net::ToSocketAddrs, { - let addrs = self.bind(addr)?; - let settings = ServerSettings::new(Some(addrs[0].0), false); - let acceptor = match SslAcceptorBuilder::mozilla_intermediate( - SslMethod::tls(), &identity.pkey, &identity.cert, &identity.chain) - { - Ok(mut builder) => { - match builder.set_alpn_protocols(&[b"h2", b"http/1.1"]) { - Ok(_) => builder.build(), - Err(err) => return Err(io::Error::new(io::ErrorKind::Other, err)), - } - }, - Err(err) => return Err(io::Error::new(io::ErrorKind::Other, err)) - }; - let workers = self.start_workers(&settings, &StreamHandlerType::Alpn(acceptor)); + if self.sockets.is_empty() { + Err(io::Error::new(io::ErrorKind::Other, "No socket addresses are bound")) + } else { + let addrs: Vec<(net::SocketAddr, Socket)> = self.sockets.drain().collect(); + let settings = ServerSettings::new(Some(addrs[0].0), false); + let acceptor = match SslAcceptorBuilder::mozilla_intermediate( + SslMethod::tls(), &identity.pkey, &identity.cert, &identity.chain) + { + Ok(mut builder) => { + match builder.set_alpn_protocols(&[b"h2", b"http/1.1"]) { + Ok(_) => builder.build(), + Err(err) => return Err(io::Error::new(io::ErrorKind::Other, err)), + } + }, + Err(err) => return Err(io::Error::new(io::ErrorKind::Other, err)) + }; + let workers = self.start_workers(&settings, &StreamHandlerType::Alpn(acceptor)); - // start acceptors threads - for (addr, sock) in addrs { - info!("Starting tls http server on {}", addr); - start_accept_thread(sock, addr, workers.clone()); + // start acceptors threads + for (addr, sock) in addrs { + info!("Starting tls http server on {}", addr); + start_accept_thread(sock, addr, workers.clone()); + } + + // start http server actor + Ok(HttpServer::create(|_| {self})) } - - // start http server actor - Ok(HttpServer::create(|_| {self})) } } diff --git a/tests/test_server.rs b/tests/test_server.rs index 53dacbba..3dbc924b 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -18,7 +18,7 @@ fn test_serve() { let srv = HttpServer::new( || vec![Application::new() .resource("/", |r| r.method(Method::GET).h(httpcodes::HTTPOk))]); - srv.serve::<_, ()>("127.0.0.1:58902").unwrap(); + srv.bind("127.0.0.1:58902").unwrap().start().unwrap(); sys.run(); }); assert!(reqwest::get("http://localhost:58902/").unwrap().status().is_success()); @@ -39,7 +39,7 @@ fn test_serve_incoming() { || Application::new() .resource("/", |r| r.method(Method::GET).h(httpcodes::HTTPOk))); let tcp = TcpListener::from_listener(tcp, &addr2, Arbiter::handle()).unwrap(); - srv.serve_incoming::<_, ()>(tcp.incoming(), false).unwrap(); + srv.start_incoming::<_, ()>(tcp.incoming(), false).unwrap(); sys.run(); }); @@ -89,7 +89,8 @@ fn test_middlewares() { response: Arc::clone(&act_num2), finish: Arc::clone(&act_num3)}) .resource("/", |r| r.method(Method::GET).h(httpcodes::HTTPOk))]) - .serve::<_, ()>("127.0.0.1:58904").unwrap(); + .bind("127.0.0.1:58904").unwrap() + .start().unwrap(); sys.run(); });