diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index 333e7549..58471cf9 100755 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -28,7 +28,6 @@ futures-core = { version = "0.3.7", default-features = false, features = ["alloc log = "0.4" mio = { version = "0.7.6", features = ["os-poll", "net"] } num_cpus = "1.13" -slab = "0.4" tokio = { version = "1.2", features = ["sync"] } [dev-dependencies] diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 7aaa57d0..a14842cf 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -7,18 +7,14 @@ use actix_rt::{ }; use log::{error, info}; use mio::{Interest, Poll, Token as MioToken}; -use slab::Slab; use crate::server::Server; use crate::socket::MioListener; use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN}; use crate::worker::{Conn, WorkerHandleAccept}; -use crate::Token; struct ServerSocketInfo { - /// Beware this is the crate token for identify socket and should not be confused - /// with `mio::Token`. - token: Token, + token: usize, lst: MioListener, @@ -62,7 +58,7 @@ impl AcceptLoop { pub(crate) fn start( &mut self, - socks: Vec<(Token, MioListener)>, + socks: Vec<(usize, MioListener)>, handles: Vec, ) { let srv = self.srv.take().expect("Can not re-use AcceptInfo"); @@ -81,7 +77,7 @@ struct Accept { srv: Server, next: usize, avail: Availability, - backpressure: bool, + paused: bool, } /// Array of u128 with every bit as marker for a worker handle's availability. @@ -95,23 +91,22 @@ impl Default for Availability { impl Availability { /// Check if any worker handle is available + #[inline(always)] fn available(&self) -> bool { self.0.iter().any(|a| *a != 0) } + /// Check if worker handle is available by index + #[inline(always)] + fn get_available(&self, idx: usize) -> bool { + let (offset, idx) = Self::offset(idx); + + self.0[offset] & (1 << idx as u128) != 0 + } + /// Set worker handle available state by index. fn set_available(&mut self, idx: usize, avail: bool) { - let (offset, idx) = if idx < 128 { - (0, idx) - } else if idx < 128 * 2 { - (1, idx - 128) - } else if idx < 128 * 3 { - (2, idx - 128 * 2) - } else if idx < 128 * 4 { - (3, idx - 128 * 3) - } else { - panic!("Max WorkerHandle count is 512") - }; + let (offset, idx) = Self::offset(idx); let off = 1 << idx as u128; if avail { @@ -128,6 +123,21 @@ impl Availability { self.set_available(handle.idx(), true); }) } + + /// Get offset and adjusted index of given worker handle index. + fn offset(idx: usize) -> (usize, usize) { + if idx < 128 { + (0, idx) + } else if idx < 128 * 2 { + (1, idx - 128) + } else if idx < 128 * 3 { + (2, idx - 128 * 2) + } else if idx < 128 * 4 { + (3, idx - 128 * 3) + } else { + panic!("Max WorkerHandle count is 512") + } + } } /// This function defines errors that are per-connection. 
Which basically @@ -147,7 +157,7 @@ impl Accept { pub(crate) fn start( poll: Poll, waker: WakerQueue, - socks: Vec<(Token, MioListener)>, + socks: Vec<(usize, MioListener)>, srv: Server, handles: Vec, ) { @@ -158,10 +168,10 @@ impl Accept { .name("actix-server accept loop".to_owned()) .spawn(move || { System::set_current(sys); - let (mut accept, sockets) = + let (mut accept, mut sockets) = Accept::new_with_sockets(poll, waker, socks, handles, srv); - accept.poll_with(sockets); + accept.poll_with(&mut sockets); }) .unwrap(); } @@ -169,26 +179,25 @@ impl Accept { fn new_with_sockets( poll: Poll, waker: WakerQueue, - socks: Vec<(Token, MioListener)>, + socks: Vec<(usize, MioListener)>, handles: Vec, srv: Server, - ) -> (Accept, Slab) { - let mut sockets = Slab::new(); - for (hnd_token, mut lst) in socks.into_iter() { - let entry = sockets.vacant_entry(); - let token = entry.key(); + ) -> (Accept, Vec) { + let sockets = socks + .into_iter() + .map(|(token, mut lst)| { + // Start listening for incoming connections + poll.registry() + .register(&mut lst, MioToken(token), Interest::READABLE) + .unwrap_or_else(|e| panic!("Can not register io: {}", e)); - // Start listening for incoming connections - poll.registry() - .register(&mut lst, MioToken(token), Interest::READABLE) - .unwrap_or_else(|e| panic!("Can not register io: {}", e)); - - entry.insert(ServerSocketInfo { - token: hnd_token, - lst, - timeout: None, - }); - } + ServerSocketInfo { + token, + lst, + timeout: None, + } + }) + .collect(); let mut avail = Availability::default(); @@ -202,19 +211,19 @@ impl Accept { srv, next: 0, avail, - backpressure: false, + paused: false, }; (accept, sockets) } - fn poll_with(&mut self, mut sockets: Slab) { + fn poll_with(&mut self, sockets: &mut [ServerSocketInfo]) { let mut events = mio::Events::with_capacity(128); loop { if let Err(e) = self.poll.poll(&mut events, None) { match e.kind() { - std::io::ErrorKind::Interrupted => continue, + io::ErrorKind::Interrupted => continue, _ => panic!("Poll error: {}", e), } } @@ -222,122 +231,146 @@ impl Accept { for event in events.iter() { let token = event.token(); match token { - // This is a loop because interests for command from previous version was - // a loop that would try to drain the command channel. It's yet unknown - // if it's necessary/good practice to actively drain the waker queue. - WAKER_TOKEN => 'waker: loop { - // take guard with every iteration so no new interest can be added - // until the current task is done. - let mut guard = self.waker.guard(); - match guard.pop_front() { - // worker notify it becomes available. we may want to recover - // from backpressure. - Some(WakerInterest::WorkerAvailable(idx)) => { - drop(guard); - self.maybe_backpressure(&mut sockets, false); - self.avail.set_available(idx, true); - } - // a new worker thread is made and it's handle would be added to Accept - Some(WakerInterest::Worker(handle)) => { - drop(guard); - // maybe we want to recover from a backpressure. 
- self.maybe_backpressure(&mut sockets, false); - self.avail.set_available(handle.idx(), true); - self.handles.push(handle); - } - // got timer interest and it's time to try register socket(s) again - Some(WakerInterest::Timer) => { - drop(guard); - self.process_timer(&mut sockets) - } - Some(WakerInterest::Pause) => { - drop(guard); - self.deregister_all(&mut sockets); - } - Some(WakerInterest::Resume) => { - drop(guard); - sockets.iter_mut().for_each(|(token, info)| { - self.register_logged(token, info); - }); - } - Some(WakerInterest::Stop) => { - return self.deregister_all(&mut sockets); - } - // waker queue is drained - None => { - // Reset the WakerQueue before break so it does not grow infinitely - WakerQueue::reset(&mut guard); - break 'waker; - } + WAKER_TOKEN => { + let exit = self.handle_waker(sockets); + if exit { + info!("Accept is stopped."); + return; } - }, + } _ => { let token = usize::from(token); - self.accept(&mut sockets, token); + self.accept(sockets, token); } } } } } - fn process_timer(&self, sockets: &mut Slab) { + fn handle_waker(&mut self, sockets: &mut [ServerSocketInfo]) -> bool { + // This is a loop because interests for command from previous version was + // a loop that would try to drain the command channel. It's yet unknown + // if it's necessary/good practice to actively drain the waker queue. + loop { + // take guard with every iteration so no new interest can be added + // until the current task is done. + let mut guard = self.waker.guard(); + match guard.pop_front() { + // worker notify it becomes available. + Some(WakerInterest::WorkerAvailable(idx)) => { + drop(guard); + + self.avail.set_available(idx, true); + + if !self.paused { + self.accept_all(sockets); + } + } + // a new worker thread is made and it's handle would be added to Accept + Some(WakerInterest::Worker(handle)) => { + drop(guard); + + self.avail.set_available(handle.idx(), true); + self.handles.push(handle); + + if !self.paused { + self.accept_all(sockets); + } + } + // got timer interest and it's time to try register socket(s) again + Some(WakerInterest::Timer) => { + drop(guard); + + self.process_timer(sockets) + } + Some(WakerInterest::Pause) => { + drop(guard); + + self.paused = true; + + self.deregister_all(sockets); + } + Some(WakerInterest::Resume) => { + drop(guard); + + self.paused = false; + + sockets.iter_mut().for_each(|info| { + self.register_logged(info); + }); + + self.accept_all(sockets); + } + Some(WakerInterest::Stop) => { + self.deregister_all(sockets); + + return true; + } + // waker queue is drained + None => { + // Reset the WakerQueue before break so it does not grow infinitely + WakerQueue::reset(&mut guard); + + return false; + } + } + } + } + + fn process_timer(&self, sockets: &mut [ServerSocketInfo]) { let now = Instant::now(); sockets .iter_mut() // Only sockets that had an associated timeout were deregistered. - .filter(|(_, info)| info.timeout.is_some()) - .for_each(|(token, info)| { + .filter(|info| info.timeout.is_some()) + .for_each(|info| { let inst = info.timeout.take().unwrap(); if now < inst { info.timeout = Some(inst); - } else if !self.backpressure { - self.register_logged(token, info); + } else if !self.paused { + self.register_logged(info); } - // Drop the timeout if server is in backpressure and socket timeout is expired. - // When server recovers from backpressure it will register all sockets without + // Drop the timeout if server is paused and socket timeout is expired. 
+ // When server recovers from pause it will register all sockets without // a timeout value so this socket register will be delayed till then. }); } #[cfg(not(target_os = "windows"))] - fn register(&self, token: usize, info: &mut ServerSocketInfo) -> io::Result<()> { + fn register(&self, info: &mut ServerSocketInfo) -> io::Result<()> { + let token = MioToken(info.token); self.poll .registry() - .register(&mut info.lst, MioToken(token), Interest::READABLE) + .register(&mut info.lst, token, Interest::READABLE) } #[cfg(target_os = "windows")] - fn register(&self, token: usize, info: &mut ServerSocketInfo) -> io::Result<()> { + fn register(&self, info: &mut ServerSocketInfo) -> io::Result<()> { // On windows, calling register without deregister cause an error. // See https://github.com/actix/actix-web/issues/905 // Calling reregister seems to fix the issue. + let token = MioToken(info.token); self.poll .registry() - .register(&mut info.lst, mio::Token(token), Interest::READABLE) + .register(&mut info.lst, token, Interest::READABLE) .or_else(|_| { - self.poll.registry().reregister( - &mut info.lst, - mio::Token(token), - Interest::READABLE, - ) + self.poll + .registry() + .reregister(&mut info.lst, token, Interest::READABLE) }) } - fn register_logged(&self, token: usize, info: &mut ServerSocketInfo) { - match self.register(token, info) { + fn register_logged(&self, info: &mut ServerSocketInfo) { + match self.register(info) { Ok(_) => info!("Resume accepting connections on {}", info.lst.local_addr()), Err(e) => error!("Can not register server socket {}", e), } } - fn deregister(&self, info: &mut ServerSocketInfo) -> io::Result<()> { - self.poll.registry().deregister(&mut info.lst) - } - fn deregister_logged(&self, info: &mut ServerSocketInfo) { - match self.deregister(info) { + match self.poll.registry().deregister(&mut info.lst) { Ok(_) => info!("Paused accepting connections on {}", info.lst.local_addr()), Err(e) => { error!("Can not deregister server socket {}", e) @@ -345,7 +378,7 @@ impl Accept { } } - fn deregister_all(&self, sockets: &mut Slab) { + fn deregister_all(&self, sockets: &mut [ServerSocketInfo]) { // This is a best effort implementation with following limitation: // // Every ServerSocketInfo with associate timeout will be skipped and it's timeout @@ -358,70 +391,23 @@ impl Accept { .iter_mut() // Take all timeout. // This is to prevent Accept::process_timer method re-register a socket afterwards. - .map(|(_, info)| (info.timeout.take(), info)) + .map(|info| (info.timeout.take(), info)) // Socket info with a timeout is already deregistered so skip them. .filter(|(timeout, _)| timeout.is_none()) .for_each(|(_, info)| self.deregister_logged(info)); } - fn maybe_backpressure(&mut self, sockets: &mut Slab, on: bool) { - // Only operate when server is in a different backpressure than the given flag. - if self.backpressure != on { - self.backpressure = on; - sockets - .iter_mut() - // Only operate on sockets without associated timeout. - // Sockets with it should be handled by `accept` and `process_timer` methods. - // They are already deregistered or need to be reregister in the future. - .filter(|(_, info)| info.timeout.is_none()) - .for_each(|(token, info)| { - if on { - self.deregister_logged(info); - } else { - self.register_logged(token, info); - } - }); - } - } - - fn accept_one(&mut self, sockets: &mut Slab, mut conn: Conn) { - if self.backpressure { - // send_connection would remove fault worker from handles. 
- // worst case here is conn get dropped after all handles are gone. - while let Err(c) = self.send_connection(sockets, conn) { - conn = c - } - } else { - while self.avail.available() { - let next = self.next(); - let idx = next.idx(); - if next.available() { - self.avail.set_available(idx, true); - match self.send_connection(sockets, conn) { - Ok(_) => return, - Err(c) => conn = c, - } - } else { - self.avail.set_available(idx, false); - self.set_next(); - } - } - - // Sending Conn failed due to either all workers are in error or not available. - // Enter backpressure state and try again. - self.maybe_backpressure(sockets, true); - self.accept_one(sockets, conn); - } - } - // Send connection to worker and handle error. - fn send_connection( - &mut self, - sockets: &mut Slab, - conn: Conn, - ) -> Result<(), Conn> { - match self.next().send(conn) { + fn send_connection(&mut self, conn: Conn) -> Result<(), Conn> { + let next = self.next(); + match next.send(conn) { Ok(_) => { + // Increment counter of WorkerHandle. + // Set worker to unavailable with it hit max (Return false). + if !next.inc_counter() { + let idx = next.idx(); + self.avail.set_available(idx, false); + } self.set_next(); Ok(()) } @@ -432,7 +418,6 @@ impl Accept { if self.handles.is_empty() { error!("No workers"); - self.maybe_backpressure(sockets, true); // All workers are gone and Conn is nowhere to be sent. // Treat this situation as Ok and drop Conn. return Ok(()); @@ -445,19 +430,38 @@ impl Accept { } } - fn accept(&mut self, sockets: &mut Slab, token: usize) { + fn accept_one(&mut self, mut conn: Conn) { loop { - let info = sockets - .get_mut(token) - .expect("ServerSocketInfo is removed from Slab"); + let next = self.next(); + let idx = next.idx(); + + if self.avail.get_available(idx) { + match self.send_connection(conn) { + Ok(_) => return, + Err(c) => conn = c, + } + } else { + self.avail.set_available(idx, false); + self.set_next(); + + if !self.avail.available() { + while let Err(c) = self.send_connection(conn) { + conn = c; + } + return; + } + } + } + } + + fn accept(&mut self, sockets: &mut [ServerSocketInfo], token: usize) { + while self.avail.available() { + let info = &mut sockets[token]; match info.lst.accept() { Ok(io) => { - let msg = Conn { - io, - token: info.token, - }; - self.accept_one(sockets, msg); + let conn = Conn { io, token }; + self.accept_one(conn); } Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return, Err(ref e) if connection_error(e) => continue, @@ -485,11 +489,22 @@ impl Accept { } } + fn accept_all(&mut self, sockets: &mut [ServerSocketInfo]) { + sockets + .iter_mut() + .map(|info| info.token) + .collect::>() + .into_iter() + .for_each(|idx| self.accept(sockets, idx)) + } + + #[inline(always)] fn next(&self) -> &WorkerHandleAccept { &self.handles[self.next] } /// Set next worker handle that would accept connection. 
+ #[inline(always)] fn set_next(&mut self) { self.next = (self.next + 1) % self.handles.len(); } diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index 932e8f83..e84a887d 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -8,30 +8,29 @@ use std::{ use actix_rt::{self as rt, net::TcpStream, time::sleep, System}; use log::{error, info}; -use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; -use tokio::sync::oneshot; +use tokio::sync::{ + mpsc::{unbounded_channel, UnboundedReceiver}, + oneshot, +}; use crate::accept::AcceptLoop; +use crate::join_all; use crate::server::{Server, ServerCommand}; use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService}; use crate::signals::{Signal, Signals}; use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs}; use crate::socket::{MioTcpListener, MioTcpSocket}; use crate::waker_queue::{WakerInterest, WakerQueue}; -use crate::worker::{ - ServerWorker, ServerWorkerConfig, WorkerAvailability, WorkerHandleAccept, - WorkerHandleServer, -}; -use crate::{join_all, Token}; +use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerHandleAccept, WorkerHandleServer}; /// Server builder pub struct ServerBuilder { threads: usize, - token: Token, + token: usize, backlog: u32, handles: Vec<(usize, WorkerHandleServer)>, services: Vec>, - sockets: Vec<(Token, String, MioListener)>, + sockets: Vec<(usize, String, MioListener)>, accept: AcceptLoop, exit: bool, no_signals: bool, @@ -55,7 +54,7 @@ impl ServerBuilder { ServerBuilder { threads: num_cpus::get(), - token: Token::default(), + token: 0, handles: Vec::new(), services: Vec::new(), sockets: Vec::new(), @@ -157,7 +156,7 @@ impl ServerBuilder { let sockets = bind_addr(addr, self.backlog)?; for lst in sockets { - let token = self.token.next(); + let token = self.next_token(); self.services.push(StreamNewService::create( name.as_ref().to_string(), token, @@ -206,7 +205,7 @@ impl ServerBuilder { { use std::net::{IpAddr, Ipv4Addr}; lst.set_nonblocking(true)?; - let token = self.token.next(); + let token = self.next_token(); let addr = StdSocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); self.services.push(StreamNewService::create( name.as_ref().to_string(), @@ -232,7 +231,7 @@ impl ServerBuilder { lst.set_nonblocking(true)?; let addr = lst.local_addr()?; - let token = self.token.next(); + let token = self.next_token(); self.services.push(StreamNewService::create( name.as_ref().to_string(), token, @@ -291,12 +290,11 @@ impl ServerBuilder { fn start_worker( &self, idx: usize, - waker: WakerQueue, + waker_queue: WakerQueue, ) -> (WorkerHandleAccept, WorkerHandleServer) { - let avail = WorkerAvailability::new(idx, waker); let services = self.services.iter().map(|v| v.clone_factory()).collect(); - ServerWorker::start(idx, services, avail, self.worker_config) + ServerWorker::start(idx, services, waker_queue, self.worker_config) } fn handle_cmd(&mut self, item: ServerCommand) { @@ -410,6 +408,12 @@ impl ServerBuilder { } } } + + fn next_token(&mut self) -> usize { + let token = self.token; + self.token += 1; + token + } } impl Future for ServerBuilder { diff --git a/actix-server/src/lib.rs b/actix-server/src/lib.rs index cf484f10..b2117191 100644 --- a/actix-server/src/lib.rs +++ b/actix-server/src/lib.rs @@ -26,28 +26,6 @@ use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; -/// Socket ID token -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] -pub(crate) struct Token(usize); - -impl 
Default for Token { - fn default() -> Self { - Self::new() - } -} - -impl Token { - fn new() -> Self { - Self(0) - } - - pub(crate) fn next(&mut self) -> Token { - let token = Token(self.0); - self.0 += 1; - token - } -} - /// Start server building process pub fn new() -> ServerBuilder { ServerBuilder::default() diff --git a/actix-server/src/service.rs b/actix-server/src/service.rs index d0eea966..28ffb4f1 100644 --- a/actix-server/src/service.rs +++ b/actix-server/src/service.rs @@ -3,15 +3,12 @@ use std::net::SocketAddr; use std::task::{Context, Poll}; use actix_service::{Service, ServiceFactory as BaseServiceFactory}; -use actix_utils::{ - counter::CounterGuard, - future::{ready, Ready}, -}; +use actix_utils::future::{ready, Ready}; use futures_core::future::LocalBoxFuture; use log::error; use crate::socket::{FromStream, MioStream}; -use crate::Token; +use crate::worker::WorkerCounterGuard; pub trait ServiceFactory: Send + Clone + 'static { type Factory: BaseServiceFactory; @@ -20,16 +17,16 @@ pub trait ServiceFactory: Send + Clone + 'static { } pub(crate) trait InternalServiceFactory: Send { - fn name(&self, token: Token) -> &str; + fn name(&self, token: usize) -> &str; fn clone_factory(&self) -> Box; - fn create(&self) -> LocalBoxFuture<'static, Result<(Token, BoxedServerService), ()>>; + fn create(&self) -> LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>>; } pub(crate) type BoxedServerService = Box< dyn Service< - (CounterGuard, MioStream), + (WorkerCounterGuard, MioStream), Response = (), Error = (), Future = Ready>, @@ -50,7 +47,7 @@ impl StreamService { } } -impl Service<(CounterGuard, MioStream)> for StreamService +impl Service<(WorkerCounterGuard, MioStream)> for StreamService where S: Service, S::Future: 'static, @@ -65,7 +62,7 @@ where self.service.poll_ready(ctx).map_err(|_| ()) } - fn call(&self, (guard, req): (CounterGuard, MioStream)) -> Self::Future { + fn call(&self, (guard, req): (WorkerCounterGuard, MioStream)) -> Self::Future { ready(match FromStream::from_mio(req) { Ok(stream) => { let f = self.service.call(stream); @@ -86,7 +83,7 @@ where pub(crate) struct StreamNewService, Io: FromStream> { name: String, inner: F, - token: Token, + token: usize, addr: SocketAddr, _t: PhantomData, } @@ -98,7 +95,7 @@ where { pub(crate) fn create( name: String, - token: Token, + token: usize, inner: F, addr: SocketAddr, ) -> Box { @@ -117,7 +114,7 @@ where F: ServiceFactory, Io: FromStream + Send + 'static, { - fn name(&self, _: Token) -> &str { + fn name(&self, _: usize) -> &str { &self.name } @@ -131,7 +128,7 @@ where }) } - fn create(&self) -> LocalBoxFuture<'static, Result<(Token, BoxedServerService), ()>> { + fn create(&self) -> LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>> { let token = self.token; let fut = self.inner.create().new_service(()); Box::pin(async move { diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 3d799370..79f15b16 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -2,8 +2,9 @@ use std::{ future::Future, mem, pin::Pin, + rc::Rc, sync::{ - atomic::{AtomicBool, Ordering}, + atomic::{AtomicUsize, Ordering}, Arc, }, task::{Context, Poll}, @@ -15,7 +16,6 @@ use actix_rt::{ time::{sleep, Instant, Sleep}, Arbiter, }; -use actix_utils::counter::Counter; use futures_core::{future::LocalBoxFuture, ready}; use log::{error, info, trace}; use tokio::sync::{ @@ -23,10 +23,10 @@ use tokio::sync::{ oneshot, }; +use crate::join_all; use crate::service::{BoxedServerService, 
InternalServiceFactory}; use crate::socket::MioStream; use crate::waker_queue::{WakerInterest, WakerQueue}; -use crate::{join_all, Token}; /// Stop worker message. Returns `true` on successful graceful shutdown. /// and `false` if some connections still alive when shutdown execute. @@ -38,35 +38,131 @@ pub(crate) struct Stop { #[derive(Debug)] pub(crate) struct Conn { pub io: MioStream, - pub token: Token, + pub token: usize, } fn handle_pair( idx: usize, tx1: UnboundedSender, tx2: UnboundedSender, - avail: WorkerAvailability, + counter: Counter, ) -> (WorkerHandleAccept, WorkerHandleServer) { - let accept = WorkerHandleAccept { tx: tx1, avail }; + let accept = WorkerHandleAccept { + idx, + tx: tx1, + counter, + }; let server = WorkerHandleServer { idx, tx: tx2 }; (accept, server) } +/// counter: Arc field is owned by `Accept` thread and `ServerWorker` thread. +/// +/// `Accept` would increment the counter and `ServerWorker` would decrement it. +/// +/// # Atomic Ordering: +/// +/// `Accept` always look into it's cached `Availability` field for `ServerWorker` state. +/// It lazily increment counter after successful dispatching new work to `ServerWorker`. +/// On reaching counter limit `Accept` update it's cached `Availability` and mark worker as +/// unable to accept any work. +/// +/// `ServerWorker` always decrement the counter when every work received from `Accept` is done. +/// On reaching counter limit worker would use `mio::Waker` and `WakerQueue` to wake up `Accept` +/// and notify it to update cached `Availability` again to mark worker as able to accept work again. +/// +/// Hense a wake up would only happen after `Accept` increment it to limit. +/// And a decrement to limit always wake up `Accept`. +#[derive(Clone)] +pub(crate) struct Counter { + counter: Arc, + limit: usize, +} + +impl Counter { + pub(crate) fn new(limit: usize) -> Self { + Self { + counter: Arc::new(AtomicUsize::new(1)), + limit, + } + } + + /// Increment counter by 1 and return true when hitting limit + #[inline(always)] + pub(crate) fn inc(&self) -> bool { + self.counter.fetch_add(1, Ordering::Relaxed) != self.limit + } + + /// Decrement counter by 1 and return true if crossing limit. + #[inline(always)] + pub(crate) fn dec(&self) -> bool { + self.counter.fetch_sub(1, Ordering::Relaxed) == self.limit + } + + pub(crate) fn total(&self) -> usize { + self.counter.load(Ordering::SeqCst) - 1 + } +} + +pub(crate) struct WorkerCounter { + idx: usize, + inner: Rc<(WakerQueue, Counter)>, +} + +impl Clone for WorkerCounter { + fn clone(&self) -> Self { + Self { + idx: self.idx, + inner: self.inner.clone(), + } + } +} + +impl WorkerCounter { + pub(crate) fn new(idx: usize, waker_queue: WakerQueue, counter: Counter) -> Self { + Self { + idx, + inner: Rc::new((waker_queue, counter)), + } + } + + #[inline(always)] + pub(crate) fn guard(&self) -> WorkerCounterGuard { + WorkerCounterGuard(self.clone()) + } + + fn total(&self) -> usize { + self.inner.1.total() + } +} + +pub(crate) struct WorkerCounterGuard(WorkerCounter); + +impl Drop for WorkerCounterGuard { + fn drop(&mut self) { + let (waker_queue, counter) = &*self.0.inner; + if counter.dec() { + waker_queue.wake(WakerInterest::WorkerAvailable(self.0.idx)); + } + } +} + /// Handle to worker that can send connection message to worker and share the /// availability of worker to other thread. /// /// Held by [Accept](crate::accept::Accept). 
pub(crate) struct WorkerHandleAccept { + idx: usize, tx: UnboundedSender, - avail: WorkerAvailability, + counter: Counter, } impl WorkerHandleAccept { #[inline(always)] pub(crate) fn idx(&self) -> usize { - self.avail.idx + self.idx } #[inline(always)] @@ -75,8 +171,8 @@ impl WorkerHandleAccept { } #[inline(always)] - pub(crate) fn available(&self) -> bool { - self.avail.available() + pub(crate) fn inc_counter(&self) -> bool { + self.counter.inc() } } @@ -96,40 +192,6 @@ impl WorkerHandleServer { } } -#[derive(Clone)] -pub(crate) struct WorkerAvailability { - idx: usize, - waker: WakerQueue, - available: Arc, -} - -impl WorkerAvailability { - pub fn new(idx: usize, waker: WakerQueue) -> Self { - WorkerAvailability { - idx, - waker, - available: Arc::new(AtomicBool::new(false)), - } - } - - #[inline(always)] - pub fn available(&self) -> bool { - self.available.load(Ordering::Acquire) - } - - pub fn set(&self, val: bool) { - // Ordering: - // - // There could be multiple set calls happen in one ::poll. - // Order is important between them. - let old = self.available.swap(val, Ordering::AcqRel); - // Notify the accept on switched to available. - if !old && val { - self.waker.wake(WakerInterest::WorkerAvailable(self.idx)); - } - } -} - /// Service worker. /// /// Worker accepts Socket objects via unbounded channel and starts stream processing. @@ -138,9 +200,8 @@ pub(crate) struct ServerWorker { // It must be dropped as soon as ServerWorker dropping. rx: UnboundedReceiver, rx2: UnboundedReceiver, + counter: WorkerCounter, services: Box<[WorkerService]>, - availability: WorkerAvailability, - conns: Counter, factories: Box<[Box]>, state: WorkerState, shutdown_timeout: Duration, @@ -207,15 +268,15 @@ impl ServerWorker { pub(crate) fn start( idx: usize, factories: Vec>, - availability: WorkerAvailability, + waker_queue: WakerQueue, config: ServerWorkerConfig, ) -> (WorkerHandleAccept, WorkerHandleServer) { - assert!(!availability.available()); - let (tx1, rx) = unbounded_channel(); let (tx2, rx2) = unbounded_channel(); - let avail = availability.clone(); + let counter = Counter::new(config.max_concurrent_connections); + + let counter_clone = counter.clone(); // every worker runs in it's own arbiter. // use a custom tokio runtime builder to change the settings of runtime. 
Arbiter::with_tokio_rt(move || { @@ -245,7 +306,7 @@ impl ServerWorker { Ok(res) => res .into_iter() .fold(Vec::new(), |mut services, (factory, token, service)| { - assert_eq!(token.0, services.len()); + assert_eq!(token, services.len()); services.push(WorkerService { factory, service, @@ -266,8 +327,7 @@ impl ServerWorker { rx, rx2, services, - availability, - conns: Counter::new(config.max_concurrent_connections), + counter: WorkerCounter::new(idx, waker_queue, counter_clone), factories: factories.into_boxed_slice(), state: Default::default(), shutdown_timeout: config.shutdown_timeout, @@ -275,16 +335,16 @@ impl ServerWorker { }); }); - handle_pair(idx, tx1, tx2, avail) + handle_pair(idx, tx1, tx2, counter) } - fn restart_service(&mut self, token: Token, factory_id: usize) { + fn restart_service(&mut self, idx: usize, factory_id: usize) { let factory = &self.factories[factory_id]; - trace!("Service {:?} failed, restarting", factory.name(token)); - self.services[token.0].status = WorkerServiceStatus::Restarting; + trace!("Service {:?} failed, restarting", factory.name(idx)); + self.services[idx].status = WorkerServiceStatus::Restarting; self.state = WorkerState::Restarting(Restart { factory_id, - token, + token: idx, fut: factory.create(), }); } @@ -302,8 +362,8 @@ impl ServerWorker { }); } - fn check_readiness(&mut self, cx: &mut Context<'_>) -> Result { - let mut ready = self.conns.available(cx); + fn check_readiness(&mut self, cx: &mut Context<'_>) -> Result { + let mut ready = true; for (idx, srv) in self.services.iter_mut().enumerate() { if srv.status == WorkerServiceStatus::Available || srv.status == WorkerServiceStatus::Unavailable @@ -313,7 +373,7 @@ impl ServerWorker { if srv.status == WorkerServiceStatus::Unavailable { trace!( "Service {:?} is available", - self.factories[srv.factory].name(Token(idx)) + self.factories[srv.factory].name(idx) ); srv.status = WorkerServiceStatus::Available; } @@ -324,7 +384,7 @@ impl ServerWorker { if srv.status == WorkerServiceStatus::Available { trace!( "Service {:?} is unavailable", - self.factories[srv.factory].name(Token(idx)) + self.factories[srv.factory].name(idx) ); srv.status = WorkerServiceStatus::Unavailable; } @@ -332,10 +392,10 @@ impl ServerWorker { Poll::Ready(Err(_)) => { error!( "Service {:?} readiness check returned error, restarting", - self.factories[srv.factory].name(Token(idx)) + self.factories[srv.factory].name(idx) ); srv.status = WorkerServiceStatus::Failed; - return Err((Token(idx), srv.factory)); + return Err((idx, srv.factory)); } } } @@ -354,8 +414,8 @@ enum WorkerState { struct Restart { factory_id: usize, - token: Token, - fut: LocalBoxFuture<'static, Result<(Token, BoxedServerService), ()>>, + token: usize, + fut: LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>>, } // Shutdown keep states necessary for server shutdown: @@ -376,10 +436,6 @@ impl Default for WorkerState { impl Drop for ServerWorker { fn drop(&mut self) { - // Set availability to true so if accept try to send connection to this worker - // it would find worker is gone and remove it. - // This is helpful when worker is dropped unexpected. - self.availability.set(true); // Stop the Arbiter ServerWorker runs on on drop. 
Arbiter::current().stop(); } @@ -394,8 +450,7 @@ impl Future for ServerWorker { // `StopWorker` message handler if let Poll::Ready(Some(Stop { graceful, tx })) = Pin::new(&mut this.rx2).poll_recv(cx) { - this.availability.set(false); - let num = this.conns.total(); + let num = this.counter.total(); if num == 0 { info!("Shutting down worker, 0 connections"); let _ = tx.send(true); @@ -422,7 +477,6 @@ impl Future for ServerWorker { WorkerState::Unavailable => match this.check_readiness(cx) { Ok(true) => { this.state = WorkerState::Available; - this.availability.set(true); self.poll(cx) } Ok(false) => Poll::Pending, @@ -450,7 +504,7 @@ impl Future for ServerWorker { this.factories[factory_id].name(token) ); - this.services[token.0].created(service); + this.services[token].created(service); this.state = WorkerState::Unavailable; self.poll(cx) @@ -459,7 +513,7 @@ impl Future for ServerWorker { // Wait for 1 second. ready!(shutdown.timer.as_mut().poll(cx)); - if this.conns.total() == 0 { + if this.counter.total() == 0 { // Graceful shutdown. if let WorkerState::Shutdown(shutdown) = mem::take(&mut this.state) { let _ = shutdown.tx.send(true); @@ -484,22 +538,20 @@ impl Future for ServerWorker { Ok(true) => {} Ok(false) => { trace!("Worker is unavailable"); - this.availability.set(false); this.state = WorkerState::Unavailable; return self.poll(cx); } Err((token, idx)) => { this.restart_service(token, idx); - this.availability.set(false); return self.poll(cx); } } + // handle incoming io stream match ready!(Pin::new(&mut this.rx).poll_recv(cx)) { - // handle incoming io stream Some(msg) => { - let guard = this.conns.get(); - let _ = this.services[msg.token.0].service.call((guard, msg.io)); + let guard = this.counter.guard(); + let _ = this.services[msg.token].service.call((guard, msg.io)); } None => return Poll::Ready(()), };
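
The `Availability` bitmask introduced in accept.rs packs one readiness bit per worker handle into `[u128; 4]`, so the accept loop can test and flip a worker's state without touching shared memory on every dispatch. A minimal, self-contained sketch of that indexing scheme, assuming the same 512-handle cap (the `offset` helper below uses division/modulo, which is equivalent to the cascaded range checks in the patch):

// Illustrative sketch of the bitmask behind `Availability` in this PR:
// four u128 words give one bit per worker handle, up to 512 workers.
#[derive(Default)]
struct Availability([u128; 4]);

impl Availability {
    /// Is any worker available at all?
    fn available(&self) -> bool {
        self.0.iter().any(|a| *a != 0)
    }

    /// Is the worker at `idx` available?
    fn get_available(&self, idx: usize) -> bool {
        let (offset, bit) = Self::offset(idx);
        self.0[offset] & (1 << bit) != 0
    }

    /// Set or clear the bit for worker `idx`.
    fn set_available(&mut self, idx: usize, avail: bool) {
        let (offset, bit) = Self::offset(idx);
        let mask = 1u128 << bit;
        if avail {
            self.0[offset] |= mask;
        } else {
            self.0[offset] &= !mask;
        }
    }

    /// Map a worker index onto (word, bit-within-word).
    fn offset(idx: usize) -> (usize, usize) {
        assert!(idx < 128 * 4, "max WorkerHandle count is 512");
        (idx / 128, idx % 128)
    }
}

fn main() {
    let mut avail = Availability::default();
    avail.set_available(0, true);
    avail.set_available(200, true); // lands in the second u128 word
    assert!(avail.available());
    assert!(avail.get_available(200));
    avail.set_available(200, false);
    assert!(!avail.get_available(200));
}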
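
The `WorkerAvailability` atomic-bool handshake is replaced by a shared `Counter`: the accept thread increments it after each dispatched `Conn` and clears the worker's `Availability` bit when `inc` reports the limit was hit, while the worker's `WorkerCounterGuard` decrements on drop and wakes the accept thread only when `dec` reports crossing back. A simplified, single-threaded sketch of that protocol, with an `mpsc` channel standing in for the `WakerQueue`/`mio::Waker` wake-up (the limit and message values are illustrative, not taken from the crate):

use std::sync::{
    atomic::{AtomicUsize, Ordering},
    mpsc, Arc,
};

// Sketch of the shared counter from this patch: it starts at 1 so the
// patch's `total()` helper can report `load - 1` in-flight connections.
#[derive(Clone)]
struct Counter {
    counter: Arc<AtomicUsize>,
    limit: usize,
}

impl Counter {
    fn new(limit: usize) -> Self {
        Self {
            counter: Arc::new(AtomicUsize::new(1)),
            limit,
        }
    }

    /// Accept side: called after successfully dispatching a Conn.
    /// Returns false once the previous value equals the limit, which is the
    /// cue to clear the worker's bit in `Availability`.
    fn inc(&self) -> bool {
        self.counter.fetch_add(1, Ordering::Relaxed) != self.limit
    }

    /// Worker side: called when a connection guard drops.
    /// Returns true once the previous value equals the limit, which is the
    /// cue to wake the accept thread (`WakerInterest::WorkerAvailable`).
    fn dec(&self) -> bool {
        self.counter.fetch_sub(1, Ordering::Relaxed) == self.limit
    }
}

fn main() {
    let counter = Counter::new(2);
    // The channel stands in for the WakerQueue/mio::Waker pair in the diff.
    let (wake_tx, wake_rx) = mpsc::channel::<&'static str>();

    // Accept thread: dispatch connections, incrementing lazily after each send.
    assert!(counter.inc()); // first in-flight conn, worker still available
    assert!(!counter.inc()); // limit hit -> clear the Availability bit

    // Worker thread: each finished connection decrements via guard drop; only
    // the decrement that reports the crossing sends the wake-up, mirroring
    // WorkerCounterGuard::drop in the diff.
    for _ in 0..2 {
        if counter.dec() {
            wake_tx.send("WorkerAvailable").unwrap();
        }
    }
    assert_eq!(wake_rx.recv().unwrap(), "WorkerAvailable");
}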