From b7202db8fd989cd8fe534b2f2c74c8205f867094 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Tue, 29 Dec 2020 07:44:53 +0800 Subject: [PATCH] update actix-server and actix-testing to tokio 1.0 (#239) --- actix-server/CHANGES.md | 6 + actix-server/Cargo.toml | 26 +- actix-server/src/accept.rs | 500 +++++++++++++----------------- actix-server/src/builder.rs | 207 ++++++------- actix-server/src/config.rs | 61 ++-- actix-server/src/lib.rs | 102 ++++++ actix-server/src/server.rs | 32 +- actix-server/src/service.rs | 43 ++- actix-server/src/signals.rs | 86 +++-- actix-server/src/socket.rs | 326 +++++++++++-------- actix-server/src/waker_queue.rs | 89 ++++++ actix-server/src/worker.rs | 335 +++++++++----------- actix-server/tests/test_server.rs | 27 +- actix-testing/Cargo.toml | 4 +- actix-testing/src/lib.rs | 5 +- 15 files changed, 1008 insertions(+), 841 deletions(-) create mode 100644 actix-server/src/waker_queue.rs diff --git a/actix-server/CHANGES.md b/actix-server/CHANGES.md index 5e28fe0b..15d7d596 100644 --- a/actix-server/CHANGES.md +++ b/actix-server/CHANGES.md @@ -3,6 +3,12 @@ ## Unreleased - 2020-xx-xx * Added explicit info log message on accept queue pause. [#215] * Prevent double registration of sockets when back-pressure is resolved. [#223] +* Update `mio` dependency to `0.7.3`. +* Remove `socket2` dependency. +* `ServerBuilder::backlog` would accept `u32` instead of `i32`. +* Remove `AcceptNotify` type and pass `WakerQueue` to `Worker` for wake up the `Accept`'s `Poll`. +* Convert `mio::net::TcpStream` to `actix_rt::net::TcpStream`(`UnixStream` for uds) using `FromRawFd` and `IntoRawFd`(`FromRawSocket` and `IntoRawSocket` on windows). +* Remove `AsyncRead` and `AsyncWrite` trait bound for `socket::FromStream` trait. [#215]: https://github.com/actix/actix-net/pull/215 [#223]: https://github.com/actix/actix-net/pull/223 diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index 1a67f61c..34fb3775 100644 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -20,25 +20,21 @@ path = "src/lib.rs" default = [] [dependencies] -actix-service = "1.0.6" -actix-rt = "1.1.1" -actix-codec = "0.3.0" -actix-utils = "2.0.0" +actix-codec = "0.4.0-beta.1" +actix-rt = "2.0.0-beta.1" +actix-service = "2.0.0-beta.1" +actix-utils = "3.0.0-beta.1" +futures-core = { version = "0.3.7", default-features = false } log = "0.4" +mio = { version = "0.7.6", features = ["os-poll", "net"] } num_cpus = "1.13" -mio = "0.6.19" -socket2 = "0.3" -futures-channel = { version = "0.3.4", default-features = false } -futures-util = { version = "0.3.4", default-features = false, features = ["sink"] } slab = "0.4" - -# unix domain sockets -# FIXME: Remove it and use mio own uds feature once mio 0.7 is released -mio-uds = { version = "0.6.7" } +tokio = { version = "1", features = ["sync"] } [dev-dependencies] -bytes = "0.5" -env_logger = "0.7" actix-testing = "1.0.0" -tokio = { version = "0.2", features = ["io-util"] } +bytes = "1" +env_logger = "0.7" +futures-util = { version = "0.3.7", default-features = false, features = ["sink"] } +tokio = { version = "1", features = ["io-util"] } diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index bef175d8..bf895f06 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -1,120 +1,86 @@ -use std::sync::mpsc as sync_mpsc; use std::time::Duration; use std::{io, thread}; -use actix_rt::time::{delay_until, Instant}; +use actix_rt::time::{sleep_until, Instant}; use actix_rt::System; use log::{error, info}; +use mio::{Interest, Poll, Token as MioToken}; use slab::Slab; use crate::server::Server; -use crate::socket::{SocketAddr, SocketListener, StdListener}; -use crate::worker::{Conn, WorkerClient}; +use crate::socket::{MioListener, SocketAddr}; +use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN}; +use crate::worker::{Conn, WorkerHandle}; use crate::Token; -pub(crate) enum Command { - Pause, - Resume, - Stop, - Worker(WorkerClient), -} - struct ServerSocketInfo { + // addr for socket. mainly used for logging. addr: SocketAddr, + // be ware this is the crate token for identify socket and should not be confused with + // mio::Token token: Token, - sock: SocketListener, + lst: MioListener, + // timeout is used to mark the deadline when this socket's listener should be registered again + // after an error. timeout: Option, } -#[derive(Clone)] -pub(crate) struct AcceptNotify(mio::SetReadiness); - -impl AcceptNotify { - pub(crate) fn new(ready: mio::SetReadiness) -> Self { - AcceptNotify(ready) - } - - pub(crate) fn notify(&self) { - let _ = self.0.set_readiness(mio::Ready::readable()); - } -} - -impl Default for AcceptNotify { - fn default() -> Self { - AcceptNotify::new(mio::Registration::new2().1) - } -} - +/// Accept loop would live with `ServerBuilder`. +/// +/// It's tasked with construct `Poll` instance and `WakerQueue` which would be distributed to +/// `Accept` and `Worker`. +/// +/// It would also listen to `ServerCommand` and push interests to `WakerQueue`. pub(crate) struct AcceptLoop { - cmd_reg: Option, - cmd_ready: mio::SetReadiness, - notify_reg: Option, - notify_ready: mio::SetReadiness, - tx: sync_mpsc::Sender, - rx: Option>, srv: Option, + poll: Option, + waker: WakerQueue, } impl AcceptLoop { - pub fn new(srv: Server) -> AcceptLoop { - let (tx, rx) = sync_mpsc::channel(); - let (cmd_reg, cmd_ready) = mio::Registration::new2(); - let (notify_reg, notify_ready) = mio::Registration::new2(); + pub fn new(srv: Server) -> Self { + let poll = Poll::new().unwrap_or_else(|e| panic!("Can not create `mio::Poll`: {}", e)); + let waker = WakerQueue::new(poll.registry()) + .unwrap_or_else(|e| panic!("Can not create `mio::Waker`: {}", e)); - AcceptLoop { - tx, - cmd_ready, - cmd_reg: Some(cmd_reg), - notify_ready, - notify_reg: Some(notify_reg), - rx: Some(rx), + Self { srv: Some(srv), + poll: Some(poll), + waker, } } - pub fn send(&self, msg: Command) { - let _ = self.tx.send(msg); - let _ = self.cmd_ready.set_readiness(mio::Ready::readable()); + pub(crate) fn waker_owned(&self) -> WakerQueue { + self.waker.clone() } - pub fn get_notify(&self) -> AcceptNotify { - AcceptNotify::new(self.notify_ready.clone()) + pub fn wake(&self, i: WakerInterest) { + self.waker.wake(i); } pub(crate) fn start( &mut self, - socks: Vec<(Token, StdListener)>, - workers: Vec, + socks: Vec<(Token, MioListener)>, + handles: Vec, ) { let srv = self.srv.take().expect("Can not re-use AcceptInfo"); + let poll = self.poll.take().unwrap(); + let waker = self.waker.clone(); - Accept::start( - self.rx.take().expect("Can not re-use AcceptInfo"), - self.cmd_reg.take().expect("Can not re-use AcceptInfo"), - self.notify_reg.take().expect("Can not re-use AcceptInfo"), - socks, - srv, - workers, - ); + Accept::start(poll, waker, socks, srv, handles); } } +/// poll instance of the server. struct Accept { - poll: mio::Poll, - rx: sync_mpsc::Receiver, - sockets: Slab, - workers: Vec, + poll: Poll, + waker: WakerQueue, + handles: Vec, srv: Server, - timer: (mio::Registration, mio::SetReadiness), next: usize, backpressure: bool, } -const DELTA: usize = 100; -const CMD: mio::Token = mio::Token(0); -const TIMER: mio::Token = mio::Token(1); -const NOTIFY: mio::Token = mio::Token(2); - /// This function defines errors that are per-connection. Which basically /// means that if we get this error from `accept()` system call it means /// next connection might be ready to be accepted. @@ -129,326 +95,290 @@ fn connection_error(e: &io::Error) -> bool { } impl Accept { - #![allow(clippy::too_many_arguments)] pub(crate) fn start( - rx: sync_mpsc::Receiver, - cmd_reg: mio::Registration, - notify_reg: mio::Registration, - socks: Vec<(Token, StdListener)>, + poll: Poll, + waker: WakerQueue, + socks: Vec<(Token, MioListener)>, srv: Server, - workers: Vec, + handles: Vec, ) { + // Accept runs in its own thread and would want to spawn additional futures to current + // actix system. let sys = System::current(); - - // start accept thread - let _ = thread::Builder::new() + thread::Builder::new() .name("actix-server accept loop".to_owned()) .spawn(move || { System::set_current(sys); - let mut accept = Accept::new(rx, socks, workers, srv); - - // Start listening for incoming commands - if let Err(err) = accept.poll.register( - &cmd_reg, - CMD, - mio::Ready::readable(), - mio::PollOpt::edge(), - ) { - panic!("Can not register Registration: {}", err); - } - - // Start listening for notify updates - if let Err(err) = accept.poll.register( - ¬ify_reg, - NOTIFY, - mio::Ready::readable(), - mio::PollOpt::edge(), - ) { - panic!("Can not register Registration: {}", err); - } - - accept.poll(); - }); + let (mut accept, sockets) = + Accept::new_with_sockets(poll, waker, socks, handles, srv); + accept.poll_with(sockets); + }) + .unwrap(); } - fn new( - rx: sync_mpsc::Receiver, - socks: Vec<(Token, StdListener)>, - workers: Vec, + fn new_with_sockets( + poll: Poll, + waker: WakerQueue, + socks: Vec<(Token, MioListener)>, + handles: Vec, srv: Server, - ) -> Accept { - // Create a poll instance - let poll = match mio::Poll::new() { - Ok(poll) => poll, - Err(err) => panic!("Can not create mio::Poll: {}", err), - }; - - // Start accept + ) -> (Accept, Slab) { let mut sockets = Slab::new(); - for (hnd_token, lst) in socks.into_iter() { + for (hnd_token, mut lst) in socks.into_iter() { let addr = lst.local_addr(); - let server = lst.into_listener(); let entry = sockets.vacant_entry(); let token = entry.key(); // Start listening for incoming connections - if let Err(err) = poll.register( - &server, - mio::Token(token + DELTA), - mio::Ready::readable(), - mio::PollOpt::edge(), - ) { - panic!("Can not register io: {}", err); - } + poll.registry() + .register(&mut lst, MioToken(token), Interest::READABLE) + .unwrap_or_else(|e| panic!("Can not register io: {}", e)); entry.insert(ServerSocketInfo { addr, token: hnd_token, - sock: server, + lst, timeout: None, }); } - // Timer - let (tm, tmr) = mio::Registration::new2(); - if let Err(err) = - poll.register(&tm, TIMER, mio::Ready::readable(), mio::PollOpt::edge()) - { - panic!("Can not register Registration: {}", err); - } - - Accept { + let accept = Accept { poll, - rx, - sockets, - workers, + waker, + handles, srv, next: 0, - timer: (tm, tmr), backpressure: false, - } + }; + + (accept, sockets) } - fn poll(&mut self) { - // Create storage for events + fn poll_with(&mut self, mut sockets: Slab) { let mut events = mio::Events::with_capacity(128); loop { - if let Err(err) = self.poll.poll(&mut events, None) { - panic!("Poll error: {}", err); - } + self.poll + .poll(&mut events, None) + .unwrap_or_else(|e| panic!("Poll error: {}", e)); for event in events.iter() { let token = event.token(); match token { - CMD => { - if !self.process_cmd() { - return; + // This is a loop because interests for command from previous version was + // a loop that would try to drain the command channel. It's yet unknown + // if it's necessary/good practice to actively drain the waker queue. + WAKER_TOKEN => 'waker: loop { + // take guard with every iteration so no new interest can be added + // until the current task is done. + let mut guard = self.waker.guard(); + match guard.pop_front() { + // worker notify it becomes available. we may want to recover + // from backpressure. + Some(WakerInterest::WorkerAvailable) => { + drop(guard); + self.maybe_backpressure(&mut sockets, false); + } + // a new worker thread is made and it's handle would be added + // to Accept + Some(WakerInterest::Worker(handle)) => { + drop(guard); + // maybe we want to recover from a backpressure. + self.maybe_backpressure(&mut sockets, false); + self.handles.push(handle); + } + // got timer interest and it's time to try register socket(s) + // again. + Some(WakerInterest::Timer) => { + drop(guard); + self.process_timer(&mut sockets) + } + Some(WakerInterest::Pause) => { + drop(guard); + sockets.iter_mut().for_each(|(_, info)| { + match self.deregister(info) { + Ok(_) => info!( + "Paused accepting connections on {}", + info.addr + ), + Err(e) => { + error!("Can not deregister server socket {}", e) + } + } + }); + } + Some(WakerInterest::Resume) => { + drop(guard); + sockets.iter_mut().for_each(|(token, info)| { + self.register_logged(token, info); + }); + } + Some(WakerInterest::Stop) => { + return self.deregister_all(&mut sockets); + } + // waker queue is drained. + None => { + // Reset the WakerQueue before break so it does not grow + // infinitely. + WakerQueue::reset(&mut guard); + break 'waker; + } } - } - TIMER => self.process_timer(), - NOTIFY => self.backpressure(false), + }, _ => { let token = usize::from(token); - if token < DELTA { - continue; - } - self.accept(token - DELTA); + self.accept(&mut sockets, token); } } } } } - fn process_timer(&mut self) { + fn process_timer(&self, sockets: &mut Slab) { let now = Instant::now(); - for (token, info) in self.sockets.iter_mut() { + sockets.iter_mut().for_each(|(token, info)| { + // only the ServerSocketInfo have an associate timeout value was de registered. if let Some(inst) = info.timeout.take() { if now > inst { - if let Err(err) = self.poll.register( - &info.sock, - mio::Token(token + DELTA), - mio::Ready::readable(), - mio::PollOpt::edge(), - ) { - error!("Can not register server socket {}", err); - } else { - info!("Resume accepting connections on {}", info.addr); - } + self.register_logged(token, info); } else { info.timeout = Some(inst); } } - } - } - - fn process_cmd(&mut self) -> bool { - loop { - match self.rx.try_recv() { - Ok(cmd) => match cmd { - Command::Pause => { - for (_, info) in self.sockets.iter_mut() { - if let Err(err) = self.poll.deregister(&info.sock) { - error!("Can not deregister server socket {}", err); - } else { - info!("Paused accepting connections on {}", info.addr); - } - } - } - Command::Resume => { - for (token, info) in self.sockets.iter() { - if let Err(err) = self.register(token, info) { - error!("Can not resume socket accept process: {}", err); - } else { - info!( - "Accepting connections on {} has been resumed", - info.addr - ); - } - } - } - Command::Stop => { - for (_, info) in self.sockets.iter() { - let _ = self.poll.deregister(&info.sock); - } - return false; - } - Command::Worker(worker) => { - self.backpressure(false); - self.workers.push(worker); - } - }, - Err(err) => match err { - sync_mpsc::TryRecvError::Empty => break, - sync_mpsc::TryRecvError::Disconnected => { - for (_, info) in self.sockets.iter() { - let _ = self.poll.deregister(&info.sock); - } - return false; - } - }, - } - } - true + }); } #[cfg(not(target_os = "windows"))] - fn register(&self, token: usize, info: &ServerSocketInfo) -> io::Result<()> { - self.poll.register( - &info.sock, - mio::Token(token + DELTA), - mio::Ready::readable(), - mio::PollOpt::edge(), - ) + fn register(&self, token: usize, info: &mut ServerSocketInfo) -> io::Result<()> { + self.poll + .registry() + .register(&mut info.lst, MioToken(token), Interest::READABLE) } #[cfg(target_os = "windows")] - fn register(&self, token: usize, info: &ServerSocketInfo) -> io::Result<()> { + fn register(&self, token: usize, info: &mut ServerSocketInfo) -> io::Result<()> { // On windows, calling register without deregister cause an error. // See https://github.com/actix/actix-web/issues/905 // Calling reregister seems to fix the issue. self.poll - .register( - &info.sock, - mio::Token(token + DELTA), - mio::Ready::readable(), - mio::PollOpt::edge(), - ) + .registry() + .register(&mut info.lst, mio::Token(token), Interest::READABLE) .or_else(|_| { - self.poll.reregister( - &info.sock, - mio::Token(token + DELTA), - mio::Ready::readable(), - mio::PollOpt::edge(), + self.poll.registry().reregister( + &mut info.lst, + mio::Token(token), + Interest::READABLE, ) }) } - fn backpressure(&mut self, on: bool) { + fn register_logged(&self, token: usize, info: &mut ServerSocketInfo) { + match self.register(token, info) { + Ok(_) => info!("Resume accepting connections on {}", info.addr), + Err(e) => error!("Can not register server socket {}", e), + } + } + + fn deregister(&self, info: &mut ServerSocketInfo) -> io::Result<()> { + self.poll.registry().deregister(&mut info.lst) + } + + fn deregister_all(&self, sockets: &mut Slab) { + sockets.iter_mut().for_each(|(_, info)| { + info!("Accepting connections on {} has been paused", info.addr); + let _ = self.deregister(info); + }); + } + + fn maybe_backpressure(&mut self, sockets: &mut Slab, on: bool) { if self.backpressure { if !on { self.backpressure = false; - for (token, info) in self.sockets.iter() { + for (token, info) in sockets.iter_mut() { if info.timeout.is_some() { // socket will attempt to re-register itself when its timeout completes continue; } - - if let Err(err) = self.register(token, info) { - error!("Can not resume socket accept process: {}", err); - } else { - info!("Accepting connections on {} has been resumed", info.addr); - } + self.register_logged(token, info); } } } else if on { self.backpressure = true; - for (_, info) in self.sockets.iter() { - let _ = self.poll.deregister(&info.sock); - info!("Accepting connections on {} has been paused", info.addr); - } + self.deregister_all(sockets); } } - fn accept_one(&mut self, mut msg: Conn) { + fn accept_one(&mut self, sockets: &mut Slab, mut msg: Conn) { if self.backpressure { - while !self.workers.is_empty() { - match self.workers[self.next].send(msg) { - Ok(_) => (), + while !self.handles.is_empty() { + match self.handles[self.next].send(msg) { + Ok(_) => { + self.set_next(); + break; + } Err(tmp) => { - self.srv.worker_faulted(self.workers[self.next].idx); + // worker lost contact and could be gone. a message is sent to + // `ServerBuilder` future to notify it a new worker should be made. + // after that remove the fault worker. + self.srv.worker_faulted(self.handles[self.next].idx); msg = tmp; - self.workers.swap_remove(self.next); - if self.workers.is_empty() { + self.handles.swap_remove(self.next); + if self.handles.is_empty() { error!("No workers"); return; - } else if self.workers.len() <= self.next { + } else if self.handles.len() <= self.next { self.next = 0; } continue; } } - self.next = (self.next + 1) % self.workers.len(); - break; } } else { let mut idx = 0; - while idx < self.workers.len() { + while idx < self.handles.len() { idx += 1; - if self.workers[self.next].available() { - match self.workers[self.next].send(msg) { + if self.handles[self.next].available() { + match self.handles[self.next].send(msg) { Ok(_) => { - self.next = (self.next + 1) % self.workers.len(); + self.set_next(); return; } + // worker lost contact and could be gone. a message is sent to + // `ServerBuilder` future to notify it a new worker should be made. + // after that remove the fault worker and enter backpressure if necessary. Err(tmp) => { - self.srv.worker_faulted(self.workers[self.next].idx); + self.srv.worker_faulted(self.handles[self.next].idx); msg = tmp; - self.workers.swap_remove(self.next); - if self.workers.is_empty() { + self.handles.swap_remove(self.next); + if self.handles.is_empty() { error!("No workers"); - self.backpressure(true); + self.maybe_backpressure(sockets, true); return; - } else if self.workers.len() <= self.next { + } else if self.handles.len() <= self.next { self.next = 0; } continue; } } } - self.next = (self.next + 1) % self.workers.len(); + self.set_next(); } // enable backpressure - self.backpressure(true); - self.accept_one(msg); + self.maybe_backpressure(sockets, true); + self.accept_one(sockets, msg); } } - fn accept(&mut self, token: usize) { + // set next worker handle that would accept work. + fn set_next(&mut self) { + self.next = (self.next + 1) % self.handles.len(); + } + + fn accept(&mut self, sockets: &mut Slab, token: usize) { loop { - let msg = if let Some(info) = self.sockets.get_mut(token) { - match info.sock.accept() { + let msg = if let Some(info) = sockets.get_mut(token) { + match info.lst.accept() { Ok(Some((io, addr))) => Conn { io, token: info.token, @@ -458,18 +388,22 @@ impl Accept { Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return, Err(ref e) if connection_error(e) => continue, Err(e) => { + // deregister listener temporary error!("Error accepting connection: {}", e); - if let Err(err) = self.poll.deregister(&info.sock) { + if let Err(err) = self.deregister(info) { error!("Can not deregister server socket {}", err); } - // sleep after error + // sleep after error. write the timeout to socket info as later the poll + // would need it mark which socket and when it's listener should be + // registered. info.timeout = Some(Instant::now() + Duration::from_millis(500)); - let r = self.timer.1.clone(); + // after the sleep a Timer interest is sent to Accept Poll + let waker = self.waker.clone(); System::current().arbiter().send(Box::pin(async move { - delay_until(Instant::now() + Duration::from_millis(510)).await; - let _ = r.set_readiness(mio::Ready::readable()); + sleep_until(Instant::now() + Duration::from_millis(510)).await; + waker.wake(WakerInterest::Timer); })); return; } @@ -478,7 +412,7 @@ impl Accept { return; }; - self.accept_one(msg); + self.accept_one(sockets, msg); } } } diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index 64a45df9..51dd0eda 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -1,36 +1,35 @@ +use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Duration; -use std::{io, mem, net}; +use std::{io, mem}; use actix_rt::net::TcpStream; -use actix_rt::time::{delay_until, Instant}; +use actix_rt::time::{sleep_until, Instant}; use actix_rt::{spawn, System}; -use futures_channel::mpsc::{unbounded, UnboundedReceiver}; -use futures_channel::oneshot; -use futures_util::future::ready; -use futures_util::stream::FuturesUnordered; -use futures_util::{future::Future, ready, stream::Stream, FutureExt, StreamExt}; use log::{error, info}; -use socket2::{Domain, Protocol, Socket, Type}; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; +use tokio::sync::oneshot; -use crate::accept::{AcceptLoop, AcceptNotify, Command}; +use crate::accept::AcceptLoop; use crate::config::{ConfiguredService, ServiceConfig}; use crate::server::{Server, ServerCommand}; use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService}; use crate::signals::{Signal, Signals}; -use crate::socket::StdListener; -use crate::worker::{self, Worker, WorkerAvailability, WorkerClient}; -use crate::Token; +use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs}; +use crate::socket::{MioTcpListener, MioTcpSocket}; +use crate::waker_queue::{WakerInterest, WakerQueue}; +use crate::worker::{self, Worker, WorkerAvailability, WorkerHandle}; +use crate::{join_all, Token}; /// Server builder pub struct ServerBuilder { threads: usize, token: Token, - backlog: i32, - workers: Vec<(usize, WorkerClient)>, + backlog: u32, + handles: Vec<(usize, WorkerHandle)>, services: Vec>, - sockets: Vec<(Token, String, StdListener)>, + sockets: Vec<(Token, String, MioListener)>, accept: AcceptLoop, exit: bool, shutdown_timeout: Duration, @@ -49,13 +48,13 @@ impl Default for ServerBuilder { impl ServerBuilder { /// Create new Server builder instance pub fn new() -> ServerBuilder { - let (tx, rx) = unbounded(); + let (tx, rx) = unbounded_channel(); let server = Server::new(tx); ServerBuilder { threads: num_cpus::get(), - token: Token(0), - workers: Vec::new(), + token: Token::default(), + handles: Vec::new(), services: Vec::new(), sockets: Vec::new(), accept: AcceptLoop::new(server.clone()), @@ -89,7 +88,7 @@ impl ServerBuilder { /// Generally set in the 64-2048 range. Default value is 2048. /// /// This method should be called before `bind()` method call. - pub fn backlog(mut self, num: i32) -> Self { + pub fn backlog(mut self, num: u32) -> Self { self.backlog = num; self } @@ -147,7 +146,7 @@ impl ServerBuilder { for (name, lst) in cfg.services { let token = self.token.next(); srv.stream(token, name.clone(), lst.local_addr()?); - self.sockets.push((token, name, StdListener::Tcp(lst))); + self.sockets.push((token, name, MioListener::Tcp(lst))); } self.services.push(Box::new(srv)); } @@ -160,7 +159,7 @@ impl ServerBuilder { pub fn bind>(mut self, name: N, addr: U, factory: F) -> io::Result where F: ServiceFactory, - U: net::ToSocketAddrs, + U: ToSocketAddrs, { let sockets = bind_addr(addr, self.backlog)?; @@ -173,12 +172,12 @@ impl ServerBuilder { lst.local_addr()?, )); self.sockets - .push((token, name.as_ref().to_string(), StdListener::Tcp(lst))); + .push((token, name.as_ref().to_string(), MioListener::Tcp(lst))); } Ok(self) } - #[cfg(all(unix))] + #[cfg(unix)] /// Add new unix domain service to the server. pub fn bind_uds(self, name: N, addr: U, factory: F) -> io::Result where @@ -186,8 +185,6 @@ impl ServerBuilder { N: AsRef, U: AsRef, { - use std::os::unix::net::UnixListener; - // The path must not exist when we try to bind. // Try to remove it to avoid bind error. if let Err(e) = std::fs::remove_file(addr.as_ref()) { @@ -197,26 +194,27 @@ impl ServerBuilder { } } - let lst = UnixListener::bind(addr)?; + let lst = crate::socket::StdUnixListener::bind(addr)?; self.listen_uds(name, lst, factory) } - #[cfg(all(unix))] + #[cfg(unix)] /// Add new unix domain service to the server. /// Useful when running as a systemd service and /// a socket FD can be acquired using the systemd crate. pub fn listen_uds>( mut self, name: N, - lst: std::os::unix::net::UnixListener, + lst: crate::socket::StdUnixListener, factory: F, ) -> io::Result where F: ServiceFactory, { - use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + use std::net::{IpAddr, Ipv4Addr}; + lst.set_nonblocking(true)?; let token = self.token.next(); - let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); + let addr = StdSocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); self.services.push(StreamNewService::create( name.as_ref().to_string(), token, @@ -224,7 +222,7 @@ impl ServerBuilder { addr, )); self.sockets - .push((token, name.as_ref().to_string(), StdListener::Uds(lst))); + .push((token, name.as_ref().to_string(), MioListener::from(lst))); Ok(self) } @@ -232,21 +230,25 @@ impl ServerBuilder { pub fn listen>( mut self, name: N, - lst: net::TcpListener, + lst: StdTcpListener, factory: F, ) -> io::Result where F: ServiceFactory, { + lst.set_nonblocking(true)?; + let addr = lst.local_addr()?; + let token = self.token.next(); self.services.push(StreamNewService::create( name.as_ref().to_string(), token, factory, - lst.local_addr()?, + addr, )); + self.sockets - .push((token, name.as_ref().to_string(), StdListener::Tcp(lst))); + .push((token, name.as_ref().to_string(), MioListener::from(lst))); Ok(self) } @@ -263,12 +265,12 @@ impl ServerBuilder { info!("Starting {} workers", self.threads); // start workers - let workers = (0..self.threads) + let handles = (0..self.threads) .map(|idx| { - let worker = self.start_worker(idx, self.accept.get_notify()); - self.workers.push((idx, worker.clone())); + let handle = self.start_worker(idx, self.accept.waker_owned()); + self.handles.push((idx, handle.clone())); - worker + handle }) .collect(); @@ -281,7 +283,7 @@ impl ServerBuilder { .into_iter() .map(|t| (t.0, t.2)) .collect(), - workers, + handles, ); // handle signals @@ -296,10 +298,9 @@ impl ServerBuilder { } } - fn start_worker(&self, idx: usize, notify: AcceptNotify) -> WorkerClient { - let avail = WorkerAvailability::new(notify); - let services: Vec> = - self.services.iter().map(|v| v.clone_factory()).collect(); + fn start_worker(&self, idx: usize, waker: WakerQueue) -> WorkerHandle { + let avail = WorkerAvailability::new(waker); + let services = self.services.iter().map(|v| v.clone_factory()).collect(); Worker::start(idx, services, avail, self.shutdown_timeout) } @@ -307,11 +308,11 @@ impl ServerBuilder { fn handle_cmd(&mut self, item: ServerCommand) { match item { ServerCommand::Pause(tx) => { - self.accept.send(Command::Pause); + self.accept.wake(WakerInterest::Pause); let _ = tx.send(()); } ServerCommand::Resume(tx) => { - self.accept.send(Command::Resume); + self.accept.wake(WakerInterest::Resume); let _ = tx.send(()); } ServerCommand::Signal(sig) => { @@ -355,50 +356,41 @@ impl ServerBuilder { let exit = self.exit; // stop accept thread - self.accept.send(Command::Stop); + self.accept.wake(WakerInterest::Stop); let notify = std::mem::take(&mut self.notify); // stop workers - if !self.workers.is_empty() && graceful { - spawn( - self.workers - .iter() - .map(move |worker| worker.1.stop(graceful)) - .collect::>() - .collect::>() - .then(move |_| { - if let Some(tx) = completion { - let _ = tx.send(()); - } - for tx in notify { - let _ = tx.send(()); - } - if exit { - spawn( - async { - delay_until( - Instant::now() + Duration::from_millis(300), - ) - .await; - System::current().stop(); - } - .boxed(), - ); - } - ready(()) - }), - ) + if !self.handles.is_empty() && graceful { + let iter = self + .handles + .iter() + .map(move |worker| worker.1.stop(graceful)) + .collect(); + + let fut = join_all(iter); + + spawn(async move { + let _ = fut.await; + if let Some(tx) = completion { + let _ = tx.send(()); + } + for tx in notify { + let _ = tx.send(()); + } + if exit { + spawn(async { + sleep_until(Instant::now() + Duration::from_millis(300)).await; + System::current().stop(); + }); + } + }) } else { // we need to stop system if server was spawned if self.exit { - spawn( - delay_until(Instant::now() + Duration::from_millis(300)).then( - |_| { - System::current().stop(); - ready(()) - }, - ), - ); + spawn(async { + sleep_until(Instant::now() + Duration::from_millis(300)).await; + System::current().stop(); + }); } if let Some(tx) = completion { let _ = tx.send(()); @@ -410,9 +402,9 @@ impl ServerBuilder { } ServerCommand::WorkerFaulted(idx) => { let mut found = false; - for i in 0..self.workers.len() { - if self.workers[i].0 == idx { - self.workers.swap_remove(i); + for i in 0..self.handles.len() { + if self.handles[i].0 == idx { + self.handles.swap_remove(i); found = true; break; } @@ -421,10 +413,10 @@ impl ServerBuilder { if found { error!("Worker has died {:?}, restarting", idx); - let mut new_idx = self.workers.len(); + let mut new_idx = self.handles.len(); 'found: loop { - for i in 0..self.workers.len() { - if self.workers[i].0 == new_idx { + for i in 0..self.handles.len() { + if self.handles[i].0 == new_idx { new_idx += 1; continue 'found; } @@ -432,9 +424,9 @@ impl ServerBuilder { break; } - let worker = self.start_worker(new_idx, self.accept.get_notify()); - self.workers.push((new_idx, worker.clone())); - self.accept.send(Command::Worker(worker)); + let handle = self.start_worker(new_idx, self.accept.waker_owned()); + self.handles.push((new_idx, handle.clone())); + self.accept.wake(WakerInterest::Worker(handle)); } } } @@ -446,20 +438,18 @@ impl Future for ServerBuilder { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { - match ready!(Pin::new(&mut self.cmd).poll_next(cx)) { - Some(it) => self.as_mut().get_mut().handle_cmd(it), - None => { - return Poll::Pending; - } + match Pin::new(&mut self.cmd).poll_recv(cx) { + Poll::Ready(Some(it)) => self.as_mut().get_mut().handle_cmd(it), + _ => return Poll::Pending, } } } } -pub(super) fn bind_addr( +pub(super) fn bind_addr( addr: S, - backlog: i32, -) -> io::Result> { + backlog: u32, +) -> io::Result> { let mut err = None; let mut succ = false; let mut sockets = Vec::new(); @@ -487,14 +477,13 @@ pub(super) fn bind_addr( } } -fn create_tcp_listener(addr: net::SocketAddr, backlog: i32) -> io::Result { - let domain = match addr { - net::SocketAddr::V4(_) => Domain::ipv4(), - net::SocketAddr::V6(_) => Domain::ipv6(), +fn create_tcp_listener(addr: StdSocketAddr, backlog: u32) -> io::Result { + let socket = match addr { + StdSocketAddr::V4(_) => MioTcpSocket::new_v4()?, + StdSocketAddr::V6(_) => MioTcpSocket::new_v6()?, }; - let socket = Socket::new(domain, Type::stream(), Some(Protocol::tcp()))?; - socket.set_reuse_address(true)?; - socket.bind(&addr.into())?; - socket.listen(backlog)?; - Ok(socket.into_tcp_listener()) + + socket.set_reuseaddr(true)?; + socket.bind(addr)?; + socket.listen(backlog) } diff --git a/actix-server/src/config.rs b/actix-server/src/config.rs index a1315a72..20270a2f 100644 --- a/actix-server/src/config.rs +++ b/actix-server/src/config.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; -use std::{fmt, io, net}; +use std::future::Future; +use std::{fmt, io}; use actix_rt::net::TcpStream; use actix_service::{ @@ -7,23 +8,23 @@ use actix_service::{ ServiceFactory as BaseServiceFactory, }; use actix_utils::counter::CounterGuard; -use futures_util::future::{ok, Future, FutureExt, LocalBoxFuture}; +use futures_core::future::LocalBoxFuture; use log::error; -use super::builder::bind_addr; -use super::service::{BoxedServerService, InternalServiceFactory, StreamService}; -use super::Token; -use crate::socket::StdStream; +use crate::builder::bind_addr; +use crate::service::{BoxedServerService, InternalServiceFactory, StreamService}; +use crate::socket::{MioStream, MioTcpListener, StdSocketAddr, StdTcpListener, ToSocketAddrs}; +use crate::{ready, Token}; pub struct ServiceConfig { - pub(crate) services: Vec<(String, net::TcpListener)>, + pub(crate) services: Vec<(String, MioTcpListener)>, pub(crate) apply: Option>, pub(crate) threads: usize, - pub(crate) backlog: i32, + pub(crate) backlog: u32, } impl ServiceConfig { - pub(super) fn new(threads: usize, backlog: i32) -> ServiceConfig { + pub(super) fn new(threads: usize, backlog: u32) -> ServiceConfig { ServiceConfig { threads, backlog, @@ -43,24 +44,20 @@ impl ServiceConfig { /// Add new service to server pub fn bind>(&mut self, name: N, addr: U) -> io::Result<&mut Self> where - U: net::ToSocketAddrs, + U: ToSocketAddrs, { let sockets = bind_addr(addr, self.backlog)?; for lst in sockets { - self.listen(name.as_ref(), lst); + self._listen(name.as_ref(), lst); } Ok(self) } /// Add new service to server - pub fn listen>(&mut self, name: N, lst: net::TcpListener) -> &mut Self { - if self.apply.is_none() { - self.apply = Some(Box::new(not_configured)); - } - self.services.push((name.as_ref().to_string(), lst)); - self + pub fn listen>(&mut self, name: N, lst: StdTcpListener) -> &mut Self { + self._listen(name, MioTcpListener::from_std(lst)) } /// Register service configuration function. This function get called @@ -72,11 +69,19 @@ impl ServiceConfig { self.apply = Some(Box::new(f)); Ok(()) } + + fn _listen>(&mut self, name: N, lst: MioTcpListener) -> &mut Self { + if self.apply.is_none() { + self.apply = Some(Box::new(not_configured)); + } + self.services.push((name.as_ref().to_string(), lst)); + self + } } pub(super) struct ConfiguredService { rt: Box, - names: HashMap, + names: HashMap, topics: HashMap, services: Vec, } @@ -91,7 +96,7 @@ impl ConfiguredService { } } - pub(super) fn stream(&mut self, token: Token, name: String, addr: net::SocketAddr) { + pub(super) fn stream(&mut self, token: Token, name: String, addr: StdSocketAddr) { self.names.insert(token, (name.clone(), addr)); self.topics.insert(name, token); self.services.push(token); @@ -121,7 +126,7 @@ impl InternalServiceFactory for ConfiguredService { let tokens = self.services.clone(); // construct services - async move { + Box::pin(async move { let mut services = rt.services; // TODO: Proper error handling here for f in rt.onstart.into_iter() { @@ -146,14 +151,13 @@ impl InternalServiceFactory for ConfiguredService { token, Box::new(StreamService::new(fn_service(move |_: TcpStream| { error!("Service {:?} is not configured", name); - ok::<_, ()>(()) + ready::>(Ok(())) }))), )); }; } Ok(res) - } - .boxed_local() + }) } } @@ -233,13 +237,13 @@ impl ServiceRuntime { where F: Future + 'static, { - self.onstart.push(fut.boxed_local()) + self.onstart.push(Box::pin(fut)) } } type BoxedNewService = Box< dyn BaseServiceFactory< - (Option, StdStream), + (Option, MioStream), Response = (), Error = (), InitError = (), @@ -253,7 +257,7 @@ struct ServiceFactory { inner: T, } -impl BaseServiceFactory<(Option, StdStream)> for ServiceFactory +impl BaseServiceFactory<(Option, MioStream)> for ServiceFactory where T: BaseServiceFactory, T::Future: 'static, @@ -270,7 +274,7 @@ where fn new_service(&self, _: ()) -> Self::Future { let fut = self.inner.new_service(()); - async move { + Box::pin(async move { match fut.await { Ok(s) => Ok(Box::new(StreamService::new(s)) as BoxedServerService), Err(e) => { @@ -278,7 +282,6 @@ where Err(()) } } - } - .boxed_local() + }) } } diff --git a/actix-server/src/lib.rs b/actix-server/src/lib.rs index d7a7c242..64aca7e4 100644 --- a/actix-server/src/lib.rs +++ b/actix-server/src/lib.rs @@ -11,6 +11,7 @@ mod server; mod service; mod signals; mod socket; +mod waker_queue; mod worker; pub use self::builder::ServerBuilder; @@ -21,11 +22,25 @@ pub use self::service::ServiceFactory; #[doc(hidden)] pub use self::socket::FromStream; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + /// Socket ID token #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] pub(crate) struct Token(usize); +impl Default for Token { + fn default() -> Self { + Self::new() + } +} + impl Token { + fn new() -> Self { + Self(0) + } + pub(crate) fn next(&mut self) -> Token { let token = Token(self.0); self.0 += 1; @@ -37,3 +52,90 @@ impl Token { pub fn new() -> ServerBuilder { ServerBuilder::default() } + +// temporary Ready type for std::future::{ready, Ready}; Can be removed when MSRV surpass 1.48 +#[doc(hidden)] +pub struct Ready(Option); + +pub(crate) fn ready(t: T) -> Ready { + Ready(Some(t)) +} + +impl Unpin for Ready {} + +impl Future for Ready { + type Output = T; + + fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll { + Poll::Ready(self.get_mut().0.take().unwrap()) + } +} + +// a poor man's join future. joined future is only used when starting/stopping the server. +// pin_project and pinned futures are overkill for this task. +pub(crate) struct JoinAll { + fut: Vec>, +} + +pub(crate) fn join_all(fut: Vec + 'static>) -> JoinAll { + let fut = fut + .into_iter() + .map(|f| JoinFuture::Future(Box::pin(f))) + .collect(); + + JoinAll { fut } +} + +enum JoinFuture { + Future(Pin>>), + Result(Option), +} + +impl Unpin for JoinAll {} + +impl Future for JoinAll { + type Output = Vec; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut ready = true; + + let this = self.get_mut(); + for fut in this.fut.iter_mut() { + if let JoinFuture::Future(f) = fut { + match f.as_mut().poll(cx) { + Poll::Ready(t) => { + *fut = JoinFuture::Result(Some(t)); + } + Poll::Pending => ready = false, + } + } + } + + if ready { + let mut res = Vec::new(); + for fut in this.fut.iter_mut() { + if let JoinFuture::Result(f) = fut { + res.push(f.take().unwrap()); + } + } + + Poll::Ready(res) + } else { + Poll::Pending + } + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[actix_rt::test] + async fn test_join_all() { + let futs = vec![ready(Ok(1)), ready(Err(3)), ready(Ok(9))]; + let mut res = join_all(futs).await.into_iter(); + assert_eq!(Ok(1), res.next().unwrap()); + assert_eq!(Err(3), res.next().unwrap()); + assert_eq!(Ok(9), res.next().unwrap()); + } +} diff --git a/actix-server/src/server.rs b/actix-server/src/server.rs index b29a9e02..6b0d0aea 100644 --- a/actix-server/src/server.rs +++ b/actix-server/src/server.rs @@ -3,9 +3,8 @@ use std::io; use std::pin::Pin; use std::task::{Context, Poll}; -use futures_channel::mpsc::UnboundedSender; -use futures_channel::oneshot; -use futures_util::FutureExt; +use tokio::sync::mpsc::UnboundedSender; +use tokio::sync::oneshot; use crate::builder::ServerBuilder; use crate::signals::Signal; @@ -42,11 +41,11 @@ impl Server { } pub(crate) fn signal(&self, sig: Signal) { - let _ = self.0.unbounded_send(ServerCommand::Signal(sig)); + let _ = self.0.send(ServerCommand::Signal(sig)); } pub(crate) fn worker_faulted(&self, idx: usize) { - let _ = self.0.unbounded_send(ServerCommand::WorkerFaulted(idx)); + let _ = self.0.send(ServerCommand::WorkerFaulted(idx)); } /// Pause accepting incoming connections @@ -55,15 +54,19 @@ impl Server { /// All opened connection remains active. pub fn pause(&self) -> impl Future { let (tx, rx) = oneshot::channel(); - let _ = self.0.unbounded_send(ServerCommand::Pause(tx)); - rx.map(|_| ()) + let _ = self.0.send(ServerCommand::Pause(tx)); + async { + let _ = rx.await; + } } /// Resume accepting incoming connections pub fn resume(&self) -> impl Future { let (tx, rx) = oneshot::channel(); - let _ = self.0.unbounded_send(ServerCommand::Resume(tx)); - rx.map(|_| ()) + let _ = self.0.send(ServerCommand::Resume(tx)); + async { + let _ = rx.await; + } } /// Stop incoming connection processing, stop all workers and exit. @@ -71,11 +74,13 @@ impl Server { /// If server starts with `spawn()` method, then spawned thread get terminated. pub fn stop(&self, graceful: bool) -> impl Future { let (tx, rx) = oneshot::channel(); - let _ = self.0.unbounded_send(ServerCommand::Stop { + let _ = self.0.send(ServerCommand::Stop { graceful, completion: Some(tx), }); - rx.map(|_| ()) + async { + let _ = rx.await; + } } } @@ -93,7 +98,7 @@ impl Future for Server { if this.1.is_none() { let (tx, rx) = oneshot::channel(); - if this.0.unbounded_send(ServerCommand::Notify(tx)).is_err() { + if this.0.send(ServerCommand::Notify(tx)).is_err() { return Poll::Ready(Ok(())); } this.1 = Some(rx); @@ -101,8 +106,7 @@ impl Future for Server { match Pin::new(this.1.as_mut().unwrap()).poll(cx) { Poll::Pending => Poll::Pending, - Poll::Ready(Ok(_)) => Poll::Ready(Ok(())), - Poll::Ready(Err(_)) => Poll::Ready(Ok(())), + Poll::Ready(_) => Poll::Ready(Ok(())), } } } diff --git a/actix-server/src/service.rs b/actix-server/src/service.rs index 569ce048..04b7dce8 100644 --- a/actix-server/src/service.rs +++ b/actix-server/src/service.rs @@ -2,15 +2,13 @@ use std::marker::PhantomData; use std::net::SocketAddr; use std::task::{Context, Poll}; -use actix_rt::spawn; use actix_service::{Service, ServiceFactory as BaseServiceFactory}; use actix_utils::counter::CounterGuard; -use futures_util::future::{err, ok, LocalBoxFuture, Ready}; -use futures_util::{FutureExt, TryFutureExt}; +use futures_core::future::LocalBoxFuture; use log::error; -use super::Token; -use crate::socket::{FromStream, StdStream}; +use crate::socket::{FromStream, MioStream}; +use crate::{ready, Ready, Token}; pub trait ServiceFactory: Send + Clone + 'static { type Factory: BaseServiceFactory; @@ -28,7 +26,7 @@ pub(crate) trait InternalServiceFactory: Send { pub(crate) type BoxedServerService = Box< dyn Service< - (Option, StdStream), + (Option, MioStream), Response = (), Error = (), Future = Ready>, @@ -49,7 +47,7 @@ impl StreamService { } } -impl Service<(Option, StdStream)> for StreamService +impl Service<(Option, MioStream)> for StreamService where S: Service, S::Future: 'static, @@ -64,21 +62,21 @@ where self.service.poll_ready(ctx).map_err(|_| ()) } - fn call(&mut self, (guard, req): (Option, StdStream)) -> Self::Future { - match FromStream::from_stdstream(req) { + fn call(&mut self, (guard, req): (Option, MioStream)) -> Self::Future { + ready(match FromStream::from_mio(req) { Ok(stream) => { let f = self.service.call(stream); - spawn(async move { + actix_rt::spawn(async move { let _ = f.await; drop(guard); }); - ok(()) + Ok(()) } Err(e) => { error!("Can not convert to an async tcp stream: {}", e); - err(()) + Err(()) } - } + }) } } @@ -132,15 +130,16 @@ where fn create(&self) -> LocalBoxFuture<'static, Result, ()>> { let token = self.token; - self.inner - .create() - .new_service(()) - .map_err(|_| ()) - .map_ok(move |inner| { - let service: BoxedServerService = Box::new(StreamService::new(inner)); - vec![(token, service)] - }) - .boxed_local() + let fut = self.inner.create().new_service(()); + Box::pin(async move { + match fut.await { + Ok(inner) => { + let service = Box::new(StreamService::new(inner)) as _; + Ok(vec![(token, service)]) + } + Err(_) => Err(()), + } + }) } } diff --git a/actix-server/src/signals.rs b/actix-server/src/signals.rs index 4fc51fc1..ea1de47e 100644 --- a/actix-server/src/signals.rs +++ b/actix-server/src/signals.rs @@ -2,7 +2,7 @@ use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; -use futures_util::future::lazy; +use futures_core::future::LocalBoxFuture; use crate::server::Server; @@ -23,48 +23,51 @@ pub(crate) enum Signal { pub(crate) struct Signals { srv: Server, #[cfg(not(unix))] - stream: Pin>>>, + signals: LocalBoxFuture<'static, std::io::Result<()>>, #[cfg(unix)] - streams: Vec<(Signal, actix_rt::signal::unix::Signal)>, + signals: Vec<(Signal, LocalBoxFuture<'static, ()>)>, } impl Signals { pub(crate) fn start(srv: Server) { - actix_rt::spawn(lazy(|_| { - #[cfg(not(unix))] - { - actix_rt::spawn(Signals { - srv, - stream: Box::pin(actix_rt::signal::ctrl_c()), - }); - } - #[cfg(unix)] - { - use actix_rt::signal::unix; + #[cfg(not(unix))] + { + actix_rt::spawn(Signals { + srv, + signals: Box::pin(actix_rt::signal::ctrl_c()), + }); + } + #[cfg(unix)] + { + use actix_rt::signal::unix; - let mut streams = Vec::new(); + let sig_map = [ + (unix::SignalKind::interrupt(), Signal::Int), + (unix::SignalKind::hangup(), Signal::Hup), + (unix::SignalKind::terminate(), Signal::Term), + (unix::SignalKind::quit(), Signal::Quit), + ]; - let sig_map = [ - (unix::SignalKind::interrupt(), Signal::Int), - (unix::SignalKind::hangup(), Signal::Hup), - (unix::SignalKind::terminate(), Signal::Term), - (unix::SignalKind::quit(), Signal::Quit), - ]; + let mut signals = Vec::new(); - for (kind, sig) in sig_map.iter() { - match unix::signal(*kind) { - Ok(stream) => streams.push((*sig, stream)), - Err(e) => log::error!( - "Can not initialize stream handler for {:?} err: {}", - sig, - e - ), + for (kind, sig) in sig_map.iter() { + match unix::signal(*kind) { + Ok(mut stream) => { + let fut = Box::pin(async move { + let _ = stream.recv().await; + }) as _; + signals.push((*sig, fut)); } + Err(e) => log::error!( + "Can not initialize stream handler for {:?} err: {}", + sig, + e + ), } - - actix_rt::spawn(Signals { srv, streams }) } - })); + + actix_rt::spawn(Signals { srv, signals }); + } } } @@ -73,25 +76,20 @@ impl Future for Signals { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { #[cfg(not(unix))] - match Pin::new(&mut self.stream).poll(cx) { + match self.signals.as_mut().poll(cx) { Poll::Ready(_) => { self.srv.signal(Signal::Int); Poll::Ready(()) } - Poll::Pending => return Poll::Pending, + Poll::Pending => Poll::Pending, } #[cfg(unix)] { - for idx in 0..self.streams.len() { - loop { - match self.streams[idx].1.poll_recv(cx) { - Poll::Ready(None) => return Poll::Ready(()), - Poll::Pending => break, - Poll::Ready(Some(_)) => { - let sig = self.streams[idx].0; - self.srv.signal(sig); - } - } + for (sig, fut) in self.signals.iter_mut() { + if fut.as_mut().poll(cx).is_ready() { + let sig = *sig; + self.srv.signal(sig); + return Poll::Ready(()); } } Poll::Pending diff --git a/actix-server/src/socket.rs b/actix-server/src/socket.rs index 3025660a..416e253b 100644 --- a/actix-server/src/socket.rs +++ b/actix-server/src/socket.rs @@ -1,135 +1,91 @@ -use std::{fmt, io, net}; +pub(crate) use std::net::{ + SocketAddr as StdSocketAddr, TcpListener as StdTcpListener, ToSocketAddrs, +}; + +pub(crate) use mio::net::{TcpListener as MioTcpListener, TcpSocket as MioTcpSocket}; +#[cfg(unix)] +pub(crate) use { + mio::net::UnixListener as MioUnixListener, + std::os::unix::net::UnixListener as StdUnixListener, +}; + +use std::{fmt, io}; -use actix_codec::{AsyncRead, AsyncWrite}; use actix_rt::net::TcpStream; +use mio::event::Source; +use mio::net::TcpStream as MioTcpStream; +use mio::{Interest, Registry, Token}; -pub(crate) enum StdListener { - Tcp(net::TcpListener), - #[cfg(all(unix))] - Uds(std::os::unix::net::UnixListener), +#[cfg(windows)] +use std::os::windows::io::{FromRawSocket, IntoRawSocket}; +#[cfg(unix)] +use { + actix_rt::net::UnixStream, + mio::net::{SocketAddr as MioSocketAddr, UnixStream as MioUnixStream}, + std::os::unix::io::{FromRawFd, IntoRawFd}, +}; + +pub(crate) enum MioListener { + Tcp(MioTcpListener), + #[cfg(unix)] + Uds(MioUnixListener), } -pub(crate) enum SocketAddr { - Tcp(net::SocketAddr), - #[cfg(all(unix))] - Uds(std::os::unix::net::SocketAddr), -} - -impl fmt::Display for SocketAddr { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match *self { - SocketAddr::Tcp(ref addr) => write!(f, "{}", addr), - #[cfg(all(unix))] - SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr), - } - } -} - -impl fmt::Debug for SocketAddr { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match *self { - SocketAddr::Tcp(ref addr) => write!(f, "{:?}", addr), - #[cfg(all(unix))] - SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr), - } - } -} - -impl fmt::Display for StdListener { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match *self { - StdListener::Tcp(ref lst) => write!(f, "{}", lst.local_addr().ok().unwrap()), - #[cfg(all(unix))] - StdListener::Uds(ref lst) => write!(f, "{:?}", lst.local_addr().ok().unwrap()), - } - } -} - -impl StdListener { +impl MioListener { pub(crate) fn local_addr(&self) -> SocketAddr { - match self { - StdListener::Tcp(lst) => SocketAddr::Tcp(lst.local_addr().unwrap()), - #[cfg(all(unix))] - StdListener::Uds(lst) => SocketAddr::Uds(lst.local_addr().unwrap()), - } - } - - pub(crate) fn into_listener(self) -> SocketListener { - match self { - StdListener::Tcp(lst) => SocketListener::Tcp( - mio::net::TcpListener::from_std(lst) - .expect("Can not create mio::net::TcpListener"), - ), - #[cfg(all(unix))] - StdListener::Uds(lst) => SocketListener::Uds( - mio_uds::UnixListener::from_listener(lst) - .expect("Can not create mio_uds::UnixListener"), - ), - } - } -} - -#[derive(Debug)] -pub enum StdStream { - Tcp(std::net::TcpStream), - #[cfg(all(unix))] - Uds(std::os::unix::net::UnixStream), -} - -pub(crate) enum SocketListener { - Tcp(mio::net::TcpListener), - #[cfg(all(unix))] - Uds(mio_uds::UnixListener), -} - -impl SocketListener { - pub(crate) fn accept(&self) -> io::Result> { match *self { - SocketListener::Tcp(ref lst) => lst - .accept_std() - .map(|(stream, addr)| Some((StdStream::Tcp(stream), SocketAddr::Tcp(addr)))), - #[cfg(all(unix))] - SocketListener::Uds(ref lst) => lst.accept_std().map(|res| { - res.map(|(stream, addr)| (StdStream::Uds(stream), SocketAddr::Uds(addr))) - }), + MioListener::Tcp(ref lst) => SocketAddr::Tcp(lst.local_addr().unwrap()), + #[cfg(unix)] + MioListener::Uds(ref lst) => SocketAddr::Uds(lst.local_addr().unwrap()), + } + } + + pub(crate) fn accept(&self) -> io::Result> { + match *self { + MioListener::Tcp(ref lst) => lst + .accept() + .map(|(stream, addr)| Some((MioStream::Tcp(stream), SocketAddr::Tcp(addr)))), + #[cfg(unix)] + MioListener::Uds(ref lst) => lst + .accept() + .map(|(stream, addr)| Some((MioStream::Uds(stream), SocketAddr::Uds(addr)))), } } } -impl mio::Evented for SocketListener { +impl Source for MioListener { fn register( - &self, - poll: &mio::Poll, - token: mio::Token, - interest: mio::Ready, - opts: mio::PollOpt, + &mut self, + registry: &Registry, + token: Token, + interests: Interest, ) -> io::Result<()> { match *self { - SocketListener::Tcp(ref lst) => lst.register(poll, token, interest, opts), - #[cfg(all(unix))] - SocketListener::Uds(ref lst) => lst.register(poll, token, interest, opts), + MioListener::Tcp(ref mut lst) => lst.register(registry, token, interests), + #[cfg(unix)] + MioListener::Uds(ref mut lst) => lst.register(registry, token, interests), } } fn reregister( - &self, - poll: &mio::Poll, - token: mio::Token, - interest: mio::Ready, - opts: mio::PollOpt, + &mut self, + registry: &Registry, + token: Token, + interests: Interest, ) -> io::Result<()> { match *self { - SocketListener::Tcp(ref lst) => lst.reregister(poll, token, interest, opts), - #[cfg(all(unix))] - SocketListener::Uds(ref lst) => lst.reregister(poll, token, interest, opts), + MioListener::Tcp(ref mut lst) => lst.reregister(registry, token, interests), + #[cfg(unix)] + MioListener::Uds(ref mut lst) => lst.reregister(registry, token, interests), } } - fn deregister(&self, poll: &mio::Poll) -> io::Result<()> { + + fn deregister(&mut self, registry: &Registry) -> io::Result<()> { match *self { - SocketListener::Tcp(ref lst) => lst.deregister(poll), - #[cfg(all(unix))] - SocketListener::Uds(ref lst) => { - let res = lst.deregister(poll); + MioListener::Tcp(ref mut lst) => lst.deregister(registry), + #[cfg(unix)] + MioListener::Uds(ref mut lst) => { + let res = lst.deregister(registry); // cleanup file path if let Ok(addr) = lst.local_addr() { @@ -143,28 +99,156 @@ impl mio::Evented for SocketListener { } } -pub trait FromStream: AsyncRead + AsyncWrite + Sized { - fn from_stdstream(sock: StdStream) -> io::Result; +impl From for MioListener { + fn from(lst: StdTcpListener) -> Self { + MioListener::Tcp(MioTcpListener::from_std(lst)) + } } -impl FromStream for TcpStream { - fn from_stdstream(sock: StdStream) -> io::Result { - match sock { - StdStream::Tcp(stream) => TcpStream::from_std(stream), +#[cfg(unix)] +impl From for MioListener { + fn from(lst: StdUnixListener) -> Self { + MioListener::Uds(MioUnixListener::from_std(lst)) + } +} + +impl fmt::Debug for MioListener { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + MioListener::Tcp(ref lst) => write!(f, "{:?}", lst), #[cfg(all(unix))] - StdStream::Uds(_) => { + MioListener::Uds(ref lst) => write!(f, "{:?}", lst), + } + } +} + +impl fmt::Display for MioListener { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + MioListener::Tcp(ref lst) => write!(f, "{}", lst.local_addr().ok().unwrap()), + #[cfg(unix)] + MioListener::Uds(ref lst) => write!(f, "{:?}", lst.local_addr().ok().unwrap()), + } + } +} + +pub(crate) enum SocketAddr { + Tcp(StdSocketAddr), + #[cfg(unix)] + Uds(MioSocketAddr), +} + +impl fmt::Display for SocketAddr { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + SocketAddr::Tcp(ref addr) => write!(f, "{}", addr), + #[cfg(unix)] + SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr), + } + } +} + +impl fmt::Debug for SocketAddr { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + SocketAddr::Tcp(ref addr) => write!(f, "{:?}", addr), + #[cfg(unix)] + SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr), + } + } +} + +#[derive(Debug)] +pub enum MioStream { + Tcp(MioTcpStream), + #[cfg(unix)] + Uds(MioUnixStream), +} + +/// helper trait for converting mio stream to tokio stream. +pub trait FromStream: Sized { + fn from_mio(sock: MioStream) -> io::Result; +} + +// FIXME: This is a workaround and we need an efficient way to convert between mio and tokio stream +#[cfg(unix)] +impl FromStream for TcpStream { + fn from_mio(sock: MioStream) -> io::Result { + match sock { + MioStream::Tcp(mio) => { + let raw = IntoRawFd::into_raw_fd(mio); + // SAFETY: This is a in place conversion from mio stream to tokio stream. + TcpStream::from_std(unsafe { FromRawFd::from_raw_fd(raw) }) + } + MioStream::Uds(_) => { panic!("Should not happen, bug in server impl"); } } } } -#[cfg(all(unix))] -impl FromStream for actix_rt::net::UnixStream { - fn from_stdstream(sock: StdStream) -> io::Result { +// FIXME: This is a workaround and we need an efficient way to convert between mio and tokio stream +#[cfg(windows)] +impl FromStream for TcpStream { + fn from_mio(sock: MioStream) -> io::Result { match sock { - StdStream::Tcp(_) => panic!("Should not happen, bug in server impl"), - StdStream::Uds(stream) => actix_rt::net::UnixStream::from_std(stream), + MioStream::Tcp(mio) => { + let raw = IntoRawSocket::into_raw_socket(mio); + // SAFETY: This is a in place conversion from mio stream to tokio stream. + TcpStream::from_std(unsafe { FromRawSocket::from_raw_socket(raw) }) + } + } + } +} + +// FIXME: This is a workaround and we need an efficient way to convert between mio and tokio stream +#[cfg(unix)] +impl FromStream for UnixStream { + fn from_mio(sock: MioStream) -> io::Result { + match sock { + MioStream::Tcp(_) => panic!("Should not happen, bug in server impl"), + MioStream::Uds(mio) => { + let raw = IntoRawFd::into_raw_fd(mio); + // SAFETY: This is a in place conversion from mio stream to tokio stream. + UnixStream::from_std(unsafe { FromRawFd::from_raw_fd(raw) }) + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn socket_addr() { + let addr = SocketAddr::Tcp("127.0.0.1:8080".parse().unwrap()); + assert!(format!("{:?}", addr).contains("127.0.0.1:8080")); + assert_eq!(format!("{}", addr), "127.0.0.1:8080"); + + let addr: StdSocketAddr = "127.0.0.1:0".parse().unwrap(); + let socket = MioTcpSocket::new_v4().unwrap(); + socket.set_reuseaddr(true).unwrap(); + socket.bind(addr).unwrap(); + let tcp = socket.listen(128).unwrap(); + let lst = MioListener::Tcp(tcp); + assert!(format!("{:?}", lst).contains("TcpListener")); + assert!(format!("{}", lst).contains("127.0.0.1")); + } + + #[test] + #[cfg(unix)] + fn uds() { + let _ = std::fs::remove_file("/tmp/sock.xxxxx"); + if let Ok(socket) = MioUnixListener::bind("/tmp/sock.xxxxx") { + let addr = socket.local_addr().expect("Couldn't get local address"); + let a = SocketAddr::Uds(addr); + assert!(format!("{:?}", a).contains("/tmp/sock.xxxxx")); + assert!(format!("{}", a).contains("/tmp/sock.xxxxx")); + + let lst = MioListener::Uds(socket); + assert!(format!("{:?}", lst).contains("/tmp/sock.xxxxx")); + assert!(format!("{}", lst).contains("/tmp/sock.xxxxx")); } } } diff --git a/actix-server/src/waker_queue.rs b/actix-server/src/waker_queue.rs new file mode 100644 index 00000000..f92363b5 --- /dev/null +++ b/actix-server/src/waker_queue.rs @@ -0,0 +1,89 @@ +use std::{ + collections::VecDeque, + ops::Deref, + sync::{Arc, Mutex, MutexGuard}, +}; + +use mio::{Registry, Token as MioToken, Waker}; + +use crate::worker::WorkerHandle; + +/// waker token for `mio::Poll` instance +pub(crate) const WAKER_TOKEN: MioToken = MioToken(usize::MAX); + +/// `mio::Waker` with a queue for waking up the `Accept`'s `Poll` and contains the `WakerInterest` +/// the `Poll` would want to look into. +pub(crate) struct WakerQueue(Arc<(Waker, Mutex>)>); + +impl Clone for WakerQueue { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} + +impl Deref for WakerQueue { + type Target = (Waker, Mutex>); + + fn deref(&self) -> &Self::Target { + self.0.deref() + } +} + +impl WakerQueue { + /// construct a waker queue with given `Poll`'s `Registry` and capacity. + /// + /// A fixed `WAKER_TOKEN` is used to identify the wake interest and the `Poll` needs to match + /// event's token for it to properly handle `WakerInterest`. + pub(crate) fn new(registry: &Registry) -> std::io::Result { + let waker = Waker::new(registry, WAKER_TOKEN)?; + let queue = Mutex::new(VecDeque::with_capacity(16)); + + Ok(Self(Arc::new((waker, queue)))) + } + + /// push a new interest to the queue and wake up the accept poll afterwards. + pub(crate) fn wake(&self, interest: WakerInterest) { + let (waker, queue) = self.deref(); + + queue + .lock() + .expect("Failed to lock WakerQueue") + .push_back(interest); + + waker + .wake() + .unwrap_or_else(|e| panic!("can not wake up Accept Poll: {}", e)); + } + + /// get a MutexGuard of the waker queue. + pub(crate) fn guard(&self) -> MutexGuard<'_, VecDeque> { + self.deref().1.lock().expect("Failed to lock WakerQueue") + } + + /// reset the waker queue so it does not grow infinitely. + pub(crate) fn reset(queue: &mut VecDeque) { + std::mem::swap(&mut VecDeque::::with_capacity(16), queue); + } +} + +/// types of interests we would look into when `Accept`'s `Poll` is waked up by waker. +/// +/// *. These interests should not be confused with `mio::Interest` and mostly not I/O related +pub(crate) enum WakerInterest { + /// `WorkerAvailable` is an interest from `Worker` notifying `Accept` there is a worker + /// available and can accept new tasks. + WorkerAvailable, + /// `Pause`, `Resume`, `Stop` Interest are from `ServerBuilder` future. It listens to + /// `ServerCommand` and notify `Accept` to do exactly these tasks. + Pause, + Resume, + Stop, + /// `Timer` is an interest sent as a delayed future. When an error happens on accepting + /// connection `Accept` would deregister socket listener temporary and wake up the poll and + /// register them again after the delayed future resolve. + Timer, + /// `Worker` is an interest happen after a worker runs into faulted state(This is determined + /// by if work can be sent to it successfully).`Accept` would be waked up and add the new + /// `WorkerHandle`. + Worker(WorkerHandle), +} diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index bfd11979..91e98fc2 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -1,22 +1,22 @@ +use std::future::Future; use std::pin::Pin; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; use std::task::{Context, Poll}; -use std::time; +use std::time::Duration; -use actix_rt::time::{delay_until, Delay, Instant}; +use actix_rt::time::{sleep_until, Instant, Sleep}; use actix_rt::{spawn, Arbiter}; use actix_utils::counter::Counter; -use futures_channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; -use futures_channel::oneshot; -use futures_util::future::{join_all, LocalBoxFuture, MapOk}; -use futures_util::{future::Future, stream::Stream, FutureExt, TryFutureExt}; +use futures_core::future::LocalBoxFuture; use log::{error, info, trace}; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; +use tokio::sync::oneshot; -use crate::accept::AcceptNotify; use crate::service::{BoxedServerService, InternalServiceFactory}; -use crate::socket::{SocketAddr, StdStream}; -use crate::Token; +use crate::socket::{MioStream, SocketAddr}; +use crate::waker_queue::{WakerInterest, WakerQueue}; +use crate::{join_all, Token}; pub(crate) struct WorkerCommand(Conn); @@ -29,7 +29,7 @@ pub(crate) struct StopCommand { #[derive(Debug)] pub(crate) struct Conn { - pub io: StdStream, + pub io: MioStream, pub token: Token, pub peer: Option, } @@ -46,31 +46,33 @@ pub fn max_concurrent_connections(num: usize) { MAX_CONNS.store(num, Ordering::Relaxed); } -pub(crate) fn num_connections() -> usize { - MAX_CONNS_COUNTER.with(|conns| conns.total()) -} - thread_local! { static MAX_CONNS_COUNTER: Counter = Counter::new(MAX_CONNS.load(Ordering::Relaxed)); } +pub(crate) fn num_connections() -> usize { + MAX_CONNS_COUNTER.with(|conns| conns.total()) +} + +// a handle to worker that can send message to worker and share the availability of worker to other +// thread. #[derive(Clone)] -pub(crate) struct WorkerClient { +pub(crate) struct WorkerHandle { pub idx: usize, tx1: UnboundedSender, tx2: UnboundedSender, avail: WorkerAvailability, } -impl WorkerClient { +impl WorkerHandle { pub fn new( idx: usize, tx1: UnboundedSender, tx2: UnboundedSender, avail: WorkerAvailability, ) -> Self { - WorkerClient { + WorkerHandle { idx, tx1, tx2, @@ -79,9 +81,7 @@ impl WorkerClient { } pub fn send(&self, msg: Conn) -> Result<(), Conn> { - self.tx1 - .unbounded_send(WorkerCommand(msg)) - .map_err(|msg| msg.into_inner().0) + self.tx1.send(WorkerCommand(msg)).map_err(|msg| msg.0 .0) } pub fn available(&self) -> bool { @@ -90,21 +90,21 @@ impl WorkerClient { pub fn stop(&self, graceful: bool) -> oneshot::Receiver { let (result, rx) = oneshot::channel(); - let _ = self.tx2.unbounded_send(StopCommand { graceful, result }); + let _ = self.tx2.send(StopCommand { graceful, result }); rx } } #[derive(Clone)] pub(crate) struct WorkerAvailability { - notify: AcceptNotify, + waker: WakerQueue, available: Arc, } impl WorkerAvailability { - pub fn new(notify: AcceptNotify) -> Self { + pub fn new(waker: WakerQueue) -> Self { WorkerAvailability { - notify, + waker, available: Arc::new(AtomicBool::new(false)), } } @@ -115,8 +115,9 @@ impl WorkerAvailability { pub fn set(&self, val: bool) { let old = self.available.swap(val, Ordering::Release); + // notify the accept on switched to available. if !old && val { - self.notify.notify() + self.waker.wake(WakerInterest::WorkerAvailable); } } } @@ -133,7 +134,7 @@ pub(crate) struct Worker { conns: Counter, factories: Vec>, state: WorkerState, - shutdown_timeout: time::Duration, + shutdown_timeout: Duration, } struct WorkerService { @@ -164,63 +165,65 @@ impl Worker { idx: usize, factories: Vec>, availability: WorkerAvailability, - shutdown_timeout: time::Duration, - ) -> WorkerClient { - let (tx1, rx) = unbounded(); - let (tx2, rx2) = unbounded(); + shutdown_timeout: Duration, + ) -> WorkerHandle { + let (tx1, rx) = unbounded_channel(); + let (tx2, rx2) = unbounded_channel(); let avail = availability.clone(); - Arbiter::new().send( - async move { - availability.set(false); - let mut wrk = MAX_CONNS_COUNTER.with(move |conns| Worker { - rx, - rx2, - availability, - factories, - shutdown_timeout, - services: Vec::new(), - conns: conns.clone(), - state: WorkerState::Unavailable(Vec::new()), - }); + // every worker runs in it's own arbiter. + Arbiter::new().send(Box::pin(async move { + availability.set(false); + let mut wrk = MAX_CONNS_COUNTER.with(move |conns| Worker { + rx, + rx2, + availability, + factories, + shutdown_timeout, + services: Vec::new(), + conns: conns.clone(), + state: WorkerState::Unavailable, + }); - let mut fut: Vec, _>> = Vec::new(); - for (idx, factory) in wrk.factories.iter().enumerate() { - fut.push(factory.create().map_ok(move |r| { - r.into_iter() - .map(|(t, s): (Token, _)| (idx, t, s)) - .collect::>() - })); - } + let fut = wrk + .factories + .iter() + .enumerate() + .map(|(idx, factory)| { + let fut = factory.create(); + async move { + fut.await.map(|r| { + r.into_iter().map(|(t, s)| (idx, t, s)).collect::>() + }) + } + }) + .collect::>(); - spawn(async move { - let res = join_all(fut).await; - let res: Result, _> = res.into_iter().collect(); - match res { - Ok(services) => { - for item in services { - for (factory, token, service) in item { - assert_eq!(token.0, wrk.services.len()); - wrk.services.push(WorkerService { - factory, - service, - status: WorkerServiceStatus::Unavailable, - }); - } + spawn(async move { + let res: Result, _> = join_all(fut).await.into_iter().collect(); + match res { + Ok(services) => { + for item in services { + for (factory, token, service) in item { + assert_eq!(token.0, wrk.services.len()); + wrk.services.push(WorkerService { + factory, + service, + status: WorkerServiceStatus::Unavailable, + }); } } - Err(e) => { - error!("Can not start worker: {:?}", e); - Arbiter::current().stop(); - } } - wrk.await - }); - } - .boxed(), - ); + Err(e) => { + error!("Can not start worker: {:?}", e); + Arbiter::current().stop(); + } + } + wrk.await + }); + })); - WorkerClient::new(idx, tx1, tx2, avail) + WorkerHandle::new(idx, tx1, tx2, avail) } fn shutdown(&mut self, force: bool) { @@ -242,7 +245,7 @@ impl Worker { fn check_readiness(&mut self, cx: &mut Context<'_>) -> Result { let mut ready = self.conns.available(cx); let mut failed = None; - for (idx, srv) in &mut self.services.iter_mut().enumerate() { + for (idx, srv) in self.services.iter_mut().enumerate() { if srv.status == WorkerServiceStatus::Available || srv.status == WorkerServiceStatus::Unavailable { @@ -288,16 +291,15 @@ impl Worker { enum WorkerState { Available, - Unavailable(Vec), + Unavailable, Restarting( usize, Token, - #[allow(clippy::type_complexity)] - Pin, ()>>>>, + LocalBoxFuture<'static, Result, ()>>, ), Shutdown( - Pin>, - Pin>, + Pin>, + Pin>, Option>, ), } @@ -305,12 +307,10 @@ enum WorkerState { impl Future for Worker { type Output = (); - // FIXME: remove this attribute - #[allow(clippy::never_loop)] fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { // `StopWorker` message handler if let Poll::Ready(Some(StopCommand { graceful, result })) = - Pin::new(&mut self.rx2).poll_next(cx) + Pin::new(&mut self.rx2).poll_recv(cx) { self.availability.set(false); let num = num_connections(); @@ -324,8 +324,8 @@ impl Future for Worker { if num != 0 { info!("Graceful worker shutdown, {} connections", num); self.state = WorkerState::Shutdown( - Box::pin(delay_until(Instant::now() + time::Duration::from_secs(1))), - Box::pin(delay_until(Instant::now() + self.shutdown_timeout)), + Box::pin(sleep_until(Instant::now() + Duration::from_secs(1))), + Box::pin(sleep_until(Instant::now() + self.shutdown_timeout)), Some(result), ); } else { @@ -341,53 +341,35 @@ impl Future for Worker { } match self.state { - WorkerState::Unavailable(ref mut conns) => { - let conn = conns.pop(); - match self.check_readiness(cx) { - Ok(true) => { - // process requests from wait queue - if let Some(conn) = conn { - let guard = self.conns.get(); - let _ = self.services[conn.token.0] - .service - .call((Some(guard), conn.io)); - } else { - self.state = WorkerState::Available; - self.availability.set(true); - } - self.poll(cx) - } - Ok(false) => { - // push connection back to queue - if let Some(conn) = conn { - if let WorkerState::Unavailable(ref mut conns) = self.state { - conns.push(conn); - } - } - Poll::Pending - } - Err((token, idx)) => { - trace!( - "Service {:?} failed, restarting", - self.factories[idx].name(token) - ); - self.services[token.0].status = WorkerServiceStatus::Restarting; - self.state = - WorkerState::Restarting(idx, token, self.factories[idx].create()); - self.poll(cx) - } + WorkerState::Unavailable => match self.check_readiness(cx) { + Ok(true) => { + self.state = WorkerState::Available; + self.availability.set(true); + self.poll(cx) } - } + Ok(false) => Poll::Pending, + Err((token, idx)) => { + trace!( + "Service {:?} failed, restarting", + self.factories[idx].name(token) + ); + self.services[token.0].status = WorkerServiceStatus::Restarting; + self.state = + WorkerState::Restarting(idx, token, self.factories[idx].create()); + self.poll(cx) + } + }, WorkerState::Restarting(idx, token, ref mut fut) => { - match Pin::new(fut).poll(cx) { + match fut.as_mut().poll(cx) { Poll::Ready(Ok(item)) => { - for (token, service) in item { + // only interest in the first item? + if let Some((token, service)) = item.into_iter().next() { trace!( "Service {:?} has been restarted", self.factories[idx].name(token) ); self.services[token.0].created(service); - self.state = WorkerState::Unavailable(Vec::new()); + self.state = WorkerState::Unavailable; return self.poll(cx); } } @@ -397,9 +379,7 @@ impl Future for Worker { self.factories[idx].name(token) ); } - Poll::Pending => { - return Poll::Pending; - } + Poll::Pending => return Poll::Pending, } self.poll(cx) } @@ -412,71 +392,56 @@ impl Future for Worker { } // check graceful timeout - match t2.as_mut().poll(cx) { - Poll::Pending => (), - Poll::Ready(_) => { - let _ = tx.take().unwrap().send(false); - self.shutdown(true); - Arbiter::current().stop(); - return Poll::Ready(()); - } + if Pin::new(t2).poll(cx).is_ready() { + let _ = tx.take().unwrap().send(false); + self.shutdown(true); + Arbiter::current().stop(); + return Poll::Ready(()); } // sleep for 1 second and then check again - match t1.as_mut().poll(cx) { - Poll::Pending => (), - Poll::Ready(_) => { - *t1 = Box::pin(delay_until( - Instant::now() + time::Duration::from_secs(1), - )); - let _ = t1.as_mut().poll(cx); - } + if t1.as_mut().poll(cx).is_ready() { + *t1 = Box::pin(sleep_until(Instant::now() + Duration::from_secs(1))); + let _ = t1.as_mut().poll(cx); } + Poll::Pending } - WorkerState::Available => { - loop { - match Pin::new(&mut self.rx).poll_next(cx) { - // handle incoming io stream - Poll::Ready(Some(WorkerCommand(msg))) => { - match self.check_readiness(cx) { - Ok(true) => { - let guard = self.conns.get(); - let _ = self.services[msg.token.0] - .service - .call((Some(guard), msg.io)); - continue; - } - Ok(false) => { - trace!("Worker is unavailable"); - self.availability.set(false); - self.state = WorkerState::Unavailable(vec![msg]); - } - Err((token, idx)) => { - trace!( - "Service {:?} failed, restarting", - self.factories[idx].name(token) - ); - self.availability.set(false); - self.services[token.0].status = - WorkerServiceStatus::Restarting; - self.state = WorkerState::Restarting( - idx, - token, - self.factories[idx].create(), - ); - } - } - return self.poll(cx); - } - Poll::Pending => { - self.state = WorkerState::Available; - return Poll::Pending; - } - Poll::Ready(None) => return Poll::Ready(()), + // actively poll stream and handle worker command + WorkerState::Available => loop { + match self.check_readiness(cx) { + Ok(true) => (), + Ok(false) => { + trace!("Worker is unavailable"); + self.availability.set(false); + self.state = WorkerState::Unavailable; + return self.poll(cx); + } + Err((token, idx)) => { + trace!( + "Service {:?} failed, restarting", + self.factories[idx].name(token) + ); + self.availability.set(false); + self.services[token.0].status = WorkerServiceStatus::Restarting; + self.state = + WorkerState::Restarting(idx, token, self.factories[idx].create()); + return self.poll(cx); } } - } + + match Pin::new(&mut self.rx).poll_recv(cx) { + // handle incoming io stream + Poll::Ready(Some(WorkerCommand(msg))) => { + let guard = self.conns.get(); + let _ = self.services[msg.token.0] + .service + .call((Some(guard), msg.io)); + } + Poll::Pending => return Poll::Pending, + Poll::Ready(None) => return Poll::Ready(()), + }; + }, } } } diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index 838c3cf1..2604df74 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -5,14 +5,13 @@ use std::{net, thread, time}; use actix_server::Server; use actix_service::fn_service; use futures_util::future::{lazy, ok}; -use socket2::{Domain, Protocol, Socket, Type}; fn unused_addr() -> net::SocketAddr { let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap(); - let socket = Socket::new(Domain::ipv4(), Type::stream(), Some(Protocol::tcp())).unwrap(); - socket.bind(&addr.into()).unwrap(); - socket.set_reuse_address(true).unwrap(); - let tcp = socket.into_tcp_listener(); + let socket = mio::net::TcpSocket::new_v4().unwrap(); + socket.bind(addr).unwrap(); + socket.set_reuseaddr(true).unwrap(); + let tcp = socket.listen(32).unwrap(); tcp.local_addr().unwrap() } @@ -22,8 +21,7 @@ fn test_bind() { let (tx, rx) = mpsc::channel(); let h = thread::spawn(move || { - let mut sys = actix_rt::System::new("test"); - + let sys = actix_rt::System::new("test"); let srv = sys.block_on(lazy(|_| { Server::build() .workers(1) @@ -49,17 +47,17 @@ fn test_listen() { let (tx, rx) = mpsc::channel(); let h = thread::spawn(move || { - let mut sys = actix_rt::System::new("test"); + let sys = actix_rt::System::new("test"); let lst = net::TcpListener::bind(addr).unwrap(); - sys.block_on(lazy(|_| { + sys.block_on(async { Server::build() .disable_signals() .workers(1) .listen("test", lst, move || fn_service(|_| ok::<_, ()>(()))) .unwrap() - .start() - })); - let _ = tx.send(actix_rt::System::current()); + .start(); + let _ = tx.send(actix_rt::System::current()); + }); let _ = sys.run(); }); let sys = rx.recv().unwrap(); @@ -83,7 +81,7 @@ fn test_start() { let (tx, rx) = mpsc::channel(); let h = thread::spawn(move || { - let mut sys = actix_rt::System::new("test"); + let sys = actix_rt::System::new("test"); let srv = sys.block_on(lazy(|_| { Server::build() .backlog(100) @@ -102,6 +100,7 @@ fn test_start() { let _ = tx.send((srv, actix_rt::System::current())); let _ = sys.run(); }); + let (srv, sys) = rx.recv().unwrap(); let mut buf = [1u8; 4]; @@ -151,7 +150,7 @@ fn test_configure() { let h = thread::spawn(move || { let num = num2.clone(); - let mut sys = actix_rt::System::new("test"); + let sys = actix_rt::System::new("test"); let srv = sys.block_on(lazy(|_| { Server::build() .disable_signals() diff --git a/actix-testing/Cargo.toml b/actix-testing/Cargo.toml index 430a12b6..17855a24 100644 --- a/actix-testing/Cargo.toml +++ b/actix-testing/Cargo.toml @@ -18,10 +18,10 @@ name = "actix_testing" path = "src/lib.rs" [dependencies] -actix-rt = "1.0.0" +actix-rt = "2.0.0-beta.1" actix-macros = "0.1.0" actix-server = "1.0.0" -actix-service = "1.0.0" +actix-service = "2.0.0-beta.1" log = "0.4" socket2 = "0.3" diff --git a/actix-testing/src/lib.rs b/actix-testing/src/lib.rs index eadfe6c9..57e2c223 100644 --- a/actix-testing/src/lib.rs +++ b/actix-testing/src/lib.rs @@ -83,7 +83,7 @@ impl TestServer { // run server in separate thread thread::spawn(move || { - let mut sys = System::new("actix-test-server"); + let sys = System::new("actix-test-server"); let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap(); let local_addr = tcp.local_addr().unwrap(); @@ -94,9 +94,8 @@ impl TestServer { .workers(1) .disable_signals() .start(); + tx.send((System::current(), local_addr)).unwrap(); }); - - tx.send((System::current(), local_addr)).unwrap(); sys.run() });