use std::{net, time};

use futures::future;
use futures::sync::mpsc::{SendError, UnboundedSender};
use futures::sync::oneshot;

use actix::msgs::StopArbiter;
use actix::{
    fut, Actor, ActorContext, ActorFuture, Arbiter, AsyncContext, Context, Handler, Message,
    Response, WrapFuture,
};

use super::server_service::{BoxedServerService, ServerServiceFactory};
use super::{server::Connections, Token};

/// An accepted connection, routed to the service registered for `handler`.
#[derive(Message)]
pub(crate) struct Conn {
    pub io: net::TcpStream,
    pub handler: Token,
    pub token: Token,
    pub peer: Option<net::SocketAddr>,
}

/// Handle used by the accept loop to feed connections into a worker.
#[derive(Clone)]
pub(crate) struct WorkerClient {
    pub idx: usize,
    tx: UnboundedSender<Conn>,
    conns: Connections,
}

impl WorkerClient {
    pub fn new(idx: usize, tx: UnboundedSender<Conn>, conns: Connections) -> Self {
        WorkerClient { idx, tx, conns }
    }

    pub fn send(&self, msg: Conn) -> Result<(), SendError<Conn>> {
        self.tx.unbounded_send(msg)
    }

    pub fn available(&self) -> bool {
        self.conns.available()
    }
}

/// Stop worker message. Returns `true` on successful shutdown
/// and `false` if some connections are still alive.
pub(crate) struct StopWorker {
    pub graceful: Option<time::Duration>,
}

impl Message for StopWorker {
    type Result = Result<bool, ()>;
}

/// Http worker
///
/// Worker accepts socket objects via an unbounded channel and starts
/// request processing.
pub(crate) struct Worker {
    // conns: Connections,
    services: Vec<BoxedServerService>,
}

impl Actor for Worker {
    type Context = Context<Self>;
}

impl Worker {
    pub(crate) fn new(
        ctx: &mut Context<Self>, services: Vec<Box<ServerServiceFactory + Send>>,
    ) -> Self {
        let wrk = Worker {
            services: Vec::new(),
        };

        // Create all services up front; if any fails, stop the arbiter
        // (and with it this worker) instead of running half-initialized.
        ctx.wait(
            future::join_all(services.into_iter().map(|s| s.create()))
                .into_actor(&wrk)
                .map_err(|e, _, ctx| {
                    error!("Cannot start worker: {:?}", e);
                    Arbiter::current().do_send(StopArbiter(0));
                    ctx.stop();
                }).and_then(|services, act, _| {
                    act.services.extend(services);
                    fut::ok(())
                }),
        );

        wrk
    }

    fn shutdown(&self, _force: bool) {
        // self.services.iter().for_each(|h| h.shutdown(force));
    }

    fn shutdown_timeout(
        &self, _ctx: &mut Context<Worker>, _tx: oneshot::Sender<bool>,
        _dur: time::Duration,
    ) {
        // sleep for 1 second and then check again
        // ctx.run_later(time::Duration::new(1, 0), move |slf, ctx| {
        //     let num = slf.conns.num_connections();
        //     if num == 0 {
        //         let _ = tx.send(true);
        //         Arbiter::current().do_send(StopArbiter(0));
        //     } else if let Some(d) = dur.checked_sub(time::Duration::new(1, 0)) {
        //         slf.shutdown_timeout(ctx, tx, d);
        //     } else {
        //         info!("Force shutdown http worker, {} connections", num);
        //         slf.shutdown(true);
        //         let _ = tx.send(false);
        //         Arbiter::current().do_send(StopArbiter(0));
        //     }
        // });
    }
}

impl Handler<Conn> for Worker {
    type Result = ();

    fn handle(&mut self, msg: Conn, _: &mut Context<Self>) {
        // Dispatch the connection to the service registered for its handler token.
        Arbiter::spawn(self.services[msg.handler.0].call(msg.io))
    }
}

/// `StopWorker` message handler
impl Handler<StopWorker> for Worker {
    type Result = Response<bool, ()>;

    fn handle(&mut self, _msg: StopWorker, _ctx: &mut Context<Self>) -> Self::Result {
        unimplemented!()
        // let num = self.conns.num_connections();
        // if num == 0 {
        //     info!("Shutting down http worker, 0 connections");
        //     Response::reply(Ok(true))
        // } else if let Some(dur) = msg.graceful {
        //     self.shutdown(false);
        //     let (tx, rx) = oneshot::channel();
        //     let num = self.conns.num_connections();
        //     if num != 0 {
        //         info!("Graceful http worker shutdown, {} connections", num);
        //         self.shutdown_timeout(ctx, tx, dur);
        //         Response::async(rx.map_err(|_| ()))
        //     } else {
        //         Response::reply(Ok(true))
        //     }
        // } else {
        //     info!("Force shutdown http worker, {} connections", num);
        //     self.shutdown(true);
        //     Response::reply(Ok(false))
        // }
    }
}
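
// A minimal, self-contained sketch of the channel-dispatch pattern the
// worker relies on: the accept side pushes items through an unbounded
// mpsc channel and the worker side drains them as a stream. This mirrors
// how `WorkerClient::send` feeds `Conn` messages to the `Worker` actor,
// but uses only `futures` 0.1 primitives so it compiles without the
// crate-internal `Connections` and `Token` types. The module and test
// names here are illustrative, not part of this module's API.
#[cfg(test)]
mod dispatch_sketch {
    use futures::sync::mpsc;
    use futures::{Future, Stream};

    #[test]
    fn demo_dispatch() {
        // Unbounded sender/receiver pair, like the one owned by `WorkerClient`.
        let (tx, rx) = mpsc::unbounded::<u32>();

        // `unbounded_send` never blocks; it fails only if the receiver is
        // gone, which is exactly the error `WorkerClient::send` surfaces
        // as `SendError<Conn>`.
        tx.unbounded_send(1).unwrap();
        tx.unbounded_send(2).unwrap();
        drop(tx); // close the channel so the receiver stream terminates

        // Drain the stream, as the worker side does for incoming `Conn`s.
        let received: Vec<u32> = rx.collect().wait().unwrap();
        assert_eq!(received, vec![1, 2]);
    }
}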
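
// A small sketch of the oneshot completion signal the commented-out
// graceful-shutdown path is built around: the worker keeps the `Sender`
// and reports the final true/false verdict once its connection count
// drains, while the caller waits on the `Receiver` (the real handler
// would wrap it as `Response::async(rx.map_err(|_| ()))`). Purely
// illustrative; names are not part of this module's API.
#[cfg(test)]
mod shutdown_signal_sketch {
    use futures::sync::oneshot;
    use futures::Future;

    #[test]
    fn demo_shutdown_signal() {
        let (tx, rx) = oneshot::channel::<bool>();

        // Worker side: connection count reached zero, report success.
        tx.send(true).unwrap();

        // Caller side: resolve the verdict; a dropped sender (`Canceled`)
        // would map to a failed shutdown.
        assert_eq!(rx.wait().unwrap(), true);
    }
}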