
actix-rt-less (#408)

Rob Ede — 2021-11-04 20:30:43 +00:00, committed by GitHub
parent 81d7295486 · commit 5b537c7b10
18 changed files with 1075 additions and 844 deletions

actix-rt/CHANGES.md

@@ -1,6 +1,9 @@
 # Changes

 ## Unreleased - 2021-xx-xx
+* Add `Arbiter::try_current` for situations where thread may or may not have Arbiter context. [#408]
+
+[#408]: https://github.com/actix/actix-net/pull/408

 ## 2.3.0 - 2021-10-11

actix-rt/src/arbiter.rs

@@ -240,6 +240,15 @@ impl Arbiter {
         })
     }

+    /// Try to get current running arbiter handle.
+    ///
+    /// Returns `None` if no Arbiter has been started.
+    ///
+    /// Unlike [`current`](Self::current), this never panics.
+    pub fn try_current() -> Option<ArbiterHandle> {
+        HANDLE.with(|cell| cell.borrow().clone())
+    }
+
     /// Stop Arbiter from continuing it's event loop.
     ///
     /// Returns true if stop message was sent successfully and false if the Arbiter has been dropped.
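A quick sketch of when the new method matters (illustrative, not part of this diff): code that may run either inside or outside an Arbiter can branch instead of panicking.

    use actix_rt::{Arbiter, ArbiterHandle};

    // Hypothetical helper: use the thread's Arbiter when present,
    // otherwise fall back to a handle supplied by the caller.
    fn arbiter_or(fallback: &ArbiterHandle) -> ArbiterHandle {
        Arbiter::try_current().unwrap_or_else(|| fallback.clone())
    }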

actix-rt/src/system.rs

@@ -130,7 +130,7 @@ impl System {
     ///
     /// Returns `None` if no System has been started.
     ///
-    /// Contrary to `current`, this never panics.
+    /// Unlike [`current`](Self::current), this never panics.
     pub fn try_current() -> Option<System> {
         CURRENT.with(|cell| cell.borrow().clone())
     }

actix-server/CHANGES.md

@ -1,11 +1,16 @@
# Changes # Changes
## Unreleased - 2021-xx-xx ## Unreleased - 2021-xx-xx
* Server can be started in regular Tokio runtime. [#408]
* Expose new `Server` type whose `Future` impl resolves when server stops. [#408]
* Rename `Server` to `ServerHandle`. [#407] * Rename `Server` to `ServerHandle`. [#407]
* Add `Server::handle` to obtain handle to server. [#408]
* Rename `ServerBuilder::{maxconn => max_concurrent_connections}`. [#407] * Rename `ServerBuilder::{maxconn => max_concurrent_connections}`. [#407]
* Deprecate crate-level `new` shortcut for server builder. [#408]
* Minimum supported Rust version (MSRV) is now 1.52. * Minimum supported Rust version (MSRV) is now 1.52.
[#407]: https://github.com/actix/actix-net/pull/407 [#407]: https://github.com/actix/actix-net/pull/407
[#408]: https://github.com/actix/actix-net/pull/408
## 2.0.0-beta.6 - 2021-10-11 ## 2.0.0-beta.6 - 2021-10-11
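Read together, these entries imply roughly this usage change (a sketch, not taken from the diff; `addr` and `factory` are placeholders):

    // `run()` now returns the new `Server` type, a future that resolves when
    // the server stops, instead of immediately returning a handle.
    let server = Server::build().bind("svc", addr, factory)?.run();

    // the state-change handle is obtained separately (see `Server::handle`)
    let handle = server.handle();

    // awaiting the server drives command handling and OS signal listening
    server.await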

actix-server/Cargo.toml

@@ -38,4 +38,4 @@ actix-rt = "2.0.0"
 bytes = "1"
 env_logger = "0.9"
 futures-util = { version = "0.3.7", default-features = false, features = ["sink"] }
-tokio = { version = "1.5.1", features = ["io-util"] }
+tokio = { version = "1.5.1", features = ["io-util", "rt-multi-thread", "macros"] }

actix-server/examples/tcp-echo.rs

@@ -10,7 +10,7 @@
 //! the length of each line it echos and the total size of data sent when the connection is closed.

 use std::{
-    env, io,
+    io,
     sync::{
         atomic::{AtomicUsize, Ordering},
         Arc,
@@ -23,12 +23,10 @@ use actix_service::{fn_service, ServiceFactoryExt as _};
 use bytes::BytesMut;
 use futures_util::future::ok;
 use log::{error, info};
-use tokio::io::{AsyncReadExt, AsyncWriteExt};
+use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};

-#[actix_rt::main]
-async fn main() -> io::Result<()> {
-    env::set_var("RUST_LOG", "info");
-    env_logger::init();
+async fn run() -> io::Result<()> {
+    env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();

     let count = Arc::new(AtomicUsize::new(0));
@@ -88,3 +86,16 @@ async fn main() -> io::Result<()> {
         .run()
         .await
 }
+
+#[tokio::main]
+async fn main() -> io::Result<()> {
+    run().await?;
+    Ok(())
+}
+
+// alternatively:
+// #[actix_rt::main]
+// async fn main() -> io::Result<()> {
+//     run().await?;
+//     Ok(())
+// }
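Together with the `rt-multi-thread` and `macros` Tokio features added above, this demonstrates the headline change: the example now runs under a plain `#[tokio::main]` runtime, with the old `#[actix_rt::main]` entry point kept as a commented alternative.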

actix-server/src/accept.rs

@@ -1,17 +1,18 @@
-use std::time::Duration;
-use std::{io, thread};
+use std::{io, thread, time::Duration};

-use actix_rt::{
-    time::{sleep, Instant},
-    System,
-};
+use actix_rt::time::Instant;
 use log::{debug, error, info};
 use mio::{Interest, Poll, Token as MioToken};

-use crate::server::ServerHandle;
-use crate::socket::MioListener;
-use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN};
-use crate::worker::{Conn, WorkerHandleAccept};
+use crate::{
+    availability::Availability,
+    socket::MioListener,
+    waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN},
+    worker::{Conn, ServerWorker, WorkerHandleAccept, WorkerHandleServer},
+    ServerBuilder, ServerHandle,
+};
+
+const TIMEOUT_DURATION_ON_ERROR: Duration = Duration::from_millis(510);

 struct ServerSocketInfo {
     token: usize,
@@ -20,200 +21,110 @@ struct ServerSocketInfo {
     /// Timeout is used to mark the deadline when this socket's listener should be registered again
     /// after an error.
-    timeout: Option<Instant>,
+    timeout: Option<actix_rt::time::Instant>,
 }

-/// Accept loop would live with `ServerBuilder`.
-///
-/// It's tasked with construct `Poll` instance and `WakerQueue` which would be distributed to
-/// `Accept` and `Worker`.
-///
-/// It would also listen to `ServerCommand` and push interests to `WakerQueue`.
-pub(crate) struct AcceptLoop {
-    srv: Option<ServerHandle>,
-    poll: Option<Poll>,
-    waker: WakerQueue,
-}
-
-impl AcceptLoop {
-    pub fn new(srv: ServerHandle) -> Self {
-        let poll = Poll::new().unwrap_or_else(|e| panic!("Can not create `mio::Poll`: {}", e));
-        let waker = WakerQueue::new(poll.registry())
-            .unwrap_or_else(|e| panic!("Can not create `mio::Waker`: {}", e));
-
-        Self {
-            srv: Some(srv),
-            poll: Some(poll),
-            waker,
-        }
-    }
-
-    pub(crate) fn waker_owned(&self) -> WakerQueue {
-        self.waker.clone()
-    }
-
-    pub fn wake(&self, i: WakerInterest) {
-        self.waker.wake(i);
-    }
-
-    pub(crate) fn start(
-        &mut self,
-        socks: Vec<(usize, MioListener)>,
-        handles: Vec<WorkerHandleAccept>,
-    ) {
-        let srv = self.srv.take().expect("Can not re-use AcceptInfo");
-        let poll = self.poll.take().unwrap();
-        let waker = self.waker.clone();
-
-        Accept::start(poll, waker, socks, srv, handles);
-    }
-}
-
 /// poll instance of the server.
-struct Accept {
+pub(crate) struct Accept {
     poll: Poll,
-    waker: WakerQueue,
+    waker_queue: WakerQueue,
     handles: Vec<WorkerHandleAccept>,
     srv: ServerHandle,
     next: usize,
     avail: Availability,
+    /// use the smallest duration from sockets timeout.
+    timeout: Option<Duration>,
     paused: bool,
 }

-/// Array of u128 with every bit as marker for a worker handle's availability.
-#[derive(Debug, Default)]
-struct Availability([u128; 4]);
-
-impl Availability {
-    /// Check if any worker handle is available
-    #[inline(always)]
-    fn available(&self) -> bool {
-        self.0.iter().any(|a| *a != 0)
-    }
-
-    /// Check if worker handle is available by index
-    #[inline(always)]
-    fn get_available(&self, idx: usize) -> bool {
-        let (offset, idx) = Self::offset(idx);
-
-        self.0[offset] & (1 << idx as u128) != 0
-    }
-
-    /// Set worker handle available state by index.
-    fn set_available(&mut self, idx: usize, avail: bool) {
-        let (offset, idx) = Self::offset(idx);
-
-        let off = 1 << idx as u128;
-        if avail {
-            self.0[offset] |= off;
-        } else {
-            self.0[offset] &= !off
-        }
-    }
-
-    /// Set all worker handle to available state.
-    /// This would result in a re-check on all workers' availability.
-    fn set_available_all(&mut self, handles: &[WorkerHandleAccept]) {
-        handles.iter().for_each(|handle| {
-            self.set_available(handle.idx(), true);
-        })
-    }
-
-    /// Get offset and adjusted index of given worker handle index.
-    fn offset(idx: usize) -> (usize, usize) {
-        if idx < 128 {
-            (0, idx)
-        } else if idx < 128 * 2 {
-            (1, idx - 128)
-        } else if idx < 128 * 3 {
-            (2, idx - 128 * 2)
-        } else if idx < 128 * 4 {
-            (3, idx - 128 * 3)
-        } else {
-            panic!("Max WorkerHandle count is 512")
-        }
-    }
-}
-
-/// This function defines errors that are per-connection. Which basically
-/// means that if we get this error from `accept()` system call it means
-/// next connection might be ready to be accepted.
-///
-/// All other errors will incur a timeout before next `accept()` is performed.
-/// The timeout is useful to handle resource exhaustion errors like ENFILE
-/// and EMFILE. Otherwise, could enter into tight loop.
-fn connection_error(e: &io::Error) -> bool {
-    e.kind() == io::ErrorKind::ConnectionRefused
-        || e.kind() == io::ErrorKind::ConnectionAborted
-        || e.kind() == io::ErrorKind::ConnectionReset
-}
-
 impl Accept {
     pub(crate) fn start(
-        poll: Poll,
-        waker: WakerQueue,
-        socks: Vec<(usize, MioListener)>,
-        srv: ServerHandle,
-        handles: Vec<WorkerHandleAccept>,
-    ) {
-        // Accept runs in its own thread and would want to spawn additional futures to current
-        // actix system.
-        let sys = System::current();
-        thread::Builder::new()
-            .name("actix-server accept loop".to_owned())
-            .spawn(move || {
-                System::set_current(sys);
-                let (mut accept, mut sockets) =
-                    Accept::new_with_sockets(poll, waker, socks, handles, srv);
-
-                accept.poll_with(&mut sockets);
-            })
-            .unwrap();
+        sockets: Vec<(usize, MioListener)>,
+        builder: &ServerBuilder,
+    ) -> io::Result<(WakerQueue, Vec<WorkerHandleServer>)> {
+        let handle_server = ServerHandle::new(builder.cmd_tx.clone());
+
+        // construct poll instance and its waker
+        let poll = Poll::new()?;
+        let waker_queue = WakerQueue::new(poll.registry())?;
+
+        // start workers and collect handles
+        let (handles_accept, handles_server) = (0..builder.threads)
+            .map(|idx| {
+                // clone service factories
+                let factories = builder
+                    .factories
+                    .iter()
+                    .map(|f| f.clone_factory())
+                    .collect::<Vec<_>>();
+
+                // start worker using service factories
+                ServerWorker::start(idx, factories, waker_queue.clone(), builder.worker_config)
+            })
+            .collect::<io::Result<Vec<_>>>()?
+            .into_iter()
+            .unzip();
+
+        let (mut accept, mut sockets) = Accept::new_with_sockets(
+            poll,
+            waker_queue.clone(),
+            sockets,
+            handles_accept,
+            handle_server,
+        )?;
+
+        thread::Builder::new()
+            .name("actix-server acceptor".to_owned())
+            .spawn(move || accept.poll_with(&mut sockets))
+            .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
+
+        Ok((waker_queue, handles_server))
     }

     fn new_with_sockets(
         poll: Poll,
-        waker: WakerQueue,
-        socks: Vec<(usize, MioListener)>,
-        handles: Vec<WorkerHandleAccept>,
-        srv: ServerHandle,
-    ) -> (Accept, Vec<ServerSocketInfo>) {
-        let sockets = socks
+        waker_queue: WakerQueue,
+        sockets: Vec<(usize, MioListener)>,
+        accept_handles: Vec<WorkerHandleAccept>,
+        server_handle: ServerHandle,
+    ) -> io::Result<(Accept, Box<[ServerSocketInfo]>)> {
+        let sockets = sockets
             .into_iter()
             .map(|(token, mut lst)| {
                 // Start listening for incoming connections
                 poll.registry()
-                    .register(&mut lst, MioToken(token), Interest::READABLE)
-                    .unwrap_or_else(|e| panic!("Can not register io: {}", e));
+                    .register(&mut lst, MioToken(token), Interest::READABLE)?;

-                ServerSocketInfo {
+                Ok(ServerSocketInfo {
                     token,
                     lst,
                     timeout: None,
-                }
+                })
             })
-            .collect();
+            .collect::<io::Result<_>>()?;

         let mut avail = Availability::default();

         // Assume all handles are avail at construct time.
-        avail.set_available_all(&handles);
+        avail.set_available_all(&accept_handles);

         let accept = Accept {
             poll,
-            waker,
-            handles,
-            srv,
+            waker_queue,
+            handles: accept_handles,
+            srv: server_handle,
             next: 0,
             avail,
+            timeout: None,
             paused: false,
         };

-        (accept, sockets)
+        Ok((accept, sockets))
     }

+    /// blocking wait for readiness events triggered by mio
     fn poll_with(&mut self, sockets: &mut [ServerSocketInfo]) {
-        let mut events = mio::Events::with_capacity(128);
+        let mut events = mio::Events::with_capacity(256);

         loop {
             if let Err(e) = self.poll.poll(&mut events, None) {
@@ -239,6 +150,9 @@ impl Accept {
                     }
                 }
             }
+
+            // check for timeout and re-register sockets
+            self.process_timeout(sockets);
         }
     }
@@ -249,7 +163,7 @@ impl Accept {
         loop {
             // take guard with every iteration so no new interest can be added
             // until the current task is done.
-            let mut guard = self.waker.guard();
+            let mut guard = self.waker_queue.guard();
             match guard.pop_front() {
                 // worker notify it becomes available.
                 Some(WakerInterest::WorkerAvailable(idx)) => {
@@ -261,6 +175,7 @@ impl Accept {
                         self.accept_all(sockets);
                     }
                 }
+
                 // a new worker thread is made and it's handle would be added to Accept
                 Some(WakerInterest::Worker(handle)) => {
                     drop(guard);
@@ -272,12 +187,7 @@ impl Accept {
                         self.accept_all(sockets);
                     }
                 }
-                // got timer interest and it's time to try register socket(s) again
-                Some(WakerInterest::Timer) => {
-                    drop(guard);
-                    self.process_timer(sockets)
-                }
+
                 Some(WakerInterest::Pause) => {
                     drop(guard);
@@ -287,6 +197,7 @@ impl Accept {
                         self.deregister_all(sockets);
                     }
                 }
+
                 Some(WakerInterest::Resume) => {
                     drop(guard);
@@ -300,6 +211,7 @@ impl Accept {
                         self.accept_all(sockets);
                     }
                 }
+
                 Some(WakerInterest::Stop) => {
                     if !self.paused {
                         self.deregister_all(sockets);
@@ -307,6 +219,7 @@ impl Accept {
                     return true;
                 }
+
                 // waker queue is drained
                 None => {
                     // Reset the WakerQueue before break so it does not grow infinitely
@@ -318,25 +231,44 @@ impl Accept {
         }
     }

-    fn process_timer(&self, sockets: &mut [ServerSocketInfo]) {
-        let now = Instant::now();
-        sockets
-            .iter_mut()
-            // Only sockets that had an associated timeout were deregistered.
-            .filter(|info| info.timeout.is_some())
-            .for_each(|info| {
-                let inst = info.timeout.take().unwrap();
-
-                if now < inst {
-                    info.timeout = Some(inst);
-                } else if !self.paused {
-                    self.register_logged(info);
-                }
-
-                // Drop the timeout if server is paused and socket timeout is expired.
-                // When server recovers from pause it will register all sockets without
-                // a timeout value so this socket register will be delayed till then.
-            });
-    }
+    fn process_timeout(&mut self, sockets: &mut [ServerSocketInfo]) {
+        // always remove old timeouts
+        if self.timeout.take().is_some() {
+            let now = Instant::now();
+
+            sockets
+                .iter_mut()
+                // Only sockets that had an associated timeout were deregistered.
+                .filter(|info| info.timeout.is_some())
+                .for_each(|info| {
+                    let inst = info.timeout.take().unwrap();
+
+                    if now < inst {
+                        // still timed out; try to set new timeout
+                        info.timeout = Some(inst);
+                        self.set_timeout(inst - now);
+                    } else if !self.paused {
+                        // timeout expired; register socket again
+                        self.register_logged(info);
+                    }
+
+                    // Drop the timeout if server is paused and socket timeout is expired.
+                    // When server recovers from pause it will register all sockets without
+                    // a timeout value so this socket register will be delayed till then.
+                });
+        }
+    }
+
+    /// Update accept timeout with `duration` if it is shorter than current timeout.
+    fn set_timeout(&mut self, duration: Duration) {
+        match self.timeout {
+            Some(ref mut timeout) => {
+                if *timeout > duration {
+                    *timeout = duration;
+                }
+            }
+            None => self.timeout = Some(duration),
+        }
+    }

     #[cfg(not(target_os = "windows"))]
@@ -382,12 +314,12 @@ impl Accept {
     fn deregister_all(&self, sockets: &mut [ServerSocketInfo]) {
         // This is a best effort implementation with following limitation:
         //
-        // Every ServerSocketInfo with associate timeout will be skipped and it's timeout
-        // is removed in the process.
+        // Every ServerSocketInfo with associated timeout will be skipped and it's timeout is
+        // removed in the process.
        //
-        // Therefore WakerInterest::Pause followed by WakerInterest::Resume in a very short
-        // gap (less than 500ms) would cause all timing out ServerSocketInfos be reregistered
-        // before expected timing.
+        // Therefore WakerInterest::Pause followed by WakerInterest::Resume in a very short gap
+        // (less than 500ms) would cause all timing out ServerSocketInfos be re-registered before
+        // expected timing.
         sockets
             .iter_mut()
             // Take all timeout.
@@ -476,13 +408,7 @@ impl Accept {
                     // the poll would need it mark which socket and when it's
                     // listener should be registered
                     info.timeout = Some(Instant::now() + Duration::from_millis(500));
-
-                    // after the sleep a Timer interest is sent to Accept Poll
-                    let waker = self.waker.clone();
-                    System::current().arbiter().spawn(async move {
-                        sleep(Duration::from_millis(510)).await;
-                        waker.wake(WakerInterest::Timer);
-                    });
+                    self.set_timeout(TIMEOUT_DURATION_ON_ERROR);

                     return;
                 }
@@ -521,67 +447,14 @@ impl Accept {
         }
     }
 }
-
-#[cfg(test)]
-mod test {
-    use super::Availability;
-
-    fn single(aval: &mut Availability, idx: usize) {
-        aval.set_available(idx, true);
-        assert!(aval.available());
-
-        aval.set_available(idx, true);
-
-        aval.set_available(idx, false);
-        assert!(!aval.available());
-
-        aval.set_available(idx, false);
-        assert!(!aval.available());
-    }
-
-    fn multi(aval: &mut Availability, mut idx: Vec<usize>) {
-        idx.iter().for_each(|idx| aval.set_available(*idx, true));
-        assert!(aval.available());
-
-        while let Some(idx) = idx.pop() {
-            assert!(aval.available());
-            aval.set_available(idx, false);
-        }
-
-        assert!(!aval.available());
-    }
-
-    #[test]
-    fn availability() {
-        let mut aval = Availability::default();
-
-        single(&mut aval, 1);
-        single(&mut aval, 128);
-        single(&mut aval, 256);
-        single(&mut aval, 511);
-
-        let idx = (0..511).filter(|i| i % 3 == 0 && i % 5 == 0).collect();
-
-        multi(&mut aval, idx);
-
-        multi(&mut aval, (0..511).collect())
-    }
-
-    #[test]
-    #[should_panic]
-    fn overflow() {
-        let mut aval = Availability::default();
-        single(&mut aval, 512);
-    }
-
-    #[test]
-    fn pin_point() {
-        let mut aval = Availability::default();
-
-        aval.set_available(438, true);
-        aval.set_available(479, true);
-
-        assert_eq!(aval.0[3], 1 << (438 - 384) | 1 << (479 - 384));
-    }
-}
+
+/// This function defines errors that are per-connection; if we get this error from the `accept()`
+/// system call it means the next connection might be ready to be accepted.
+///
+/// All other errors will incur a timeout before next `accept()` call is attempted. The timeout is
+/// useful to handle resource exhaustion errors like `ENFILE` and `EMFILE`. Otherwise, it could
+/// enter into a temporary spin loop.
+fn connection_error(e: &io::Error) -> bool {
+    e.kind() == io::ErrorKind::ConnectionRefused
+        || e.kind() == io::ErrorKind::ConnectionAborted
+        || e.kind() == io::ErrorKind::ConnectionReset
+}
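A condensed sketch of how `Accept` uses this classification (paraphrasing the accept logic shown earlier in this file):

    // after an `accept()` error `err` on a listener `info`:
    if connection_error(&err) {
        // per-connection error: move on; the next connection may be ready
    } else {
        // e.g. EMFILE/ENFILE: deregister the listener and let
        // process_timeout() re-register it after roughly 500ms
        info.timeout = Some(Instant::now() + Duration::from_millis(500));
        self.set_timeout(TIMEOUT_DURATION_ON_ERROR);
    }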

actix-server/src/availability.rs (new file)

@@ -0,0 +1,121 @@
+use crate::worker::WorkerHandleAccept;
+
+/// Array of u128 with every bit as marker for a worker handle's availability.
+#[derive(Debug, Default)]
+pub(crate) struct Availability([u128; 4]);
+
+impl Availability {
+    /// Check if any worker handle is available
+    #[inline(always)]
+    pub(crate) fn available(&self) -> bool {
+        self.0.iter().any(|a| *a != 0)
+    }
+
+    /// Check if worker handle is available by index
+    #[inline(always)]
+    pub(crate) fn get_available(&self, idx: usize) -> bool {
+        let (offset, idx) = Self::offset(idx);
+
+        self.0[offset] & (1 << idx as u128) != 0
+    }
+
+    /// Set worker handle available state by index.
+    pub(crate) fn set_available(&mut self, idx: usize, avail: bool) {
+        let (offset, idx) = Self::offset(idx);
+
+        let off = 1 << idx as u128;
+        if avail {
+            self.0[offset] |= off;
+        } else {
+            self.0[offset] &= !off
+        }
+    }
+
+    /// Set all worker handle to available state.
+    /// This would result in a re-check on all workers' availability.
+    pub(crate) fn set_available_all(&mut self, handles: &[WorkerHandleAccept]) {
+        handles.iter().for_each(|handle| {
+            self.set_available(handle.idx(), true);
+        })
+    }
+
+    /// Get offset and adjusted index of given worker handle index.
+    pub(crate) fn offset(idx: usize) -> (usize, usize) {
+        if idx < 128 {
+            (0, idx)
+        } else if idx < 128 * 2 {
+            (1, idx - 128)
+        } else if idx < 128 * 3 {
+            (2, idx - 128 * 2)
+        } else if idx < 128 * 4 {
+            (3, idx - 128 * 3)
+        } else {
+            panic!("Max WorkerHandle count is 512")
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn single(aval: &mut Availability, idx: usize) {
+        aval.set_available(idx, true);
+        assert!(aval.available());
+
+        aval.set_available(idx, true);
+
+        aval.set_available(idx, false);
+        assert!(!aval.available());
+
+        aval.set_available(idx, false);
+        assert!(!aval.available());
+    }
+
+    fn multi(aval: &mut Availability, mut idx: Vec<usize>) {
+        idx.iter().for_each(|idx| aval.set_available(*idx, true));
+        assert!(aval.available());
+
+        while let Some(idx) = idx.pop() {
+            assert!(aval.available());
+            aval.set_available(idx, false);
+        }
+
+        assert!(!aval.available());
+    }
+
+    #[test]
+    fn availability() {
+        let mut aval = Availability::default();
+
+        single(&mut aval, 1);
+        single(&mut aval, 128);
+        single(&mut aval, 256);
+        single(&mut aval, 511);
+
+        let idx = (0..511).filter(|i| i % 3 == 0 && i % 5 == 0).collect();
+
+        multi(&mut aval, idx);
+
+        multi(&mut aval, (0..511).collect())
+    }
+
+    #[test]
+    #[should_panic]
+    fn overflow() {
+        let mut aval = Availability::default();
+        single(&mut aval, 512);
+    }
+
+    #[test]
+    fn pin_point() {
+        let mut aval = Availability::default();
+
+        aval.set_available(438, true);
+        aval.set_available(479, true);
+
+        assert_eq!(aval.0[3], 1 << (438 - 384) | 1 << (479 - 384));
+    }
+}
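A worked example of the bit layout, following directly from `offset`: index 300 falls in word 2 at bit 44, since 300 - 128 * 2 = 44.

    let mut avail = Availability::default();
    avail.set_available(300, true);

    assert_eq!(Availability::offset(300), (2, 44));
    assert!(avail.get_available(300));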

actix-server/src/builder.rs

@@ -1,43 +1,31 @@
-use std::{
-    future::Future,
-    io, mem,
-    pin::Pin,
-    task::{Context, Poll},
-    time::Duration,
-};
+use std::{io, time::Duration};

-use actix_rt::{self as rt, net::TcpStream, time::sleep, System};
-use log::{error, info};
-use tokio::sync::{
-    mpsc::{unbounded_channel, UnboundedReceiver},
-    oneshot,
-};
+use actix_rt::net::TcpStream;
+use log::{info, trace};
+use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};

-use crate::accept::AcceptLoop;
-use crate::join_all;
-use crate::server::{ServerCommand, ServerHandle};
-use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService};
-use crate::signals::{Signal, Signals};
-use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs};
-use crate::socket::{MioTcpListener, MioTcpSocket};
-use crate::waker_queue::{WakerInterest, WakerQueue};
-use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerHandleAccept, WorkerHandleServer};
+use crate::{
+    server::ServerCommand,
+    service::{InternalServiceFactory, ServiceFactory, StreamNewService},
+    socket::{
+        MioListener, MioTcpListener, MioTcpSocket, StdSocketAddr, StdTcpListener, ToSocketAddrs,
+    },
+    worker::ServerWorkerConfig,
+    Server,
+};

-/// Server builder
+/// [Server] builder.
 pub struct ServerBuilder {
-    threads: usize,
-    token: usize,
-    backlog: u32,
-    handles: Vec<(usize, WorkerHandleServer)>,
-    services: Vec<Box<dyn InternalServiceFactory>>,
-    sockets: Vec<(usize, String, MioListener)>,
-    accept: AcceptLoop,
-    exit: bool,
-    no_signals: bool,
-    cmd: UnboundedReceiver<ServerCommand>,
-    server: ServerHandle,
-    notify: Vec<oneshot::Sender<()>>,
-    worker_config: ServerWorkerConfig,
+    pub(crate) threads: usize,
+    pub(crate) token: usize,
+    pub(crate) backlog: u32,
+    pub(crate) factories: Vec<Box<dyn InternalServiceFactory>>,
+    pub(crate) sockets: Vec<(usize, String, MioListener)>,
+    pub(crate) exit: bool,
+    pub(crate) listen_os_signals: bool,
+    pub(crate) cmd_tx: UnboundedSender<ServerCommand>,
+    pub(crate) cmd_rx: UnboundedReceiver<ServerCommand>,
+    pub(crate) worker_config: ServerWorkerConfig,
 }

 impl Default for ServerBuilder {
@@ -49,22 +37,18 @@ impl Default for ServerBuilder {
 impl ServerBuilder {
     /// Create new Server builder instance
     pub fn new() -> ServerBuilder {
-        let (tx, rx) = unbounded_channel();
-        let server = ServerHandle::new(tx);
+        let (cmd_tx, cmd_rx) = unbounded_channel();

         ServerBuilder {
             threads: num_cpus::get(),
             token: 0,
-            handles: Vec::new(),
-            services: Vec::new(),
+            factories: Vec::new(),
             sockets: Vec::new(),
-            accept: AcceptLoop::new(server.clone()),
             backlog: 2048,
             exit: false,
-            no_signals: false,
-            cmd: rx,
-            notify: Vec::new(),
-            server,
+            listen_os_signals: true,
+            cmd_tx,
+            cmd_rx,
             worker_config: ServerWorkerConfig::default(),
         }
     }
@@ -134,9 +118,9 @@ impl ServerBuilder {
         self
     }

-    /// Disable signal handling.
+    /// Disable OS signal handling.
     pub fn disable_signals(mut self) -> Self {
-        self.no_signals = true;
+        self.listen_os_signals = false;
         self
     }
@@ -160,9 +144,11 @@ impl ServerBuilder {
     {
         let sockets = bind_addr(addr, self.backlog)?;

+        trace!("binding server to: {:?}", &sockets);
+
         for lst in sockets {
             let token = self.next_token();
-            self.services.push(StreamNewService::create(
+            self.factories.push(StreamNewService::create(
                 name.as_ref().to_string(),
                 token,
                 factory.clone(),
@@ -171,11 +157,57 @@ impl ServerBuilder {
             self.sockets
                 .push((token, name.as_ref().to_string(), MioListener::Tcp(lst)));
         }
+
         Ok(self)
     }

+    /// Add new service to the server.
+    pub fn listen<F, N: AsRef<str>>(
+        mut self,
+        name: N,
+        lst: StdTcpListener,
+        factory: F,
+    ) -> io::Result<Self>
+    where
+        F: ServiceFactory<TcpStream>,
+    {
+        lst.set_nonblocking(true)?;
+        let addr = lst.local_addr()?;
+
+        let token = self.next_token();
+        self.factories.push(StreamNewService::create(
+            name.as_ref().to_string(),
+            token,
+            factory,
+            addr,
+        ));
+
+        self.sockets
+            .push((token, name.as_ref().to_string(), MioListener::from(lst)));
+
+        Ok(self)
+    }
+
+    /// Starts processing incoming connections and return server controller.
+    pub fn run(self) -> Server {
+        if self.sockets.is_empty() {
+            panic!("Server should have at least one bound socket");
+        } else {
+            info!("Starting {} workers", self.threads);
+            Server::new(self)
+        }
+    }
+
+    fn next_token(&mut self) -> usize {
+        let token = self.token;
+        self.token += 1;
+        token
+    }
+}
+
+#[cfg(unix)]
+impl ServerBuilder {
     /// Add new unix domain service to the server.
-    #[cfg(unix)]
     pub fn bind_uds<F, U, N>(self, name: N, addr: U, factory: F) -> io::Result<Self>
     where
         F: ServiceFactory<actix_rt::net::UnixStream>,
@@ -198,7 +230,6 @@ impl ServerBuilder {
     /// Add new unix domain service to the server.
     ///
     /// Useful when running as a systemd service and a socket FD is acquired externally.
-    #[cfg(unix)]
     pub fn listen_uds<F, N: AsRef<str>>(
         mut self,
         name: N,
@@ -212,7 +243,7 @@ impl ServerBuilder {
         lst.set_nonblocking(true)?;
         let token = self.next_token();
         let addr = StdSocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
-        self.services.push(StreamNewService::create(
+        self.factories.push(StreamNewService::create(
             name.as_ref().to_string(),
             token,
             factory,
@@ -222,223 +253,6 @@ impl ServerBuilder {
             .push((token, name.as_ref().to_string(), MioListener::from(lst)));
         Ok(self)
     }
-
-    /// Add new service to the server.
-    pub fn listen<F, N: AsRef<str>>(
-        mut self,
-        name: N,
-        lst: StdTcpListener,
-        factory: F,
-    ) -> io::Result<Self>
-    where
-        F: ServiceFactory<TcpStream>,
-    {
-        lst.set_nonblocking(true)?;
-        let addr = lst.local_addr()?;
-
-        let token = self.next_token();
-        self.services.push(StreamNewService::create(
-            name.as_ref().to_string(),
-            token,
-            factory,
-            addr,
-        ));
-
-        self.sockets
-            .push((token, name.as_ref().to_string(), MioListener::from(lst)));
-
-        Ok(self)
-    }
-
-    /// Starts processing incoming connections and return server controller.
-    pub fn run(mut self) -> ServerHandle {
-        if self.sockets.is_empty() {
-            panic!("Server should have at least one bound socket");
-        } else {
-            for (_, name, lst) in &self.sockets {
-                info!(
-                    r#"Starting service: "{}", workers: {}, listening on: {}"#,
-                    name,
-                    self.threads,
-                    lst.local_addr()
-                );
-            }
-
-            // start workers
-            let handles = (0..self.threads)
-                .map(|idx| {
-                    let (handle_accept, handle_server) =
-                        self.start_worker(idx, self.accept.waker_owned());
-                    self.handles.push((idx, handle_server));
-
-                    handle_accept
-                })
-                .collect();
-
-            // start accept thread
-            self.accept.start(
-                mem::take(&mut self.sockets)
-                    .into_iter()
-                    .map(|t| (t.0, t.2))
-                    .collect(),
-                handles,
-            );
-
-            // handle signals
-            if !self.no_signals {
-                Signals::start(self.server.clone());
-            }
-
-            // start http server actor
-            let server = self.server.clone();
-            rt::spawn(self);
-
-            server
-        }
-    }
-
-    fn start_worker(
-        &self,
-        idx: usize,
-        waker_queue: WakerQueue,
-    ) -> (WorkerHandleAccept, WorkerHandleServer) {
-        let services = self.services.iter().map(|v| v.clone_factory()).collect();
-
-        ServerWorker::start(idx, services, waker_queue, self.worker_config)
-    }
-
-    fn handle_cmd(&mut self, item: ServerCommand) {
-        match item {
-            ServerCommand::Pause(tx) => {
-                self.accept.wake(WakerInterest::Pause);
-                let _ = tx.send(());
-            }
-
-            ServerCommand::Resume(tx) => {
-                self.accept.wake(WakerInterest::Resume);
-                let _ = tx.send(());
-            }
-
-            ServerCommand::Signal(sig) => {
-                // Signals support
-                // Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop actix system
-                match sig {
-                    Signal::Int => {
-                        info!("SIGINT received; starting forced shutdown");
-                        self.exit = true;
-                        self.handle_cmd(ServerCommand::Stop {
-                            graceful: false,
-                            completion: None,
-                        })
-                    }
-
-                    Signal::Term => {
-                        info!("SIGTERM received; starting graceful shutdown");
-                        self.exit = true;
-                        self.handle_cmd(ServerCommand::Stop {
-                            graceful: true,
-                            completion: None,
-                        })
-                    }
-
-                    Signal::Quit => {
-                        info!("SIGQUIT received; starting forced shutdown");
-                        self.exit = true;
-                        self.handle_cmd(ServerCommand::Stop {
-                            graceful: false,
-                            completion: None,
-                        })
-                    }
-                }
-            }
-
-            ServerCommand::Notify(tx) => {
-                self.notify.push(tx);
-            }
-
-            ServerCommand::Stop {
-                graceful,
-                completion,
-            } => {
-                let exit = self.exit;
-
-                // stop accept thread
-                self.accept.wake(WakerInterest::Stop);
-                let notify = std::mem::take(&mut self.notify);
-
-                // stop workers
-                let stop = self
-                    .handles
-                    .iter()
-                    .map(move |worker| worker.1.stop(graceful))
-                    .collect();
-
-                rt::spawn(async move {
-                    if graceful {
-                        // wait for all workers to shut down
-                        let _ = join_all(stop).await;
-                    }
-
-                    if let Some(tx) = completion {
-                        let _ = tx.send(());
-                    }
-
-                    for tx in notify {
-                        let _ = tx.send(());
-                    }
-
-                    if exit {
-                        sleep(Duration::from_millis(300)).await;
-                        System::current().stop();
-                    }
-                });
-            }
-
-            ServerCommand::WorkerFaulted(idx) => {
-                let mut found = false;
-                for i in 0..self.handles.len() {
-                    if self.handles[i].0 == idx {
-                        self.handles.swap_remove(i);
-                        found = true;
-                        break;
-                    }
-                }
-
-                if found {
-                    error!("Worker {} has died; restarting", idx);
-
-                    let mut new_idx = self.handles.len();
-                    'found: loop {
-                        for i in 0..self.handles.len() {
-                            if self.handles[i].0 == new_idx {
-                                new_idx += 1;
-                                continue 'found;
-                            }
-                        }
-
-                        break;
-                    }
-
-                    let (handle_accept, handle_server) =
-                        self.start_worker(new_idx, self.accept.waker_owned());
-                    self.handles.push((new_idx, handle_server));
-                    self.accept.wake(WakerInterest::Worker(handle_accept));
-                }
-            }
-        }
-    }
-
-    fn next_token(&mut self) -> usize {
-        let token = self.token;
-        self.token += 1;
-        token
-    }
-}
-
-impl Future for ServerBuilder {
-    type Output = ();
-
-    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
-        loop {
-            match Pin::new(&mut self.cmd).poll_recv(cx) {
-                Poll::Ready(Some(it)) => self.as_mut().get_mut().handle_cmd(it),
-                _ => return Poll::Pending,
-            }
-        }
-    }
 }

 pub(super) fn bind_addr<S: ToSocketAddrs>(

actix-server/src/handle.rs (new file)

@@ -0,0 +1,53 @@
+use std::future::Future;
+
+use tokio::sync::{mpsc::UnboundedSender, oneshot};
+
+use crate::server::ServerCommand;
+
+/// Server handle.
+#[derive(Debug, Clone)]
+pub struct ServerHandle {
+    cmd_tx: UnboundedSender<ServerCommand>,
+}
+
+impl ServerHandle {
+    pub(crate) fn new(cmd_tx: UnboundedSender<ServerCommand>) -> Self {
+        ServerHandle { cmd_tx }
+    }
+
+    pub(crate) fn worker_faulted(&self, idx: usize) {
+        let _ = self.cmd_tx.send(ServerCommand::WorkerFaulted(idx));
+    }
+
+    /// Pause accepting incoming connections.
+    ///
+    /// May drop socket pending connection. All open connections remain active.
+    pub fn pause(&self) -> impl Future<Output = ()> {
+        let (tx, rx) = oneshot::channel();
+        let _ = self.cmd_tx.send(ServerCommand::Pause(tx));
+        async {
+            let _ = rx.await;
+        }
+    }
+
+    /// Resume accepting incoming connections.
+    pub fn resume(&self) -> impl Future<Output = ()> {
+        let (tx, rx) = oneshot::channel();
+        let _ = self.cmd_tx.send(ServerCommand::Resume(tx));
+        async {
+            let _ = rx.await;
+        }
+    }
+
+    /// Stop incoming connection processing, stop all workers and exit.
+    pub fn stop(&self, graceful: bool) -> impl Future<Output = ()> {
+        let (tx, rx) = oneshot::channel();
+        let _ = self.cmd_tx.send(ServerCommand::Stop {
+            graceful,
+            completion: Some(tx),
+        });
+        async {
+            let _ = rx.await;
+        }
+    }
+}
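A usage sketch (assumes a `server: Server` built elsewhere and an async context):

    let handle = server.handle();

    // resolves once the server acknowledges the state change
    handle.pause().await;
    handle.resume().await;

    // graceful stop: waits for workers to finish shutting down
    handle.stop(true).await;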

actix-server/src/join_all.rs (new file)

@@ -0,0 +1,144 @@
+use std::{
+    future::Future,
+    pin::Pin,
+    task::{Context, Poll},
+};
+
+use futures_core::future::{BoxFuture, LocalBoxFuture};
+
+// a poor man's join future. joined future is only used when starting/stopping the server.
+// pin_project and pinned futures are overkill for this task.
+pub(crate) struct JoinAll<T> {
+    fut: Vec<JoinFuture<T>>,
+}
+
+pub(crate) fn join_all<T>(fut: Vec<impl Future<Output = T> + Send + 'static>) -> JoinAll<T> {
+    let fut = fut
+        .into_iter()
+        .map(|f| JoinFuture::Future(Box::pin(f)))
+        .collect();
+
+    JoinAll { fut }
+}
+
+enum JoinFuture<T> {
+    Future(BoxFuture<'static, T>),
+    Result(Option<T>),
+}
+
+impl<T> Unpin for JoinAll<T> {}
+
+impl<T> Future for JoinAll<T> {
+    type Output = Vec<T>;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        let mut ready = true;
+
+        let this = self.get_mut();
+        for fut in this.fut.iter_mut() {
+            if let JoinFuture::Future(f) = fut {
+                match f.as_mut().poll(cx) {
+                    Poll::Ready(t) => {
+                        *fut = JoinFuture::Result(Some(t));
+                    }
+                    Poll::Pending => ready = false,
+                }
+            }
+        }
+
+        if ready {
+            let mut res = Vec::new();
+            for fut in this.fut.iter_mut() {
+                if let JoinFuture::Result(f) = fut {
+                    res.push(f.take().unwrap());
+                }
+            }
+
+            Poll::Ready(res)
+        } else {
+            Poll::Pending
+        }
+    }
+}
+
+pub(crate) fn join_all_local<T>(fut: Vec<impl Future<Output = T> + 'static>) -> JoinAllLocal<T> {
+    let fut = fut
+        .into_iter()
+        .map(|f| JoinLocalFuture::LocalFuture(Box::pin(f)))
+        .collect();
+
+    JoinAllLocal { fut }
+}
+
+// a poor man's join future. joined future is only used when starting/stopping the server.
+// pin_project and pinned futures are overkill for this task.
+pub(crate) struct JoinAllLocal<T> {
+    fut: Vec<JoinLocalFuture<T>>,
+}
+
+enum JoinLocalFuture<T> {
+    LocalFuture(LocalBoxFuture<'static, T>),
+    Result(Option<T>),
+}
+
+impl<T> Unpin for JoinAllLocal<T> {}
+
+impl<T> Future for JoinAllLocal<T> {
+    type Output = Vec<T>;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        let mut ready = true;
+
+        let this = self.get_mut();
+        for fut in this.fut.iter_mut() {
+            if let JoinLocalFuture::LocalFuture(f) = fut {
+                match f.as_mut().poll(cx) {
+                    Poll::Ready(t) => {
+                        *fut = JoinLocalFuture::Result(Some(t));
+                    }
+                    Poll::Pending => ready = false,
+                }
+            }
+        }
+
+        if ready {
+            let mut res = Vec::new();
+            for fut in this.fut.iter_mut() {
+                if let JoinLocalFuture::Result(f) = fut {
+                    res.push(f.take().unwrap());
+                }
+            }
+
+            Poll::Ready(res)
+        } else {
+            Poll::Pending
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    use actix_utils::future::ready;
+
+    #[actix_rt::test]
+    async fn test_join_all() {
+        let futs = vec![ready(Ok(1)), ready(Err(3)), ready(Ok(9))];
+        let mut res = join_all(futs).await.into_iter();
+        assert_eq!(Ok(1), res.next().unwrap());
+        assert_eq!(Err(3), res.next().unwrap());
+        assert_eq!(Ok(9), res.next().unwrap());
+    }
+
+    #[actix_rt::test]
+    async fn test_join_all_local() {
+        let futs = vec![ready(Ok(1)), ready(Err(3)), ready(Ok(9))];
+        let mut res = join_all_local(futs).await.into_iter();
+        assert_eq!(Ok(1), res.next().unwrap());
+        assert_eq!(Err(3), res.next().unwrap());
+        assert_eq!(Ok(9), res.next().unwrap());
+    }
+}
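The split is deliberate: `join_all` boxes with `BoxFuture` and requires `Send`, so the server future can await worker stop futures from whatever thread polls it, while `join_all_local` uses `LocalBoxFuture` for the non-`Send` futures joined inside a worker.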

actix-server/src/lib.rs

@@ -5,7 +5,10 @@
 #![doc(html_favicon_url = "https://actix.rs/favicon.ico")]

 mod accept;
+mod availability;
 mod builder;
+mod handle;
+mod join_all;
 mod server;
 mod service;
 mod signals;
@@ -15,89 +18,17 @@ mod waker_queue;
 mod worker;

 pub use self::builder::ServerBuilder;
-pub use self::server::{Server, ServerHandle};
+pub use self::handle::ServerHandle;
+pub use self::server::Server;
 pub use self::service::ServiceFactory;
 pub use self::test_server::TestServer;

 #[doc(hidden)]
 pub use self::socket::FromStream;

-use std::future::Future;
-use std::pin::Pin;
-use std::task::{Context, Poll};
-
 /// Start server building process
+#[doc(hidden)]
+#[deprecated(since = "2.0.0", note = "Use `Server::build()`.")]
 pub fn new() -> ServerBuilder {
     ServerBuilder::default()
 }
-
-// a poor man's join future. joined future is only used when starting/stopping the server.
-// pin_project and pinned futures are overkill for this task.
-pub(crate) struct JoinAll<T> {
-    fut: Vec<JoinFuture<T>>,
-}
-
-pub(crate) fn join_all<T>(fut: Vec<impl Future<Output = T> + 'static>) -> JoinAll<T> {
-    let fut = fut
-        .into_iter()
-        .map(|f| JoinFuture::Future(Box::pin(f)))
-        .collect();
-
-    JoinAll { fut }
-}
-
-enum JoinFuture<T> {
-    Future(Pin<Box<dyn Future<Output = T>>>),
-    Result(Option<T>),
-}
-
-impl<T> Unpin for JoinAll<T> {}
-
-impl<T> Future for JoinAll<T> {
-    type Output = Vec<T>;
-
-    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
-        let mut ready = true;
-
-        let this = self.get_mut();
-        for fut in this.fut.iter_mut() {
-            if let JoinFuture::Future(f) = fut {
-                match f.as_mut().poll(cx) {
-                    Poll::Ready(t) => {
-                        *fut = JoinFuture::Result(Some(t));
-                    }
-                    Poll::Pending => ready = false,
-                }
-            }
-        }
-
-        if ready {
-            let mut res = Vec::new();
-            for fut in this.fut.iter_mut() {
-                if let JoinFuture::Result(f) = fut {
-                    res.push(f.take().unwrap());
-                }
-            }
-
-            Poll::Ready(res)
-        } else {
-            Poll::Pending
-        }
-    }
-}
-
-#[cfg(test)]
-mod test {
-    use super::*;
-    use actix_utils::future::ready;
-
-    #[actix_rt::test]
-    async fn test_join_all() {
-        let futs = vec![ready(Ok(1)), ready(Err(3)), ready(Ok(9))];
-        let mut res = join_all(futs).await.into_iter();
-        assert_eq!(Ok(1), res.next().unwrap());
-        assert_eq!(Err(3), res.next().unwrap());
-        assert_eq!(Ok(9), res.next().unwrap());
-    }
-}

actix-server/src/server.rs

@@ -1,125 +1,359 @@
-use std::future::Future;
-use std::io;
-use std::pin::Pin;
-use std::task::{Context, Poll};
+use std::{
+    future::Future,
+    io, mem,
+    pin::Pin,
+    task::{Context, Poll},
+    time::Duration,
+};

-use tokio::sync::mpsc::UnboundedSender;
-use tokio::sync::oneshot;
+use actix_rt::{time::sleep, System};
+use futures_core::future::BoxFuture;
+use log::{error, info};
+use tokio::sync::{
+    mpsc::{UnboundedReceiver, UnboundedSender},
+    oneshot,
+};

-use crate::builder::ServerBuilder;
-use crate::signals::Signal;
+use crate::{
+    accept::Accept,
+    builder::ServerBuilder,
+    join_all::join_all,
+    service::InternalServiceFactory,
+    signals::{Signal, Signals},
+    waker_queue::{WakerInterest, WakerQueue},
+    worker::{ServerWorker, ServerWorkerConfig, WorkerHandleServer},
+    ServerHandle,
+};

 #[derive(Debug)]
 pub(crate) enum ServerCommand {
+    /// TODO
     WorkerFaulted(usize),
+
+    /// Contains return channel to notify caller of successful state change.
     Pause(oneshot::Sender<()>),
+
+    /// Contains return channel to notify caller of successful state change.
     Resume(oneshot::Sender<()>),
-    Signal(Signal),
+
+    /// TODO
     Stop {
         /// True if shut down should be graceful.
         graceful: bool,
+
+        /// Return channel to notify caller that shutdown is complete.
         completion: Option<oneshot::Sender<()>>,
     },
-    /// Notify of server stop
-    Notify(oneshot::Sender<()>),
 }

-#[derive(Debug)]
-#[non_exhaustive]
-pub struct Server;
-
-impl Server {
-    /// Start server building process.
-    pub fn build() -> ServerBuilder {
-        ServerBuilder::default()
-    }
-}
-
-/// Server handle.
+/// General purpose TCP server that runs services receiving Tokio `TcpStream`s.
+///
+/// Handles creating worker threads, restarting faulted workers, connection accepting, and
+/// back-pressure logic.
+///
+/// Creates a worker per CPU core (or the number specified in [`ServerBuilder::workers`]) and
+/// distributes connections with a round-robin strategy.
+///
+/// The [Server] must be awaited to process stop commands and listen for OS signals. It will resolve
+/// when the server has fully shut down.
 ///
 /// # Shutdown Signals
 /// On UNIX systems, `SIGQUIT` will start a graceful shutdown and `SIGTERM` or `SIGINT` will start a
-/// forced shutdown. On Windows, a CTRL-C signal will start a forced shutdown.
+/// forced shutdown. On Windows, a Ctrl-C signal will start a forced shutdown.
 ///
 /// A graceful shutdown will wait for all workers to stop first.
-#[derive(Debug)]
-pub struct ServerHandle(
-    UnboundedSender<ServerCommand>,
-    Option<oneshot::Receiver<()>>,
-);
+///
+/// # Examples
+/// The following is a TCP echo server. Test using `telnet 127.0.0.1 8080`.
+///
+/// ```no_run
+/// use std::io;
+///
+/// use actix_rt::net::TcpStream;
+/// use actix_server::Server;
+/// use actix_service::{fn_service, ServiceFactoryExt as _};
+/// use bytes::BytesMut;
+/// use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};
+///
+/// #[actix_rt::main]
+/// async fn main() -> io::Result<()> {
+///     let bind_addr = ("127.0.0.1", 8080);
+///
+///     Server::build()
+///         .bind("echo", bind_addr, move || {
+///             fn_service(move |mut stream: TcpStream| {
+///                 async move {
+///                     let mut size = 0;
+///                     let mut buf = BytesMut::new();
+///
+///                     loop {
+///                         match stream.read_buf(&mut buf).await {
+///                             // end of stream; bail from loop
+///                             Ok(0) => break,
+///
+///                             // write bytes back to stream
+///                             Ok(bytes_read) => {
+///                                 stream.write_all(&buf[size..]).await.unwrap();
+///                                 size += bytes_read;
+///                             }
+///
+///                             Err(err) => {
+///                                 eprintln!("Stream Error: {:?}", err);
+///                                 return Err(());
+///                             }
+///                         }
+///                     }
+///
+///                     Ok(())
+///                 }
+///             })
+///             .map_err(|err| eprintln!("Service Error: {:?}", err))
+///         })?
+///         .run()
+///         .await
+/// }
+/// ```
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub enum Server {
+    Server(ServerInner),
+    Error(Option<io::Error>),
+}

-impl ServerHandle {
-    pub(crate) fn new(tx: UnboundedSender<ServerCommand>) -> Self {
-        ServerHandle(tx, None)
+impl Server {
+    /// Create server build.
+    pub fn build() -> ServerBuilder {
+        ServerBuilder::default()
     }

-    pub(crate) fn signal(&self, sig: Signal) {
-        let _ = self.0.send(ServerCommand::Signal(sig));
-    }
-
-    pub(crate) fn worker_faulted(&self, idx: usize) {
-        let _ = self.0.send(ServerCommand::WorkerFaulted(idx));
-    }
-
-    /// Pause accepting incoming connections
-    ///
-    /// If socket contains some pending connection, they might be dropped.
-    /// All opened connection remains active.
-    pub fn pause(&self) -> impl Future<Output = ()> {
-        let (tx, rx) = oneshot::channel();
-        let _ = self.0.send(ServerCommand::Pause(tx));
-        async {
-            let _ = rx.await;
+    pub(crate) fn new(mut builder: ServerBuilder) -> Self {
+        let sockets = mem::take(&mut builder.sockets)
+            .into_iter()
+            .map(|t| (t.0, t.2))
+            .collect();
+
+        // Give log information on what runtime will be used.
+        let is_tokio = tokio::runtime::Handle::try_current().is_ok();
+        let is_actix = actix_rt::System::try_current().is_some();
+
+        match (is_tokio, is_actix) {
+            (true, false) => info!("Tokio runtime found. Starting in existing Tokio runtime"),
+            (_, true) => info!("Actix runtime found. Starting in Actix runtime"),
+            (_, _) => info!(
+                "Actix/Tokio runtime not found. Starting in new Tokio current-thread runtime"
+            ),
+        }
+
+        for (_, name, lst) in &builder.sockets {
+            info!(
+                r#"Starting service: "{}", workers: {}, listening on: {}"#,
+                name,
+                builder.threads,
+                lst.local_addr()
+            );
+        }
+
+        match Accept::start(sockets, &builder) {
+            Ok((waker_queue, worker_handles)) => {
+                // construct OS signals listener future
+                let signals = (builder.listen_os_signals).then(Signals::new);
+
+                Self::Server(ServerInner {
+                    cmd_tx: builder.cmd_tx.clone(),
+                    cmd_rx: builder.cmd_rx,
+                    signals,
+                    waker_queue,
+                    worker_handles,
+                    worker_config: builder.worker_config,
+                    services: builder.factories,
+                    exit: builder.exit,
+                    stop_task: None,
+                })
+            }
+            Err(err) => Self::Error(Some(err)),
         }
     }

-    /// Resume accepting incoming connections
-    pub fn resume(&self) -> impl Future<Output = ()> {
-        let (tx, rx) = oneshot::channel();
-        let _ = self.0.send(ServerCommand::Resume(tx));
-        async {
-            let _ = rx.await;
-        }
-    }
-
-    /// Stop incoming connection processing, stop all workers and exit.
+    /// Get a handle for ServerFuture that can be used to change state of actix server.
     ///
-    /// If server starts with `spawn()` method, then spawned thread get terminated.
-    pub fn stop(&self, graceful: bool) -> impl Future<Output = ()> {
-        let (tx, rx) = oneshot::channel();
-        let _ = self.0.send(ServerCommand::Stop {
-            graceful,
-            completion: Some(tx),
-        });
-        async {
-            let _ = rx.await;
+    /// See [ServerHandle](ServerHandle) for usage.
+    pub fn handle(&self) -> ServerHandle {
+        match self {
+            Server::Server(inner) => ServerHandle::new(inner.cmd_tx.clone()),
+            Server::Error(err) => {
+                // TODO: i don't think this is the best way to handle server startup fail
+                panic!(
+                    "server handle can not be obtained because server failed to start up: {}",
+                    err.as_ref().unwrap()
+                );
+            }
         }
     }
 }

-impl Clone for ServerHandle {
-    fn clone(&self) -> Self {
-        Self(self.0.clone(), None)
-    }
-}
-
-impl Future for ServerHandle {
+impl Future for Server {
     type Output = io::Result<()>;

-    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
-        let this = self.get_mut();
-
-        if this.1.is_none() {
-            let (tx, rx) = oneshot::channel();
-            if this.0.send(ServerCommand::Notify(tx)).is_err() {
-                return Poll::Ready(Ok(()));
-            }
-            this.1 = Some(rx);
-        }
-
-        match Pin::new(this.1.as_mut().unwrap()).poll(cx) {
-            Poll::Pending => Poll::Pending,
-            Poll::Ready(_) => Poll::Ready(Ok(())),
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        match self.as_mut().get_mut() {
+            Server::Error(err) => Poll::Ready(Err(err
+                .take()
+                .expect("Server future cannot be polled after error"))),
+
+            Server::Server(inner) => {
+                // poll Signals
+                if let Some(ref mut signals) = inner.signals {
+                    if let Poll::Ready(signal) = Pin::new(signals).poll(cx) {
+                        inner.stop_task = inner.handle_signal(signal);
+                        // drop signals listener
+                        inner.signals = None;
+                    }
+                }
+
+                // handle stop tasks and eager drain command channel
+                loop {
+                    if let Some(ref mut fut) = inner.stop_task {
+                        // only resolve stop task and exit
+                        return fut.as_mut().poll(cx).map(|_| Ok(()));
+                    }
+
+                    match Pin::new(&mut inner.cmd_rx).poll_recv(cx) {
+                        Poll::Ready(Some(cmd)) => {
+                            // if stop task is required, set it and loop
+                            inner.stop_task = inner.handle_cmd(cmd);
+                        }
+                        _ => return Poll::Pending,
+                    }
+                }
+            }
         }
     }
 }
+
+pub struct ServerInner {
+    worker_handles: Vec<WorkerHandleServer>,
+    worker_config: ServerWorkerConfig,
+    services: Vec<Box<dyn InternalServiceFactory>>,
+    exit: bool,
+    cmd_tx: UnboundedSender<ServerCommand>,
+    cmd_rx: UnboundedReceiver<ServerCommand>,
+    signals: Option<Signals>,
+    waker_queue: WakerQueue,
+    stop_task: Option<BoxFuture<'static, ()>>,
+}
+
+impl ServerInner {
+    fn handle_cmd(&mut self, item: ServerCommand) -> Option<BoxFuture<'static, ()>> {
+        match item {
+            ServerCommand::Pause(tx) => {
+                self.waker_queue.wake(WakerInterest::Pause);
+                let _ = tx.send(());
+                None
+            }
+
+            ServerCommand::Resume(tx) => {
+                self.waker_queue.wake(WakerInterest::Resume);
+                let _ = tx.send(());
+                None
+            }
+
+            ServerCommand::Stop {
+                graceful,
+                completion,
+            } => {
+                let exit = self.exit;
+
+                // stop accept thread
+                self.waker_queue.wake(WakerInterest::Stop);
+
+                // stop workers
+                let workers_stop = self
+                    .worker_handles
+                    .iter()
+                    .map(|worker| worker.stop(graceful))
+                    .collect::<Vec<_>>();
+
+                Some(Box::pin(async move {
+                    if graceful {
+                        // wait for all workers to shut down
+                        let _ = join_all(workers_stop).await;
+                    }
+
+                    if let Some(tx) = completion {
+                        let _ = tx.send(());
+                    }
+
+                    if exit {
+                        sleep(Duration::from_millis(300)).await;
+                        System::try_current().as_ref().map(System::stop);
+                    }
+                }))
+            }
+
+            ServerCommand::WorkerFaulted(idx) => {
+                // TODO: maybe just return with warning log if not found ?
+                assert!(self.worker_handles.iter().any(|wrk| wrk.idx == idx));
+
+                error!("Worker {} has died; restarting", idx);
+
+                let factories = self
+                    .services
+                    .iter()
+                    .map(|service| service.clone_factory())
+                    .collect();
+
+                match ServerWorker::start(
+                    idx,
+                    factories,
+                    self.waker_queue.clone(),
+                    self.worker_config,
+                ) {
+                    Ok((handle_accept, handle_server)) => {
+                        *self
+                            .worker_handles
+                            .iter_mut()
+                            .find(|wrk| wrk.idx == idx)
+                            .unwrap() = handle_server;
+
+                        self.waker_queue.wake(WakerInterest::Worker(handle_accept));
+                    }
+                    Err(err) => error!("can not restart worker {}: {}", idx, err),
+                };
+
+                None
+            }
+        }
+    }
+
+    fn handle_signal(&mut self, signal: Signal) -> Option<BoxFuture<'static, ()>> {
+        match signal {
+            Signal::Int => {
+                info!("SIGINT received; starting forced shutdown");
+                self.exit = true;
+                self.handle_cmd(ServerCommand::Stop {
+                    graceful: false,
+                    completion: None,
+                })
+            }
+
+            Signal::Term => {
+                info!("SIGTERM received; starting graceful shutdown");
+                self.exit = true;
+                self.handle_cmd(ServerCommand::Stop {
+                    graceful: true,
+                    completion: None,
+                })
+            }
+
+            Signal::Quit => {
+                info!("SIGQUIT received; starting forced shutdown");
+                self.exit = true;
+                self.handle_cmd(ServerCommand::Stop {
+                    graceful: false,
+                    completion: None,
+                })
+            }
+        }
+    }
+}
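Note the startup-failure design visible above: `Server::Error` holds `Option<io::Error>` so the error can be taken and returned the first time the future is polled; polling again afterwards hits the `expect`, and calling `handle()` on a failed server panics outright (as its TODO comment acknowledges).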

actix-server/src/signals.rs

@@ -1,12 +1,16 @@
-use std::future::Future;
-use std::pin::Pin;
-use std::task::{Context, Poll};
+use std::{
+    fmt,
+    future::Future,
+    pin::Pin,
+    task::{Context, Poll},
+};

-use crate::server::ServerHandle;
+use log::trace;

 /// Types of process signals.
-#[allow(dead_code)]
-#[derive(PartialEq, Clone, Copy, Debug)]
+#[derive(Debug, Clone, Copy, PartialEq)]
+#[allow(dead_code)] // variants are never constructed on non-unix
 pub(crate) enum Signal {
     /// `SIGINT`
     Int,
@@ -18,10 +22,18 @@ pub(crate) enum Signal {
     Quit,
 }

+impl fmt::Display for Signal {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.write_str(match self {
+            Signal::Int => "SIGINT",
+            Signal::Term => "SIGTERM",
+            Signal::Quit => "SIGQUIT",
+        })
+    }
+}
+
 /// Process signal listener.
 pub(crate) struct Signals {
-    srv: ServerHandle,
-
     #[cfg(not(unix))]
     signals: futures_core::future::LocalBoxFuture<'static, std::io::Result<()>>,
@@ -30,14 +42,15 @@ pub(crate) struct Signals {
 }

 impl Signals {
-    /// Spawns a signal listening future that is able to send commands to the `Server`.
-    pub(crate) fn start(srv: ServerHandle) {
+    /// Constructs an OS signal listening future.
+    pub(crate) fn new() -> Self {
+        trace!("setting up OS signal listener");
+
         #[cfg(not(unix))]
         {
-            actix_rt::spawn(Signals {
-                srv,
+            Signals {
                 signals: Box::pin(actix_rt::signal::ctrl_c()),
-            });
+            }
         }

         #[cfg(unix)]
@@ -66,33 +79,30 @@ impl Signals {
             })
             .collect::<Vec<_>>();

-            actix_rt::spawn(Signals { srv, signals });
+            Signals { signals }
         }
     }
 }

 impl Future for Signals {
-    type Output = ();
+    type Output = Signal;

     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
         #[cfg(not(unix))]
-        match self.signals.as_mut().poll(cx) {
-            Poll::Ready(_) => {
-                self.srv.signal(Signal::Int);
-                Poll::Ready(())
-            }
-            Poll::Pending => Poll::Pending,
+        {
+            self.signals.as_mut().poll(cx).map(|_| Signal::Int)
         }

         #[cfg(unix)]
         {
             for (sig, fut) in self.signals.iter_mut() {
+                // TODO: match on if let Some ?
                 if Pin::new(fut).poll_recv(cx).is_ready() {
-                    let sig = *sig;
-                    self.srv.signal(sig);
-                    return Poll::Ready(());
+                    trace!("{} received", sig);
+                    return Poll::Ready(*sig);
                 }
             }

             Poll::Pending
         }
     }
 }
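On non-unix targets the only source is `actix_rt::signal::ctrl_c()`, which is why completion is mapped unconditionally to `Signal::Int`; on unix each registered stream is polled in turn and the first ready signal is returned to the server future.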

actix-server/src/test_server.rs

@@ -1,9 +1,9 @@
 use std::sync::mpsc;
-use std::{net, thread};
+use std::{io, net, thread};

 use actix_rt::{net::TcpStream, System};

-use crate::{Server, ServerBuilder, ServiceFactory};
+use crate::{Server, ServerBuilder, ServerHandle, ServiceFactory};

 /// A testing server.
 ///
@@ -34,7 +34,8 @@ pub struct TestServerRuntime {
     addr: net::SocketAddr,
     host: String,
     port: u16,
-    system: System,
+    server_handle: ServerHandle,
+    thread_handle: Option<thread::JoinHandle<io::Result<()>>>,
 }

 impl TestServer {
@@ -46,20 +47,22 @@ impl TestServer {
         let (tx, rx) = mpsc::channel();

         // run server in separate thread
-        thread::spawn(move || {
-            let sys = System::new();
-            factory(Server::build()).workers(1).disable_signals().run();
-
-            tx.send(System::current()).unwrap();
-            sys.run()
+        let thread_handle = thread::spawn(move || {
+            System::new().block_on(async {
+                let server = factory(Server::build()).workers(1).disable_signals().run();
+                tx.send(server.handle()).unwrap();
+                server.await
+            })
         });

-        let system = rx.recv().unwrap();
+        let server_handle = rx.recv().unwrap();

         TestServerRuntime {
-            system,
             addr: "127.0.0.1:0".parse().unwrap(),
             host: "127.0.0.1".to_string(),
             port: 0,
+            server_handle,
+            thread_handle: Some(thread_handle),
         }
     }
@@ -68,24 +71,25 @@ impl TestServer {
         let (tx, rx) = mpsc::channel();

         // run server in separate thread
-        thread::spawn(move || {
+        let thread_handle = thread::spawn(move || {
             let sys = System::new();
             let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap();
             let local_addr = tcp.local_addr().unwrap();

             sys.block_on(async {
-                Server::build()
+                let server = Server::build()
                     .listen("test", tcp, factory)
                     .unwrap()
                     .workers(1)
                     .disable_signals()
                     .run();
-                tx.send((System::current(), local_addr)).unwrap();
-            });
-            sys.run()
+
+                tx.send((server.handle(), local_addr)).unwrap();
+                server.await
+            })
         });

-        let (system, addr) = rx.recv().unwrap();
+        let (server_handle, addr) = rx.recv().unwrap();

         let host = format!("{}", addr.ip());
         let port = addr.port();
@@ -94,7 +98,8 @@ impl TestServer {
             addr,
             host,
             port,
-            system,
+            server_handle,
+            thread_handle: Some(thread_handle),
         }
     }
@@ -127,7 +132,8 @@ impl TestServerRuntime {
     /// Stop server.
     fn stop(&mut self) {
-        self.system.stop();
+        let _ = self.server_handle.stop(false);
+        self.thread_handle.take().unwrap().join().unwrap().unwrap();
     }

     /// Connect to server, returning a Tokio `TcpStream`.
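Worth noting in `stop()`: the forced stop (`stop(false)`) is fire-and-forget here, and joining `thread_handle` is what actually waits for the server future to resolve; the double `unwrap` propagates both thread panics and the server's `io::Result`.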

actix-server/src/waker_queue.rs

@@ -78,12 +78,7 @@ pub(crate) enum WakerInterest {
    Pause,
    Resume,
    Stop,
-    /// `Timer` is an interest sent as a delayed future. When an error happens on accepting
-    /// connection `Accept` would deregister socket listener temporary and wake up the poll and
-    /// register them again after the delayed future resolve.
-    Timer,
-    /// `Worker` is an interest happen after a worker runs into faulted state(This is determined
-    /// by if work can be sent to it successfully).`Accept` would be waked up and add the new
-    /// `WorkerHandleAccept`.
+    /// `Worker` is an interest that is triggered after a worker faults. This is determined by
+    /// trying to send work to it. `Accept` would be waked up and add the new `WorkerHandleAccept`.
    Worker(WorkerHandleAccept),
 }


@@ -1,6 +1,6 @@
 use std::{
    future::Future,
-    mem,
+    io, mem,
    pin::Pin,
    rc::Rc,
    sync::{
@@ -14,7 +14,7 @@ use std::{
 use actix_rt::{
    spawn,
    time::{sleep, Instant, Sleep},
-    Arbiter,
+    Arbiter, ArbiterHandle, System,
 };
 use futures_core::{future::LocalBoxFuture, ready};
 use log::{error, info, trace};
@@ -23,12 +23,14 @@ use tokio::sync::{
    oneshot,
 };
 
-use crate::join_all;
-use crate::service::{BoxedServerService, InternalServiceFactory};
-use crate::socket::MioStream;
-use crate::waker_queue::{WakerInterest, WakerQueue};
+use crate::{
+    join_all::join_all_local,
+    service::{BoxedServerService, InternalServiceFactory},
+    socket::MioStream,
+    waker_queue::{WakerInterest, WakerQueue},
+};
 
 /// Stop worker message. Returns `true` on successful graceful shutdown
 /// and `false` if some connections still alive when shutdown execute.
 pub(crate) struct Stop {
    graceful: bool,
@@ -41,19 +43,20 @@ pub(crate) struct Conn {
    pub token: usize,
 }
 
+/// Create accept and server worker handles.
 fn handle_pair(
    idx: usize,
-    tx1: UnboundedSender<Conn>,
-    tx2: UnboundedSender<Stop>,
+    conn_tx: UnboundedSender<Conn>,
+    stop_tx: UnboundedSender<Stop>,
    counter: Counter,
 ) -> (WorkerHandleAccept, WorkerHandleServer) {
    let accept = WorkerHandleAccept {
        idx,
-        tx: tx1,
+        conn_tx,
        counter,
    };
 
-    let server = WorkerHandleServer { idx, tx: tx2 };
+    let server = WorkerHandleServer { idx, stop_tx };
 
    (accept, server)
 }
@@ -149,13 +152,13 @@ impl Drop for WorkerCounterGuard {
    }
 }
 
-/// Handle to worker that can send connection message to worker and share the
-/// availability of worker to other thread.
+/// Handle to worker that can send connection message to worker and share the availability of worker
+/// to other threads.
 ///
 /// Held by [Accept](crate::accept::Accept).
 pub(crate) struct WorkerHandleAccept {
    idx: usize,
-    tx: UnboundedSender<Conn>,
+    conn_tx: UnboundedSender<Conn>,
    counter: Counter,
 }
@@ -166,8 +169,8 @@ impl WorkerHandleAccept {
    }
 
    #[inline(always)]
-    pub(crate) fn send(&self, msg: Conn) -> Result<(), Conn> {
-        self.tx.send(msg).map_err(|msg| msg.0)
+    pub(crate) fn send(&self, conn: Conn) -> Result<(), Conn> {
+        self.conn_tx.send(conn).map_err(|msg| msg.0)
    }
 
    #[inline(always)]
@@ -181,15 +184,14 @@ impl WorkerHandleAccept {
 /// Held by [ServerBuilder](crate::builder::ServerBuilder).
 #[derive(Debug)]
 pub(crate) struct WorkerHandleServer {
-    #[allow(dead_code)]
-    idx: usize,
-    tx: UnboundedSender<Stop>,
+    pub(crate) idx: usize,
+    stop_tx: UnboundedSender<Stop>,
 }
 
 impl WorkerHandleServer {
    pub(crate) fn stop(&self, graceful: bool) -> oneshot::Receiver<bool> {
        let (tx, rx) = oneshot::channel();
-        let _ = self.tx.send(Stop { graceful, tx });
+        let _ = self.stop_tx.send(Stop { graceful, tx });
        rx
    }
 }
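The stop call above is a plain request/acknowledge round trip over channels: the caller receives a oneshot receiver that resolves once the worker finishes shutting down. In miniature, with a hypothetical stand-in for the crate-private message type:

    use tokio::sync::{mpsc::UnboundedSender, oneshot};

    struct Stop {
        graceful: bool,
        tx: oneshot::Sender<bool>,
    }

    async fn stop_worker(stop_tx: &UnboundedSender<Stop>) -> bool {
        let (tx, rx) = oneshot::channel();

        // A send error means the worker is already gone; treat it as stopped.
        let _ = stop_tx.send(Stop { graceful: true, tx });

        // `false` also covers the worker dropping the sender without replying.
        rx.await.unwrap_or(false)
    }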
@@ -222,7 +224,7 @@ impl WorkerService {
    }
 }
 
-#[derive(Copy, Clone, Debug, PartialEq, Eq)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 enum WorkerServiceStatus {
    Available,
    Unavailable,
@@ -233,7 +235,7 @@ enum WorkerServiceStatus {
 }
 
 /// Config for worker behavior passed down from server builder.
-#[derive(Copy, Clone)]
+#[derive(Debug, Clone, Copy)]
 pub(crate) struct ServerWorkerConfig {
    shutdown_timeout: Duration,
    max_blocking_threads: usize,
@@ -272,7 +274,9 @@ impl ServerWorker {
        factories: Vec<Box<dyn InternalServiceFactory>>,
        waker_queue: WakerQueue,
        config: ServerWorkerConfig,
-    ) -> (WorkerHandleAccept, WorkerHandleServer) {
+    ) -> io::Result<(WorkerHandleAccept, WorkerHandleServer)> {
+        trace!("starting server worker {}", idx);
+
        let (tx1, rx) = unbounded_channel();
        let (tx2, rx2) = unbounded_channel();
@@ -289,65 +293,83 @@ impl ServerWorker {
            Arbiter::new()
        };
 
-        #[cfg(not(all(target_os = "linux", feature = "io-uring")))]
-        let arbiter = Arbiter::with_tokio_rt(move || {
-            tokio::runtime::Builder::new_current_thread()
-                .enable_all()
-                .max_blocking_threads(config.max_blocking_threads)
-                .build()
-                .unwrap()
-        });
-
-        arbiter.spawn(async move {
-            let fut = factories
-                .iter()
-                .enumerate()
-                .map(|(idx, factory)| {
-                    let fut = factory.create();
-                    async move { fut.await.map(|(t, s)| (idx, t, s)) }
-                })
-                .collect::<Vec<_>>();
-
-            // a second spawn to run !Send future tasks.
-            spawn(async move {
-                let res = join_all(fut)
-                    .await
-                    .into_iter()
-                    .collect::<Result<Vec<_>, _>>();
-
-                let services = match res {
-                    Ok(res) => res
-                        .into_iter()
-                        .fold(Vec::new(), |mut services, (factory, token, service)| {
-                            assert_eq!(token, services.len());
-                            services.push(WorkerService {
-                                factory,
-                                service,
-                                status: WorkerServiceStatus::Unavailable,
-                            });
-                            services
-                        })
-                        .into_boxed_slice(),
-                    Err(e) => {
-                        error!("Can not start worker: {:?}", e);
-                        Arbiter::current().stop();
-                        return;
-                    }
-                };
-
-                // a third spawn to make sure ServerWorker runs as non boxed future.
-                spawn(ServerWorker {
-                    rx,
-                    rx2,
-                    services,
-                    counter: WorkerCounter::new(idx, waker_queue, counter_clone),
-                    factories: factories.into_boxed_slice(),
-                    state: Default::default(),
-                    shutdown_timeout: config.shutdown_timeout,
-                });
-            });
-        });
-
-        handle_pair(idx, tx1, tx2, counter)
+        // get actix system context if it is set
+        let sys = System::try_current();
+
+        // TODO: wait for server startup with sync channel
+
+        std::thread::Builder::new()
+            .name("eofibef".to_owned())
+            .spawn(move || {
+                // forward existing actix system context
+                if let Some(sys) = sys {
+                    System::set_current(sys);
+                }
+
+                let rt = tokio::runtime::Builder::new_current_thread()
+                    .enable_all()
+                    .max_blocking_threads(config.max_blocking_threads)
+                    .build()
+                    .unwrap();
+
+                rt.block_on(tokio::task::LocalSet::new().run_until(async move {
+                    let fut = factories
+                        .iter()
+                        .enumerate()
+                        .map(|(idx, factory)| {
+                            let fut = factory.create();
+                            async move { fut.await.map(|(t, s)| (idx, t, s)) }
+                        })
+                        .collect::<Vec<_>>();
+
+                    // a second spawn to run !Send future tasks.
+                    spawn(async move {
+                        let res = join_all_local(fut)
+                            .await
+                            .into_iter()
+                            .collect::<Result<Vec<_>, _>>();
+
+                        let services = match res {
+                            Ok(res) => res
+                                .into_iter()
+                                .fold(Vec::new(), |mut services, (factory, token, service)| {
+                                    assert_eq!(token, services.len());
+                                    services.push(WorkerService {
+                                        factory,
+                                        service,
+                                        status: WorkerServiceStatus::Unavailable,
+                                    });
+                                    services
+                                })
+                                .into_boxed_slice(),
+                            Err(e) => {
+                                error!("Can not start worker: {:?}", e);
+                                Arbiter::try_current().as_ref().map(ArbiterHandle::stop);
+                                return;
+                            }
+                        };
+
+                        // a third spawn to make sure ServerWorker runs as non boxed future.
+                        spawn(ServerWorker {
+                            rx,
+                            rx2,
+                            services,
+                            counter: WorkerCounter::new(idx, waker_queue, counter_clone),
+                            factories: factories.into_boxed_slice(),
+                            state: Default::default(),
+                            shutdown_timeout: config.shutdown_timeout,
+                        })
+                        .await
+                        .expect("task 3 panic");
+                    })
+                    .await
+                    .expect("task 2 panic");
+                }))
+            })
+            .expect("worker thread error/panic");
+
+        Ok(handle_pair(idx, tx1, tx2, counter))
    }
 
    fn restart_service(&mut self, idx: usize, factory_id: usize) {
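The heart of the change is visible above: instead of `Arbiter::with_tokio_rt`, each worker is now a plain `std::thread` driving a single-threaded Tokio runtime, with a `LocalSet` so `!Send` service futures can still be spawned, and the parent `System` context (if any) forwarded into the new thread. The bootstrap skeleton, reduced to its moving parts (the thread name and body here are illustrative, not the crate's exact internals):

    use std::{io, thread};

    use tokio::task::LocalSet;

    fn spawn_worker_thread(idx: usize) -> io::Result<thread::JoinHandle<()>> {
        // Capture the actix System, if one exists, so tasks on the new thread
        // can still reach `System::current()`.
        let sys = actix_rt::System::try_current();

        thread::Builder::new()
            .name(format!("actix-server worker {}", idx))
            .spawn(move || {
                if let Some(sys) = sys {
                    actix_rt::System::set_current(sys);
                }

                let rt = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .unwrap();

                // `run_until` drives `spawn_local` tasks on this thread, which
                // is what allows `!Send` services to run here.
                rt.block_on(LocalSet::new().run_until(async move {
                    // ... build services and await the worker future here ...
                }));
            })
    }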
@@ -438,7 +460,7 @@ struct Shutdown {
    /// Start time of shutdown.
    start_from: Instant,
 
-    /// Notify of the shutdown outcome (force/grace) to stop caller.
+    /// Notify caller of the shutdown outcome (graceful/force).
    tx: oneshot::Sender<bool>,
 }
@@ -450,8 +472,8 @@ impl Default for WorkerState {
 
 impl Drop for ServerWorker {
    fn drop(&mut self) {
-        // Stop the Arbiter ServerWorker runs on on drop.
-        Arbiter::current().stop();
+        trace!("stopping ServerWorker Arbiter");
+        Arbiter::try_current().as_ref().map(ArbiterHandle::stop);
    }
 }
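`Arbiter::try_current` is the panic-free sibling of `Arbiter::current`: it returns `None` when the calling thread has no Arbiter context, which is now a legal state because workers may run on plain Tokio threads. A small sketch of the drop-safe stop used above:

    use actix_rt::Arbiter;

    fn stop_arbiter_if_present() {
        // `Arbiter::current()` would panic on a plain Tokio thread;
        // `try_current()` simply yields `None` there instead.
        if let Some(arbiter) = Arbiter::try_current() {
            let _ = arbiter.stop();
        }
    }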


@@ -30,7 +30,7 @@ fn test_bind() {
            })?
            .run();
 
-            let _ = tx.send((srv.clone(), actix_rt::System::current()));
+            let _ = tx.send((srv.handle(), actix_rt::System::current()));
            srv.await
        })
@ -61,7 +61,7 @@ fn test_listen() {
})? })?
.run(); .run();
let _ = tx.send((srv.clone(), actix_rt::System::current())); let _ = tx.send((srv.handle(), actix_rt::System::current()));
srv.await srv.await
}) })
@ -103,7 +103,7 @@ fn test_start() {
})? })?
.run(); .run();
let _ = tx.send((srv.clone(), actix_rt::System::current())); let _ = tx.send((srv.handle(), actix_rt::System::current()));
srv.await srv.await
}) })
@@ -166,7 +166,7 @@ async fn test_max_concurrent_connections() {
    let h = thread::spawn(move || {
        actix_rt::System::new().block_on(async {
-            let server = Server::build()
+            let srv = Server::build()
                // Set a relative higher backlog.
                .backlog(12)
                // max connection for a worker is 3.
@@ -187,9 +187,9 @@ async fn test_max_concurrent_connections() {
            })?
            .run();
 
-            let _ = tx.send((server.clone(), actix_rt::System::current()));
+            let _ = tx.send((srv.handle(), actix_rt::System::current()));
 
-            server.await
+            srv.await
        })
    });
@@ -260,7 +260,7 @@ async fn test_service_restart() {
    let h = thread::spawn(move || {
        let num = num.clone();
        actix_rt::System::new().block_on(async {
-            let server = Server::build()
+            let srv = Server::build()
                .backlog(1)
                .disable_signals()
                .bind("addr1", addr1, move || {
@@ -280,12 +280,12 @@ async fn test_service_restart() {
            .workers(1)
            .run();
 
-            let _ = tx.send((server.clone(), actix_rt::System::current()));
+            let _ = tx.send((srv.handle(), actix_rt::System::current()));
 
-            server.await
+            srv.await
        })
    });
 
-    let (server, sys) = rx.recv().unwrap();
+    let (srv, sys) = rx.recv().unwrap();
 
    for _ in 0..5 {
        TcpStream::connect(addr1)
@@ -307,12 +307,12 @@ async fn test_service_restart() {
    assert!(num_clone.load(Ordering::SeqCst) > 5);
    assert!(num2_clone.load(Ordering::SeqCst) > 5);
 
-    let _ = server.stop(false);
+    let _ = srv.stop(false);
    sys.stop();
    h.join().unwrap().unwrap();
 }
 
-#[ignore]
+#[cfg_attr(not(target_os = "linux"), ignore)]
 #[actix_rt::test]
 async fn worker_restart() {
    use actix_service::{Service, ServiceFactory};
@@ -379,19 +379,19 @@ async fn worker_restart() {
    let h = thread::spawn(move || {
        let counter = counter.clone();
        actix_rt::System::new().block_on(async {
-            let server = Server::build()
+            let srv = Server::build()
                .disable_signals()
                .bind("addr", addr, move || TestServiceFactory(counter.clone()))?
                .workers(2)
                .run();
 
-            let _ = tx.send((server.clone(), actix_rt::System::current()));
+            let _ = tx.send((srv.handle(), actix_rt::System::current()));
 
-            server.await
+            srv.await
        })
    });
 
-    let (server, sys) = rx.recv().unwrap();
+    let (srv, sys) = rx.recv().unwrap();
 
    sleep(Duration::from_secs(3)).await;
@@ -448,7 +448,7 @@ async fn worker_restart() {
    assert_eq!("3", id);
    stream.shutdown().await.unwrap();
 
-    let _ = server.stop(false);
+    let _ = srv.stop(false);
    sys.stop();
    h.join().unwrap().unwrap();
 }
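All of these tests now share one shape: the server thread sends back a `ServerHandle` (via the new `Server::handle`) together with its `System`, and the test stops both before joining the thread. Condensed into a sketch, binding to port 0 with a no-op service for illustration:

    use std::{io, sync::mpsc, thread};

    use actix_rt::net::TcpStream;
    use actix_server::Server;
    use actix_service::fn_service;

    fn run_and_stop() -> io::Result<()> {
        let (tx, rx) = mpsc::channel();

        let h = thread::spawn(move || {
            actix_rt::System::new().block_on(async {
                let srv = Server::build()
                    .disable_signals()
                    .bind("sketch", ("127.0.0.1", 0), || {
                        fn_service(|_: TcpStream| async { Ok::<_, ()>(()) })
                    })?
                    .workers(1)
                    .run();

                let _ = tx.send((srv.handle(), actix_rt::System::current()));
                srv.await
            })
        });

        let (srv, sys) = rx.recv().unwrap();

        // Request immediate (non-graceful) shutdown, stop the System, then
        // propagate any error from the server thread.
        let _ = srv.stop(false);
        sys.stop();
        h.join().unwrap()
    }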