1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-12-01 05:10:06 +01:00
actix-net/actix-server/src/server.rs

369 lines
11 KiB
Rust
Raw Normal View History

2021-11-04 21:30:43 +01:00
use std::{
future::Future,
io, mem,
pin::Pin,
task::{Context, Poll},
thread,
2021-11-04 21:30:43 +01:00
time::Duration,
};
2019-11-26 11:33:45 +01:00
2021-11-04 21:30:43 +01:00
use actix_rt::{time::sleep, System};
use futures_core::{future::BoxFuture, Stream};
use futures_util::stream::StreamExt as _;
2021-11-04 21:30:43 +01:00
use log::{error, info};
use tokio::sync::{mpsc::UnboundedReceiver, oneshot};
2018-12-10 06:51:35 +01:00
2021-11-04 21:30:43 +01:00
use crate::{
accept::Accept,
builder::ServerBuilder,
join_all::join_all,
service::InternalServiceFactory,
2021-11-28 01:35:34 +01:00
signals::{SignalKind, Signals},
2021-11-04 21:30:43 +01:00
waker_queue::{WakerInterest, WakerQueue},
worker::{ServerWorker, ServerWorkerConfig, WorkerHandleServer},
ServerHandle,
};
2018-12-10 06:51:35 +01:00
#[derive(Debug)]
2018-12-10 06:51:35 +01:00
pub(crate) enum ServerCommand {
2021-11-28 01:35:34 +01:00
/// Worker failed to accept connection, indicating a probable panic.
///
/// Contains index of faulted worker.
Migrate actix-net to std::future (#64) * Migrate actix-codec, actix-rt, and actix-threadpool to std::future * update to latest tokio alpha and futures-rs * Migrate actix-service to std::future, This is a squash of ~8 commits, since it included a lot of experimentation. To see the commits, look into the semtexzv/std-future-service-tmp branch. * update futures-rs and tokio * Migrate actix-threadpool to std::future (#59) * Migrate actix-threadpool to std::future * Cosmetic refactor - turn log::error! into log::warn! as it doesn't throw any error - add Clone and Copy impls for Cancelled making it cheap to operate with - apply rustfmt * Bump up crate version to 0.2.0 and pre-fill its changelog * Disable patching 'actix-threadpool' crate in global workspace as unnecessary * Revert patching and fix 'actix-rt' * Migrate actix-rt to std::future (#47) * remove Pin from Service::poll_ready(); simplify combinators api; make code compile * disable tests * update travis config * refactor naming * drop IntoFuture trait * Migrate actix-server to std::future (#50) Still not finished, this is more WIP, this is an aggregation of several commits, which can be found in semtexzv/std-future-server-tmp branch * update actix-server * rename Factor to ServiceFactory * start server worker in start mehtod * update actix-utils * remove IntoTransform trait * Migrate actix-server::ssl::nativetls to std futures (#61) * Refactor 'nativetls' module * Migrate 'actix-server-config' to std futures - remove "uds" feature - disable features by default * Switch NativeTlsAcceptor to use 'tokio-tls' crate * Bikeshed features names and remove unnecessary dependencies for 'actix-server-config' crate * update openssl impl * migrate actix-connect to std::future * migrate actix-ioframe to std::future * update version to alpha.1 * fix boxed service * migrate server rustls support * migratte openssl and rustls connecttors * store the thread's handle with arbiter (#62) * update ssl connect tests * restore service 
tests * update readme
2019-11-14 13:38:24 +01:00
WorkerFaulted(usize),
2021-11-04 21:30:43 +01:00
2021-11-28 01:35:34 +01:00
/// Pause accepting connections.
///
2021-11-04 21:30:43 +01:00
/// Contains return channel to notify caller of successful state change.
2018-12-10 06:51:35 +01:00
Pause(oneshot::Sender<()>),
2021-11-04 21:30:43 +01:00
2021-11-28 01:35:34 +01:00
/// Resume accepting connections.
///
2021-11-04 21:30:43 +01:00
/// Contains return channel to notify caller of successful state change.
2018-12-10 06:51:35 +01:00
Resume(oneshot::Sender<()>),
2021-11-04 21:30:43 +01:00
2021-11-28 01:35:34 +01:00
/// Stop accepting connections and begin shutdown procedure.
2018-12-10 06:51:35 +01:00
Stop {
/// True if shut down should be graceful.
2018-12-10 06:51:35 +01:00
graceful: bool,
2021-11-04 21:30:43 +01:00
/// Return channel to notify caller that shutdown is complete.
2018-12-11 06:06:54 +01:00
completion: Option<oneshot::Sender<()>>,
/// Force System exit when true, overriding `ServerBuilder::system_exit()` if it is false.
force_system_stop: bool,
2018-12-10 06:51:35 +01:00
},
2021-11-02 00:36:51 +01:00
}
2021-11-04 21:30:43 +01:00
/// General purpose TCP server that runs services receiving Tokio `TcpStream`s.
///
/// Handles creating worker threads, restarting faulted workers, connection accepting, and
/// back-pressure logic.
///
/// Creates a worker per CPU core (or the number specified in [`ServerBuilder::workers`]) and
/// distributes connections with a round-robin strategy.
///
/// The [Server] must be awaited or polled in order to start running. It will resolve when the
/// server has fully shut down.
///
/// # Shutdown Signals
/// On UNIX systems, `SIGQUIT` will start a graceful shutdown and `SIGTERM` or `SIGINT` will start a
2021-11-04 21:30:43 +01:00
/// forced shutdown. On Windows, a Ctrl-C signal will start a forced shutdown.
///
/// A graceful shutdown will wait for all workers to stop first.
2021-11-04 21:30:43 +01:00
///
/// # Examples
/// The following is a TCP echo server. Test using `telnet 127.0.0.1 8080`.
///
/// ```no_run
/// use std::io;
///
/// use actix_rt::net::TcpStream;
/// use actix_server::Server;
/// use actix_service::{fn_service, ServiceFactoryExt as _};
/// use bytes::BytesMut;
/// use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};
///
/// #[actix_rt::main]
/// async fn main() -> io::Result<()> {
/// let bind_addr = ("127.0.0.1", 8080);
///
/// Server::build()
/// .bind("echo", bind_addr, move || {
/// fn_service(move |mut stream: TcpStream| {
/// async move {
/// let mut size = 0;
/// let mut buf = BytesMut::new();
///
/// loop {
/// match stream.read_buf(&mut buf).await {
/// // end of stream; bail from loop
/// Ok(0) => break,
///
/// // write bytes back to stream
/// Ok(bytes_read) => {
/// stream.write_all(&buf[size..]).await.unwrap();
/// size += bytes_read;
/// }
///
/// Err(err) => {
/// eprintln!("Stream Error: {:?}", err);
/// return Err(());
/// }
/// }
/// }
///
/// Ok(())
/// }
/// })
/// .map_err(|err| eprintln!("Service Error: {:?}", err))
/// })?
/// .run()
/// .await
/// }
/// ```
#[must_use = "Server does nothing unless you `.await` or poll it"]
pub struct Server {
handle: ServerHandle,
fut: BoxFuture<'static, io::Result<()>>,
}
2018-12-10 06:51:35 +01:00
2021-11-04 21:30:43 +01:00
impl Server {
/// Create server build.
pub fn build() -> ServerBuilder {
ServerBuilder::default()
}
2018-12-11 06:06:54 +01:00
2021-12-05 17:44:06 +01:00
pub(crate) fn new(builder: ServerBuilder) -> Self {
Server {
handle: ServerHandle::new(builder.cmd_tx.clone()),
fut: Box::pin(ServerInner::run(builder)),
}
2018-12-10 06:51:35 +01:00
}
/// Get a `Server` handle that can be used issue commands and change it's state.
2018-12-10 06:51:35 +01:00
///
2021-11-04 21:30:43 +01:00
/// See [ServerHandle](ServerHandle) for usage.
pub fn handle(&self) -> ServerHandle {
self.handle.clone()
2019-11-26 11:33:45 +01:00
}
}
2021-11-04 21:30:43 +01:00
impl Future for Server {
type Output = io::Result<()>;
#[inline]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut Pin::into_inner(self).fut).poll(cx)
2019-11-26 11:33:45 +01:00
}
}
2021-11-04 21:30:43 +01:00
pub struct ServerInner {
worker_handles: Vec<WorkerHandleServer>,
accept_handle: Option<thread::JoinHandle<()>>,
2021-11-04 21:30:43 +01:00
worker_config: ServerWorkerConfig,
services: Vec<Box<dyn InternalServiceFactory>>,
waker_queue: WakerQueue,
system_stop: bool,
stopping: bool,
2021-11-04 21:30:43 +01:00
}
2019-11-26 11:33:45 +01:00
2021-11-04 21:30:43 +01:00
impl ServerInner {
async fn run(builder: ServerBuilder) -> io::Result<()> {
let (mut this, mut mux) = Self::run_sync(builder)?;
while let Some(cmd) = mux.next().await {
this.handle_cmd(cmd).await;
if this.stopping {
break;
}
}
Ok(())
}
fn run_sync(mut builder: ServerBuilder) -> io::Result<(Self, ServerEventMultiplexer)> {
2021-12-05 17:44:06 +01:00
let sockets = mem::take(&mut builder.sockets)
.into_iter()
.map(|t| (t.0, t.2))
.collect();
// Give log information on what runtime will be used.
let is_actix = actix_rt::System::try_current().is_some();
let is_tokio = tokio::runtime::Handle::try_current().is_ok();
match (is_actix, is_tokio) {
(true, _) => info!("Actix runtime found; starting in Actix runtime"),
(_, true) => info!("Tokio runtime found; starting in existing Tokio runtime"),
(_, false) => panic!("Actix or Tokio runtime not found; halting"),
}
for (_, name, lst) in &builder.sockets {
info!(
r#"Starting service: "{}", workers: {}, listening on: {}"#,
name,
builder.threads,
lst.local_addr()
);
}
let (waker_queue, worker_handles, accept_handle) = Accept::start(sockets, &builder)?;
2021-12-05 17:44:06 +01:00
let mux = ServerEventMultiplexer {
signal_fut: (builder.listen_os_signals).then(Signals::new),
2021-12-05 17:44:06 +01:00
cmd_rx: builder.cmd_rx,
};
let server = ServerInner {
2021-12-05 17:44:06 +01:00
waker_queue,
accept_handle: Some(accept_handle),
2021-12-05 17:44:06 +01:00
worker_handles,
worker_config: builder.worker_config,
services: builder.factories,
system_stop: builder.exit,
stopping: false,
};
Ok((server, mux))
2021-12-05 17:44:06 +01:00
}
async fn handle_cmd(&mut self, item: ServerCommand) {
2021-11-04 21:30:43 +01:00
match item {
ServerCommand::Pause(tx) => {
self.waker_queue.wake(WakerInterest::Pause);
let _ = tx.send(());
}
2019-11-26 11:33:45 +01:00
2021-11-04 21:30:43 +01:00
ServerCommand::Resume(tx) => {
self.waker_queue.wake(WakerInterest::Resume);
let _ = tx.send(());
}
ServerCommand::Stop {
graceful,
completion,
force_system_stop,
2021-11-04 21:30:43 +01:00
} => {
self.stopping = true;
2021-11-04 21:30:43 +01:00
// Signal accept thread to stop.
// Signal is non-blocking; we wait for thread to stop later.
2021-11-04 21:30:43 +01:00
self.waker_queue.wake(WakerInterest::Stop);
// send stop signal to workers
2021-11-04 21:30:43 +01:00
let workers_stop = self
.worker_handles
.iter()
.map(|worker| worker.stop(graceful))
.collect::<Vec<_>>();
if graceful {
// wait for all workers to shut down
let _ = join_all(workers_stop).await;
}
2021-11-04 21:30:43 +01:00
// wait for accept thread stop
self.accept_handle
.take()
.unwrap()
.join()
.expect("Accept thread must not panic in any case");
if let Some(tx) = completion {
let _ = tx.send(());
}
2021-11-04 21:30:43 +01:00
if self.system_stop || force_system_stop {
sleep(Duration::from_millis(300)).await;
System::try_current().as_ref().map(System::stop);
}
2021-11-04 21:30:43 +01:00
}
ServerCommand::WorkerFaulted(idx) => {
// TODO: maybe just return with warning log if not found ?
assert!(self.worker_handles.iter().any(|wrk| wrk.idx == idx));
error!("Worker {} has died; restarting", idx);
let factories = self
.services
.iter()
.map(|service| service.clone_factory())
.collect();
match ServerWorker::start(
idx,
factories,
self.waker_queue.clone(),
self.worker_config,
) {
Ok((handle_accept, handle_server)) => {
*self
.worker_handles
.iter_mut()
.find(|wrk| wrk.idx == idx)
.unwrap() = handle_server;
self.waker_queue.wake(WakerInterest::Worker(handle_accept));
}
Err(err) => error!("can not restart worker {}: {}", idx, err),
};
2019-11-26 11:33:45 +01:00
}
}
2021-11-04 21:30:43 +01:00
}
fn map_signal(signal: SignalKind) -> ServerCommand {
2021-11-04 21:30:43 +01:00
match signal {
2021-11-28 01:35:34 +01:00
SignalKind::Int => {
2021-11-04 21:30:43 +01:00
info!("SIGINT received; starting forced shutdown");
ServerCommand::Stop {
2021-11-04 21:30:43 +01:00
graceful: false,
completion: None,
force_system_stop: true,
}
2021-11-04 21:30:43 +01:00
}
2021-11-28 01:35:34 +01:00
SignalKind::Term => {
2021-11-04 21:30:43 +01:00
info!("SIGTERM received; starting graceful shutdown");
ServerCommand::Stop {
2021-11-04 21:30:43 +01:00
graceful: true,
completion: None,
force_system_stop: true,
}
2021-11-04 21:30:43 +01:00
}
2019-11-26 11:33:45 +01:00
2021-11-28 01:35:34 +01:00
SignalKind::Quit => {
2021-11-04 21:30:43 +01:00
info!("SIGQUIT received; starting forced shutdown");
ServerCommand::Stop {
2021-11-04 21:30:43 +01:00
graceful: false,
completion: None,
force_system_stop: true,
}
2021-11-04 21:30:43 +01:00
}
2019-11-26 11:33:45 +01:00
}
2018-12-10 06:51:35 +01:00
}
}
/// Stream that merges server commands with commands derived from OS signals.
struct ServerEventMultiplexer {
    /// Receiving end of the server command channel.
    cmd_rx: UnboundedReceiver<ServerCommand>,

    /// Signal future; `None` when signal listening is disabled or after a signal has fired.
    signal_fut: Option<Signals>,
}
impl Stream for ServerEventMultiplexer {
    type Item = ServerCommand;

    /// Yields the command mapped from a pending OS signal first, otherwise the next command from
    /// the channel.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mux = self.get_mut();

        if let Some(signals) = mux.signal_fut.as_mut() {
            if let Poll::Ready(kind) = Pin::new(signals).poll(cx) {
                // The signal future is dropped after its first result, so it is not polled again.
                mux.signal_fut = None;
                return Poll::Ready(Some(ServerInner::map_signal(kind)));
            }
        }

        mux.cmd_rx.poll_recv(cx)
    }
}