mirror of
https://github.com/fafhrd91/actix-net
synced 2025-08-18 19:45:31 +02:00
Compare commits
2 Commits
rt-fix
...
remove-mps
Author | SHA1 | Date | |
---|---|---|---|
|
45e3a5c0e6 | ||
|
564acfbf3a |
@@ -235,7 +235,7 @@ impl<T, U> Framed<T, U> {
|
||||
}
|
||||
|
||||
/// Flush write buffer to underlying I/O stream.
|
||||
pub fn flush<I>(
|
||||
pub fn poll_flush<I>(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Result<(), U::Error>>
|
||||
@@ -271,7 +271,7 @@ impl<T, U> Framed<T, U> {
|
||||
}
|
||||
|
||||
/// Flush write buffer and shutdown underlying I/O stream.
|
||||
pub fn close<I>(
|
||||
pub fn poll_close<I>(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Result<(), U::Error>>
|
||||
@@ -319,11 +319,11 @@ where
|
||||
}
|
||||
|
||||
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
self.flush(cx)
|
||||
self.poll_flush(cx)
|
||||
}
|
||||
|
||||
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
self.close(cx)
|
||||
self.poll_close(cx)
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -26,7 +26,6 @@ macros = ["actix-macros"]
|
||||
actix-macros = { version = "0.2.0", optional = true }
|
||||
|
||||
futures-core = { version = "0.3", default-features = false }
|
||||
futures-intrusive = "0.4"
|
||||
tokio = { version = "1.2", features = ["rt", "net", "parking_lot", "signal", "sync", "time"] }
|
||||
|
||||
[dev-dependencies]
|
||||
|
@@ -9,9 +9,6 @@ use std::{
|
||||
};
|
||||
|
||||
use futures_core::ready;
|
||||
use futures_intrusive::channel::shared::{
|
||||
oneshot_broadcast_channel, OneshotBroadcastReceiver, OneshotBroadcastSender,
|
||||
};
|
||||
use tokio::{sync::mpsc, task::LocalSet};
|
||||
|
||||
use crate::{
|
||||
@@ -43,26 +40,11 @@ impl fmt::Debug for ArbiterCommand {
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ArbiterHandle {
|
||||
tx: mpsc::UnboundedSender<ArbiterCommand>,
|
||||
/// Is `None` for system arbiter.
|
||||
stopped_rx: Option<OneshotBroadcastReceiver<()>>,
|
||||
}
|
||||
|
||||
impl ArbiterHandle {
|
||||
pub(crate) fn new(
|
||||
tx: mpsc::UnboundedSender<ArbiterCommand>,
|
||||
stopped_rx: OneshotBroadcastReceiver<()>,
|
||||
) -> Self {
|
||||
Self {
|
||||
tx,
|
||||
stopped_rx: Some(stopped_rx),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn for_system(tx: mpsc::UnboundedSender<ArbiterCommand>) -> Self {
|
||||
Self {
|
||||
tx,
|
||||
stopped_rx: None,
|
||||
}
|
||||
pub(crate) fn new(tx: mpsc::UnboundedSender<ArbiterCommand>) -> Self {
|
||||
Self { tx }
|
||||
}
|
||||
|
||||
/// Send a future to the [Arbiter]'s thread and spawn it.
|
||||
@@ -99,25 +81,6 @@ impl ArbiterHandle {
|
||||
pub fn stop(&self) -> bool {
|
||||
self.tx.send(ArbiterCommand::Stop).is_ok()
|
||||
}
|
||||
|
||||
/// Will wait for associated [Arbiter] to complete all commands up until it is stopped.
|
||||
///
|
||||
/// For [Arbiter]s that have already stopped, the future will resolve immediately.
|
||||
///
|
||||
/// # Panics
|
||||
/// Panics if called on the system Arbiter. In this situation the Arbiter's lifetime is
|
||||
/// implicitly bound by the main thread's lifetime.
|
||||
pub async fn join(self) {
|
||||
match self.stopped_rx {
|
||||
Some(rx) => {
|
||||
rx.receive().await;
|
||||
}
|
||||
None => {
|
||||
// TODO: decide if this is correct
|
||||
panic!("cannot wait on the system Arbiter's completion")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// An Arbiter represents a thread that provides an asynchronous execution environment for futures
|
||||
@@ -126,10 +89,8 @@ impl ArbiterHandle {
|
||||
/// When an arbiter is created, it spawns a new [OS thread](thread), and hosts an event loop.
|
||||
#[derive(Debug)]
|
||||
pub struct Arbiter {
|
||||
id: usize,
|
||||
stopped_tx: OneshotBroadcastSender<()>,
|
||||
cmd_tx: mpsc::UnboundedSender<ArbiterCommand>,
|
||||
thread_handle: Option<thread::JoinHandle<()>>,
|
||||
tx: mpsc::UnboundedSender<ArbiterCommand>,
|
||||
thread_handle: thread::JoinHandle<()>,
|
||||
}
|
||||
|
||||
impl Arbiter {
|
||||
@@ -138,7 +99,7 @@ impl Arbiter {
|
||||
/// # Panics
|
||||
/// Panics if a [System] is not registered on the current thread.
|
||||
#[allow(clippy::new_without_default)]
|
||||
pub fn new() -> ArbiterHandle {
|
||||
pub fn new() -> Arbiter {
|
||||
Self::with_tokio_rt(|| {
|
||||
default_tokio_runtime().expect("Cannot create new Arbiter's Runtime.")
|
||||
})
|
||||
@@ -148,109 +109,74 @@ impl Arbiter {
|
||||
///
|
||||
/// [tokio-runtime]: tokio::runtime::Runtime
|
||||
#[doc(hidden)]
|
||||
pub fn with_tokio_rt<F>(runtime_factory: F) -> ArbiterHandle
|
||||
pub fn with_tokio_rt<F>(runtime_factory: F) -> Arbiter
|
||||
where
|
||||
F: Fn() -> tokio::runtime::Runtime + Send + 'static,
|
||||
{
|
||||
eprintln!("get sys current");
|
||||
|
||||
let sys = System::current();
|
||||
eprintln!("get sys id");
|
||||
let system_id = sys.id();
|
||||
eprintln!("calc arb id");
|
||||
let arb_id = COUNT.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
let name = format!("actix-rt|system:{}|arbiter:{}", system_id, arb_id);
|
||||
let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
|
||||
let (tx, rx) = mpsc::unbounded_channel();
|
||||
|
||||
// let ready_barrier = Arc::new(Barrier::new(2));
|
||||
let (ready_tx, ready_rx) = std::sync::mpsc::channel::<()>();
|
||||
|
||||
let (stopped_tx, stopped_rx) = oneshot_broadcast_channel::<()>();
|
||||
|
||||
eprintln!("make arb handle");
|
||||
let hnd = ArbiterHandle::new(cmd_tx.clone(), stopped_rx);
|
||||
|
||||
eprintln!("make thread");
|
||||
let thread_handle = thread::Builder::new()
|
||||
.name(name.clone())
|
||||
.spawn({
|
||||
let hnd = hnd.clone();
|
||||
// let ready_barrier = Arc::clone(&ready_barrier);
|
||||
|
||||
let tx = tx.clone();
|
||||
move || {
|
||||
eprintln!("thread: make rt");
|
||||
let rt = Runtime::from(runtime_factory());
|
||||
let hnd = ArbiterHandle::new(tx);
|
||||
|
||||
eprintln!("thread: set sys");
|
||||
System::set_current(sys);
|
||||
|
||||
// // wait until register message is sent
|
||||
// eprintln!("thread: wait for arb registered");
|
||||
// ready_barrier.wait();
|
||||
|
||||
eprintln!("thread: set arb handle");
|
||||
HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone()));
|
||||
|
||||
// register arbiter
|
||||
let _ = System::current()
|
||||
.tx()
|
||||
.send(SystemCommand::RegisterArbiter(arb_id, hnd));
|
||||
|
||||
ready_tx.send(()).unwrap();
|
||||
|
||||
// run arbiter event processing loop
|
||||
eprintln!("thread: block on arbiter loop");
|
||||
rt.block_on(ArbiterLoop { cmd_rx });
|
||||
rt.block_on(ArbiterRunner { rx });
|
||||
|
||||
// deregister arbiter
|
||||
eprintln!("thread: send deregister arbiter message");
|
||||
System::current()
|
||||
let _ = System::current()
|
||||
.tx()
|
||||
.send(SystemCommand::DeregisterArbiter(arb_id))
|
||||
.unwrap();
|
||||
.send(SystemCommand::DeregisterArbiter(arb_id));
|
||||
}
|
||||
})
|
||||
.unwrap_or_else(|err| {
|
||||
panic!("Cannot spawn Arbiter's thread: {:?}. {:?}", &name, err)
|
||||
});
|
||||
|
||||
let arb = Arbiter {
|
||||
id: arb_id,
|
||||
cmd_tx,
|
||||
stopped_tx,
|
||||
thread_handle: Some(thread_handle),
|
||||
};
|
||||
ready_rx.recv().unwrap();
|
||||
|
||||
// register arbiter
|
||||
eprintln!("send register arbiter message");
|
||||
System::current()
|
||||
.tx()
|
||||
.send(SystemCommand::RegisterArbiter(arb))
|
||||
.unwrap();
|
||||
|
||||
// eprintln!("inform arbiter that it is registered");
|
||||
// ready_barrier.wait();
|
||||
|
||||
eprintln!("arbiter::new done");
|
||||
hnd
|
||||
Arbiter { tx, thread_handle }
|
||||
}
|
||||
|
||||
/// Sets up an Arbiter runner in a new System using the provided runtime local task set.
|
||||
pub(crate) fn for_system(local: &LocalSet) -> ArbiterHandle {
|
||||
let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
|
||||
pub(crate) fn in_new_system(local: &LocalSet) -> ArbiterHandle {
|
||||
let (tx, rx) = mpsc::unbounded_channel();
|
||||
|
||||
let hnd = ArbiterHandle::for_system(cmd_tx);
|
||||
let hnd = ArbiterHandle::new(tx);
|
||||
|
||||
HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone()));
|
||||
|
||||
local.spawn_local(ArbiterLoop { cmd_rx });
|
||||
local.spawn_local(ArbiterRunner { rx });
|
||||
|
||||
hnd
|
||||
}
|
||||
|
||||
/// Return `Arbiter`'s numeric ID.
|
||||
pub(crate) fn id(&self) -> usize {
|
||||
self.id
|
||||
/// Return a handle to the this Arbiter's message sender.
|
||||
pub fn handle(&self) -> ArbiterHandle {
|
||||
ArbiterHandle::new(self.tx.clone())
|
||||
}
|
||||
|
||||
// /// Return a handle to the this Arbiter's message sender.
|
||||
// pub fn handle(&self) -> ArbiterHandle {
|
||||
// ArbiterHandle::new(self.cmd_tx.clone())
|
||||
// }
|
||||
|
||||
/// Return a handle to the current thread's Arbiter's message sender.
|
||||
///
|
||||
/// # Panics
|
||||
@@ -265,68 +191,57 @@ impl Arbiter {
|
||||
/// Stop Arbiter from continuing it's event loop.
|
||||
///
|
||||
/// Returns true if stop message was sent successfully and false if the Arbiter has been dropped.
|
||||
pub(crate) fn stop(&self) -> bool {
|
||||
self.cmd_tx.send(ArbiterCommand::Stop).is_ok()
|
||||
pub fn stop(&self) -> bool {
|
||||
self.tx.send(ArbiterCommand::Stop).is_ok()
|
||||
}
|
||||
|
||||
// /// Send a future to the Arbiter's thread and spawn it.
|
||||
// ///
|
||||
// /// If you require a result, include a response channel in the future.
|
||||
// ///
|
||||
// /// Returns true if future was sent successfully and false if the Arbiter has died.
|
||||
// pub fn spawn<Fut>(&self, future: Fut) -> bool
|
||||
// where
|
||||
// Fut: Future<Output = ()> + Send + 'static,
|
||||
// {
|
||||
// self.cmd_tx
|
||||
// .send(ArbiterCommand::Execute(Box::pin(future)))
|
||||
// .is_ok()
|
||||
// }
|
||||
/// Send a future to the Arbiter's thread and spawn it.
|
||||
///
|
||||
/// If you require a result, include a response channel in the future.
|
||||
///
|
||||
/// Returns true if future was sent successfully and false if the Arbiter has died.
|
||||
pub fn spawn<Fut>(&self, future: Fut) -> bool
|
||||
where
|
||||
Fut: Future<Output = ()> + Send + 'static,
|
||||
{
|
||||
self.tx
|
||||
.send(ArbiterCommand::Execute(Box::pin(future)))
|
||||
.is_ok()
|
||||
}
|
||||
|
||||
// /// Send a function to the Arbiter's thread and execute it.
|
||||
// ///
|
||||
// /// Any result from the function is discarded. If you require a result, include a response
|
||||
// /// channel in the function.
|
||||
// ///
|
||||
// /// Returns true if function was sent successfully and false if the Arbiter has died.
|
||||
// pub fn spawn_fn<F>(&self, f: F) -> bool
|
||||
// where
|
||||
// F: FnOnce() + Send + 'static,
|
||||
// {
|
||||
// self.spawn(async { f() })
|
||||
// }
|
||||
}
|
||||
/// Send a function to the Arbiter's thread and execute it.
|
||||
///
|
||||
/// Any result from the function is discarded. If you require a result, include a response
|
||||
/// channel in the function.
|
||||
///
|
||||
/// Returns true if function was sent successfully and false if the Arbiter has died.
|
||||
pub fn spawn_fn<F>(&self, f: F) -> bool
|
||||
where
|
||||
F: FnOnce() + Send + 'static,
|
||||
{
|
||||
self.spawn(async { f() })
|
||||
}
|
||||
|
||||
impl Drop for Arbiter {
|
||||
fn drop(&mut self) {
|
||||
eprintln!("Arb::drop: joining arbiter thread");
|
||||
match self.thread_handle.take().unwrap().join() {
|
||||
Ok(()) => {}
|
||||
Err(err) => {
|
||||
eprintln!("arbiter {} thread panicked: {:?}", self.id(), err)
|
||||
}
|
||||
}
|
||||
eprintln!("Arb::drop: sending stopped tx");
|
||||
|
||||
// could fail if all handles are dropped already so ignore result
|
||||
let _ = self.stopped_tx.send(());
|
||||
|
||||
eprintln!("Arb::drop: done");
|
||||
/// Wait for Arbiter's event loop to complete.
|
||||
///
|
||||
/// Joins the underlying OS thread handle. See [`JoinHandle::join`](thread::JoinHandle::join).
|
||||
pub fn join(self) -> thread::Result<()> {
|
||||
self.thread_handle.join()
|
||||
}
|
||||
}
|
||||
|
||||
/// A persistent future that processes [Arbiter] commands.
|
||||
struct ArbiterLoop {
|
||||
cmd_rx: mpsc::UnboundedReceiver<ArbiterCommand>,
|
||||
struct ArbiterRunner {
|
||||
rx: mpsc::UnboundedReceiver<ArbiterCommand>,
|
||||
}
|
||||
|
||||
impl Future for ArbiterLoop {
|
||||
impl Future for ArbiterRunner {
|
||||
type Output = ();
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
// process all items currently buffered in channel
|
||||
loop {
|
||||
match ready!(Pin::new(&mut self.cmd_rx).poll_recv(cx)) {
|
||||
match ready!(Pin::new(&mut self.rx).poll_recv(cx)) {
|
||||
// channel closed; no more messages can be received
|
||||
None => return Poll::Ready(()),
|
||||
|
||||
|
@@ -16,12 +16,12 @@
|
||||
//!
|
||||
//! # Examples
|
||||
//! ```
|
||||
//! use std::sync::mpsc::channel as std_channel;
|
||||
//! use std::sync::mpsc;
|
||||
//! use actix_rt::{Arbiter, System};
|
||||
//!
|
||||
//! let sys = System::new();
|
||||
//! let _ = System::new();
|
||||
//!
|
||||
//! let (tx, rx) = std_channel::<u32>();
|
||||
//! let (tx, rx) = mpsc::channel::<u32>();
|
||||
//!
|
||||
//! let arbiter = Arbiter::new();
|
||||
//! arbiter.spawn_fn(move || tx.send(42).unwrap());
|
||||
@@ -30,10 +30,7 @@
|
||||
//! assert_eq!(num, 42);
|
||||
//!
|
||||
//! arbiter.stop();
|
||||
//! sys.block_on(arbiter.join());
|
||||
//!
|
||||
//! System::current().stop();
|
||||
//! sys.run().unwrap();
|
||||
//! arbiter.join().unwrap();
|
||||
//! ```
|
||||
|
||||
#![deny(rust_2018_idioms, nonstandard_style)]
|
||||
|
@@ -54,9 +54,14 @@ impl System {
|
||||
let (sys_tx, sys_rx) = mpsc::unbounded_channel();
|
||||
|
||||
let rt = Runtime::from(runtime_factory());
|
||||
let sys_arbiter = Arbiter::for_system(rt.local_set());
|
||||
let sys_arbiter = Arbiter::in_new_system(rt.local_set());
|
||||
let system = System::construct(sys_tx, sys_arbiter.clone());
|
||||
|
||||
system
|
||||
.tx()
|
||||
.send(SystemCommand::RegisterArbiter(usize::MAX, sys_arbiter))
|
||||
.unwrap();
|
||||
|
||||
// init background system arbiter
|
||||
let sys_ctrl = SystemController::new(sys_rx, stop_tx);
|
||||
rt.spawn(sys_ctrl);
|
||||
@@ -145,10 +150,7 @@ impl System {
|
||||
}
|
||||
|
||||
/// Runner that keeps a [System]'s event loop alive until stop message is received.
|
||||
///
|
||||
/// Dropping the `SystemRunner` (eg. `let _ = System::new();`) will result in no further events
|
||||
/// being processed. It is required you bind the runner and call `run` or call `block_on`.
|
||||
#[must_use = "A SystemRunner does nothing unless `run` or `block_on` is called."]
|
||||
#[must_use = "A SystemRunner does nothing unless `run` is called."]
|
||||
#[derive(Debug)]
|
||||
pub struct SystemRunner {
|
||||
rt: Runtime,
|
||||
@@ -188,7 +190,7 @@ impl SystemRunner {
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum SystemCommand {
|
||||
Exit(i32),
|
||||
RegisterArbiter(Arbiter),
|
||||
RegisterArbiter(usize, ArbiterHandle),
|
||||
DeregisterArbiter(usize),
|
||||
}
|
||||
|
||||
@@ -198,7 +200,7 @@ pub(crate) enum SystemCommand {
|
||||
pub(crate) struct SystemController {
|
||||
stop_tx: Option<oneshot::Sender<i32>>,
|
||||
cmd_rx: mpsc::UnboundedReceiver<SystemCommand>,
|
||||
arbiters: HashMap<usize, Arbiter>,
|
||||
arbiters: HashMap<usize, ArbiterHandle>,
|
||||
}
|
||||
|
||||
impl SystemController {
|
||||
@@ -219,42 +221,35 @@ impl Future for SystemController {
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
// process all items currently buffered in channel
|
||||
let code = loop {
|
||||
loop {
|
||||
match ready!(Pin::new(&mut self.cmd_rx).poll_recv(cx)) {
|
||||
// channel closed; no more messages can be received
|
||||
None => break 0,
|
||||
None => return Poll::Ready(()),
|
||||
|
||||
// process system command
|
||||
Some(cmd) => match cmd {
|
||||
SystemCommand::Exit(code) => {
|
||||
// stop all arbiters
|
||||
for arb in self.arbiters.values() {
|
||||
eprintln!("SystemController: stopping arbiter {}", arb.id());
|
||||
arb.stop();
|
||||
}
|
||||
|
||||
eprintln!("SystemController: dropping arbiters");
|
||||
// destroy all arbiters
|
||||
// drop waits for threads to complete
|
||||
self.arbiters.clear();
|
||||
|
||||
break code;
|
||||
// stop event loop
|
||||
// will only fire once
|
||||
if let Some(stop_tx) = self.stop_tx.take() {
|
||||
let _ = stop_tx.send(code);
|
||||
}
|
||||
}
|
||||
|
||||
SystemCommand::RegisterArbiter(arb) => {
|
||||
self.arbiters.insert(arb.id(), arb);
|
||||
SystemCommand::RegisterArbiter(id, arb) => {
|
||||
self.arbiters.insert(id, arb);
|
||||
}
|
||||
|
||||
SystemCommand::DeregisterArbiter(id) => {
|
||||
// implicit arbiter drop
|
||||
let _ = self.arbiters.remove(&id);
|
||||
self.arbiters.remove(&id);
|
||||
}
|
||||
},
|
||||
}
|
||||
};
|
||||
|
||||
self.stop_tx.take().unwrap().send(code).unwrap();
|
||||
|
||||
Poll::Ready(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -1,44 +0,0 @@
|
||||
//! Derived from this comment:
|
||||
//! https://github.com/actix/actix/issues/464#issuecomment-779427825
|
||||
|
||||
use std::{thread, time::Duration};
|
||||
|
||||
use actix_rt::{Arbiter, System};
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
#[test]
|
||||
fn actix_sample() {
|
||||
let sys = System::new();
|
||||
let arb = Arbiter::new();
|
||||
|
||||
let (_tx, mut rx) = mpsc::unbounded_channel::<()>();
|
||||
|
||||
// create "actor"
|
||||
arb.spawn_fn(move || {
|
||||
let a = A;
|
||||
|
||||
actix_rt::spawn(async move {
|
||||
while let Some(_) = rx.recv().await {
|
||||
println!("{:?}", a);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
System::current().stop();
|
||||
|
||||
// all arbiters must be dropped when sys.run returns
|
||||
sys.run().unwrap();
|
||||
|
||||
thread::sleep(Duration::from_millis(100));
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct A;
|
||||
|
||||
impl Drop for A {
|
||||
fn drop(&mut self) {
|
||||
println!("start drop");
|
||||
thread::sleep(Duration::from_millis(200));
|
||||
println!("finish drop");
|
||||
}
|
||||
}
|
@@ -1,120 +0,0 @@
|
||||
use std::{
|
||||
sync::mpsc::channel as std_channel,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use actix_rt::{time, Arbiter, System};
|
||||
|
||||
#[test]
|
||||
#[should_panic]
|
||||
fn no_system_arbiter_new_panic() {
|
||||
Arbiter::new();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn join_arbiter_wait_fut() {
|
||||
let time = Duration::from_secs(1);
|
||||
let instant = Instant::now();
|
||||
|
||||
System::new().block_on(async move {
|
||||
let arbiter = Arbiter::new();
|
||||
|
||||
arbiter.spawn(async move {
|
||||
time::sleep(time).await;
|
||||
Arbiter::current().stop();
|
||||
});
|
||||
|
||||
arbiter.join().await;
|
||||
});
|
||||
|
||||
assert!(
|
||||
instant.elapsed() >= time,
|
||||
"Join on another arbiter should complete only when it calls stop"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn join_arbiter_wait_fn() {
|
||||
let time = Duration::from_secs(1);
|
||||
let instant = Instant::now();
|
||||
|
||||
System::new().block_on(async move {
|
||||
let arbiter = Arbiter::new();
|
||||
|
||||
arbiter.spawn_fn(move || {
|
||||
actix_rt::spawn(async move {
|
||||
time::sleep(time).await;
|
||||
Arbiter::current().stop();
|
||||
});
|
||||
});
|
||||
|
||||
arbiter.join().await;
|
||||
});
|
||||
|
||||
assert!(
|
||||
instant.elapsed() >= time,
|
||||
"Join on an arbiter that has used actix_rt::spawn should wait for said future"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn join_arbiter_early_stop_call() {
|
||||
let time = Duration::from_secs(1);
|
||||
let instant = Instant::now();
|
||||
|
||||
System::new().block_on(async move {
|
||||
let arbiter = Arbiter::new();
|
||||
|
||||
arbiter.spawn(Box::pin(async move {
|
||||
time::sleep(time).await;
|
||||
Arbiter::current().stop();
|
||||
}));
|
||||
|
||||
arbiter.stop();
|
||||
arbiter.join().await;
|
||||
});
|
||||
|
||||
assert!(
|
||||
instant.elapsed() < time,
|
||||
"Premature stop of Arbiter should conclude regardless of it's current state."
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn arbiter_spawn_fn_runs() {
|
||||
let sys = System::new();
|
||||
|
||||
let (tx, rx) = std_channel::<u32>();
|
||||
|
||||
let arbiter = Arbiter::new();
|
||||
arbiter.spawn_fn(move || {
|
||||
tx.send(42).unwrap();
|
||||
System::current().stop();
|
||||
});
|
||||
|
||||
let num = rx.recv().unwrap();
|
||||
assert_eq!(num, 42);
|
||||
|
||||
sys.run().unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn arbiter_inner_panic() {
|
||||
let sys = System::new();
|
||||
|
||||
let (tx, rx) = std_channel::<u32>();
|
||||
|
||||
let arbiter = Arbiter::new();
|
||||
|
||||
// spawned panics should not cause arbiter to crash
|
||||
arbiter.spawn(async { panic!("inner panic; will be caught") });
|
||||
arbiter.spawn_fn(|| panic!("inner panic; will be caught"));
|
||||
|
||||
arbiter.spawn(async move { tx.send(42).unwrap() });
|
||||
|
||||
let num = rx.recv().unwrap();
|
||||
assert_eq!(num, 42);
|
||||
|
||||
System::current().stop();
|
||||
sys.run().unwrap();
|
||||
}
|
@@ -1,82 +0,0 @@
|
||||
use std::{
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
mpsc::channel as std_channel,
|
||||
Arc,
|
||||
},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use actix_rt::{Arbiter, System};
|
||||
|
||||
#[test]
|
||||
fn wait_for_spawns() {
|
||||
let rt = actix_rt::Runtime::new().unwrap();
|
||||
|
||||
let handle = rt.spawn(async {
|
||||
println!("running on the runtime");
|
||||
// assertion panic is caught at task boundary
|
||||
assert_eq!(1, 2);
|
||||
});
|
||||
|
||||
assert!(rt.block_on(handle).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn new_system_with_tokio() {
|
||||
let (tx, rx) = std_channel();
|
||||
|
||||
let res = System::with_tokio_rt(move || {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.enable_io()
|
||||
.enable_time()
|
||||
.thread_keep_alive(Duration::from_millis(1000))
|
||||
.worker_threads(2)
|
||||
.max_blocking_threads(2)
|
||||
.on_thread_start(|| {})
|
||||
.on_thread_stop(|| {})
|
||||
.build()
|
||||
.unwrap()
|
||||
})
|
||||
.block_on(async {
|
||||
actix_rt::time::sleep(Duration::from_millis(1)).await;
|
||||
|
||||
tokio::task::spawn(async move {
|
||||
tx.send(42).unwrap();
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
123usize
|
||||
});
|
||||
|
||||
assert_eq!(res, 123);
|
||||
assert_eq!(rx.recv().unwrap(), 42);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn new_arbiter_with_tokio() {
|
||||
let sys = System::new();
|
||||
|
||||
let arb = Arbiter::with_tokio_rt(|| {
|
||||
tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
let counter = Arc::new(AtomicBool::new(true));
|
||||
|
||||
let counter1 = counter.clone();
|
||||
let did_spawn = arb.spawn(async move {
|
||||
actix_rt::time::sleep(Duration::from_millis(1)).await;
|
||||
counter1.store(false, Ordering::SeqCst);
|
||||
Arbiter::current().stop();
|
||||
System::current().stop();
|
||||
});
|
||||
|
||||
sys.run().unwrap();
|
||||
|
||||
assert!(did_spawn);
|
||||
assert_eq!(false, counter.load(Ordering::SeqCst));
|
||||
}
|
@@ -1,130 +0,0 @@
|
||||
use std::{
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc,
|
||||
},
|
||||
thread,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use actix_rt::{time, Arbiter, System};
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
#[test]
|
||||
#[should_panic]
|
||||
fn no_system_current_panic() {
|
||||
System::current();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn try_current_no_system() {
|
||||
assert!(System::try_current().is_none())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn try_current_with_system() {
|
||||
System::new().block_on(async { assert!(System::try_current().is_some()) });
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn non_static_block_on() {
|
||||
let string = String::from("test_str");
|
||||
let string = string.as_str();
|
||||
|
||||
let sys = System::new();
|
||||
|
||||
sys.block_on(async {
|
||||
actix_rt::time::sleep(Duration::from_millis(1)).await;
|
||||
assert_eq!("test_str", string);
|
||||
});
|
||||
|
||||
let rt = actix_rt::Runtime::new().unwrap();
|
||||
|
||||
rt.block_on(async {
|
||||
actix_rt::time::sleep(Duration::from_millis(1)).await;
|
||||
assert_eq!("test_str", string);
|
||||
});
|
||||
|
||||
System::current().stop();
|
||||
sys.run().unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn await_for_timer() {
|
||||
let time = Duration::from_secs(1);
|
||||
let instant = Instant::now();
|
||||
|
||||
System::new().block_on(async move {
|
||||
time::sleep(time).await;
|
||||
});
|
||||
|
||||
assert!(
|
||||
instant.elapsed() >= time,
|
||||
"Calling `block_on` should poll awaited future to completion."
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn system_arbiter_spawn() {
|
||||
let runner = System::new();
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let sys = System::current();
|
||||
|
||||
thread::spawn(|| {
|
||||
// this thread will have no arbiter in it's thread local so call will panic
|
||||
Arbiter::current();
|
||||
})
|
||||
.join()
|
||||
.unwrap_err();
|
||||
|
||||
let thread = thread::spawn(|| {
|
||||
// this thread will have no arbiter in it's thread local so use the system handle instead
|
||||
System::set_current(sys);
|
||||
let sys = System::current();
|
||||
|
||||
let arb = sys.arbiter();
|
||||
arb.spawn(async move {
|
||||
tx.send(42u32).unwrap();
|
||||
System::current().stop();
|
||||
});
|
||||
});
|
||||
|
||||
assert_eq!(runner.block_on(rx).unwrap(), 42);
|
||||
thread.join().unwrap();
|
||||
}
|
||||
|
||||
struct Atom(Arc<AtomicBool>);
|
||||
|
||||
impl Drop for Atom {
|
||||
fn drop(&mut self) {
|
||||
self.0.store(true, Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn system_stop_arbiter_join_barrier() {
|
||||
let sys = System::new();
|
||||
let arb = Arbiter::new();
|
||||
|
||||
let atom = Atom(Arc::new(AtomicBool::new(false)));
|
||||
|
||||
// arbiter should be alive to receive spawn msg
|
||||
assert!(Arbiter::current().spawn_fn(|| {}));
|
||||
assert!(arb.spawn_fn(move || {
|
||||
// thread should get dropped during sleep
|
||||
thread::sleep(Duration::from_secs(2));
|
||||
|
||||
// pointless load to move atom into scope
|
||||
atom.0.load(Ordering::SeqCst);
|
||||
|
||||
panic!("spawned fn (thread) should be dropped during sleep");
|
||||
}));
|
||||
|
||||
System::current().stop();
|
||||
sys.run().unwrap();
|
||||
|
||||
// arbiter should be dead and return false
|
||||
assert!(!Arbiter::current().spawn_fn(|| {}));
|
||||
assert!(!arb.spawn_fn(|| {}));
|
||||
}
|
300
actix-rt/tests/tests.rs
Normal file
300
actix-rt/tests/tests.rs
Normal file
@@ -0,0 +1,300 @@
|
||||
use std::{
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
mpsc::channel,
|
||||
Arc,
|
||||
},
|
||||
thread,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use actix_rt::{Arbiter, System};
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
#[test]
|
||||
fn await_for_timer() {
|
||||
let time = Duration::from_secs(1);
|
||||
let instant = Instant::now();
|
||||
System::new().block_on(async move {
|
||||
tokio::time::sleep(time).await;
|
||||
});
|
||||
assert!(
|
||||
instant.elapsed() >= time,
|
||||
"Block on should poll awaited future to completion"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn join_another_arbiter() {
|
||||
let time = Duration::from_secs(1);
|
||||
let instant = Instant::now();
|
||||
System::new().block_on(async move {
|
||||
let arbiter = Arbiter::new();
|
||||
arbiter.spawn(Box::pin(async move {
|
||||
tokio::time::sleep(time).await;
|
||||
Arbiter::current().stop();
|
||||
}));
|
||||
arbiter.join().unwrap();
|
||||
});
|
||||
assert!(
|
||||
instant.elapsed() >= time,
|
||||
"Join on another arbiter should complete only when it calls stop"
|
||||
);
|
||||
|
||||
let instant = Instant::now();
|
||||
System::new().block_on(async move {
|
||||
let arbiter = Arbiter::new();
|
||||
arbiter.spawn_fn(move || {
|
||||
actix_rt::spawn(async move {
|
||||
tokio::time::sleep(time).await;
|
||||
Arbiter::current().stop();
|
||||
});
|
||||
});
|
||||
arbiter.join().unwrap();
|
||||
});
|
||||
assert!(
|
||||
instant.elapsed() >= time,
|
||||
"Join on an arbiter that has used actix_rt::spawn should wait for said future"
|
||||
);
|
||||
|
||||
let instant = Instant::now();
|
||||
System::new().block_on(async move {
|
||||
let arbiter = Arbiter::new();
|
||||
arbiter.spawn(Box::pin(async move {
|
||||
tokio::time::sleep(time).await;
|
||||
Arbiter::current().stop();
|
||||
}));
|
||||
arbiter.stop();
|
||||
arbiter.join().unwrap();
|
||||
});
|
||||
assert!(
|
||||
instant.elapsed() < time,
|
||||
"Premature stop of arbiter should conclude regardless of it's current state"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn non_static_block_on() {
|
||||
let string = String::from("test_str");
|
||||
let string = string.as_str();
|
||||
|
||||
let sys = System::new();
|
||||
|
||||
sys.block_on(async {
|
||||
actix_rt::time::sleep(Duration::from_millis(1)).await;
|
||||
assert_eq!("test_str", string);
|
||||
});
|
||||
|
||||
let rt = actix_rt::Runtime::new().unwrap();
|
||||
|
||||
rt.block_on(async {
|
||||
actix_rt::time::sleep(Duration::from_millis(1)).await;
|
||||
assert_eq!("test_str", string);
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn wait_for_spawns() {
|
||||
let rt = actix_rt::Runtime::new().unwrap();
|
||||
|
||||
let handle = rt.spawn(async {
|
||||
println!("running on the runtime");
|
||||
// assertion panic is caught at task boundary
|
||||
assert_eq!(1, 2);
|
||||
});
|
||||
|
||||
assert!(rt.block_on(handle).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn arbiter_spawn_fn_runs() {
|
||||
let _ = System::new();
|
||||
|
||||
let (tx, rx) = channel::<u32>();
|
||||
|
||||
let arbiter = Arbiter::new();
|
||||
arbiter.spawn_fn(move || tx.send(42).unwrap());
|
||||
|
||||
let num = rx.recv().unwrap();
|
||||
assert_eq!(num, 42);
|
||||
|
||||
arbiter.stop();
|
||||
arbiter.join().unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn arbiter_handle_spawn_fn_runs() {
|
||||
let sys = System::new();
|
||||
|
||||
let (tx, rx) = channel::<u32>();
|
||||
|
||||
let arbiter = Arbiter::new();
|
||||
let handle = arbiter.handle();
|
||||
drop(arbiter);
|
||||
|
||||
handle.spawn_fn(move || {
|
||||
tx.send(42).unwrap();
|
||||
System::current().stop()
|
||||
});
|
||||
|
||||
let num = rx.recv_timeout(Duration::from_secs(2)).unwrap();
|
||||
assert_eq!(num, 42);
|
||||
|
||||
handle.stop();
|
||||
sys.run().unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn arbiter_drop_no_panic_fn() {
|
||||
let _ = System::new();
|
||||
|
||||
let arbiter = Arbiter::new();
|
||||
arbiter.spawn_fn(|| panic!("test"));
|
||||
|
||||
arbiter.stop();
|
||||
arbiter.join().unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn arbiter_drop_no_panic_fut() {
|
||||
let _ = System::new();
|
||||
|
||||
let arbiter = Arbiter::new();
|
||||
arbiter.spawn(async { panic!("test") });
|
||||
|
||||
arbiter.stop();
|
||||
arbiter.join().unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic]
|
||||
fn no_system_current_panic() {
|
||||
System::current();
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic]
|
||||
fn no_system_arbiter_new_panic() {
|
||||
Arbiter::new();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn system_arbiter_spawn() {
|
||||
let runner = System::new();
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let sys = System::current();
|
||||
|
||||
thread::spawn(|| {
|
||||
// this thread will have no arbiter in it's thread local so call will panic
|
||||
Arbiter::current();
|
||||
})
|
||||
.join()
|
||||
.unwrap_err();
|
||||
|
||||
let thread = thread::spawn(|| {
|
||||
// this thread will have no arbiter in it's thread local so use the system handle instead
|
||||
System::set_current(sys);
|
||||
let sys = System::current();
|
||||
|
||||
let arb = sys.arbiter();
|
||||
arb.spawn(async move {
|
||||
tx.send(42u32).unwrap();
|
||||
System::current().stop();
|
||||
});
|
||||
});
|
||||
|
||||
assert_eq!(runner.block_on(rx).unwrap(), 42);
|
||||
thread.join().unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn system_stop_stops_arbiters() {
|
||||
let sys = System::new();
|
||||
let arb = Arbiter::new();
|
||||
|
||||
// arbiter should be alive to receive spawn msg
|
||||
assert!(Arbiter::current().spawn_fn(|| {}));
|
||||
assert!(arb.spawn_fn(|| {}));
|
||||
|
||||
System::current().stop();
|
||||
sys.run().unwrap();
|
||||
|
||||
// account for slightly slow thread de-spawns (only observed on windows)
|
||||
thread::sleep(Duration::from_millis(100));
|
||||
|
||||
// arbiter should be dead and return false
|
||||
assert!(!Arbiter::current().spawn_fn(|| {}));
|
||||
assert!(!arb.spawn_fn(|| {}));
|
||||
|
||||
arb.join().unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn new_system_with_tokio() {
|
||||
let (tx, rx) = channel();
|
||||
|
||||
let res = System::with_tokio_rt(move || {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.enable_io()
|
||||
.enable_time()
|
||||
.thread_keep_alive(Duration::from_millis(1000))
|
||||
.worker_threads(2)
|
||||
.max_blocking_threads(2)
|
||||
.on_thread_start(|| {})
|
||||
.on_thread_stop(|| {})
|
||||
.build()
|
||||
.unwrap()
|
||||
})
|
||||
.block_on(async {
|
||||
actix_rt::time::sleep(Duration::from_millis(1)).await;
|
||||
|
||||
tokio::task::spawn(async move {
|
||||
tx.send(42).unwrap();
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
123usize
|
||||
});
|
||||
|
||||
assert_eq!(res, 123);
|
||||
assert_eq!(rx.recv().unwrap(), 42);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn new_arbiter_with_tokio() {
|
||||
let _ = System::new();
|
||||
|
||||
let arb = Arbiter::with_tokio_rt(|| {
|
||||
tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
let counter = Arc::new(AtomicBool::new(true));
|
||||
|
||||
let counter1 = counter.clone();
|
||||
let did_spawn = arb.spawn(async move {
|
||||
actix_rt::time::sleep(Duration::from_millis(1)).await;
|
||||
counter1.store(false, Ordering::SeqCst);
|
||||
Arbiter::current().stop();
|
||||
});
|
||||
|
||||
assert!(did_spawn);
|
||||
|
||||
arb.join().unwrap();
|
||||
|
||||
assert_eq!(false, counter.load(Ordering::SeqCst));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn try_current_no_system() {
|
||||
assert!(System::try_current().is_none())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn try_current_with_system() {
|
||||
System::new().block_on(async { assert!(System::try_current().is_some()) });
|
||||
}
|
@@ -24,6 +24,7 @@ futures-core = { version = "0.3.7", default-features = false }
|
||||
futures-sink = { version = "0.3.7", default-features = false }
|
||||
log = "0.4"
|
||||
pin-project-lite = "0.2.0"
|
||||
tokio = { version = "1", features = ["sync"] }
|
||||
|
||||
[dev-dependencies]
|
||||
actix-rt = "2.0.0"
|
||||
|
@@ -1,21 +1,20 @@
|
||||
//! Framed dispatcher service and related utilities.
|
||||
|
||||
#![allow(type_alias_bounds)]
|
||||
|
||||
use core::future::Future;
|
||||
use core::pin::Pin;
|
||||
use core::task::{Context, Poll};
|
||||
use core::{fmt, mem};
|
||||
use core::{
|
||||
fmt,
|
||||
future::Future,
|
||||
mem,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed};
|
||||
use actix_service::{IntoService, Service};
|
||||
use futures_core::stream::Stream;
|
||||
use log::debug;
|
||||
use pin_project_lite::pin_project;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
use crate::mpsc;
|
||||
|
||||
/// Framed transport errors
|
||||
/// Framed transport errors.
|
||||
pub enum DispatcherError<E, U: Encoder<I> + Decoder, I> {
|
||||
Service(E),
|
||||
Encoder(<U as Encoder<I>>::Error),
|
||||
@@ -64,8 +63,7 @@ pub enum Message<T> {
|
||||
}
|
||||
|
||||
pin_project! {
|
||||
/// Dispatcher is a future that reads frames from Framed object
|
||||
/// and passes them to the service.
|
||||
/// Dispatcher is a future that reads frames from Framed object and passes them to the service.
|
||||
pub struct Dispatcher<S, T, U, I>
|
||||
where
|
||||
S: Service<<U as Decoder>::Item, Response = I>,
|
||||
@@ -82,8 +80,8 @@ pin_project! {
|
||||
state: State<S, U, I>,
|
||||
#[pin]
|
||||
framed: Framed<T, U>,
|
||||
rx: mpsc::Receiver<Result<Message<I>, S::Error>>,
|
||||
tx: mpsc::Sender<Result<Message<I>, S::Error>>,
|
||||
rx: mpsc::UnboundedReceiver<Result<Message<I>, S::Error>>,
|
||||
tx: mpsc::UnboundedSender<Result<Message<I>, S::Error>>,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -134,26 +132,7 @@ where
|
||||
where
|
||||
F: IntoService<S, <U as Decoder>::Item>,
|
||||
{
|
||||
let (tx, rx) = mpsc::channel();
|
||||
Dispatcher {
|
||||
framed,
|
||||
rx,
|
||||
tx,
|
||||
service: service.into_service(),
|
||||
state: State::Processing,
|
||||
}
|
||||
}
|
||||
|
||||
/// Construct new `Dispatcher` instance with customer `mpsc::Receiver`
|
||||
pub fn with_rx<F>(
|
||||
framed: Framed<T, U>,
|
||||
service: F,
|
||||
rx: mpsc::Receiver<Result<Message<I>, S::Error>>,
|
||||
) -> Self
|
||||
where
|
||||
F: IntoService<S, <U as Decoder>::Item>,
|
||||
{
|
||||
let tx = rx.sender();
|
||||
let (tx, rx) = mpsc::unbounded_channel();
|
||||
Dispatcher {
|
||||
framed,
|
||||
rx,
|
||||
@@ -164,28 +143,28 @@ where
|
||||
}
|
||||
|
||||
/// Get sink
|
||||
pub fn get_sink(&self) -> mpsc::Sender<Result<Message<I>, S::Error>> {
|
||||
pub fn tx(&self) -> mpsc::UnboundedSender<Result<Message<I>, S::Error>> {
|
||||
self.tx.clone()
|
||||
}
|
||||
|
||||
/// Get reference to a service wrapped by `Dispatcher` instance.
|
||||
pub fn get_ref(&self) -> &S {
|
||||
pub fn service(&self) -> &S {
|
||||
&self.service
|
||||
}
|
||||
|
||||
/// Get mutable reference to a service wrapped by `Dispatcher` instance.
|
||||
pub fn get_mut(&mut self) -> &mut S {
|
||||
pub fn service_mut(&mut self) -> &mut S {
|
||||
&mut self.service
|
||||
}
|
||||
|
||||
/// Get reference to a framed instance wrapped by `Dispatcher`
|
||||
/// instance.
|
||||
pub fn get_framed(&self) -> &Framed<T, U> {
|
||||
pub fn framed(&self) -> &Framed<T, U> {
|
||||
&self.framed
|
||||
}
|
||||
|
||||
/// Get mutable reference to a framed instance wrapped by `Dispatcher` instance.
|
||||
pub fn get_framed_mut(&mut self) -> &mut Framed<T, U> {
|
||||
pub fn framed_mut(&mut self) -> &mut Framed<T, U> {
|
||||
&mut self.framed
|
||||
}
|
||||
|
||||
@@ -246,7 +225,7 @@ where
|
||||
loop {
|
||||
let mut this = self.as_mut().project();
|
||||
while !this.framed.is_write_buf_full() {
|
||||
match Pin::new(&mut this.rx).poll_next(cx) {
|
||||
match this.rx.poll_recv(cx) {
|
||||
Poll::Ready(Some(Ok(Message::Item(msg)))) => {
|
||||
if let Err(err) = this.framed.as_mut().write(msg) {
|
||||
*this.state = State::FramedError(DispatcherError::Encoder(err));
|
||||
@@ -266,7 +245,7 @@ where
|
||||
}
|
||||
|
||||
if !this.framed.is_write_buf_empty() {
|
||||
match this.framed.flush(cx) {
|
||||
match this.framed.poll_flush(cx) {
|
||||
Poll::Pending => break,
|
||||
Poll::Ready(Ok(_)) => (),
|
||||
Poll::Ready(Err(err)) => {
|
||||
@@ -298,41 +277,43 @@ where
|
||||
type Output = Result<(), DispatcherError<S::Error, U, I>>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
loop {
|
||||
let this = self.as_mut().project();
|
||||
let this = self.as_mut().project();
|
||||
|
||||
return match this.state {
|
||||
State::Processing => {
|
||||
if self.as_mut().poll_read(cx) || self.as_mut().poll_write(cx) {
|
||||
continue;
|
||||
} else {
|
||||
Poll::Pending
|
||||
}
|
||||
match this.state {
|
||||
State::Processing => {
|
||||
if self.as_mut().poll_read(cx) || self.as_mut().poll_write(cx) {
|
||||
self.poll(cx)
|
||||
} else {
|
||||
Poll::Pending
|
||||
}
|
||||
State::Error(_) => {
|
||||
// flush write buffer
|
||||
if !this.framed.is_write_buf_empty() && this.framed.flush(cx).is_pending() {
|
||||
return Poll::Pending;
|
||||
}
|
||||
Poll::Ready(Err(this.state.take_error()))
|
||||
}
|
||||
|
||||
State::Error(_) => {
|
||||
// flush write buffer
|
||||
if !this.framed.is_write_buf_empty() && this.framed.poll_flush(cx).is_pending()
|
||||
{
|
||||
return Poll::Pending;
|
||||
}
|
||||
State::FlushAndStop => {
|
||||
if !this.framed.is_write_buf_empty() {
|
||||
match this.framed.flush(cx) {
|
||||
Poll::Ready(Err(err)) => {
|
||||
debug!("Error sending data: {:?}", err);
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
Poll::Pending => Poll::Pending,
|
||||
Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
|
||||
|
||||
Poll::Ready(Err(this.state.take_error()))
|
||||
}
|
||||
|
||||
State::FlushAndStop => {
|
||||
if !this.framed.is_write_buf_empty() {
|
||||
this.framed.poll_flush(cx).map(|res| {
|
||||
if let Err(err) = res {
|
||||
debug!("Error sending data: {:?}", err);
|
||||
}
|
||||
} else {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
Ok(())
|
||||
})
|
||||
} else {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
State::FramedError(_) => Poll::Ready(Err(this.state.take_framed_error())),
|
||||
State::Stopping => Poll::Ready(Ok(())),
|
||||
};
|
||||
}
|
||||
|
||||
State::FramedError(_) => Poll::Ready(Err(this.state.take_framed_error())),
|
||||
State::Stopping => Poll::Ready(Ok(())),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -7,6 +7,5 @@
|
||||
|
||||
pub mod counter;
|
||||
pub mod dispatcher;
|
||||
pub mod mpsc;
|
||||
pub mod task;
|
||||
pub mod timeout;
|
||||
|
@@ -1,224 +0,0 @@
|
||||
//! A multi-producer, single-consumer, futures-aware, FIFO queue.
|
||||
|
||||
use core::any::Any;
|
||||
use core::cell::RefCell;
|
||||
use core::fmt;
|
||||
use core::pin::Pin;
|
||||
use core::task::{Context, Poll};
|
||||
|
||||
use std::collections::VecDeque;
|
||||
use std::error::Error;
|
||||
use std::rc::Rc;
|
||||
|
||||
use futures_core::stream::Stream;
|
||||
use futures_sink::Sink;
|
||||
|
||||
use crate::task::LocalWaker;
|
||||
|
||||
/// Creates a unbounded in-memory channel with buffered storage.
|
||||
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
|
||||
let shared = Rc::new(RefCell::new(Shared {
|
||||
has_receiver: true,
|
||||
buffer: VecDeque::new(),
|
||||
blocked_recv: LocalWaker::new(),
|
||||
}));
|
||||
let sender = Sender {
|
||||
shared: shared.clone(),
|
||||
};
|
||||
let receiver = Receiver { shared };
|
||||
(sender, receiver)
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct Shared<T> {
|
||||
buffer: VecDeque<T>,
|
||||
blocked_recv: LocalWaker,
|
||||
has_receiver: bool,
|
||||
}
|
||||
|
||||
/// The transmission end of a channel.
|
||||
///
|
||||
/// This is created by the `channel` function.
|
||||
#[derive(Debug)]
|
||||
pub struct Sender<T> {
|
||||
shared: Rc<RefCell<Shared<T>>>,
|
||||
}
|
||||
|
||||
impl<T> Unpin for Sender<T> {}
|
||||
|
||||
impl<T> Sender<T> {
|
||||
/// Sends the provided message along this channel.
|
||||
pub fn send(&self, item: T) -> Result<(), SendError<T>> {
|
||||
let mut shared = self.shared.borrow_mut();
|
||||
if !shared.has_receiver {
|
||||
return Err(SendError(item)); // receiver was dropped
|
||||
};
|
||||
shared.buffer.push_back(item);
|
||||
shared.blocked_recv.wake();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Closes the sender half
|
||||
///
|
||||
/// This prevents any further messages from being sent on the channel while
|
||||
/// still enabling the receiver to drain messages that are buffered.
|
||||
pub fn close(&mut self) {
|
||||
self.shared.borrow_mut().has_receiver = false;
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Clone for Sender<T> {
|
||||
fn clone(&self) -> Self {
|
||||
Sender {
|
||||
shared: self.shared.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Sink<T> for Sender<T> {
|
||||
type Error = SendError<T>;
|
||||
|
||||
fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), SendError<T>> {
|
||||
self.send(item)
|
||||
}
|
||||
|
||||
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), SendError<T>>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Drop for Sender<T> {
|
||||
fn drop(&mut self) {
|
||||
let count = Rc::strong_count(&self.shared);
|
||||
let shared = self.shared.borrow_mut();
|
||||
|
||||
// check is last sender is about to drop
|
||||
if shared.has_receiver && count == 2 {
|
||||
// Wake up receiver as its stream has ended
|
||||
shared.blocked_recv.wake();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The receiving end of a channel which implements the `Stream` trait.
|
||||
///
|
||||
/// This is created by the `channel` function.
|
||||
#[derive(Debug)]
|
||||
pub struct Receiver<T> {
|
||||
shared: Rc<RefCell<Shared<T>>>,
|
||||
}
|
||||
|
||||
impl<T> Receiver<T> {
|
||||
/// Create Sender
|
||||
pub fn sender(&self) -> Sender<T> {
|
||||
Sender {
|
||||
shared: self.shared.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Unpin for Receiver<T> {}
|
||||
|
||||
impl<T> Stream for Receiver<T> {
|
||||
type Item = T;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
let mut shared = self.shared.borrow_mut();
|
||||
if Rc::strong_count(&self.shared) == 1 {
|
||||
// All senders have been dropped, so drain the buffer and end the
|
||||
// stream.
|
||||
Poll::Ready(shared.buffer.pop_front())
|
||||
} else if let Some(msg) = shared.buffer.pop_front() {
|
||||
Poll::Ready(Some(msg))
|
||||
} else {
|
||||
shared.blocked_recv.register(cx.waker());
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Drop for Receiver<T> {
|
||||
fn drop(&mut self) {
|
||||
let mut shared = self.shared.borrow_mut();
|
||||
shared.buffer.clear();
|
||||
shared.has_receiver = false;
|
||||
}
|
||||
}
|
||||
|
||||
/// Error type for sending, used when the receiving end of a channel is
|
||||
/// dropped
|
||||
pub struct SendError<T>(T);
|
||||
|
||||
impl<T> fmt::Debug for SendError<T> {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
fmt.debug_tuple("SendError").field(&"...").finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> fmt::Display for SendError<T> {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(fmt, "send failed because receiver is gone")
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Any> Error for SendError<T> {
|
||||
fn description(&self) -> &str {
|
||||
"send failed because receiver is gone"
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> SendError<T> {
|
||||
/// Returns the message that was attempted to be sent but failed.
|
||||
pub fn into_inner(self) -> T {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use futures_util::future::lazy;
|
||||
use futures_util::{stream::Stream, StreamExt};
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn test_mpsc() {
|
||||
let (tx, mut rx) = channel();
|
||||
tx.send("test").unwrap();
|
||||
assert_eq!(rx.next().await.unwrap(), "test");
|
||||
|
||||
let tx2 = tx.clone();
|
||||
tx2.send("test2").unwrap();
|
||||
assert_eq!(rx.next().await.unwrap(), "test2");
|
||||
|
||||
assert_eq!(
|
||||
lazy(|cx| Pin::new(&mut rx).poll_next(cx)).await,
|
||||
Poll::Pending
|
||||
);
|
||||
drop(tx2);
|
||||
assert_eq!(
|
||||
lazy(|cx| Pin::new(&mut rx).poll_next(cx)).await,
|
||||
Poll::Pending
|
||||
);
|
||||
drop(tx);
|
||||
assert_eq!(rx.next().await, None);
|
||||
|
||||
let (tx, rx) = channel();
|
||||
tx.send("test").unwrap();
|
||||
drop(rx);
|
||||
assert!(tx.send("test").is_err());
|
||||
|
||||
let (mut tx, _) = channel();
|
||||
let tx2 = tx.clone();
|
||||
tx.close();
|
||||
assert!(tx.send("test").is_err());
|
||||
assert!(tx2.send("test").is_err());
|
||||
}
|
||||
}
|
@@ -1,2 +1,5 @@
|
||||
max_width = 96
|
||||
reorder_imports = true
|
||||
#wrap_comments = true
|
||||
#fn_args_density = "Compressed"
|
||||
#use_small_heuristics = false
|
||||
|
Reference in New Issue
Block a user