use std::{
    future::Future,
    mem,
    pin::Pin,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
    task::{Context, Poll},
    time::Duration,
};

use actix_rt::{
    spawn,
    time::{sleep, Instant, Sleep},
    Arbiter,
};
use actix_utils::counter::Counter;
use futures_core::{future::LocalBoxFuture, ready};
use log::{error, info, trace};
use tokio::sync::{
    mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
    oneshot,
};

use crate::service::{BoxedServerService, InternalServiceFactory};
use crate::socket::MioStream;
use crate::waker_queue::{WakerInterest, WakerQueue};
use crate::{join_all, Token};

/// Stop worker message. Returns `true` on successful graceful shutdown
/// and `false` if some connections are still alive when the shutdown executes.
pub(crate) struct Stop {
    graceful: bool,
    tx: oneshot::Sender<bool>,
}

#[derive(Debug)]
pub(crate) struct Conn {
    pub io: MioStream,
    pub token: Token,
}

fn handle_pair(
    idx: usize,
    tx1: UnboundedSender<Conn>,
    tx2: UnboundedSender<Stop>,
    avail: WorkerAvailability,
) -> (WorkerHandleAccept, WorkerHandleServer) {
    let accept = WorkerHandleAccept { tx: tx1, avail };
    let server = WorkerHandleServer { idx, tx: tx2 };

    (accept, server)
}

/// Handle to worker that can send connection messages to the worker and share
/// the availability of the worker with other threads.
///
/// Held by [Accept](crate::accept::Accept).
pub(crate) struct WorkerHandleAccept {
    tx: UnboundedSender<Conn>,
    avail: WorkerAvailability,
}

impl WorkerHandleAccept {
    #[inline(always)]
    pub(crate) fn idx(&self) -> usize {
        self.avail.idx
    }

    #[inline(always)]
    pub(crate) fn send(&self, msg: Conn) -> Result<(), Conn> {
        self.tx.send(msg).map_err(|msg| msg.0)
    }

    #[inline(always)]
    pub(crate) fn available(&self) -> bool {
        self.avail.available()
    }
}

/// Handle to worker that can send a stop message to the worker.
///
/// Held by [ServerBuilder](crate::builder::ServerBuilder).
pub(crate) struct WorkerHandleServer {
pub idx: usize,
tx: UnboundedSender<Stop>,
}
impl WorkerHandleServer {
pub(crate) fn stop(&self, graceful: bool) -> oneshot::Receiver<bool> {
let (tx, rx) = oneshot::channel();
let _ = self.tx.send(Stop { graceful, tx });
rx
}
}
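
// Usage sketch (hypothetical caller code, not part of this module): the server
// builder holds a `WorkerHandleServer` and awaits the returned receiver to
// learn the shutdown outcome:
//
//     let graceful = handle.stop(true).await.unwrap_or(false);
//
// `true` means the worker drained all connections before the shutdown timeout;
// the `unwrap_or(false)` is this sketch's choice for a worker that is already gone.
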
#[derive(Clone)]
pub(crate) struct WorkerAvailability {
    idx: usize,
    waker: WakerQueue,
    available: Arc<AtomicBool>,
}

impl WorkerAvailability {
    pub fn new(idx: usize, waker: WakerQueue) -> Self {
        WorkerAvailability {
            idx,
            waker,
            available: Arc::new(AtomicBool::new(false)),
        }
    }

    #[inline(always)]
    pub fn available(&self) -> bool {
        self.available.load(Ordering::Acquire)
    }

    pub fn set(&self, val: bool) {
        // Ordering:
        //
        // Multiple `set` calls can happen within a single
        // `<ServerWorker as Future>::poll`, and the order between them matters.
        let old = self.available.swap(val, Ordering::AcqRel);
        // Notify the accept loop when the worker switches to available.
        if !old && val {
            self.waker.wake(WakerInterest::WorkerAvailable(self.idx));
        }
    }
}
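
// Note on the wake-up protocol above: `set(true)` wakes the accept loop only on
// a false -> true transition, so repeated `set(true)` calls while the worker is
// already marked available do not flood the waker queue with redundant
// `WorkerAvailable` interests.
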
/// Service worker.
///
/// Worker accepts Socket objects via unbounded channel and starts stream processing.
pub(crate) struct ServerWorker {
    // `UnboundedReceiver<Conn>` should always be the first field.
    // It must be dropped as soon as ServerWorker is dropped.
    rx: UnboundedReceiver<Conn>,
    rx2: UnboundedReceiver<Stop>,
    services: Box<[WorkerService]>,
    availability: WorkerAvailability,
    conns: Counter,
    factories: Box<[Box<dyn InternalServiceFactory>]>,
    state: WorkerState,
    shutdown_timeout: Duration,
}

struct WorkerService {
factory: usize,
status: WorkerServiceStatus,
service: BoxedServerService,
}
impl WorkerService {
fn created(&mut self, service: BoxedServerService) {
self.service = service;
self.status = WorkerServiceStatus::Unavailable;
}
}
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum WorkerServiceStatus {
Available,
Unavailable,
Failed,
Restarting,
Stopping,
Stopped,
}
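
// Summary of the status transitions driven by the code below (not an
// exhaustive contract):
//
//     Unavailable --(poll_ready -> Ready(Ok))--> Available
//     Available   --(poll_ready -> Pending)----> Unavailable
//     any polled  --(poll_ready -> Ready(Err))-> Failed, then Restarting
//     Restarting  --(factory create succeeds)--> Unavailable (re-checked)
//     Available   --(graceful shutdown)--------> Stopping
//     Available   --(forced shutdown)----------> Stopped
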
/// Config for worker behavior passed down from server builder.
#[derive(Copy, Clone)]
pub(crate) struct ServerWorkerConfig {
    shutdown_timeout: Duration,
    max_blocking_threads: usize,
    max_concurrent_connections: usize,
}

impl Default for ServerWorkerConfig {
    fn default() -> Self {
        // 512 is the default max blocking thread count of a Tokio runtime.
        let max_blocking_threads = std::cmp::max(512 / num_cpus::get(), 1);
        Self {
            shutdown_timeout: Duration::from_secs(30),
            max_blocking_threads,
            max_concurrent_connections: 25600,
        }
    }
}
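
// Worked example of the blocking-thread split above: on an 8-core machine with
// the usual one-worker-per-core setup, each worker's single-threaded runtime
// gets max(512 / 8, 1) = 64 blocking threads, so the workers together match
// Tokio's default pool size of 512.
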
impl ServerWorkerConfig {
    pub(crate) fn max_blocking_threads(&mut self, num: usize) {
        self.max_blocking_threads = num;
    }

    pub(crate) fn max_concurrent_connections(&mut self, num: usize) {
        self.max_concurrent_connections = num;
    }

    pub(crate) fn shutdown_timeout(&mut self, dur: Duration) {
        self.shutdown_timeout = dur;
    }
}
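
// Configuration sketch (hypothetical values, mirroring how the server builder
// is expected to drive these setters):
//
//     let mut config = ServerWorkerConfig::default();
//     config.shutdown_timeout(Duration::from_secs(60));
//     config.max_blocking_threads(32);
//     config.max_concurrent_connections(10_000);
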
impl ServerWorker {
    pub(crate) fn start(
        idx: usize,
        factories: Vec<Box<dyn InternalServiceFactory>>,
        availability: WorkerAvailability,
        config: ServerWorkerConfig,
    ) -> (WorkerHandleAccept, WorkerHandleServer) {
        assert!(!availability.available());

        let (tx1, rx) = unbounded_channel();
        let (tx2, rx2) = unbounded_channel();
        let avail = availability.clone();

        // Every worker runs in its own arbiter.
        // Use a custom Tokio runtime builder to change the settings of the runtime.
        Arbiter::with_tokio_rt(move || {
            tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .max_blocking_threads(config.max_blocking_threads)
                .build()
                .unwrap()
        })
        .spawn(async move {
            let fut = factories
                .iter()
                .enumerate()
                .map(|(idx, factory)| {
                    let fut = factory.create();
                    async move {
                        fut.await.map(|r| {
                            r.into_iter().map(|(t, s)| (idx, t, s)).collect::<Vec<_>>()
                        })
                    }
                })
                .collect::<Vec<_>>();

            // A second spawn to run !Send future tasks.
            spawn(async move {
                let res = join_all(fut)
                    .await
                    .into_iter()
                    .collect::<Result<Vec<_>, _>>();
                let services = match res {
                    Ok(res) => res
                        .into_iter()
                        .flatten()
                        .fold(Vec::new(), |mut services, (factory, token, service)| {
                            assert_eq!(token.0, services.len());
                            services.push(WorkerService {
                                factory,
                                service,
                                status: WorkerServiceStatus::Unavailable,
                            });
                            services
                        })
                        .into_boxed_slice(),
                    Err(e) => {
                        error!("Can not start worker: {:?}", e);
                        Arbiter::current().stop();
                        return;
                    }
                };
                // A third spawn to make sure ServerWorker runs as a non-boxed future.
                spawn(ServerWorker {
                    rx,
                    rx2,
                    services,
                    availability,
                    conns: Counter::new(config.max_concurrent_connections),
                    factories: factories.into_boxed_slice(),
                    state: Default::default(),
                    shutdown_timeout: config.shutdown_timeout,
                });
            });
        });

        handle_pair(idx, tx1, tx2, avail)
    }
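
    // Note: `start` returns the handle pair immediately, before the spawned
    // arbiter task has finished building the services. Until the first
    // successful readiness check flips `availability` to true, the accept loop
    // will not route connections to this worker.
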
fn restart_service(&mut self, token: Token, factory_id: usize) {
let factory = &self.factories[factory_id];
trace!("Service {:?} failed, restarting", factory.name(token));
self.services[token.0].status = WorkerServiceStatus::Restarting;
self.state = WorkerState::Restarting(Restart {
factory_id,
token,
fut: factory.create(),
});
}
    fn shutdown(&mut self, force: bool) {
        self.services
            .iter_mut()
            .filter(|srv| srv.status == WorkerServiceStatus::Available)
            .for_each(|srv| {
                srv.status = if force {
                    WorkerServiceStatus::Stopped
                } else {
                    WorkerServiceStatus::Stopping
                };
            });
    }
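
    // Readiness contract for `check_readiness` (summarized from its body):
    // - Ok(true): the connection counter is below its limit and every service
    //   polled Ready.
    // - Ok(false): the counter is at capacity or at least one service is Pending.
    // - Err((token, factory_id)): a service's readiness check failed; the caller
    //   restarts it via `restart_service`.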
    fn check_readiness(&mut self, cx: &mut Context<'_>) -> Result<bool, (Token, usize)> {
        let mut ready = self.conns.available(cx);
        for (idx, srv) in self.services.iter_mut().enumerate() {
            if srv.status == WorkerServiceStatus::Available
                || srv.status == WorkerServiceStatus::Unavailable
            {
                match srv.service.poll_ready(cx) {
                    Poll::Ready(Ok(_)) => {
                        if srv.status == WorkerServiceStatus::Unavailable {
                            trace!(
                                "Service {:?} is available",
                                self.factories[srv.factory].name(Token(idx))
                            );
                            srv.status = WorkerServiceStatus::Available;
                        }
                    }
                    Poll::Pending => {
                        ready = false;

                        if srv.status == WorkerServiceStatus::Available {
                            trace!(
                                "Service {:?} is unavailable",
                                self.factories[srv.factory].name(Token(idx))
                            );
                            srv.status = WorkerServiceStatus::Unavailable;
                        }
                    }
                    Poll::Ready(Err(_)) => {
                        error!(
                            "Service {:?} readiness check returned error, restarting",
                            self.factories[srv.factory].name(Token(idx))
                        );
                        srv.status = WorkerServiceStatus::Failed;
                        return Err((Token(idx), srv.factory));
                    }
                }
            }
        }
        Ok(ready)
    }
}
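
// Overview of the worker state machine driven by `<ServerWorker as Future>::poll`:
// `Unavailable` waits for all services to report ready; `Available` pulls `Conn`
// messages from the accept loop and dispatches them; `Restarting` drives a failed
// service's factory future to completion; `Shutdown` ticks a 1-second timer until
// all connections are gone or the shutdown timeout elapses.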
enum WorkerState {
Available,
Unavailable,
Restarting(Restart),
Shutdown(Shutdown),
}
struct Restart {
factory_id: usize,
token: Token,
fut: LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>>,
}
// Shutdown holds the state necessary for server shutdown:
// - `Sleep`: interval timer for checking shutdown progress.
// - `Instant`: the start time of the shutdown.
// - `Sender`: sends the shutdown outcome (forced/graceful) back to the Stop caller.
struct Shutdown {
    timer: Pin<Box<Sleep>>,
    start_from: Instant,
    tx: oneshot::Sender<bool>,
}

impl Default for WorkerState {
fn default() -> Self {
Self::Unavailable
}
}
impl Drop for ServerWorker {
    fn drop(&mut self) {
        // Set availability to true so that if accept tries to send a connection
        // to this worker, it will find the worker gone and remove it.
        // This is helpful when the worker is dropped unexpectedly.
        self.availability.set(true);

        // Stop the Arbiter that ServerWorker runs on, on drop.
        Arbiter::current().stop();
    }
}

impl Future for ServerWorker {
    type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.as_mut().get_mut();
        // `Stop` message handler.
if let Poll::Ready(Some(Stop { graceful, tx })) = Pin::new(&mut this.rx2).poll_recv(cx)
        {
            this.availability.set(false);
            let num = this.conns.total();
            if num == 0 {
                info!("Shutting down worker, 0 connections");
                let _ = tx.send(true);
                return Poll::Ready(());
            } else if graceful {
                info!("Graceful worker shutdown, {} connections", num);
                this.shutdown(false);

                this.state = WorkerState::Shutdown(Shutdown {
                    timer: Box::pin(sleep(Duration::from_secs(1))),
                    start_from: Instant::now(),
                    tx,
                });
            } else {
                info!("Force shutdown worker, {} connections", num);
                this.shutdown(true);

                let _ = tx.send(false);
                return Poll::Ready(());
            }
        }

        match this.state {
            WorkerState::Unavailable => match this.check_readiness(cx) {
                Ok(true) => {
                    this.state = WorkerState::Available;
                    this.availability.set(true);
                    self.poll(cx)
                }
                Ok(false) => Poll::Pending,
                Err((token, idx)) => {
                    this.restart_service(token, idx);
                    self.poll(cx)
                }
            },
            WorkerState::Restarting(ref mut restart) => {
                let factory_id = restart.factory_id;
                let token = restart.token;

                let service = ready!(restart.fut.as_mut().poll(cx))
                    .unwrap_or_else(|_| {
                        panic!(
                            "Can not restart {:?} service",
                            this.factories[factory_id].name(token)
                        )
                    })
                    .into_iter()
                    // Find the matching token in the vector. There should be only
                    // one, so the first match is enough.
                    .find(|(t, _)| *t == token)
                    .map(|(_, service)| service)
                    .expect("No BoxedServerService found");

                trace!(
                    "Service {:?} has been restarted",
                    this.factories[factory_id].name(token)
                );
                this.services[token.0].created(service);
                this.state = WorkerState::Unavailable;
                self.poll(cx)
            }
            WorkerState::Shutdown(ref mut shutdown) => {
                // Wait for 1 second.
                ready!(shutdown.timer.as_mut().poll(cx));

                if this.conns.total() == 0 {
                    // Graceful shutdown.
                    if let WorkerState::Shutdown(shutdown) = mem::take(&mut this.state) {
                        let _ = shutdown.tx.send(true);
                    }
                    Poll::Ready(())
                } else if shutdown.start_from.elapsed() >= this.shutdown_timeout {
                    // Timeout forceful shutdown.
                    if let WorkerState::Shutdown(shutdown) = mem::take(&mut this.state) {
                        let _ = shutdown.tx.send(false);
                    }
                    Poll::Ready(())
                } else {
                    // Reset timer and wait for 1 second.
                    let time = Instant::now() + Duration::from_secs(1);
                    shutdown.timer.as_mut().reset(time);
                    shutdown.timer.as_mut().poll(cx)
                }
            }
            // Actively poll the connection stream and handle worker commands.
            WorkerState::Available => loop {
                match this.check_readiness(cx) {
                    Ok(true) => {}
                    Ok(false) => {
                        trace!("Worker is unavailable");
                        this.availability.set(false);
                        this.state = WorkerState::Unavailable;
                        return self.poll(cx);
                    }
                    Err((token, idx)) => {
                        this.restart_service(token, idx);
                        this.availability.set(false);
                        return self.poll(cx);
                    }
                }

                match ready!(Pin::new(&mut this.rx).poll_recv(cx)) {
                    // Handle incoming io stream.
                    Some(msg) => {
                        let guard = this.conns.get();
                        let _ = this.services[msg.token.0].service.call((guard, msg.io));
                    }
                    None => return Poll::Ready(()),
                };
            },
        }
    }
}