use std::{io, net, thread}; use std::rc::Rc; use std::cell::{RefCell, RefMut}; use std::sync::Arc; use std::time::Duration; use std::marker::PhantomData; use std::collections::HashMap; use actix::dev::*; use futures::Stream; use futures::sync::mpsc; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_core::net::TcpStream; use num_cpus; use socket2::{Socket, Domain, Type}; #[cfg(feature="tls")] use futures::{future, Future}; #[cfg(feature="tls")] use native_tls::TlsAcceptor; #[cfg(feature="tls")] use tokio_tls::{TlsStream, TlsAcceptorExt}; #[cfg(feature="alpn")] use futures::{future, Future}; #[cfg(feature="alpn")] use openssl::ssl::{SslMethod, SslAcceptor, SslAcceptorBuilder}; #[cfg(feature="alpn")] use openssl::pkcs12::ParsedPkcs12; #[cfg(feature="alpn")] use tokio_openssl::{SslStream, SslAcceptorExt}; use helpers; use channel::{HttpChannel, HttpHandler, IntoHttpHandler}; /// Various server settings #[derive(Debug, Clone)] pub struct ServerSettings { addr: Option, secure: bool, host: String, } impl Default for ServerSettings { fn default() -> Self { ServerSettings { addr: None, secure: false, host: "localhost:8080".to_owned(), } } } impl ServerSettings { /// Crate server settings instance fn new(addr: Option, host: &Option, secure: bool) -> Self { let host = if let Some(ref host) = *host { host.clone() } else if let Some(ref addr) = addr { format!("{}", addr) } else { "localhost".to_owned() }; ServerSettings { addr: addr, secure: secure, host: host, } } /// Returns the socket address of the local half of this TCP connection pub fn local_addr(&self) -> Option { self.addr } /// Returns true if connection is secure(https) pub fn secure(&self) -> bool { self.secure } /// Returns host header value pub fn host(&self) -> &str { &self.host } } /// An HTTP Server /// /// `T` - async stream, anything that implements `AsyncRead` + `AsyncWrite`. /// /// `A` - peer address /// /// `H` - request handler pub struct HttpServer where H: 'static { h: Option>>, io: PhantomData, addr: PhantomData, threads: usize, backlog: i32, host: Option, keep_alive: Option, factory: Arc U + Send + Sync>, workers: Vec>>, sockets: HashMap, } impl Actor for HttpServer { type Context = Context; fn started(&mut self, ctx: &mut Self::Context) { self.update_time(ctx); } } impl HttpServer { fn update_time(&self, ctx: &mut Context) { helpers::update_date(); ctx.run_later(Duration::new(1, 0), |slf, ctx| slf.update_time(ctx)); } } impl HttpServer where A: 'static, T: AsyncRead + AsyncWrite + 'static, H: HttpHandler, U: IntoIterator + 'static, V: IntoHttpHandler, { /// Create new http server with application factory pub fn new(factory: F) -> Self where F: Sync + Send + 'static + Fn() -> U, { HttpServer{ h: None, io: PhantomData, addr: PhantomData, threads: num_cpus::get(), backlog: 2048, host: None, keep_alive: None, factory: Arc::new(factory), workers: Vec::new(), sockets: HashMap::new(), } } /// Set number of workers to start. /// /// By default http server uses number of available logical cpu as threads count. pub fn threads(mut self, num: usize) -> Self { self.threads = num; self } /// Set the maximum number of pending connections. /// /// This refers to the number of clients that can be waiting to be served. /// Exceeding this number results in the client getting an error when /// attempting to connect. It should only affect servers under significant load. /// /// Generally set in the 64-2048 range. Default value is 2048. pub fn backlog(mut self, num: i32) -> Self { self.backlog = num; self } /// Set server keep-alive setting. /// /// By default keep alive is enabled. /// /// - `Some(75)` - enable /// /// - `Some(0)` - disable /// /// - `None` - use `SO_KEEPALIVE` socket option pub fn keep_alive(mut self, val: Option) -> Self { self.keep_alive = val; self } /// Set server host name. /// /// Host name is used by application router aa a hostname for url generation. /// Check [ConnectionInfo](./dev/struct.ConnectionInfo.html#method.host) documentation /// for more information. pub fn server_hostname(mut self, val: String) -> Self { self.host = Some(val); self } /// Get addresses of bound sockets. pub fn addrs(&self) -> Vec { self.sockets.keys().map(|addr| addr.clone()).collect() } /// The socket address to bind /// /// To mind multiple addresses this method can be call multiple times. pub fn bind(mut self, addr: S) -> io::Result { let mut err = None; let mut succ = false; if let Ok(iter) = addr.to_socket_addrs() { for addr in iter { let socket = match addr { net::SocketAddr::V4(a) => { let socket = Socket::new(Domain::ipv4(), Type::stream(), None)?; match socket.bind(&a.into()) { Ok(_) => socket, Err(e) => { err = Some(e); continue; } } } net::SocketAddr::V6(a) => { let socket = Socket::new(Domain::ipv6(), Type::stream(), None)?; match socket.bind(&a.into()) { Ok(_) => socket, Err(e) => { err = Some(e); continue } } } }; succ = true; socket.listen(self.backlog) .expect("failed to set socket backlog"); socket.set_reuse_address(true) .expect("failed to set socket reuse address"); self.sockets.insert(addr, socket); } } if !succ { if let Some(e) = err.take() { Err(e) } else { Err(io::Error::new(io::ErrorKind::Other, "Can not bind to address.")) } } else { Ok(self) } } fn start_workers(&mut self, settings: &ServerSettings, handler: &StreamHandlerType) -> Vec>> { // start workers let mut workers = Vec::new(); for _ in 0..self.threads { let s = settings.clone(); let (tx, rx) = mpsc::unbounded::>(); let h = handler.clone(); let ka = self.keep_alive; let factory = Arc::clone(&self.factory); let addr = Arbiter::start(move |ctx: &mut Context<_>| { let mut apps: Vec<_> = (*factory)() .into_iter().map(|h| h.into_handler()).collect(); for app in &mut apps { app.server_settings(s.clone()); } ctx.add_stream(rx); Worker::new(apps, h, ka) }); workers.push(tx); self.workers.push(addr); } info!("Starting {} http workers", self.threads); workers } } impl HttpServer where U: IntoIterator + 'static, V: IntoHttpHandler, { /// Start listening for incomming connections. /// /// This method starts number of http handler workers in seperate threads. /// For each address this method starts separate thread which does `accept()` in a loop. /// /// This methods panics if no socket addresses get bound. /// /// This method requires to run within properly configured `Actix` system. /// /// ```rust /// extern crate actix; /// extern crate actix_web; /// use actix_web::*; /// /// fn main() { /// let sys = actix::System::new("example"); // <- create Actix system /// /// HttpServer::new( /// || Application::new() /// .resource("/", |r| r.h(httpcodes::HTTPOk))) /// .bind("127.0.0.1:8088").expect("Can not bind to 127.0.0.1:8088") /// .start(); /// # actix::Arbiter::system().send(actix::msgs::SystemExit(0)); /// /// let _ = sys.run(); // <- Run actix system, this method actually starts all async processes /// } /// ``` pub fn start(mut self) -> SyncAddress { if self.sockets.is_empty() { panic!("HttpServer::bind() has to be called befor start()"); } else { let addrs: Vec<(net::SocketAddr, Socket)> = self.sockets.drain().collect(); let settings = ServerSettings::new(Some(addrs[0].0), &self.host, false); let workers = self.start_workers(&settings, &StreamHandlerType::Normal); // start acceptors threads for (addr, sock) in addrs { info!("Starting http server on {}", addr); start_accept_thread(sock, addr, workers.clone()); } // start http server actor HttpServer::create(|_| {self}) } } } #[cfg(feature="tls")] impl HttpServer, net::SocketAddr, H, U> where U: IntoIterator + 'static, V: IntoHttpHandler, { /// Start listening for incomming tls connections. pub fn start_tls(mut self, pkcs12: ::Pkcs12) -> io::Result> { if self.sockets.is_empty() { Err(io::Error::new(io::ErrorKind::Other, "No socket addresses are bound")) } else { let addrs: Vec<(net::SocketAddr, Socket)> = self.sockets.drain().collect(); let settings = ServerSettings::new(Some(addrs[0].0), false); let acceptor = match TlsAcceptor::builder(pkcs12) { Ok(builder) => { match builder.build() { Ok(acceptor) => acceptor, Err(err) => return Err(io::Error::new(io::ErrorKind::Other, err)) } } Err(err) => return Err(io::Error::new(io::ErrorKind::Other, err)) }; let workers = self.start_workers(&settings, &StreamHandlerType::Tls(acceptor)); // start acceptors threads for (addr, sock) in addrs { info!("Starting tls http server on {}", addr); start_accept_thread(sock, addr, workers.clone()); } // start http server actor Ok(HttpServer::create(|_| {self})) } } } #[cfg(feature="alpn")] impl HttpServer, net::SocketAddr, H, U> where U: IntoIterator + 'static, V: IntoHttpHandler, { /// Start listening for incomming tls connections. /// /// This method sets alpn protocols to "h2" and "http/1.1" pub fn start_ssl(mut self, identity: &ParsedPkcs12) -> io::Result> { if self.sockets.is_empty() { Err(io::Error::new(io::ErrorKind::Other, "No socket addresses are bound")) } else { let addrs: Vec<(net::SocketAddr, Socket)> = self.sockets.drain().collect(); let settings = ServerSettings::new(Some(addrs[0].0), false); let acceptor = match SslAcceptorBuilder::mozilla_intermediate( SslMethod::tls(), &identity.pkey, &identity.cert, &identity.chain) { Ok(mut builder) => { match builder.set_alpn_protocols(&[b"h2", b"http/1.1"]) { Ok(_) => builder.build(), Err(err) => return Err(io::Error::new(io::ErrorKind::Other, err)), } }, Err(err) => return Err(io::Error::new(io::ErrorKind::Other, err)) }; let workers = self.start_workers(&settings, &StreamHandlerType::Alpn(acceptor)); // start acceptors threads for (addr, sock) in addrs { info!("Starting tls http server on {}", addr); start_accept_thread(sock, addr, workers.clone()); } // start http server actor Ok(HttpServer::create(|_| {self})) } } } impl HttpServer where A: 'static, T: AsyncRead + AsyncWrite + 'static, H: HttpHandler, U: IntoIterator + 'static, V: IntoHttpHandler, { /// Start listening for incomming connections from a stream. /// /// This method uses only one thread for handling incoming connections. pub fn start_incoming(mut self, stream: S, secure: bool) -> SyncAddress where S: Stream + 'static { if !self.sockets.is_empty() { let addrs: Vec<(net::SocketAddr, Socket)> = self.sockets.drain().collect(); let settings = ServerSettings::new(Some(addrs[0].0), &self.host, false); let workers = self.start_workers(&settings, &StreamHandlerType::Normal); // start acceptors threads for (addr, sock) in addrs { info!("Starting http server on {}", addr); start_accept_thread(sock, addr, workers.clone()); } } // set server settings let addr: net::SocketAddr = "127.0.0.1:8080".parse().unwrap(); let settings = ServerSettings::new(Some(addr), &self.host, secure); let mut apps: Vec<_> = (*self.factory)().into_iter().map(|h| h.into_handler()).collect(); for app in &mut apps { app.server_settings(settings.clone()); } self.h = Some(Rc::new(WorkerSettings::new(apps, self.keep_alive))); // start server HttpServer::create(move |ctx| { ctx.add_stream(stream.map( move |(t, _)| IoStream{io: t, peer: None, http2: false})); self }) } } struct IoStream { io: T, peer: Option, http2: bool, } impl ResponseType for IoStream { type Item = (); type Error = (); } impl StreamHandler, io::Error> for HttpServer where T: AsyncRead + AsyncWrite + 'static, H: HttpHandler + 'static, U: 'static, A: 'static {} impl Handler, io::Error> for HttpServer where T: AsyncRead + AsyncWrite + 'static, H: HttpHandler + 'static, U: 'static, A: 'static, { fn error(&mut self, err: io::Error, _: &mut Context) { debug!("Error handling request: {}", err) } fn handle(&mut self, msg: IoStream, _: &mut Context) -> Response> { Arbiter::handle().spawn( HttpChannel::new(Rc::clone(self.h.as_ref().unwrap()), msg.io, msg.peer, msg.http2)); Self::empty() } } /// Http worker /// /// Worker accepts Socket objects via unbounded channel and start requests processing. struct Worker { h: Rc>, handler: StreamHandlerType, } pub(crate) struct WorkerSettings { h: RefCell>, enabled: bool, keep_alive: u64, bytes: Rc, messages: Rc, } impl WorkerSettings { pub(crate) fn new(h: Vec, keep_alive: Option) -> WorkerSettings { WorkerSettings { h: RefCell::new(h), enabled: if let Some(ka) = keep_alive { ka > 0 } else { false }, keep_alive: keep_alive.unwrap_or(0), bytes: Rc::new(helpers::SharedBytesPool::new()), messages: Rc::new(helpers::SharedMessagePool::new()), } } pub fn handlers(&self) -> RefMut> { self.h.borrow_mut() } pub fn keep_alive(&self) -> u64 { self.keep_alive } pub fn keep_alive_enabled(&self) -> bool { self.enabled } pub fn get_shared_bytes(&self) -> helpers::SharedBytes { helpers::SharedBytes::new(self.bytes.get_bytes(), Rc::clone(&self.bytes)) } pub fn get_http_message(&self) -> helpers::SharedHttpMessage { helpers::SharedHttpMessage::new(self.messages.get(), Rc::clone(&self.messages)) } } impl Worker { fn new(h: Vec, handler: StreamHandlerType, keep_alive: Option) -> Worker { Worker { h: Rc::new(WorkerSettings::new(h, keep_alive)), handler: handler, } } fn update_time(&self, ctx: &mut Context) { helpers::update_date(); ctx.run_later(Duration::new(1, 0), |slf, ctx| slf.update_time(ctx)); } } impl Actor for Worker { type Context = Context; fn started(&mut self, ctx: &mut Self::Context) { self.update_time(ctx); } } impl StreamHandler> for Worker where H: HttpHandler + 'static {} impl Handler> for Worker where H: HttpHandler + 'static, { fn handle(&mut self, msg: IoStream, _: &mut Context) -> Response> { if !self.h.keep_alive_enabled() && msg.io.set_keepalive(Some(Duration::new(75, 0))).is_err() { error!("Can not set socket keep-alive option"); } self.handler.handle(Rc::clone(&self.h), msg); Self::empty() } } #[derive(Clone)] enum StreamHandlerType { Normal, #[cfg(feature="tls")] Tls(TlsAcceptor), #[cfg(feature="alpn")] Alpn(SslAcceptor), } impl StreamHandlerType { fn handle(&mut self, h: Rc>, msg: IoStream) { match *self { StreamHandlerType::Normal => { let io = TcpStream::from_stream(msg.io.into_tcp_stream(), Arbiter::handle()) .expect("failed to associate TCP stream"); Arbiter::handle().spawn(HttpChannel::new(h, io, msg.peer, msg.http2)); } #[cfg(feature="tls")] StreamHandlerType::Tls(ref acceptor) => { let IoStream { io, peer, http2 } = msg; let io = TcpStream::from_stream(io.into_tcp_stream(), Arbiter::handle()) .expect("failed to associate TCP stream"); Arbiter::handle().spawn( TlsAcceptorExt::accept_async(acceptor, io).then(move |res| { match res { Ok(io) => Arbiter::handle().spawn( HttpChannel::new(h, io, peer, http2)), Err(err) => trace!("Error during handling tls connection: {}", err), }; future::result(Ok(())) }) ); } #[cfg(feature="alpn")] StreamHandlerType::Alpn(ref acceptor) => { let IoStream { io, peer, .. } = msg; let io = TcpStream::from_stream(io.into_tcp_stream(), Arbiter::handle()) .expect("failed to associate TCP stream"); Arbiter::handle().spawn( SslAcceptorExt::accept_async(acceptor, io).then(move |res| { match res { Ok(io) => { let http2 = if let Some(p) = io.get_ref().ssl().selected_alpn_protocol() { p.len() == 2 && &p == b"h2" } else { false }; Arbiter::handle().spawn(HttpChannel::new(h, io, peer, http2)); }, Err(err) => trace!("Error during handling tls connection: {}", err), }; future::result(Ok(())) }) ); } } } } fn start_accept_thread(sock: Socket, addr: net::SocketAddr, workers: Vec>>) { // start acceptors thread let _ = thread::Builder::new().name(format!("Accept on {}", addr)).spawn(move || { let mut next = 0; loop { match sock.accept() { Ok((socket, addr)) => { let addr = if let Some(addr) = addr.as_inet() { net::SocketAddr::V4(addr) } else { net::SocketAddr::V6(addr.as_inet6().unwrap()) }; let msg = IoStream{io: socket, peer: Some(addr), http2: false}; workers[next].unbounded_send(msg).expect("worker thread died"); next = (next + 1) % workers.len(); } Err(err) => error!("Error accepting connection: {:?}", err), } } }); }