mirror of https://github.com/fafhrd91/actix-net synced 2025-06-29 16:14:58 +02:00

move server impl to sub module

Nikolay Kim
2018-09-11 08:43:23 -07:00
parent 56b31960f1
commit dc50268c8a
8 changed files with 47 additions and 42 deletions

src/server/accept.rs (new file, 468 lines added)
@@ -0,0 +1,468 @@
use std::sync::mpsc as sync_mpsc;
use std::time::{Duration, Instant};
use std::{io, net, thread};
use futures::{sync::mpsc, Future};
use mio;
use slab::Slab;
use tokio_timer::Delay;
use actix::{msgs::Execute, Arbiter, System};
use super::server::ServerCommand;
use super::worker::{Conn, WorkerClient};
use super::Token;
pub(crate) enum Command {
Pause,
Resume,
Stop,
Worker(WorkerClient),
}
struct ServerSocketInfo {
addr: net::SocketAddr,
token: Token,
handler: Token,
sock: mio::net::TcpListener,
timeout: Option<Instant>,
}
#[derive(Clone)]
pub(crate) struct AcceptNotify(mio::SetReadiness);
impl AcceptNotify {
pub(crate) fn new(ready: mio::SetReadiness) -> Self {
AcceptNotify(ready)
}
pub(crate) fn notify(&self) {
let _ = self.0.set_readiness(mio::Ready::readable());
}
}
impl Default for AcceptNotify {
fn default() -> Self {
AcceptNotify::new(mio::Registration::new2().1)
}
}
pub(crate) struct AcceptLoop {
cmd_reg: Option<mio::Registration>,
cmd_ready: mio::SetReadiness,
notify_reg: Option<mio::Registration>,
notify_ready: mio::SetReadiness,
tx: sync_mpsc::Sender<Command>,
rx: Option<sync_mpsc::Receiver<Command>>,
srv: Option<(
mpsc::UnboundedSender<ServerCommand>,
mpsc::UnboundedReceiver<ServerCommand>,
)>,
}
impl AcceptLoop {
pub fn new() -> AcceptLoop {
let (tx, rx) = sync_mpsc::channel();
let (cmd_reg, cmd_ready) = mio::Registration::new2();
let (notify_reg, notify_ready) = mio::Registration::new2();
AcceptLoop {
tx,
cmd_ready,
cmd_reg: Some(cmd_reg),
notify_ready,
notify_reg: Some(notify_reg),
rx: Some(rx),
srv: Some(mpsc::unbounded()),
}
}
pub fn send(&self, msg: Command) {
let _ = self.tx.send(msg);
let _ = self.cmd_ready.set_readiness(mio::Ready::readable());
}
pub fn get_notify(&self) -> AcceptNotify {
AcceptNotify::new(self.notify_ready.clone())
}
pub(crate) fn start(
&mut self, socks: Vec<(Token, net::TcpListener)>, workers: Vec<WorkerClient>,
) -> mpsc::UnboundedReceiver<ServerCommand> {
let (tx, rx) = self.srv.take().expect("Can not re-use AcceptInfo");
Accept::start(
self.rx.take().expect("Can not re-use AcceptInfo"),
self.cmd_reg.take().expect("Can not re-use AcceptInfo"),
self.notify_reg.take().expect("Can not re-use AcceptInfo"),
socks,
tx,
workers,
);
rx
}
}
struct Accept {
poll: mio::Poll,
rx: sync_mpsc::Receiver<Command>,
sockets: Slab<ServerSocketInfo>,
workers: Vec<WorkerClient>,
srv: mpsc::UnboundedSender<ServerCommand>,
timer: (mio::Registration, mio::SetReadiness),
next: usize,
backpressure: bool,
}
const DELTA: usize = 100;
const CMD: mio::Token = mio::Token(0);
const TIMER: mio::Token = mio::Token(1);
const NOTIFY: mio::Token = mio::Token(2);
/// Defines errors that are per-connection: if `accept()` returns one of
/// these, the next connection might still be ready to be accepted.
///
/// All other errors incur a timeout before the next `accept()` is performed.
/// The timeout is useful for handling resource exhaustion errors such as
/// ENFILE and EMFILE; without it the accept loop could spin indefinitely.
fn connection_error(e: &io::Error) -> bool {
e.kind() == io::ErrorKind::ConnectionRefused
|| e.kind() == io::ErrorKind::ConnectionAborted
|| e.kind() == io::ErrorKind::ConnectionReset
}
impl Accept {
#![cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))]
pub(crate) fn start(
rx: sync_mpsc::Receiver<Command>, cmd_reg: mio::Registration,
notify_reg: mio::Registration, socks: Vec<(Token, net::TcpListener)>,
srv: mpsc::UnboundedSender<ServerCommand>, workers: Vec<WorkerClient>,
) {
let sys = System::current();
// start accept thread
let _ = thread::Builder::new()
.name("actix-web accept loop".to_owned())
.spawn(move || {
System::set_current(sys);
let mut accept = Accept::new(rx, socks, workers, srv);
// Start listening for incoming commands
if let Err(err) = accept.poll.register(
&cmd_reg,
CMD,
mio::Ready::readable(),
mio::PollOpt::edge(),
) {
panic!("Can not register Registration: {}", err);
}
// Start listening for notify updates
if let Err(err) = accept.poll.register(
&notify_reg,
NOTIFY,
mio::Ready::readable(),
mio::PollOpt::edge(),
) {
panic!("Can not register Registration: {}", err);
}
accept.poll();
});
}
fn new(
rx: sync_mpsc::Receiver<Command>, socks: Vec<(Token, net::TcpListener)>,
workers: Vec<WorkerClient>, srv: mpsc::UnboundedSender<ServerCommand>,
) -> Accept {
// Create a poll instance
let poll = match mio::Poll::new() {
Ok(poll) => poll,
Err(err) => panic!("Can not create mio::Poll: {}", err),
};
// Start accept
let mut sockets = Slab::new();
for (idx, (hnd_token, lst)) in socks.into_iter().enumerate() {
let addr = lst.local_addr().unwrap();
let server = mio::net::TcpListener::from_std(lst)
.expect("Can not create mio::net::TcpListener");
let entry = sockets.vacant_entry();
let token = entry.key();
// Start listening for incoming connections
if let Err(err) = poll.register(
&server,
mio::Token(token + DELTA),
mio::Ready::readable(),
mio::PollOpt::edge(),
) {
panic!("Can not register io: {}", err);
}
entry.insert(ServerSocketInfo {
addr,
token: hnd_token,
handler: Token(idx),
sock: server,
timeout: None,
});
}
// Timer
let (tm, tmr) = mio::Registration::new2();
if let Err(err) =
poll.register(&tm, TIMER, mio::Ready::readable(), mio::PollOpt::edge())
{
panic!("Can not register Registration: {}", err);
}
Accept {
poll,
rx,
sockets,
workers,
srv,
next: 0,
timer: (tm, tmr),
backpressure: false,
}
}
fn poll(&mut self) {
// Create storage for events
let mut events = mio::Events::with_capacity(128);
loop {
if let Err(err) = self.poll.poll(&mut events, None) {
panic!("Poll error: {}", err);
}
for event in events.iter() {
let token = event.token();
match token {
CMD => if !self.process_cmd() {
return;
},
TIMER => self.process_timer(),
NOTIFY => self.backpressure(false),
_ => {
let token = usize::from(token);
if token < DELTA {
continue;
}
self.accept(token - DELTA);
}
}
}
}
}
fn process_timer(&mut self) {
let now = Instant::now();
for (token, info) in self.sockets.iter_mut() {
if let Some(inst) = info.timeout.take() {
if now > inst {
if let Err(err) = self.poll.register(
&info.sock,
mio::Token(token + DELTA),
mio::Ready::readable(),
mio::PollOpt::edge(),
) {
error!("Can not register server socket {}", err);
} else {
info!("Resume accepting connections on {}", info.addr);
}
} else {
info.timeout = Some(inst);
}
}
}
}
fn process_cmd(&mut self) -> bool {
loop {
match self.rx.try_recv() {
Ok(cmd) => match cmd {
Command::Pause => {
for (_, info) in self.sockets.iter_mut() {
if let Err(err) = self.poll.deregister(&info.sock) {
error!("Can not deregister server socket {}", err);
} else {
info!("Paused accepting connections on {}", info.addr);
}
}
}
Command::Resume => {
for (token, info) in self.sockets.iter() {
if let Err(err) = self.poll.register(
&info.sock,
mio::Token(token + DELTA),
mio::Ready::readable(),
mio::PollOpt::edge(),
) {
error!("Can not resume socket accept process: {}", err);
} else {
info!(
"Accepting connections on {} has been resumed",
info.addr
);
}
}
}
Command::Stop => {
for (_, info) in self.sockets.iter() {
let _ = self.poll.deregister(&info.sock);
}
return false;
}
Command::Worker(worker) => {
self.backpressure(false);
self.workers.push(worker);
}
},
Err(err) => match err {
sync_mpsc::TryRecvError::Empty => break,
sync_mpsc::TryRecvError::Disconnected => {
for (_, info) in self.sockets.iter() {
let _ = self.poll.deregister(&info.sock);
}
return false;
}
},
}
}
true
}
fn backpressure(&mut self, on: bool) {
if self.backpressure {
if !on {
self.backpressure = false;
for (token, info) in self.sockets.iter() {
if let Err(err) = self.poll.register(
&info.sock,
mio::Token(token + DELTA),
mio::Ready::readable(),
mio::PollOpt::edge(),
) {
error!("Can not resume socket accept process: {}", err);
} else {
info!("Accepting connections on {} has been resumed", info.addr);
}
}
}
} else if on {
self.backpressure = true;
for (_, info) in self.sockets.iter() {
let _ = self.poll.deregister(&info.sock);
}
}
}
fn accept_one(&mut self, mut msg: Conn) {
if self.backpressure {
while !self.workers.is_empty() {
match self.workers[self.next].send(msg) {
Ok(_) => (),
Err(err) => {
let _ = self.srv.unbounded_send(ServerCommand::WorkerDied(
self.workers[self.next].idx,
));
msg = err.into_inner();
self.workers.swap_remove(self.next);
if self.workers.is_empty() {
error!("No workers");
return;
} else if self.workers.len() <= self.next {
self.next = 0;
}
continue;
}
}
self.next = (self.next + 1) % self.workers.len();
break;
}
} else {
let mut idx = 0;
while idx < self.workers.len() {
idx += 1;
if self.workers[self.next].available() {
match self.workers[self.next].send(msg) {
Ok(_) => {
self.next = (self.next + 1) % self.workers.len();
return;
}
Err(err) => {
let _ = self.srv.unbounded_send(ServerCommand::WorkerDied(
self.workers[self.next].idx,
));
msg = err.into_inner();
self.workers.swap_remove(self.next);
if self.workers.is_empty() {
error!("No workers");
self.backpressure(true);
return;
} else if self.workers.len() <= self.next {
self.next = 0;
}
continue;
}
}
}
self.next = (self.next + 1) % self.workers.len();
}
// enable backpressure
self.backpressure(true);
self.accept_one(msg);
}
}
fn accept(&mut self, token: usize) {
loop {
let msg = if let Some(info) = self.sockets.get_mut(token) {
match info.sock.accept_std() {
Ok((io, addr)) => Conn {
io,
token: info.token,
handler: info.handler,
peer: Some(addr),
},
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return,
Err(ref e) if connection_error(e) => continue,
Err(e) => {
error!("Error accepting connection: {}", e);
if let Err(err) = self.poll.deregister(&info.sock) {
error!("Can not deregister server socket {}", err);
}
// sleep after error
info.timeout = Some(Instant::now() + Duration::from_millis(500));
let r = self.timer.1.clone();
System::current().arbiter().do_send(Execute::new(
move || -> Result<(), ()> {
Arbiter::spawn(
Delay::new(Instant::now() + Duration::from_millis(510))
.map_err(|_| ())
.and_then(move |_| {
let _ = r.set_readiness(mio::Ready::readable());
Ok(())
}),
);
Ok(())
},
));
return;
}
}
} else {
return;
};
self.accept_one(msg);
}
}
}
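The CMD and NOTIFY wakeups above rely on mio's `Registration`/`SetReadiness` pair: another thread flips readiness on a token that is not backed by any socket, which wakes the blocking `poll()` call. A minimal standalone sketch of that pattern (assuming mio 0.6, the version this module is written against):

extern crate mio;

use std::thread;
use std::time::Duration;

fn main() {
    let poll = mio::Poll::new().unwrap();
    let (registration, set_readiness) = mio::Registration::new2();

    // Register the user-space readiness handle, just like CMD/NOTIFY above.
    poll.register(
        &registration,
        mio::Token(2),
        mio::Ready::readable(),
        mio::PollOpt::edge(),
    ).unwrap();

    // Another thread (e.g. a worker that became available) wakes the loop.
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(50));
        let _ = set_readiness.set_readiness(mio::Ready::readable());
    });

    let mut events = mio::Events::with_capacity(8);
    poll.poll(&mut events, None).unwrap();
    for event in events.iter() {
        assert_eq!(event.token(), mio::Token(2)); // woken without any socket I/O
    }
}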

src/server/mod.rs (new file, 40 lines added)
@@ -0,0 +1,40 @@
//! General purpose networking server
use actix::Message;
mod accept;
mod server;
mod services;
mod worker;
pub use self::server::Server;
pub use self::services::ServerServiceFactory;
pub(crate) use self::worker::Connections;
/// Pause accepting incoming connections
///
/// If the socket has pending connections, they might be dropped.
/// All open connections remain active.
#[derive(Message)]
pub struct PauseServer;
/// Resume accepting incoming connections
#[derive(Message)]
pub struct ResumeServer;
/// Stop incoming connection processing, stop all workers and exit.
///
/// If the server was started with the `spawn()` method, the spawned thread gets terminated.
pub struct StopServer {
/// Whether to try and shut down gracefully
pub graceful: bool,
}
impl Message for StopServer {
type Result = Result<(), ()>;
}
/// Socket id token
#[derive(Clone, Copy)]
pub(crate) struct Token(usize);
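The three control messages above are handled by the `Server` actor defined in src/server/server.rs. A minimal usage sketch, assuming a server has already been started and `addr` is the `Addr<Server>` returned by `Server::start()`; the import path assumes use from within this crate:

use actix::Addr;
use server::{PauseServer, ResumeServer, Server, StopServer};

fn control(addr: &Addr<Server>) {
    // Stop accepting new connections; connections already accepted stay active.
    addr.do_send(PauseServer);
    // Start accepting connections again.
    addr.do_send(ResumeServer);
    // Stop the accept thread and give workers the configured shutdown_timeout
    // to finish their connections before being force dropped.
    addr.do_send(StopServer { graceful: true });
}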

src/server/server.rs (new file, 438 lines added)
@@ -0,0 +1,438 @@
use std::time::Duration;
use std::{io, mem, net};
use futures::sync::{mpsc, mpsc::unbounded};
use futures::{Future, Sink, Stream};
use net2::TcpBuilder;
use num_cpus;
use actix::{
actors::signal, fut, Actor, ActorFuture, Addr, Arbiter, AsyncContext, Context, Handler,
Response, StreamHandler, System, WrapFuture,
};
use super::accept::{AcceptLoop, AcceptNotify, Command};
use super::services::{InternalServerServiceFactory, ServerNewService, ServerServiceFactory};
use super::worker::{self, Conn, StopWorker, Worker, WorkerAvailability, WorkerClient};
use super::{PauseServer, ResumeServer, StopServer, Token};
pub(crate) enum ServerCommand {
WorkerDied(usize),
}
/// Server
pub struct Server {
threads: usize,
workers: Vec<(usize, Addr<Worker>)>,
services: Vec<Box<InternalServerServiceFactory>>,
sockets: Vec<(Token, net::TcpListener)>,
accept: AcceptLoop,
exit: bool,
shutdown_timeout: u16,
signals: Option<Addr<signal::ProcessSignals>>,
no_signals: bool,
}
impl Default for Server {
fn default() -> Self {
Self::new()
}
}
impl Server {
/// Create new Server instance
pub fn new() -> Server {
Server {
threads: num_cpus::get(),
workers: Vec::new(),
services: Vec::new(),
sockets: Vec::new(),
accept: AcceptLoop::new(),
exit: false,
shutdown_timeout: 30,
signals: None,
no_signals: false,
}
}
/// Set number of workers to start.
///
/// By default the server uses the number of available logical CPUs as the
/// worker count.
pub fn workers(mut self, num: usize) -> Self {
self.threads = num;
self
}
/// Sets the maximum per-worker number of concurrent connections.
///
/// All socket listeners will stop accepting connections when this limit is
/// reached for each worker.
///
/// By default the limit is set to 25k connections per worker.
pub fn maxconn(self, num: usize) -> Self {
worker::max_concurrent_connections(num);
self
}
/// Stop the actix `System` when the server shuts down.
///
/// When enabled, stopping the server also stops the currently running system.
pub fn system_exit(mut self) -> Self {
self.exit = true;
self
}
#[doc(hidden)]
/// Set alternative address for `ProcessSignals` actor.
pub fn signals(mut self, addr: Addr<signal::ProcessSignals>) -> Self {
self.signals = Some(addr);
self
}
/// Disable signal handling
pub fn disable_signals(mut self) -> Self {
self.no_signals = true;
self
}
/// Timeout for graceful workers shutdown.
///
/// After receiving a stop signal, workers have this much time to finish
/// serving requests. Workers still alive after the timeout are force
/// dropped.
///
/// By default the shutdown timeout is set to 30 seconds.
pub fn shutdown_timeout(mut self, sec: u16) -> Self {
self.shutdown_timeout = sec;
self
}
/// Run external configuration as part of the server building
/// process
///
/// This function is useful for moving parts of the configuration to a
/// different module or even a library.
///
/// ```rust
/// # extern crate actix_web;
/// use actix_web::{fs, middleware, App, HttpResponse};
///
/// // this function could be located in different module
/// fn config(app: App) -> App {
/// app.resource("/test", |r| {
/// r.get().f(|_| HttpResponse::Ok());
/// r.head().f(|_| HttpResponse::MethodNotAllowed());
/// })
/// }
///
/// fn main() {
/// let app = App::new()
/// .middleware(middleware::Logger::default())
/// .configure(config) // <- register resources
/// .handler("/static", fs::StaticFiles::new(".").unwrap());
/// }
/// ```
pub fn configure<F>(self, cfg: F) -> Server
where
F: Fn(Server) -> Server,
{
cfg(self)
}
/// Add new service to server
pub fn bind<F, U>(mut self, addr: U, factory: F) -> io::Result<Self>
where
F: ServerServiceFactory,
U: net::ToSocketAddrs,
{
let sockets = bind_addr(addr)?;
for lst in sockets {
self = self.listen(lst, factory.clone())
}
Ok(self)
}
/// Add new service to server
pub fn listen<F>(mut self, lst: net::TcpListener, factory: F) -> Self
where
F: ServerServiceFactory,
{
let token = Token(self.services.len());
self.services.push(ServerNewService::create(factory));
self.sockets.push((token, lst));
self
}
/// Start listening for incoming connections and block the current thread.
///
/// This method creates a new actix system and runs it to completion; other
/// than blocking, it is similar to the `start()` method.
///
/// This method panics if no socket addresses are bound.
///
/// ```rust,ignore
/// # extern crate futures;
/// # extern crate actix_web;
/// # use futures::Future;
/// use actix_web::*;
///
/// fn main() {
///     Server::new()
/// .service(
/// HttpServer::new(|| App::new().resource("/", |r| r.h(|_| HttpResponse::Ok())))
/// .bind("127.0.0.1:0")
/// .expect("Can not bind to 127.0.0.1:0"))
/// .run();
/// }
/// ```
pub fn run(self) {
let sys = System::new("http-server");
self.start();
sys.run();
}
/// Starts Server Actor and returns its address
pub fn start(mut self) -> Addr<Server> {
if self.sockets.is_empty() {
panic!("Service should have at least one bound socket");
} else {
info!("Starting {} http workers", self.threads);
// start workers
let mut workers = Vec::new();
for idx in 0..self.threads {
let (addr, worker) = self.start_worker(idx, self.accept.get_notify());
workers.push(worker);
self.workers.push((idx, addr));
}
// start accept thread
for sock in &self.sockets {
info!("Starting server on http://{:?}", sock.1.local_addr().ok());
}
let rx = self
.accept
.start(mem::replace(&mut self.sockets, Vec::new()), workers);
// start http server actor
let signals = self.subscribe_to_signals();
let addr = Actor::create(move |ctx| {
ctx.add_stream(rx);
self
});
if let Some(signals) = signals {
signals.do_send(signal::Subscribe(addr.clone().recipient()))
}
addr
}
}
// subscribe to os signals
fn subscribe_to_signals(&self) -> Option<Addr<signal::ProcessSignals>> {
if !self.no_signals {
if let Some(ref signals) = self.signals {
Some(signals.clone())
} else {
Some(System::current().registry().get::<signal::ProcessSignals>())
}
} else {
None
}
}
fn start_worker(&self, idx: usize, notify: AcceptNotify) -> (Addr<Worker>, WorkerClient) {
let (tx, rx) = unbounded::<Conn>();
let avail = WorkerAvailability::new(notify);
let worker = WorkerClient::new(idx, tx, avail.clone());
let services: Vec<Box<InternalServerServiceFactory>> =
self.services.iter().map(|v| v.clone_factory()).collect();
let addr = Arbiter::start(move |ctx: &mut Context<_>| {
ctx.add_message_stream(rx);
Worker::new(ctx, services, avail)
});
(addr, worker)
}
}
impl Actor for Server {
type Context = Context<Self>;
}
/// Signals support
/// Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop actix system
/// message to `System` actor.
impl Handler<signal::Signal> for Server {
type Result = ();
fn handle(&mut self, msg: signal::Signal, ctx: &mut Context<Self>) {
match msg.0 {
signal::SignalType::Int => {
info!("SIGINT received, exiting");
self.exit = true;
Handler::<StopServer>::handle(self, StopServer { graceful: false }, ctx);
}
signal::SignalType::Term => {
info!("SIGTERM received, stopping");
self.exit = true;
Handler::<StopServer>::handle(self, StopServer { graceful: true }, ctx);
}
signal::SignalType::Quit => {
info!("SIGQUIT received, exiting");
self.exit = true;
Handler::<StopServer>::handle(self, StopServer { graceful: false }, ctx);
}
_ => (),
}
}
}
impl Handler<PauseServer> for Server {
type Result = ();
fn handle(&mut self, _: PauseServer, _: &mut Context<Self>) {
self.accept.send(Command::Pause);
}
}
impl Handler<ResumeServer> for Server {
type Result = ();
fn handle(&mut self, _: ResumeServer, _: &mut Context<Self>) {
self.accept.send(Command::Resume);
}
}
impl Handler<StopServer> for Server {
type Result = Response<(), ()>;
fn handle(&mut self, msg: StopServer, ctx: &mut Context<Self>) -> Self::Result {
// stop accept thread
self.accept.send(Command::Stop);
// stop workers
let (tx, rx) = mpsc::channel(1);
let dur = if msg.graceful {
Some(Duration::new(u64::from(self.shutdown_timeout), 0))
} else {
None
};
for worker in &self.workers {
let tx2 = tx.clone();
ctx.spawn(
worker
.1
.send(StopWorker { graceful: dur })
.into_actor(self)
.then(move |_, slf, ctx| {
slf.workers.pop();
if slf.workers.is_empty() {
let _ = tx2.send(());
// we need to stop system if server was spawned
if slf.exit {
ctx.run_later(Duration::from_millis(300), |_, _| {
System::current().stop();
});
}
}
fut::ok(())
}),
);
}
if !self.workers.is_empty() {
Response::async(rx.into_future().map(|_| ()).map_err(|_| ()))
} else {
// we need to stop system if server was spawned
if self.exit {
ctx.run_later(Duration::from_millis(300), |_, _| {
System::current().stop();
});
}
Response::reply(Ok(()))
}
}
}
/// Commands from accept threads
impl StreamHandler<ServerCommand, ()> for Server {
fn finished(&mut self, _: &mut Context<Self>) {}
fn handle(&mut self, msg: ServerCommand, _: &mut Context<Self>) {
match msg {
ServerCommand::WorkerDied(idx) => {
let mut found = false;
for i in 0..self.workers.len() {
if self.workers[i].0 == idx {
self.workers.swap_remove(i);
found = true;
break;
}
}
if found {
error!("Worker has died {:?}, restarting", idx);
let mut new_idx = self.workers.len();
'found: loop {
for i in 0..self.workers.len() {
if self.workers[i].0 == new_idx {
new_idx += 1;
continue 'found;
}
}
break;
}
let (addr, worker) = self.start_worker(new_idx, self.accept.get_notify());
self.workers.push((new_idx, addr));
self.accept.send(Command::Worker(worker));
}
}
}
}
}
fn bind_addr<S: net::ToSocketAddrs>(addr: S) -> io::Result<Vec<net::TcpListener>> {
let mut err = None;
let mut succ = false;
let mut sockets = Vec::new();
for addr in addr.to_socket_addrs()? {
match create_tcp_listener(addr) {
Ok(lst) => {
succ = true;
sockets.push(lst);
}
Err(e) => err = Some(e),
}
}
if !succ {
if let Some(e) = err.take() {
Err(e)
} else {
Err(io::Error::new(
io::ErrorKind::Other,
"Can not bind to address.",
))
}
} else {
Ok(sockets)
}
}
fn create_tcp_listener(addr: net::SocketAddr) -> io::Result<net::TcpListener> {
let builder = match addr {
net::SocketAddr::V4(_) => TcpBuilder::new_v4()?,
net::SocketAddr::V6(_) => TcpBuilder::new_v6()?,
};
builder.reuse_address(true)?;
builder.bind(addr)?;
Ok(builder.listen(1024)?)
}

src/server/services.rs (new file, 131 lines added)
@@ -0,0 +1,131 @@
use std::net;
use futures::future::{err, ok};
use futures::{Future, Poll};
use tokio_reactor::Handle;
use tokio_tcp::TcpStream;
use {NewService, Service};
pub enum ServerMessage {
Connect(net::TcpStream),
Shutdown,
ForceShutdown,
}
pub(crate) type BoxedServerService = Box<
Service<
Request = ServerMessage,
Response = (),
Error = (),
Future = Box<Future<Item = (), Error = ()>>,
>,
>;
pub(crate) struct ServerService<T> {
service: T,
}
impl<T> ServerService<T> {
fn new(service: T) -> Self {
ServerService { service }
}
}
impl<T> Service for ServerService<T>
where
T: Service<Request = TcpStream, Response = (), Error = ()>,
T::Future: 'static,
T::Error: 'static,
{
type Request = ServerMessage;
type Response = ();
type Error = ();
type Future = Box<Future<Item = (), Error = ()>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.service.poll_ready().map_err(|_| ())
}
fn call(&mut self, req: ServerMessage) -> Self::Future {
match req {
ServerMessage::Connect(stream) => {
let stream = TcpStream::from_std(stream, &Handle::default()).map_err(|e| {
error!("Can not convert to an async tcp stream: {}", e);
});
if let Ok(stream) = stream {
Box::new(self.service.call(stream))
} else {
Box::new(err(()))
}
}
_ => Box::new(ok(())),
}
}
}
pub(crate) struct ServerNewService<F: ServerServiceFactory> {
inner: F,
}
impl<F> ServerNewService<F>
where
F: ServerServiceFactory,
{
pub(crate) fn create(inner: F) -> Box<InternalServerServiceFactory> {
Box::new(Self { inner })
}
}
pub(crate) trait InternalServerServiceFactory: Send {
fn clone_factory(&self) -> Box<InternalServerServiceFactory>;
fn create(&self) -> Box<Future<Item = BoxedServerService, Error = ()>>;
}
impl<F> InternalServerServiceFactory for ServerNewService<F>
where
F: ServerServiceFactory,
{
fn clone_factory(&self) -> Box<InternalServerServiceFactory> {
Box::new(Self {
inner: self.inner.clone(),
})
}
fn create(&self) -> Box<Future<Item = BoxedServerService, Error = ()>> {
Box::new(self.inner.create().new_service().map(move |inner| {
let service: BoxedServerService = Box::new(ServerService::new(inner));
service
}))
}
}
impl InternalServerServiceFactory for Box<InternalServerServiceFactory> {
fn clone_factory(&self) -> Box<InternalServerServiceFactory> {
self.as_ref().clone_factory()
}
fn create(&self) -> Box<Future<Item = BoxedServerService, Error = ()>> {
self.as_ref().create()
}
}
pub trait ServerServiceFactory: Send + Clone + 'static {
type NewService: NewService<Request = TcpStream, Response = (), Error = (), InitError = ()>;
fn create(&self) -> Self::NewService;
}
impl<F, T> ServerServiceFactory for F
where
F: Fn() -> T + Send + Clone + 'static,
T: NewService<Request = TcpStream, Response = (), Error = (), InitError = ()>,
{
type NewService = T;
fn create(&self) -> T {
(self)()
}
}

src/server/worker.rs (new file, 320 lines added)
@@ -0,0 +1,320 @@
use std::cell::Cell;
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::{net, time};
use futures::sync::mpsc::{SendError, UnboundedSender};
use futures::sync::oneshot;
use futures::task::AtomicTask;
use futures::{future, Async, Future, Poll};
use tokio_current_thread::spawn;
use actix::msgs::StopArbiter;
use actix::{
fut, Actor, ActorContext, ActorFuture, Arbiter, AsyncContext, Context, Handler, Message,
Response, WrapFuture,
};
use super::accept::AcceptNotify;
use super::services::{BoxedServerService, InternalServerServiceFactory, ServerMessage};
use super::Token;
#[derive(Message)]
pub(crate) struct Conn {
pub io: net::TcpStream,
pub handler: Token,
pub token: Token,
pub peer: Option<net::SocketAddr>,
}
static MAX_CONNS: AtomicUsize = AtomicUsize::new(25600);
/// Sets the maximum per-worker number of concurrent connections.
///
/// All socket listeners will stop accepting connections when this limit is
/// reached for each worker.
///
/// By default the limit is set to 25k connections per worker.
pub fn max_concurrent_connections(num: usize) {
MAX_CONNS.store(num, Ordering::Relaxed);
}
pub(crate) fn num_connections() -> usize {
MAX_CONNS_COUNTER.with(|conns| conns.total())
}
thread_local! {
static MAX_CONNS_COUNTER: Connections =
Connections::new(MAX_CONNS.load(Ordering::Relaxed));
}
#[derive(Clone)]
pub(crate) struct WorkerClient {
pub idx: usize,
tx: UnboundedSender<Conn>,
avail: WorkerAvailability,
}
impl WorkerClient {
pub fn new(idx: usize, tx: UnboundedSender<Conn>, avail: WorkerAvailability) -> Self {
WorkerClient { idx, tx, avail }
}
pub fn send(&self, msg: Conn) -> Result<(), SendError<Conn>> {
self.tx.unbounded_send(msg)
}
pub fn available(&self) -> bool {
self.avail.available()
}
}
#[derive(Clone)]
pub(crate) struct WorkerAvailability {
notify: AcceptNotify,
available: Arc<AtomicBool>,
}
impl WorkerAvailability {
pub fn new(notify: AcceptNotify) -> Self {
WorkerAvailability {
notify,
available: Arc::new(AtomicBool::new(false)),
}
}
pub fn available(&self) -> bool {
self.available.load(Ordering::Acquire)
}
pub fn set(&self, val: bool) {
let old = self.available.swap(val, Ordering::Release);
if !old && val {
self.notify.notify()
}
}
}
/// Stop worker message. Returns `true` on successful shutdown
/// and `false` if some connections are still alive.
pub(crate) struct StopWorker {
pub graceful: Option<time::Duration>,
}
impl Message for StopWorker {
type Result = Result<bool, ()>;
}
/// HTTP worker
///
/// The worker accepts socket connections via an unbounded channel and starts
/// request processing.
pub(crate) struct Worker {
services: Vec<BoxedServerService>,
availability: WorkerAvailability,
conns: Connections,
}
impl Actor for Worker {
type Context = Context<Self>;
}
impl Worker {
pub(crate) fn new(
ctx: &mut Context<Self>, services: Vec<Box<InternalServerServiceFactory>>,
availability: WorkerAvailability,
) -> Self {
let wrk = MAX_CONNS_COUNTER.with(|conns| Worker {
availability,
services: Vec::new(),
conns: conns.clone(),
});
ctx.wait(
future::join_all(services.into_iter().map(|s| s.create()))
.into_actor(&wrk)
.map_err(|e, _, ctx| {
error!("Can not start worker: {:?}", e);
Arbiter::current().do_send(StopArbiter(0));
ctx.stop();
}).and_then(|services, act, ctx| {
act.services.extend(services);
act.availability.set(true);
ctx.spawn(CheckReadiness(true));
fut::ok(())
}),
);
wrk
}
fn shutdown(&mut self, force: bool) {
if force {
self.services.iter_mut().for_each(|h| {
h.call(ServerMessage::ForceShutdown);
});
} else {
self.services.iter_mut().for_each(|h| {
h.call(ServerMessage::Shutdown);
});
}
}
fn shutdown_timeout(
&mut self, ctx: &mut Context<Worker>, tx: oneshot::Sender<bool>, dur: time::Duration,
) {
// sleep for 1 second and then check again
ctx.run_later(time::Duration::new(1, 0), move |slf, ctx| {
let num = num_connections();
if num == 0 {
let _ = tx.send(true);
Arbiter::current().do_send(StopArbiter(0));
} else if let Some(d) = dur.checked_sub(time::Duration::new(1, 0)) {
slf.shutdown_timeout(ctx, tx, d);
} else {
info!("Force shutdown http worker, {} connections", num);
slf.shutdown(true);
let _ = tx.send(false);
Arbiter::current().do_send(StopArbiter(0));
}
});
}
}
impl Handler<Conn> for Worker {
type Result = ();
fn handle(&mut self, msg: Conn, _: &mut Context<Self>) {
let guard = self.conns.get();
spawn(
self.services[msg.handler.0]
.call(ServerMessage::Connect(msg.io))
.map(|val| {
drop(guard);
val
}),
)
}
}
/// `StopWorker` message handler
impl Handler<StopWorker> for Worker {
type Result = Response<bool, ()>;
fn handle(&mut self, msg: StopWorker, ctx: &mut Context<Self>) -> Self::Result {
let num = num_connections();
if num == 0 {
info!("Shutting down http worker, 0 connections");
Response::reply(Ok(true))
} else if let Some(dur) = msg.graceful {
self.shutdown(false);
let (tx, rx) = oneshot::channel();
let num = num_connections();
if num != 0 {
info!("Graceful http worker shutdown, {} connections", num);
self.shutdown_timeout(ctx, tx, dur);
Response::reply(Ok(true))
} else {
Response::async(rx.map_err(|_| ()))
}
} else {
info!("Force shutdown http worker, {} connections", num);
self.shutdown(true);
Response::reply(Ok(false))
}
}
}
struct CheckReadiness(bool);
impl ActorFuture for CheckReadiness {
type Item = ();
type Error = ();
type Actor = Worker;
fn poll(&mut self, act: &mut Worker, _: &mut Context<Worker>) -> Poll<(), ()> {
let mut val = act.conns.check();
if val {
for service in &mut act.services {
if let Ok(Async::NotReady) = service.poll_ready() {
val = false;
break;
}
}
}
if self.0 != val {
self.0 = val;
act.availability.set(val);
}
Ok(Async::NotReady)
}
}
#[derive(Clone)]
pub(crate) struct Connections(Rc<ConnectionsInner>);
struct ConnectionsInner {
count: Cell<usize>,
maxconn: usize,
task: AtomicTask,
}
impl Connections {
pub fn new(maxconn: usize) -> Self {
Connections(Rc::new(ConnectionsInner {
maxconn,
count: Cell::new(0),
task: AtomicTask::new(),
}))
}
pub fn get(&self) -> ConnectionsGuard {
ConnectionsGuard::new(self.0.clone())
}
pub fn check(&self) -> bool {
self.0.check()
}
pub fn total(&self) -> usize {
self.0.count.get()
}
}
pub(crate) struct ConnectionsGuard(Rc<ConnectionsInner>);
impl ConnectionsGuard {
fn new(inner: Rc<ConnectionsInner>) -> Self {
inner.inc();
ConnectionsGuard(inner)
}
}
impl Drop for ConnectionsGuard {
fn drop(&mut self) {
self.0.dec();
}
}
impl ConnectionsInner {
fn inc(&self) {
let num = self.count.get() + 1;
self.count.set(num);
if num == self.maxconn {
self.task.register();
}
}
fn dec(&self) {
let num = self.count.get();
self.count.set(num - 1);
if num == self.maxconn {
self.task.notify();
}
}
fn check(&self) -> bool {
self.count.get() < self.maxconn
}
}
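A small usage sketch of the connection counter above (crate-internal, since `Connections` and `ConnectionsGuard` are `pub(crate)`; the limit value is the default used elsewhere in this module). When the count reaches `maxconn`, `check()` returns `false`, `CheckReadiness` flips worker availability off, and dropping a guard at the limit notifies the registered task:

fn connections_example() {
    let conns = Connections::new(25_600);
    assert_eq!(conns.total(), 0);

    let guard = conns.get();   // a new connection was handed to this worker
    assert_eq!(conns.total(), 1);
    assert!(conns.check());    // still below maxconn, worker stays available

    drop(guard);               // connection finished; counter is decremented
    assert_eq!(conns.total(), 0);
}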