1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-02-20 08:50:32 +01:00

use actix-rt for server impl

This commit is contained in:
Nikolay Kim 2018-12-09 21:51:35 -08:00
parent cdd6904aa0
commit ffb07c8884
14 changed files with 270 additions and 292 deletions

View File

@ -48,7 +48,6 @@ rust-tls = ["rustls", "tokio-rustls", "webpki", "webpki-roots"]
cell = [] cell = []
[dependencies] [dependencies]
actix = "0.7.6"
actix-service = "0.1.1" actix-service = "0.1.1"
actix-codec = { path = "actix-codec" } actix-codec = { path = "actix-codec" }
actix-rt = { path = "actix-rt" } actix-rt = { path = "actix-rt" }
@ -62,13 +61,12 @@ net2 = "0.2"
bytes = "0.4" bytes = "0.4"
futures = "0.1" futures = "0.1"
slab = "0.4" slab = "0.4"
tokio = "0.1"
tokio-codec = "0.1"
tokio-io = "0.1" tokio-io = "0.1"
tokio-tcp = "0.1" tokio-tcp = "0.1"
tokio-timer = "0.2" tokio-timer = "0.2"
tokio-reactor = "0.1" tokio-reactor = "0.1"
tokio-current-thread = "0.1" tokio-signal = "0.2"
trust-dns-proto = "^0.5.0" trust-dns-proto = "^0.5.0"
trust-dns-resolver = "^0.10.0" trust-dns-resolver = "^0.10.0"

View File

@ -234,6 +234,7 @@ impl Future for SystemArbiter {
type Error = (); type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
match self.commands.poll() { match self.commands.poll() {
Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())), Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())),
Ok(Async::Ready(Some(cmd))) => match cmd { Ok(Async::Ready(Some(cmd))) => match cmd {
@ -256,7 +257,7 @@ impl Future for SystemArbiter {
}, },
Ok(Async::NotReady) => return Ok(Async::NotReady), Ok(Async::NotReady) => return Ok(Async::NotReady),
} }
Ok(Async::NotReady) }
} }
} }

View File

@ -1,15 +1,6 @@
//! simple composite service //! simple composite service
//! build: cargo run --example basic --features "ssl" //! build: cargo run --example basic --features "ssl"
//! to test: curl https://127.0.0.1:8443/ -k //! to test: curl https://127.0.0.1:8443/ -k
extern crate actix;
extern crate actix_net;
extern crate env_logger;
extern crate futures;
extern crate openssl;
extern crate tokio_io;
extern crate tokio_openssl;
extern crate tokio_tcp;
use std::sync::{ use std::sync::{
atomic::{AtomicUsize, Ordering}, atomic::{AtomicUsize, Ordering},
Arc, Arc,
@ -22,7 +13,8 @@ use tokio_io::{AsyncRead, AsyncWrite};
use tokio_openssl::SslAcceptorExt; use tokio_openssl::SslAcceptorExt;
use actix_net::server::Server; use actix_net::server::Server;
use actix_net::service::{IntoNewService, NewServiceExt}; use actix_rt::System;
use actix_service::{IntoNewService, NewService};
/// Simple logger service, it just prints fact of the new connections /// Simple logger service, it just prints fact of the new connections
fn logger<T: AsyncRead + AsyncWrite + fmt::Debug>( fn logger<T: AsyncRead + AsyncWrite + fmt::Debug>(
@ -36,7 +28,7 @@ fn main() {
env::set_var("RUST_LOG", "actix_net=trace"); env::set_var("RUST_LOG", "actix_net=trace");
env_logger::init(); env_logger::init();
let sys = actix::System::new("test"); let sys = System::new("test");
// load ssl keys // load ssl keys
let mut builder = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap(); let mut builder = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap();
@ -53,7 +45,7 @@ fn main() {
// bind socket address and start workers. By default server uses number of // bind socket address and start workers. By default server uses number of
// available logical cpu as threads count. actix net start separate // available logical cpu as threads count. actix net start separate
// instances of service pipeline in each worker. // instances of service pipeline in each worker.
Server::default() Server::build()
.bind( .bind(
// configure service pipeline // configure service pipeline
"basic", "basic",

View File

@ -1,10 +1,3 @@
extern crate actix;
extern crate actix_net;
extern crate futures;
extern crate openssl;
extern crate tokio_io;
extern crate tokio_tcp;
use std::sync::{ use std::sync::{
atomic::{AtomicUsize, Ordering}, atomic::{AtomicUsize, Ordering},
Arc, Arc,
@ -15,8 +8,9 @@ use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use actix_net::server::Server; use actix_net::server::Server;
use actix_net::service::NewServiceExt;
use actix_net::ssl; use actix_net::ssl;
use actix_rt::System;
use actix_service::NewService;
#[derive(Debug)] #[derive(Debug)]
struct ServiceState { struct ServiceState {
@ -33,7 +27,7 @@ fn service<T: AsyncRead + AsyncWrite>(
} }
fn main() { fn main() {
let sys = actix::System::new("test"); let sys = System::new("test");
// load ssl keys // load ssl keys
let mut builder = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap(); let mut builder = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap();
@ -48,7 +42,7 @@ fn main() {
let openssl = ssl::OpensslAcceptor::new(builder.build()); let openssl = ssl::OpensslAcceptor::new(builder.build());
// server start mutiple workers, it runs supplied `Fn` in each worker. // server start mutiple workers, it runs supplied `Fn` in each worker.
Server::default() Server::build()
.bind("test-ssl", "0.0.0.0:8443", move || { .bind("test-ssl", "0.0.0.0:8443", move || {
let num = num.clone(); let num = num.clone();

View File

@ -2,13 +2,12 @@
use std::marker::PhantomData; use std::marker::PhantomData;
use std::mem; use std::mem;
use actix; use actix_codec::{Decoder, Encoder, Framed};
use actix_codec::Framed; use actix_rt::Arbiter;
use actix_service::{IntoNewService, IntoService, NewService, Service}; use actix_service::{IntoNewService, IntoService, NewService, Service};
use futures::future::{ok, FutureResult}; use futures::future::{ok, FutureResult};
use futures::unsync::mpsc; use futures::unsync::mpsc;
use futures::{Async, AsyncSink, Future, Poll, Sink, Stream}; use futures::{Async, AsyncSink, Future, Poll, Sink, Stream};
use tokio_codec::{Decoder, Encoder};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
type Request<U> = <U as Decoder>::Item; type Request<U> = <U as Decoder>::Item;
@ -256,7 +255,7 @@ where
Ok(Async::Ready(_)) => { Ok(Async::Ready(_)) => {
if let Some(item) = self.request.take() { if let Some(item) = self.request.take() {
let sender = self.write_tx.clone(); let sender = self.write_tx.clone();
actix::Arbiter::spawn( Arbiter::spawn(
self.service self.service
.call(item) .call(item)
.then(|item| sender.send(item).map(|_| ()).map_err(|_| ())), .then(|item| sender.send(item).map(|_| ()).map_err(|_| ())),
@ -281,7 +280,7 @@ where
match self.service.poll_ready() { match self.service.poll_ready() {
Ok(Async::Ready(_)) => { Ok(Async::Ready(_)) => {
let sender = self.write_tx.clone(); let sender = self.write_tx.clone();
actix::Arbiter::spawn( Arbiter::spawn(
self.service self.service
.call(item) .call(item)
.then(|item| sender.send(item).map(|_| ()).map_err(|_| ())), .then(|item| sender.send(item).map(|_| ()).map_err(|_| ())),

View File

@ -4,8 +4,8 @@ use std::net::IpAddr;
use futures::{Async, Future, Poll}; use futures::{Async, Future, Poll};
use actix_rt::spawn;
use actix_service::Service; use actix_service::Service;
use tokio_current_thread::spawn;
use trust_dns_resolver::config::{ResolverConfig, ResolverOpts}; use trust_dns_resolver::config::{ResolverConfig, ResolverOpts};
pub use trust_dns_resolver::error::ResolveError; pub use trust_dns_resolver::error::ResolveError;
use trust_dns_resolver::lookup_ip::LookupIpFuture; use trust_dns_resolver::lookup_ip::LookupIpFuture;

View File

@ -2,15 +2,14 @@ use std::sync::mpsc as sync_mpsc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use std::{io, net, thread}; use std::{io, net, thread};
use futures::{sync::mpsc, Future}; use actix_rt::System;
use futures::future::{lazy, Future};
use log::{error, info}; use log::{error, info};
use mio; use mio;
use slab::Slab; use slab::Slab;
use tokio_timer::Delay; use tokio_timer::Delay;
use actix::{msgs::Execute, Arbiter, System}; use super::server::Server;
use super::server::ServerCommand;
use super::worker::{Conn, WorkerClient}; use super::worker::{Conn, WorkerClient};
use super::Token; use super::Token;
@ -54,14 +53,11 @@ pub(crate) struct AcceptLoop {
notify_ready: mio::SetReadiness, notify_ready: mio::SetReadiness,
tx: sync_mpsc::Sender<Command>, tx: sync_mpsc::Sender<Command>,
rx: Option<sync_mpsc::Receiver<Command>>, rx: Option<sync_mpsc::Receiver<Command>>,
srv: Option<( srv: Option<Server>,
mpsc::UnboundedSender<ServerCommand>,
mpsc::UnboundedReceiver<ServerCommand>,
)>,
} }
impl AcceptLoop { impl AcceptLoop {
pub fn new() -> AcceptLoop { pub fn new(srv: Server) -> AcceptLoop {
let (tx, rx) = sync_mpsc::channel(); let (tx, rx) = sync_mpsc::channel();
let (cmd_reg, cmd_ready) = mio::Registration::new2(); let (cmd_reg, cmd_ready) = mio::Registration::new2();
let (notify_reg, notify_ready) = mio::Registration::new2(); let (notify_reg, notify_ready) = mio::Registration::new2();
@ -73,7 +69,7 @@ impl AcceptLoop {
notify_ready, notify_ready,
notify_reg: Some(notify_reg), notify_reg: Some(notify_reg),
rx: Some(rx), rx: Some(rx),
srv: Some(mpsc::unbounded()), srv: Some(srv),
} }
} }
@ -90,18 +86,17 @@ impl AcceptLoop {
&mut self, &mut self,
socks: Vec<(Token, net::TcpListener)>, socks: Vec<(Token, net::TcpListener)>,
workers: Vec<WorkerClient>, workers: Vec<WorkerClient>,
) -> mpsc::UnboundedReceiver<ServerCommand> { ) {
let (tx, rx) = self.srv.take().expect("Can not re-use AcceptInfo"); let srv = self.srv.take().expect("Can not re-use AcceptInfo");
Accept::start( Accept::start(
self.rx.take().expect("Can not re-use AcceptInfo"), self.rx.take().expect("Can not re-use AcceptInfo"),
self.cmd_reg.take().expect("Can not re-use AcceptInfo"), self.cmd_reg.take().expect("Can not re-use AcceptInfo"),
self.notify_reg.take().expect("Can not re-use AcceptInfo"), self.notify_reg.take().expect("Can not re-use AcceptInfo"),
socks, socks,
tx, srv,
workers, workers,
); );
rx
} }
} }
@ -110,7 +105,7 @@ struct Accept {
rx: sync_mpsc::Receiver<Command>, rx: sync_mpsc::Receiver<Command>,
sockets: Slab<ServerSocketInfo>, sockets: Slab<ServerSocketInfo>,
workers: Vec<WorkerClient>, workers: Vec<WorkerClient>,
srv: mpsc::UnboundedSender<ServerCommand>, srv: Server,
timer: (mio::Registration, mio::SetReadiness), timer: (mio::Registration, mio::SetReadiness),
next: usize, next: usize,
backpressure: bool, backpressure: bool,
@ -141,7 +136,7 @@ impl Accept {
cmd_reg: mio::Registration, cmd_reg: mio::Registration,
notify_reg: mio::Registration, notify_reg: mio::Registration,
socks: Vec<(Token, net::TcpListener)>, socks: Vec<(Token, net::TcpListener)>,
srv: mpsc::UnboundedSender<ServerCommand>, srv: Server,
workers: Vec<WorkerClient>, workers: Vec<WorkerClient>,
) { ) {
let sys = System::current(); let sys = System::current();
@ -181,7 +176,7 @@ impl Accept {
rx: sync_mpsc::Receiver<Command>, rx: sync_mpsc::Receiver<Command>,
socks: Vec<(Token, net::TcpListener)>, socks: Vec<(Token, net::TcpListener)>,
workers: Vec<WorkerClient>, workers: Vec<WorkerClient>,
srv: mpsc::UnboundedSender<ServerCommand>, srv: Server,
) -> Accept { ) -> Accept {
// Create a poll instance // Create a poll instance
let poll = match mio::Poll::new() { let poll = match mio::Poll::new() {
@ -376,9 +371,7 @@ impl Accept {
match self.workers[self.next].send(msg) { match self.workers[self.next].send(msg) {
Ok(_) => (), Ok(_) => (),
Err(tmp) => { Err(tmp) => {
let _ = self.srv.unbounded_send(ServerCommand::WorkerDied( let _ = self.srv.worker_died(self.workers[self.next].idx);
self.workers[self.next].idx,
));
msg = tmp; msg = tmp;
self.workers.swap_remove(self.next); self.workers.swap_remove(self.next);
if self.workers.is_empty() { if self.workers.is_empty() {
@ -404,9 +397,7 @@ impl Accept {
return; return;
} }
Err(tmp) => { Err(tmp) => {
let _ = self.srv.unbounded_send(ServerCommand::WorkerDied( let _ = self.srv.worker_died(self.workers[self.next].idx);
self.workers[self.next].idx,
));
msg = tmp; msg = tmp;
self.workers.swap_remove(self.next); self.workers.swap_remove(self.next);
if self.workers.is_empty() { if self.workers.is_empty() {
@ -449,19 +440,14 @@ impl Accept {
info.timeout = Some(Instant::now() + Duration::from_millis(500)); info.timeout = Some(Instant::now() + Duration::from_millis(500));
let r = self.timer.1.clone(); let r = self.timer.1.clone();
System::current().arbiter().do_send(Execute::new( System::current().arbiter().send(lazy(move || {
move || -> Result<(), ()> {
Arbiter::spawn(
Delay::new(Instant::now() + Duration::from_millis(510)) Delay::new(Instant::now() + Duration::from_millis(510))
.map_err(|_| ()) .map_err(|_| ())
.and_then(move |_| { .and_then(move |_| {
let _ = r.set_readiness(mio::Ready::readable()); let _ = r.set_readiness(mio::Ready::readable());
Ok(()) Ok(())
}), })
); }));
Ok(())
},
));
return; return;
} }
} }

View File

@ -1,32 +1,27 @@
use std::time::Duration; use std::time::Duration;
use std::{io, mem, net}; use std::{io, mem, net};
use actix_rt::Arbiter; use actix_rt::{spawn, Arbiter, System};
use futures::sync::{mpsc, mpsc::unbounded}; use futures::future::{lazy, ok};
use futures::{future::lazy, Future, Sink, Stream}; use futures::stream::futures_unordered;
use futures::sync::mpsc::{unbounded, UnboundedReceiver};
use futures::{Async, Future, Poll, Stream};
use log::{error, info}; use log::{error, info};
use net2::TcpBuilder; use net2::TcpBuilder;
use num_cpus; use num_cpus;
use tokio_timer::sleep;
use actix::{ // use actix::{actors::signal};
actors::signal, fut, Actor, ActorFuture, Addr, AsyncContext, Context, Handler, Response,
StreamHandler, System, WrapFuture,
};
use super::accept::{AcceptLoop, AcceptNotify, Command}; use super::accept::{AcceptLoop, AcceptNotify, Command};
use super::config::{ConfiguredService, ServiceConfig}; use super::config::{ConfiguredService, ServiceConfig};
use super::server::{Server, ServerCommand};
use super::services::{InternalServiceFactory, StreamNewService, StreamServiceFactory}; use super::services::{InternalServiceFactory, StreamNewService, StreamServiceFactory};
use super::services::{ServiceFactory, ServiceNewService}; use super::services::{ServiceFactory, ServiceNewService};
use super::worker::{self, Worker, WorkerAvailability, WorkerClient}; use super::worker::{self, Worker, WorkerAvailability, WorkerClient};
use super::{PauseServer, ResumeServer, StopServer, Token}; use super::Token;
pub(crate) enum ServerCommand {
WorkerDied(usize),
}
/// Server
pub struct Server {}
/// Server builder
pub struct ServerBuilder { pub struct ServerBuilder {
threads: usize, threads: usize,
token: Token, token: Token,
@ -36,24 +31,35 @@ pub struct ServerBuilder {
accept: AcceptLoop, accept: AcceptLoop,
exit: bool, exit: bool,
shutdown_timeout: Duration, shutdown_timeout: Duration,
signals: Option<Addr<signal::ProcessSignals>>,
no_signals: bool, no_signals: bool,
cmd: UnboundedReceiver<ServerCommand>,
server: Server,
}
impl Default for ServerBuilder {
fn default() -> Self {
Self::new()
}
} }
impl ServerBuilder { impl ServerBuilder {
/// Create new Server instance /// Create new Server builder instance
pub fn new() -> Server { pub fn new() -> ServerBuilder {
let (tx, rx) = unbounded();
let server = Server::new(tx);
ServerBuilder { ServerBuilder {
threads: num_cpus::get(), threads: num_cpus::get(),
token: Token(0), token: Token(0),
workers: Vec::new(), workers: Vec::new(),
services: Vec::new(), services: Vec::new(),
sockets: Vec::new(), sockets: Vec::new(),
accept: AcceptLoop::new(), accept: AcceptLoop::new(server.clone()),
exit: false, exit: false,
shutdown_timeout: Duration::from_secs(30), shutdown_timeout: Duration::from_secs(30),
signals: None,
no_signals: false, no_signals: false,
cmd: rx,
server,
} }
} }
@ -85,13 +91,6 @@ impl ServerBuilder {
self self
} }
#[doc(hidden)]
/// Set alternative address for `ProcessSignals` actor.
pub fn signals(mut self, addr: Addr<signal::ProcessSignals>) -> Self {
self.signals = Some(addr);
self
}
/// Disable signal handling /// Disable signal handling
pub fn disable_signals(mut self) -> Self { pub fn disable_signals(mut self) -> Self {
self.no_signals = true; self.no_signals = true;
@ -115,7 +114,7 @@ impl ServerBuilder {
/// ///
/// This function is useful for moving parts of configuration to a /// This function is useful for moving parts of configuration to a
/// different module or even library. /// different module or even library.
pub fn configure<F>(mut self, f: F) -> io::Result<Server> pub fn configure<F>(mut self, f: F) -> io::Result<ServerBuilder>
where where
F: Fn(&mut ServiceConfig) -> io::Result<()>, F: Fn(&mut ServiceConfig) -> io::Result<()>,
{ {
@ -134,7 +133,7 @@ impl ServerBuilder {
Ok(self) Ok(self)
} }
/// Add new service to server /// Add new service to the server.
pub fn bind<F, U, N: AsRef<str>>(mut self, name: N, addr: U, factory: F) -> io::Result<Self> pub fn bind<F, U, N: AsRef<str>>(mut self, name: N, addr: U, factory: F) -> io::Result<Self>
where where
F: StreamServiceFactory, F: StreamServiceFactory,
@ -155,7 +154,7 @@ impl ServerBuilder {
Ok(self) Ok(self)
} }
/// Add new service to server /// Add new service to the server.
pub fn listen<F, N: AsRef<str>>( pub fn listen<F, N: AsRef<str>>(
mut self, mut self,
name: N, name: N,
@ -175,7 +174,7 @@ impl ServerBuilder {
self self
} }
/// Add new service to server /// Add new service to the server.
pub fn listen2<F, N: AsRef<str>>( pub fn listen2<F, N: AsRef<str>>(
mut self, mut self,
name: N, name: N,
@ -223,10 +222,10 @@ impl ServerBuilder {
sys.run(); sys.run();
} }
/// Starts Server Actor and returns its address /// Starts processing incoming connections and return server controller.
pub fn start(mut self) -> Server { pub fn start(mut self) -> Server {
if self.sockets.is_empty() { if self.sockets.is_empty() {
panic!("Service should have at least one bound socket"); panic!("Server should have at least one bound socket");
} else { } else {
info!("Starting {} workers", self.threads); info!("Starting {} workers", self.threads);
@ -242,33 +241,17 @@ impl ServerBuilder {
for sock in &self.sockets { for sock in &self.sockets {
info!("Starting server on {}", sock.1.local_addr().ok().unwrap()); info!("Starting server on {}", sock.1.local_addr().ok().unwrap());
} }
let rx = self self.accept
.accept
.start(mem::replace(&mut self.sockets, Vec::new()), workers); .start(mem::replace(&mut self.sockets, Vec::new()), workers);
// start http server actor // start http server actor
let signals = self.subscribe_to_signals(); // let signals = self.subscribe_to_signals();
let addr = Actor::create(move |ctx| { // if let Some(signals) = signals {
ctx.add_stream(rx); // signals.do_send(signal::Subscribe(addr.clone().recipient()))
self // }
}); let server = self.server.clone();
if let Some(signals) = signals { spawn(self);
signals.do_send(signal::Subscribe(addr.clone().recipient())) server
}
addr
}
}
// subscribe to os signals
fn subscribe_to_signals(&self) -> Option<Addr<signal::ProcessSignals>> {
if !self.no_signals {
if let Some(ref signals) = self.signals {
Some(signals.clone())
} else {
Some(System::current().registry().get::<signal::ProcessSignals>())
}
} else {
None
} }
} }
@ -290,109 +273,92 @@ impl ServerBuilder {
} }
} }
impl Actor for Server { // /// Signals support
type Context = Context<Self>; // /// Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop actix system
} // /// message to `System` actor.
// impl Handler<signal::Signal> for Server {
// type Result = ();
/// Signals support // fn handle(&mut self, msg: signal::Signal, ctx: &mut Context<Self>) {
/// Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop actix system // match msg.0 {
/// message to `System` actor. // signal::SignalType::Int => {
impl Handler<signal::Signal> for Server { // info!("SIGINT received, exiting");
type Result = (); // self.exit = true;
// Handler::<StopServer>::handle(self, StopServer { graceful: false }, ctx);
// }
// signal::SignalType::Term => {
// info!("SIGTERM received, stopping");
// self.exit = true;
// Handler::<StopServer>::handle(self, StopServer { graceful: true }, ctx);
// }
// signal::SignalType::Quit => {
// info!("SIGQUIT received, exiting");
// self.exit = true;
// Handler::<StopServer>::handle(self, StopServer { graceful: false }, ctx);
// }
// _ => (),
// }
// }
// }
fn handle(&mut self, msg: signal::Signal, ctx: &mut Context<Self>) { impl Future for ServerBuilder {
match msg.0 { type Item = ();
signal::SignalType::Int => { type Error = ();
info!("SIGINT received, exiting");
self.exit = true;
Handler::<StopServer>::handle(self, StopServer { graceful: false }, ctx);
}
signal::SignalType::Term => {
info!("SIGTERM received, stopping");
self.exit = true;
Handler::<StopServer>::handle(self, StopServer { graceful: true }, ctx);
}
signal::SignalType::Quit => {
info!("SIGQUIT received, exiting");
self.exit = true;
Handler::<StopServer>::handle(self, StopServer { graceful: false }, ctx);
}
_ => (),
}
}
}
impl Handler<PauseServer> for Server { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
type Result = (); loop {
match self.cmd.poll() {
fn handle(&mut self, _: PauseServer, _: &mut Context<Self>) { Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(Some(item))) => match item {
ServerCommand::Pause(tx) => {
self.accept.send(Command::Pause); self.accept.send(Command::Pause);
let _ = tx.send(());
} }
} ServerCommand::Resume(tx) => {
impl Handler<ResumeServer> for Server {
type Result = ();
fn handle(&mut self, _: ResumeServer, _: &mut Context<Self>) {
self.accept.send(Command::Resume); self.accept.send(Command::Resume);
let _ = tx.send(());
} }
} ServerCommand::Stop {
graceful,
completion,
} => {
let exit = self.exit;
impl Handler<StopServer> for Server {
type Result = Response<(), ()>;
fn handle(&mut self, msg: StopServer, ctx: &mut Context<Self>) -> Self::Result {
// stop accept thread // stop accept thread
self.accept.send(Command::Stop); self.accept.send(Command::Stop);
// stop workers // stop workers
let (tx, rx) = mpsc::channel(1);
for worker in &self.workers {
let tx2 = tx.clone();
ctx.spawn(
worker
.1
.stop(msg.graceful)
.into_actor(self)
.then(move |_, slf, ctx| {
slf.workers.pop();
if slf.workers.is_empty() {
let _ = tx2.send(());
// we need to stop system if server was spawned
if slf.exit {
ctx.run_later(Duration::from_millis(300), |_, _| {
System::current().stop();
});
}
}
fut::ok(())
}),
);
}
if !self.workers.is_empty() { if !self.workers.is_empty() {
Response::r#async(rx.into_future().map(|_| ()).map_err(|_| ())) spawn(
futures_unordered(
self.workers
.iter()
.map(move |worker| worker.1.stop(graceful)),
)
.collect()
.then(move |_| {
let _ = completion.send(());
if exit {
spawn(sleep(Duration::from_millis(300)).then(|_| {
System::current().stop();
ok(())
}));
}
ok(())
}),
)
} else { } else {
// we need to stop system if server was spawned // we need to stop system if server was spawned
if self.exit { if self.exit {
ctx.run_later(Duration::from_millis(300), |_, _| { spawn(sleep(Duration::from_millis(300)).then(|_| {
System::current().stop(); System::current().stop();
}); ok(())
}));
} }
Response::reply(Ok(())) let _ = completion.send(());
} }
} }
}
/// Commands from accept threads
impl StreamHandler<ServerCommand, ()> for Server {
fn finished(&mut self, _: &mut Context<Self>) {}
fn handle(&mut self, msg: ServerCommand, _: &mut Context<Self>) {
match msg {
ServerCommand::WorkerDied(idx) => { ServerCommand::WorkerDied(idx) => {
let mut found = false; let mut found = false;
for i in 0..self.workers.len() { for i in 0..self.workers.len() {
@ -422,6 +388,8 @@ impl StreamHandler<ServerCommand, ()> for Server {
self.accept.send(Command::Worker(worker)); self.accept.send(Command::Worker(worker));
} }
} }
},
}
} }
} }
} }

View File

@ -8,7 +8,7 @@ use tokio_tcp::TcpStream;
use crate::counter::CounterGuard; use crate::counter::CounterGuard;
use super::server::bind_addr; use super::builder::bind_addr;
use super::services::{ use super::services::{
BoxedServerService, InternalServiceFactory, ServerMessage, StreamService, BoxedServerService, InternalServiceFactory, ServerMessage, StreamService,
}; };

View File

@ -1,40 +1,17 @@
//! General purpose networking server //! General purpose tcp server
use actix::Message;
mod accept; mod accept;
mod builder; mod builder;
mod config; mod config;
mod server;
mod services; mod services;
mod worker; mod worker;
pub use self::builder::ServerBuilder; pub use self::builder::ServerBuilder;
pub use self::config::{ServiceConfig, ServiceRuntime}; pub use self::config::{ServiceConfig, ServiceRuntime};
pub use self::server::Server;
pub use self::services::{ServerMessage, ServiceFactory, StreamServiceFactory}; pub use self::services::{ServerMessage, ServiceFactory, StreamServiceFactory};
/// Pause accepting incoming connections
///
/// If socket contains some pending connection, they might be dropped.
/// All opened connection remains active.
#[derive(Message)]
pub struct PauseServer;
/// Resume accepting incoming connections
#[derive(Message)]
pub struct ResumeServer;
/// Stop incoming connection processing, stop all workers and exit.
///
/// If server starts with `spawn()` method, then spawned thread get terminated.
pub struct StopServer {
/// Whether to try and shut down gracefully
pub graceful: bool,
}
impl Message for StopServer {
type Result = Result<(), ()>;
}
/// Socket id token /// Socket id token
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub(crate) struct Token(usize); pub(crate) struct Token(usize);

63
src/server/server.rs Normal file
View File

@ -0,0 +1,63 @@
use futures::sync::mpsc::UnboundedSender;
use futures::sync::oneshot;
use futures::Future;
use super::builder::ServerBuilder;
pub(crate) enum ServerCommand {
WorkerDied(usize),
Pause(oneshot::Sender<()>),
Resume(oneshot::Sender<()>),
/// Whether to try and shut down gracefully
Stop {
graceful: bool,
completion: oneshot::Sender<()>,
},
}
#[derive(Clone)]
pub struct Server(UnboundedSender<ServerCommand>);
impl Server {
pub(crate) fn new(tx: UnboundedSender<ServerCommand>) -> Self {
Server(tx)
}
/// Start server building process
pub fn build() -> ServerBuilder {
ServerBuilder::default()
}
pub(crate) fn worker_died(&self, idx: usize) {
let _ = self.0.unbounded_send(ServerCommand::WorkerDied(idx));
}
/// Pause accepting incoming connections
///
/// If socket contains some pending connection, they might be dropped.
/// All opened connection remains active.
pub fn pause(&self) -> impl Future<Item = (), Error = ()> {
let (tx, rx) = oneshot::channel();
let _ = self.0.unbounded_send(ServerCommand::Pause(tx));
rx.map_err(|_| ())
}
/// Resume accepting incoming connections
pub fn resume(&self) -> impl Future<Item = (), Error = ()> {
let (tx, rx) = oneshot::channel();
let _ = self.0.unbounded_send(ServerCommand::Resume(tx));
rx.map_err(|_| ())
}
/// Stop incoming connection processing, stop all workers and exit.
///
/// If server starts with `spawn()` method, then spawned thread get terminated.
pub fn stop(&self, graceful: bool) -> impl Future<Item = (), Error = ()> {
let (tx, rx) = oneshot::channel();
let _ = self.0.unbounded_send(ServerCommand::Stop {
graceful,
completion: tx,
});
rx.map_err(|_| ())
}
}

View File

@ -1,11 +1,11 @@
use std::net; use std::net;
use std::time::Duration; use std::time::Duration;
use actix_rt::spawn;
use actix_service::{NewService, Service}; use actix_service::{NewService, Service};
use futures::future::{err, ok, FutureResult}; use futures::future::{err, ok, FutureResult};
use futures::{Future, Poll}; use futures::{Future, Poll};
use log::error; use log::error;
use tokio_current_thread::spawn;
use tokio_reactor::Handle; use tokio_reactor::Handle;
use tokio_tcp::TcpStream; use tokio_tcp::TcpStream;

View File

@ -1,9 +1,9 @@
use std::marker::PhantomData; use std::marker::PhantomData;
use actix_rt::spawn;
use actix_service::{IntoService, NewService, Service}; use actix_service::{IntoService, NewService, Service};
use futures::unsync::mpsc; use futures::unsync::mpsc;
use futures::{future, Async, Future, Poll, Stream}; use futures::{future, Async, Future, Poll, Stream};
use tokio_current_thread::spawn;
pub struct StreamDispatcher<S: Stream, T> { pub struct StreamDispatcher<S: Stream, T> {
stream: S, stream: S,

View File

@ -1,9 +1,9 @@
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use actix_rt::spawn;
use actix_service::{NewService, Service}; use actix_service::{NewService, Service};
use futures::future::{ok, FutureResult}; use futures::future::{ok, FutureResult};
use futures::{Async, Future, Poll}; use futures::{Async, Future, Poll};
use tokio_current_thread::spawn;
use tokio_timer::sleep; use tokio_timer::sleep;
use super::cell::Cell; use super::cell::Cell;