From 556de7293215153e08c33559456a4c786de68bf9 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Wed, 27 Dec 2017 17:49:10 -0800 Subject: [PATCH] add server spawn method --- README.md | 2 +- guide/src/qs_3_5.md | 29 +++++++++++++++++ src/server.rs | 78 +++++++++++++++++++++++++++++++++++++-------- 3 files changed, 95 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index cd9d5a828..fff88d049 100644 --- a/README.md +++ b/README.md @@ -68,4 +68,4 @@ This project is licensed under either of at your option. -[![Analytics](https://ga-beacon.appspot.com/UA-110322332-2/actix-web/readme?pixel)](https://github.com/igrigorik/ga-beacon) +[![Analytics](https://ga-beacon.appspot.com/UA-110322332-2/actix-web/readme?flat&useReferer)](https://github.com/igrigorik/ga-beacon) diff --git a/guide/src/qs_3_5.md b/guide/src/qs_3_5.md index 647aa9654..3a3e04c4f 100644 --- a/guide/src/qs_3_5.md +++ b/guide/src/qs_3_5.md @@ -29,6 +29,35 @@ fn main() { } ``` +It is possible to start server in separate thread with *spawn()* method. In that +case server spawns new thread and create new actix system in it. To stop +this server send `StopServer` message. + +Http server is implemented as an actix actor. It is possible to communicate with server +via messaging system. All start methods like `start()`, `start_ssl()`, etc returns +address of the started http server. Actix http server accept several messages: + +* `PauseServer` - Pause accepting incoming connections +* `ResumeServer` - Resume accepting incoming connections +* `StopServer` - Stop incoming connection processing, stop all workers and exit + +```rust +# extern crate futures; +# extern crate actix; +# extern crate actix_web; +# use futures::Future; +use actix_web::*; + +fn main() { + let addr = HttpServer::new( + || Application::new() + .resource("/", |r| r.h(httpcodes::HTTPOk))) + .bind("127.0.0.1:0").expect("Can not bind to 127.0.0.1:0") + .spawn(); + + let _ = addr.call_fut(dev::StopServer).wait(); // <- Send `StopServer` message to server. +} +``` ## Multi-threading diff --git a/src/server.rs b/src/server.rs index 6552e2e3c..cf7276259 100644 --- a/src/server.rs +++ b/src/server.rs @@ -7,6 +7,7 @@ use std::marker::PhantomData; use std::collections::HashMap; use actix::dev::*; +use actix::System; use futures::Stream; use futures::sync::mpsc; use tokio_io::{AsyncRead, AsyncWrite}; @@ -107,8 +108,13 @@ pub struct HttpServer workers: Vec>>, sockets: HashMap, accept: Vec<(mio::SetReadiness, sync_mpsc::Sender)>, + spawned: bool, } +unsafe impl Sync for HttpServer where H: 'static {} +unsafe impl Send for HttpServer where H: 'static {} + + impl Actor for HttpServer { type Context = Context; @@ -146,6 +152,7 @@ impl HttpServer workers: Vec::new(), sockets: HashMap::new(), accept: Vec::new(), + spawned: false, } } @@ -206,15 +213,13 @@ impl HttpServer pub fn bind(mut self, addr: S) -> io::Result { let mut err = None; let mut succ = false; - if let Ok(iter) = addr.to_socket_addrs() { - for addr in iter { - match create_tcp_listener(addr, self.backlog) { - Ok(lst) => { - succ = true; - self.sockets.insert(lst.local_addr().unwrap(), lst); - }, - Err(e) => err = Some(e), - } + for addr in addr.to_socket_addrs()? { + match create_tcp_listener(addr, self.backlog) { + Ok(lst) => { + succ = true; + self.sockets.insert(lst.local_addr().unwrap(), lst); + }, + Err(e) => err = Some(e), } } @@ -282,7 +287,7 @@ impl HttpServer /// HttpServer::new( /// || Application::new() /// .resource("/", |r| r.h(httpcodes::HTTPOk))) - /// .bind("127.0.0.1:8088").expect("Can not bind to 127.0.0.1:8088") + /// .bind("127.0.0.1:0").expect("Can not bind to 127.0.0.1:0") /// .start(); /// # actix::Arbiter::system().send(actix::msgs::SystemExit(0)); /// @@ -310,6 +315,43 @@ impl HttpServer HttpServer::create(|_| {self}) } } + + /// Spawn new thread and start listening for incomming connections. + /// + /// This method spawns new thread and starts new actix system. Other than that it is + /// similar to `start()` method. This method does not block. + /// + /// This methods panics if no socket addresses get bound. + /// + /// ```rust + /// # extern crate futures; + /// # extern crate actix; + /// # extern crate actix_web; + /// # use futures::Future; + /// use actix_web::*; + /// + /// fn main() { + /// let addr = HttpServer::new( + /// || Application::new() + /// .resource("/", |r| r.h(httpcodes::HTTPOk))) + /// .bind("127.0.0.1:0").expect("Can not bind to 127.0.0.1:0") + /// .spawn(); + /// + /// let _ = addr.call_fut(dev::StopServer).wait(); // <- Send `StopServer` message to server. + /// } + /// ``` + pub fn spawn(mut self) -> SyncAddress { + self.spawned = true; + + let (tx, rx) = sync_mpsc::channel(); + thread::spawn(move || { + let sys = System::new("http-server"); + let addr = self.start(); + let _ = tx.send(addr); + sys.run(); + }); + rx.recv().unwrap() + } } #[cfg(feature="tls")] @@ -465,15 +507,20 @@ impl Handler, io::Error> for HttpServer } } -/// Pause connection accepting process +/// Pause accepting incoming connections +/// +/// If socket contains some pending connection, they might be dropped. +/// All opened connection remains active. #[derive(Message)] pub struct PauseServer; -/// Resume connection accepting process +/// Resume accepting incoming connections #[derive(Message)] pub struct ResumeServer; -/// Stop connection processing and exit +/// Stop incoming connection processing, stop all workers and exit. +/// +/// If server starts with `spawn()` method, then spawned thread get terminated. #[derive(Message)] pub struct StopServer; @@ -522,6 +569,11 @@ impl Handler for HttpServer let _ = item.0.set_readiness(mio::Ready::readable()); } ctx.stop(); + + // we need to stop system if server was spawned + if self.spawned { + Arbiter::system().send(msgs::SystemExit(0)) + } Self::empty() } }