1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-11-23 22:51:07 +01:00

update to tokio 1.0 for actix-rt (#236)

This commit is contained in:
fakeshadow 2020-12-28 09:40:22 +08:00 committed by GitHub
parent ba44ea7d0b
commit 0c12930796
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 49 additions and 61 deletions

View File

@ -18,7 +18,7 @@ members = [
[patch.crates-io] [patch.crates-io]
actix-codec = { path = "actix-codec" } actix-codec = { path = "actix-codec" }
actix-connect = { path = "actix-connect" } actix-connect = { path = "actix-connect" }
actix-rt = { path = "actix-rt" } actix-rt = { git = "https://github.com/actix/actix-net.git", ref = "ba44ea7d0bafaf5fccb9a34003d503e1910943eepath" }
actix-macros = { path = "actix-macros" } actix-macros = { path = "actix-macros" }
actix-server = { path = "actix-server" } actix-server = { path = "actix-server" }
actix-service = { path = "actix-service" } actix-service = { path = "actix-service" }

View File

@ -7,8 +7,10 @@
* Add `System::attach_to_tokio` method. [#173] * Add `System::attach_to_tokio` method. [#173]
### Changed ### Changed
* Update `tokio` dependency to `1`
* Remove `'static` lifetime requirement for `Runtime::block_on` and `SystemRunner::block_on`. * Rename `time` module `delay_for` to `sleep`, `delay_until` to `sleep_until`, `Delay` to `Sleep` to keep inline with tokio.
* Remove `'static` lifetime requirement for `Runtime::block_on` and `SystemRunner::block_on`.
These methods would accept &Self when calling.
Remove `'static` lifetime requirement for `System::run` and `Builder::run`. Remove `'static` lifetime requirement for `System::run` and `Builder::run`.
`Arbiter::spawn` would panic when `System` is not in scope. [#207] `Arbiter::spawn` would panic when `System` is not in scope. [#207]

View File

@ -18,9 +18,4 @@ path = "src/lib.rs"
[dependencies] [dependencies]
actix-macros = "0.1.0" actix-macros = "0.1.0"
futures-channel = "0.3.7" tokio = { version = "1", features = ["rt", "net", "signal", "sync", "time"] }
tokio = { version = "0.2.6", default-features = false, features = ["rt-core", "rt-util", "io-driver", "tcp", "uds", "udp", "time", "signal", "stream"] }
[dev-dependencies]
futures-util = { version = "0.3.7", default-features = false, features = ["alloc"] }
tokio = { version = "0.2.6", features = ["full"] }

View File

@ -7,12 +7,11 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::{fmt, thread}; use std::{fmt, thread};
use futures_channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use futures_channel::oneshot::{channel, Canceled, Sender}; use tokio::sync::oneshot::{channel, error::RecvError as Canceled, Sender};
// use futures_util::stream::FuturesUnordered; // use futures_util::stream::FuturesUnordered;
// use tokio::task::JoinHandle; // use tokio::task::JoinHandle;
// use tokio::stream::StreamExt; // use tokio::stream::StreamExt;
use tokio::stream::Stream;
use tokio::task::LocalSet; use tokio::task::LocalSet;
use crate::runtime::Runtime; use crate::runtime::Runtime;
@ -70,7 +69,7 @@ impl Default for Arbiter {
impl Arbiter { impl Arbiter {
pub(crate) fn new_system(local: &LocalSet) -> Self { pub(crate) fn new_system(local: &LocalSet) -> Self {
let (tx, rx) = unbounded(); let (tx, rx) = unbounded_channel();
let arb = Arbiter::with_sender(tx); let arb = Arbiter::with_sender(tx);
ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone())); ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
@ -98,7 +97,7 @@ impl Arbiter {
/// Stop arbiter from continuing it's event loop. /// Stop arbiter from continuing it's event loop.
pub fn stop(&self) { pub fn stop(&self) {
let _ = self.sender.unbounded_send(ArbiterCommand::Stop); let _ = self.sender.send(ArbiterCommand::Stop);
} }
/// Spawn new thread and run event loop in spawned thread. /// Spawn new thread and run event loop in spawned thread.
@ -107,14 +106,14 @@ impl Arbiter {
let id = COUNT.fetch_add(1, Ordering::Relaxed); let id = COUNT.fetch_add(1, Ordering::Relaxed);
let name = format!("actix-rt:worker:{}", id); let name = format!("actix-rt:worker:{}", id);
let sys = System::current(); let sys = System::current();
let (tx, rx) = unbounded(); let (tx, rx) = unbounded_channel();
let handle = thread::Builder::new() let handle = thread::Builder::new()
.name(name.clone()) .name(name.clone())
.spawn({ .spawn({
let tx = tx.clone(); let tx = tx.clone();
move || { move || {
let mut rt = Runtime::new().expect("Can not create Runtime"); let rt = Runtime::new().expect("Can not create Runtime");
let arb = Arbiter::with_sender(tx); let arb = Arbiter::with_sender(tx);
STORAGE.with(|cell| cell.borrow_mut().clear()); STORAGE.with(|cell| cell.borrow_mut().clear());
@ -126,7 +125,7 @@ impl Arbiter {
// register arbiter // register arbiter
let _ = System::current() let _ = System::current()
.sys() .sys()
.unbounded_send(SystemCommand::RegisterArbiter(id, arb)); .send(SystemCommand::RegisterArbiter(id, arb));
// start arbiter controller // start arbiter controller
// run loop // run loop
@ -135,7 +134,7 @@ impl Arbiter {
// unregister arbiter // unregister arbiter
let _ = System::current() let _ = System::current()
.sys() .sys()
.unbounded_send(SystemCommand::UnregisterArbiter(id)); .send(SystemCommand::UnregisterArbiter(id));
} }
}) })
.unwrap_or_else(|err| { .unwrap_or_else(|err| {
@ -181,9 +180,7 @@ impl Arbiter {
where where
F: Future<Output = ()> + Send + Unpin + 'static, F: Future<Output = ()> + Send + Unpin + 'static,
{ {
let _ = self let _ = self.sender.send(ArbiterCommand::Execute(Box::new(future)));
.sender
.unbounded_send(ArbiterCommand::Execute(Box::new(future)));
} }
/// Send a function to the Arbiter's thread, and execute it. Any result from the function /// Send a function to the Arbiter's thread, and execute it. Any result from the function
@ -194,7 +191,7 @@ impl Arbiter {
{ {
let _ = self let _ = self
.sender .sender
.unbounded_send(ArbiterCommand::ExecuteFn(Box::new(move || { .send(ArbiterCommand::ExecuteFn(Box::new(move || {
f(); f();
}))); })));
} }
@ -210,8 +207,8 @@ impl Arbiter {
let (tx, rx) = channel(); let (tx, rx) = channel();
let _ = self let _ = self
.sender .sender
.unbounded_send(ArbiterCommand::ExecuteFn(Box::new(move || { .send(ArbiterCommand::ExecuteFn(Box::new(move || {
if !tx.is_canceled() { if !tx.is_closed() {
let _ = tx.send(f()); let _ = tx.send(f());
} }
}))); })));
@ -328,7 +325,7 @@ impl Future for ArbiterController {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop { loop {
match Pin::new(&mut self.rx).poll_next(cx) { match Pin::new(&mut self.rx).poll_recv(cx) {
Poll::Ready(None) => return Poll::Ready(()), Poll::Ready(None) => return Poll::Ready(()),
Poll::Ready(Some(item)) => match item { Poll::Ready(Some(item)) => match item {
ArbiterCommand::Stop => return Poll::Ready(()), ArbiterCommand::Stop => return Poll::Ready(()),
@ -393,7 +390,7 @@ impl Future for SystemArbiter {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop { loop {
match Pin::new(&mut self.commands).poll_next(cx) { match Pin::new(&mut self.commands).poll_recv(cx) {
Poll::Ready(None) => return Poll::Ready(()), Poll::Ready(None) => return Poll::Ready(()),
Poll::Ready(Some(cmd)) => match cmd { Poll::Ready(Some(cmd)) => match cmd {
SystemCommand::Exit(code) => { SystemCommand::Exit(code) => {

View File

@ -2,8 +2,8 @@ use std::borrow::Cow;
use std::future::Future; use std::future::Future;
use std::io; use std::io;
use futures_channel::mpsc::unbounded; use tokio::sync::mpsc::unbounded_channel;
use futures_channel::oneshot::{channel, Receiver}; use tokio::sync::oneshot::{channel, Receiver};
use tokio::task::LocalSet; use tokio::task::LocalSet;
use crate::arbiter::{Arbiter, SystemArbiter}; use crate::arbiter::{Arbiter, SystemArbiter};
@ -72,7 +72,7 @@ impl Builder {
fn create_async_runtime(self, local: &LocalSet) -> AsyncSystemRunner { fn create_async_runtime(self, local: &LocalSet) -> AsyncSystemRunner {
let (stop_tx, stop) = channel(); let (stop_tx, stop) = channel();
let (sys_sender, sys_receiver) = unbounded(); let (sys_sender, sys_receiver) = unbounded_channel();
let system = let system =
System::construct(sys_sender, Arbiter::new_system(local), self.stop_on_panic); System::construct(sys_sender, Arbiter::new_system(local), self.stop_on_panic);
@ -91,9 +91,9 @@ impl Builder {
F: FnOnce(), F: FnOnce(),
{ {
let (stop_tx, stop) = channel(); let (stop_tx, stop) = channel();
let (sys_sender, sys_receiver) = unbounded(); let (sys_sender, sys_receiver) = unbounded_channel();
let mut rt = Runtime::new().unwrap(); let rt = Runtime::new().unwrap();
let system = System::construct( let system = System::construct(
sys_sender, sys_sender,
@ -157,7 +157,7 @@ impl SystemRunner {
/// This function will start event loop and will finish once the /// This function will start event loop and will finish once the
/// `System::stop()` function is called. /// `System::stop()` function is called.
pub fn run(self) -> io::Result<()> { pub fn run(self) -> io::Result<()> {
let SystemRunner { mut rt, stop, .. } = self; let SystemRunner { rt, stop, .. } = self;
// run loop // run loop
match rt.block_on(stop) { match rt.block_on(stop) {
@ -177,10 +177,7 @@ impl SystemRunner {
/// Execute a future and wait for result. /// Execute a future and wait for result.
#[inline] #[inline]
pub fn block_on<F, O>(&mut self, fut: F) -> O pub fn block_on<F: Future>(&self, fut: F) -> F::Output {
where
F: Future<Output = O>,
{
self.rt.block_on(fut) self.rt.block_on(fut)
} }
} }

View File

@ -58,7 +58,7 @@ pub mod net {
/// Utilities for tracking time. /// Utilities for tracking time.
pub mod time { pub mod time {
pub use tokio::time::Instant; pub use tokio::time::Instant;
pub use tokio::time::{delay_for, delay_until, Delay};
pub use tokio::time::{interval, interval_at, Interval}; pub use tokio::time::{interval, interval_at, Interval};
pub use tokio::time::{sleep, sleep_until, Sleep};
pub use tokio::time::{timeout, Timeout}; pub use tokio::time::{timeout, Timeout};
} }

View File

@ -18,10 +18,9 @@ impl Runtime {
#[allow(clippy::new_ret_no_self)] #[allow(clippy::new_ret_no_self)]
/// Returns a new runtime initialized with default configuration values. /// Returns a new runtime initialized with default configuration values.
pub fn new() -> io::Result<Runtime> { pub fn new() -> io::Result<Runtime> {
let rt = runtime::Builder::new() let rt = runtime::Builder::new_current_thread()
.enable_io() .enable_io()
.enable_time() .enable_time()
.basic_scheduler()
.build()?; .build()?;
Ok(Runtime { Ok(Runtime {
@ -48,7 +47,7 @@ impl Runtime {
/// ///
/// # fn dox() { /// # fn dox() {
/// // Create the runtime /// // Create the runtime
/// let mut rt = Runtime::new().unwrap(); /// let rt = Runtime::new().unwrap();
/// ///
/// // Spawn a future onto the runtime /// // Spawn a future onto the runtime
/// rt.spawn(future::lazy(|_| { /// rt.spawn(future::lazy(|_| {
@ -86,10 +85,10 @@ impl Runtime {
/// ///
/// The caller is responsible for ensuring that other spawned futures /// The caller is responsible for ensuring that other spawned futures
/// complete execution by calling `block_on` or `run`. /// complete execution by calling `block_on` or `run`.
pub fn block_on<F>(&mut self, f: F) -> F::Output pub fn block_on<F>(&self, f: F) -> F::Output
where where
F: Future, F: Future,
{ {
self.local.block_on(&mut self.rt, f) self.local.block_on(&self.rt, f)
} }
} }

View File

@ -3,7 +3,7 @@ use std::future::Future;
use std::io; use std::io;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use futures_channel::mpsc::UnboundedSender; use tokio::sync::mpsc::UnboundedSender;
use tokio::task::LocalSet; use tokio::task::LocalSet;
use crate::arbiter::{Arbiter, SystemCommand}; use crate::arbiter::{Arbiter, SystemCommand};
@ -70,7 +70,7 @@ impl System {
/// ///
/// # Examples /// # Examples
/// ///
/// ``` /// ```rust,ignore
/// use tokio::{runtime::Runtime, task::LocalSet}; /// use tokio::{runtime::Runtime, task::LocalSet};
/// use actix_rt::System; /// use actix_rt::System;
/// use futures_util::future::try_join_all; /// use futures_util::future::try_join_all;
@ -94,10 +94,9 @@ impl System {
/// } /// }
/// ///
/// ///
/// let mut runtime = tokio::runtime::Builder::new() /// let runtime = tokio::runtime::Builder::new_multi_thread()
/// .core_threads(2) /// .worker_threads(2)
/// .enable_all() /// .enable_all()
/// .threaded_scheduler()
/// .build() /// .build()
/// .unwrap(); /// .unwrap();
/// ///
@ -140,7 +139,7 @@ impl System {
/// ///
/// # Examples /// # Examples
/// ///
/// ``` /// ```rust,ignore
/// use tokio::runtime::Runtime; /// use tokio::runtime::Runtime;
/// use actix_rt::System; /// use actix_rt::System;
/// use futures_util::future::try_join_all; /// use futures_util::future::try_join_all;
@ -164,10 +163,9 @@ impl System {
/// } /// }
/// ///
/// ///
/// let runtime = tokio::runtime::Builder::new() /// let runtime = tokio::runtime::Builder::new_multi_thread()
/// .core_threads(2) /// .worker_threads(2)
/// .enable_all() /// .enable_all()
/// .threaded_scheduler()
/// .build() /// .build()
/// .unwrap(); /// .unwrap();
/// ///
@ -176,7 +174,7 @@ impl System {
/// ``` /// ```
pub fn attach_to_tokio<Fut, R>( pub fn attach_to_tokio<Fut, R>(
name: impl Into<String>, name: impl Into<String>,
mut runtime: tokio::runtime::Runtime, runtime: tokio::runtime::Runtime,
rest_operations: Fut, rest_operations: Fut,
) -> R ) -> R
where where
@ -233,7 +231,7 @@ impl System {
/// Stop the system with a particular exit code. /// Stop the system with a particular exit code.
pub fn stop_with_code(&self, code: i32) { pub fn stop_with_code(&self, code: i32) {
let _ = self.sys.unbounded_send(SystemCommand::Exit(code)); let _ = self.sys.send(SystemCommand::Exit(code));
} }
pub(crate) fn sys(&self) -> &UnboundedSender<SystemCommand> { pub(crate) fn sys(&self) -> &UnboundedSender<SystemCommand> {

View File

@ -5,7 +5,7 @@ fn await_for_timer() {
let time = Duration::from_secs(2); let time = Duration::from_secs(2);
let instant = Instant::now(); let instant = Instant::now();
actix_rt::System::new("test_wait_timer").block_on(async move { actix_rt::System::new("test_wait_timer").block_on(async move {
tokio::time::delay_for(time).await; tokio::time::sleep(time).await;
}); });
assert!( assert!(
instant.elapsed() >= time, instant.elapsed() >= time,
@ -20,7 +20,7 @@ fn join_another_arbiter() {
actix_rt::System::new("test_join_another_arbiter").block_on(async move { actix_rt::System::new("test_join_another_arbiter").block_on(async move {
let mut arbiter = actix_rt::Arbiter::new(); let mut arbiter = actix_rt::Arbiter::new();
arbiter.send(Box::pin(async move { arbiter.send(Box::pin(async move {
tokio::time::delay_for(time).await; tokio::time::sleep(time).await;
actix_rt::Arbiter::current().stop(); actix_rt::Arbiter::current().stop();
})); }));
arbiter.join().unwrap(); arbiter.join().unwrap();
@ -35,7 +35,7 @@ fn join_another_arbiter() {
let mut arbiter = actix_rt::Arbiter::new(); let mut arbiter = actix_rt::Arbiter::new();
arbiter.exec_fn(move || { arbiter.exec_fn(move || {
actix_rt::spawn(async move { actix_rt::spawn(async move {
tokio::time::delay_for(time).await; tokio::time::sleep(time).await;
actix_rt::Arbiter::current().stop(); actix_rt::Arbiter::current().stop();
}); });
}); });
@ -50,7 +50,7 @@ fn join_another_arbiter() {
actix_rt::System::new("test_join_another_arbiter").block_on(async move { actix_rt::System::new("test_join_another_arbiter").block_on(async move {
let mut arbiter = actix_rt::Arbiter::new(); let mut arbiter = actix_rt::Arbiter::new();
arbiter.send(Box::pin(async move { arbiter.send(Box::pin(async move {
tokio::time::delay_for(time).await; tokio::time::sleep(time).await;
actix_rt::Arbiter::current().stop(); actix_rt::Arbiter::current().stop();
})); }));
arbiter.stop(); arbiter.stop();
@ -104,17 +104,17 @@ fn non_static_block_on() {
let string = String::from("test_str"); let string = String::from("test_str");
let str = string.as_str(); let str = string.as_str();
let mut sys = actix_rt::System::new("borrow some"); let sys = actix_rt::System::new("borrow some");
sys.block_on(async { sys.block_on(async {
actix_rt::time::delay_for(Duration::from_millis(1)).await; actix_rt::time::sleep(Duration::from_millis(1)).await;
assert_eq!("test_str", str); assert_eq!("test_str", str);
}); });
let mut rt = actix_rt::Runtime::new().unwrap(); let rt = actix_rt::Runtime::new().unwrap();
rt.block_on(async { rt.block_on(async {
actix_rt::time::delay_for(Duration::from_millis(1)).await; actix_rt::time::sleep(Duration::from_millis(1)).await;
assert_eq!("test_str", str); assert_eq!("test_str", str);
}); });