1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-08-18 19:45:31 +02:00

Compare commits

..

2 Commits

Author SHA1 Message Date
Rob Ede
45e3a5c0e6 Merge branch 'master' into remove-mpsc 2021-02-24 09:48:50 +00:00
Rob Ede
564acfbf3a remove mpsc impl from -utils 2021-02-24 02:11:08 +00:00
15 changed files with 449 additions and 859 deletions

View File

@@ -235,7 +235,7 @@ impl<T, U> Framed<T, U> {
}
/// Flush write buffer to underlying I/O stream.
pub fn flush<I>(
pub fn poll_flush<I>(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), U::Error>>
@@ -271,7 +271,7 @@ impl<T, U> Framed<T, U> {
}
/// Flush write buffer and shutdown underlying I/O stream.
pub fn close<I>(
pub fn poll_close<I>(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), U::Error>>
@@ -319,11 +319,11 @@ where
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.flush(cx)
self.poll_flush(cx)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.close(cx)
self.poll_close(cx)
}
}

View File

@@ -26,7 +26,6 @@ macros = ["actix-macros"]
actix-macros = { version = "0.2.0", optional = true }
futures-core = { version = "0.3", default-features = false }
futures-intrusive = "0.4"
tokio = { version = "1.2", features = ["rt", "net", "parking_lot", "signal", "sync", "time"] }
[dev-dependencies]

View File

@@ -9,9 +9,6 @@ use std::{
};
use futures_core::ready;
use futures_intrusive::channel::shared::{
oneshot_broadcast_channel, OneshotBroadcastReceiver, OneshotBroadcastSender,
};
use tokio::{sync::mpsc, task::LocalSet};
use crate::{
@@ -43,26 +40,11 @@ impl fmt::Debug for ArbiterCommand {
#[derive(Debug, Clone)]
pub struct ArbiterHandle {
tx: mpsc::UnboundedSender<ArbiterCommand>,
/// Is `None` for system arbiter.
stopped_rx: Option<OneshotBroadcastReceiver<()>>,
}
impl ArbiterHandle {
pub(crate) fn new(
tx: mpsc::UnboundedSender<ArbiterCommand>,
stopped_rx: OneshotBroadcastReceiver<()>,
) -> Self {
Self {
tx,
stopped_rx: Some(stopped_rx),
}
}
pub(crate) fn for_system(tx: mpsc::UnboundedSender<ArbiterCommand>) -> Self {
Self {
tx,
stopped_rx: None,
}
pub(crate) fn new(tx: mpsc::UnboundedSender<ArbiterCommand>) -> Self {
Self { tx }
}
/// Send a future to the [Arbiter]'s thread and spawn it.
@@ -99,25 +81,6 @@ impl ArbiterHandle {
pub fn stop(&self) -> bool {
self.tx.send(ArbiterCommand::Stop).is_ok()
}
/// Will wait for associated [Arbiter] to complete all commands up until it is stopped.
///
/// For [Arbiter]s that have already stopped, the future will resolve immediately.
///
/// # Panics
/// Panics if called on the system Arbiter. In this situation the Arbiter's lifetime is
/// implicitly bound by the main thread's lifetime.
pub async fn join(self) {
match self.stopped_rx {
Some(rx) => {
rx.receive().await;
}
None => {
// TODO: decide if this is correct
panic!("cannot wait on the system Arbiter's completion")
}
}
}
}
/// An Arbiter represents a thread that provides an asynchronous execution environment for futures
@@ -126,10 +89,8 @@ impl ArbiterHandle {
/// When an arbiter is created, it spawns a new [OS thread](thread), and hosts an event loop.
#[derive(Debug)]
pub struct Arbiter {
id: usize,
stopped_tx: OneshotBroadcastSender<()>,
cmd_tx: mpsc::UnboundedSender<ArbiterCommand>,
thread_handle: Option<thread::JoinHandle<()>>,
tx: mpsc::UnboundedSender<ArbiterCommand>,
thread_handle: thread::JoinHandle<()>,
}
impl Arbiter {
@@ -138,7 +99,7 @@ impl Arbiter {
/// # Panics
/// Panics if a [System] is not registered on the current thread.
#[allow(clippy::new_without_default)]
pub fn new() -> ArbiterHandle {
pub fn new() -> Arbiter {
Self::with_tokio_rt(|| {
default_tokio_runtime().expect("Cannot create new Arbiter's Runtime.")
})
@@ -148,109 +109,74 @@ impl Arbiter {
///
/// [tokio-runtime]: tokio::runtime::Runtime
#[doc(hidden)]
pub fn with_tokio_rt<F>(runtime_factory: F) -> ArbiterHandle
pub fn with_tokio_rt<F>(runtime_factory: F) -> Arbiter
where
F: Fn() -> tokio::runtime::Runtime + Send + 'static,
{
eprintln!("get sys current");
let sys = System::current();
eprintln!("get sys id");
let system_id = sys.id();
eprintln!("calc arb id");
let arb_id = COUNT.fetch_add(1, Ordering::Relaxed);
let name = format!("actix-rt|system:{}|arbiter:{}", system_id, arb_id);
let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
let (tx, rx) = mpsc::unbounded_channel();
// let ready_barrier = Arc::new(Barrier::new(2));
let (ready_tx, ready_rx) = std::sync::mpsc::channel::<()>();
let (stopped_tx, stopped_rx) = oneshot_broadcast_channel::<()>();
eprintln!("make arb handle");
let hnd = ArbiterHandle::new(cmd_tx.clone(), stopped_rx);
eprintln!("make thread");
let thread_handle = thread::Builder::new()
.name(name.clone())
.spawn({
let hnd = hnd.clone();
// let ready_barrier = Arc::clone(&ready_barrier);
let tx = tx.clone();
move || {
eprintln!("thread: make rt");
let rt = Runtime::from(runtime_factory());
let hnd = ArbiterHandle::new(tx);
eprintln!("thread: set sys");
System::set_current(sys);
// // wait until register message is sent
// eprintln!("thread: wait for arb registered");
// ready_barrier.wait();
eprintln!("thread: set arb handle");
HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone()));
// register arbiter
let _ = System::current()
.tx()
.send(SystemCommand::RegisterArbiter(arb_id, hnd));
ready_tx.send(()).unwrap();
// run arbiter event processing loop
eprintln!("thread: block on arbiter loop");
rt.block_on(ArbiterLoop { cmd_rx });
rt.block_on(ArbiterRunner { rx });
// deregister arbiter
eprintln!("thread: send deregister arbiter message");
System::current()
let _ = System::current()
.tx()
.send(SystemCommand::DeregisterArbiter(arb_id))
.unwrap();
.send(SystemCommand::DeregisterArbiter(arb_id));
}
})
.unwrap_or_else(|err| {
panic!("Cannot spawn Arbiter's thread: {:?}. {:?}", &name, err)
});
let arb = Arbiter {
id: arb_id,
cmd_tx,
stopped_tx,
thread_handle: Some(thread_handle),
};
ready_rx.recv().unwrap();
// register arbiter
eprintln!("send register arbiter message");
System::current()
.tx()
.send(SystemCommand::RegisterArbiter(arb))
.unwrap();
// eprintln!("inform arbiter that it is registered");
// ready_barrier.wait();
eprintln!("arbiter::new done");
hnd
Arbiter { tx, thread_handle }
}
/// Sets up an Arbiter runner in a new System using the provided runtime local task set.
pub(crate) fn for_system(local: &LocalSet) -> ArbiterHandle {
let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
pub(crate) fn in_new_system(local: &LocalSet) -> ArbiterHandle {
let (tx, rx) = mpsc::unbounded_channel();
let hnd = ArbiterHandle::for_system(cmd_tx);
let hnd = ArbiterHandle::new(tx);
HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone()));
local.spawn_local(ArbiterLoop { cmd_rx });
local.spawn_local(ArbiterRunner { rx });
hnd
}
/// Return `Arbiter`'s numeric ID.
pub(crate) fn id(&self) -> usize {
self.id
/// Return a handle to the this Arbiter's message sender.
pub fn handle(&self) -> ArbiterHandle {
ArbiterHandle::new(self.tx.clone())
}
// /// Return a handle to the this Arbiter's message sender.
// pub fn handle(&self) -> ArbiterHandle {
// ArbiterHandle::new(self.cmd_tx.clone())
// }
/// Return a handle to the current thread's Arbiter's message sender.
///
/// # Panics
@@ -265,68 +191,57 @@ impl Arbiter {
/// Stop Arbiter from continuing it's event loop.
///
/// Returns true if stop message was sent successfully and false if the Arbiter has been dropped.
pub(crate) fn stop(&self) -> bool {
self.cmd_tx.send(ArbiterCommand::Stop).is_ok()
pub fn stop(&self) -> bool {
self.tx.send(ArbiterCommand::Stop).is_ok()
}
// /// Send a future to the Arbiter's thread and spawn it.
// ///
// /// If you require a result, include a response channel in the future.
// ///
// /// Returns true if future was sent successfully and false if the Arbiter has died.
// pub fn spawn<Fut>(&self, future: Fut) -> bool
// where
// Fut: Future<Output = ()> + Send + 'static,
// {
// self.cmd_tx
// .send(ArbiterCommand::Execute(Box::pin(future)))
// .is_ok()
// }
/// Send a future to the Arbiter's thread and spawn it.
///
/// If you require a result, include a response channel in the future.
///
/// Returns true if future was sent successfully and false if the Arbiter has died.
pub fn spawn<Fut>(&self, future: Fut) -> bool
where
Fut: Future<Output = ()> + Send + 'static,
{
self.tx
.send(ArbiterCommand::Execute(Box::pin(future)))
.is_ok()
}
// /// Send a function to the Arbiter's thread and execute it.
// ///
// /// Any result from the function is discarded. If you require a result, include a response
// /// channel in the function.
// ///
// /// Returns true if function was sent successfully and false if the Arbiter has died.
// pub fn spawn_fn<F>(&self, f: F) -> bool
// where
// F: FnOnce() + Send + 'static,
// {
// self.spawn(async { f() })
// }
}
/// Send a function to the Arbiter's thread and execute it.
///
/// Any result from the function is discarded. If you require a result, include a response
/// channel in the function.
///
/// Returns true if function was sent successfully and false if the Arbiter has died.
pub fn spawn_fn<F>(&self, f: F) -> bool
where
F: FnOnce() + Send + 'static,
{
self.spawn(async { f() })
}
impl Drop for Arbiter {
fn drop(&mut self) {
eprintln!("Arb::drop: joining arbiter thread");
match self.thread_handle.take().unwrap().join() {
Ok(()) => {}
Err(err) => {
eprintln!("arbiter {} thread panicked: {:?}", self.id(), err)
}
}
eprintln!("Arb::drop: sending stopped tx");
// could fail if all handles are dropped already so ignore result
let _ = self.stopped_tx.send(());
eprintln!("Arb::drop: done");
/// Wait for Arbiter's event loop to complete.
///
/// Joins the underlying OS thread handle. See [`JoinHandle::join`](thread::JoinHandle::join).
pub fn join(self) -> thread::Result<()> {
self.thread_handle.join()
}
}
/// A persistent future that processes [Arbiter] commands.
struct ArbiterLoop {
cmd_rx: mpsc::UnboundedReceiver<ArbiterCommand>,
struct ArbiterRunner {
rx: mpsc::UnboundedReceiver<ArbiterCommand>,
}
impl Future for ArbiterLoop {
impl Future for ArbiterRunner {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// process all items currently buffered in channel
loop {
match ready!(Pin::new(&mut self.cmd_rx).poll_recv(cx)) {
match ready!(Pin::new(&mut self.rx).poll_recv(cx)) {
// channel closed; no more messages can be received
None => return Poll::Ready(()),

View File

@@ -16,12 +16,12 @@
//!
//! # Examples
//! ```
//! use std::sync::mpsc::channel as std_channel;
//! use std::sync::mpsc;
//! use actix_rt::{Arbiter, System};
//!
//! let sys = System::new();
//! let _ = System::new();
//!
//! let (tx, rx) = std_channel::<u32>();
//! let (tx, rx) = mpsc::channel::<u32>();
//!
//! let arbiter = Arbiter::new();
//! arbiter.spawn_fn(move || tx.send(42).unwrap());
@@ -30,10 +30,7 @@
//! assert_eq!(num, 42);
//!
//! arbiter.stop();
//! sys.block_on(arbiter.join());
//!
//! System::current().stop();
//! sys.run().unwrap();
//! arbiter.join().unwrap();
//! ```
#![deny(rust_2018_idioms, nonstandard_style)]

View File

@@ -54,9 +54,14 @@ impl System {
let (sys_tx, sys_rx) = mpsc::unbounded_channel();
let rt = Runtime::from(runtime_factory());
let sys_arbiter = Arbiter::for_system(rt.local_set());
let sys_arbiter = Arbiter::in_new_system(rt.local_set());
let system = System::construct(sys_tx, sys_arbiter.clone());
system
.tx()
.send(SystemCommand::RegisterArbiter(usize::MAX, sys_arbiter))
.unwrap();
// init background system arbiter
let sys_ctrl = SystemController::new(sys_rx, stop_tx);
rt.spawn(sys_ctrl);
@@ -145,10 +150,7 @@ impl System {
}
/// Runner that keeps a [System]'s event loop alive until stop message is received.
///
/// Dropping the `SystemRunner` (eg. `let _ = System::new();`) will result in no further events
/// being processed. It is required you bind the runner and call `run` or call `block_on`.
#[must_use = "A SystemRunner does nothing unless `run` or `block_on` is called."]
#[must_use = "A SystemRunner does nothing unless `run` is called."]
#[derive(Debug)]
pub struct SystemRunner {
rt: Runtime,
@@ -188,7 +190,7 @@ impl SystemRunner {
#[derive(Debug)]
pub(crate) enum SystemCommand {
Exit(i32),
RegisterArbiter(Arbiter),
RegisterArbiter(usize, ArbiterHandle),
DeregisterArbiter(usize),
}
@@ -198,7 +200,7 @@ pub(crate) enum SystemCommand {
pub(crate) struct SystemController {
stop_tx: Option<oneshot::Sender<i32>>,
cmd_rx: mpsc::UnboundedReceiver<SystemCommand>,
arbiters: HashMap<usize, Arbiter>,
arbiters: HashMap<usize, ArbiterHandle>,
}
impl SystemController {
@@ -219,42 +221,35 @@ impl Future for SystemController {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// process all items currently buffered in channel
let code = loop {
loop {
match ready!(Pin::new(&mut self.cmd_rx).poll_recv(cx)) {
// channel closed; no more messages can be received
None => break 0,
None => return Poll::Ready(()),
// process system command
Some(cmd) => match cmd {
SystemCommand::Exit(code) => {
// stop all arbiters
for arb in self.arbiters.values() {
eprintln!("SystemController: stopping arbiter {}", arb.id());
arb.stop();
}
eprintln!("SystemController: dropping arbiters");
// destroy all arbiters
// drop waits for threads to complete
self.arbiters.clear();
break code;
// stop event loop
// will only fire once
if let Some(stop_tx) = self.stop_tx.take() {
let _ = stop_tx.send(code);
}
}
SystemCommand::RegisterArbiter(arb) => {
self.arbiters.insert(arb.id(), arb);
SystemCommand::RegisterArbiter(id, arb) => {
self.arbiters.insert(id, arb);
}
SystemCommand::DeregisterArbiter(id) => {
// implicit arbiter drop
let _ = self.arbiters.remove(&id);
self.arbiters.remove(&id);
}
},
}
};
self.stop_tx.take().unwrap().send(code).unwrap();
Poll::Ready(())
}
}
}

View File

@@ -1,44 +0,0 @@
//! Derived from this comment:
//! https://github.com/actix/actix/issues/464#issuecomment-779427825
use std::{thread, time::Duration};
use actix_rt::{Arbiter, System};
use tokio::sync::mpsc;
#[test]
fn actix_sample() {
let sys = System::new();
let arb = Arbiter::new();
let (_tx, mut rx) = mpsc::unbounded_channel::<()>();
// create "actor"
arb.spawn_fn(move || {
let a = A;
actix_rt::spawn(async move {
while let Some(_) = rx.recv().await {
println!("{:?}", a);
}
});
});
System::current().stop();
// all arbiters must be dropped when sys.run returns
sys.run().unwrap();
thread::sleep(Duration::from_millis(100));
}
#[derive(Debug)]
struct A;
impl Drop for A {
fn drop(&mut self) {
println!("start drop");
thread::sleep(Duration::from_millis(200));
println!("finish drop");
}
}

View File

@@ -1,120 +0,0 @@
use std::{
sync::mpsc::channel as std_channel,
time::{Duration, Instant},
};
use actix_rt::{time, Arbiter, System};
#[test]
#[should_panic]
fn no_system_arbiter_new_panic() {
Arbiter::new();
}
#[test]
fn join_arbiter_wait_fut() {
let time = Duration::from_secs(1);
let instant = Instant::now();
System::new().block_on(async move {
let arbiter = Arbiter::new();
arbiter.spawn(async move {
time::sleep(time).await;
Arbiter::current().stop();
});
arbiter.join().await;
});
assert!(
instant.elapsed() >= time,
"Join on another arbiter should complete only when it calls stop"
);
}
#[test]
fn join_arbiter_wait_fn() {
let time = Duration::from_secs(1);
let instant = Instant::now();
System::new().block_on(async move {
let arbiter = Arbiter::new();
arbiter.spawn_fn(move || {
actix_rt::spawn(async move {
time::sleep(time).await;
Arbiter::current().stop();
});
});
arbiter.join().await;
});
assert!(
instant.elapsed() >= time,
"Join on an arbiter that has used actix_rt::spawn should wait for said future"
);
}
#[test]
fn join_arbiter_early_stop_call() {
let time = Duration::from_secs(1);
let instant = Instant::now();
System::new().block_on(async move {
let arbiter = Arbiter::new();
arbiter.spawn(Box::pin(async move {
time::sleep(time).await;
Arbiter::current().stop();
}));
arbiter.stop();
arbiter.join().await;
});
assert!(
instant.elapsed() < time,
"Premature stop of Arbiter should conclude regardless of it's current state."
);
}
#[test]
fn arbiter_spawn_fn_runs() {
let sys = System::new();
let (tx, rx) = std_channel::<u32>();
let arbiter = Arbiter::new();
arbiter.spawn_fn(move || {
tx.send(42).unwrap();
System::current().stop();
});
let num = rx.recv().unwrap();
assert_eq!(num, 42);
sys.run().unwrap();
}
#[test]
fn arbiter_inner_panic() {
let sys = System::new();
let (tx, rx) = std_channel::<u32>();
let arbiter = Arbiter::new();
// spawned panics should not cause arbiter to crash
arbiter.spawn(async { panic!("inner panic; will be caught") });
arbiter.spawn_fn(|| panic!("inner panic; will be caught"));
arbiter.spawn(async move { tx.send(42).unwrap() });
let num = rx.recv().unwrap();
assert_eq!(num, 42);
System::current().stop();
sys.run().unwrap();
}

View File

@@ -1,82 +0,0 @@
use std::{
sync::{
atomic::{AtomicBool, Ordering},
mpsc::channel as std_channel,
Arc,
},
time::Duration,
};
use actix_rt::{Arbiter, System};
#[test]
fn wait_for_spawns() {
let rt = actix_rt::Runtime::new().unwrap();
let handle = rt.spawn(async {
println!("running on the runtime");
// assertion panic is caught at task boundary
assert_eq!(1, 2);
});
assert!(rt.block_on(handle).is_err());
}
#[test]
fn new_system_with_tokio() {
let (tx, rx) = std_channel();
let res = System::with_tokio_rt(move || {
tokio::runtime::Builder::new_multi_thread()
.enable_io()
.enable_time()
.thread_keep_alive(Duration::from_millis(1000))
.worker_threads(2)
.max_blocking_threads(2)
.on_thread_start(|| {})
.on_thread_stop(|| {})
.build()
.unwrap()
})
.block_on(async {
actix_rt::time::sleep(Duration::from_millis(1)).await;
tokio::task::spawn(async move {
tx.send(42).unwrap();
})
.await
.unwrap();
123usize
});
assert_eq!(res, 123);
assert_eq!(rx.recv().unwrap(), 42);
}
#[test]
fn new_arbiter_with_tokio() {
let sys = System::new();
let arb = Arbiter::with_tokio_rt(|| {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
});
let counter = Arc::new(AtomicBool::new(true));
let counter1 = counter.clone();
let did_spawn = arb.spawn(async move {
actix_rt::time::sleep(Duration::from_millis(1)).await;
counter1.store(false, Ordering::SeqCst);
Arbiter::current().stop();
System::current().stop();
});
sys.run().unwrap();
assert!(did_spawn);
assert_eq!(false, counter.load(Ordering::SeqCst));
}

View File

@@ -1,130 +0,0 @@
use std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread,
time::{Duration, Instant},
};
use actix_rt::{time, Arbiter, System};
use tokio::sync::oneshot;
#[test]
#[should_panic]
fn no_system_current_panic() {
System::current();
}
#[test]
fn try_current_no_system() {
assert!(System::try_current().is_none())
}
#[test]
fn try_current_with_system() {
System::new().block_on(async { assert!(System::try_current().is_some()) });
}
#[test]
fn non_static_block_on() {
let string = String::from("test_str");
let string = string.as_str();
let sys = System::new();
sys.block_on(async {
actix_rt::time::sleep(Duration::from_millis(1)).await;
assert_eq!("test_str", string);
});
let rt = actix_rt::Runtime::new().unwrap();
rt.block_on(async {
actix_rt::time::sleep(Duration::from_millis(1)).await;
assert_eq!("test_str", string);
});
System::current().stop();
sys.run().unwrap();
}
#[test]
fn await_for_timer() {
let time = Duration::from_secs(1);
let instant = Instant::now();
System::new().block_on(async move {
time::sleep(time).await;
});
assert!(
instant.elapsed() >= time,
"Calling `block_on` should poll awaited future to completion."
);
}
#[test]
fn system_arbiter_spawn() {
let runner = System::new();
let (tx, rx) = oneshot::channel();
let sys = System::current();
thread::spawn(|| {
// this thread will have no arbiter in it's thread local so call will panic
Arbiter::current();
})
.join()
.unwrap_err();
let thread = thread::spawn(|| {
// this thread will have no arbiter in it's thread local so use the system handle instead
System::set_current(sys);
let sys = System::current();
let arb = sys.arbiter();
arb.spawn(async move {
tx.send(42u32).unwrap();
System::current().stop();
});
});
assert_eq!(runner.block_on(rx).unwrap(), 42);
thread.join().unwrap();
}
struct Atom(Arc<AtomicBool>);
impl Drop for Atom {
fn drop(&mut self) {
self.0.store(true, Ordering::SeqCst);
}
}
#[test]
fn system_stop_arbiter_join_barrier() {
let sys = System::new();
let arb = Arbiter::new();
let atom = Atom(Arc::new(AtomicBool::new(false)));
// arbiter should be alive to receive spawn msg
assert!(Arbiter::current().spawn_fn(|| {}));
assert!(arb.spawn_fn(move || {
// thread should get dropped during sleep
thread::sleep(Duration::from_secs(2));
// pointless load to move atom into scope
atom.0.load(Ordering::SeqCst);
panic!("spawned fn (thread) should be dropped during sleep");
}));
System::current().stop();
sys.run().unwrap();
// arbiter should be dead and return false
assert!(!Arbiter::current().spawn_fn(|| {}));
assert!(!arb.spawn_fn(|| {}));
}

300
actix-rt/tests/tests.rs Normal file
View File

@@ -0,0 +1,300 @@
use std::{
sync::{
atomic::{AtomicBool, Ordering},
mpsc::channel,
Arc,
},
thread,
time::{Duration, Instant},
};
use actix_rt::{Arbiter, System};
use tokio::sync::oneshot;
#[test]
fn await_for_timer() {
let time = Duration::from_secs(1);
let instant = Instant::now();
System::new().block_on(async move {
tokio::time::sleep(time).await;
});
assert!(
instant.elapsed() >= time,
"Block on should poll awaited future to completion"
);
}
#[test]
fn join_another_arbiter() {
let time = Duration::from_secs(1);
let instant = Instant::now();
System::new().block_on(async move {
let arbiter = Arbiter::new();
arbiter.spawn(Box::pin(async move {
tokio::time::sleep(time).await;
Arbiter::current().stop();
}));
arbiter.join().unwrap();
});
assert!(
instant.elapsed() >= time,
"Join on another arbiter should complete only when it calls stop"
);
let instant = Instant::now();
System::new().block_on(async move {
let arbiter = Arbiter::new();
arbiter.spawn_fn(move || {
actix_rt::spawn(async move {
tokio::time::sleep(time).await;
Arbiter::current().stop();
});
});
arbiter.join().unwrap();
});
assert!(
instant.elapsed() >= time,
"Join on an arbiter that has used actix_rt::spawn should wait for said future"
);
let instant = Instant::now();
System::new().block_on(async move {
let arbiter = Arbiter::new();
arbiter.spawn(Box::pin(async move {
tokio::time::sleep(time).await;
Arbiter::current().stop();
}));
arbiter.stop();
arbiter.join().unwrap();
});
assert!(
instant.elapsed() < time,
"Premature stop of arbiter should conclude regardless of it's current state"
);
}
#[test]
fn non_static_block_on() {
let string = String::from("test_str");
let string = string.as_str();
let sys = System::new();
sys.block_on(async {
actix_rt::time::sleep(Duration::from_millis(1)).await;
assert_eq!("test_str", string);
});
let rt = actix_rt::Runtime::new().unwrap();
rt.block_on(async {
actix_rt::time::sleep(Duration::from_millis(1)).await;
assert_eq!("test_str", string);
});
}
#[test]
fn wait_for_spawns() {
let rt = actix_rt::Runtime::new().unwrap();
let handle = rt.spawn(async {
println!("running on the runtime");
// assertion panic is caught at task boundary
assert_eq!(1, 2);
});
assert!(rt.block_on(handle).is_err());
}
#[test]
fn arbiter_spawn_fn_runs() {
let _ = System::new();
let (tx, rx) = channel::<u32>();
let arbiter = Arbiter::new();
arbiter.spawn_fn(move || tx.send(42).unwrap());
let num = rx.recv().unwrap();
assert_eq!(num, 42);
arbiter.stop();
arbiter.join().unwrap();
}
#[test]
fn arbiter_handle_spawn_fn_runs() {
let sys = System::new();
let (tx, rx) = channel::<u32>();
let arbiter = Arbiter::new();
let handle = arbiter.handle();
drop(arbiter);
handle.spawn_fn(move || {
tx.send(42).unwrap();
System::current().stop()
});
let num = rx.recv_timeout(Duration::from_secs(2)).unwrap();
assert_eq!(num, 42);
handle.stop();
sys.run().unwrap();
}
#[test]
fn arbiter_drop_no_panic_fn() {
let _ = System::new();
let arbiter = Arbiter::new();
arbiter.spawn_fn(|| panic!("test"));
arbiter.stop();
arbiter.join().unwrap();
}
#[test]
fn arbiter_drop_no_panic_fut() {
let _ = System::new();
let arbiter = Arbiter::new();
arbiter.spawn(async { panic!("test") });
arbiter.stop();
arbiter.join().unwrap();
}
#[test]
#[should_panic]
fn no_system_current_panic() {
System::current();
}
#[test]
#[should_panic]
fn no_system_arbiter_new_panic() {
Arbiter::new();
}
#[test]
fn system_arbiter_spawn() {
let runner = System::new();
let (tx, rx) = oneshot::channel();
let sys = System::current();
thread::spawn(|| {
// this thread will have no arbiter in it's thread local so call will panic
Arbiter::current();
})
.join()
.unwrap_err();
let thread = thread::spawn(|| {
// this thread will have no arbiter in it's thread local so use the system handle instead
System::set_current(sys);
let sys = System::current();
let arb = sys.arbiter();
arb.spawn(async move {
tx.send(42u32).unwrap();
System::current().stop();
});
});
assert_eq!(runner.block_on(rx).unwrap(), 42);
thread.join().unwrap();
}
#[test]
fn system_stop_stops_arbiters() {
let sys = System::new();
let arb = Arbiter::new();
// arbiter should be alive to receive spawn msg
assert!(Arbiter::current().spawn_fn(|| {}));
assert!(arb.spawn_fn(|| {}));
System::current().stop();
sys.run().unwrap();
// account for slightly slow thread de-spawns (only observed on windows)
thread::sleep(Duration::from_millis(100));
// arbiter should be dead and return false
assert!(!Arbiter::current().spawn_fn(|| {}));
assert!(!arb.spawn_fn(|| {}));
arb.join().unwrap();
}
#[test]
fn new_system_with_tokio() {
let (tx, rx) = channel();
let res = System::with_tokio_rt(move || {
tokio::runtime::Builder::new_multi_thread()
.enable_io()
.enable_time()
.thread_keep_alive(Duration::from_millis(1000))
.worker_threads(2)
.max_blocking_threads(2)
.on_thread_start(|| {})
.on_thread_stop(|| {})
.build()
.unwrap()
})
.block_on(async {
actix_rt::time::sleep(Duration::from_millis(1)).await;
tokio::task::spawn(async move {
tx.send(42).unwrap();
})
.await
.unwrap();
123usize
});
assert_eq!(res, 123);
assert_eq!(rx.recv().unwrap(), 42);
}
#[test]
fn new_arbiter_with_tokio() {
let _ = System::new();
let arb = Arbiter::with_tokio_rt(|| {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
});
let counter = Arc::new(AtomicBool::new(true));
let counter1 = counter.clone();
let did_spawn = arb.spawn(async move {
actix_rt::time::sleep(Duration::from_millis(1)).await;
counter1.store(false, Ordering::SeqCst);
Arbiter::current().stop();
});
assert!(did_spawn);
arb.join().unwrap();
assert_eq!(false, counter.load(Ordering::SeqCst));
}
#[test]
fn try_current_no_system() {
assert!(System::try_current().is_none())
}
#[test]
fn try_current_with_system() {
System::new().block_on(async { assert!(System::try_current().is_some()) });
}

View File

@@ -24,6 +24,7 @@ futures-core = { version = "0.3.7", default-features = false }
futures-sink = { version = "0.3.7", default-features = false }
log = "0.4"
pin-project-lite = "0.2.0"
tokio = { version = "1", features = ["sync"] }
[dev-dependencies]
actix-rt = "2.0.0"

View File

@@ -1,21 +1,20 @@
//! Framed dispatcher service and related utilities.
#![allow(type_alias_bounds)]
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};
use core::{fmt, mem};
use core::{
fmt,
future::Future,
mem,
pin::Pin,
task::{Context, Poll},
};
use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed};
use actix_service::{IntoService, Service};
use futures_core::stream::Stream;
use log::debug;
use pin_project_lite::pin_project;
use tokio::sync::mpsc;
use crate::mpsc;
/// Framed transport errors
/// Framed transport errors.
pub enum DispatcherError<E, U: Encoder<I> + Decoder, I> {
Service(E),
Encoder(<U as Encoder<I>>::Error),
@@ -64,8 +63,7 @@ pub enum Message<T> {
}
pin_project! {
/// Dispatcher is a future that reads frames from Framed object
/// and passes them to the service.
/// Dispatcher is a future that reads frames from Framed object and passes them to the service.
pub struct Dispatcher<S, T, U, I>
where
S: Service<<U as Decoder>::Item, Response = I>,
@@ -82,8 +80,8 @@ pin_project! {
state: State<S, U, I>,
#[pin]
framed: Framed<T, U>,
rx: mpsc::Receiver<Result<Message<I>, S::Error>>,
tx: mpsc::Sender<Result<Message<I>, S::Error>>,
rx: mpsc::UnboundedReceiver<Result<Message<I>, S::Error>>,
tx: mpsc::UnboundedSender<Result<Message<I>, S::Error>>,
}
}
@@ -134,26 +132,7 @@ where
where
F: IntoService<S, <U as Decoder>::Item>,
{
let (tx, rx) = mpsc::channel();
Dispatcher {
framed,
rx,
tx,
service: service.into_service(),
state: State::Processing,
}
}
/// Construct new `Dispatcher` instance with customer `mpsc::Receiver`
pub fn with_rx<F>(
framed: Framed<T, U>,
service: F,
rx: mpsc::Receiver<Result<Message<I>, S::Error>>,
) -> Self
where
F: IntoService<S, <U as Decoder>::Item>,
{
let tx = rx.sender();
let (tx, rx) = mpsc::unbounded_channel();
Dispatcher {
framed,
rx,
@@ -164,28 +143,28 @@ where
}
/// Get sink
pub fn get_sink(&self) -> mpsc::Sender<Result<Message<I>, S::Error>> {
pub fn tx(&self) -> mpsc::UnboundedSender<Result<Message<I>, S::Error>> {
self.tx.clone()
}
/// Get reference to a service wrapped by `Dispatcher` instance.
pub fn get_ref(&self) -> &S {
pub fn service(&self) -> &S {
&self.service
}
/// Get mutable reference to a service wrapped by `Dispatcher` instance.
pub fn get_mut(&mut self) -> &mut S {
pub fn service_mut(&mut self) -> &mut S {
&mut self.service
}
/// Get reference to a framed instance wrapped by `Dispatcher`
/// instance.
pub fn get_framed(&self) -> &Framed<T, U> {
pub fn framed(&self) -> &Framed<T, U> {
&self.framed
}
/// Get mutable reference to a framed instance wrapped by `Dispatcher` instance.
pub fn get_framed_mut(&mut self) -> &mut Framed<T, U> {
pub fn framed_mut(&mut self) -> &mut Framed<T, U> {
&mut self.framed
}
@@ -246,7 +225,7 @@ where
loop {
let mut this = self.as_mut().project();
while !this.framed.is_write_buf_full() {
match Pin::new(&mut this.rx).poll_next(cx) {
match this.rx.poll_recv(cx) {
Poll::Ready(Some(Ok(Message::Item(msg)))) => {
if let Err(err) = this.framed.as_mut().write(msg) {
*this.state = State::FramedError(DispatcherError::Encoder(err));
@@ -266,7 +245,7 @@ where
}
if !this.framed.is_write_buf_empty() {
match this.framed.flush(cx) {
match this.framed.poll_flush(cx) {
Poll::Pending => break,
Poll::Ready(Ok(_)) => (),
Poll::Ready(Err(err)) => {
@@ -298,41 +277,43 @@ where
type Output = Result<(), DispatcherError<S::Error, U, I>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
let this = self.as_mut().project();
let this = self.as_mut().project();
return match this.state {
State::Processing => {
if self.as_mut().poll_read(cx) || self.as_mut().poll_write(cx) {
continue;
} else {
Poll::Pending
}
match this.state {
State::Processing => {
if self.as_mut().poll_read(cx) || self.as_mut().poll_write(cx) {
self.poll(cx)
} else {
Poll::Pending
}
State::Error(_) => {
// flush write buffer
if !this.framed.is_write_buf_empty() && this.framed.flush(cx).is_pending() {
return Poll::Pending;
}
Poll::Ready(Err(this.state.take_error()))
}
State::Error(_) => {
// flush write buffer
if !this.framed.is_write_buf_empty() && this.framed.poll_flush(cx).is_pending()
{
return Poll::Pending;
}
State::FlushAndStop => {
if !this.framed.is_write_buf_empty() {
match this.framed.flush(cx) {
Poll::Ready(Err(err)) => {
debug!("Error sending data: {:?}", err);
Poll::Ready(Ok(()))
}
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
Poll::Ready(Err(this.state.take_error()))
}
State::FlushAndStop => {
if !this.framed.is_write_buf_empty() {
this.framed.poll_flush(cx).map(|res| {
if let Err(err) = res {
debug!("Error sending data: {:?}", err);
}
} else {
Poll::Ready(Ok(()))
}
Ok(())
})
} else {
Poll::Ready(Ok(()))
}
State::FramedError(_) => Poll::Ready(Err(this.state.take_framed_error())),
State::Stopping => Poll::Ready(Ok(())),
};
}
State::FramedError(_) => Poll::Ready(Err(this.state.take_framed_error())),
State::Stopping => Poll::Ready(Ok(())),
}
}
}

View File

@@ -7,6 +7,5 @@
pub mod counter;
pub mod dispatcher;
pub mod mpsc;
pub mod task;
pub mod timeout;

View File

@@ -1,224 +0,0 @@
//! A multi-producer, single-consumer, futures-aware, FIFO queue.
use core::any::Any;
use core::cell::RefCell;
use core::fmt;
use core::pin::Pin;
use core::task::{Context, Poll};
use std::collections::VecDeque;
use std::error::Error;
use std::rc::Rc;
use futures_core::stream::Stream;
use futures_sink::Sink;
use crate::task::LocalWaker;
/// Creates a unbounded in-memory channel with buffered storage.
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let shared = Rc::new(RefCell::new(Shared {
has_receiver: true,
buffer: VecDeque::new(),
blocked_recv: LocalWaker::new(),
}));
let sender = Sender {
shared: shared.clone(),
};
let receiver = Receiver { shared };
(sender, receiver)
}
#[derive(Debug)]
struct Shared<T> {
buffer: VecDeque<T>,
blocked_recv: LocalWaker,
has_receiver: bool,
}
/// The transmission end of a channel.
///
/// This is created by the `channel` function.
#[derive(Debug)]
pub struct Sender<T> {
shared: Rc<RefCell<Shared<T>>>,
}
impl<T> Unpin for Sender<T> {}
impl<T> Sender<T> {
/// Sends the provided message along this channel.
pub fn send(&self, item: T) -> Result<(), SendError<T>> {
let mut shared = self.shared.borrow_mut();
if !shared.has_receiver {
return Err(SendError(item)); // receiver was dropped
};
shared.buffer.push_back(item);
shared.blocked_recv.wake();
Ok(())
}
/// Closes the sender half
///
/// This prevents any further messages from being sent on the channel while
/// still enabling the receiver to drain messages that are buffered.
pub fn close(&mut self) {
self.shared.borrow_mut().has_receiver = false;
}
}
impl<T> Clone for Sender<T> {
fn clone(&self) -> Self {
Sender {
shared: self.shared.clone(),
}
}
}
impl<T> Sink<T> for Sender<T> {
type Error = SendError<T>;
fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), SendError<T>> {
self.send(item)
}
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), SendError<T>>> {
Poll::Ready(Ok(()))
}
fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
}
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
let count = Rc::strong_count(&self.shared);
let shared = self.shared.borrow_mut();
// check is last sender is about to drop
if shared.has_receiver && count == 2 {
// Wake up receiver as its stream has ended
shared.blocked_recv.wake();
}
}
}
/// The receiving end of a channel which implements the `Stream` trait.
///
/// This is created by the `channel` function.
#[derive(Debug)]
pub struct Receiver<T> {
shared: Rc<RefCell<Shared<T>>>,
}
impl<T> Receiver<T> {
/// Create Sender
pub fn sender(&self) -> Sender<T> {
Sender {
shared: self.shared.clone(),
}
}
}
impl<T> Unpin for Receiver<T> {}
impl<T> Stream for Receiver<T> {
type Item = T;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut shared = self.shared.borrow_mut();
if Rc::strong_count(&self.shared) == 1 {
// All senders have been dropped, so drain the buffer and end the
// stream.
Poll::Ready(shared.buffer.pop_front())
} else if let Some(msg) = shared.buffer.pop_front() {
Poll::Ready(Some(msg))
} else {
shared.blocked_recv.register(cx.waker());
Poll::Pending
}
}
}
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
let mut shared = self.shared.borrow_mut();
shared.buffer.clear();
shared.has_receiver = false;
}
}
/// Error type for sending, used when the receiving end of a channel is
/// dropped
pub struct SendError<T>(T);
impl<T> fmt::Debug for SendError<T> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_tuple("SendError").field(&"...").finish()
}
}
impl<T> fmt::Display for SendError<T> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "send failed because receiver is gone")
}
}
impl<T: Any> Error for SendError<T> {
fn description(&self) -> &str {
"send failed because receiver is gone"
}
}
impl<T> SendError<T> {
/// Returns the message that was attempted to be sent but failed.
pub fn into_inner(self) -> T {
self.0
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures_util::future::lazy;
use futures_util::{stream::Stream, StreamExt};
#[actix_rt::test]
async fn test_mpsc() {
let (tx, mut rx) = channel();
tx.send("test").unwrap();
assert_eq!(rx.next().await.unwrap(), "test");
let tx2 = tx.clone();
tx2.send("test2").unwrap();
assert_eq!(rx.next().await.unwrap(), "test2");
assert_eq!(
lazy(|cx| Pin::new(&mut rx).poll_next(cx)).await,
Poll::Pending
);
drop(tx2);
assert_eq!(
lazy(|cx| Pin::new(&mut rx).poll_next(cx)).await,
Poll::Pending
);
drop(tx);
assert_eq!(rx.next().await, None);
let (tx, rx) = channel();
tx.send("test").unwrap();
drop(rx);
assert!(tx.send("test").is_err());
let (mut tx, _) = channel();
let tx2 = tx.clone();
tx.close();
assert!(tx.send("test").is_err());
assert!(tx2.send("test").is_err());
}
}

View File

@@ -1,2 +1,5 @@
max_width = 96
reorder_imports = true
#wrap_comments = true
#fn_args_density = "Compressed"
#use_small_heuristics = false