actix_rt/lib.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209
//! Tokio-based single-threaded async runtime for the Actix ecosystem.
//!
//! In most parts of the the Actix ecosystem, it has been chosen to use !Send futures. For this
//! reason, a single-threaded runtime is appropriate since it is guaranteed that futures will not
//! be moved between threads. This can result in small performance improvements over cases where
//! atomics would otherwise be needed.
//!
//! To achieve similar performance to multi-threaded, work-stealing runtimes, applications
//! using `actix-rt` will create multiple, mostly disconnected, single-threaded runtimes.
//! This approach has good performance characteristics for workloads where the majority of tasks
//! have similar runtime expense.
//!
//! The disadvantage is that idle threads will not steal work from very busy, stuck or otherwise
//! backlogged threads. Tasks that are disproportionately expensive should be offloaded to the
//! blocking task thread-pool using [`task::spawn_blocking`].
//!
//! # Examples
//! ```no_run
//! use std::sync::mpsc;
//! use actix_rt::{Arbiter, System};
//!
//! let _ = System::new();
//!
//! let (tx, rx) = mpsc::channel::<u32>();
//!
//! let arbiter = Arbiter::new();
//! arbiter.spawn_fn(move || tx.send(42).unwrap());
//!
//! let num = rx.recv().unwrap();
//! assert_eq!(num, 42);
//!
//! arbiter.stop();
//! arbiter.join().unwrap();
//! ```
//!
//! # `io-uring` Support
//!
//! There is experimental support for using io-uring with this crate by enabling the
//! `io-uring` feature. For now, it is semver exempt.
//!
//! Note that there are currently some unimplemented parts of using `actix-rt` with `io-uring`.
//! In particular, when running a `System`, only `System::block_on` is supported.
#![deny(rust_2018_idioms, nonstandard_style)]
#![warn(future_incompatible, missing_docs)]
#![allow(clippy::type_complexity)]
#![doc(html_logo_url = "https://actix.rs/img/logo.png")]
#![doc(html_favicon_url = "https://actix.rs/favicon.ico")]
#[cfg(all(not(target_os = "linux"), feature = "io-uring"))]
compile_error!("io_uring is a linux only feature.");
use std::future::Future;
// Cannot define a main macro when compiled into test harness.
// Workaround for https://github.com/rust-lang/rust/issues/62127.
#[cfg(all(feature = "macros", not(test)))]
pub use actix_macros::main;
#[cfg(feature = "macros")]
pub use actix_macros::test;
mod arbiter;
mod runtime;
mod system;
pub use tokio::pin;
use tokio::task::JoinHandle;
pub use self::{
arbiter::{Arbiter, ArbiterHandle},
runtime::Runtime,
system::{System, SystemRunner},
};
pub mod signal {
//! Asynchronous signal handling (Tokio re-exports).
#[cfg(unix)]
pub mod unix {
//! Unix specific signals (Tokio re-exports).
pub use tokio::signal::unix::*;
}
pub use tokio::signal::ctrl_c;
}
pub mod net {
//! TCP/UDP/Unix bindings (mostly Tokio re-exports).
use std::{
future::Future,
io,
task::{Context, Poll},
};
use tokio::io::{AsyncRead, AsyncWrite, Interest};
#[cfg(unix)]
pub use tokio::net::{UnixDatagram, UnixListener, UnixStream};
pub use tokio::{
io::Ready,
net::{TcpListener, TcpSocket, TcpStream, UdpSocket},
};
/// Extension trait over async read+write types that can also signal readiness.
#[doc(hidden)]
pub trait ActixStream: AsyncRead + AsyncWrite + Unpin {
/// Poll stream and check read readiness of Self.
///
/// See [tokio::net::TcpStream::poll_read_ready] for detail on intended use.
fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>>;
/// Poll stream and check write readiness of Self.
///
/// See [tokio::net::TcpStream::poll_write_ready] for detail on intended use.
fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>>;
}
impl ActixStream for TcpStream {
fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>> {
let ready = self.ready(Interest::READABLE);
tokio::pin!(ready);
ready.poll(cx)
}
fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>> {
let ready = self.ready(Interest::WRITABLE);
tokio::pin!(ready);
ready.poll(cx)
}
}
#[cfg(unix)]
impl ActixStream for UnixStream {
fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>> {
let ready = self.ready(Interest::READABLE);
tokio::pin!(ready);
ready.poll(cx)
}
fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>> {
let ready = self.ready(Interest::WRITABLE);
tokio::pin!(ready);
ready.poll(cx)
}
}
impl<Io: ActixStream + ?Sized> ActixStream for Box<Io> {
fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>> {
(**self).poll_read_ready(cx)
}
fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>> {
(**self).poll_write_ready(cx)
}
}
}
pub mod time {
//! Utilities for tracking time (Tokio re-exports).
pub use tokio::time::{
interval, interval_at, sleep, sleep_until, timeout, Instant, Interval, Sleep, Timeout,
};
}
pub mod task {
//! Task management (Tokio re-exports).
pub use tokio::task::{spawn_blocking, yield_now, JoinError, JoinHandle};
}
/// Spawns a future on the current thread as a new task.
///
/// If not immediately awaited, the task can be cancelled using [`JoinHandle::abort`].
///
/// The provided future is spawned as a new task; therefore, panics are caught.
///
/// # Panics
/// Panics if Actix system is not running.
///
/// # Examples
/// ```
/// # use std::time::Duration;
/// # actix_rt::Runtime::new().unwrap().block_on(async {
/// // task resolves successfully
/// assert_eq!(actix_rt::spawn(async { 1 }).await.unwrap(), 1);
///
/// // task panics
/// assert!(actix_rt::spawn(async {
/// panic!("panic is caught at task boundary");
/// })
/// .await
/// .unwrap_err()
/// .is_panic());
///
/// // task is cancelled before completion
/// let handle = actix_rt::spawn(actix_rt::time::sleep(Duration::from_secs(100)));
/// handle.abort();
/// assert!(handle.await.unwrap_err().is_cancelled());
/// # });
/// ```
#[track_caller]
#[inline]
pub fn spawn<Fut>(f: Fut) -> JoinHandle<Fut::Output>
where
Fut: Future + 'static,
Fut::Output: 'static,
{
tokio::task::spawn_local(f)
}