mirror of
https://github.com/fafhrd91/actix-net
synced 2024-12-18 22:23:11 +01:00
1b7c969f6a
* build(deps): minimize `futures` deps by using `futures-channel` and `futures-util` directly * style(actix-rt): enforce spaces around equals in `Cargo.toml`
192 lines
5.5 KiB
Rust
192 lines
5.5 KiB
Rust
use std::borrow::Cow;
|
|
use std::io;
|
|
|
|
use futures_channel::mpsc::unbounded;
|
|
use futures_channel::oneshot::{channel, Receiver};
|
|
use futures_util::future::{lazy, Future, FutureExt};
|
|
use tokio::task::LocalSet;
|
|
|
|
use crate::arbiter::{Arbiter, SystemArbiter};
|
|
use crate::runtime::Runtime;
|
|
use crate::system::System;
|
|
|
|
/// Builder struct for a actix runtime.
|
|
///
|
|
/// Either use `Builder::build` to create a system and start actors.
|
|
/// Alternatively, use `Builder::run` to start the tokio runtime and
|
|
/// run a function in its context.
|
|
pub struct Builder {
|
|
/// Name of the System. Defaults to "actix" if unset.
|
|
name: Cow<'static, str>,
|
|
|
|
/// Whether the Arbiter will stop the whole System on uncaught panic. Defaults to false.
|
|
stop_on_panic: bool,
|
|
}
|
|
|
|
impl Builder {
|
|
pub(crate) fn new() -> Self {
|
|
Builder {
|
|
name: Cow::Borrowed("actix"),
|
|
stop_on_panic: false,
|
|
}
|
|
}
|
|
|
|
/// Sets the name of the System.
|
|
pub fn name<T: Into<String>>(mut self, name: T) -> Self {
|
|
self.name = Cow::Owned(name.into());
|
|
self
|
|
}
|
|
|
|
/// Sets the option 'stop_on_panic' which controls whether the System is stopped when an
|
|
/// uncaught panic is thrown from a worker thread.
|
|
///
|
|
/// Defaults to false.
|
|
pub fn stop_on_panic(mut self, stop_on_panic: bool) -> Self {
|
|
self.stop_on_panic = stop_on_panic;
|
|
self
|
|
}
|
|
|
|
/// Create new System.
|
|
///
|
|
/// This method panics if it can not create tokio runtime
|
|
pub fn build(self) -> SystemRunner {
|
|
self.create_runtime(|| {})
|
|
}
|
|
|
|
/// Create new System that can run asynchronously.
|
|
///
|
|
/// This method panics if it cannot start the system arbiter
|
|
pub(crate) fn build_async(self, local: &LocalSet) -> AsyncSystemRunner {
|
|
self.create_async_runtime(local)
|
|
}
|
|
|
|
/// This function will start tokio runtime and will finish once the
|
|
/// `System::stop()` message get called.
|
|
/// Function `f` get called within tokio runtime context.
|
|
pub fn run<F>(self, f: F) -> io::Result<()>
|
|
where
|
|
F: FnOnce() + 'static,
|
|
{
|
|
self.create_runtime(f).run()
|
|
}
|
|
|
|
fn create_async_runtime(self, local: &LocalSet) -> AsyncSystemRunner {
|
|
let (stop_tx, stop) = channel();
|
|
let (sys_sender, sys_receiver) = unbounded();
|
|
|
|
let system = System::construct(sys_sender, Arbiter::new_system(), self.stop_on_panic);
|
|
|
|
// system arbiter
|
|
let arb = SystemArbiter::new(stop_tx, sys_receiver);
|
|
|
|
// start the system arbiter
|
|
let _ = local.spawn_local(arb);
|
|
|
|
AsyncSystemRunner { stop, system }
|
|
}
|
|
|
|
fn create_runtime<F>(self, f: F) -> SystemRunner
|
|
where
|
|
F: FnOnce() + 'static,
|
|
{
|
|
let (stop_tx, stop) = channel();
|
|
let (sys_sender, sys_receiver) = unbounded();
|
|
|
|
let system = System::construct(sys_sender, Arbiter::new_system(), self.stop_on_panic);
|
|
|
|
// system arbiter
|
|
let arb = SystemArbiter::new(stop_tx, sys_receiver);
|
|
|
|
let mut rt = Runtime::new().unwrap();
|
|
rt.spawn(arb);
|
|
|
|
// init system arbiter and run configuration method
|
|
rt.block_on(lazy(move |_| f()));
|
|
|
|
SystemRunner { rt, stop, system }
|
|
}
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
pub(crate) struct AsyncSystemRunner {
|
|
stop: Receiver<i32>,
|
|
system: System,
|
|
}
|
|
|
|
impl AsyncSystemRunner {
|
|
/// This function will start event loop and returns a future that
|
|
/// resolves once the `System::stop()` function is called.
|
|
pub(crate) fn run_nonblocking(self) -> impl Future<Output = Result<(), io::Error>> + Send {
|
|
let AsyncSystemRunner { stop, .. } = self;
|
|
|
|
// run loop
|
|
lazy(|_| {
|
|
Arbiter::run_system(None);
|
|
async {
|
|
let res = match stop.await {
|
|
Ok(code) => {
|
|
if code != 0 {
|
|
Err(io::Error::new(
|
|
io::ErrorKind::Other,
|
|
format!("Non-zero exit code: {}", code),
|
|
))
|
|
} else {
|
|
Ok(())
|
|
}
|
|
}
|
|
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
|
|
};
|
|
Arbiter::stop_system();
|
|
return res;
|
|
}
|
|
})
|
|
.flatten()
|
|
}
|
|
}
|
|
|
|
/// Helper object that runs System's event loop
|
|
#[must_use = "SystemRunner must be run"]
|
|
#[derive(Debug)]
|
|
pub struct SystemRunner {
|
|
rt: Runtime,
|
|
stop: Receiver<i32>,
|
|
system: System,
|
|
}
|
|
|
|
impl SystemRunner {
|
|
/// This function will start event loop and will finish once the
|
|
/// `System::stop()` function is called.
|
|
pub fn run(self) -> io::Result<()> {
|
|
let SystemRunner { mut rt, stop, .. } = self;
|
|
|
|
// run loop
|
|
Arbiter::run_system(Some(&rt));
|
|
let result = match rt.block_on(stop) {
|
|
Ok(code) => {
|
|
if code != 0 {
|
|
Err(io::Error::new(
|
|
io::ErrorKind::Other,
|
|
format!("Non-zero exit code: {}", code),
|
|
))
|
|
} else {
|
|
Ok(())
|
|
}
|
|
}
|
|
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
|
|
};
|
|
Arbiter::stop_system();
|
|
result
|
|
}
|
|
|
|
/// Execute a future and wait for result.
|
|
pub fn block_on<F, O>(&mut self, fut: F) -> O
|
|
where
|
|
F: Future<Output = O> + 'static,
|
|
{
|
|
Arbiter::run_system(Some(&self.rt));
|
|
let res = self.rt.block_on(fut);
|
|
Arbiter::stop_system();
|
|
res
|
|
}
|
|
}
|