1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-02-20 07:40:33 +01:00

remove RUNNING Q PENDING thread locals from actix-rt (#207)

This commit is contained in:
fakeshadow 2020-12-27 07:26:02 +08:00 committed by GitHub
parent 43ce25cda1
commit 518bf3f6a6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 273 additions and 274 deletions

View File

@ -6,6 +6,18 @@
* Add `System::attach_to_tokio` method. [#173] * Add `System::attach_to_tokio` method. [#173]
### Changed
* Remove `'static` lifetime requirement for `Runtime::block_on` and `SystemRunner::block_on`.
Remove `'static` lifetime requirement for `System::run` and `Builder::run`.
`Arbiter::spawn` would panic when `System` is not in scope. [#207]
### Fixed
* Fix work load issue by removing `PENDDING` thread local. [#207]
[#207]: https://github.com/actix/actix-net/pull/207
## [1.1.1] - 2020-04-30 ## [1.1.1] - 2020-04-30
### Fixed ### Fixed

View File

@ -17,11 +17,10 @@ path = "src/lib.rs"
[dependencies] [dependencies]
actix-macros = "0.1.0" actix-macros = "0.1.0"
copyless = "0.1.4"
futures-channel = "0.3.4" futures-channel = "0.3.7"
futures-util = { version = "0.3.4", default-features = false, features = ["alloc"] }
smallvec = "1"
tokio = { version = "0.2.6", default-features = false, features = ["rt-core", "rt-util", "io-driver", "tcp", "uds", "udp", "time", "signal", "stream"] } tokio = { version = "0.2.6", default-features = false, features = ["rt-core", "rt-util", "io-driver", "tcp", "uds", "udp", "time", "signal", "stream"] }
[dev-dependencies] [dev-dependencies]
futures-util = { version = "0.3.7", default-features = false, features = ["alloc"] }
tokio = { version = "0.2.6", features = ["full"] } tokio = { version = "0.2.6", features = ["full"] }

View File

@ -1,6 +1,7 @@
use std::any::{Any, TypeId}; use std::any::{Any, TypeId};
use std::cell::{Cell, RefCell}; use std::cell::RefCell;
use std::collections::HashMap; use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin; use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll}; use std::task::{Context, Poll};
@ -8,24 +9,23 @@ use std::{fmt, thread};
use futures_channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; use futures_channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
use futures_channel::oneshot::{channel, Canceled, Sender}; use futures_channel::oneshot::{channel, Canceled, Sender};
use futures_util::{ // use futures_util::stream::FuturesUnordered;
future::{self, Future, FutureExt}, // use tokio::task::JoinHandle;
stream::Stream, // use tokio::stream::StreamExt;
}; use tokio::stream::Stream;
use tokio::task::LocalSet;
use crate::runtime::Runtime; use crate::runtime::Runtime;
use crate::system::System; use crate::system::System;
use copyless::BoxHelper;
use smallvec::SmallVec;
pub use tokio::task::JoinHandle;
thread_local!( thread_local!(
static ADDR: RefCell<Option<Arbiter>> = RefCell::new(None); static ADDR: RefCell<Option<Arbiter>> = RefCell::new(None);
static RUNNING: Cell<bool> = Cell::new(false); // TODO: Commented out code are for Arbiter::local_join function.
static Q: RefCell<Vec<Pin<Box<dyn Future<Output = ()>>>>> = RefCell::new(Vec::new()); // It can be safely removed if this function is not used in actix-*.
static PENDING: RefCell<SmallVec<[JoinHandle<()>; 8]>> = RefCell::new(SmallVec::new()); //
// /// stores join handle for spawned async tasks.
// static HANDLE: RefCell<FuturesUnordered<JoinHandle<()>>> =
// RefCell::new(FuturesUnordered::new());
static STORAGE: RefCell<HashMap<TypeId, Box<dyn Any>>> = RefCell::new(HashMap::new()); static STORAGE: RefCell<HashMap<TypeId, Box<dyn Any>>> = RefCell::new(HashMap::new());
); );
@ -69,14 +69,14 @@ impl Default for Arbiter {
} }
impl Arbiter { impl Arbiter {
pub(crate) fn new_system() -> Self { pub(crate) fn new_system(local: &LocalSet) -> Self {
let (tx, rx) = unbounded(); let (tx, rx) = unbounded();
let arb = Arbiter::with_sender(tx); let arb = Arbiter::with_sender(tx);
ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone())); ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
RUNNING.with(|cell| cell.set(false));
STORAGE.with(|cell| cell.borrow_mut().clear()); STORAGE.with(|cell| cell.borrow_mut().clear());
Arbiter::spawn(ArbiterController { stop: None, rx });
local.spawn_local(ArbiterController { rx });
arb arb
} }
@ -91,8 +91,9 @@ impl Arbiter {
} }
/// Check if current arbiter is running. /// Check if current arbiter is running.
#[deprecated(note = "Thread local variables for running state of Arbiter is removed")]
pub fn is_running() -> bool { pub fn is_running() -> bool {
RUNNING.with(|cell| cell.get()) false
} }
/// Stop arbiter from continuing it's event loop. /// Stop arbiter from continuing it's event loop.
@ -106,26 +107,20 @@ impl Arbiter {
let id = COUNT.fetch_add(1, Ordering::Relaxed); let id = COUNT.fetch_add(1, Ordering::Relaxed);
let name = format!("actix-rt:worker:{}", id); let name = format!("actix-rt:worker:{}", id);
let sys = System::current(); let sys = System::current();
let (arb_tx, arb_rx) = unbounded(); let (tx, rx) = unbounded();
let arb_tx2 = arb_tx.clone();
let handle = thread::Builder::new() let handle = thread::Builder::new()
.name(name.clone()) .name(name.clone())
.spawn(move || { .spawn({
let tx = tx.clone();
move || {
let mut rt = Runtime::new().expect("Can not create Runtime"); let mut rt = Runtime::new().expect("Can not create Runtime");
let arb = Arbiter::with_sender(arb_tx); let arb = Arbiter::with_sender(tx);
let (stop, stop_rx) = channel();
RUNNING.with(|cell| cell.set(true));
STORAGE.with(|cell| cell.borrow_mut().clear()); STORAGE.with(|cell| cell.borrow_mut().clear());
System::set_current(sys); System::set_current(sys);
// start arbiter controller
rt.spawn(ArbiterController {
stop: Some(stop),
rx: arb_rx,
});
ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone())); ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
// register arbiter // register arbiter
@ -133,42 +128,26 @@ impl Arbiter {
.sys() .sys()
.unbounded_send(SystemCommand::RegisterArbiter(id, arb)); .unbounded_send(SystemCommand::RegisterArbiter(id, arb));
// start arbiter controller
// run loop // run loop
let _ = rt.block_on(stop_rx).unwrap_or(1); rt.block_on(ArbiterController { rx });
// unregister arbiter // unregister arbiter
let _ = System::current() let _ = System::current()
.sys() .sys()
.unbounded_send(SystemCommand::UnregisterArbiter(id)); .unbounded_send(SystemCommand::UnregisterArbiter(id));
}
}) })
.unwrap_or_else(|err| { .unwrap_or_else(|err| {
panic!("Cannot spawn an arbiter's thread {:?}: {:?}", &name, err) panic!("Cannot spawn an arbiter's thread {:?}: {:?}", &name, err)
}); });
Arbiter { Arbiter {
sender: arb_tx2, sender: tx,
thread_handle: Some(handle), thread_handle: Some(handle),
} }
} }
pub(crate) fn run_system(rt: Option<&Runtime>) {
RUNNING.with(|cell| cell.set(true));
Q.with(|cell| {
let mut v = cell.borrow_mut();
for fut in v.drain(..) {
if let Some(rt) = rt {
rt.spawn(fut);
} else {
tokio::task::spawn_local(fut);
}
}
});
}
pub(crate) fn stop_system() {
RUNNING.with(|cell| cell.set(false));
}
/// Spawn a future on the current thread. This does not create a new Arbiter /// Spawn a future on the current thread. This does not create a new Arbiter
/// or Arbiter address, it is simply a helper for spawning futures on the current /// or Arbiter address, it is simply a helper for spawning futures on the current
/// thread. /// thread.
@ -176,26 +155,12 @@ impl Arbiter {
where where
F: Future<Output = ()> + 'static, F: Future<Output = ()> + 'static,
{ {
RUNNING.with(move |cell| { // HANDLE.with(|handle| {
if cell.get() { // let handle = handle.borrow();
// Spawn the future on running executor // handle.push(tokio::task::spawn_local(future));
let len = PENDING.with(move |cell| { // });
let mut p = cell.borrow_mut(); // let _ = tokio::task::spawn_local(CleanupPending);
p.push(tokio::task::spawn_local(future)); let _ = tokio::task::spawn_local(future);
p.len()
});
if len > 7 {
// Before reaching the inline size
tokio::task::spawn_local(CleanupPending);
}
} else {
// Box the future and push it to the queue, this results in double boxing
// because the executor boxes the future again, but works for now
Q.with(move |cell| {
cell.borrow_mut().push(Pin::from(Box::alloc().init(future)))
});
}
});
} }
/// Executes a future on the current thread. This does not create a new Arbiter /// Executes a future on the current thread. This does not create a new Arbiter
@ -206,7 +171,9 @@ impl Arbiter {
F: FnOnce() -> R + 'static, F: FnOnce() -> R + 'static,
R: Future<Output = ()> + 'static, R: Future<Output = ()> + 'static,
{ {
Arbiter::spawn(future::lazy(|_| f()).flatten()) Arbiter::spawn(async {
f();
})
} }
/// Send a future to the Arbiter's thread, and spawn it. /// Send a future to the Arbiter's thread, and spawn it.
@ -313,40 +280,33 @@ impl Arbiter {
/// Returns a future that will be completed once all currently spawned futures /// Returns a future that will be completed once all currently spawned futures
/// have completed. /// have completed.
pub fn local_join() -> impl Future<Output = ()> { #[deprecated(since = "1.2.0", note = "Arbiter::local_join function is removed.")]
PENDING.with(move |cell| { pub async fn local_join() {
let current = cell.replace(SmallVec::new()); // let handle = HANDLE.with(|fut| std::mem::take(&mut *fut.borrow_mut()));
future::join_all(current).map(|_| ()) // async move {
}) // handle.collect::<Vec<_>>().await;
// }
unimplemented!("Arbiter::local_join function is removed.")
} }
} }
/// Future used for cleaning-up already finished `JoinHandle`s // /// Future used for cleaning-up already finished `JoinHandle`s
/// from the `PENDING` list so the vector doesn't grow indefinitely // /// from the `PENDING` list so the vector doesn't grow indefinitely
struct CleanupPending; // struct CleanupPending;
//
impl Future for CleanupPending { // impl Future for CleanupPending {
type Output = (); // type Output = ();
//
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { // fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
PENDING.with(move |cell| { // HANDLE.with(move |handle| {
let mut pending = cell.borrow_mut(); // recycle_join_handle(&mut *handle.borrow_mut(), cx);
let mut i = 0; // });
while i != pending.len() { //
if Pin::new(&mut pending[i]).poll(cx).is_ready() { // Poll::Ready(())
pending.remove(i); // }
} else { // }
i += 1;
}
}
});
Poll::Ready(())
}
}
struct ArbiterController { struct ArbiterController {
stop: Option<Sender<i32>>,
rx: UnboundedReceiver<ArbiterCommand>, rx: UnboundedReceiver<ArbiterCommand>,
} }
@ -371,22 +331,14 @@ impl Future for ArbiterController {
match Pin::new(&mut self.rx).poll_next(cx) { match Pin::new(&mut self.rx).poll_next(cx) {
Poll::Ready(None) => return Poll::Ready(()), Poll::Ready(None) => return Poll::Ready(()),
Poll::Ready(Some(item)) => match item { Poll::Ready(Some(item)) => match item {
ArbiterCommand::Stop => { ArbiterCommand::Stop => return Poll::Ready(()),
if let Some(stop) = self.stop.take() {
let _ = stop.send(0);
};
return Poll::Ready(());
}
ArbiterCommand::Execute(fut) => { ArbiterCommand::Execute(fut) => {
let len = PENDING.with(move |cell| { // HANDLE.with(|handle| {
let mut p = cell.borrow_mut(); // let mut handle = handle.borrow_mut();
p.push(tokio::task::spawn_local(fut)); // handle.push(tokio::task::spawn_local(fut));
p.len() // recycle_join_handle(&mut *handle, cx);
}); // });
if len > 7 { tokio::task::spawn_local(fut);
// Before reaching the inline size
tokio::task::spawn_local(CleanupPending);
}
} }
ArbiterCommand::ExecuteFn(f) => { ArbiterCommand::ExecuteFn(f) => {
f.call_box(); f.call_box();
@ -398,6 +350,20 @@ impl Future for ArbiterController {
} }
} }
// fn recycle_join_handle(handle: &mut FuturesUnordered<JoinHandle<()>>, cx: &mut Context<'_>) {
// let _ = Pin::new(&mut *handle).poll_next(cx);
//
// // Try to recycle more join handles and free up memory.
// //
// // this is a guess. The yield limit for FuturesUnordered is 32.
// // So poll an extra 3 times would make the total poll below 128.
// if handle.len() > 64 {
// (0..3).for_each(|_| {
// let _ = Pin::new(&mut *handle).poll_next(cx);
// })
// }
// }
#[derive(Debug)] #[derive(Debug)]
pub(crate) enum SystemCommand { pub(crate) enum SystemCommand {
Exit(i32), Exit(i32),

View File

@ -1,9 +1,9 @@
use std::borrow::Cow; use std::borrow::Cow;
use std::future::Future;
use std::io; use std::io;
use futures_channel::mpsc::unbounded; use futures_channel::mpsc::unbounded;
use futures_channel::oneshot::{channel, Receiver}; use futures_channel::oneshot::{channel, Receiver};
use futures_util::future::{lazy, Future, FutureExt};
use tokio::task::LocalSet; use tokio::task::LocalSet;
use crate::arbiter::{Arbiter, SystemArbiter}; use crate::arbiter::{Arbiter, SystemArbiter};
@ -65,7 +65,7 @@ impl Builder {
/// Function `f` get called within tokio runtime context. /// Function `f` get called within tokio runtime context.
pub fn run<F>(self, f: F) -> io::Result<()> pub fn run<F>(self, f: F) -> io::Result<()>
where where
F: FnOnce() + 'static, F: FnOnce(),
{ {
self.create_runtime(f).run() self.create_runtime(f).run()
} }
@ -74,7 +74,8 @@ impl Builder {
let (stop_tx, stop) = channel(); let (stop_tx, stop) = channel();
let (sys_sender, sys_receiver) = unbounded(); let (sys_sender, sys_receiver) = unbounded();
let system = System::construct(sys_sender, Arbiter::new_system(), self.stop_on_panic); let system =
System::construct(sys_sender, Arbiter::new_system(local), self.stop_on_panic);
// system arbiter // system arbiter
let arb = SystemArbiter::new(stop_tx, sys_receiver); let arb = SystemArbiter::new(stop_tx, sys_receiver);
@ -87,21 +88,26 @@ impl Builder {
fn create_runtime<F>(self, f: F) -> SystemRunner fn create_runtime<F>(self, f: F) -> SystemRunner
where where
F: FnOnce() + 'static, F: FnOnce(),
{ {
let (stop_tx, stop) = channel(); let (stop_tx, stop) = channel();
let (sys_sender, sys_receiver) = unbounded(); let (sys_sender, sys_receiver) = unbounded();
let system = System::construct(sys_sender, Arbiter::new_system(), self.stop_on_panic); let mut rt = Runtime::new().unwrap();
let system = System::construct(
sys_sender,
Arbiter::new_system(rt.local()),
self.stop_on_panic,
);
// system arbiter // system arbiter
let arb = SystemArbiter::new(stop_tx, sys_receiver); let arb = SystemArbiter::new(stop_tx, sys_receiver);
let mut rt = Runtime::new().unwrap();
rt.spawn(arb); rt.spawn(arb);
// init system arbiter and run configuration method // init system arbiter and run configuration method
rt.block_on(lazy(move |_| f())); rt.block_on(async { f() });
SystemRunner { rt, stop, system } SystemRunner { rt, stop, system }
} }
@ -120,10 +126,8 @@ impl AsyncSystemRunner {
let AsyncSystemRunner { stop, .. } = self; let AsyncSystemRunner { stop, .. } = self;
// run loop // run loop
lazy(|_| {
Arbiter::run_system(None);
async { async {
let res = match stop.await { match stop.await {
Ok(code) => { Ok(code) => {
if code != 0 { if code != 0 {
Err(io::Error::new( Err(io::Error::new(
@ -135,12 +139,8 @@ impl AsyncSystemRunner {
} }
} }
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
};
Arbiter::stop_system();
res
} }
}) }
.flatten()
} }
} }
@ -160,8 +160,7 @@ impl SystemRunner {
let SystemRunner { mut rt, stop, .. } = self; let SystemRunner { mut rt, stop, .. } = self;
// run loop // run loop
Arbiter::run_system(Some(&rt)); match rt.block_on(stop) {
let result = match rt.block_on(stop) {
Ok(code) => { Ok(code) => {
if code != 0 { if code != 0 {
Err(io::Error::new( Err(io::Error::new(
@ -173,19 +172,15 @@ impl SystemRunner {
} }
} }
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
}; }
Arbiter::stop_system();
result
} }
/// Execute a future and wait for result. /// Execute a future and wait for result.
#[inline]
pub fn block_on<F, O>(&mut self, fut: F) -> O pub fn block_on<F, O>(&mut self, fut: F) -> O
where where
F: Future<Output = O> + 'static, F: Future<Output = O>,
{ {
Arbiter::run_system(Some(&self.rt)); self.rt.block_on(fut)
let res = self.rt.block_on(fut);
Arbiter::stop_system();
res
} }
} }

View File

@ -4,6 +4,8 @@
#![doc(html_logo_url = "https://actix.rs/img/logo.png")] #![doc(html_logo_url = "https://actix.rs/img/logo.png")]
#![doc(html_favicon_url = "https://actix.rs/favicon.ico")] #![doc(html_favicon_url = "https://actix.rs/favicon.ico")]
use std::future::Future;
#[cfg(not(test))] // Work around for rust-lang/rust#62127 #[cfg(not(test))] // Work around for rust-lang/rust#62127
pub use actix_macros::{main, test}; pub use actix_macros::{main, test};
@ -22,15 +24,12 @@ pub use self::system::System;
/// # Panics /// # Panics
/// ///
/// This function panics if actix system is not running. /// This function panics if actix system is not running.
#[inline]
pub fn spawn<F>(f: F) pub fn spawn<F>(f: F)
where where
F: futures_util::future::Future<Output = ()> + 'static, F: Future<Output = ()> + 'static,
{ {
if !System::is_set() { Arbiter::spawn(f)
panic!("System is not running");
}
Arbiter::spawn(f);
} }
/// Asynchronous signal handling /// Asynchronous signal handling

View File

@ -30,6 +30,10 @@ impl Runtime {
}) })
} }
pub(super) fn local(&self) -> &LocalSet {
&self.local
}
/// Spawn a future onto the single-threaded runtime. /// Spawn a future onto the single-threaded runtime.
/// ///
/// See [module level][mod] documentation for more details. /// See [module level][mod] documentation for more details.
@ -84,7 +88,7 @@ impl Runtime {
/// complete execution by calling `block_on` or `run`. /// complete execution by calling `block_on` or `run`.
pub fn block_on<F>(&mut self, f: F) -> F::Output pub fn block_on<F>(&mut self, f: F) -> F::Output
where where
F: Future + 'static, F: Future,
{ {
self.local.block_on(&mut self.rt, f) self.local.block_on(&mut self.rt, f)
} }

View File

@ -256,7 +256,7 @@ impl System {
/// Function `f` get called within tokio runtime context. /// Function `f` get called within tokio runtime context.
pub fn run<F>(f: F) -> io::Result<()> pub fn run<F>(f: F) -> io::Result<()>
where where
F: FnOnce() + 'static, F: FnOnce(),
{ {
Self::builder().run(f) Self::builder().run(f)
} }

View File

@ -1,19 +1,5 @@
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
#[test]
fn start_and_stop() {
actix_rt::System::new("start_and_stop").block_on(async move {
assert!(
actix_rt::Arbiter::is_running(),
"System doesn't seem to have started"
);
});
assert!(
!actix_rt::Arbiter::is_running(),
"System doesn't seem to have stopped"
);
}
#[test] #[test]
fn await_for_timer() { fn await_for_timer() {
let time = Duration::from_secs(2); let time = Duration::from_secs(2);
@ -76,39 +62,65 @@ fn join_another_arbiter() {
); );
} }
// #[test]
// fn join_current_arbiter() {
// let time = Duration::from_secs(2);
//
// let instant = Instant::now();
// actix_rt::System::new("test_join_current_arbiter").block_on(async move {
// actix_rt::spawn(async move {
// tokio::time::delay_for(time).await;
// actix_rt::Arbiter::current().stop();
// });
// actix_rt::Arbiter::local_join().await;
// });
// assert!(
// instant.elapsed() >= time,
// "Join on current arbiter should wait for all spawned futures"
// );
//
// let large_timer = Duration::from_secs(20);
// let instant = Instant::now();
// actix_rt::System::new("test_join_current_arbiter").block_on(async move {
// actix_rt::spawn(async move {
// tokio::time::delay_for(time).await;
// actix_rt::Arbiter::current().stop();
// });
// let f = actix_rt::Arbiter::local_join();
// actix_rt::spawn(async move {
// tokio::time::delay_for(large_timer).await;
// actix_rt::Arbiter::current().stop();
// });
// f.await;
// });
// assert!(
// instant.elapsed() < large_timer,
// "local_join should await only for the already spawned futures"
// );
// }
#[test] #[test]
fn join_current_arbiter() { fn non_static_block_on() {
let time = Duration::from_secs(2); let string = String::from("test_str");
let str = string.as_str();
let instant = Instant::now(); let mut sys = actix_rt::System::new("borrow some");
actix_rt::System::new("test_join_current_arbiter").block_on(async move {
actix_rt::spawn(async move {
tokio::time::delay_for(time).await;
actix_rt::Arbiter::current().stop();
});
actix_rt::Arbiter::local_join().await;
});
assert!(
instant.elapsed() >= time,
"Join on current arbiter should wait for all spawned futures"
);
let large_timer = Duration::from_secs(20); sys.block_on(async {
let instant = Instant::now(); actix_rt::time::delay_for(Duration::from_millis(1)).await;
actix_rt::System::new("test_join_current_arbiter").block_on(async move { assert_eq!("test_str", str);
actix_rt::spawn(async move {
tokio::time::delay_for(time).await;
actix_rt::Arbiter::current().stop();
}); });
let f = actix_rt::Arbiter::local_join();
actix_rt::spawn(async move { let mut rt = actix_rt::Runtime::new().unwrap();
tokio::time::delay_for(large_timer).await;
actix_rt::Arbiter::current().stop(); rt.block_on(async {
actix_rt::time::delay_for(Duration::from_millis(1)).await;
assert_eq!("test_str", str);
}); });
f.await;
}); actix_rt::System::run(|| {
assert!( assert_eq!("test_str", str);
instant.elapsed() < large_timer, actix_rt::System::current().stop();
"local_join should await only for the already spawned futures" })
); .unwrap();
} }

View File

@ -22,13 +22,16 @@ fn test_bind() {
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
let h = thread::spawn(move || { let h = thread::spawn(move || {
let sys = actix_rt::System::new("test"); let mut sys = actix_rt::System::new("test");
let srv = Server::build()
let srv = sys.block_on(lazy(|_| {
Server::build()
.workers(1) .workers(1)
.disable_signals() .disable_signals()
.bind("test", addr, move || fn_service(|_| ok::<_, ()>(()))) .bind("test", addr, move || fn_service(|_| ok::<_, ()>(())))
.unwrap() .unwrap()
.start(); .start()
}));
let _ = tx.send((srv, actix_rt::System::current())); let _ = tx.send((srv, actix_rt::System::current()));
let _ = sys.run(); let _ = sys.run();
}); });
@ -46,14 +49,16 @@ fn test_listen() {
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
let h = thread::spawn(move || { let h = thread::spawn(move || {
let sys = actix_rt::System::new("test"); let mut sys = actix_rt::System::new("test");
let lst = net::TcpListener::bind(addr).unwrap(); let lst = net::TcpListener::bind(addr).unwrap();
sys.block_on(lazy(|_| {
Server::build() Server::build()
.disable_signals() .disable_signals()
.workers(1) .workers(1)
.listen("test", lst, move || fn_service(|_| ok::<_, ()>(()))) .listen("test", lst, move || fn_service(|_| ok::<_, ()>(())))
.unwrap() .unwrap()
.start(); .start()
}));
let _ = tx.send(actix_rt::System::current()); let _ = tx.send(actix_rt::System::current());
let _ = sys.run(); let _ = sys.run();
}); });
@ -78,8 +83,9 @@ fn test_start() {
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
let h = thread::spawn(move || { let h = thread::spawn(move || {
let sys = actix_rt::System::new("test"); let mut sys = actix_rt::System::new("test");
let srv: Server = Server::build() let srv = sys.block_on(lazy(|_| {
Server::build()
.backlog(100) .backlog(100)
.disable_signals() .disable_signals()
.bind("test", addr, move || { .bind("test", addr, move || {
@ -90,7 +96,8 @@ fn test_start() {
}) })
}) })
.unwrap() .unwrap()
.start(); .start()
}));
let _ = tx.send((srv, actix_rt::System::current())); let _ = tx.send((srv, actix_rt::System::current()));
let _ = sys.run(); let _ = sys.run();
@ -144,8 +151,9 @@ fn test_configure() {
let h = thread::spawn(move || { let h = thread::spawn(move || {
let num = num2.clone(); let num = num2.clone();
let sys = actix_rt::System::new("test"); let mut sys = actix_rt::System::new("test");
let srv = Server::build() let srv = sys.block_on(lazy(|_| {
Server::build()
.disable_signals() .disable_signals()
.configure(move |cfg| { .configure(move |cfg| {
let num = num.clone(); let num = num.clone();
@ -166,7 +174,8 @@ fn test_configure() {
}) })
.unwrap() .unwrap()
.workers(1) .workers(1)
.start(); .start()
}));
let _ = tx.send((srv, actix_rt::System::current())); let _ = tx.send((srv, actix_rt::System::current()));
let _ = sys.run(); let _ = sys.run();
}); });

View File

@ -83,15 +83,18 @@ impl TestServer {
// run server in separate thread // run server in separate thread
thread::spawn(move || { thread::spawn(move || {
let sys = System::new("actix-test-server"); let mut sys = System::new("actix-test-server");
let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap(); let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap();
let local_addr = tcp.local_addr().unwrap(); let local_addr = tcp.local_addr().unwrap();
sys.block_on(async {
Server::build() Server::build()
.listen("test", tcp, factory)? .listen("test", tcp, factory)
.unwrap()
.workers(1) .workers(1)
.disable_signals() .disable_signals()
.start(); .start();
});
tx.send((System::current(), local_addr)).unwrap(); tx.send((System::current(), local_addr)).unwrap();
sys.run() sys.run()