mirror of
https://github.com/fafhrd91/actix-net
synced 2024-12-19 01:53:11 +01:00
184 lines
5.1 KiB
Rust
184 lines
5.1 KiB
Rust
use std::borrow::Cow;
|
|
use std::future::Future;
|
|
use std::io;
|
|
|
|
use tokio::sync::mpsc::unbounded_channel;
|
|
use tokio::sync::oneshot::{channel, Receiver};
|
|
use tokio::task::LocalSet;
|
|
|
|
use crate::arbiter::{Arbiter, SystemArbiter};
|
|
use crate::runtime::Runtime;
|
|
use crate::system::System;
|
|
|
|
/// Builder struct for a actix runtime.
|
|
///
|
|
/// Either use `Builder::build` to create a system and start actors.
|
|
/// Alternatively, use `Builder::run` to start the tokio runtime and
|
|
/// run a function in its context.
|
|
pub struct Builder {
|
|
/// Name of the System. Defaults to "actix" if unset.
|
|
name: Cow<'static, str>,
|
|
|
|
/// Whether the Arbiter will stop the whole System on uncaught panic. Defaults to false.
|
|
stop_on_panic: bool,
|
|
}
|
|
|
|
impl Builder {
|
|
pub(crate) fn new() -> Self {
|
|
Builder {
|
|
name: Cow::Borrowed("actix"),
|
|
stop_on_panic: false,
|
|
}
|
|
}
|
|
|
|
/// Sets the name of the System.
|
|
pub fn name<T: Into<String>>(mut self, name: T) -> Self {
|
|
self.name = Cow::Owned(name.into());
|
|
self
|
|
}
|
|
|
|
/// Sets the option 'stop_on_panic' which controls whether the System is stopped when an
|
|
/// uncaught panic is thrown from a worker thread.
|
|
///
|
|
/// Defaults to false.
|
|
pub fn stop_on_panic(mut self, stop_on_panic: bool) -> Self {
|
|
self.stop_on_panic = stop_on_panic;
|
|
self
|
|
}
|
|
|
|
/// Create new System.
|
|
///
|
|
/// This method panics if it can not create tokio runtime
|
|
pub fn build(self) -> SystemRunner {
|
|
self.create_runtime(|| {})
|
|
}
|
|
|
|
/// Create new System that can run asynchronously.
|
|
///
|
|
/// This method panics if it cannot start the system arbiter
|
|
pub(crate) fn build_async(self, local: &LocalSet) -> AsyncSystemRunner {
|
|
self.create_async_runtime(local)
|
|
}
|
|
|
|
/// This function will start tokio runtime and will finish once the
|
|
/// `System::stop()` message get called.
|
|
/// Function `f` get called within tokio runtime context.
|
|
pub fn run<F>(self, f: F) -> io::Result<()>
|
|
where
|
|
F: FnOnce(),
|
|
{
|
|
self.create_runtime(f).run()
|
|
}
|
|
|
|
fn create_async_runtime(self, local: &LocalSet) -> AsyncSystemRunner {
|
|
let (stop_tx, stop) = channel();
|
|
let (sys_sender, sys_receiver) = unbounded_channel();
|
|
|
|
let system =
|
|
System::construct(sys_sender, Arbiter::new_system(local), self.stop_on_panic);
|
|
|
|
// system arbiter
|
|
let arb = SystemArbiter::new(stop_tx, sys_receiver);
|
|
|
|
// start the system arbiter
|
|
let _ = local.spawn_local(arb);
|
|
|
|
AsyncSystemRunner { stop, system }
|
|
}
|
|
|
|
fn create_runtime<F>(self, f: F) -> SystemRunner
|
|
where
|
|
F: FnOnce(),
|
|
{
|
|
let (stop_tx, stop) = channel();
|
|
let (sys_sender, sys_receiver) = unbounded_channel();
|
|
|
|
let rt = Runtime::new().unwrap();
|
|
|
|
let system = System::construct(
|
|
sys_sender,
|
|
Arbiter::new_system(rt.local()),
|
|
self.stop_on_panic,
|
|
);
|
|
|
|
// system arbiter
|
|
let arb = SystemArbiter::new(stop_tx, sys_receiver);
|
|
|
|
rt.spawn(arb);
|
|
|
|
// init system arbiter and run configuration method
|
|
rt.block_on(async { f() });
|
|
|
|
SystemRunner { rt, stop, system }
|
|
}
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
pub(crate) struct AsyncSystemRunner {
|
|
stop: Receiver<i32>,
|
|
system: System,
|
|
}
|
|
|
|
impl AsyncSystemRunner {
|
|
/// This function will start event loop and returns a future that
|
|
/// resolves once the `System::stop()` function is called.
|
|
pub(crate) fn run_nonblocking(self) -> impl Future<Output = Result<(), io::Error>> + Send {
|
|
let AsyncSystemRunner { stop, .. } = self;
|
|
|
|
// run loop
|
|
async {
|
|
match stop.await {
|
|
Ok(code) => {
|
|
if code != 0 {
|
|
Err(io::Error::new(
|
|
io::ErrorKind::Other,
|
|
format!("Non-zero exit code: {}", code),
|
|
))
|
|
} else {
|
|
Ok(())
|
|
}
|
|
}
|
|
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Helper object that runs System's event loop
|
|
#[must_use = "SystemRunner must be run"]
|
|
#[derive(Debug)]
|
|
pub struct SystemRunner {
|
|
rt: Runtime,
|
|
stop: Receiver<i32>,
|
|
system: System,
|
|
}
|
|
|
|
impl SystemRunner {
|
|
/// This function will start event loop and will finish once the
|
|
/// `System::stop()` function is called.
|
|
pub fn run(self) -> io::Result<()> {
|
|
let SystemRunner { rt, stop, .. } = self;
|
|
|
|
// run loop
|
|
match rt.block_on(stop) {
|
|
Ok(code) => {
|
|
if code != 0 {
|
|
Err(io::Error::new(
|
|
io::ErrorKind::Other,
|
|
format!("Non-zero exit code: {}", code),
|
|
))
|
|
} else {
|
|
Ok(())
|
|
}
|
|
}
|
|
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
|
|
}
|
|
}
|
|
|
|
/// Execute a future and wait for result.
|
|
#[inline]
|
|
pub fn block_on<F: Future>(&self, fut: F) -> F::Output {
|
|
self.rt.block_on(fut)
|
|
}
|
|
}
|