1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-28 01:32:57 +01:00

add server spawn method

This commit is contained in:
Nikolay Kim 2017-12-27 17:49:10 -08:00
parent 4d741b4de5
commit 556de72932
3 changed files with 95 additions and 14 deletions

View File

@ -68,4 +68,4 @@ This project is licensed under either of
at your option. at your option.
[![Analytics](https://ga-beacon.appspot.com/UA-110322332-2/actix-web/readme?pixel)](https://github.com/igrigorik/ga-beacon) [![Analytics](https://ga-beacon.appspot.com/UA-110322332-2/actix-web/readme?flat&useReferer)](https://github.com/igrigorik/ga-beacon)

View File

@ -29,6 +29,35 @@ fn main() {
} }
``` ```
It is possible to start server in separate thread with *spawn()* method. In that
case server spawns new thread and create new actix system in it. To stop
this server send `StopServer` message.
Http server is implemented as an actix actor. It is possible to communicate with server
via messaging system. All start methods like `start()`, `start_ssl()`, etc returns
address of the started http server. Actix http server accept several messages:
* `PauseServer` - Pause accepting incoming connections
* `ResumeServer` - Resume accepting incoming connections
* `StopServer` - Stop incoming connection processing, stop all workers and exit
```rust
# extern crate futures;
# extern crate actix;
# extern crate actix_web;
# use futures::Future;
use actix_web::*;
fn main() {
let addr = HttpServer::new(
|| Application::new()
.resource("/", |r| r.h(httpcodes::HTTPOk)))
.bind("127.0.0.1:0").expect("Can not bind to 127.0.0.1:0")
.spawn();
let _ = addr.call_fut(dev::StopServer).wait(); // <- Send `StopServer` message to server.
}
```
## Multi-threading ## Multi-threading

View File

@ -7,6 +7,7 @@ use std::marker::PhantomData;
use std::collections::HashMap; use std::collections::HashMap;
use actix::dev::*; use actix::dev::*;
use actix::System;
use futures::Stream; use futures::Stream;
use futures::sync::mpsc; use futures::sync::mpsc;
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
@ -107,8 +108,13 @@ pub struct HttpServer<T, A, H, U>
workers: Vec<SyncAddress<Worker<H>>>, workers: Vec<SyncAddress<Worker<H>>>,
sockets: HashMap<net::SocketAddr, net::TcpListener>, sockets: HashMap<net::SocketAddr, net::TcpListener>,
accept: Vec<(mio::SetReadiness, sync_mpsc::Sender<Command>)>, accept: Vec<(mio::SetReadiness, sync_mpsc::Sender<Command>)>,
spawned: bool,
} }
unsafe impl<T, A, H, U> Sync for HttpServer<T, A, H, U> where H: 'static {}
unsafe impl<T, A, H, U> Send for HttpServer<T, A, H, U> where H: 'static {}
impl<T: 'static, A: 'static, H, U: 'static> Actor for HttpServer<T, A, H, U> { impl<T: 'static, A: 'static, H, U: 'static> Actor for HttpServer<T, A, H, U> {
type Context = Context<Self>; type Context = Context<Self>;
@ -146,6 +152,7 @@ impl<T, A, H, U, V> HttpServer<T, A, H, U>
workers: Vec::new(), workers: Vec::new(),
sockets: HashMap::new(), sockets: HashMap::new(),
accept: Vec::new(), accept: Vec::new(),
spawned: false,
} }
} }
@ -206,8 +213,7 @@ impl<T, A, H, U, V> HttpServer<T, A, H, U>
pub fn bind<S: net::ToSocketAddrs>(mut self, addr: S) -> io::Result<Self> { pub fn bind<S: net::ToSocketAddrs>(mut self, addr: S) -> io::Result<Self> {
let mut err = None; let mut err = None;
let mut succ = false; let mut succ = false;
if let Ok(iter) = addr.to_socket_addrs() { for addr in addr.to_socket_addrs()? {
for addr in iter {
match create_tcp_listener(addr, self.backlog) { match create_tcp_listener(addr, self.backlog) {
Ok(lst) => { Ok(lst) => {
succ = true; succ = true;
@ -216,7 +222,6 @@ impl<T, A, H, U, V> HttpServer<T, A, H, U>
Err(e) => err = Some(e), Err(e) => err = Some(e),
} }
} }
}
if !succ { if !succ {
if let Some(e) = err.take() { if let Some(e) = err.take() {
@ -282,7 +287,7 @@ impl<H: HttpHandler, U, V> HttpServer<TcpStream, net::SocketAddr, H, U>
/// HttpServer::new( /// HttpServer::new(
/// || Application::new() /// || Application::new()
/// .resource("/", |r| r.h(httpcodes::HTTPOk))) /// .resource("/", |r| r.h(httpcodes::HTTPOk)))
/// .bind("127.0.0.1:8088").expect("Can not bind to 127.0.0.1:8088") /// .bind("127.0.0.1:0").expect("Can not bind to 127.0.0.1:0")
/// .start(); /// .start();
/// # actix::Arbiter::system().send(actix::msgs::SystemExit(0)); /// # actix::Arbiter::system().send(actix::msgs::SystemExit(0));
/// ///
@ -310,6 +315,43 @@ impl<H: HttpHandler, U, V> HttpServer<TcpStream, net::SocketAddr, H, U>
HttpServer::create(|_| {self}) HttpServer::create(|_| {self})
} }
} }
/// Spawn new thread and start listening for incomming connections.
///
/// This method spawns new thread and starts new actix system. Other than that it is
/// similar to `start()` method. This method does not block.
///
/// This methods panics if no socket addresses get bound.
///
/// ```rust
/// # extern crate futures;
/// # extern crate actix;
/// # extern crate actix_web;
/// # use futures::Future;
/// use actix_web::*;
///
/// fn main() {
/// let addr = HttpServer::new(
/// || Application::new()
/// .resource("/", |r| r.h(httpcodes::HTTPOk)))
/// .bind("127.0.0.1:0").expect("Can not bind to 127.0.0.1:0")
/// .spawn();
///
/// let _ = addr.call_fut(dev::StopServer).wait(); // <- Send `StopServer` message to server.
/// }
/// ```
pub fn spawn(mut self) -> SyncAddress<Self> {
self.spawned = true;
let (tx, rx) = sync_mpsc::channel();
thread::spawn(move || {
let sys = System::new("http-server");
let addr = self.start();
let _ = tx.send(addr);
sys.run();
});
rx.recv().unwrap()
}
} }
#[cfg(feature="tls")] #[cfg(feature="tls")]
@ -465,15 +507,20 @@ impl<T, A, H, U> Handler<IoStream<T>, io::Error> for HttpServer<T, A, H, U>
} }
} }
/// Pause connection accepting process /// Pause accepting incoming connections
///
/// If socket contains some pending connection, they might be dropped.
/// All opened connection remains active.
#[derive(Message)] #[derive(Message)]
pub struct PauseServer; pub struct PauseServer;
/// Resume connection accepting process /// Resume accepting incoming connections
#[derive(Message)] #[derive(Message)]
pub struct ResumeServer; pub struct ResumeServer;
/// Stop connection processing and exit /// Stop incoming connection processing, stop all workers and exit.
///
/// If server starts with `spawn()` method, then spawned thread get terminated.
#[derive(Message)] #[derive(Message)]
pub struct StopServer; pub struct StopServer;
@ -522,6 +569,11 @@ impl<T, A, H, U> Handler<StopServer> for HttpServer<T, A, H, U>
let _ = item.0.set_readiness(mio::Ready::readable()); let _ = item.0.set_readiness(mio::Ready::readable());
} }
ctx.stop(); ctx.stop();
// we need to stop system if server was spawned
if self.spawned {
Arbiter::system().send(msgs::SystemExit(0))
}
Self::empty() Self::empty()
} }
} }