1
0
mirror of https://github.com/fafhrd91/actix-web synced 2024-12-02 19:32:24 +01:00

refactor server bind and start process

This commit is contained in:
Nikolay Kim 2017-12-17 12:35:04 -08:00
parent 4b421b44a2
commit 27d92f3a23
10 changed files with 139 additions and 103 deletions

View File

@ -4,7 +4,7 @@ version = "0.3.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix web framework" description = "Actix web framework"
readme = "README.md" readme = "README.md"
keywords = ["http", "http2", "web", "async", "futures"] keywords = ["http", "web", "framework", "async", "futures"]
homepage = "https://github.com/actix/actix-web" homepage = "https://github.com/actix/actix-web"
repository = "https://github.com/actix/actix-web.git" repository = "https://github.com/actix/actix-web.git"
documentation = "https://docs.rs/actix-web/" documentation = "https://docs.rs/actix-web/"
@ -74,8 +74,6 @@ tokio-openssl = { version="0.1", optional = true }
[dependencies.actix] [dependencies.actix]
version = "^0.3.1" version = "^0.3.1"
#path = "../actix"
#git = "https://github.com/actix/actix.git"
default-features = false default-features = false
features = [] features = []
@ -96,4 +94,4 @@ version_check = "0.1"
[profile.release] [profile.release]
lto = true lto = true
opt-level = 3 opt-level = 3
debug = true # debug = true

View File

@ -13,7 +13,8 @@ fn main() {
HttpServer::new( HttpServer::new(
|| Application::new() || Application::new()
.resource("/{name}", |r| r.f(index))) .resource("/{name}", |r| r.f(index)))
.serve("127.0.0.1:8080"); .bind("127.0.0.1:8080")?
.start();
} }
``` ```
@ -34,8 +35,7 @@ fn main() {
* Transparent content compression/decompression (br, gzip, deflate) * Transparent content compression/decompression (br, gzip, deflate)
* Configurable request routing * Configurable request routing
* Multipart streams * Multipart streams
* Middlewares ( * Middlewares ([Logger](https://actix.github.io/actix-web/guide/qs_10.html#logging),
[Logger](https://actix.github.io/actix-web/guide/qs_10.html#logging),
[Session](https://actix.github.io/actix-web/guide/qs_10.html#user-sessions), [Session](https://actix.github.io/actix-web/guide/qs_10.html#user-sessions),
[DefaultHeaders](https://actix.github.io/actix-web/guide/qs_10.html#default-headers)) [DefaultHeaders](https://actix.github.io/actix-web/guide/qs_10.html#default-headers))
* Built on top of [Actix](https://github.com/actix/actix). * Built on top of [Actix](https://github.com/actix/actix).

View File

@ -93,7 +93,8 @@ fn main() {
.header("LOCATION", "/index.html") .header("LOCATION", "/index.html")
.body(Body::Empty) .body(Body::Empty)
}))) })))
.serve::<_, ()>("127.0.0.1:8080").unwrap(); .bind("127.0.0.1:8080").unwrap()
.start().unwrap();
println!("Started http server: 127.0.0.1:8080"); println!("Started http server: 127.0.0.1:8080");
let _ = sys.run(); let _ = sys.run();

View File

@ -70,7 +70,8 @@ fn main() {
r.method(Method::GET).f(|req| ws::start(req, MyWebSocket{counter: 0}))) r.method(Method::GET).f(|req| ws::start(req, MyWebSocket{counter: 0})))
// register simple handler, handle all methods // register simple handler, handle all methods
.resource("/", |r| r.f(index))) .resource("/", |r| r.f(index)))
.serve::<_, ()>("127.0.0.1:8080").unwrap(); .bind("127.0.0.1:8080").unwrap()
.start().unwrap();
println!("Started http server: 127.0.0.1:8080"); println!("Started http server: 127.0.0.1:8080");
let _ = sys.run(); let _ = sys.run();

View File

@ -70,7 +70,8 @@ fn main() {
.resource("/{tail:.*}", .resource("/{tail:.*}",
|r| r.h(fs::StaticFiles::new("tail", "examples/static/", true)))) |r| r.h(fs::StaticFiles::new("tail", "examples/static/", true))))
// start http server on 127.0.0.1:8080 // start http server on 127.0.0.1:8080
.serve::<_, ()>("127.0.0.1:8080").unwrap(); .bind("127.0.0.1:8080").unwrap()
.start().unwrap();
println!("Started http server: 127.0.0.1:8080"); println!("Started http server: 127.0.0.1:8080");
let _ = sys.run(); let _ = sys.run();

View File

@ -81,7 +81,8 @@ fn main() {
HttpServer::new( HttpServer::new(
|| Application::new() || Application::new()
.resource("/", |r| r.f(index))) .resource("/", |r| r.f(index)))
.serve::<_, ()>("127.0.0.1:8088").unwrap(); .bind("127.0.0.1:8088").unwrap()
.start();
println!("Started http server: 127.0.0.1:8088"); println!("Started http server: 127.0.0.1:8088");
# actix::Arbiter::system().send(actix::msgs::SystemExit(0)); # actix::Arbiter::system().send(actix::msgs::SystemExit(0));

View File

@ -82,7 +82,8 @@ fn main() {
HttpServer::new( HttpServer::new(
|| Application::new() || Application::new()
.resource("/", |r| r.method(Method::GET).f(index))) .resource("/", |r| r.method(Method::GET).f(index)))
.serve::<_, ()>("127.0.0.1:8088").unwrap(); .bind("127.0.0.1:8088").unwrap()
.start();
println!("Started http server: 127.0.0.1:8088"); println!("Started http server: 127.0.0.1:8088");
# actix::Arbiter::system().send(actix::msgs::SystemExit(0)); # actix::Arbiter::system().send(actix::msgs::SystemExit(0));

View File

@ -11,7 +11,7 @@
//! HttpServer::new( //! HttpServer::new(
//! || Application::new() //! || Application::new()
//! .resource("/{name}", |r| r.f(index))) //! .resource("/{name}", |r| r.f(index)))
//! .serve::<_, ()>("127.0.0.1:8080"); //! .serve("127.0.0.1:8080");
//! } //! }
//! ``` //! ```
//! //!

View File

@ -3,6 +3,7 @@ use std::rc::Rc;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::collections::HashMap;
use actix::dev::*; use actix::dev::*;
use futures::Stream; use futures::Stream;
@ -94,9 +95,11 @@ pub struct HttpServer<T, A, H, U>
io: PhantomData<T>, io: PhantomData<T>,
addr: PhantomData<A>, addr: PhantomData<A>,
threads: usize, threads: usize,
backlog: i32,
keep_alive: Option<u64>, keep_alive: Option<u64>,
factory: Arc<Fn() -> U + Send + Sync>, factory: Arc<Fn() -> U + Send + Sync>,
workers: Vec<SyncAddress<Worker<H>>>, workers: Vec<SyncAddress<Worker<H>>>,
sockets: HashMap<net::SocketAddr, Socket>,
} }
impl<T: 'static, A: 'static, H, U: 'static> Actor for HttpServer<T, A, H, U> { impl<T: 'static, A: 'static, H, U: 'static> Actor for HttpServer<T, A, H, U> {
@ -129,9 +132,11 @@ impl<T, A, H, U, V> HttpServer<T, A, H, U>
io: PhantomData, io: PhantomData,
addr: PhantomData, addr: PhantomData,
threads: num_cpus::get(), threads: num_cpus::get(),
backlog: 2048,
keep_alive: None, keep_alive: None,
factory: Arc::new(factory), factory: Arc::new(factory),
workers: Vec::new(), workers: Vec::new(),
sockets: HashMap::new(),
} }
} }
@ -143,6 +148,18 @@ impl<T, A, H, U, V> HttpServer<T, A, H, U>
self self
} }
/// Set the maximum number of pending connections.
///
/// This refers to the number of clients that can be waiting to be served.
/// Exceeding this number results in the client getting an error when
/// attempting to connect. It should only affect servers under significant load.
///
/// Generally set in the 64-2048 range. Default value is 2048.
pub fn backlog(mut self, num: i32) -> Self {
self.backlog = num;
self
}
/// Set server keep-alive setting. /// Set server keep-alive setting.
/// ///
/// By default keep alive is enabled. /// By default keep alive is enabled.
@ -160,10 +177,22 @@ impl<T, A, H, U, V> HttpServer<T, A, H, U>
/// Start listening for incomming connections from a stream. /// Start listening for incomming connections from a stream.
/// ///
/// This method uses only one thread for handling incoming connections. /// This method uses only one thread for handling incoming connections.
pub fn serve_incoming<S, Addr>(mut self, stream: S, secure: bool) -> io::Result<Addr> pub fn start_incoming<S, Addr>(mut self, stream: S, secure: bool) -> io::Result<Addr>
where Self: ActorAddress<Self, Addr>, where Self: ActorAddress<Self, Addr>,
S: Stream<Item=(T, A), Error=io::Error> + 'static S: Stream<Item=(T, A), Error=io::Error> + 'static
{ {
if !self.sockets.is_empty() {
let addrs: Vec<(net::SocketAddr, Socket)> = self.sockets.drain().collect();
let settings = ServerSettings::new(Some(addrs[0].0), false);
let workers = self.start_workers(&settings, &StreamHandlerType::Normal);
// start acceptors threads
for (addr, sock) in addrs {
info!("Starting http server on {}", addr);
start_accept_thread(sock, addr, workers.clone());
}
}
// set server settings // set server settings
let addr: net::SocketAddr = "127.0.0.1:8080".parse().unwrap(); let addr: net::SocketAddr = "127.0.0.1:8080".parse().unwrap();
let settings = ServerSettings::new(Some(addr), secure); let settings = ServerSettings::new(Some(addr), secure);
@ -181,52 +210,53 @@ impl<T, A, H, U, V> HttpServer<T, A, H, U>
})) }))
} }
fn bind<S: net::ToSocketAddrs>(&self, addr: S) /// The socket address to bind
-> io::Result<Vec<(net::SocketAddr, Socket)>> ///
{ /// To mind multiple addresses this method can be call multiple times.
pub fn bind<S: net::ToSocketAddrs>(mut self, addr: S) -> io::Result<Self> {
let mut err = None; let mut err = None;
let mut sockets = Vec::new(); let mut succ = false;
if let Ok(iter) = addr.to_socket_addrs() { if let Ok(iter) = addr.to_socket_addrs() {
for addr in iter { for addr in iter {
match addr { let socket = match addr {
net::SocketAddr::V4(a) => { net::SocketAddr::V4(a) => {
let socket = Socket::new(Domain::ipv4(), Type::stream(), None)?; let socket = Socket::new(Domain::ipv4(), Type::stream(), None)?;
match socket.bind(&a.into()) { match socket.bind(&a.into()) {
Ok(_) => { Ok(_) => socket,
socket.listen(1024) Err(e) => {
.expect("failed to set socket backlog"); err = Some(e);
socket.set_reuse_address(true) continue;
.expect("failed to set socket reuse address"); }
sockets.push((addr, socket));
},
Err(e) => err = Some(e),
} }
} }
net::SocketAddr::V6(a) => { net::SocketAddr::V6(a) => {
let socket = Socket::new(Domain::ipv6(), Type::stream(), None)?; let socket = Socket::new(Domain::ipv6(), Type::stream(), None)?;
match socket.bind(&a.into()) { match socket.bind(&a.into()) {
Ok(_) => { Ok(_) => socket,
socket.listen(1024) Err(e) => {
err = Some(e);
continue
}
}
}
};
succ = true;
socket.listen(self.backlog)
.expect("failed to set socket backlog"); .expect("failed to set socket backlog");
socket.set_reuse_address(true) socket.set_reuse_address(true)
.expect("failed to set socket reuse address"); .expect("failed to set socket reuse address");
sockets.push((addr, socket)) self.sockets.insert(addr, socket);
}
Err(e) => err = Some(e),
}
}
}
} }
} }
if sockets.is_empty() { if !succ {
if let Some(e) = err.take() { if let Some(e) = err.take() {
Err(e) Err(e)
} else { } else {
Err(io::Error::new(io::ErrorKind::Other, "Can not bind to address.")) Err(io::Error::new(io::ErrorKind::Other, "Can not bind to address."))
} }
} else { } else {
Ok(sockets) Ok(self)
} }
} }
@ -265,15 +295,14 @@ impl<H: HttpHandler, U, V> HttpServer<TcpStream, net::SocketAddr, H, U>
{ {
/// Start listening for incomming connections. /// Start listening for incomming connections.
/// ///
/// This methods converts address to list of `SocketAddr` /// This method starts number of http handler workers in seperate threads.
/// then binds to all available addresses.
/// It also starts number of http handler workers in seperate threads.
/// For each address this method starts separate thread which does `accept()` in a loop. /// For each address this method starts separate thread which does `accept()` in a loop.
pub fn serve<S, Addr>(mut self, addr: S) -> io::Result<Addr> pub fn start(mut self) -> io::Result<SyncAddress<Self>>
where Self: ActorAddress<Self, Addr>,
S: net::ToSocketAddrs,
{ {
let addrs = self.bind(addr)?; if self.sockets.is_empty() {
Err(io::Error::new(io::ErrorKind::Other, "No socket addresses are bound"))
} else {
let addrs: Vec<(net::SocketAddr, Socket)> = self.sockets.drain().collect();
let settings = ServerSettings::new(Some(addrs[0].0), false); let settings = ServerSettings::new(Some(addrs[0].0), false);
let workers = self.start_workers(&settings, &StreamHandlerType::Normal); let workers = self.start_workers(&settings, &StreamHandlerType::Normal);
@ -286,6 +315,7 @@ impl<H: HttpHandler, U, V> HttpServer<TcpStream, net::SocketAddr, H, U>
// start http server actor // start http server actor
Ok(HttpServer::create(|_| {self})) Ok(HttpServer::create(|_| {self}))
} }
}
} }
#[cfg(feature="tls")] #[cfg(feature="tls")]
@ -294,14 +324,13 @@ impl<H: HttpHandler, U, V> HttpServer<TlsStream<TcpStream>, net::SocketAddr, H,
V: IntoHttpHandler<Handler=H>, V: IntoHttpHandler<Handler=H>,
{ {
/// Start listening for incomming tls connections. /// Start listening for incomming tls connections.
/// pub fn start_tls<Addr>(mut self, pkcs12: ::Pkcs12) -> io::Result<Addr>
/// This methods converts address to list of `SocketAddr`
/// then binds to all available addresses.
pub fn serve_tls<S, Addr>(mut self, addr: S, pkcs12: ::Pkcs12) -> io::Result<Addr>
where Self: ActorAddress<Self, Addr>, where Self: ActorAddress<Self, Addr>,
S: net::ToSocketAddrs,
{ {
let addrs = self.bind(addr)?; if self.sockets.is_empty() {
Err(io::Error::new(io::ErrorKind::Other, "No socket addresses are bound"))
} else {
let addrs: Vec<(net::SocketAddr, Socket)> = self.sockets.drain().collect();
let settings = ServerSettings::new(Some(addrs[0].0), false); let settings = ServerSettings::new(Some(addrs[0].0), false);
let acceptor = match TlsAcceptor::builder(pkcs12) { let acceptor = match TlsAcceptor::builder(pkcs12) {
Ok(builder) => { Ok(builder) => {
@ -323,6 +352,7 @@ impl<H: HttpHandler, U, V> HttpServer<TlsStream<TcpStream>, net::SocketAddr, H,
// start http server actor // start http server actor
Ok(HttpServer::create(|_| {self})) Ok(HttpServer::create(|_| {self}))
} }
}
} }
#[cfg(feature="alpn")] #[cfg(feature="alpn")]
@ -332,13 +362,14 @@ impl<H: HttpHandler, U, V> HttpServer<SslStream<TcpStream>, net::SocketAddr, H,
{ {
/// Start listening for incomming tls connections. /// Start listening for incomming tls connections.
/// ///
/// This methods converts address to list of `SocketAddr` /// This method sets alpn protocols to "h2" and "http/1.1"
/// then binds to all available addresses. pub fn start_ssl<Addr>(mut self, identity: &ParsedPkcs12) -> io::Result<Addr>
pub fn serve_ssl<S, Addr>(mut self, addr: S, identity: &ParsedPkcs12) -> io::Result<Addr>
where Self: ActorAddress<Self, Addr>, where Self: ActorAddress<Self, Addr>,
S: net::ToSocketAddrs,
{ {
let addrs = self.bind(addr)?; if self.sockets.is_empty() {
Err(io::Error::new(io::ErrorKind::Other, "No socket addresses are bound"))
} else {
let addrs: Vec<(net::SocketAddr, Socket)> = self.sockets.drain().collect();
let settings = ServerSettings::new(Some(addrs[0].0), false); let settings = ServerSettings::new(Some(addrs[0].0), false);
let acceptor = match SslAcceptorBuilder::mozilla_intermediate( let acceptor = match SslAcceptorBuilder::mozilla_intermediate(
SslMethod::tls(), &identity.pkey, &identity.cert, &identity.chain) SslMethod::tls(), &identity.pkey, &identity.cert, &identity.chain)
@ -362,6 +393,7 @@ impl<H: HttpHandler, U, V> HttpServer<SslStream<TcpStream>, net::SocketAddr, H,
// start http server actor // start http server actor
Ok(HttpServer::create(|_| {self})) Ok(HttpServer::create(|_| {self}))
} }
}
} }
struct IoStream<T> { struct IoStream<T> {

View File

@ -18,7 +18,7 @@ fn test_serve() {
let srv = HttpServer::new( let srv = HttpServer::new(
|| vec![Application::new() || vec![Application::new()
.resource("/", |r| r.method(Method::GET).h(httpcodes::HTTPOk))]); .resource("/", |r| r.method(Method::GET).h(httpcodes::HTTPOk))]);
srv.serve::<_, ()>("127.0.0.1:58902").unwrap(); srv.bind("127.0.0.1:58902").unwrap().start().unwrap();
sys.run(); sys.run();
}); });
assert!(reqwest::get("http://localhost:58902/").unwrap().status().is_success()); assert!(reqwest::get("http://localhost:58902/").unwrap().status().is_success());
@ -39,7 +39,7 @@ fn test_serve_incoming() {
|| Application::new() || Application::new()
.resource("/", |r| r.method(Method::GET).h(httpcodes::HTTPOk))); .resource("/", |r| r.method(Method::GET).h(httpcodes::HTTPOk)));
let tcp = TcpListener::from_listener(tcp, &addr2, Arbiter::handle()).unwrap(); let tcp = TcpListener::from_listener(tcp, &addr2, Arbiter::handle()).unwrap();
srv.serve_incoming::<_, ()>(tcp.incoming(), false).unwrap(); srv.start_incoming::<_, ()>(tcp.incoming(), false).unwrap();
sys.run(); sys.run();
}); });
@ -89,7 +89,8 @@ fn test_middlewares() {
response: Arc::clone(&act_num2), response: Arc::clone(&act_num2),
finish: Arc::clone(&act_num3)}) finish: Arc::clone(&act_num3)})
.resource("/", |r| r.method(Method::GET).h(httpcodes::HTTPOk))]) .resource("/", |r| r.method(Method::GET).h(httpcodes::HTTPOk))])
.serve::<_, ()>("127.0.0.1:58904").unwrap(); .bind("127.0.0.1:58904").unwrap()
.start().unwrap();
sys.run(); sys.run();
}); });