diff --git a/Cargo.toml b/Cargo.toml index f032478a..cea3ee3a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,7 @@ members = [ [patch.crates-io] actix-codec = { path = "actix-codec" } actix-connect = { path = "actix-connect" } -actix-rt = { path = "actix-rt" } +actix-rt = { git = "https://github.com/actix/actix-net.git", ref = "ba44ea7d0bafaf5fccb9a34003d503e1910943eepath" } actix-macros = { path = "actix-macros" } actix-server = { path = "actix-server" } actix-service = { path = "actix-service" } diff --git a/actix-rt/CHANGES.md b/actix-rt/CHANGES.md index c3479db1..8e73cef2 100644 --- a/actix-rt/CHANGES.md +++ b/actix-rt/CHANGES.md @@ -7,8 +7,10 @@ * Add `System::attach_to_tokio` method. [#173] ### Changed - -* Remove `'static` lifetime requirement for `Runtime::block_on` and `SystemRunner::block_on`. +* Update `tokio` dependency to `1` +* Rename `time` module `delay_for` to `sleep`, `delay_until` to `sleep_until`, `Delay` to `Sleep` to keep inline with tokio. +* Remove `'static` lifetime requirement for `Runtime::block_on` and `SystemRunner::block_on`. + These methods would accept &Self when calling. Remove `'static` lifetime requirement for `System::run` and `Builder::run`. `Arbiter::spawn` would panic when `System` is not in scope. [#207] diff --git a/actix-rt/Cargo.toml b/actix-rt/Cargo.toml index 57710a7f..eff206f2 100644 --- a/actix-rt/Cargo.toml +++ b/actix-rt/Cargo.toml @@ -18,9 +18,4 @@ path = "src/lib.rs" [dependencies] actix-macros = "0.1.0" -futures-channel = "0.3.7" -tokio = { version = "0.2.6", default-features = false, features = ["rt-core", "rt-util", "io-driver", "tcp", "uds", "udp", "time", "signal", "stream"] } - -[dev-dependencies] -futures-util = { version = "0.3.7", default-features = false, features = ["alloc"] } -tokio = { version = "0.2.6", features = ["full"] } +tokio = { version = "1", features = ["rt", "net", "signal", "sync", "time"] } diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index 3fe81b99..7aae7cd2 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -7,12 +7,11 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::task::{Context, Poll}; use std::{fmt, thread}; -use futures_channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; -use futures_channel::oneshot::{channel, Canceled, Sender}; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; +use tokio::sync::oneshot::{channel, error::RecvError as Canceled, Sender}; // use futures_util::stream::FuturesUnordered; // use tokio::task::JoinHandle; // use tokio::stream::StreamExt; -use tokio::stream::Stream; use tokio::task::LocalSet; use crate::runtime::Runtime; @@ -70,7 +69,7 @@ impl Default for Arbiter { impl Arbiter { pub(crate) fn new_system(local: &LocalSet) -> Self { - let (tx, rx) = unbounded(); + let (tx, rx) = unbounded_channel(); let arb = Arbiter::with_sender(tx); ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone())); @@ -98,7 +97,7 @@ impl Arbiter { /// Stop arbiter from continuing it's event loop. pub fn stop(&self) { - let _ = self.sender.unbounded_send(ArbiterCommand::Stop); + let _ = self.sender.send(ArbiterCommand::Stop); } /// Spawn new thread and run event loop in spawned thread. @@ -107,14 +106,14 @@ impl Arbiter { let id = COUNT.fetch_add(1, Ordering::Relaxed); let name = format!("actix-rt:worker:{}", id); let sys = System::current(); - let (tx, rx) = unbounded(); + let (tx, rx) = unbounded_channel(); let handle = thread::Builder::new() .name(name.clone()) .spawn({ let tx = tx.clone(); move || { - let mut rt = Runtime::new().expect("Can not create Runtime"); + let rt = Runtime::new().expect("Can not create Runtime"); let arb = Arbiter::with_sender(tx); STORAGE.with(|cell| cell.borrow_mut().clear()); @@ -126,7 +125,7 @@ impl Arbiter { // register arbiter let _ = System::current() .sys() - .unbounded_send(SystemCommand::RegisterArbiter(id, arb)); + .send(SystemCommand::RegisterArbiter(id, arb)); // start arbiter controller // run loop @@ -135,7 +134,7 @@ impl Arbiter { // unregister arbiter let _ = System::current() .sys() - .unbounded_send(SystemCommand::UnregisterArbiter(id)); + .send(SystemCommand::UnregisterArbiter(id)); } }) .unwrap_or_else(|err| { @@ -181,9 +180,7 @@ impl Arbiter { where F: Future + Send + Unpin + 'static, { - let _ = self - .sender - .unbounded_send(ArbiterCommand::Execute(Box::new(future))); + let _ = self.sender.send(ArbiterCommand::Execute(Box::new(future))); } /// Send a function to the Arbiter's thread, and execute it. Any result from the function @@ -194,7 +191,7 @@ impl Arbiter { { let _ = self .sender - .unbounded_send(ArbiterCommand::ExecuteFn(Box::new(move || { + .send(ArbiterCommand::ExecuteFn(Box::new(move || { f(); }))); } @@ -210,8 +207,8 @@ impl Arbiter { let (tx, rx) = channel(); let _ = self .sender - .unbounded_send(ArbiterCommand::ExecuteFn(Box::new(move || { - if !tx.is_canceled() { + .send(ArbiterCommand::ExecuteFn(Box::new(move || { + if !tx.is_closed() { let _ = tx.send(f()); } }))); @@ -328,7 +325,7 @@ impl Future for ArbiterController { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { - match Pin::new(&mut self.rx).poll_next(cx) { + match Pin::new(&mut self.rx).poll_recv(cx) { Poll::Ready(None) => return Poll::Ready(()), Poll::Ready(Some(item)) => match item { ArbiterCommand::Stop => return Poll::Ready(()), @@ -393,7 +390,7 @@ impl Future for SystemArbiter { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { - match Pin::new(&mut self.commands).poll_next(cx) { + match Pin::new(&mut self.commands).poll_recv(cx) { Poll::Ready(None) => return Poll::Ready(()), Poll::Ready(Some(cmd)) => match cmd { SystemCommand::Exit(code) => { diff --git a/actix-rt/src/builder.rs b/actix-rt/src/builder.rs index 83aed064..ff7b0e06 100644 --- a/actix-rt/src/builder.rs +++ b/actix-rt/src/builder.rs @@ -2,8 +2,8 @@ use std::borrow::Cow; use std::future::Future; use std::io; -use futures_channel::mpsc::unbounded; -use futures_channel::oneshot::{channel, Receiver}; +use tokio::sync::mpsc::unbounded_channel; +use tokio::sync::oneshot::{channel, Receiver}; use tokio::task::LocalSet; use crate::arbiter::{Arbiter, SystemArbiter}; @@ -72,7 +72,7 @@ impl Builder { fn create_async_runtime(self, local: &LocalSet) -> AsyncSystemRunner { let (stop_tx, stop) = channel(); - let (sys_sender, sys_receiver) = unbounded(); + let (sys_sender, sys_receiver) = unbounded_channel(); let system = System::construct(sys_sender, Arbiter::new_system(local), self.stop_on_panic); @@ -91,9 +91,9 @@ impl Builder { F: FnOnce(), { let (stop_tx, stop) = channel(); - let (sys_sender, sys_receiver) = unbounded(); + let (sys_sender, sys_receiver) = unbounded_channel(); - let mut rt = Runtime::new().unwrap(); + let rt = Runtime::new().unwrap(); let system = System::construct( sys_sender, @@ -157,7 +157,7 @@ impl SystemRunner { /// This function will start event loop and will finish once the /// `System::stop()` function is called. pub fn run(self) -> io::Result<()> { - let SystemRunner { mut rt, stop, .. } = self; + let SystemRunner { rt, stop, .. } = self; // run loop match rt.block_on(stop) { @@ -177,10 +177,7 @@ impl SystemRunner { /// Execute a future and wait for result. #[inline] - pub fn block_on(&mut self, fut: F) -> O - where - F: Future, - { + pub fn block_on(&self, fut: F) -> F::Output { self.rt.block_on(fut) } } diff --git a/actix-rt/src/lib.rs b/actix-rt/src/lib.rs index b2e23c0f..3fd94bf9 100644 --- a/actix-rt/src/lib.rs +++ b/actix-rt/src/lib.rs @@ -58,7 +58,7 @@ pub mod net { /// Utilities for tracking time. pub mod time { pub use tokio::time::Instant; - pub use tokio::time::{delay_for, delay_until, Delay}; pub use tokio::time::{interval, interval_at, Interval}; + pub use tokio::time::{sleep, sleep_until, Sleep}; pub use tokio::time::{timeout, Timeout}; } diff --git a/actix-rt/src/runtime.rs b/actix-rt/src/runtime.rs index 7ee02b02..a72f492c 100644 --- a/actix-rt/src/runtime.rs +++ b/actix-rt/src/runtime.rs @@ -18,10 +18,9 @@ impl Runtime { #[allow(clippy::new_ret_no_self)] /// Returns a new runtime initialized with default configuration values. pub fn new() -> io::Result { - let rt = runtime::Builder::new() + let rt = runtime::Builder::new_current_thread() .enable_io() .enable_time() - .basic_scheduler() .build()?; Ok(Runtime { @@ -48,7 +47,7 @@ impl Runtime { /// /// # fn dox() { /// // Create the runtime - /// let mut rt = Runtime::new().unwrap(); + /// let rt = Runtime::new().unwrap(); /// /// // Spawn a future onto the runtime /// rt.spawn(future::lazy(|_| { @@ -86,10 +85,10 @@ impl Runtime { /// /// The caller is responsible for ensuring that other spawned futures /// complete execution by calling `block_on` or `run`. - pub fn block_on(&mut self, f: F) -> F::Output + pub fn block_on(&self, f: F) -> F::Output where F: Future, { - self.local.block_on(&mut self.rt, f) + self.local.block_on(&self.rt, f) } } diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index 16b96439..1fbbc0ee 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -3,7 +3,7 @@ use std::future::Future; use std::io; use std::sync::atomic::{AtomicUsize, Ordering}; -use futures_channel::mpsc::UnboundedSender; +use tokio::sync::mpsc::UnboundedSender; use tokio::task::LocalSet; use crate::arbiter::{Arbiter, SystemCommand}; @@ -70,7 +70,7 @@ impl System { /// /// # Examples /// - /// ``` + /// ```rust,ignore /// use tokio::{runtime::Runtime, task::LocalSet}; /// use actix_rt::System; /// use futures_util::future::try_join_all; @@ -94,10 +94,9 @@ impl System { /// } /// /// - /// let mut runtime = tokio::runtime::Builder::new() - /// .core_threads(2) + /// let runtime = tokio::runtime::Builder::new_multi_thread() + /// .worker_threads(2) /// .enable_all() - /// .threaded_scheduler() /// .build() /// .unwrap(); /// @@ -140,7 +139,7 @@ impl System { /// /// # Examples /// - /// ``` + /// ```rust,ignore /// use tokio::runtime::Runtime; /// use actix_rt::System; /// use futures_util::future::try_join_all; @@ -164,10 +163,9 @@ impl System { /// } /// /// - /// let runtime = tokio::runtime::Builder::new() - /// .core_threads(2) + /// let runtime = tokio::runtime::Builder::new_multi_thread() + /// .worker_threads(2) /// .enable_all() - /// .threaded_scheduler() /// .build() /// .unwrap(); /// @@ -176,7 +174,7 @@ impl System { /// ``` pub fn attach_to_tokio( name: impl Into, - mut runtime: tokio::runtime::Runtime, + runtime: tokio::runtime::Runtime, rest_operations: Fut, ) -> R where @@ -233,7 +231,7 @@ impl System { /// Stop the system with a particular exit code. pub fn stop_with_code(&self, code: i32) { - let _ = self.sys.unbounded_send(SystemCommand::Exit(code)); + let _ = self.sys.send(SystemCommand::Exit(code)); } pub(crate) fn sys(&self) -> &UnboundedSender { diff --git a/actix-rt/tests/integration_tests.rs b/actix-rt/tests/integration_tests.rs index b3265476..12ceb4ef 100644 --- a/actix-rt/tests/integration_tests.rs +++ b/actix-rt/tests/integration_tests.rs @@ -5,7 +5,7 @@ fn await_for_timer() { let time = Duration::from_secs(2); let instant = Instant::now(); actix_rt::System::new("test_wait_timer").block_on(async move { - tokio::time::delay_for(time).await; + tokio::time::sleep(time).await; }); assert!( instant.elapsed() >= time, @@ -20,7 +20,7 @@ fn join_another_arbiter() { actix_rt::System::new("test_join_another_arbiter").block_on(async move { let mut arbiter = actix_rt::Arbiter::new(); arbiter.send(Box::pin(async move { - tokio::time::delay_for(time).await; + tokio::time::sleep(time).await; actix_rt::Arbiter::current().stop(); })); arbiter.join().unwrap(); @@ -35,7 +35,7 @@ fn join_another_arbiter() { let mut arbiter = actix_rt::Arbiter::new(); arbiter.exec_fn(move || { actix_rt::spawn(async move { - tokio::time::delay_for(time).await; + tokio::time::sleep(time).await; actix_rt::Arbiter::current().stop(); }); }); @@ -50,7 +50,7 @@ fn join_another_arbiter() { actix_rt::System::new("test_join_another_arbiter").block_on(async move { let mut arbiter = actix_rt::Arbiter::new(); arbiter.send(Box::pin(async move { - tokio::time::delay_for(time).await; + tokio::time::sleep(time).await; actix_rt::Arbiter::current().stop(); })); arbiter.stop(); @@ -104,17 +104,17 @@ fn non_static_block_on() { let string = String::from("test_str"); let str = string.as_str(); - let mut sys = actix_rt::System::new("borrow some"); + let sys = actix_rt::System::new("borrow some"); sys.block_on(async { - actix_rt::time::delay_for(Duration::from_millis(1)).await; + actix_rt::time::sleep(Duration::from_millis(1)).await; assert_eq!("test_str", str); }); - let mut rt = actix_rt::Runtime::new().unwrap(); + let rt = actix_rt::Runtime::new().unwrap(); rt.block_on(async { - actix_rt::time::delay_for(Duration::from_millis(1)).await; + actix_rt::time::sleep(Duration::from_millis(1)).await; assert_eq!("test_str", str); });