1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-12-01 05:10:06 +01:00
actix-net/actix-server/src/server.rs

358 lines
12 KiB
Rust
Raw Normal View History

2021-11-04 21:30:43 +01:00
use std::{
future::Future,
io, mem,
pin::Pin,
task::{Context, Poll},
time::Duration,
};
2019-11-26 11:33:45 +01:00
2021-11-04 21:30:43 +01:00
use actix_rt::{time::sleep, System};
use futures_core::future::BoxFuture;
use log::{error, info};
use tokio::sync::{
mpsc::{UnboundedReceiver, UnboundedSender},
oneshot,
};
2018-12-10 06:51:35 +01:00
2021-11-04 21:30:43 +01:00
use crate::{
accept::Accept,
builder::ServerBuilder,
join_all::join_all,
service::InternalServiceFactory,
signals::{Signal, Signals},
waker_queue::{WakerInterest, WakerQueue},
worker::{ServerWorker, ServerWorkerConfig, WorkerHandleServer},
ServerHandle,
};
2018-12-10 06:51:35 +01:00
#[derive(Debug)]
2018-12-10 06:51:35 +01:00
pub(crate) enum ServerCommand {
2021-11-04 21:30:43 +01:00
/// TODO
Migrate actix-net to std::future (#64) * Migrate actix-codec, actix-rt, and actix-threadpool to std::future * update to latest tokio alpha and futures-rs * Migrate actix-service to std::future, This is a squash of ~8 commits, since it included a lot of experimentation. To see the commits, look into the semtexzv/std-future-service-tmp branch. * update futures-rs and tokio * Migrate actix-threadpool to std::future (#59) * Migrate actix-threadpool to std::future * Cosmetic refactor - turn log::error! into log::warn! as it doesn't throw any error - add Clone and Copy impls for Cancelled making it cheap to operate with - apply rustfmt * Bump up crate version to 0.2.0 and pre-fill its changelog * Disable patching 'actix-threadpool' crate in global workspace as unnecessary * Revert patching and fix 'actix-rt' * Migrate actix-rt to std::future (#47) * remove Pin from Service::poll_ready(); simplify combinators api; make code compile * disable tests * update travis config * refactor naming * drop IntoFuture trait * Migrate actix-server to std::future (#50) Still not finished, this is more WIP, this is an aggregation of several commits, which can be found in semtexzv/std-future-server-tmp branch * update actix-server * rename Factor to ServiceFactory * start server worker in start mehtod * update actix-utils * remove IntoTransform trait * Migrate actix-server::ssl::nativetls to std futures (#61) * Refactor 'nativetls' module * Migrate 'actix-server-config' to std futures - remove "uds" feature - disable features by default * Switch NativeTlsAcceptor to use 'tokio-tls' crate * Bikeshed features names and remove unnecessary dependencies for 'actix-server-config' crate * update openssl impl * migrate actix-connect to std::future * migrate actix-ioframe to std::future * update version to alpha.1 * fix boxed service * migrate server rustls support * migratte openssl and rustls connecttors * store the thread's handle with arbiter (#62) * update ssl connect tests * restore service tests * update readme
2019-11-14 13:38:24 +01:00
WorkerFaulted(usize),
2021-11-04 21:30:43 +01:00
/// Contains return channel to notify caller of successful state change.
2018-12-10 06:51:35 +01:00
Pause(oneshot::Sender<()>),
2021-11-04 21:30:43 +01:00
/// Contains return channel to notify caller of successful state change.
2018-12-10 06:51:35 +01:00
Resume(oneshot::Sender<()>),
2021-11-04 21:30:43 +01:00
/// TODO
2018-12-10 06:51:35 +01:00
Stop {
/// True if shut down should be graceful.
2018-12-10 06:51:35 +01:00
graceful: bool,
2021-11-04 21:30:43 +01:00
/// Return channel to notify caller that shutdown is complete.
2018-12-11 06:06:54 +01:00
completion: Option<oneshot::Sender<()>>,
2018-12-10 06:51:35 +01:00
},
2021-11-02 00:36:51 +01:00
}
2021-11-04 21:30:43 +01:00
/// General purpose TCP server that runs services receiving Tokio `TcpStream`s.
///
/// Handles creating worker threads, restarting faulted workers, connection accepting, and
/// back-pressure logic.
///
/// Creates a worker per CPU core (or the number specified in [`ServerBuilder::workers`]) and
/// distributes connections with a round-robin strategy.
///
/// The [Server] must be awaited to process stop commands and listen for OS signals. It will resolve
/// when the server has fully shut down.
///
/// # Shutdown Signals
/// On UNIX systems, `SIGQUIT` will start a graceful shutdown and `SIGTERM` or `SIGINT` will start a
2021-11-04 21:30:43 +01:00
/// forced shutdown. On Windows, a Ctrl-C signal will start a forced shutdown.
///
/// A graceful shutdown will wait for all workers to stop first.
2021-11-04 21:30:43 +01:00
///
/// # Examples
/// The following is a TCP echo server. Test using `telnet 127.0.0.1 8080`.
///
/// ```no_run
/// use std::io;
///
/// use actix_rt::net::TcpStream;
/// use actix_server::Server;
/// use actix_service::{fn_service, ServiceFactoryExt as _};
/// use bytes::BytesMut;
/// use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};
///
/// #[actix_rt::main]
/// async fn main() -> io::Result<()> {
/// let bind_addr = ("127.0.0.1", 8080);
///
/// Server::build()
/// .bind("echo", bind_addr, move || {
/// fn_service(move |mut stream: TcpStream| {
/// async move {
/// let mut size = 0;
/// let mut buf = BytesMut::new();
///
/// loop {
/// match stream.read_buf(&mut buf).await {
/// // end of stream; bail from loop
/// Ok(0) => break,
///
/// // write bytes back to stream
/// Ok(bytes_read) => {
/// stream.write_all(&buf[size..]).await.unwrap();
/// size += bytes_read;
/// }
///
/// Err(err) => {
/// eprintln!("Stream Error: {:?}", err);
/// return Err(());
/// }
/// }
/// }
///
/// Ok(())
/// }
/// })
/// .map_err(|err| eprintln!("Service Error: {:?}", err))
/// })?
/// .run()
/// .await
/// }
/// ```
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub enum Server {
Server(ServerInner),
Error(Option<io::Error>),
}
2018-12-10 06:51:35 +01:00
2021-11-04 21:30:43 +01:00
impl Server {
/// Create server build.
pub fn build() -> ServerBuilder {
ServerBuilder::default()
}
2018-12-11 06:06:54 +01:00
2021-11-04 21:30:43 +01:00
pub(crate) fn new(mut builder: ServerBuilder) -> Self {
let sockets = mem::take(&mut builder.sockets)
.into_iter()
.map(|t| (t.0, t.2))
.collect();
2018-12-10 06:51:35 +01:00
2021-11-04 21:30:43 +01:00
// Give log information on what runtime will be used.
let is_actix = actix_rt::System::try_current().is_some();
2021-11-14 20:45:15 +01:00
let is_tokio = tokio::runtime::Handle::try_current().is_ok();
2021-11-04 21:30:43 +01:00
2021-11-14 20:45:15 +01:00
match (is_actix, is_tokio) {
(true, _) => info!("Actix runtime found; starting in Actix runtime"),
(_, true) => info!("Tokio runtime found; starting in existing Tokio runtime"),
(_, false) => panic!("Actix or Tokio runtime not found; halting"),
}
2018-12-10 06:51:35 +01:00
2021-11-04 21:30:43 +01:00
for (_, name, lst) in &builder.sockets {
info!(
r#"Starting service: "{}", workers: {}, listening on: {}"#,
name,
builder.threads,
lst.local_addr()
);
}
match Accept::start(sockets, &builder) {
Ok((waker_queue, worker_handles)) => {
// construct OS signals listener future
let signals = (builder.listen_os_signals).then(Signals::new);
Self::Server(ServerInner {
cmd_tx: builder.cmd_tx.clone(),
cmd_rx: builder.cmd_rx,
signals,
waker_queue,
worker_handles,
worker_config: builder.worker_config,
services: builder.factories,
exit: builder.exit,
stop_task: None,
})
}
Err(err) => Self::Error(Some(err)),
}
2018-12-10 06:51:35 +01:00
}
2021-11-04 21:30:43 +01:00
/// Get a handle for ServerFuture that can be used to change state of actix server.
2018-12-10 06:51:35 +01:00
///
2021-11-04 21:30:43 +01:00
/// See [ServerHandle](ServerHandle) for usage.
pub fn handle(&self) -> ServerHandle {
match self {
Server::Server(inner) => ServerHandle::new(inner.cmd_tx.clone()),
Server::Error(err) => {
// TODO: i don't think this is the best way to handle server startup fail
panic!(
"server handle can not be obtained because server failed to start up: {}",
err.as_ref().unwrap()
);
}
}
2019-11-26 11:33:45 +01:00
}
}
2021-11-04 21:30:43 +01:00
impl Future for Server {
type Output = io::Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.as_mut().get_mut() {
2021-11-15 03:33:13 +01:00
Self::Error(err) => Poll::Ready(Err(err
2021-11-04 21:30:43 +01:00
.take()
.expect("Server future cannot be polled after error"))),
2021-11-15 03:33:13 +01:00
Self::Server(inner) => {
2021-11-04 21:30:43 +01:00
// poll Signals
if let Some(ref mut signals) = inner.signals {
if let Poll::Ready(signal) = Pin::new(signals).poll(cx) {
inner.stop_task = inner.handle_signal(signal);
// drop signals listener
inner.signals = None;
}
}
// handle stop tasks and eager drain command channel
loop {
if let Some(ref mut fut) = inner.stop_task {
// only resolve stop task and exit
return fut.as_mut().poll(cx).map(|_| Ok(()));
}
match Pin::new(&mut inner.cmd_rx).poll_recv(cx) {
Poll::Ready(Some(cmd)) => {
// if stop task is required, set it and loop
inner.stop_task = inner.handle_cmd(cmd);
}
_ => return Poll::Pending,
}
}
}
}
2019-11-26 11:33:45 +01:00
}
}
2021-11-04 21:30:43 +01:00
pub struct ServerInner {
worker_handles: Vec<WorkerHandleServer>,
worker_config: ServerWorkerConfig,
services: Vec<Box<dyn InternalServiceFactory>>,
exit: bool,
cmd_tx: UnboundedSender<ServerCommand>,
cmd_rx: UnboundedReceiver<ServerCommand>,
signals: Option<Signals>,
waker_queue: WakerQueue,
stop_task: Option<BoxFuture<'static, ()>>,
}
2019-11-26 11:33:45 +01:00
2021-11-04 21:30:43 +01:00
impl ServerInner {
fn handle_cmd(&mut self, item: ServerCommand) -> Option<BoxFuture<'static, ()>> {
match item {
ServerCommand::Pause(tx) => {
self.waker_queue.wake(WakerInterest::Pause);
let _ = tx.send(());
None
}
2019-11-26 11:33:45 +01:00
2021-11-04 21:30:43 +01:00
ServerCommand::Resume(tx) => {
self.waker_queue.wake(WakerInterest::Resume);
let _ = tx.send(());
None
}
ServerCommand::Stop {
graceful,
completion,
} => {
let exit = self.exit;
// stop accept thread
self.waker_queue.wake(WakerInterest::Stop);
// stop workers
let workers_stop = self
.worker_handles
.iter()
.map(|worker| worker.stop(graceful))
.collect::<Vec<_>>();
Some(Box::pin(async move {
if graceful {
// wait for all workers to shut down
let _ = join_all(workers_stop).await;
}
if let Some(tx) = completion {
let _ = tx.send(());
}
if exit {
sleep(Duration::from_millis(300)).await;
System::try_current().as_ref().map(System::stop);
}
}))
}
ServerCommand::WorkerFaulted(idx) => {
// TODO: maybe just return with warning log if not found ?
assert!(self.worker_handles.iter().any(|wrk| wrk.idx == idx));
error!("Worker {} has died; restarting", idx);
let factories = self
.services
.iter()
.map(|service| service.clone_factory())
.collect();
match ServerWorker::start(
idx,
factories,
self.waker_queue.clone(),
self.worker_config,
) {
Ok((handle_accept, handle_server)) => {
*self
.worker_handles
.iter_mut()
.find(|wrk| wrk.idx == idx)
.unwrap() = handle_server;
self.waker_queue.wake(WakerInterest::Worker(handle_accept));
}
Err(err) => error!("can not restart worker {}: {}", idx, err),
};
None
2019-11-26 11:33:45 +01:00
}
}
2021-11-04 21:30:43 +01:00
}
fn handle_signal(&mut self, signal: Signal) -> Option<BoxFuture<'static, ()>> {
match signal {
Signal::Int => {
info!("SIGINT received; starting forced shutdown");
self.exit = true;
self.handle_cmd(ServerCommand::Stop {
graceful: false,
completion: None,
})
}
Signal::Term => {
info!("SIGTERM received; starting graceful shutdown");
self.exit = true;
self.handle_cmd(ServerCommand::Stop {
graceful: true,
completion: None,
})
}
2019-11-26 11:33:45 +01:00
2021-11-04 21:30:43 +01:00
Signal::Quit => {
info!("SIGQUIT received; starting forced shutdown");
self.exit = true;
self.handle_cmd(ServerCommand::Stop {
graceful: false,
completion: None,
})
}
2019-11-26 11:33:45 +01:00
}
2018-12-10 06:51:35 +01:00
}
}