actix_rt/
runtime.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
use std::{future::Future, io};

use tokio::task::{JoinHandle, LocalSet};

/// A Tokio-based runtime proxy.
///
/// All spawned futures will be executed on the current thread. Therefore, there is no `Send` bound
/// on submitted futures.
#[derive(Debug)]
pub struct Runtime {
    local: LocalSet,
    rt: tokio::runtime::Runtime,
}

pub(crate) fn default_tokio_runtime() -> io::Result<tokio::runtime::Runtime> {
    tokio::runtime::Builder::new_current_thread()
        .enable_io()
        .enable_time()
        .build()
}

impl Runtime {
    /// Returns a new runtime initialized with default configuration values.
    #[allow(clippy::new_ret_no_self)]
    pub fn new() -> io::Result<Self> {
        let rt = default_tokio_runtime()?;

        Ok(Runtime {
            rt,
            local: LocalSet::new(),
        })
    }

    /// Offload a future onto the single-threaded runtime.
    ///
    /// The returned join handle can be used to await the future's result.
    ///
    /// See [crate root][crate] documentation for more details.
    ///
    /// # Examples
    /// ```
    /// let rt = actix_rt::Runtime::new().unwrap();
    ///
    /// // Spawn a future onto the runtime
    /// let handle = rt.spawn(async {
    ///     println!("running on the runtime");
    ///     42
    /// });
    ///
    /// assert_eq!(rt.block_on(handle).unwrap(), 42);
    /// ```
    ///
    /// # Panics
    /// This function panics if the spawn fails. Failure occurs if the executor is currently at
    /// capacity and is unable to spawn a new future.
    #[track_caller]
    pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
    where
        F: Future + 'static,
    {
        self.local.spawn_local(future)
    }

    /// Retrieves a reference to the underlying Tokio runtime associated with this instance.
    ///
    /// The Tokio runtime is responsible for executing asynchronous tasks and managing
    /// the event loop for an asynchronous Rust program. This method allows accessing
    /// the runtime to interact with its features directly.
    ///
    /// In a typical use case, you might need to share the same runtime between different
    /// modules of your project. For example, a module might require a `tokio::runtime::Handle`
    /// to spawn tasks on the same runtime, or the runtime itself to configure more complex
    /// behaviours.
    ///
    /// # Example
    ///
    /// ```
    /// use actix_rt::Runtime;
    ///
    /// mod module_a {
    ///     pub fn do_something(handle: tokio::runtime::Handle) {
    ///         handle.spawn(async {
    ///             // Some asynchronous task here
    ///         });
    ///     }
    /// }
    ///
    /// mod module_b {
    ///     pub fn do_something_else(rt: &tokio::runtime::Runtime) {
    ///         rt.spawn(async {
    ///             // Another asynchronous task here
    ///         });
    ///     }
    /// }
    ///
    /// let actix_runtime = actix_rt::Runtime::new().unwrap();
    /// let tokio_runtime = actix_runtime.tokio_runtime();
    ///
    /// let handle = tokio_runtime.handle().clone();
    ///
    /// module_a::do_something(handle);
    /// module_b::do_something_else(tokio_runtime);
    /// ```
    ///
    /// # Returns
    ///
    /// An immutable reference to the `tokio::runtime::Runtime` instance associated with this
    /// `Runtime` instance.
    ///
    /// # Note
    ///
    /// While this method provides an immutable reference to the Tokio runtime, which is safe to share across threads,
    /// be aware that spawning blocking tasks on the Tokio runtime could potentially impact the execution
    /// of the Actix runtime. This is because Tokio is responsible for driving the Actix system,
    /// and blocking tasks could delay or deadlock other tasks in run loop.
    pub fn tokio_runtime(&self) -> &tokio::runtime::Runtime {
        &self.rt
    }

    /// Runs the provided future, blocking the current thread until the future completes.
    ///
    /// This function can be used to synchronously block the current thread until the provided
    /// `future` has resolved either successfully or with an error. The result of the future is
    /// then returned from this function call.
    ///
    /// Note that this function will also execute any spawned futures on the current thread, but
    /// will not block until these other spawned futures have completed. Once the function returns,
    /// any uncompleted futures remain pending in the `Runtime` instance. These futures will not run
    /// until `block_on` or `run` is called again.
    ///
    /// The caller is responsible for ensuring that other spawned futures complete execution by
    /// calling `block_on` or `run`.
    #[track_caller]
    pub fn block_on<F>(&self, f: F) -> F::Output
    where
        F: Future,
    {
        self.local.block_on(&self.rt, f)
    }
}

impl From<tokio::runtime::Runtime> for Runtime {
    fn from(rt: tokio::runtime::Runtime) -> Self {
        Self {
            local: LocalSet::new(),
            rt,
        }
    }
}