diff --git a/actix-rt/CHANGES.md b/actix-rt/CHANGES.md index 9b5fb636..c3479db1 100644 --- a/actix-rt/CHANGES.md +++ b/actix-rt/CHANGES.md @@ -6,6 +6,18 @@ * Add `System::attach_to_tokio` method. [#173] +### Changed + +* Remove `'static` lifetime requirement for `Runtime::block_on` and `SystemRunner::block_on`. + Remove `'static` lifetime requirement for `System::run` and `Builder::run`. + `Arbiter::spawn` would panic when `System` is not in scope. [#207] + +### Fixed + +* Fix work load issue by removing `PENDDING` thread local. [#207] + +[#207]: https://github.com/actix/actix-net/pull/207 + ## [1.1.1] - 2020-04-30 ### Fixed diff --git a/actix-rt/Cargo.toml b/actix-rt/Cargo.toml index 68bca814..57710a7f 100644 --- a/actix-rt/Cargo.toml +++ b/actix-rt/Cargo.toml @@ -17,11 +17,10 @@ path = "src/lib.rs" [dependencies] actix-macros = "0.1.0" -copyless = "0.1.4" -futures-channel = "0.3.4" -futures-util = { version = "0.3.4", default-features = false, features = ["alloc"] } -smallvec = "1" + +futures-channel = "0.3.7" tokio = { version = "0.2.6", default-features = false, features = ["rt-core", "rt-util", "io-driver", "tcp", "uds", "udp", "time", "signal", "stream"] } [dev-dependencies] +futures-util = { version = "0.3.7", default-features = false, features = ["alloc"] } tokio = { version = "0.2.6", features = ["full"] } diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index 295d2624..3fe81b99 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -1,6 +1,7 @@ use std::any::{Any, TypeId}; -use std::cell::{Cell, RefCell}; +use std::cell::RefCell; use std::collections::HashMap; +use std::future::Future; use std::pin::Pin; use std::sync::atomic::{AtomicUsize, Ordering}; use std::task::{Context, Poll}; @@ -8,24 +9,23 @@ use std::{fmt, thread}; use futures_channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; use futures_channel::oneshot::{channel, Canceled, Sender}; -use futures_util::{ - future::{self, Future, FutureExt}, - stream::Stream, -}; +// use futures_util::stream::FuturesUnordered; +// use tokio::task::JoinHandle; +// use tokio::stream::StreamExt; +use tokio::stream::Stream; +use tokio::task::LocalSet; use crate::runtime::Runtime; use crate::system::System; -use copyless::BoxHelper; - -use smallvec::SmallVec; -pub use tokio::task::JoinHandle; - thread_local!( static ADDR: RefCell> = RefCell::new(None); - static RUNNING: Cell = Cell::new(false); - static Q: RefCell>>>> = RefCell::new(Vec::new()); - static PENDING: RefCell; 8]>> = RefCell::new(SmallVec::new()); + // TODO: Commented out code are for Arbiter::local_join function. + // It can be safely removed if this function is not used in actix-*. + // + // /// stores join handle for spawned async tasks. + // static HANDLE: RefCell>> = + // RefCell::new(FuturesUnordered::new()); static STORAGE: RefCell>> = RefCell::new(HashMap::new()); ); @@ -69,14 +69,14 @@ impl Default for Arbiter { } impl Arbiter { - pub(crate) fn new_system() -> Self { + pub(crate) fn new_system(local: &LocalSet) -> Self { let (tx, rx) = unbounded(); let arb = Arbiter::with_sender(tx); ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone())); - RUNNING.with(|cell| cell.set(false)); STORAGE.with(|cell| cell.borrow_mut().clear()); - Arbiter::spawn(ArbiterController { stop: None, rx }); + + local.spawn_local(ArbiterController { rx }); arb } @@ -91,8 +91,9 @@ impl Arbiter { } /// Check if current arbiter is running. + #[deprecated(note = "Thread local variables for running state of Arbiter is removed")] pub fn is_running() -> bool { - RUNNING.with(|cell| cell.get()) + false } /// Stop arbiter from continuing it's event loop. @@ -106,69 +107,47 @@ impl Arbiter { let id = COUNT.fetch_add(1, Ordering::Relaxed); let name = format!("actix-rt:worker:{}", id); let sys = System::current(); - let (arb_tx, arb_rx) = unbounded(); - let arb_tx2 = arb_tx.clone(); + let (tx, rx) = unbounded(); let handle = thread::Builder::new() .name(name.clone()) - .spawn(move || { - let mut rt = Runtime::new().expect("Can not create Runtime"); - let arb = Arbiter::with_sender(arb_tx); + .spawn({ + let tx = tx.clone(); + move || { + let mut rt = Runtime::new().expect("Can not create Runtime"); + let arb = Arbiter::with_sender(tx); - let (stop, stop_rx) = channel(); - RUNNING.with(|cell| cell.set(true)); - STORAGE.with(|cell| cell.borrow_mut().clear()); + STORAGE.with(|cell| cell.borrow_mut().clear()); - System::set_current(sys); + System::set_current(sys); - // start arbiter controller - rt.spawn(ArbiterController { - stop: Some(stop), - rx: arb_rx, - }); - ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone())); + ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone())); - // register arbiter - let _ = System::current() - .sys() - .unbounded_send(SystemCommand::RegisterArbiter(id, arb)); + // register arbiter + let _ = System::current() + .sys() + .unbounded_send(SystemCommand::RegisterArbiter(id, arb)); - // run loop - let _ = rt.block_on(stop_rx).unwrap_or(1); + // start arbiter controller + // run loop + rt.block_on(ArbiterController { rx }); - // unregister arbiter - let _ = System::current() - .sys() - .unbounded_send(SystemCommand::UnregisterArbiter(id)); + // unregister arbiter + let _ = System::current() + .sys() + .unbounded_send(SystemCommand::UnregisterArbiter(id)); + } }) .unwrap_or_else(|err| { panic!("Cannot spawn an arbiter's thread {:?}: {:?}", &name, err) }); Arbiter { - sender: arb_tx2, + sender: tx, thread_handle: Some(handle), } } - pub(crate) fn run_system(rt: Option<&Runtime>) { - RUNNING.with(|cell| cell.set(true)); - Q.with(|cell| { - let mut v = cell.borrow_mut(); - for fut in v.drain(..) { - if let Some(rt) = rt { - rt.spawn(fut); - } else { - tokio::task::spawn_local(fut); - } - } - }); - } - - pub(crate) fn stop_system() { - RUNNING.with(|cell| cell.set(false)); - } - /// Spawn a future on the current thread. This does not create a new Arbiter /// or Arbiter address, it is simply a helper for spawning futures on the current /// thread. @@ -176,26 +155,12 @@ impl Arbiter { where F: Future + 'static, { - RUNNING.with(move |cell| { - if cell.get() { - // Spawn the future on running executor - let len = PENDING.with(move |cell| { - let mut p = cell.borrow_mut(); - p.push(tokio::task::spawn_local(future)); - p.len() - }); - if len > 7 { - // Before reaching the inline size - tokio::task::spawn_local(CleanupPending); - } - } else { - // Box the future and push it to the queue, this results in double boxing - // because the executor boxes the future again, but works for now - Q.with(move |cell| { - cell.borrow_mut().push(Pin::from(Box::alloc().init(future))) - }); - } - }); + // HANDLE.with(|handle| { + // let handle = handle.borrow(); + // handle.push(tokio::task::spawn_local(future)); + // }); + // let _ = tokio::task::spawn_local(CleanupPending); + let _ = tokio::task::spawn_local(future); } /// Executes a future on the current thread. This does not create a new Arbiter @@ -206,7 +171,9 @@ impl Arbiter { F: FnOnce() -> R + 'static, R: Future + 'static, { - Arbiter::spawn(future::lazy(|_| f()).flatten()) + Arbiter::spawn(async { + f(); + }) } /// Send a future to the Arbiter's thread, and spawn it. @@ -313,40 +280,33 @@ impl Arbiter { /// Returns a future that will be completed once all currently spawned futures /// have completed. - pub fn local_join() -> impl Future { - PENDING.with(move |cell| { - let current = cell.replace(SmallVec::new()); - future::join_all(current).map(|_| ()) - }) + #[deprecated(since = "1.2.0", note = "Arbiter::local_join function is removed.")] + pub async fn local_join() { + // let handle = HANDLE.with(|fut| std::mem::take(&mut *fut.borrow_mut())); + // async move { + // handle.collect::>().await; + // } + unimplemented!("Arbiter::local_join function is removed.") } } -/// Future used for cleaning-up already finished `JoinHandle`s -/// from the `PENDING` list so the vector doesn't grow indefinitely -struct CleanupPending; - -impl Future for CleanupPending { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - PENDING.with(move |cell| { - let mut pending = cell.borrow_mut(); - let mut i = 0; - while i != pending.len() { - if Pin::new(&mut pending[i]).poll(cx).is_ready() { - pending.remove(i); - } else { - i += 1; - } - } - }); - - Poll::Ready(()) - } -} +// /// Future used for cleaning-up already finished `JoinHandle`s +// /// from the `PENDING` list so the vector doesn't grow indefinitely +// struct CleanupPending; +// +// impl Future for CleanupPending { +// type Output = (); +// +// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { +// HANDLE.with(move |handle| { +// recycle_join_handle(&mut *handle.borrow_mut(), cx); +// }); +// +// Poll::Ready(()) +// } +// } struct ArbiterController { - stop: Option>, rx: UnboundedReceiver, } @@ -371,22 +331,14 @@ impl Future for ArbiterController { match Pin::new(&mut self.rx).poll_next(cx) { Poll::Ready(None) => return Poll::Ready(()), Poll::Ready(Some(item)) => match item { - ArbiterCommand::Stop => { - if let Some(stop) = self.stop.take() { - let _ = stop.send(0); - }; - return Poll::Ready(()); - } + ArbiterCommand::Stop => return Poll::Ready(()), ArbiterCommand::Execute(fut) => { - let len = PENDING.with(move |cell| { - let mut p = cell.borrow_mut(); - p.push(tokio::task::spawn_local(fut)); - p.len() - }); - if len > 7 { - // Before reaching the inline size - tokio::task::spawn_local(CleanupPending); - } + // HANDLE.with(|handle| { + // let mut handle = handle.borrow_mut(); + // handle.push(tokio::task::spawn_local(fut)); + // recycle_join_handle(&mut *handle, cx); + // }); + tokio::task::spawn_local(fut); } ArbiterCommand::ExecuteFn(f) => { f.call_box(); @@ -398,6 +350,20 @@ impl Future for ArbiterController { } } +// fn recycle_join_handle(handle: &mut FuturesUnordered>, cx: &mut Context<'_>) { +// let _ = Pin::new(&mut *handle).poll_next(cx); +// +// // Try to recycle more join handles and free up memory. +// // +// // this is a guess. The yield limit for FuturesUnordered is 32. +// // So poll an extra 3 times would make the total poll below 128. +// if handle.len() > 64 { +// (0..3).for_each(|_| { +// let _ = Pin::new(&mut *handle).poll_next(cx); +// }) +// } +// } + #[derive(Debug)] pub(crate) enum SystemCommand { Exit(i32), diff --git a/actix-rt/src/builder.rs b/actix-rt/src/builder.rs index f4d9b1bf..83aed064 100644 --- a/actix-rt/src/builder.rs +++ b/actix-rt/src/builder.rs @@ -1,9 +1,9 @@ use std::borrow::Cow; +use std::future::Future; use std::io; use futures_channel::mpsc::unbounded; use futures_channel::oneshot::{channel, Receiver}; -use futures_util::future::{lazy, Future, FutureExt}; use tokio::task::LocalSet; use crate::arbiter::{Arbiter, SystemArbiter}; @@ -65,7 +65,7 @@ impl Builder { /// Function `f` get called within tokio runtime context. pub fn run(self, f: F) -> io::Result<()> where - F: FnOnce() + 'static, + F: FnOnce(), { self.create_runtime(f).run() } @@ -74,7 +74,8 @@ impl Builder { let (stop_tx, stop) = channel(); let (sys_sender, sys_receiver) = unbounded(); - let system = System::construct(sys_sender, Arbiter::new_system(), self.stop_on_panic); + let system = + System::construct(sys_sender, Arbiter::new_system(local), self.stop_on_panic); // system arbiter let arb = SystemArbiter::new(stop_tx, sys_receiver); @@ -87,21 +88,26 @@ impl Builder { fn create_runtime(self, f: F) -> SystemRunner where - F: FnOnce() + 'static, + F: FnOnce(), { let (stop_tx, stop) = channel(); let (sys_sender, sys_receiver) = unbounded(); - let system = System::construct(sys_sender, Arbiter::new_system(), self.stop_on_panic); + let mut rt = Runtime::new().unwrap(); + + let system = System::construct( + sys_sender, + Arbiter::new_system(rt.local()), + self.stop_on_panic, + ); // system arbiter let arb = SystemArbiter::new(stop_tx, sys_receiver); - let mut rt = Runtime::new().unwrap(); rt.spawn(arb); // init system arbiter and run configuration method - rt.block_on(lazy(move |_| f())); + rt.block_on(async { f() }); SystemRunner { rt, stop, system } } @@ -120,27 +126,21 @@ impl AsyncSystemRunner { let AsyncSystemRunner { stop, .. } = self; // run loop - lazy(|_| { - Arbiter::run_system(None); - async { - let res = match stop.await { - Ok(code) => { - if code != 0 { - Err(io::Error::new( - io::ErrorKind::Other, - format!("Non-zero exit code: {}", code), - )) - } else { - Ok(()) - } + async { + match stop.await { + Ok(code) => { + if code != 0 { + Err(io::Error::new( + io::ErrorKind::Other, + format!("Non-zero exit code: {}", code), + )) + } else { + Ok(()) } - Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), - }; - Arbiter::stop_system(); - res + } + Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), } - }) - .flatten() + } } } @@ -160,8 +160,7 @@ impl SystemRunner { let SystemRunner { mut rt, stop, .. } = self; // run loop - Arbiter::run_system(Some(&rt)); - let result = match rt.block_on(stop) { + match rt.block_on(stop) { Ok(code) => { if code != 0 { Err(io::Error::new( @@ -173,19 +172,15 @@ impl SystemRunner { } } Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), - }; - Arbiter::stop_system(); - result + } } /// Execute a future and wait for result. + #[inline] pub fn block_on(&mut self, fut: F) -> O where - F: Future + 'static, + F: Future, { - Arbiter::run_system(Some(&self.rt)); - let res = self.rt.block_on(fut); - Arbiter::stop_system(); - res + self.rt.block_on(fut) } } diff --git a/actix-rt/src/lib.rs b/actix-rt/src/lib.rs index dccd9202..b2e23c0f 100644 --- a/actix-rt/src/lib.rs +++ b/actix-rt/src/lib.rs @@ -4,6 +4,8 @@ #![doc(html_logo_url = "https://actix.rs/img/logo.png")] #![doc(html_favicon_url = "https://actix.rs/favicon.ico")] +use std::future::Future; + #[cfg(not(test))] // Work around for rust-lang/rust#62127 pub use actix_macros::{main, test}; @@ -22,15 +24,12 @@ pub use self::system::System; /// # Panics /// /// This function panics if actix system is not running. +#[inline] pub fn spawn(f: F) where - F: futures_util::future::Future + 'static, + F: Future + 'static, { - if !System::is_set() { - panic!("System is not running"); - } - - Arbiter::spawn(f); + Arbiter::spawn(f) } /// Asynchronous signal handling diff --git a/actix-rt/src/runtime.rs b/actix-rt/src/runtime.rs index d3231192..7ee02b02 100644 --- a/actix-rt/src/runtime.rs +++ b/actix-rt/src/runtime.rs @@ -30,6 +30,10 @@ impl Runtime { }) } + pub(super) fn local(&self) -> &LocalSet { + &self.local + } + /// Spawn a future onto the single-threaded runtime. /// /// See [module level][mod] documentation for more details. @@ -84,7 +88,7 @@ impl Runtime { /// complete execution by calling `block_on` or `run`. pub fn block_on(&mut self, f: F) -> F::Output where - F: Future + 'static, + F: Future, { self.local.block_on(&mut self.rt, f) } diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index 2e1ae174..16b96439 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -256,7 +256,7 @@ impl System { /// Function `f` get called within tokio runtime context. pub fn run(f: F) -> io::Result<()> where - F: FnOnce() + 'static, + F: FnOnce(), { Self::builder().run(f) } diff --git a/actix-rt/tests/integration_tests.rs b/actix-rt/tests/integration_tests.rs index 8e775bab..b3265476 100644 --- a/actix-rt/tests/integration_tests.rs +++ b/actix-rt/tests/integration_tests.rs @@ -1,19 +1,5 @@ use std::time::{Duration, Instant}; -#[test] -fn start_and_stop() { - actix_rt::System::new("start_and_stop").block_on(async move { - assert!( - actix_rt::Arbiter::is_running(), - "System doesn't seem to have started" - ); - }); - assert!( - !actix_rt::Arbiter::is_running(), - "System doesn't seem to have stopped" - ); -} - #[test] fn await_for_timer() { let time = Duration::from_secs(2); @@ -76,39 +62,65 @@ fn join_another_arbiter() { ); } +// #[test] +// fn join_current_arbiter() { +// let time = Duration::from_secs(2); +// +// let instant = Instant::now(); +// actix_rt::System::new("test_join_current_arbiter").block_on(async move { +// actix_rt::spawn(async move { +// tokio::time::delay_for(time).await; +// actix_rt::Arbiter::current().stop(); +// }); +// actix_rt::Arbiter::local_join().await; +// }); +// assert!( +// instant.elapsed() >= time, +// "Join on current arbiter should wait for all spawned futures" +// ); +// +// let large_timer = Duration::from_secs(20); +// let instant = Instant::now(); +// actix_rt::System::new("test_join_current_arbiter").block_on(async move { +// actix_rt::spawn(async move { +// tokio::time::delay_for(time).await; +// actix_rt::Arbiter::current().stop(); +// }); +// let f = actix_rt::Arbiter::local_join(); +// actix_rt::spawn(async move { +// tokio::time::delay_for(large_timer).await; +// actix_rt::Arbiter::current().stop(); +// }); +// f.await; +// }); +// assert!( +// instant.elapsed() < large_timer, +// "local_join should await only for the already spawned futures" +// ); +// } + #[test] -fn join_current_arbiter() { - let time = Duration::from_secs(2); +fn non_static_block_on() { + let string = String::from("test_str"); + let str = string.as_str(); - let instant = Instant::now(); - actix_rt::System::new("test_join_current_arbiter").block_on(async move { - actix_rt::spawn(async move { - tokio::time::delay_for(time).await; - actix_rt::Arbiter::current().stop(); - }); - actix_rt::Arbiter::local_join().await; - }); - assert!( - instant.elapsed() >= time, - "Join on current arbiter should wait for all spawned futures" - ); + let mut sys = actix_rt::System::new("borrow some"); - let large_timer = Duration::from_secs(20); - let instant = Instant::now(); - actix_rt::System::new("test_join_current_arbiter").block_on(async move { - actix_rt::spawn(async move { - tokio::time::delay_for(time).await; - actix_rt::Arbiter::current().stop(); - }); - let f = actix_rt::Arbiter::local_join(); - actix_rt::spawn(async move { - tokio::time::delay_for(large_timer).await; - actix_rt::Arbiter::current().stop(); - }); - f.await; + sys.block_on(async { + actix_rt::time::delay_for(Duration::from_millis(1)).await; + assert_eq!("test_str", str); }); - assert!( - instant.elapsed() < large_timer, - "local_join should await only for the already spawned futures" - ); + + let mut rt = actix_rt::Runtime::new().unwrap(); + + rt.block_on(async { + actix_rt::time::delay_for(Duration::from_millis(1)).await; + assert_eq!("test_str", str); + }); + + actix_rt::System::run(|| { + assert_eq!("test_str", str); + actix_rt::System::current().stop(); + }) + .unwrap(); } diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index ce309c94..838c3cf1 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -22,13 +22,16 @@ fn test_bind() { let (tx, rx) = mpsc::channel(); let h = thread::spawn(move || { - let sys = actix_rt::System::new("test"); - let srv = Server::build() - .workers(1) - .disable_signals() - .bind("test", addr, move || fn_service(|_| ok::<_, ()>(()))) - .unwrap() - .start(); + let mut sys = actix_rt::System::new("test"); + + let srv = sys.block_on(lazy(|_| { + Server::build() + .workers(1) + .disable_signals() + .bind("test", addr, move || fn_service(|_| ok::<_, ()>(()))) + .unwrap() + .start() + })); let _ = tx.send((srv, actix_rt::System::current())); let _ = sys.run(); }); @@ -46,14 +49,16 @@ fn test_listen() { let (tx, rx) = mpsc::channel(); let h = thread::spawn(move || { - let sys = actix_rt::System::new("test"); + let mut sys = actix_rt::System::new("test"); let lst = net::TcpListener::bind(addr).unwrap(); - Server::build() - .disable_signals() - .workers(1) - .listen("test", lst, move || fn_service(|_| ok::<_, ()>(()))) - .unwrap() - .start(); + sys.block_on(lazy(|_| { + Server::build() + .disable_signals() + .workers(1) + .listen("test", lst, move || fn_service(|_| ok::<_, ()>(()))) + .unwrap() + .start() + })); let _ = tx.send(actix_rt::System::current()); let _ = sys.run(); }); @@ -78,19 +83,21 @@ fn test_start() { let (tx, rx) = mpsc::channel(); let h = thread::spawn(move || { - let sys = actix_rt::System::new("test"); - let srv: Server = Server::build() - .backlog(100) - .disable_signals() - .bind("test", addr, move || { - fn_service(|io: TcpStream| async move { - let mut f = Framed::new(io, BytesCodec); - f.send(Bytes::from_static(b"test")).await.unwrap(); - Ok::<_, ()>(()) + let mut sys = actix_rt::System::new("test"); + let srv = sys.block_on(lazy(|_| { + Server::build() + .backlog(100) + .disable_signals() + .bind("test", addr, move || { + fn_service(|io: TcpStream| async move { + let mut f = Framed::new(io, BytesCodec); + f.send(Bytes::from_static(b"test")).await.unwrap(); + Ok::<_, ()>(()) + }) }) - }) - .unwrap() - .start(); + .unwrap() + .start() + })); let _ = tx.send((srv, actix_rt::System::current())); let _ = sys.run(); @@ -144,29 +151,31 @@ fn test_configure() { let h = thread::spawn(move || { let num = num2.clone(); - let sys = actix_rt::System::new("test"); - let srv = Server::build() - .disable_signals() - .configure(move |cfg| { - let num = num.clone(); - let lst = net::TcpListener::bind(addr3).unwrap(); - cfg.bind("addr1", addr1) - .unwrap() - .bind("addr2", addr2) - .unwrap() - .listen("addr3", lst) - .apply(move |rt| { - let num = num.clone(); - rt.service("addr1", fn_service(|_| ok::<_, ()>(()))); - rt.service("addr3", fn_service(|_| ok::<_, ()>(()))); - rt.on_start(lazy(move |_| { - let _ = num.fetch_add(1, Relaxed); - })) - }) - }) - .unwrap() - .workers(1) - .start(); + let mut sys = actix_rt::System::new("test"); + let srv = sys.block_on(lazy(|_| { + Server::build() + .disable_signals() + .configure(move |cfg| { + let num = num.clone(); + let lst = net::TcpListener::bind(addr3).unwrap(); + cfg.bind("addr1", addr1) + .unwrap() + .bind("addr2", addr2) + .unwrap() + .listen("addr3", lst) + .apply(move |rt| { + let num = num.clone(); + rt.service("addr1", fn_service(|_| ok::<_, ()>(()))); + rt.service("addr3", fn_service(|_| ok::<_, ()>(()))); + rt.on_start(lazy(move |_| { + let _ = num.fetch_add(1, Relaxed); + })) + }) + }) + .unwrap() + .workers(1) + .start() + })); let _ = tx.send((srv, actix_rt::System::current())); let _ = sys.run(); }); diff --git a/actix-testing/src/lib.rs b/actix-testing/src/lib.rs index efcdd394..eadfe6c9 100644 --- a/actix-testing/src/lib.rs +++ b/actix-testing/src/lib.rs @@ -83,15 +83,18 @@ impl TestServer { // run server in separate thread thread::spawn(move || { - let sys = System::new("actix-test-server"); + let mut sys = System::new("actix-test-server"); let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap(); let local_addr = tcp.local_addr().unwrap(); - Server::build() - .listen("test", tcp, factory)? - .workers(1) - .disable_signals() - .start(); + sys.block_on(async { + Server::build() + .listen("test", tcp, factory) + .unwrap() + .workers(1) + .disable_signals() + .start(); + }); tx.send((System::current(), local_addr)).unwrap(); sys.run()