From 5b537c7b10033bf8f78be31976a60a9f01bfa70d Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Thu, 4 Nov 2021 20:30:43 +0000 Subject: [PATCH] actix-rt-less (#408) --- actix-rt/CHANGES.md | 3 + actix-rt/src/arbiter.rs | 9 + actix-rt/src/system.rs | 2 +- actix-server/CHANGES.md | 5 + actix-server/Cargo.toml | 2 +- actix-server/examples/tcp-echo.rs | 23 +- actix-server/src/accept.rs | 391 ++++++++++------------------- actix-server/src/availability.rs | 121 +++++++++ actix-server/src/builder.rs | 352 ++++++-------------------- actix-server/src/handle.rs | 53 ++++ actix-server/src/join_all.rs | 144 +++++++++++ actix-server/src/lib.rs | 83 +----- actix-server/src/server.rs | 404 +++++++++++++++++++++++------- actix-server/src/signals.rs | 58 +++-- actix-server/src/test_server.rs | 44 ++-- actix-server/src/waker_queue.rs | 9 +- actix-server/src/worker.rs | 182 ++++++++------ actix-server/tests/test_server.rs | 34 +-- 18 files changed, 1075 insertions(+), 844 deletions(-) create mode 100644 actix-server/src/availability.rs create mode 100644 actix-server/src/handle.rs create mode 100644 actix-server/src/join_all.rs diff --git a/actix-rt/CHANGES.md b/actix-rt/CHANGES.md index 642cf27a..0ac9f24a 100644 --- a/actix-rt/CHANGES.md +++ b/actix-rt/CHANGES.md @@ -1,6 +1,9 @@ # Changes ## Unreleased - 2021-xx-xx +* Add `Arbiter::try_current` for situations where thread may or may not have Arbiter context. [#408] + +[#408]: https://github.com/actix/actix-net/pull/408 ## 2.3.0 - 2021-10-11 diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index 97084f05..43b1bdc3 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -240,6 +240,15 @@ impl Arbiter { }) } + /// Try to get current running arbiter handle. + /// + /// Returns `None` if no Arbiter has been started. + /// + /// Unlike [`current`](Self::current), this never panics. + pub fn try_current() -> Option { + HANDLE.with(|cell| cell.borrow().clone()) + } + /// Stop Arbiter from continuing it's event loop. /// /// Returns true if stop message was sent successfully and false if the Arbiter has been dropped. diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index 23d692a4..e32d6209 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -130,7 +130,7 @@ impl System { /// /// Returns `None` if no System has been started. /// - /// Contrary to `current`, this never panics. + /// Unlike [`current`](Self::current), this never panics. pub fn try_current() -> Option { CURRENT.with(|cell| cell.borrow().clone()) } diff --git a/actix-server/CHANGES.md b/actix-server/CHANGES.md index 83aeecd4..913a48e0 100644 --- a/actix-server/CHANGES.md +++ b/actix-server/CHANGES.md @@ -1,11 +1,16 @@ # Changes ## Unreleased - 2021-xx-xx +* Server can be started in regular Tokio runtime. [#408] +* Expose new `Server` type whose `Future` impl resolves when server stops. [#408] * Rename `Server` to `ServerHandle`. [#407] +* Add `Server::handle` to obtain handle to server. [#408] * Rename `ServerBuilder::{maxconn => max_concurrent_connections}`. [#407] +* Deprecate crate-level `new` shortcut for server builder. [#408] * Minimum supported Rust version (MSRV) is now 1.52. 
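A minimal sketch of the startup flow these entries describe, assuming the API surface introduced in this PR (`Server::build`, `Server::handle`, and a `Server` future that resolves on shutdown), running inside a plain Tokio runtime:

```rust
use actix_rt::net::TcpStream;
use actix_server::Server;
use actix_service::fn_service;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let server = Server::build()
        .bind("noop", ("127.0.0.1", 8080), || {
            // trivial service: accept the connection and finish immediately
            fn_service(|_stream: TcpStream| async { Ok::<_, ()>(()) })
        })?
        .run();

    // cloneable handle for pause/resume/stop from other tasks
    let handle = server.handle();

    // stop gracefully right away, purely to demonstrate the handle
    tokio::spawn(async move { handle.stop(true).await });

    // the `Server` future resolves once shutdown has completed
    server.await
}
```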
[#407]: https://github.com/actix/actix-net/pull/407 +[#408]: https://github.com/actix/actix-net/pull/408 ## 2.0.0-beta.6 - 2021-10-11 diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index 46a0ad1d..a4c403f1 100755 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -38,4 +38,4 @@ actix-rt = "2.0.0" bytes = "1" env_logger = "0.9" futures-util = { version = "0.3.7", default-features = false, features = ["sink"] } -tokio = { version = "1.5.1", features = ["io-util"] } +tokio = { version = "1.5.1", features = ["io-util", "rt-multi-thread", "macros"] } diff --git a/actix-server/examples/tcp-echo.rs b/actix-server/examples/tcp-echo.rs index 8b038da4..930ebf0a 100644 --- a/actix-server/examples/tcp-echo.rs +++ b/actix-server/examples/tcp-echo.rs @@ -10,7 +10,7 @@ //! the length of each line it echos and the total size of data sent when the connection is closed. use std::{ - env, io, + io, sync::{ atomic::{AtomicUsize, Ordering}, Arc, @@ -23,12 +23,10 @@ use actix_service::{fn_service, ServiceFactoryExt as _}; use bytes::BytesMut; use futures_util::future::ok; use log::{error, info}; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _}; -#[actix_rt::main] -async fn main() -> io::Result<()> { - env::set_var("RUST_LOG", "info"); - env_logger::init(); +async fn run() -> io::Result<()> { + env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); let count = Arc::new(AtomicUsize::new(0)); @@ -88,3 +86,16 @@ async fn main() -> io::Result<()> { .run() .await } + +#[tokio::main] +async fn main() -> io::Result<()> { + run().await?; + Ok(()) +} + +// alternatively: +// #[actix_rt::main] +// async fn main() -> io::Result<()> { +// run().await?; +// Ok(()) +// } diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index a872853c..bdeb6004 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -1,17 +1,18 @@ -use std::time::Duration; -use std::{io, thread}; +use std::{io, thread, time::Duration}; -use actix_rt::{ - time::{sleep, Instant}, - System, -}; +use actix_rt::time::Instant; use log::{debug, error, info}; use mio::{Interest, Poll, Token as MioToken}; -use crate::server::ServerHandle; -use crate::socket::MioListener; -use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN}; -use crate::worker::{Conn, WorkerHandleAccept}; +use crate::{ + availability::Availability, + socket::MioListener, + waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN}, + worker::{Conn, ServerWorker, WorkerHandleAccept, WorkerHandleServer}, + ServerBuilder, ServerHandle, +}; + +const TIMEOUT_DURATION_ON_ERROR: Duration = Duration::from_millis(510); struct ServerSocketInfo { token: usize, @@ -20,200 +21,110 @@ struct ServerSocketInfo { /// Timeout is used to mark the deadline when this socket's listener should be registered again /// after an error. - timeout: Option, -} - -/// Accept loop would live with `ServerBuilder`. -/// -/// It's tasked with construct `Poll` instance and `WakerQueue` which would be distributed to -/// `Accept` and `Worker`. -/// -/// It would also listen to `ServerCommand` and push interests to `WakerQueue`. 
-pub(crate) struct AcceptLoop { - srv: Option, - poll: Option, - waker: WakerQueue, -} - -impl AcceptLoop { - pub fn new(srv: ServerHandle) -> Self { - let poll = Poll::new().unwrap_or_else(|e| panic!("Can not create `mio::Poll`: {}", e)); - let waker = WakerQueue::new(poll.registry()) - .unwrap_or_else(|e| panic!("Can not create `mio::Waker`: {}", e)); - - Self { - srv: Some(srv), - poll: Some(poll), - waker, - } - } - - pub(crate) fn waker_owned(&self) -> WakerQueue { - self.waker.clone() - } - - pub fn wake(&self, i: WakerInterest) { - self.waker.wake(i); - } - - pub(crate) fn start( - &mut self, - socks: Vec<(usize, MioListener)>, - handles: Vec, - ) { - let srv = self.srv.take().expect("Can not re-use AcceptInfo"); - let poll = self.poll.take().unwrap(); - let waker = self.waker.clone(); - - Accept::start(poll, waker, socks, srv, handles); - } + timeout: Option, } /// poll instance of the server. -struct Accept { +pub(crate) struct Accept { poll: Poll, - waker: WakerQueue, + waker_queue: WakerQueue, handles: Vec, srv: ServerHandle, next: usize, avail: Availability, + /// use the smallest duration from sockets timeout. + timeout: Option, paused: bool, } -/// Array of u128 with every bit as marker for a worker handle's availability. -#[derive(Debug, Default)] -struct Availability([u128; 4]); - -impl Availability { - /// Check if any worker handle is available - #[inline(always)] - fn available(&self) -> bool { - self.0.iter().any(|a| *a != 0) - } - - /// Check if worker handle is available by index - #[inline(always)] - fn get_available(&self, idx: usize) -> bool { - let (offset, idx) = Self::offset(idx); - - self.0[offset] & (1 << idx as u128) != 0 - } - - /// Set worker handle available state by index. - fn set_available(&mut self, idx: usize, avail: bool) { - let (offset, idx) = Self::offset(idx); - - let off = 1 << idx as u128; - if avail { - self.0[offset] |= off; - } else { - self.0[offset] &= !off - } - } - - /// Set all worker handle to available state. - /// This would result in a re-check on all workers' availability. - fn set_available_all(&mut self, handles: &[WorkerHandleAccept]) { - handles.iter().for_each(|handle| { - self.set_available(handle.idx(), true); - }) - } - - /// Get offset and adjusted index of given worker handle index. - fn offset(idx: usize) -> (usize, usize) { - if idx < 128 { - (0, idx) - } else if idx < 128 * 2 { - (1, idx - 128) - } else if idx < 128 * 3 { - (2, idx - 128 * 2) - } else if idx < 128 * 4 { - (3, idx - 128 * 3) - } else { - panic!("Max WorkerHandle count is 512") - } - } -} - -/// This function defines errors that are per-connection. Which basically -/// means that if we get this error from `accept()` system call it means -/// next connection might be ready to be accepted. -/// -/// All other errors will incur a timeout before next `accept()` is performed. -/// The timeout is useful to handle resource exhaustion errors like ENFILE -/// and EMFILE. Otherwise, could enter into tight loop. -fn connection_error(e: &io::Error) -> bool { - e.kind() == io::ErrorKind::ConnectionRefused - || e.kind() == io::ErrorKind::ConnectionAborted - || e.kind() == io::ErrorKind::ConnectionReset -} - impl Accept { pub(crate) fn start( - poll: Poll, - waker: WakerQueue, - socks: Vec<(usize, MioListener)>, - srv: ServerHandle, - handles: Vec, - ) { - // Accept runs in its own thread and would want to spawn additional futures to current - // actix system. 
- let sys = System::current(); - thread::Builder::new() - .name("actix-server accept loop".to_owned()) - .spawn(move || { - System::set_current(sys); - let (mut accept, mut sockets) = - Accept::new_with_sockets(poll, waker, socks, handles, srv); + sockets: Vec<(usize, MioListener)>, + builder: &ServerBuilder, + ) -> io::Result<(WakerQueue, Vec)> { + let handle_server = ServerHandle::new(builder.cmd_tx.clone()); - accept.poll_with(&mut sockets); + // construct poll instance and its waker + let poll = Poll::new()?; + let waker_queue = WakerQueue::new(poll.registry())?; + + // start workers and collect handles + let (handles_accept, handles_server) = (0..builder.threads) + .map(|idx| { + // clone service factories + let factories = builder + .factories + .iter() + .map(|f| f.clone_factory()) + .collect::>(); + + // start worker using service factories + ServerWorker::start(idx, factories, waker_queue.clone(), builder.worker_config) }) - .unwrap(); + .collect::>>()? + .into_iter() + .unzip(); + + let (mut accept, mut sockets) = Accept::new_with_sockets( + poll, + waker_queue.clone(), + sockets, + handles_accept, + handle_server, + )?; + + thread::Builder::new() + .name("actix-server acceptor".to_owned()) + .spawn(move || accept.poll_with(&mut sockets)) + .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; + + Ok((waker_queue, handles_server)) } fn new_with_sockets( poll: Poll, - waker: WakerQueue, - socks: Vec<(usize, MioListener)>, - handles: Vec, - srv: ServerHandle, - ) -> (Accept, Vec) { - let sockets = socks + waker_queue: WakerQueue, + sockets: Vec<(usize, MioListener)>, + accept_handles: Vec, + server_handle: ServerHandle, + ) -> io::Result<(Accept, Box<[ServerSocketInfo]>)> { + let sockets = sockets .into_iter() .map(|(token, mut lst)| { // Start listening for incoming connections poll.registry() - .register(&mut lst, MioToken(token), Interest::READABLE) - .unwrap_or_else(|e| panic!("Can not register io: {}", e)); + .register(&mut lst, MioToken(token), Interest::READABLE)?; - ServerSocketInfo { + Ok(ServerSocketInfo { token, lst, timeout: None, - } + }) }) - .collect(); + .collect::>()?; let mut avail = Availability::default(); // Assume all handles are avail at construct time. - avail.set_available_all(&handles); + avail.set_available_all(&accept_handles); let accept = Accept { poll, - waker, - handles, - srv, + waker_queue, + handles: accept_handles, + srv: server_handle, next: 0, avail, + timeout: None, paused: false, }; - (accept, sockets) + Ok((accept, sockets)) } + /// blocking wait for readiness events triggered by mio fn poll_with(&mut self, sockets: &mut [ServerSocketInfo]) { - let mut events = mio::Events::with_capacity(128); + let mut events = mio::Events::with_capacity(256); loop { if let Err(e) = self.poll.poll(&mut events, None) { @@ -239,6 +150,9 @@ impl Accept { } } } + + // check for timeout and re-register sockets + self.process_timeout(sockets); } } @@ -249,7 +163,7 @@ impl Accept { loop { // take guard with every iteration so no new interest can be added // until the current task is done. - let mut guard = self.waker.guard(); + let mut guard = self.waker_queue.guard(); match guard.pop_front() { // worker notify it becomes available. 
Some(WakerInterest::WorkerAvailable(idx)) => { @@ -261,6 +175,7 @@ impl Accept { self.accept_all(sockets); } } + // a new worker thread is made and it's handle would be added to Accept Some(WakerInterest::Worker(handle)) => { drop(guard); @@ -272,12 +187,7 @@ impl Accept { self.accept_all(sockets); } } - // got timer interest and it's time to try register socket(s) again - Some(WakerInterest::Timer) => { - drop(guard); - self.process_timer(sockets) - } Some(WakerInterest::Pause) => { drop(guard); @@ -287,6 +197,7 @@ impl Accept { self.deregister_all(sockets); } } + Some(WakerInterest::Resume) => { drop(guard); @@ -300,6 +211,7 @@ impl Accept { self.accept_all(sockets); } } + Some(WakerInterest::Stop) => { if !self.paused { self.deregister_all(sockets); @@ -307,6 +219,7 @@ impl Accept { return true; } + // waker queue is drained None => { // Reset the WakerQueue before break so it does not grow infinitely @@ -318,25 +231,44 @@ impl Accept { } } - fn process_timer(&self, sockets: &mut [ServerSocketInfo]) { - let now = Instant::now(); - sockets - .iter_mut() - // Only sockets that had an associated timeout were deregistered. - .filter(|info| info.timeout.is_some()) - .for_each(|info| { - let inst = info.timeout.take().unwrap(); + fn process_timeout(&mut self, sockets: &mut [ServerSocketInfo]) { + // always remove old timeouts + if self.timeout.take().is_some() { + let now = Instant::now(); - if now < inst { - info.timeout = Some(inst); - } else if !self.paused { - self.register_logged(info); + sockets + .iter_mut() + // Only sockets that had an associated timeout were deregistered. + .filter(|info| info.timeout.is_some()) + .for_each(|info| { + let inst = info.timeout.take().unwrap(); + + if now < inst { + // still timed out; try to set new timeout + info.timeout = Some(inst); + self.set_timeout(inst - now); + } else if !self.paused { + // timeout expired; register socket again + self.register_logged(info); + } + + // Drop the timeout if server is paused and socket timeout is expired. + // When server recovers from pause it will register all sockets without + // a timeout value so this socket register will be delayed till then. + }); + } + } + + /// Update accept timeout with `duration` if it is shorter than current timeout. + fn set_timeout(&mut self, duration: Duration) { + match self.timeout { + Some(ref mut timeout) => { + if *timeout > duration { + *timeout = duration; } - - // Drop the timeout if server is paused and socket timeout is expired. - // When server recovers from pause it will register all sockets without - // a timeout value so this socket register will be delayed till then. - }); + } + None => self.timeout = Some(duration), + } } #[cfg(not(target_os = "windows"))] @@ -382,12 +314,12 @@ impl Accept { fn deregister_all(&self, sockets: &mut [ServerSocketInfo]) { // This is a best effort implementation with following limitation: // - // Every ServerSocketInfo with associate timeout will be skipped and it's timeout - // is removed in the process. + // Every ServerSocketInfo with associated timeout will be skipped and it's timeout is + // removed in the process. // - // Therefore WakerInterest::Pause followed by WakerInterest::Resume in a very short - // gap (less than 500ms) would cause all timing out ServerSocketInfos be reregistered - // before expected timing. + // Therefore WakerInterest::Pause followed by WakerInterest::Resume in a very short gap + // (less than 500ms) would cause all timing out ServerSocketInfos be re-registered before + // expected timing. 
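        // In practice this should be harmless: an early re-register only means `accept()` is
        // retried sooner, and if the original error (e.g. EMFILE) is still present the socket is
        // simply deregistered and timed out again.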
sockets .iter_mut() // Take all timeout. @@ -476,13 +408,7 @@ impl Accept { // the poll would need it mark which socket and when it's // listener should be registered info.timeout = Some(Instant::now() + Duration::from_millis(500)); - - // after the sleep a Timer interest is sent to Accept Poll - let waker = self.waker.clone(); - System::current().arbiter().spawn(async move { - sleep(Duration::from_millis(510)).await; - waker.wake(WakerInterest::Timer); - }); + self.set_timeout(TIMEOUT_DURATION_ON_ERROR); return; } @@ -521,67 +447,14 @@ impl Accept { } } -#[cfg(test)] -mod test { - use super::Availability; - - fn single(aval: &mut Availability, idx: usize) { - aval.set_available(idx, true); - assert!(aval.available()); - - aval.set_available(idx, true); - - aval.set_available(idx, false); - assert!(!aval.available()); - - aval.set_available(idx, false); - assert!(!aval.available()); - } - - fn multi(aval: &mut Availability, mut idx: Vec) { - idx.iter().for_each(|idx| aval.set_available(*idx, true)); - - assert!(aval.available()); - - while let Some(idx) = idx.pop() { - assert!(aval.available()); - aval.set_available(idx, false); - } - - assert!(!aval.available()); - } - - #[test] - fn availability() { - let mut aval = Availability::default(); - - single(&mut aval, 1); - single(&mut aval, 128); - single(&mut aval, 256); - single(&mut aval, 511); - - let idx = (0..511).filter(|i| i % 3 == 0 && i % 5 == 0).collect(); - - multi(&mut aval, idx); - - multi(&mut aval, (0..511).collect()) - } - - #[test] - #[should_panic] - fn overflow() { - let mut aval = Availability::default(); - single(&mut aval, 512); - } - - #[test] - fn pin_point() { - let mut aval = Availability::default(); - - aval.set_available(438, true); - - aval.set_available(479, true); - - assert_eq!(aval.0[3], 1 << (438 - 384) | 1 << (479 - 384)); - } +/// This function defines errors that are per-connection; if we get this error from the `accept()` +/// system call it means the next connection might be ready to be accepted. +/// +/// All other errors will incur a timeout before next `accept()` call is attempted. The timeout is +/// useful to handle resource exhaustion errors like `ENFILE` and `EMFILE`. Otherwise, it could +/// enter into a temporary spin loop. +fn connection_error(e: &io::Error) -> bool { + e.kind() == io::ErrorKind::ConnectionRefused + || e.kind() == io::ErrorKind::ConnectionAborted + || e.kind() == io::ErrorKind::ConnectionReset } diff --git a/actix-server/src/availability.rs b/actix-server/src/availability.rs new file mode 100644 index 00000000..801b08f2 --- /dev/null +++ b/actix-server/src/availability.rs @@ -0,0 +1,121 @@ +use crate::worker::WorkerHandleAccept; + +/// Array of u128 with every bit as marker for a worker handle's availability. +#[derive(Debug, Default)] +pub(crate) struct Availability([u128; 4]); + +impl Availability { + /// Check if any worker handle is available + #[inline(always)] + pub(crate) fn available(&self) -> bool { + self.0.iter().any(|a| *a != 0) + } + + /// Check if worker handle is available by index + #[inline(always)] + pub(crate) fn get_available(&self, idx: usize) -> bool { + let (offset, idx) = Self::offset(idx); + + self.0[offset] & (1 << idx as u128) != 0 + } + + /// Set worker handle available state by index. 
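    ///
    /// Panics if `idx` is 512 or greater, since [`Self::offset`] supports at most 512 workers.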
+ pub(crate) fn set_available(&mut self, idx: usize, avail: bool) { + let (offset, idx) = Self::offset(idx); + + let off = 1 << idx as u128; + if avail { + self.0[offset] |= off; + } else { + self.0[offset] &= !off + } + } + + /// Set all worker handle to available state. + /// This would result in a re-check on all workers' availability. + pub(crate) fn set_available_all(&mut self, handles: &[WorkerHandleAccept]) { + handles.iter().for_each(|handle| { + self.set_available(handle.idx(), true); + }) + } + + /// Get offset and adjusted index of given worker handle index. + pub(crate) fn offset(idx: usize) -> (usize, usize) { + if idx < 128 { + (0, idx) + } else if idx < 128 * 2 { + (1, idx - 128) + } else if idx < 128 * 3 { + (2, idx - 128 * 2) + } else if idx < 128 * 4 { + (3, idx - 128 * 3) + } else { + panic!("Max WorkerHandle count is 512") + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn single(aval: &mut Availability, idx: usize) { + aval.set_available(idx, true); + assert!(aval.available()); + + aval.set_available(idx, true); + + aval.set_available(idx, false); + assert!(!aval.available()); + + aval.set_available(idx, false); + assert!(!aval.available()); + } + + fn multi(aval: &mut Availability, mut idx: Vec) { + idx.iter().for_each(|idx| aval.set_available(*idx, true)); + + assert!(aval.available()); + + while let Some(idx) = idx.pop() { + assert!(aval.available()); + aval.set_available(idx, false); + } + + assert!(!aval.available()); + } + + #[test] + fn availability() { + let mut aval = Availability::default(); + + single(&mut aval, 1); + single(&mut aval, 128); + single(&mut aval, 256); + single(&mut aval, 511); + + let idx = (0..511).filter(|i| i % 3 == 0 && i % 5 == 0).collect(); + + multi(&mut aval, idx); + + multi(&mut aval, (0..511).collect()) + } + + #[test] + #[should_panic] + fn overflow() { + let mut aval = Availability::default(); + single(&mut aval, 512); + } + + #[test] + fn pin_point() { + let mut aval = Availability::default(); + + aval.set_available(438, true); + + aval.set_available(479, true); + + assert_eq!(aval.0[3], 1 << (438 - 384) | 1 << (479 - 384)); + } +} diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index 7e3df9d8..0d4abe78 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -1,43 +1,31 @@ -use std::{ - future::Future, - io, mem, - pin::Pin, - task::{Context, Poll}, - time::Duration, +use std::{io, time::Duration}; + +use actix_rt::net::TcpStream; +use log::{info, trace}; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; + +use crate::{ + server::ServerCommand, + service::{InternalServiceFactory, ServiceFactory, StreamNewService}, + socket::{ + MioListener, MioTcpListener, MioTcpSocket, StdSocketAddr, StdTcpListener, ToSocketAddrs, + }, + worker::ServerWorkerConfig, + Server, }; -use actix_rt::{self as rt, net::TcpStream, time::sleep, System}; -use log::{error, info}; -use tokio::sync::{ - mpsc::{unbounded_channel, UnboundedReceiver}, - oneshot, -}; - -use crate::accept::AcceptLoop; -use crate::join_all; -use crate::server::{ServerCommand, ServerHandle}; -use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService}; -use crate::signals::{Signal, Signals}; -use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs}; -use crate::socket::{MioTcpListener, MioTcpSocket}; -use crate::waker_queue::{WakerInterest, WakerQueue}; -use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerHandleAccept, WorkerHandleServer}; - -/// 
Server builder +/// [Server] builder. pub struct ServerBuilder { - threads: usize, - token: usize, - backlog: u32, - handles: Vec<(usize, WorkerHandleServer)>, - services: Vec>, - sockets: Vec<(usize, String, MioListener)>, - accept: AcceptLoop, - exit: bool, - no_signals: bool, - cmd: UnboundedReceiver, - server: ServerHandle, - notify: Vec>, - worker_config: ServerWorkerConfig, + pub(crate) threads: usize, + pub(crate) token: usize, + pub(crate) backlog: u32, + pub(crate) factories: Vec>, + pub(crate) sockets: Vec<(usize, String, MioListener)>, + pub(crate) exit: bool, + pub(crate) listen_os_signals: bool, + pub(crate) cmd_tx: UnboundedSender, + pub(crate) cmd_rx: UnboundedReceiver, + pub(crate) worker_config: ServerWorkerConfig, } impl Default for ServerBuilder { @@ -49,22 +37,18 @@ impl Default for ServerBuilder { impl ServerBuilder { /// Create new Server builder instance pub fn new() -> ServerBuilder { - let (tx, rx) = unbounded_channel(); - let server = ServerHandle::new(tx); + let (cmd_tx, cmd_rx) = unbounded_channel(); ServerBuilder { threads: num_cpus::get(), token: 0, - handles: Vec::new(), - services: Vec::new(), + factories: Vec::new(), sockets: Vec::new(), - accept: AcceptLoop::new(server.clone()), backlog: 2048, exit: false, - no_signals: false, - cmd: rx, - notify: Vec::new(), - server, + listen_os_signals: true, + cmd_tx, + cmd_rx, worker_config: ServerWorkerConfig::default(), } } @@ -134,9 +118,9 @@ impl ServerBuilder { self } - /// Disable signal handling. + /// Disable OS signal handling. pub fn disable_signals(mut self) -> Self { - self.no_signals = true; + self.listen_os_signals = false; self } @@ -160,9 +144,11 @@ impl ServerBuilder { { let sockets = bind_addr(addr, self.backlog)?; + trace!("binding server to: {:?}", &sockets); + for lst in sockets { let token = self.next_token(); - self.services.push(StreamNewService::create( + self.factories.push(StreamNewService::create( name.as_ref().to_string(), token, factory.clone(), @@ -171,11 +157,57 @@ impl ServerBuilder { self.sockets .push((token, name.as_ref().to_string(), MioListener::Tcp(lst))); } + Ok(self) } + /// Add new service to the server. + pub fn listen>( + mut self, + name: N, + lst: StdTcpListener, + factory: F, + ) -> io::Result + where + F: ServiceFactory, + { + lst.set_nonblocking(true)?; + let addr = lst.local_addr()?; + + let token = self.next_token(); + self.factories.push(StreamNewService::create( + name.as_ref().to_string(), + token, + factory, + addr, + )); + + self.sockets + .push((token, name.as_ref().to_string(), MioListener::from(lst))); + + Ok(self) + } + + /// Starts processing incoming connections and return server controller. + pub fn run(self) -> Server { + if self.sockets.is_empty() { + panic!("Server should have at least one bound socket"); + } else { + info!("Starting {} workers", self.threads); + Server::new(self) + } + } + + fn next_token(&mut self) -> usize { + let token = self.token; + self.token += 1; + token + } +} + +#[cfg(unix)] +impl ServerBuilder { /// Add new unix domain service to the server. - #[cfg(unix)] pub fn bind_uds(self, name: N, addr: U, factory: F) -> io::Result where F: ServiceFactory, @@ -198,7 +230,6 @@ impl ServerBuilder { /// Add new unix domain service to the server. /// /// Useful when running as a systemd service and a socket FD is acquired externally. 
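    ///
    /// A minimal sketch (socket path and service are placeholders; in a systemd setup the
    /// listener would typically come from an inherited FD rather than `bind`):
    ///
    /// ```no_run
    /// use actix_rt::net::UnixStream;
    /// use actix_server::Server;
    /// use actix_service::fn_service;
    ///
    /// let lst = std::os::unix::net::UnixListener::bind("/tmp/example.sock").unwrap();
    ///
    /// let server = Server::build()
    ///     .listen_uds("example-uds", lst, || {
    ///         fn_service(|_stream: UnixStream| async { Ok::<_, ()>(()) })
    ///     })
    ///     .unwrap()
    ///     .run();
    /// ```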
- #[cfg(unix)] pub fn listen_uds>( mut self, name: N, @@ -212,7 +243,7 @@ impl ServerBuilder { lst.set_nonblocking(true)?; let token = self.next_token(); let addr = StdSocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); - self.services.push(StreamNewService::create( + self.factories.push(StreamNewService::create( name.as_ref().to_string(), token, factory, @@ -222,223 +253,6 @@ impl ServerBuilder { .push((token, name.as_ref().to_string(), MioListener::from(lst))); Ok(self) } - - /// Add new service to the server. - pub fn listen>( - mut self, - name: N, - lst: StdTcpListener, - factory: F, - ) -> io::Result - where - F: ServiceFactory, - { - lst.set_nonblocking(true)?; - let addr = lst.local_addr()?; - - let token = self.next_token(); - self.services.push(StreamNewService::create( - name.as_ref().to_string(), - token, - factory, - addr, - )); - - self.sockets - .push((token, name.as_ref().to_string(), MioListener::from(lst))); - - Ok(self) - } - - /// Starts processing incoming connections and return server controller. - pub fn run(mut self) -> ServerHandle { - if self.sockets.is_empty() { - panic!("Server should have at least one bound socket"); - } else { - for (_, name, lst) in &self.sockets { - info!( - r#"Starting service: "{}", workers: {}, listening on: {}"#, - name, - self.threads, - lst.local_addr() - ); - } - - // start workers - let handles = (0..self.threads) - .map(|idx| { - let (handle_accept, handle_server) = - self.start_worker(idx, self.accept.waker_owned()); - self.handles.push((idx, handle_server)); - - handle_accept - }) - .collect(); - - // start accept thread - self.accept.start( - mem::take(&mut self.sockets) - .into_iter() - .map(|t| (t.0, t.2)) - .collect(), - handles, - ); - - // handle signals - if !self.no_signals { - Signals::start(self.server.clone()); - } - - // start http server actor - let server = self.server.clone(); - rt::spawn(self); - server - } - } - - fn start_worker( - &self, - idx: usize, - waker_queue: WakerQueue, - ) -> (WorkerHandleAccept, WorkerHandleServer) { - let services = self.services.iter().map(|v| v.clone_factory()).collect(); - - ServerWorker::start(idx, services, waker_queue, self.worker_config) - } - - fn handle_cmd(&mut self, item: ServerCommand) { - match item { - ServerCommand::Pause(tx) => { - self.accept.wake(WakerInterest::Pause); - let _ = tx.send(()); - } - ServerCommand::Resume(tx) => { - self.accept.wake(WakerInterest::Resume); - let _ = tx.send(()); - } - ServerCommand::Signal(sig) => { - // Signals support - // Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop actix system - match sig { - Signal::Int => { - info!("SIGINT received; starting forced shutdown"); - self.exit = true; - self.handle_cmd(ServerCommand::Stop { - graceful: false, - completion: None, - }) - } - - Signal::Term => { - info!("SIGTERM received; starting graceful shutdown"); - self.exit = true; - self.handle_cmd(ServerCommand::Stop { - graceful: true, - completion: None, - }) - } - - Signal::Quit => { - info!("SIGQUIT received; starting forced shutdown"); - self.exit = true; - self.handle_cmd(ServerCommand::Stop { - graceful: false, - completion: None, - }) - } - } - } - ServerCommand::Notify(tx) => { - self.notify.push(tx); - } - ServerCommand::Stop { - graceful, - completion, - } => { - let exit = self.exit; - - // stop accept thread - self.accept.wake(WakerInterest::Stop); - let notify = std::mem::take(&mut self.notify); - - // stop workers - let stop = self - .handles - .iter() - .map(move |worker| worker.1.stop(graceful)) - 
.collect(); - - rt::spawn(async move { - if graceful { - // wait for all workers to shut down - let _ = join_all(stop).await; - } - - if let Some(tx) = completion { - let _ = tx.send(()); - } - - for tx in notify { - let _ = tx.send(()); - } - - if exit { - sleep(Duration::from_millis(300)).await; - System::current().stop(); - } - }); - } - ServerCommand::WorkerFaulted(idx) => { - let mut found = false; - for i in 0..self.handles.len() { - if self.handles[i].0 == idx { - self.handles.swap_remove(i); - found = true; - break; - } - } - - if found { - error!("Worker {} has died; restarting", idx); - - let mut new_idx = self.handles.len(); - 'found: loop { - for i in 0..self.handles.len() { - if self.handles[i].0 == new_idx { - new_idx += 1; - continue 'found; - } - } - break; - } - - let (handle_accept, handle_server) = - self.start_worker(new_idx, self.accept.waker_owned()); - self.handles.push((new_idx, handle_server)); - self.accept.wake(WakerInterest::Worker(handle_accept)); - } - } - } - } - - fn next_token(&mut self) -> usize { - let token = self.token; - self.token += 1; - token - } -} - -impl Future for ServerBuilder { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - loop { - match Pin::new(&mut self.cmd).poll_recv(cx) { - Poll::Ready(Some(it)) => self.as_mut().get_mut().handle_cmd(it), - _ => return Poll::Pending, - } - } - } } pub(super) fn bind_addr( diff --git a/actix-server/src/handle.rs b/actix-server/src/handle.rs new file mode 100644 index 00000000..49d8eb01 --- /dev/null +++ b/actix-server/src/handle.rs @@ -0,0 +1,53 @@ +use std::future::Future; + +use tokio::sync::{mpsc::UnboundedSender, oneshot}; + +use crate::server::ServerCommand; + +/// Server handle. +#[derive(Debug, Clone)] +pub struct ServerHandle { + cmd_tx: UnboundedSender, +} + +impl ServerHandle { + pub(crate) fn new(cmd_tx: UnboundedSender) -> Self { + ServerHandle { cmd_tx } + } + + pub(crate) fn worker_faulted(&self, idx: usize) { + let _ = self.cmd_tx.send(ServerCommand::WorkerFaulted(idx)); + } + + /// Pause accepting incoming connections. + /// + /// May drop socket pending connection. All open connections remain active. + pub fn pause(&self) -> impl Future { + let (tx, rx) = oneshot::channel(); + let _ = self.cmd_tx.send(ServerCommand::Pause(tx)); + async { + let _ = rx.await; + } + } + + /// Resume accepting incoming connections. + pub fn resume(&self) -> impl Future { + let (tx, rx) = oneshot::channel(); + let _ = self.cmd_tx.send(ServerCommand::Resume(tx)); + async { + let _ = rx.await; + } + } + + /// Stop incoming connection processing, stop all workers and exit. + pub fn stop(&self, graceful: bool) -> impl Future { + let (tx, rx) = oneshot::channel(); + let _ = self.cmd_tx.send(ServerCommand::Stop { + graceful, + completion: Some(tx), + }); + async { + let _ = rx.await; + } + } +} diff --git a/actix-server/src/join_all.rs b/actix-server/src/join_all.rs new file mode 100644 index 00000000..ae68871c --- /dev/null +++ b/actix-server/src/join_all.rs @@ -0,0 +1,144 @@ +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +use futures_core::future::{BoxFuture, LocalBoxFuture}; + +// a poor man's join future. joined future is only used when starting/stopping the server. +// pin_project and pinned futures are overkill for this task. 
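// Each poll walks every sub-future in order and stores finished outputs in place, so results come
// back in input order; the O(n) work per wake is fine for the handful of futures joined here.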
+pub(crate) struct JoinAll { + fut: Vec>, +} + +pub(crate) fn join_all(fut: Vec + Send + 'static>) -> JoinAll { + let fut = fut + .into_iter() + .map(|f| JoinFuture::Future(Box::pin(f))) + .collect(); + + JoinAll { fut } +} + +enum JoinFuture { + Future(BoxFuture<'static, T>), + Result(Option), +} + +impl Unpin for JoinAll {} + +impl Future for JoinAll { + type Output = Vec; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut ready = true; + + let this = self.get_mut(); + for fut in this.fut.iter_mut() { + if let JoinFuture::Future(f) = fut { + match f.as_mut().poll(cx) { + Poll::Ready(t) => { + *fut = JoinFuture::Result(Some(t)); + } + Poll::Pending => ready = false, + } + } + } + + if ready { + let mut res = Vec::new(); + for fut in this.fut.iter_mut() { + if let JoinFuture::Result(f) = fut { + res.push(f.take().unwrap()); + } + } + + Poll::Ready(res) + } else { + Poll::Pending + } + } +} + +pub(crate) fn join_all_local( + fut: Vec + 'static>, +) -> JoinAllLocal { + let fut = fut + .into_iter() + .map(|f| JoinLocalFuture::LocalFuture(Box::pin(f))) + .collect(); + + JoinAllLocal { fut } +} + +// a poor man's join future. joined future is only used when starting/stopping the server. +// pin_project and pinned futures are overkill for this task. +pub(crate) struct JoinAllLocal { + fut: Vec>, +} + +enum JoinLocalFuture { + LocalFuture(LocalBoxFuture<'static, T>), + Result(Option), +} + +impl Unpin for JoinAllLocal {} + +impl Future for JoinAllLocal { + type Output = Vec; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut ready = true; + + let this = self.get_mut(); + for fut in this.fut.iter_mut() { + if let JoinLocalFuture::LocalFuture(f) = fut { + match f.as_mut().poll(cx) { + Poll::Ready(t) => { + *fut = JoinLocalFuture::Result(Some(t)); + } + Poll::Pending => ready = false, + } + } + } + + if ready { + let mut res = Vec::new(); + for fut in this.fut.iter_mut() { + if let JoinLocalFuture::Result(f) = fut { + res.push(f.take().unwrap()); + } + } + + Poll::Ready(res) + } else { + Poll::Pending + } + } +} + +#[cfg(test)] +mod test { + use super::*; + + use actix_utils::future::ready; + + #[actix_rt::test] + async fn test_join_all() { + let futs = vec![ready(Ok(1)), ready(Err(3)), ready(Ok(9))]; + let mut res = join_all(futs).await.into_iter(); + assert_eq!(Ok(1), res.next().unwrap()); + assert_eq!(Err(3), res.next().unwrap()); + assert_eq!(Ok(9), res.next().unwrap()); + } + + #[actix_rt::test] + async fn test_join_all_local() { + let futs = vec![ready(Ok(1)), ready(Err(3)), ready(Ok(9))]; + let mut res = join_all_local(futs).await.into_iter(); + assert_eq!(Ok(1), res.next().unwrap()); + assert_eq!(Err(3), res.next().unwrap()); + assert_eq!(Ok(9), res.next().unwrap()); + } +} diff --git a/actix-server/src/lib.rs b/actix-server/src/lib.rs index 5bfc8faf..6ac8ba7e 100644 --- a/actix-server/src/lib.rs +++ b/actix-server/src/lib.rs @@ -5,7 +5,10 @@ #![doc(html_favicon_url = "https://actix.rs/favicon.ico")] mod accept; +mod availability; mod builder; +mod handle; +mod join_all; mod server; mod service; mod signals; @@ -15,89 +18,17 @@ mod waker_queue; mod worker; pub use self::builder::ServerBuilder; -pub use self::server::{Server, ServerHandle}; +pub use self::handle::ServerHandle; +pub use self::server::Server; pub use self::service::ServiceFactory; pub use self::test_server::TestServer; #[doc(hidden)] pub use self::socket::FromStream; -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; - /// Start server building 
process +#[doc(hidden)] +#[deprecated(since = "2.0.0", note = "Use `Server::build()`.")] pub fn new() -> ServerBuilder { ServerBuilder::default() } - -// a poor man's join future. joined future is only used when starting/stopping the server. -// pin_project and pinned futures are overkill for this task. -pub(crate) struct JoinAll { - fut: Vec>, -} - -pub(crate) fn join_all(fut: Vec + 'static>) -> JoinAll { - let fut = fut - .into_iter() - .map(|f| JoinFuture::Future(Box::pin(f))) - .collect(); - - JoinAll { fut } -} - -enum JoinFuture { - Future(Pin>>), - Result(Option), -} - -impl Unpin for JoinAll {} - -impl Future for JoinAll { - type Output = Vec; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut ready = true; - - let this = self.get_mut(); - for fut in this.fut.iter_mut() { - if let JoinFuture::Future(f) = fut { - match f.as_mut().poll(cx) { - Poll::Ready(t) => { - *fut = JoinFuture::Result(Some(t)); - } - Poll::Pending => ready = false, - } - } - } - - if ready { - let mut res = Vec::new(); - for fut in this.fut.iter_mut() { - if let JoinFuture::Result(f) = fut { - res.push(f.take().unwrap()); - } - } - - Poll::Ready(res) - } else { - Poll::Pending - } - } -} - -#[cfg(test)] -mod test { - use super::*; - - use actix_utils::future::ready; - - #[actix_rt::test] - async fn test_join_all() { - let futs = vec![ready(Ok(1)), ready(Err(3)), ready(Ok(9))]; - let mut res = join_all(futs).await.into_iter(); - assert_eq!(Ok(1), res.next().unwrap()); - assert_eq!(Err(3), res.next().unwrap()); - assert_eq!(Ok(9), res.next().unwrap()); - } -} diff --git a/actix-server/src/server.rs b/actix-server/src/server.rs index 46ffb3cd..f1edcb23 100644 --- a/actix-server/src/server.rs +++ b/actix-server/src/server.rs @@ -1,125 +1,359 @@ -use std::future::Future; -use std::io; -use std::pin::Pin; -use std::task::{Context, Poll}; +use std::{ + future::Future, + io, mem, + pin::Pin, + task::{Context, Poll}, + time::Duration, +}; -use tokio::sync::mpsc::UnboundedSender; -use tokio::sync::oneshot; +use actix_rt::{time::sleep, System}; +use futures_core::future::BoxFuture; +use log::{error, info}; +use tokio::sync::{ + mpsc::{UnboundedReceiver, UnboundedSender}, + oneshot, +}; -use crate::builder::ServerBuilder; -use crate::signals::Signal; +use crate::{ + accept::Accept, + builder::ServerBuilder, + join_all::join_all, + service::InternalServiceFactory, + signals::{Signal, Signals}, + waker_queue::{WakerInterest, WakerQueue}, + worker::{ServerWorker, ServerWorkerConfig, WorkerHandleServer}, + ServerHandle, +}; #[derive(Debug)] pub(crate) enum ServerCommand { + /// TODO WorkerFaulted(usize), + + /// Contains return channel to notify caller of successful state change. Pause(oneshot::Sender<()>), + + /// Contains return channel to notify caller of successful state change. Resume(oneshot::Sender<()>), - Signal(Signal), + + /// TODO Stop { /// True if shut down should be graceful. graceful: bool, + + /// Return channel to notify caller that shutdown is complete. completion: Option>, }, - /// Notify of server stop - Notify(oneshot::Sender<()>), } -#[derive(Debug)] -#[non_exhaustive] -pub struct Server; - -impl Server { - /// Start server building process. - pub fn build() -> ServerBuilder { - ServerBuilder::default() - } -} - -/// Server handle. +/// General purpose TCP server that runs services receiving Tokio `TcpStream`s. +/// +/// Handles creating worker threads, restarting faulted workers, connection accepting, and +/// back-pressure logic. 
+/// +/// Creates a worker per CPU core (or the number specified in [`ServerBuilder::workers`]) and +/// distributes connections with a round-robin strategy. +/// +/// The [Server] must be awaited to process stop commands and listen for OS signals. It will resolve +/// when the server has fully shut down. /// /// # Shutdown Signals /// On UNIX systems, `SIGQUIT` will start a graceful shutdown and `SIGTERM` or `SIGINT` will start a -/// forced shutdown. On Windows, a CTRL-C signal will start a forced shutdown. +/// forced shutdown. On Windows, a Ctrl-C signal will start a forced shutdown. /// /// A graceful shutdown will wait for all workers to stop first. -#[derive(Debug)] -pub struct ServerHandle( - UnboundedSender, - Option>, -); +/// +/// # Examples +/// The following is a TCP echo server. Test using `telnet 127.0.0.1 8080`. +/// +/// ```no_run +/// use std::io; +/// +/// use actix_rt::net::TcpStream; +/// use actix_server::Server; +/// use actix_service::{fn_service, ServiceFactoryExt as _}; +/// use bytes::BytesMut; +/// use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _}; +/// +/// #[actix_rt::main] +/// async fn main() -> io::Result<()> { +/// let bind_addr = ("127.0.0.1", 8080); +/// +/// Server::build() +/// .bind("echo", bind_addr, move || { +/// fn_service(move |mut stream: TcpStream| { +/// async move { +/// let mut size = 0; +/// let mut buf = BytesMut::new(); +/// +/// loop { +/// match stream.read_buf(&mut buf).await { +/// // end of stream; bail from loop +/// Ok(0) => break, +/// +/// // write bytes back to stream +/// Ok(bytes_read) => { +/// stream.write_all(&buf[size..]).await.unwrap(); +/// size += bytes_read; +/// } +/// +/// Err(err) => { +/// eprintln!("Stream Error: {:?}", err); +/// return Err(()); +/// } +/// } +/// } +/// +/// Ok(()) +/// } +/// }) +/// .map_err(|err| eprintln!("Service Error: {:?}", err)) +/// })? +/// .run() +/// .await +/// } +/// ``` +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub enum Server { + Server(ServerInner), + Error(Option), +} -impl ServerHandle { - pub(crate) fn new(tx: UnboundedSender) -> Self { - ServerHandle(tx, None) +impl Server { + /// Create server build. + pub fn build() -> ServerBuilder { + ServerBuilder::default() } - pub(crate) fn signal(&self, sig: Signal) { - let _ = self.0.send(ServerCommand::Signal(sig)); - } + pub(crate) fn new(mut builder: ServerBuilder) -> Self { + let sockets = mem::take(&mut builder.sockets) + .into_iter() + .map(|t| (t.0, t.2)) + .collect(); - pub(crate) fn worker_faulted(&self, idx: usize) { - let _ = self.0.send(ServerCommand::WorkerFaulted(idx)); - } + // Give log information on what runtime will be used. + let is_tokio = tokio::runtime::Handle::try_current().is_ok(); + let is_actix = actix_rt::System::try_current().is_some(); - /// Pause accepting incoming connections - /// - /// If socket contains some pending connection, they might be dropped. - /// All opened connection remains active. - pub fn pause(&self) -> impl Future { - let (tx, rx) = oneshot::channel(); - let _ = self.0.send(ServerCommand::Pause(tx)); - async { - let _ = rx.await; + match (is_tokio, is_actix) { + (true, false) => info!("Tokio runtime found. Starting in existing Tokio runtime"), + (_, true) => info!("Actix runtime found. Starting in Actix runtime"), + (_, _) => info!( + "Actix/Tokio runtime not found. 
Starting in newt Tokio current-thread runtime" + ), + } + + for (_, name, lst) in &builder.sockets { + info!( + r#"Starting service: "{}", workers: {}, listening on: {}"#, + name, + builder.threads, + lst.local_addr() + ); + } + + match Accept::start(sockets, &builder) { + Ok((waker_queue, worker_handles)) => { + // construct OS signals listener future + let signals = (builder.listen_os_signals).then(Signals::new); + + Self::Server(ServerInner { + cmd_tx: builder.cmd_tx.clone(), + cmd_rx: builder.cmd_rx, + signals, + waker_queue, + worker_handles, + worker_config: builder.worker_config, + services: builder.factories, + exit: builder.exit, + stop_task: None, + }) + } + + Err(err) => Self::Error(Some(err)), } } - /// Resume accepting incoming connections - pub fn resume(&self) -> impl Future { - let (tx, rx) = oneshot::channel(); - let _ = self.0.send(ServerCommand::Resume(tx)); - async { - let _ = rx.await; - } - } - - /// Stop incoming connection processing, stop all workers and exit. + /// Get a handle for ServerFuture that can be used to change state of actix server. /// - /// If server starts with `spawn()` method, then spawned thread get terminated. - pub fn stop(&self, graceful: bool) -> impl Future { - let (tx, rx) = oneshot::channel(); - let _ = self.0.send(ServerCommand::Stop { - graceful, - completion: Some(tx), - }); - async { - let _ = rx.await; + /// See [ServerHandle](ServerHandle) for usage. + pub fn handle(&self) -> ServerHandle { + match self { + Server::Server(inner) => ServerHandle::new(inner.cmd_tx.clone()), + Server::Error(err) => { + // TODO: i don't think this is the best way to handle server startup fail + panic!( + "server handle can not be obtained because server failed to start up: {}", + err.as_ref().unwrap() + ); + } } } } -impl Clone for ServerHandle { - fn clone(&self) -> Self { - Self(self.0.clone(), None) - } -} - -impl Future for ServerHandle { +impl Future for Server { type Output = io::Result<()>; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.as_mut().get_mut() { + Server::Error(err) => Poll::Ready(Err(err + .take() + .expect("Server future cannot be polled after error"))), - if this.1.is_none() { - let (tx, rx) = oneshot::channel(); - if this.0.send(ServerCommand::Notify(tx)).is_err() { - return Poll::Ready(Ok(())); + Server::Server(inner) => { + // poll Signals + if let Some(ref mut signals) = inner.signals { + if let Poll::Ready(signal) = Pin::new(signals).poll(cx) { + inner.stop_task = inner.handle_signal(signal); + // drop signals listener + inner.signals = None; + } + } + + // handle stop tasks and eager drain command channel + loop { + if let Some(ref mut fut) = inner.stop_task { + // only resolve stop task and exit + return fut.as_mut().poll(cx).map(|_| Ok(())); + } + + match Pin::new(&mut inner.cmd_rx).poll_recv(cx) { + Poll::Ready(Some(cmd)) => { + // if stop task is required, set it and loop + inner.stop_task = inner.handle_cmd(cmd); + } + _ => return Poll::Pending, + } + } + } + } + } +} + +pub struct ServerInner { + worker_handles: Vec, + worker_config: ServerWorkerConfig, + services: Vec>, + exit: bool, + cmd_tx: UnboundedSender, + cmd_rx: UnboundedReceiver, + signals: Option, + waker_queue: WakerQueue, + stop_task: Option>, +} + +impl ServerInner { + fn handle_cmd(&mut self, item: ServerCommand) -> Option> { + match item { + ServerCommand::Pause(tx) => { + self.waker_queue.wake(WakerInterest::Pause); + let _ = 
tx.send(()); + None + } + + ServerCommand::Resume(tx) => { + self.waker_queue.wake(WakerInterest::Resume); + let _ = tx.send(()); + None + } + + ServerCommand::Stop { + graceful, + completion, + } => { + let exit = self.exit; + + // stop accept thread + self.waker_queue.wake(WakerInterest::Stop); + + // stop workers + let workers_stop = self + .worker_handles + .iter() + .map(|worker| worker.stop(graceful)) + .collect::>(); + + Some(Box::pin(async move { + if graceful { + // wait for all workers to shut down + let _ = join_all(workers_stop).await; + } + + if let Some(tx) = completion { + let _ = tx.send(()); + } + + if exit { + sleep(Duration::from_millis(300)).await; + System::try_current().as_ref().map(System::stop); + } + })) + } + + ServerCommand::WorkerFaulted(idx) => { + // TODO: maybe just return with warning log if not found ? + assert!(self.worker_handles.iter().any(|wrk| wrk.idx == idx)); + + error!("Worker {} has died; restarting", idx); + + let factories = self + .services + .iter() + .map(|service| service.clone_factory()) + .collect(); + + match ServerWorker::start( + idx, + factories, + self.waker_queue.clone(), + self.worker_config, + ) { + Ok((handle_accept, handle_server)) => { + *self + .worker_handles + .iter_mut() + .find(|wrk| wrk.idx == idx) + .unwrap() = handle_server; + + self.waker_queue.wake(WakerInterest::Worker(handle_accept)); + } + + Err(err) => error!("can not restart worker {}: {}", idx, err), + }; + + None + } + } + } + + fn handle_signal(&mut self, signal: Signal) -> Option> { + match signal { + Signal::Int => { + info!("SIGINT received; starting forced shutdown"); + self.exit = true; + self.handle_cmd(ServerCommand::Stop { + graceful: false, + completion: None, + }) + } + + Signal::Term => { + info!("SIGTERM received; starting graceful shutdown"); + self.exit = true; + self.handle_cmd(ServerCommand::Stop { + graceful: true, + completion: None, + }) + } + + Signal::Quit => { + info!("SIGQUIT received; starting forced shutdown"); + self.exit = true; + self.handle_cmd(ServerCommand::Stop { + graceful: false, + completion: None, + }) } - this.1 = Some(rx); - } - - match Pin::new(this.1.as_mut().unwrap()).poll(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(_) => Poll::Ready(Ok(())), } } } diff --git a/actix-server/src/signals.rs b/actix-server/src/signals.rs index b80fa759..4013d7f2 100644 --- a/actix-server/src/signals.rs +++ b/actix-server/src/signals.rs @@ -1,12 +1,16 @@ -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; +use std::{ + fmt, + future::Future, + pin::Pin, + task::{Context, Poll}, +}; -use crate::server::ServerHandle; +use log::trace; /// Types of process signals. -#[allow(dead_code)] -#[derive(PartialEq, Clone, Copy, Debug)] +// #[allow(dead_code)] +#[derive(Debug, Clone, Copy, PartialEq)] +#[allow(dead_code)] // variants are never constructed on non-unix pub(crate) enum Signal { /// `SIGINT` Int, @@ -18,10 +22,18 @@ pub(crate) enum Signal { Quit, } +impl fmt::Display for Signal { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(match self { + Signal::Int => "SIGINT", + Signal::Term => "SIGTERM", + Signal::Quit => "SIGQUIT", + }) + } +} + /// Process signal listener. pub(crate) struct Signals { - srv: ServerHandle, - #[cfg(not(unix))] signals: futures_core::future::LocalBoxFuture<'static, std::io::Result<()>>, @@ -30,14 +42,15 @@ pub(crate) struct Signals { } impl Signals { - /// Spawns a signal listening future that is able to send commands to the `Server`. 
- pub(crate) fn start(srv: ServerHandle) { + /// Constructs an OS signal listening future. + pub(crate) fn new() -> Self { + trace!("setting up OS signal listener"); + #[cfg(not(unix))] { - actix_rt::spawn(Signals { - srv, + Signals { signals: Box::pin(actix_rt::signal::ctrl_c()), - }); + } } #[cfg(unix)] @@ -66,33 +79,30 @@ impl Signals { }) .collect::>(); - actix_rt::spawn(Signals { srv, signals }); + Signals { signals } } } } impl Future for Signals { - type Output = (); + type Output = Signal; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { #[cfg(not(unix))] - match self.signals.as_mut().poll(cx) { - Poll::Ready(_) => { - self.srv.signal(Signal::Int); - Poll::Ready(()) - } - Poll::Pending => Poll::Pending, + { + self.signals.as_mut().poll(cx).map(|_| Signal::Int) } #[cfg(unix)] { for (sig, fut) in self.signals.iter_mut() { + // TODO: match on if let Some ? if Pin::new(fut).poll_recv(cx).is_ready() { - let sig = *sig; - self.srv.signal(sig); - return Poll::Ready(()); + trace!("{} received", sig); + return Poll::Ready(*sig); } } + Poll::Pending } } diff --git a/actix-server/src/test_server.rs b/actix-server/src/test_server.rs index ad6ee8ee..7cf0d0a6 100644 --- a/actix-server/src/test_server.rs +++ b/actix-server/src/test_server.rs @@ -1,9 +1,9 @@ use std::sync::mpsc; -use std::{net, thread}; +use std::{io, net, thread}; use actix_rt::{net::TcpStream, System}; -use crate::{Server, ServerBuilder, ServiceFactory}; +use crate::{Server, ServerBuilder, ServerHandle, ServiceFactory}; /// A testing server. /// @@ -34,7 +34,8 @@ pub struct TestServerRuntime { addr: net::SocketAddr, host: String, port: u16, - system: System, + server_handle: ServerHandle, + thread_handle: Option>>, } impl TestServer { @@ -46,20 +47,22 @@ impl TestServer { let (tx, rx) = mpsc::channel(); // run server in separate thread - thread::spawn(move || { - let sys = System::new(); - factory(Server::build()).workers(1).disable_signals().run(); - - tx.send(System::current()).unwrap(); - sys.run() + let thread_handle = thread::spawn(move || { + System::new().block_on(async { + let server = factory(Server::build()).workers(1).disable_signals().run(); + tx.send(server.handle()).unwrap(); + server.await + }) }); - let system = rx.recv().unwrap(); + + let server_handle = rx.recv().unwrap(); TestServerRuntime { - system, addr: "127.0.0.1:0".parse().unwrap(), host: "127.0.0.1".to_string(), port: 0, + server_handle, + thread_handle: Some(thread_handle), } } @@ -68,24 +71,25 @@ impl TestServer { let (tx, rx) = mpsc::channel(); // run server in separate thread - thread::spawn(move || { + let thread_handle = thread::spawn(move || { let sys = System::new(); let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap(); let local_addr = tcp.local_addr().unwrap(); sys.block_on(async { - Server::build() + let server = Server::build() .listen("test", tcp, factory) .unwrap() .workers(1) .disable_signals() .run(); - tx.send((System::current(), local_addr)).unwrap(); - }); - sys.run() + + tx.send((server.handle(), local_addr)).unwrap(); + server.await + }) }); - let (system, addr) = rx.recv().unwrap(); + let (server_handle, addr) = rx.recv().unwrap(); let host = format!("{}", addr.ip()); let port = addr.port(); @@ -94,7 +98,8 @@ impl TestServer { addr, host, port, - system, + server_handle, + thread_handle: Some(thread_handle), } } @@ -127,7 +132,8 @@ impl TestServerRuntime { /// Stop server. 
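    ///
    /// Sends a non-graceful stop command to the server and joins its thread.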
fn stop(&mut self) { - self.system.stop(); + let _ = self.server_handle.stop(false); + self.thread_handle.take().unwrap().join().unwrap().unwrap(); } /// Connect to server, returning a Tokio `TcpStream`. diff --git a/actix-server/src/waker_queue.rs b/actix-server/src/waker_queue.rs index 3f8669d4..a7280901 100644 --- a/actix-server/src/waker_queue.rs +++ b/actix-server/src/waker_queue.rs @@ -78,12 +78,7 @@ pub(crate) enum WakerInterest { Pause, Resume, Stop, - /// `Timer` is an interest sent as a delayed future. When an error happens on accepting - /// connection `Accept` would deregister socket listener temporary and wake up the poll and - /// register them again after the delayed future resolve. - Timer, - /// `Worker` is an interest happen after a worker runs into faulted state(This is determined - /// by if work can be sent to it successfully).`Accept` would be waked up and add the new - /// `WorkerHandleAccept`. + /// `Worker` is an interest that is triggered after a worker faults. This is determined by + /// trying to send work to it. `Accept` would be waked up and add the new `WorkerHandleAccept`. Worker(WorkerHandleAccept), } diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index f8550e18..c156444b 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -1,6 +1,6 @@ use std::{ future::Future, - mem, + io, mem, pin::Pin, rc::Rc, sync::{ @@ -14,7 +14,7 @@ use std::{ use actix_rt::{ spawn, time::{sleep, Instant, Sleep}, - Arbiter, + Arbiter, ArbiterHandle, System, }; use futures_core::{future::LocalBoxFuture, ready}; use log::{error, info, trace}; @@ -23,12 +23,14 @@ use tokio::sync::{ oneshot, }; -use crate::join_all; -use crate::service::{BoxedServerService, InternalServiceFactory}; -use crate::socket::MioStream; -use crate::waker_queue::{WakerInterest, WakerQueue}; +use crate::{ + join_all::join_all_local, + service::{BoxedServerService, InternalServiceFactory}, + socket::MioStream, + waker_queue::{WakerInterest, WakerQueue}, +}; -/// Stop worker message. Returns `true` on successful graceful shutdown. +/// Stop worker message. Returns `true` on successful graceful shutdown /// and `false` if some connections still alive when shutdown execute. pub(crate) struct Stop { graceful: bool, @@ -41,19 +43,20 @@ pub(crate) struct Conn { pub token: usize, } +/// Create accept and server worker handles. fn handle_pair( idx: usize, - tx1: UnboundedSender, - tx2: UnboundedSender, + conn_tx: UnboundedSender, + stop_tx: UnboundedSender, counter: Counter, ) -> (WorkerHandleAccept, WorkerHandleServer) { let accept = WorkerHandleAccept { idx, - tx: tx1, + conn_tx, counter, }; - let server = WorkerHandleServer { idx, tx: tx2 }; + let server = WorkerHandleServer { idx, stop_tx }; (accept, server) } @@ -149,13 +152,13 @@ impl Drop for WorkerCounterGuard { } } -/// Handle to worker that can send connection message to worker and share the -/// availability of worker to other thread. +/// Handle to worker that can send connection message to worker and share the availability of worker +/// to other threads. /// /// Held by [Accept](crate::accept::Accept). 
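///
/// The shared `counter` lets `Accept` check this worker's in-flight connection count against its
/// limit without having to message the worker thread.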
pub(crate) struct WorkerHandleAccept { idx: usize, - tx: UnboundedSender, + conn_tx: UnboundedSender, counter: Counter, } @@ -166,8 +169,8 @@ impl WorkerHandleAccept { } #[inline(always)] - pub(crate) fn send(&self, msg: Conn) -> Result<(), Conn> { - self.tx.send(msg).map_err(|msg| msg.0) + pub(crate) fn send(&self, conn: Conn) -> Result<(), Conn> { + self.conn_tx.send(conn).map_err(|msg| msg.0) } #[inline(always)] @@ -181,15 +184,14 @@ impl WorkerHandleAccept { /// Held by [ServerBuilder](crate::builder::ServerBuilder). #[derive(Debug)] pub(crate) struct WorkerHandleServer { - #[allow(dead_code)] - idx: usize, - tx: UnboundedSender, + pub(crate) idx: usize, + stop_tx: UnboundedSender, } impl WorkerHandleServer { pub(crate) fn stop(&self, graceful: bool) -> oneshot::Receiver { let (tx, rx) = oneshot::channel(); - let _ = self.tx.send(Stop { graceful, tx }); + let _ = self.stop_tx.send(Stop { graceful, tx }); rx } } @@ -222,7 +224,7 @@ impl WorkerService { } } -#[derive(Copy, Clone, Debug, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] enum WorkerServiceStatus { Available, Unavailable, @@ -233,7 +235,7 @@ enum WorkerServiceStatus { } /// Config for worker behavior passed down from server builder. -#[derive(Copy, Clone)] +#[derive(Debug, Clone, Copy)] pub(crate) struct ServerWorkerConfig { shutdown_timeout: Duration, max_blocking_threads: usize, @@ -272,7 +274,9 @@ impl ServerWorker { factories: Vec>, waker_queue: WakerQueue, config: ServerWorkerConfig, - ) -> (WorkerHandleAccept, WorkerHandleServer) { + ) -> io::Result<(WorkerHandleAccept, WorkerHandleServer)> { + trace!("starting server worker {}", idx); + let (tx1, rx) = unbounded_channel(); let (tx2, rx2) = unbounded_channel(); @@ -289,65 +293,83 @@ impl ServerWorker { Arbiter::new() }; - #[cfg(not(all(target_os = "linux", feature = "io-uring")))] - let arbiter = Arbiter::with_tokio_rt(move || { - tokio::runtime::Builder::new_current_thread() - .enable_all() - .max_blocking_threads(config.max_blocking_threads) - .build() - .unwrap() - }); + // get actix system context if it is set + let sys = System::try_current(); - arbiter.spawn(async move { - let fut = factories - .iter() - .enumerate() - .map(|(idx, factory)| { - let fut = factory.create(); - async move { fut.await.map(|(t, s)| (idx, t, s)) } - }) - .collect::>(); + // TODO: wait for server startup with sync channel - // a second spawn to run !Send future tasks. 
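The renamed fields make the two worker handles easier to follow: `WorkerHandleAccept` only ever pushes `Conn` messages down `conn_tx`, while `WorkerHandleServer` sends `Stop` commands down `stop_tx` and hands back a oneshot receiver reporting whether shutdown was graceful. The sketch below mirrors that stop-with-acknowledgement flow with illustrative types (`Stop`, `request_stop`) rather than the crate's internals.

```rust
use tokio::sync::{mpsc, oneshot};

/// Illustrative stop command: carries the graceful flag plus a oneshot for the outcome.
struct Stop {
    graceful: bool,
    tx: oneshot::Sender<bool>,
}

/// Mirrors `WorkerHandleServer::stop`: send the command, return the acknowledgement receiver.
fn request_stop(stop_tx: &mpsc::UnboundedSender<Stop>, graceful: bool) -> oneshot::Receiver<bool> {
    let (tx, rx) = oneshot::channel();
    // if the worker is already gone the send fails; the receiver then errors when awaited
    let _ = stop_tx.send(Stop { graceful, tx });
    rx
}

#[tokio::main]
async fn main() {
    let (stop_tx, mut stop_rx) = mpsc::unbounded_channel::<Stop>();

    // worker side: acknowledge the stop request with "was this a clean graceful shutdown?"
    tokio::spawn(async move {
        if let Some(stop) = stop_rx.recv().await {
            let _ = stop.tx.send(stop.graceful);
        }
    });

    let clean = request_stop(&stop_tx, true).await.unwrap_or(false);
    println!("graceful shutdown acknowledged: {}", clean);
}
```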
- spawn(async move { - let res = join_all(fut) - .await - .into_iter() - .collect::, _>>(); - let services = match res { - Ok(res) => res - .into_iter() - .fold(Vec::new(), |mut services, (factory, token, service)| { - assert_eq!(token, services.len()); - services.push(WorkerService { - factory, - service, - status: WorkerServiceStatus::Unavailable, - }); - services + std::thread::Builder::new() + .name("eofibef".to_owned()) + .spawn(move || { + // forward existing actix system context + if let Some(sys) = sys { + System::set_current(sys); + } + + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .max_blocking_threads(config.max_blocking_threads) + .build() + .unwrap(); + + rt.block_on(tokio::task::LocalSet::new().run_until(async move { + let fut = factories + .iter() + .enumerate() + .map(|(idx, factory)| { + let fut = factory.create(); + async move { fut.await.map(|(t, s)| (idx, t, s)) } }) - .into_boxed_slice(), - Err(e) => { - error!("Can not start worker: {:?}", e); - Arbiter::current().stop(); - return; - } - }; + .collect::>(); - // a third spawn to make sure ServerWorker runs as non boxed future. - spawn(ServerWorker { - rx, - rx2, - services, - counter: WorkerCounter::new(idx, waker_queue, counter_clone), - factories: factories.into_boxed_slice(), - state: Default::default(), - shutdown_timeout: config.shutdown_timeout, - }); - }); - }); + // a second spawn to run !Send future tasks. + spawn(async move { + let res = join_all_local(fut) + .await + .into_iter() + .collect::, _>>(); - handle_pair(idx, tx1, tx2, counter) + let services = match res { + Ok(res) => res + .into_iter() + .fold(Vec::new(), |mut services, (factory, token, service)| { + assert_eq!(token, services.len()); + services.push(WorkerService { + factory, + service, + status: WorkerServiceStatus::Unavailable, + }); + services + }) + .into_boxed_slice(), + + Err(e) => { + error!("Can not start worker: {:?}", e); + Arbiter::try_current().as_ref().map(ArbiterHandle::stop); + return; + } + }; + + // a third spawn to make sure ServerWorker runs as non boxed future. + spawn(ServerWorker { + rx, + rx2, + services, + counter: WorkerCounter::new(idx, waker_queue, counter_clone), + factories: factories.into_boxed_slice(), + state: Default::default(), + shutdown_timeout: config.shutdown_timeout, + }) + .await + .expect("task 3 panic"); + }) + .await + .expect("task 2 panic"); + })) + }) + .expect("worker thread error/panic"); + + Ok(handle_pair(idx, tx1, tx2, counter)) } fn restart_service(&mut self, idx: usize, factory_id: usize) { @@ -438,7 +460,7 @@ struct Shutdown { /// Start time of shutdown. start_from: Instant, - /// Notify of the shutdown outcome (force/grace) to stop caller. + /// Notify caller of the shutdown outcome (graceful/force). tx: oneshot::Sender, } @@ -450,8 +472,8 @@ impl Default for WorkerState { impl Drop for ServerWorker { fn drop(&mut self) { - // Stop the Arbiter ServerWorker runs on on drop. - Arbiter::current().stop(); + trace!("stopping ServerWorker Arbiter"); + Arbiter::try_current().as_ref().map(ArbiterHandle::stop); } } diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index 5919438b..78bc64e4 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -30,7 +30,7 @@ fn test_bind() { })? .run(); - let _ = tx.send((srv.clone(), actix_rt::System::current())); + let _ = tx.send((srv.handle(), actix_rt::System::current())); srv.await }) @@ -61,7 +61,7 @@ fn test_listen() { })? 
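The worker no longer relies on `Arbiter::with_tokio_rt`; it spawns a plain OS thread, forwards the actix `System` context when one exists, builds a current-thread Tokio runtime, and drives its `!Send` setup inside a `LocalSet`. Below is a minimal sketch of that thread-plus-`LocalSet` pattern, assuming a `tokio` dependency with the `rt` feature; the thread name and the `Rc` payload are placeholders, and the `System` forwarding is omitted.

```rust
use std::{rc::Rc, thread};

use tokio::task::LocalSet;

/// Sketch of the new worker startup: a dedicated thread with its own
/// current-thread runtime, driving !Send tasks on a LocalSet.
fn spawn_worker() -> thread::JoinHandle<()> {
    thread::Builder::new()
        .name("sketch-worker".to_owned())
        .spawn(|| {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap();

            let local = LocalSet::new();

            rt.block_on(local.run_until(async {
                // Rc is !Send, so this task could not be spawned onto a work-stealing runtime
                let shared = Rc::new(42);

                tokio::task::spawn_local(async move {
                    println!("running !Send task, value = {}", shared);
                })
                .await
                .expect("local task panicked");
            }));
        })
        .expect("failed to spawn worker thread")
}

fn main() {
    spawn_worker().join().unwrap();
}
```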
.run(); - let _ = tx.send((srv.clone(), actix_rt::System::current())); + let _ = tx.send((srv.handle(), actix_rt::System::current())); srv.await }) @@ -103,7 +103,7 @@ fn test_start() { })? .run(); - let _ = tx.send((srv.clone(), actix_rt::System::current())); + let _ = tx.send((srv.handle(), actix_rt::System::current())); srv.await }) @@ -166,7 +166,7 @@ async fn test_max_concurrent_connections() { let h = thread::spawn(move || { actix_rt::System::new().block_on(async { - let server = Server::build() + let srv = Server::build() // Set a relative higher backlog. .backlog(12) // max connection for a worker is 3. @@ -187,9 +187,9 @@ async fn test_max_concurrent_connections() { })? .run(); - let _ = tx.send((server.clone(), actix_rt::System::current())); + let _ = tx.send((srv.handle(), actix_rt::System::current())); - server.await + srv.await }) }); @@ -260,7 +260,7 @@ async fn test_service_restart() { let h = thread::spawn(move || { let num = num.clone(); actix_rt::System::new().block_on(async { - let server = Server::build() + let srv = Server::build() .backlog(1) .disable_signals() .bind("addr1", addr1, move || { @@ -280,12 +280,12 @@ async fn test_service_restart() { .workers(1) .run(); - let _ = tx.send((server.clone(), actix_rt::System::current())); - server.await + let _ = tx.send((srv.handle(), actix_rt::System::current())); + srv.await }) }); - let (server, sys) = rx.recv().unwrap(); + let (srv, sys) = rx.recv().unwrap(); for _ in 0..5 { TcpStream::connect(addr1) @@ -307,12 +307,12 @@ async fn test_service_restart() { assert!(num_clone.load(Ordering::SeqCst) > 5); assert!(num2_clone.load(Ordering::SeqCst) > 5); - let _ = server.stop(false); + let _ = srv.stop(false); sys.stop(); h.join().unwrap().unwrap(); } -#[ignore] +#[cfg_attr(not(target_os = "linux"), ignore)] #[actix_rt::test] async fn worker_restart() { use actix_service::{Service, ServiceFactory}; @@ -379,19 +379,19 @@ async fn worker_restart() { let h = thread::spawn(move || { let counter = counter.clone(); actix_rt::System::new().block_on(async { - let server = Server::build() + let srv = Server::build() .disable_signals() .bind("addr", addr, move || TestServiceFactory(counter.clone()))? .workers(2) .run(); - let _ = tx.send((server.clone(), actix_rt::System::current())); + let _ = tx.send((srv.handle(), actix_rt::System::current())); - server.await + srv.await }) }); - let (server, sys) = rx.recv().unwrap(); + let (srv, sys) = rx.recv().unwrap(); sleep(Duration::from_secs(3)).await; @@ -448,7 +448,7 @@ async fn worker_restart() { assert_eq!("3", id); stream.shutdown().await.unwrap(); - let _ = server.stop(false); + let _ = srv.stop(false); sys.stop(); h.join().unwrap().unwrap(); }
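All of these tests now follow the same shape: build and `run()` the server inside a background thread's `System::new().block_on`, send `srv.handle()` (and, in the tests, the `System` handle) back over a channel, await the `Server` future on that thread, then stop via the handle and join the thread. A condensed stand-alone version of that shape is sketched below; the service body, bind name, and address are illustrative.

```rust
use std::{io, sync::mpsc, thread};

use actix_server::Server;
use actix_service::fn_service;

fn main() -> io::Result<()> {
    let (tx, rx) = mpsc::channel();

    let thread_handle = thread::spawn(move || {
        actix_rt::System::new().block_on(async {
            // `run()` now returns a `Server` future; the control handle comes from `handle()`
            let srv = Server::build()
                .workers(1)
                .disable_signals()
                .bind("sketch", ("127.0.0.1", 0), || {
                    // trivial service that accepts a connection and immediately finishes
                    fn_service(|_stream: actix_rt::net::TcpStream| async { Ok::<_, ()>(()) })
                })?
                .run();

            tx.send(srv.handle()).unwrap();

            // resolves once the server has fully stopped
            srv.await
        })
    });

    let handle = rx.recv().unwrap();

    // request a non-graceful stop, then surface any panic or io::Error from the server thread
    let _ = handle.stop(false);
    thread_handle.join().unwrap()
}
```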