mirror of https://github.com/fafhrd91/actix-net synced 2025-08-14 23:50:32 +02:00

Compare commits


34 Commits

Author SHA1 Message Date
Rob Ede
978e4f25fb prepare actix-utils release 3.0.0 (#342) 2021-04-17 02:00:36 +01:00
Rob Ede
1c4e965366 prepare service release 2.0.0 (#339) 2021-04-16 15:18:53 +01:00
fakeshadow
2435520e67 Remove/restart worker test (#341) 2021-04-16 14:40:21 +01:00
fakeshadow
19468feef8 Fix memory ordering of WorkerAvailability (#340) 2021-04-16 11:20:08 +01:00
fakeshadow
bd48908792 Return worker index in WakerInterest::WorkerAvailable (#337) 2021-04-16 05:59:10 +01:00
fakeshadow
20c2da17ed Fix worker_avail (#336)
Co-authored-by: Rob Ede <robjtede@icloud.com>
2021-04-16 03:20:02 +01:00
Rob Ede
fdafc1dd65 amend licences 2021-04-16 02:08:44 +01:00
Rob Ede
7749dfe46a address msrv todo in router 2021-04-16 02:06:11 +01:00
fakeshadow
aeb81ad3fd Fix worker are notified to stop with non_graceful shutdown (#333) 2021-04-16 00:54:15 +01:00
Rob Ede
47fba25d67 remove pipeline from public api (#335) 2021-04-16 00:00:02 +01:00
Rob Ede
7a82288066 docs tweak 2021-04-15 21:58:18 +01:00
Rob Ede
4e6d88d143 improve boxed service docs 2021-04-15 20:43:02 +01:00
Rob Ede
ef206f40fb update ignored service docs to new traits 2021-04-15 20:13:27 +01:00
fakeshadow
8e98d9168c add test for restart worker thread (#328) 2021-04-15 18:49:43 +01:00
fakeshadow
3c1f57706a Make ServerWorker drop stop Arbiter it runs on (#334) 2021-04-15 13:31:03 +01:00
fakeshadow
d49ecf7203 Fix bug where backpressure happen too early (#332) 2021-04-14 14:48:05 +01:00
fakeshadow
e0fb67f646 Reduce ServerWorker size (#321) 2021-04-13 01:12:59 +01:00
fakeshadow
ddce2d6d12 Reduce cfg flags in actix_server::socket (#325) 2021-04-10 16:05:50 +01:00
fakeshadow
0a11cf5cba Separate WorkerHandle to two parts (#323) 2021-04-10 01:03:28 +01:00
Rob Ede
859f45868d Revert "do no drain backlog on backpressure" (#324)
This reverts commit d4829b046d.
2021-04-09 21:04:41 +01:00
fakeshadow
d4829b046d do no drain backlog on backpressure (#322) 2021-04-08 23:15:10 +01:00
fakeshadow
5961eb892e Fix bug where worker service restart could skip failing services and not being able to restart multiple services (#318) 2021-04-05 20:39:05 +01:00
fakeshadow
995efcf427 Fix bug where paused Accept would register timed out sockets (#312) 2021-04-05 13:38:41 +01:00
fakeshadow
f1573931dd Remove MAX_CONN (#316) 2021-04-04 23:00:12 +01:00
fakeshadow
3859e91799 Use named type for WorkerState::Restarting and Shutdown (#317) 2021-04-04 21:53:19 +01:00
fakeshadow
8aade720ed Refactor WorkerState::Shutdown (#310) 2021-04-04 20:34:52 +01:00
fakeshadow
8079c50ddb Add ServerWorker::restart_service method (#314)
Co-authored-by: Rob Ede <robjtede@icloud.com>
2021-04-04 13:22:34 +01:00
fakeshadow
05689b86d9 Remove Option wrapper for CounterGuard (#313) 2021-04-04 10:53:06 +01:00
fakeshadow
fd3e5fba02 Refactor actix_server WorkerState::Restarting enum variant. (#306)
Co-authored-by: Rob Ede <robjtede@icloud.com>
2021-04-03 19:40:12 +01:00
fakeshadow
39d1f282f7 add test for max concurrent connections (#311) 2021-04-03 19:01:00 +01:00
fakeshadow
d8889c63ef Do not do double check on connection num when entering graceful shutdown (#309) 2021-04-02 12:49:12 +01:00
fakeshadow
fdac52aa11 Refactor Worker::shutdown mehtod (#308) 2021-04-02 12:22:05 +01:00
Rob Ede
6d66cfb06a prepare utils release 3.0.0-beta.4 2021-04-01 13:57:08 +01:00
Rob Ede
fb27ffc525 add future::Either type to utils (#305) 2021-04-01 13:53:44 +01:00
40 changed files with 1297 additions and 539 deletions

View File

@@ -186,7 +186,7 @@
same "printed page" as the copyright notice for easier same "printed page" as the copyright notice for easier
identification within third-party archives. identification within third-party archives.
Copyright 2017-NOW Nikolay Kim Copyright 2017-NOW Actix Team
Licensed under the Apache License, Version 2.0 (the "License"); Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License. you may not use this file except in compliance with the License.

View File

@@ -1,4 +1,4 @@
-Copyright (c) 2017 Nikolay Kim
+Copyright (c) 2017-NOW Actix Team
 Permission is hereby granted, free of charge, to any
 person obtaining a copy of this software and associated

View File

@@ -7,7 +7,7 @@
 //! [`Sink`]: futures_sink::Sink
 //! [`Stream`]: futures_core::Stream
-#![deny(rust_2018_idioms, nonstandard_style)]
+#![deny(rust_2018_idioms, nonstandard_style, future_incompatible)]
 #![warn(missing_docs)]
 #![doc(html_logo_url = "https://actix.rs/img/logo.png")]
 #![doc(html_favicon_url = "https://actix.rs/favicon.ico")]

View File

@@ -581,10 +581,7 @@ impl ResourceDef {
         mut for_prefix: bool,
     ) -> (String, Vec<PatternElement>, bool, usize) {
         if pattern.find('{').is_none() {
-            // TODO: MSRV: 1.45
-            #[allow(clippy::manual_strip)]
-            return if pattern.ends_with('*') {
-                let path = &pattern[..pattern.len() - 1];
+            return if let Some(path) = pattern.strip_suffix('*') {
                 let re = String::from("^") + path + "(.*)";
                 (re, vec![PatternElement::Str(String::from(path))], true, 0)
             } else {

View File

@@ -1,6 +1,10 @@
# Changes

 ## Unreleased - 2021-xx-xx
+* Server shutdown would notify all workers to exit regardless if shutdown is graceful.
+  This would make all worker shutdown immediately in force shutdown case. [#333]
+
+[#333]: https://github.com/actix/actix-net/pull/333

 ## 2.0.0-beta.4 - 2021-04-01

View File

@@ -22,8 +22,8 @@ default = []
 [dependencies]
 actix-rt = { version = "2.0.0", default-features = false }
-actix-service = "2.0.0-beta.5"
-actix-utils = "3.0.0-beta.2"
+actix-service = "2.0.0"
+actix-utils = "3.0.0"
 futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] }
 log = "0.4"

View File

@@ -9,15 +9,17 @@
 //! Start typing. When you press enter the typed line will be echoed back. The server will log
 //! the length of each line it echos and the total size of data sent when the connection is closed.
-use std::sync::{
-    atomic::{AtomicUsize, Ordering},
-    Arc,
+use std::{
+    env, io,
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        Arc,
+    },
 };
-use std::{env, io};
 use actix_rt::net::TcpStream;
 use actix_server::Server;
-use actix_service::pipeline_factory;
+use actix_service::{fn_service, ServiceFactoryExt as _};
 use bytes::BytesMut;
 use futures_util::future::ok;
 use log::{error, info};
@@ -25,7 +27,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt};
 #[actix_rt::main]
 async fn main() -> io::Result<()> {
-    env::set_var("RUST_LOG", "actix=trace,basic=trace");
+    env::set_var("RUST_LOG", "info");
     env_logger::init();
     let count = Arc::new(AtomicUsize::new(0));
@@ -41,7 +43,7 @@ async fn main() -> io::Result<()> {
         let count = Arc::clone(&count);
         let num2 = Arc::clone(&count);
-        pipeline_factory(move |mut stream: TcpStream| {
+        fn_service(move |mut stream: TcpStream| {
             let count = Arc::clone(&count);
             async move {
View File

@@ -12,7 +12,7 @@ use slab::Slab;
use crate::server::Server; use crate::server::Server;
use crate::socket::{MioListener, SocketAddr}; use crate::socket::{MioListener, SocketAddr};
use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN}; use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN};
use crate::worker::{Conn, WorkerHandle}; use crate::worker::{Conn, WorkerHandleAccept};
use crate::Token; use crate::Token;
struct ServerSocketInfo { struct ServerSocketInfo {
@@ -66,7 +66,7 @@ impl AcceptLoop {
pub(crate) fn start( pub(crate) fn start(
&mut self, &mut self,
socks: Vec<(Token, MioListener)>, socks: Vec<(Token, MioListener)>,
handles: Vec<WorkerHandle>, handles: Vec<WorkerHandleAccept>,
) { ) {
let srv = self.srv.take().expect("Can not re-use AcceptInfo"); let srv = self.srv.take().expect("Can not re-use AcceptInfo");
let poll = self.poll.take().unwrap(); let poll = self.poll.take().unwrap();
@@ -80,12 +80,59 @@ impl AcceptLoop {
struct Accept { struct Accept {
poll: Poll, poll: Poll,
waker: WakerQueue, waker: WakerQueue,
handles: Vec<WorkerHandle>, handles: Vec<WorkerHandleAccept>,
srv: Server, srv: Server,
next: usize, next: usize,
avail: Availability,
backpressure: bool, backpressure: bool,
} }
/// Array of u128 with every bit as marker for a worker handle's availability.
struct Availability([u128; 4]);
impl Default for Availability {
fn default() -> Self {
Self([0; 4])
}
}
impl Availability {
/// Check if any worker handle is available
fn available(&self) -> bool {
self.0.iter().any(|a| *a != 0)
}
/// Set worker handle available state by index.
fn set_available(&mut self, idx: usize, avail: bool) {
let (offset, idx) = if idx < 128 {
(0, idx)
} else if idx < 128 * 2 {
(1, idx - 128)
} else if idx < 128 * 3 {
(2, idx - 128 * 2)
} else if idx < 128 * 4 {
(3, idx - 128 * 3)
} else {
panic!("Max WorkerHandle count is 512")
};
let off = 1 << idx as u128;
if avail {
self.0[offset] |= off;
} else {
self.0[offset] &= !off
}
}
/// Set all worker handle to available state.
/// This would result in a re-check on all workers' availability.
fn set_available_all(&mut self, handles: &[WorkerHandleAccept]) {
handles.iter().for_each(|handle| {
self.set_available(handle.idx(), true);
})
}
}
/// This function defines errors that are per-connection. Which basically /// This function defines errors that are per-connection. Which basically
/// means that if we get this error from `accept()` system call it means /// means that if we get this error from `accept()` system call it means
/// next connection might be ready to be accepted. /// next connection might be ready to be accepted.
@@ -105,7 +152,7 @@ impl Accept {
waker: WakerQueue, waker: WakerQueue,
socks: Vec<(Token, MioListener)>, socks: Vec<(Token, MioListener)>,
srv: Server, srv: Server,
handles: Vec<WorkerHandle>, handles: Vec<WorkerHandleAccept>,
) { ) {
// Accept runs in its own thread and would want to spawn additional futures to current // Accept runs in its own thread and would want to spawn additional futures to current
// actix system. // actix system.
@@ -116,6 +163,7 @@ impl Accept {
System::set_current(sys); System::set_current(sys);
let (mut accept, sockets) = let (mut accept, sockets) =
Accept::new_with_sockets(poll, waker, socks, handles, srv); Accept::new_with_sockets(poll, waker, socks, handles, srv);
accept.poll_with(sockets); accept.poll_with(sockets);
}) })
.unwrap(); .unwrap();
@@ -125,7 +173,7 @@ impl Accept {
poll: Poll, poll: Poll,
waker: WakerQueue, waker: WakerQueue,
socks: Vec<(Token, MioListener)>, socks: Vec<(Token, MioListener)>,
handles: Vec<WorkerHandle>, handles: Vec<WorkerHandleAccept>,
srv: Server, srv: Server,
) -> (Accept, Slab<ServerSocketInfo>) { ) -> (Accept, Slab<ServerSocketInfo>) {
let mut sockets = Slab::new(); let mut sockets = Slab::new();
@@ -148,12 +196,18 @@ impl Accept {
}); });
} }
let mut avail = Availability::default();
// Assume all handles are avail at construct time.
avail.set_available_all(&handles);
let accept = Accept { let accept = Accept {
poll, poll,
waker, waker,
handles, handles,
srv, srv,
next: 0, next: 0,
avail,
backpressure: false, backpressure: false,
}; };
@@ -166,12 +220,8 @@ impl Accept {
loop { loop {
if let Err(e) = self.poll.poll(&mut events, None) { if let Err(e) = self.poll.poll(&mut events, None) {
match e.kind() { match e.kind() {
std::io::ErrorKind::Interrupted => { std::io::ErrorKind::Interrupted => continue,
continue; _ => panic!("Poll error: {}", e),
}
_ => {
panic!("Poll error: {}", e);
}
} }
} }
@@ -188,15 +238,17 @@ impl Accept {
match guard.pop_front() { match guard.pop_front() {
// worker notify it becomes available. we may want to recover // worker notify it becomes available. we may want to recover
// from backpressure. // from backpressure.
Some(WakerInterest::WorkerAvailable) => { Some(WakerInterest::WorkerAvailable(idx)) => {
drop(guard); drop(guard);
self.maybe_backpressure(&mut sockets, false); self.maybe_backpressure(&mut sockets, false);
self.avail.set_available(idx, true);
} }
// a new worker thread is made and it's handle would be added to Accept // a new worker thread is made and it's handle would be added to Accept
Some(WakerInterest::Worker(handle)) => { Some(WakerInterest::Worker(handle)) => {
drop(guard); drop(guard);
// maybe we want to recover from a backpressure. // maybe we want to recover from a backpressure.
self.maybe_backpressure(&mut sockets, false); self.maybe_backpressure(&mut sockets, false);
self.avail.set_available(handle.idx(), true);
self.handles.push(handle); self.handles.push(handle);
} }
// got timer interest and it's time to try register socket(s) again // got timer interest and it's time to try register socket(s) again
@@ -300,27 +352,41 @@ impl Accept {
} }
fn deregister_all(&self, sockets: &mut Slab<ServerSocketInfo>) { fn deregister_all(&self, sockets: &mut Slab<ServerSocketInfo>) {
sockets.iter_mut().for_each(|(_, info)| { // This is a best effort implementation with following limitation:
self.deregister_logged(info); //
}); // Every ServerSocketInfo with associate timeout will be skipped and it's timeout
// is removed in the process.
//
// Therefore WakerInterest::Pause followed by WakerInterest::Resume in a very short
// gap (less than 500ms) would cause all timing out ServerSocketInfos be reregistered
// before expected timing.
sockets
.iter_mut()
// Take all timeout.
// This is to prevent Accept::process_timer method re-register a socket afterwards.
.map(|(_, info)| (info.timeout.take(), info))
// Socket info with a timeout is already deregistered so skip them.
.filter(|(timeout, _)| timeout.is_none())
.for_each(|(_, info)| self.deregister_logged(info));
} }
fn maybe_backpressure(&mut self, sockets: &mut Slab<ServerSocketInfo>, on: bool) { fn maybe_backpressure(&mut self, sockets: &mut Slab<ServerSocketInfo>, on: bool) {
// Only operate when server is in a different backpressure than the given flag. // Only operate when server is in a different backpressure than the given flag.
if self.backpressure != on { if self.backpressure != on {
if on { self.backpressure = on;
self.backpressure = true; sockets
// TODO: figure out if timing out sockets can be safely de-registered twice. .iter_mut()
self.deregister_all(sockets); // Only operate on sockets without associated timeout.
} else { // Sockets with it should be handled by `accept` and `process_timer` methods.
self.backpressure = false; // They are already deregistered or need to be reregister in the future.
sockets .filter(|(_, info)| info.timeout.is_none())
.iter_mut() .for_each(|(token, info)| {
// Only operate on sockets without associated timeout. if on {
// Sockets with it will attempt to re-register when their timeout expires. self.deregister_logged(info);
.filter(|(_, info)| info.timeout.is_none()) } else {
.for_each(|(token, info)| self.register_logged(token, info)); self.register_logged(token, info);
} }
});
} }
} }
@@ -328,27 +394,25 @@ impl Accept {
if self.backpressure { if self.backpressure {
// send_connection would remove fault worker from handles. // send_connection would remove fault worker from handles.
// worst case here is conn get dropped after all handles are gone. // worst case here is conn get dropped after all handles are gone.
while !self.handles.is_empty() { while let Err(c) = self.send_connection(sockets, conn) {
match self.send_connection(sockets, conn) { conn = c
Ok(_) => return,
Err(c) => conn = c,
}
} }
} else { } else {
// Do one round and try to send conn to all workers until it succeed. while self.avail.available() {
// Start from self.next. let next = self.next();
let mut idx = 0; let idx = next.idx();
while idx < self.handles.len() { if next.available() {
idx += 1; self.avail.set_available(idx, true);
if self.handles[self.next].available() {
match self.send_connection(sockets, conn) { match self.send_connection(sockets, conn) {
Ok(_) => return, Ok(_) => return,
Err(c) => conn = c, Err(c) => conn = c,
} }
} else { } else {
self.avail.set_available(idx, false);
self.set_next(); self.set_next();
} }
} }
// Sending Conn failed due to either all workers are in error or not available. // Sending Conn failed due to either all workers are in error or not available.
// Enter backpressure state and try again. // Enter backpressure state and try again.
self.maybe_backpressure(sockets, true); self.maybe_backpressure(sockets, true);
@@ -356,28 +420,22 @@ impl Accept {
} }
} }
// Set next worker handle that would accept work.
fn set_next(&mut self) {
self.next = (self.next + 1) % self.handles.len();
}
// Send connection to worker and handle error. // Send connection to worker and handle error.
fn send_connection( fn send_connection(
&mut self, &mut self,
sockets: &mut Slab<ServerSocketInfo>, sockets: &mut Slab<ServerSocketInfo>,
conn: Conn, conn: Conn,
) -> Result<(), Conn> { ) -> Result<(), Conn> {
match self.handles[self.next].send(conn) { match self.next().send(conn) {
Ok(_) => { Ok(_) => {
self.set_next(); self.set_next();
Ok(()) Ok(())
} }
Err(conn) => { Err(conn) => {
// worker lost contact and could be gone. a message is sent to // Worker thread is error and could be gone.
// `ServerBuilder` future to notify it a new worker should be made. // Remove worker handle and notify `ServerBuilder`.
// after that remove the fault worker and enter backpressure if necessary. self.remove_next();
self.srv.worker_faulted(self.handles[self.next].idx);
self.handles.swap_remove(self.next);
if self.handles.is_empty() { if self.handles.is_empty() {
error!("No workers"); error!("No workers");
self.maybe_backpressure(sockets, true); self.maybe_backpressure(sockets, true);
@@ -387,6 +445,7 @@ impl Accept {
} else if self.handles.len() <= self.next { } else if self.handles.len() <= self.next {
self.next = 0; self.next = 0;
} }
Err(conn) Err(conn)
} }
} }
@@ -431,4 +490,88 @@ impl Accept {
}; };
} }
} }
fn next(&self) -> &WorkerHandleAccept {
&self.handles[self.next]
}
/// Set next worker handle that would accept connection.
fn set_next(&mut self) {
self.next = (self.next + 1) % self.handles.len();
}
/// Remove next worker handle that fail to accept connection.
fn remove_next(&mut self) {
let handle = self.handles.swap_remove(self.next);
let idx = handle.idx();
// A message is sent to `ServerBuilder` future to notify it a new worker
// should be made.
self.srv.worker_faulted(idx);
self.avail.set_available(idx, false);
}
}
#[cfg(test)]
mod test {
use super::Availability;
fn single(aval: &mut Availability, idx: usize) {
aval.set_available(idx, true);
assert!(aval.available());
aval.set_available(idx, true);
aval.set_available(idx, false);
assert!(!aval.available());
aval.set_available(idx, false);
assert!(!aval.available());
}
fn multi(aval: &mut Availability, mut idx: Vec<usize>) {
idx.iter().for_each(|idx| aval.set_available(*idx, true));
assert!(aval.available());
while let Some(idx) = idx.pop() {
assert!(aval.available());
aval.set_available(idx, false);
}
assert!(!aval.available());
}
#[test]
fn availability() {
let mut aval = Availability::default();
single(&mut aval, 1);
single(&mut aval, 128);
single(&mut aval, 256);
single(&mut aval, 511);
let idx = (0..511).filter(|i| i % 3 == 0 && i % 5 == 0).collect();
multi(&mut aval, idx);
multi(&mut aval, (0..511).collect())
}
#[test]
#[should_panic]
fn overflow() {
let mut aval = Availability::default();
single(&mut aval, 512);
}
#[test]
fn pin_point() {
let mut aval = Availability::default();
aval.set_available(438, true);
aval.set_available(479, true);
assert_eq!(aval.0[3], 1 << (438 - 384) | 1 << (479 - 384));
}
} }
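The `Availability` type added above packs the availability of up to 512 worker handles into four `u128` words, one bit per handle, so the accept loop can test "is any worker free?" with a few integer comparisons. A minimal standalone sketch of that index-to-bit mapping, assuming illustrative names (`slot`, `words`) that are not part of the diff:

fn slot(idx: usize) -> (usize, u128) {
    // Which u128 word the handle lives in, and the bit that marks it available.
    assert!(idx < 512, "max worker handle count is 512");
    (idx / 128, 1u128 << (idx % 128))
}

fn main() {
    let mut words = [0u128; 4];

    // Mark worker 438 as available, mirroring the `pin_point` test in the diff.
    let (word, bit) = slot(438);
    words[word] |= bit;
    assert_eq!(words[3], 1 << (438 - 384));

    // `Accept` only tries to dispatch a connection while at least one bit is set.
    assert!(words.iter().any(|w| *w != 0));

    // Clearing the bit marks the worker unavailable again.
    words[word] &= !bit;
    assert!(words.iter().all(|w| *w == 0));
}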

View File

@@ -19,7 +19,10 @@ use crate::signals::{Signal, Signals};
use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs}; use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs};
use crate::socket::{MioTcpListener, MioTcpSocket}; use crate::socket::{MioTcpListener, MioTcpSocket};
use crate::waker_queue::{WakerInterest, WakerQueue}; use crate::waker_queue::{WakerInterest, WakerQueue};
use crate::worker::{self, ServerWorker, ServerWorkerConfig, WorkerAvailability, WorkerHandle}; use crate::worker::{
ServerWorker, ServerWorkerConfig, WorkerAvailability, WorkerHandleAccept,
WorkerHandleServer,
};
use crate::{join_all, Token}; use crate::{join_all, Token};
/// Server builder /// Server builder
@@ -27,7 +30,7 @@ pub struct ServerBuilder {
threads: usize, threads: usize,
token: Token, token: Token,
backlog: u32, backlog: u32,
handles: Vec<(usize, WorkerHandle)>, handles: Vec<(usize, WorkerHandleServer)>,
services: Vec<Box<dyn InternalServiceFactory>>, services: Vec<Box<dyn InternalServiceFactory>>,
sockets: Vec<(Token, String, MioListener)>, sockets: Vec<(Token, String, MioListener)>,
accept: AcceptLoop, accept: AcceptLoop,
@@ -117,8 +120,8 @@ impl ServerBuilder {
/// reached for each worker. /// reached for each worker.
/// ///
/// By default max connections is set to a 25k per worker. /// By default max connections is set to a 25k per worker.
pub fn maxconn(self, num: usize) -> Self { pub fn maxconn(mut self, num: usize) -> Self {
worker::max_concurrent_connections(num); self.worker_config.max_concurrent_connections(num);
self self
} }
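Because `maxconn` now stores the limit on the builder's own `ServerWorkerConfig` instead of a process-wide static, each server configures its own per-worker cap. A hedged usage sketch, loosely modelled on the examples/basic.rs change earlier in this compare view; the name, bind address, service body, and worker count are placeholders, not part of the diff:

use actix_rt::net::TcpStream;
use actix_server::Server;
use actix_service::fn_service;

#[actix_rt::main]
async fn main() -> std::io::Result<()> {
    Server::build()
        // Per-builder limit; replaces the old global `worker::max_concurrent_connections`.
        .maxconn(256)
        .bind("echo", ("127.0.0.1", 8080), || {
            // Placeholder service: accept the connection and finish immediately.
            fn_service(|_stream: TcpStream| async { Ok::<_, ()>(()) })
        })?
        .workers(1)
        .run()
        .await
}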
@@ -280,10 +283,11 @@ impl ServerBuilder {
// start workers // start workers
let handles = (0..self.threads) let handles = (0..self.threads)
.map(|idx| { .map(|idx| {
let handle = self.start_worker(idx, self.accept.waker_owned()); let (handle_accept, handle_server) =
self.handles.push((idx, handle.clone())); self.start_worker(idx, self.accept.waker_owned());
self.handles.push((idx, handle_server));
handle handle_accept
}) })
.collect(); .collect();
@@ -311,8 +315,12 @@ impl ServerBuilder {
} }
} }
fn start_worker(&self, idx: usize, waker: WakerQueue) -> WorkerHandle { fn start_worker(
let avail = WorkerAvailability::new(waker); &self,
idx: usize,
waker: WakerQueue,
) -> (WorkerHandleAccept, WorkerHandleServer) {
let avail = WorkerAvailability::new(idx, waker);
let services = self.services.iter().map(|v| v.clone_factory()).collect(); let services = self.services.iter().map(|v| v.clone_factory()).collect();
ServerWorker::start(idx, services, avail, self.worker_config) ServerWorker::start(idx, services, avail, self.worker_config)
@@ -373,45 +381,29 @@ impl ServerBuilder {
let notify = std::mem::take(&mut self.notify); let notify = std::mem::take(&mut self.notify);
// stop workers // stop workers
if !self.handles.is_empty() && graceful { let stop = self
let iter = self .handles
.handles .iter()
.iter() .map(move |worker| worker.1.stop(graceful))
.map(move |worker| worker.1.stop(graceful)) .collect();
.collect();
let fut = join_all(iter); rt::spawn(async move {
if graceful {
rt::spawn(async move { let _ = join_all(stop).await;
let _ = fut.await;
if let Some(tx) = completion {
let _ = tx.send(());
}
for tx in notify {
let _ = tx.send(());
}
if exit {
rt::spawn(async {
sleep(Duration::from_millis(300)).await;
System::current().stop();
});
}
});
} else {
// we need to stop system if server was spawned
if self.exit {
rt::spawn(async {
sleep(Duration::from_millis(300)).await;
System::current().stop();
});
} }
if let Some(tx) = completion { if let Some(tx) = completion {
let _ = tx.send(()); let _ = tx.send(());
} }
for tx in notify { for tx in notify {
let _ = tx.send(()); let _ = tx.send(());
} }
}
if exit {
sleep(Duration::from_millis(300)).await;
System::current().stop();
}
});
} }
ServerCommand::WorkerFaulted(idx) => { ServerCommand::WorkerFaulted(idx) => {
let mut found = false; let mut found = false;
@@ -437,9 +429,10 @@ impl ServerBuilder {
break; break;
} }
let handle = self.start_worker(new_idx, self.accept.waker_owned()); let (handle_accept, handle_server) =
self.handles.push((new_idx, handle.clone())); self.start_worker(new_idx, self.accept.waker_owned());
self.accept.wake(WakerInterest::Worker(handle)); self.handles.push((new_idx, handle_server));
self.accept.wake(WakerInterest::Worker(handle_accept));
} }
} }
} }

View File

@@ -7,14 +7,14 @@ use actix_service::{
     fn_service, IntoServiceFactory as IntoBaseServiceFactory,
     ServiceFactory as BaseServiceFactory,
 };
-use actix_utils::counter::CounterGuard;
+use actix_utils::{counter::CounterGuard, future::ready};
 use futures_core::future::LocalBoxFuture;
 use log::error;

 use crate::builder::bind_addr;
 use crate::service::{BoxedServerService, InternalServiceFactory, StreamService};
 use crate::socket::{MioStream, MioTcpListener, StdSocketAddr, StdTcpListener, ToSocketAddrs};
-use crate::{ready, Token};
+use crate::Token;

 pub struct ServiceConfig {
     pub(crate) services: Vec<(String, MioTcpListener)>,
@@ -243,7 +243,7 @@ impl ServiceRuntime {
 type BoxedNewService = Box<
     dyn BaseServiceFactory<
-        (Option<CounterGuard>, MioStream),
+        (CounterGuard, MioStream),
         Response = (),
         Error = (),
         InitError = (),
@@ -257,7 +257,7 @@ struct ServiceFactory<T> {
     inner: T,
 }

-impl<T> BaseServiceFactory<(Option<CounterGuard>, MioStream)> for ServiceFactory<T>
+impl<T> BaseServiceFactory<(CounterGuard, MioStream)> for ServiceFactory<T>
 where
     T: BaseServiceFactory<TcpStream, Config = ()>,
     T::Future: 'static,

View File

@@ -55,24 +55,6 @@ pub fn new() -> ServerBuilder {
     ServerBuilder::default()
 }

-// temporary Ready type for std::future::{ready, Ready}; Can be removed when MSRV surpass 1.48
-#[doc(hidden)]
-pub struct Ready<T>(Option<T>);
-
-pub(crate) fn ready<T>(t: T) -> Ready<T> {
-    Ready(Some(t))
-}
-
-impl<T> Unpin for Ready<T> {}
-
-impl<T> Future for Ready<T> {
-    type Output = T;
-
-    fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
-        Poll::Ready(self.get_mut().0.take().unwrap())
-    }
-}
-
 // a poor man's join future. joined future is only used when starting/stopping the server.
 // pin_project and pinned futures are overkill for this task.
 pub(crate) struct JoinAll<T> {
@@ -132,6 +114,8 @@ impl<T> Future for JoinAll<T> {
 mod test {
     use super::*;

+    use actix_utils::future::ready;
+
     #[actix_rt::test]
     async fn test_join_all() {
         let futs = vec![ready(Ok(1)), ready(Err(3)), ready(Ok(9))];

View File

@@ -3,12 +3,15 @@ use std::net::SocketAddr;
 use std::task::{Context, Poll};

 use actix_service::{Service, ServiceFactory as BaseServiceFactory};
-use actix_utils::counter::CounterGuard;
+use actix_utils::{
+    counter::CounterGuard,
+    future::{ready, Ready},
+};
 use futures_core::future::LocalBoxFuture;
 use log::error;

 use crate::socket::{FromStream, MioStream};
-use crate::{ready, Ready, Token};
+use crate::Token;

 pub trait ServiceFactory<Stream: FromStream>: Send + Clone + 'static {
     type Factory: BaseServiceFactory<Stream, Config = ()>;
@@ -26,7 +29,7 @@ pub(crate) trait InternalServiceFactory: Send {
 pub(crate) type BoxedServerService = Box<
     dyn Service<
-        (Option<CounterGuard>, MioStream),
+        (CounterGuard, MioStream),
         Response = (),
         Error = (),
         Future = Ready<Result<(), ()>>,
@@ -47,7 +50,7 @@ impl<S, I> StreamService<S, I> {
     }
 }

-impl<S, I> Service<(Option<CounterGuard>, MioStream)> for StreamService<S, I>
+impl<S, I> Service<(CounterGuard, MioStream)> for StreamService<S, I>
 where
     S: Service<I>,
     S::Future: 'static,
@@ -62,7 +65,7 @@ where
         self.service.poll_ready(ctx).map_err(|_| ())
     }

-    fn call(&self, (guard, req): (Option<CounterGuard>, MioStream)) -> Self::Future {
+    fn call(&self, (guard, req): (CounterGuard, MioStream)) -> Self::Future {
         ready(match FromStream::from_mio(req) {
             Ok(stream) => {
                 let f = self.service.call(stream);

View File

@@ -12,18 +12,7 @@ pub(crate) use {
use std::{fmt, io}; use std::{fmt, io};
use actix_rt::net::TcpStream; use actix_rt::net::TcpStream;
use mio::event::Source; use mio::{event::Source, Interest, Registry, Token};
use mio::net::TcpStream as MioTcpStream;
use mio::{Interest, Registry, Token};
#[cfg(windows)]
use std::os::windows::io::{FromRawSocket, IntoRawSocket};
#[cfg(unix)]
use {
actix_rt::net::UnixStream,
mio::net::{SocketAddr as MioSocketAddr, UnixStream as MioUnixStream},
std::os::unix::io::{FromRawFd, IntoRawFd},
};
pub(crate) enum MioListener { pub(crate) enum MioListener {
Tcp(MioTcpListener), Tcp(MioTcpListener),
@@ -131,7 +120,7 @@ impl fmt::Display for MioListener {
pub(crate) enum SocketAddr { pub(crate) enum SocketAddr {
Tcp(StdSocketAddr), Tcp(StdSocketAddr),
#[cfg(unix)] #[cfg(unix)]
Uds(MioSocketAddr), Uds(mio::net::SocketAddr),
} }
impl fmt::Display for SocketAddr { impl fmt::Display for SocketAddr {
@@ -156,9 +145,9 @@ impl fmt::Debug for SocketAddr {
#[derive(Debug)] #[derive(Debug)]
pub enum MioStream { pub enum MioStream {
Tcp(MioTcpStream), Tcp(mio::net::TcpStream),
#[cfg(unix)] #[cfg(unix)]
Uds(MioUnixStream), Uds(mio::net::UnixStream),
} }
/// helper trait for converting mio stream to tokio stream. /// helper trait for converting mio stream to tokio stream.
@@ -166,47 +155,60 @@ pub trait FromStream: Sized {
fn from_mio(sock: MioStream) -> io::Result<Self>; fn from_mio(sock: MioStream) -> io::Result<Self>;
} }
// FIXME: This is a workaround and we need an efficient way to convert between mio and tokio stream
#[cfg(unix)]
impl FromStream for TcpStream {
fn from_mio(sock: MioStream) -> io::Result<Self> {
match sock {
MioStream::Tcp(mio) => {
let raw = IntoRawFd::into_raw_fd(mio);
// SAFETY: This is a in place conversion from mio stream to tokio stream.
TcpStream::from_std(unsafe { FromRawFd::from_raw_fd(raw) })
}
MioStream::Uds(_) => {
panic!("Should not happen, bug in server impl");
}
}
}
}
// FIXME: This is a workaround and we need an efficient way to convert between mio and tokio stream
#[cfg(windows)] #[cfg(windows)]
impl FromStream for TcpStream { mod win_impl {
fn from_mio(sock: MioStream) -> io::Result<Self> { use super::*;
match sock {
MioStream::Tcp(mio) => { use std::os::windows::io::{FromRawSocket, IntoRawSocket};
let raw = IntoRawSocket::into_raw_socket(mio);
// SAFETY: This is a in place conversion from mio stream to tokio stream. // FIXME: This is a workaround and we need an efficient way to convert between mio and tokio stream
TcpStream::from_std(unsafe { FromRawSocket::from_raw_socket(raw) }) impl FromStream for TcpStream {
fn from_mio(sock: MioStream) -> io::Result<Self> {
match sock {
MioStream::Tcp(mio) => {
let raw = IntoRawSocket::into_raw_socket(mio);
// SAFETY: This is a in place conversion from mio stream to tokio stream.
TcpStream::from_std(unsafe { FromRawSocket::from_raw_socket(raw) })
}
} }
} }
} }
} }
// FIXME: This is a workaround and we need an efficient way to convert between mio and tokio stream
#[cfg(unix)] #[cfg(unix)]
impl FromStream for UnixStream { mod unix_impl {
fn from_mio(sock: MioStream) -> io::Result<Self> { use super::*;
match sock {
MioStream::Tcp(_) => panic!("Should not happen, bug in server impl"), use std::os::unix::io::{FromRawFd, IntoRawFd};
MioStream::Uds(mio) => {
let raw = IntoRawFd::into_raw_fd(mio); use actix_rt::net::UnixStream;
// SAFETY: This is a in place conversion from mio stream to tokio stream.
UnixStream::from_std(unsafe { FromRawFd::from_raw_fd(raw) }) // FIXME: This is a workaround and we need an efficient way to convert between mio and tokio stream
impl FromStream for TcpStream {
fn from_mio(sock: MioStream) -> io::Result<Self> {
match sock {
MioStream::Tcp(mio) => {
let raw = IntoRawFd::into_raw_fd(mio);
// SAFETY: This is a in place conversion from mio stream to tokio stream.
TcpStream::from_std(unsafe { FromRawFd::from_raw_fd(raw) })
}
MioStream::Uds(_) => {
panic!("Should not happen, bug in server impl");
}
}
}
}
// FIXME: This is a workaround and we need an efficient way to convert between mio and tokio stream
impl FromStream for UnixStream {
fn from_mio(sock: MioStream) -> io::Result<Self> {
match sock {
MioStream::Tcp(_) => panic!("Should not happen, bug in server impl"),
MioStream::Uds(mio) => {
let raw = IntoRawFd::into_raw_fd(mio);
// SAFETY: This is a in place conversion from mio stream to tokio stream.
UnixStream::from_std(unsafe { FromRawFd::from_raw_fd(raw) })
}
} }
} }
} }

View File

@@ -6,7 +6,7 @@ use std::{
 use mio::{Registry, Token as MioToken, Waker};

-use crate::worker::WorkerHandle;
+use crate::worker::WorkerHandleAccept;

 /// Waker token for `mio::Poll` instance.
 pub(crate) const WAKER_TOKEN: MioToken = MioToken(usize::MAX);
@@ -72,7 +72,7 @@ impl WakerQueue {
 pub(crate) enum WakerInterest {
     /// `WorkerAvailable` is an interest from `Worker` notifying `Accept` there is a worker
     /// available and can accept new tasks.
-    WorkerAvailable,
+    WorkerAvailable(usize),
     /// `Pause`, `Resume`, `Stop` Interest are from `ServerBuilder` future. It listens to
     /// `ServerCommand` and notify `Accept` to do exactly these tasks.
     Pause,
@@ -84,6 +84,6 @@ pub(crate) enum WakerInterest {
     Timer,
     /// `Worker` is an interest happen after a worker runs into faulted state(This is determined
     /// by if work can be sent to it successfully).`Accept` would be waked up and add the new
-    /// `WorkerHandle`.
-    Worker(WorkerHandle),
+    /// `WorkerHandleAccept`.
+    Worker(WorkerHandleAccept),
 }

View File

@@ -1,30 +1,38 @@
use std::future::Future; use std::{
use std::pin::Pin; future::Future,
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; mem,
use std::sync::Arc; pin::Pin,
use std::task::{Context, Poll}; sync::{
use std::time::Duration; atomic::{AtomicBool, Ordering},
Arc,
},
task::{Context, Poll},
time::Duration,
};
use actix_rt::time::{sleep, Sleep}; use actix_rt::{
use actix_rt::{spawn, Arbiter}; spawn,
time::{sleep, Instant, Sleep},
Arbiter,
};
use actix_utils::counter::Counter; use actix_utils::counter::Counter;
use futures_core::future::LocalBoxFuture; use futures_core::{future::LocalBoxFuture, ready};
use log::{error, info, trace}; use log::{error, info, trace};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use tokio::sync::{
use tokio::sync::oneshot; mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
oneshot,
};
use crate::service::{BoxedServerService, InternalServiceFactory}; use crate::service::{BoxedServerService, InternalServiceFactory};
use crate::socket::MioStream; use crate::socket::MioStream;
use crate::waker_queue::{WakerInterest, WakerQueue}; use crate::waker_queue::{WakerInterest, WakerQueue};
use crate::{join_all, Token}; use crate::{join_all, Token};
pub(crate) struct WorkerCommand(Conn); /// Stop worker message. Returns `true` on successful graceful shutdown.
/// and `false` if some connections still alive when shutdown execute.
/// Stop worker message. Returns `true` on successful shutdown pub(crate) struct Stop {
/// and `false` if some connections still alive.
pub(crate) struct StopCommand {
graceful: bool, graceful: bool,
result: oneshot::Sender<bool>, tx: oneshot::Sender<bool>,
} }
#[derive(Debug)] #[derive(Debug)]
@@ -33,90 +41,91 @@ pub(crate) struct Conn {
pub token: Token, pub token: Token,
} }
static MAX_CONNS: AtomicUsize = AtomicUsize::new(25600); fn handle_pair(
idx: usize,
tx1: UnboundedSender<Conn>,
tx2: UnboundedSender<Stop>,
avail: WorkerAvailability,
) -> (WorkerHandleAccept, WorkerHandleServer) {
let accept = WorkerHandleAccept { tx: tx1, avail };
/// Sets the maximum per-worker number of concurrent connections. let server = WorkerHandleServer { idx, tx: tx2 };
(accept, server)
}
/// Handle to worker that can send connection message to worker and share the
/// availability of worker to other thread.
/// ///
/// All socket listeners will stop accepting connections when this limit is /// Held by [Accept](crate::accept::Accept).
/// reached for each worker. pub(crate) struct WorkerHandleAccept {
/// tx: UnboundedSender<Conn>,
/// By default max connections is set to a 25k per worker.
pub fn max_concurrent_connections(num: usize) {
MAX_CONNS.store(num, Ordering::Relaxed);
}
thread_local! {
static MAX_CONNS_COUNTER: Counter =
Counter::new(MAX_CONNS.load(Ordering::Relaxed));
}
pub(crate) fn num_connections() -> usize {
MAX_CONNS_COUNTER.with(|conns| conns.total())
}
// a handle to worker that can send message to worker and share the availability of worker to other
// thread.
#[derive(Clone)]
pub(crate) struct WorkerHandle {
pub idx: usize,
tx1: UnboundedSender<WorkerCommand>,
tx2: UnboundedSender<StopCommand>,
avail: WorkerAvailability, avail: WorkerAvailability,
} }
impl WorkerHandle { impl WorkerHandleAccept {
pub fn new( #[inline(always)]
idx: usize, pub(crate) fn idx(&self) -> usize {
tx1: UnboundedSender<WorkerCommand>, self.avail.idx
tx2: UnboundedSender<StopCommand>,
avail: WorkerAvailability,
) -> Self {
WorkerHandle {
idx,
tx1,
tx2,
avail,
}
} }
pub fn send(&self, msg: Conn) -> Result<(), Conn> { #[inline(always)]
self.tx1.send(WorkerCommand(msg)).map_err(|msg| msg.0 .0) pub(crate) fn send(&self, msg: Conn) -> Result<(), Conn> {
self.tx.send(msg).map_err(|msg| msg.0)
} }
pub fn available(&self) -> bool { #[inline(always)]
pub(crate) fn available(&self) -> bool {
self.avail.available() self.avail.available()
} }
}
pub fn stop(&self, graceful: bool) -> oneshot::Receiver<bool> { /// Handle to worker than can send stop message to worker.
let (result, rx) = oneshot::channel(); ///
let _ = self.tx2.send(StopCommand { graceful, result }); /// Held by [ServerBuilder](crate::builder::ServerBuilder).
pub(crate) struct WorkerHandleServer {
pub idx: usize,
tx: UnboundedSender<Stop>,
}
impl WorkerHandleServer {
pub(crate) fn stop(&self, graceful: bool) -> oneshot::Receiver<bool> {
let (tx, rx) = oneshot::channel();
let _ = self.tx.send(Stop { graceful, tx });
rx rx
} }
} }
#[derive(Clone)] #[derive(Clone)]
pub(crate) struct WorkerAvailability { pub(crate) struct WorkerAvailability {
idx: usize,
waker: WakerQueue, waker: WakerQueue,
available: Arc<AtomicBool>, available: Arc<AtomicBool>,
} }
impl WorkerAvailability { impl WorkerAvailability {
pub fn new(waker: WakerQueue) -> Self { pub fn new(idx: usize, waker: WakerQueue) -> Self {
WorkerAvailability { WorkerAvailability {
idx,
waker, waker,
available: Arc::new(AtomicBool::new(false)), available: Arc::new(AtomicBool::new(false)),
} }
} }
#[inline(always)]
pub fn available(&self) -> bool { pub fn available(&self) -> bool {
self.available.load(Ordering::Acquire) self.available.load(Ordering::Acquire)
} }
pub fn set(&self, val: bool) { pub fn set(&self, val: bool) {
let old = self.available.swap(val, Ordering::Release); // Ordering:
// notify the accept on switched to available. //
// There could be multiple set calls happen in one <ServerWorker as Future>::poll.
// Order is important between them.
let old = self.available.swap(val, Ordering::AcqRel);
// Notify the accept on switched to available.
if !old && val { if !old && val {
self.waker.wake(WakerInterest::WorkerAvailable); self.waker.wake(WakerInterest::WorkerAvailable(self.idx));
} }
} }
} }
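The `set` method above swaps the flag with `AcqRel` ordering and wakes the accept loop only on a false-to-true transition, so repeated `set(true)` calls within one poll stay cheap. A tiny standalone illustration of that wake-once behaviour; the `println!` stands in for the `WakerQueue::wake(WakerInterest::WorkerAvailable(idx))` call in the diff:

use std::sync::atomic::{AtomicBool, Ordering};

fn main() {
    let available = AtomicBool::new(false);

    // `swap` returns the previous value; only a false -> true transition
    // should notify the accept loop.
    let old = available.swap(true, Ordering::AcqRel);
    if !old {
        println!("wake accept loop (WorkerAvailable)");
    }

    // A second set(true) observes old == true and does not wake again.
    let old = available.swap(true, Ordering::AcqRel);
    assert!(old);
}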
@@ -125,14 +134,16 @@ impl WorkerAvailability {
/// ///
/// Worker accepts Socket objects via unbounded channel and starts stream processing. /// Worker accepts Socket objects via unbounded channel and starts stream processing.
pub(crate) struct ServerWorker { pub(crate) struct ServerWorker {
rx: UnboundedReceiver<WorkerCommand>, // UnboundedReceiver<Conn> should always be the first field.
rx2: UnboundedReceiver<StopCommand>, // It must be dropped as soon as ServerWorker dropping.
services: Vec<WorkerService>, rx: UnboundedReceiver<Conn>,
rx2: UnboundedReceiver<Stop>,
services: Box<[WorkerService]>,
availability: WorkerAvailability, availability: WorkerAvailability,
conns: Counter, conns: Counter,
factories: Vec<Box<dyn InternalServiceFactory>>, factories: Box<[Box<dyn InternalServiceFactory>]>,
state: WorkerState, state: WorkerState,
config: ServerWorkerConfig, shutdown_timeout: Duration,
} }
struct WorkerService { struct WorkerService {
@@ -163,6 +174,7 @@ enum WorkerServiceStatus {
pub(crate) struct ServerWorkerConfig { pub(crate) struct ServerWorkerConfig {
shutdown_timeout: Duration, shutdown_timeout: Duration,
max_blocking_threads: usize, max_blocking_threads: usize,
max_concurrent_connections: usize,
} }
impl Default for ServerWorkerConfig { impl Default for ServerWorkerConfig {
@@ -172,6 +184,7 @@ impl Default for ServerWorkerConfig {
Self { Self {
shutdown_timeout: Duration::from_secs(30), shutdown_timeout: Duration::from_secs(30),
max_blocking_threads, max_blocking_threads,
max_concurrent_connections: 25600,
} }
} }
} }
@@ -181,6 +194,10 @@ impl ServerWorkerConfig {
self.max_blocking_threads = num; self.max_blocking_threads = num;
} }
pub(crate) fn max_concurrent_connections(&mut self, num: usize) {
self.max_concurrent_connections = num;
}
pub(crate) fn shutdown_timeout(&mut self, dur: Duration) { pub(crate) fn shutdown_timeout(&mut self, dur: Duration) {
self.shutdown_timeout = dur; self.shutdown_timeout = dur;
} }
@@ -192,7 +209,9 @@ impl ServerWorker {
factories: Vec<Box<dyn InternalServiceFactory>>, factories: Vec<Box<dyn InternalServiceFactory>>,
availability: WorkerAvailability, availability: WorkerAvailability,
config: ServerWorkerConfig, config: ServerWorkerConfig,
) -> WorkerHandle { ) -> (WorkerHandleAccept, WorkerHandleServer) {
assert!(!availability.available());
let (tx1, rx) = unbounded_channel(); let (tx1, rx) = unbounded_channel();
let (tx2, rx2) = unbounded_channel(); let (tx2, rx2) = unbounded_channel();
let avail = availability.clone(); let avail = availability.clone();
@@ -207,20 +226,7 @@ impl ServerWorker {
.unwrap() .unwrap()
}) })
.spawn(async move { .spawn(async move {
availability.set(false); let fut = factories
let mut wrk = MAX_CONNS_COUNTER.with(move |conns| ServerWorker {
rx,
rx2,
availability,
factories,
config,
services: Vec::new(),
conns: conns.clone(),
state: WorkerState::Unavailable,
});
let fut = wrk
.factories
.iter() .iter()
.enumerate() .enumerate()
.map(|(idx, factory)| { .map(|(idx, factory)| {
@@ -233,54 +239,76 @@ impl ServerWorker {
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
// a second spawn to make sure worker future runs as non boxed future. // a second spawn to run !Send future tasks.
// As Arbiter::spawn would box the future before send it to arbiter.
spawn(async move { spawn(async move {
let res: Result<Vec<_>, _> = join_all(fut).await.into_iter().collect(); let res = join_all(fut)
match res { .await
Ok(services) => { .into_iter()
for item in services { .collect::<Result<Vec<_>, _>>();
for (factory, token, service) in item { let services = match res {
assert_eq!(token.0, wrk.services.len()); Ok(res) => res
wrk.services.push(WorkerService { .into_iter()
factory, .flatten()
service, .fold(Vec::new(), |mut services, (factory, token, service)| {
status: WorkerServiceStatus::Unavailable, assert_eq!(token.0, services.len());
}); services.push(WorkerService {
} factory,
} service,
} status: WorkerServiceStatus::Unavailable,
});
services
})
.into_boxed_slice(),
Err(e) => { Err(e) => {
error!("Can not start worker: {:?}", e); error!("Can not start worker: {:?}", e);
Arbiter::current().stop(); Arbiter::current().stop();
return;
} }
} };
wrk.await
// a third spawn to make sure ServerWorker runs as non boxed future.
spawn(ServerWorker {
rx,
rx2,
services,
availability,
conns: Counter::new(config.max_concurrent_connections),
factories: factories.into_boxed_slice(),
state: Default::default(),
shutdown_timeout: config.shutdown_timeout,
});
}); });
}); });
WorkerHandle::new(idx, tx1, tx2, avail) handle_pair(idx, tx1, tx2, avail)
}
fn restart_service(&mut self, token: Token, factory_id: usize) {
let factory = &self.factories[factory_id];
trace!("Service {:?} failed, restarting", factory.name(token));
self.services[token.0].status = WorkerServiceStatus::Restarting;
self.state = WorkerState::Restarting(Restart {
factory_id,
token,
fut: factory.create(),
});
} }
fn shutdown(&mut self, force: bool) { fn shutdown(&mut self, force: bool) {
if force { self.services
self.services.iter_mut().for_each(|srv| { .iter_mut()
if srv.status == WorkerServiceStatus::Available { .filter(|srv| srv.status == WorkerServiceStatus::Available)
srv.status = WorkerServiceStatus::Stopped; .for_each(|srv| {
} srv.status = if force {
WorkerServiceStatus::Stopped
} else {
WorkerServiceStatus::Stopping
};
}); });
} else {
self.services.iter_mut().for_each(move |srv| {
if srv.status == WorkerServiceStatus::Available {
srv.status = WorkerServiceStatus::Stopping;
}
});
}
} }
fn check_readiness(&mut self, cx: &mut Context<'_>) -> Result<bool, (Token, usize)> { fn check_readiness(&mut self, cx: &mut Context<'_>) -> Result<bool, (Token, usize)> {
let mut ready = self.conns.available(cx); let mut ready = self.conns.available(cx);
let mut failed = None;
for (idx, srv) in self.services.iter_mut().enumerate() { for (idx, srv) in self.services.iter_mut().enumerate() {
if srv.status == WorkerServiceStatus::Available if srv.status == WorkerServiceStatus::Available
|| srv.status == WorkerServiceStatus::Unavailable || srv.status == WorkerServiceStatus::Unavailable
@@ -311,171 +339,178 @@ impl ServerWorker {
"Service {:?} readiness check returned error, restarting", "Service {:?} readiness check returned error, restarting",
self.factories[srv.factory].name(Token(idx)) self.factories[srv.factory].name(Token(idx))
); );
failed = Some((Token(idx), srv.factory));
srv.status = WorkerServiceStatus::Failed; srv.status = WorkerServiceStatus::Failed;
return Err((Token(idx), srv.factory));
} }
} }
} }
} }
if let Some(idx) = failed {
Err(idx) Ok(ready)
} else {
Ok(ready)
}
} }
} }
enum WorkerState { enum WorkerState {
Available, Available,
Unavailable, Unavailable,
Restarting( Restarting(Restart),
usize, Shutdown(Shutdown),
Token, }
LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>>,
), struct Restart {
Shutdown( factory_id: usize,
Pin<Box<Sleep>>, token: Token,
Pin<Box<Sleep>>, fut: LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>>,
Option<oneshot::Sender<bool>>, }
),
// Shutdown keep states necessary for server shutdown:
// Sleep for interval check the shutdown progress.
// Instant for the start time of shutdown.
// Sender for send back the shutdown outcome(force/grace) to StopCommand caller.
struct Shutdown {
timer: Pin<Box<Sleep>>,
start_from: Instant,
tx: oneshot::Sender<bool>,
}
impl Default for WorkerState {
fn default() -> Self {
Self::Unavailable
}
}
impl Drop for ServerWorker {
fn drop(&mut self) {
// Set availability to true so if accept try to send connection to this worker
// it would find worker is gone and remove it.
// This is helpful when worker is dropped unexpected.
self.availability.set(true);
// Stop the Arbiter ServerWorker runs on on drop.
Arbiter::current().stop();
}
} }
impl Future for ServerWorker { impl Future for ServerWorker {
type Output = (); type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.as_mut().get_mut();
// `StopWorker` message handler // `StopWorker` message handler
if let Poll::Ready(Some(StopCommand { graceful, result })) = if let Poll::Ready(Some(Stop { graceful, tx })) = Pin::new(&mut this.rx2).poll_recv(cx)
Pin::new(&mut self.rx2).poll_recv(cx)
{ {
self.availability.set(false); this.availability.set(false);
let num = num_connections(); let num = this.conns.total();
if num == 0 { if num == 0 {
info!("Shutting down worker, 0 connections"); info!("Shutting down worker, 0 connections");
let _ = result.send(true); let _ = tx.send(true);
return Poll::Ready(()); return Poll::Ready(());
} else if graceful { } else if graceful {
self.shutdown(false); info!("Graceful worker shutdown, {} connections", num);
let num = num_connections(); this.shutdown(false);
if num != 0 {
info!("Graceful worker shutdown, {} connections", num); this.state = WorkerState::Shutdown(Shutdown {
self.state = WorkerState::Shutdown( timer: Box::pin(sleep(Duration::from_secs(1))),
Box::pin(sleep(Duration::from_secs(1))), start_from: Instant::now(),
Box::pin(sleep(self.config.shutdown_timeout)), tx,
Some(result), });
);
} else {
let _ = result.send(true);
return Poll::Ready(());
}
} else { } else {
info!("Force shutdown worker, {} connections", num); info!("Force shutdown worker, {} connections", num);
self.shutdown(true); this.shutdown(true);
let _ = result.send(false);
let _ = tx.send(false);
return Poll::Ready(()); return Poll::Ready(());
} }
} }
match self.state { match this.state {
WorkerState::Unavailable => match self.check_readiness(cx) { WorkerState::Unavailable => match this.check_readiness(cx) {
Ok(true) => { Ok(true) => {
self.state = WorkerState::Available; this.state = WorkerState::Available;
self.availability.set(true); this.availability.set(true);
self.poll(cx) self.poll(cx)
} }
Ok(false) => Poll::Pending, Ok(false) => Poll::Pending,
Err((token, idx)) => { Err((token, idx)) => {
trace!( this.restart_service(token, idx);
"Service {:?} failed, restarting",
self.factories[idx].name(token)
);
self.services[token.0].status = WorkerServiceStatus::Restarting;
self.state =
WorkerState::Restarting(idx, token, self.factories[idx].create());
self.poll(cx) self.poll(cx)
} }
}, },
WorkerState::Restarting(idx, token, ref mut fut) => { WorkerState::Restarting(ref mut restart) => {
match fut.as_mut().poll(cx) { let factory_id = restart.factory_id;
Poll::Ready(Ok(item)) => { let token = restart.token;
// only interest in the first item?
if let Some((token, service)) = item.into_iter().next() { let service = ready!(restart.fut.as_mut().poll(cx))
trace!( .unwrap_or_else(|_| {
"Service {:?} has been restarted",
self.factories[idx].name(token)
);
self.services[token.0].created(service);
self.state = WorkerState::Unavailable;
return self.poll(cx);
}
}
Poll::Ready(Err(_)) => {
panic!( panic!(
"Can not restart {:?} service", "Can not restart {:?} service",
self.factories[idx].name(token) this.factories[factory_id].name(token)
); )
} })
Poll::Pending => return Poll::Pending, .into_iter()
} // Find the same token from vector. There should be only one
// So the first match would be enough.
.find(|(t, _)| *t == token)
.map(|(_, service)| service)
.expect("No BoxedServerService found");
trace!(
"Service {:?} has been restarted",
this.factories[factory_id].name(token)
);
this.services[token.0].created(service);
this.state = WorkerState::Unavailable;
self.poll(cx) self.poll(cx)
} }
WorkerState::Shutdown(ref mut t1, ref mut t2, ref mut tx) => { WorkerState::Shutdown(ref mut shutdown) => {
let num = num_connections(); // Wait for 1 second.
if num == 0 { ready!(shutdown.timer.as_mut().poll(cx));
let _ = tx.take().unwrap().send(true);
Arbiter::current().stop();
return Poll::Ready(());
}
// check graceful timeout if this.conns.total() == 0 {
if Pin::new(t2).poll(cx).is_ready() { // Graceful shutdown.
let _ = tx.take().unwrap().send(false); if let WorkerState::Shutdown(shutdown) = mem::take(&mut this.state) {
self.shutdown(true); let _ = shutdown.tx.send(true);
Arbiter::current().stop(); }
return Poll::Ready(()); Poll::Ready(())
} else if shutdown.start_from.elapsed() >= this.shutdown_timeout {
// Timeout forceful shutdown.
if let WorkerState::Shutdown(shutdown) = mem::take(&mut this.state) {
let _ = shutdown.tx.send(false);
}
Poll::Ready(())
} else {
// Reset timer and wait for 1 second.
let time = Instant::now() + Duration::from_secs(1);
shutdown.timer.as_mut().reset(time);
shutdown.timer.as_mut().poll(cx)
} }
// sleep for 1 second and then check again
if t1.as_mut().poll(cx).is_ready() {
*t1 = Box::pin(sleep(Duration::from_secs(1)));
let _ = t1.as_mut().poll(cx);
}
Poll::Pending
} }
// actively poll stream and handle worker command // actively poll stream and handle worker command
WorkerState::Available => loop { WorkerState::Available => loop {
match self.check_readiness(cx) { match this.check_readiness(cx) {
Ok(true) => (), Ok(true) => {}
Ok(false) => { Ok(false) => {
trace!("Worker is unavailable"); trace!("Worker is unavailable");
self.availability.set(false); this.availability.set(false);
self.state = WorkerState::Unavailable; this.state = WorkerState::Unavailable;
return self.poll(cx); return self.poll(cx);
} }
Err((token, idx)) => { Err((token, idx)) => {
trace!( this.restart_service(token, idx);
"Service {:?} failed, restarting", this.availability.set(false);
self.factories[idx].name(token)
);
self.availability.set(false);
self.services[token.0].status = WorkerServiceStatus::Restarting;
self.state =
WorkerState::Restarting(idx, token, self.factories[idx].create());
return self.poll(cx); return self.poll(cx);
} }
} }
match Pin::new(&mut self.rx).poll_recv(cx) { match ready!(Pin::new(&mut this.rx).poll_recv(cx)) {
// handle incoming io stream // handle incoming io stream
Poll::Ready(Some(WorkerCommand(msg))) => { Some(msg) => {
let guard = self.conns.get(); let guard = this.conns.get();
let _ = self.services[msg.token.0] let _ = this.services[msg.token.0].service.call((guard, msg.io));
.service
.call((Some(guard), msg.io));
} }
Poll::Pending => return Poll::Pending, None => return Poll::Ready(()),
Poll::Ready(None) => return Poll::Ready(()),
}; };
}, },
} }
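The reworked `WorkerState::Shutdown` above keeps a 1-second `Sleep` that is reset after every check plus a start `Instant` for the overall deadline. A rough standalone sketch of that re-check pattern, under the assumption that `connections_remaining` stands in for the worker's `Counter::total` and the timeout value is arbitrary:

use std::time::Duration;

use actix_rt::time::{sleep, Instant};

fn connections_remaining() -> usize {
    // Stand-in for `self.conns.total()` in the real worker.
    0
}

#[actix_rt::main]
async fn main() {
    let start_from = Instant::now();
    let shutdown_timeout = Duration::from_secs(5);
    let mut timer = Box::pin(sleep(Duration::from_secs(1)));

    loop {
        // Wait for the 1-second interval to elapse.
        timer.as_mut().await;

        if connections_remaining() == 0 {
            println!("graceful shutdown complete");
            break;
        }
        if start_from.elapsed() >= shutdown_timeout {
            println!("shutdown timeout reached, forcing stop");
            break;
        }

        // Re-arm the timer and check again on the next tick.
        timer.as_mut().reset(Instant::now() + Duration::from_secs(1));
    }
}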

View File

@@ -1,7 +1,8 @@
-use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
+use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{mpsc, Arc};
-use std::{net, thread, time};
+use std::{net, thread, time::Duration};
+
+use actix_rt::{net::TcpStream, time::sleep};
use actix_server::Server;
use actix_service::fn_service;
use actix_utils::future::ok;
@@ -37,7 +38,7 @@ fn test_bind() {
    });
    let (_, sys) = rx.recv().unwrap();
-    thread::sleep(time::Duration::from_millis(500));
+    thread::sleep(Duration::from_millis(500));
    assert!(net::TcpStream::connect(addr).is_ok());
    sys.stop();
    let _ = h.join();
@@ -64,7 +65,7 @@ fn test_listen() {
    });
    let sys = rx.recv().unwrap();
-    thread::sleep(time::Duration::from_millis(500));
+    thread::sleep(Duration::from_millis(500));
    assert!(net::TcpStream::connect(addr).is_ok());
    sys.stop();
    let _ = h.join();
@@ -73,11 +74,11 @@ fn test_listen() {
#[test]
#[cfg(unix)]
fn test_start() {
-    use std::io::Read;
    use actix_codec::{BytesCodec, Framed};
-    use actix_rt::net::TcpStream;
    use bytes::Bytes;
    use futures_util::sink::SinkExt;
+    use std::io::Read;
    let addr = unused_addr();
    let (tx, rx) = mpsc::channel();
@@ -112,16 +113,16 @@ fn test_start() {
    // pause
    let _ = srv.pause();
-    thread::sleep(time::Duration::from_millis(200));
+    thread::sleep(Duration::from_millis(200));
    let mut conn = net::TcpStream::connect(addr).unwrap();
-    conn.set_read_timeout(Some(time::Duration::from_millis(100)))
+    conn.set_read_timeout(Some(Duration::from_millis(100)))
        .unwrap();
    let res = conn.read_exact(&mut buf);
    assert!(res.is_err());
    // resume
    let _ = srv.resume();
-    thread::sleep(time::Duration::from_millis(100));
+    thread::sleep(Duration::from_millis(100));
    assert!(net::TcpStream::connect(addr).is_ok());
    assert!(net::TcpStream::connect(addr).is_ok());
    assert!(net::TcpStream::connect(addr).is_ok());
@@ -133,10 +134,10 @@ fn test_start() {
    // stop
    let _ = srv.stop(false);
-    thread::sleep(time::Duration::from_millis(100));
+    thread::sleep(Duration::from_millis(100));
    assert!(net::TcpStream::connect(addr).is_err());
-    thread::sleep(time::Duration::from_millis(100));
+    thread::sleep(Duration::from_millis(100));
    sys.stop();
    let _ = h.join();
}
@@ -169,7 +170,7 @@ fn test_configure() {
            rt.service("addr1", fn_service(|_| ok::<_, ()>(())));
            rt.service("addr3", fn_service(|_| ok::<_, ()>(())));
            rt.on_start(lazy(move |_| {
-                let _ = num.fetch_add(1, Relaxed);
+                let _ = num.fetch_add(1, Ordering::Relaxed);
            }))
        })
    })
@@ -182,12 +183,397 @@ fn test_configure() {
        let _ = sys.run();
    });
    let (_, sys) = rx.recv().unwrap();
-    thread::sleep(time::Duration::from_millis(500));
+    thread::sleep(Duration::from_millis(500));
    assert!(net::TcpStream::connect(addr1).is_ok());
    assert!(net::TcpStream::connect(addr2).is_ok());
    assert!(net::TcpStream::connect(addr3).is_ok());
-    assert_eq!(num.load(Relaxed), 1);
+    assert_eq!(num.load(Ordering::Relaxed), 1);
    sys.stop();
    let _ = h.join();
}
#[actix_rt::test]
async fn test_max_concurrent_connections() {
// Note:
// A TCP listener would accept connections based on its backlog setting.
//
// This test, on the other hand, only checks the limit on concurrent TCP streams that a
// worker thread will accept.
use tokio::io::AsyncWriteExt;
let addr = unused_addr();
let (tx, rx) = mpsc::channel();
let counter = Arc::new(AtomicUsize::new(0));
let counter_clone = counter.clone();
let max_conn = 3;
let h = thread::spawn(move || {
actix_rt::System::new().block_on(async {
let server = Server::build()
// Set a relatively high backlog.
.backlog(12)
// Max connections for a worker is 3.
.maxconn(max_conn)
.workers(1)
.disable_signals()
.bind("test", addr, move || {
let counter = counter.clone();
fn_service(move |_io: TcpStream| {
let counter = counter.clone();
async move {
counter.fetch_add(1, Ordering::SeqCst);
sleep(Duration::from_secs(20)).await;
counter.fetch_sub(1, Ordering::SeqCst);
Ok::<(), ()>(())
}
})
})?
.run();
let _ = tx.send((server.clone(), actix_rt::System::current()));
server.await
})
});
let (srv, sys) = rx.recv().unwrap();
let mut conns = vec![];
for _ in 0..12 {
let conn = tokio::net::TcpStream::connect(addr).await.unwrap();
conns.push(conn);
}
sleep(Duration::from_secs(5)).await;
// The counter remains at 3 even with 12 successful connections;
// the other 9 remain in the backlog.
assert_eq!(max_conn, counter_clone.load(Ordering::SeqCst));
for mut conn in conns {
conn.shutdown().await.unwrap();
}
srv.stop(false).await;
sys.stop();
let _ = h.join().unwrap();
}
#[actix_rt::test]
async fn test_service_restart() {
use std::task::{Context, Poll};
use actix_service::{fn_factory, Service};
use futures_core::future::LocalBoxFuture;
use tokio::io::AsyncWriteExt;
struct TestService(Arc<AtomicUsize>);
impl Service<TcpStream> for TestService {
type Response = ();
type Error = ();
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let TestService(ref counter) = self;
let c = counter.fetch_add(1, Ordering::SeqCst);
// Force the service to restart on first readiness check.
if c > 0 {
Poll::Ready(Ok(()))
} else {
Poll::Ready(Err(()))
}
}
fn call(&self, _: TcpStream) -> Self::Future {
Box::pin(async { Ok(()) })
}
}
let addr1 = unused_addr();
let addr2 = unused_addr();
let (tx, rx) = mpsc::channel();
let num = Arc::new(AtomicUsize::new(0));
let num2 = Arc::new(AtomicUsize::new(0));
let num_clone = num.clone();
let num2_clone = num2.clone();
let h = thread::spawn(move || {
actix_rt::System::new().block_on(async {
let server = Server::build()
.backlog(1)
.disable_signals()
.configure(move |cfg| {
let num = num.clone();
let num2 = num2.clone();
cfg.bind("addr1", addr1)
.unwrap()
.bind("addr2", addr2)
.unwrap()
.apply(move |rt| {
let num = num.clone();
let num2 = num2.clone();
rt.service(
"addr1",
fn_factory(move || {
let num = num.clone();
async move { Ok::<_, ()>(TestService(num)) }
}),
);
rt.service(
"addr2",
fn_factory(move || {
let num2 = num2.clone();
async move { Ok::<_, ()>(TestService(num2)) }
}),
);
})
})
.unwrap()
.workers(1)
.run();
let _ = tx.send((server.clone(), actix_rt::System::current()));
server.await
})
});
let (server, sys) = rx.recv().unwrap();
for _ in 0..5 {
TcpStream::connect(addr1)
.await
.unwrap()
.shutdown()
.await
.unwrap();
TcpStream::connect(addr2)
.await
.unwrap()
.shutdown()
.await
.unwrap();
}
sleep(Duration::from_secs(3)).await;
assert!(num_clone.load(Ordering::SeqCst) > 5);
assert!(num2_clone.load(Ordering::SeqCst) > 5);
sys.stop();
let _ = server.stop(false);
let _ = h.join().unwrap();
let addr1 = unused_addr();
let addr2 = unused_addr();
let (tx, rx) = mpsc::channel();
let num = Arc::new(AtomicUsize::new(0));
let num2 = Arc::new(AtomicUsize::new(0));
let num_clone = num.clone();
let num2_clone = num2.clone();
let h = thread::spawn(move || {
let num = num.clone();
actix_rt::System::new().block_on(async {
let server = Server::build()
.backlog(1)
.disable_signals()
.bind("addr1", addr1, move || {
let num = num.clone();
fn_factory(move || {
let num = num.clone();
async move { Ok::<_, ()>(TestService(num)) }
})
})
.unwrap()
.bind("addr2", addr2, move || {
let num2 = num2.clone();
fn_factory(move || {
let num2 = num2.clone();
async move { Ok::<_, ()>(TestService(num2)) }
})
})
.unwrap()
.workers(1)
.run();
let _ = tx.send((server.clone(), actix_rt::System::current()));
server.await
})
});
let (server, sys) = rx.recv().unwrap();
for _ in 0..5 {
TcpStream::connect(addr1)
.await
.unwrap()
.shutdown()
.await
.unwrap();
TcpStream::connect(addr2)
.await
.unwrap()
.shutdown()
.await
.unwrap();
}
sleep(Duration::from_secs(3)).await;
assert!(num_clone.load(Ordering::SeqCst) > 5);
assert!(num2_clone.load(Ordering::SeqCst) > 5);
sys.stop();
let _ = server.stop(false);
let _ = h.join().unwrap();
}
#[ignore]
#[actix_rt::test]
async fn worker_restart() {
use actix_service::{Service, ServiceFactory};
use futures_core::future::LocalBoxFuture;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
struct TestServiceFactory(Arc<AtomicUsize>);
impl ServiceFactory<TcpStream> for TestServiceFactory {
type Response = ();
type Error = ();
type Config = ();
type Service = TestService;
type InitError = ();
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: Self::Config) -> Self::Future {
let counter = self.0.fetch_add(1, Ordering::Relaxed);
Box::pin(async move { Ok(TestService(counter)) })
}
}
struct TestService(usize);
impl Service<TcpStream> for TestService {
type Response = ();
type Error = ();
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
actix_service::always_ready!();
fn call(&self, stream: TcpStream) -> Self::Future {
let counter = self.0;
let mut stream = stream.into_std().unwrap();
use std::io::Write;
let str = counter.to_string();
let buf = str.as_bytes();
let mut written = 0;
while written < buf.len() {
if let Ok(n) = stream.write(&buf[written..]) {
written += n;
}
}
stream.flush().unwrap();
stream.shutdown(net::Shutdown::Write).unwrap();
// force worker 2 to restart service once.
if counter == 2 {
panic!("panic on purpose")
} else {
Box::pin(async { Ok(()) })
}
}
}
let addr = unused_addr();
let (tx, rx) = mpsc::channel();
let counter = Arc::new(AtomicUsize::new(1));
let h = thread::spawn(move || {
let counter = counter.clone();
actix_rt::System::new().block_on(async {
let server = Server::build()
.disable_signals()
.bind("addr", addr, move || TestServiceFactory(counter.clone()))
.unwrap()
.workers(2)
.run();
let _ = tx.send((server.clone(), actix_rt::System::current()));
server.await
})
});
let (server, sys) = rx.recv().unwrap();
sleep(Duration::from_secs(3)).await;
let mut buf = [0; 8];
// worker 1 does not restart and returns its id consistently.
let mut stream = TcpStream::connect(addr).await.unwrap();
let n = stream.read(&mut buf).await.unwrap();
let id = String::from_utf8_lossy(&buf[0..n]);
assert_eq!("1", id);
stream.shutdown().await.unwrap();
// worker 2 dies after returning its response.
let mut stream = TcpStream::connect(addr).await.unwrap();
let n = stream.read(&mut buf).await.unwrap();
let id = String::from_utf8_lossy(&buf[0..n]);
assert_eq!("2", id);
stream.shutdown().await.unwrap();
// request to worker 1
let mut stream = TcpStream::connect(addr).await.unwrap();
let n = stream.read(&mut buf).await.unwrap();
let id = String::from_utf8_lossy(&buf[0..n]);
assert_eq!("1", id);
stream.shutdown().await.unwrap();
// TODO: Remove sleep if it can pass CI.
sleep(Duration::from_secs(3)).await;
// worker 2 is restarting, so this request goes to worker 1.
let mut stream = TcpStream::connect(addr).await.unwrap();
let n = stream.read(&mut buf).await.unwrap();
let id = String::from_utf8_lossy(&buf[0..n]);
assert_eq!("1", id);
stream.shutdown().await.unwrap();
// TODO: Remove sleep if it can pass CI.
sleep(Duration::from_secs(3)).await;
// worker 2 has restarted, but worker 1 is still the next to accept a connection.
let mut stream = TcpStream::connect(addr).await.unwrap();
let n = stream.read(&mut buf).await.unwrap();
let id = String::from_utf8_lossy(&buf[0..n]);
assert_eq!("1", id);
stream.shutdown().await.unwrap();
// TODO: Remove sleep if it can pass CI.
sleep(Duration::from_secs(3)).await;
// worker 2 accepts connections again, but its id is now 3.
let mut stream = TcpStream::connect(addr).await.unwrap();
let n = stream.read(&mut buf).await.unwrap();
let id = String::from_utf8_lossy(&buf[0..n]);
assert_eq!("3", id);
stream.shutdown().await.unwrap();
sys.stop();
let _ = server.stop(false);
let _ = h.join().unwrap();
}


@@ -3,6 +3,12 @@
## Unreleased - 2021-xx-xx

+## 2.0.0 - 2021-04-16
+* Removed pipeline and related structs/functions. [#335]
+
+[#335]: https://github.com/actix/actix-net/pull/335
+
## 2.0.0-beta.5 - 2021-03-15
* Add default `Service` trait impl for `Rc<S: Service>` and `&S: Service`. [#288]
* Add `boxed::rc_service` function for constructing `boxed::RcService` type [#290]


@@ -1,6 +1,6 @@
[package]
name = "actix-service"
-version = "2.0.0-beta.5"
+version = "2.0.0"
authors = [
    "Nikolay Kim <fafhrd91@gmail.com>",
    "Rob Ede <robjtede@icloud.com>",
@@ -8,11 +8,8 @@ authors = [
]
description = "Service trait and combinators for representing asynchronous request/response operations."
keywords = ["network", "framework", "async", "futures", "service"]
-homepage = "https://actix.rs"
-repository = "https://github.com/actix/actix-net.git"
-documentation = "https://docs.rs/actix-service"
-readme = "README.md"
categories = ["network-programming", "asynchronous"]
+repository = "https://github.com/actix/actix-net"
license = "MIT OR Apache-2.0"
edition = "2018"
@@ -22,8 +19,10 @@ path = "src/lib.rs"
[dependencies]
futures-core = { version = "0.3.7", default-features = false }
+paste = "1"
pin-project-lite = "0.2"

[dev-dependencies]
actix-rt = "2.0.0"
+actix-utils = "3.0.0"
futures-util = { version = "0.3.7", default-features = false }


@@ -3,10 +3,10 @@
> Service trait and combinators for representing asynchronous request/response operations.

[![crates.io](https://img.shields.io/crates/v/actix-service?label=latest)](https://crates.io/crates/actix-service)
-[![Documentation](https://docs.rs/actix-service/badge.svg?version=2.0.0-beta.5)](https://docs.rs/actix-service/2.0.0-beta.5)
+[![Documentation](https://docs.rs/actix-service/badge.svg?version=2.0.0)](https://docs.rs/actix-service/2.0.0)
[![Version](https://img.shields.io/badge/rustc-1.46+-ab6000.svg)](https://blog.rust-lang.org/2020/03/12/Rust-1.46.html)
![License](https://img.shields.io/crates/l/actix-service.svg)
-[![Dependency Status](https://deps.rs/crate/actix-service/2.0.0-beta.5/status.svg)](https://deps.rs/crate/actix-service/2.0.0-beta.5)
+[![Dependency Status](https://deps.rs/crate/actix-service/2.0.0/status.svg)](https://deps.rs/crate/actix-service/2.0.0)
![Download](https://img.shields.io/crates/d/actix-service.svg)
[![Chat on Discord](https://img.shields.io/discord/771444961383153695?label=chat&logo=discord)](https://discord.gg/NWpN5mmg3x)


@@ -11,11 +11,11 @@ use pin_project_lite::pin_project;
use super::{Service, ServiceFactory};

-/// Service for the `and_then` combinator, chaining a computation onto the end
-/// of another service which completes successfully.
+/// Service for the `and_then` combinator, chaining a computation onto the end of another service
+/// which completes successfully.
///
/// This is created by the `Pipeline::and_then` method.
-pub(crate) struct AndThenService<A, B, Req>(Rc<(A, B)>, PhantomData<Req>);
+pub struct AndThenService<A, B, Req>(Rc<(A, B)>, PhantomData<Req>);

impl<A, B, Req> AndThenService<A, B, Req> {
    /// Create new `AndThen` combinator
@@ -64,7 +64,7 @@
}

pin_project! {
-    pub(crate) struct AndThenServiceResponse<A, B, Req>
+    pub struct AndThenServiceResponse<A, B, Req>
    where
        A: Service<Req>,
        B: Service<A::Response, Error = A::Error>,
@@ -117,7 +117,7 @@
}

/// `.and_then()` service factory combinator
-pub(crate) struct AndThenServiceFactory<A, B, Req>
+pub struct AndThenServiceFactory<A, B, Req>
where
    A: ServiceFactory<Req>,
    A::Config: Clone,
@@ -200,7 +200,7 @@
}

pin_project! {
-    pub(crate) struct AndThenServiceFactoryResponse<A, B, Req>
+    pub struct AndThenServiceFactoryResponse<A, B, Req>
    where
        A: ServiceFactory<Req>,
        B: ServiceFactory<A::Response>,
@@ -272,7 +272,9 @@ mod tests {
    use futures_util::future::lazy;

    use crate::{
-        fn_factory, ok, pipeline, pipeline_factory, ready, Ready, Service, ServiceFactory,
+        fn_factory, ok,
+        pipeline::{pipeline, pipeline_factory},
+        ready, Ready, Service, ServiceFactory,
    };

    struct Srv1(Rc<Cell<usize>>);


@@ -214,7 +214,11 @@ mod tests {
    use futures_util::future::lazy;

    use super::*;
-    use crate::{ok, pipeline, pipeline_factory, Ready, Service, ServiceFactory};
+    use crate::{
+        ok,
+        pipeline::{pipeline, pipeline_factory},
+        Ready, Service, ServiceFactory,
+    };

    #[derive(Clone)]
    struct Srv;


@@ -3,26 +3,30 @@
use alloc::{boxed::Box, rc::Rc};
use core::{future::Future, pin::Pin};

+use paste::paste;
+
use crate::{Service, ServiceFactory};

-/// A boxed future without a Send bound or lifetime parameters.
+/// A boxed future with no send bound or lifetime parameters.
pub type BoxFuture<T> = Pin<Box<dyn Future<Output = T>>>;

macro_rules! service_object {
    ($name: ident, $type: tt, $fn_name: ident) => {
-        /// Type alias for service trait object.
-        pub type $name<Req, Res, Err> = $type<
-            dyn Service<Req, Response = Res, Error = Err, Future = BoxFuture<Result<Res, Err>>>,
-        >;
-
-        /// Create service trait object.
-        pub fn $fn_name<S, Req>(service: S) -> $name<Req, S::Response, S::Error>
-        where
-            S: Service<Req> + 'static,
-            Req: 'static,
-            S::Future: 'static,
-        {
-            $type::new(ServiceWrapper::new(service))
+        paste! {
+            #[doc = "Type alias for service trait object using `" $type "`."]
+            pub type $name<Req, Res, Err> = $type<
+                dyn Service<Req, Response = Res, Error = Err, Future = BoxFuture<Result<Res, Err>>>,
+            >;
+
+            #[doc = "Wraps service as a trait object using [`" $name "`]."]
+            pub fn $fn_name<S, Req>(service: S) -> $name<Req, S::Response, S::Error>
+            where
+                S: Service<Req> + 'static,
+                Req: 'static,
+                S::Future: 'static,
+            {
+                $type::new(ServiceWrapper::new(service))
+            }
        }
    };
}
@@ -56,10 +60,10 @@ where
    }
}

-/// Wrapper for a service factory trait object that will produce a boxed trait object service.
+/// Wrapper for a service factory that will map its services to boxed trait object services.
pub struct BoxServiceFactory<Cfg, Req, Res, Err, InitErr>(Inner<Cfg, Req, Res, Err, InitErr>);

-/// Create service factory trait object.
+/// Wraps a service factory that returns service trait objects.
pub fn factory<SF, Req>(
    factory: SF,
) -> BoxServiceFactory<SF::Config, Req, SF::Response, SF::Error, SF::InitError>
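For orientation, a minimal usage sketch of these helpers (assuming the `boxed::service` constructor and `BoxService` alias generated above, the `fn_service` helper, the blanket `Service` impl for boxed trait objects, and the actix-rt runtime macro; the doubling service is invented for illustration):

use actix_service::{boxed, fn_service, Service};

#[actix_rt::main]
async fn main() -> Result<(), ()> {
    // Erase the concrete service type behind a `BoxService` trait object.
    let svc: boxed::BoxService<u8, u64, ()> =
        boxed::service(fn_service(|req: u8| async move { Ok::<_, ()>(u64::from(req) * 2) }));

    assert_eq!(svc.call(21).await?, 42);
    Ok(())
}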


@@ -1,8 +1,12 @@
use crate::{
-    map::Map, map_err::MapErr, transform_err::TransformMapInitErr, Service, ServiceFactory,
-    Transform,
+    and_then::{AndThenService, AndThenServiceFactory},
+    map::Map,
+    map_err::MapErr,
+    transform_err::TransformMapInitErr,
+    IntoService, IntoServiceFactory, Service, ServiceFactory, Transform,
};

+/// An extension trait for [`Service`]s that provides a variety of convenient adapters.
pub trait ServiceExt<Req>: Service<Req> {
    /// Map this service's output to a different type, returning a new service
    /// of the resulting type.
@@ -36,10 +40,27 @@ pub trait ServiceExt<Req>: Service<Req> {
    {
        MapErr::new(self, f)
    }
+
+    /// Call another service after the call to this one has resolved successfully.
+    ///
+    /// This function can be used to chain two services together and ensure that the second service
+    /// isn't called until the call to the first service has finished. The result of the call to
+    /// the first service is used as the input parameter for the second service's call.
+    ///
+    /// Note that this function consumes the receiving service and returns a wrapped version of it.
+    fn and_then<I, S1>(self, service: I) -> AndThenService<Self, S1, Req>
+    where
+        Self: Sized,
+        I: IntoService<S1, Self::Response>,
+        S1: Service<Self::Response, Error = Self::Error>,
+    {
+        AndThenService::new(self, service.into_service())
+    }
}

impl<S, Req> ServiceExt<Req> for S where S: Service<Req> {}
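As a usage sketch of the new `and_then` combinator (assuming actix-service 2.0's `fn_service` helper and the actix-rt runtime macro; the parse/double services are invented for illustration):

use actix_service::{fn_service, Service, ServiceExt};

#[actix_rt::main]
async fn main() -> Result<(), ()> {
    // First service: parse a string into a number.
    let parse = fn_service(|s: &'static str| async move { s.parse::<u32>().map_err(|_| ()) });
    // Second service: double the number produced by the first service.
    let double = fn_service(|n: u32| async move { Ok::<_, ()>(n * 2) });

    // `and_then` feeds the successful output of `parse` into `double`.
    let svc = parse.and_then(double);
    assert_eq!(svc.call("21").await?, 42);
    Ok(())
}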
+/// An extension trait for [`ServiceFactory`]s that provides a variety of convenient adapters.
pub trait ServiceFactoryExt<Req>: ServiceFactory<Req> {
    /// Map this service's output to a different type, returning a new service
    /// of the resulting type.
@@ -68,10 +89,27 @@ pub trait ServiceFactoryExt<Req>: ServiceFactory<Req> {
    {
        crate::map_init_err::MapInitErr::new(self, f)
    }
+
+    /// Call another service after the call to this one has resolved successfully.
+    fn and_then<I, SF1>(self, factory: I) -> AndThenServiceFactory<Self, SF1, Req>
+    where
+        Self: Sized,
+        Self::Config: Clone,
+        I: IntoServiceFactory<SF1, Self::Response>,
+        SF1: ServiceFactory<
+            Self::Response,
+            Config = Self::Config,
+            Error = Self::Error,
+            InitError = Self::InitError,
+        >,
+    {
+        AndThenServiceFactory::new(self, factory.into_factory())
+    }
}

impl<SF, Req> ServiceFactoryExt<Req> for SF where SF: ServiceFactory<Req> {}
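The factory-level combinator composes analogously; a minimal sketch, assuming `fn_service(..)` values also act as service factories with `Config = ()`:

use actix_service::{fn_service, Service, ServiceFactory, ServiceFactoryExt};

#[actix_rt::main]
async fn main() -> Result<(), ()> {
    // Chain two factories; each produced service pair is itself chained with `and_then`.
    let factory = fn_service(|s: &'static str| async move { s.parse::<u32>().map_err(|_| ()) })
        .and_then(fn_service(|n: u32| async move { Ok::<_, ()>(n + 1) }));

    // A listener would typically call `new_service` once per accepted connection.
    let svc = factory.new_service(()).await?;
    assert_eq!(svc.call("41").await?, 42);
    Ok(())
}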
+/// An extension trait for [`Transform`]s that provides a variety of convenient adapters.
pub trait TransformExt<S, Req>: Transform<S, Req> {
    /// Return a new `Transform` whose init error is mapped to a different type.
    fn map_init_err<F, E>(self, f: F) -> TransformMapInitErr<Self, S, Req, F, E>


@@ -1,7 +1,8 @@
//! See [`Service`] docs for information on this crate's foundational trait.

#![no_std]
-#![deny(rust_2018_idioms, nonstandard_style)]
+#![deny(rust_2018_idioms, nonstandard_style, future_incompatible)]
+#![warn(missing_docs)]
#![allow(clippy::type_complexity)]
#![doc(html_logo_url = "https://actix.rs/img/logo.png")]
#![doc(html_favicon_url = "https://actix.rs/favicon.ico")]
@@ -37,7 +38,6 @@ pub use self::apply_cfg::{apply_cfg, apply_cfg_factory};
pub use self::ext::{ServiceExt, ServiceFactoryExt, TransformExt};
pub use self::fn_service::{fn_factory, fn_factory_with_config, fn_service};
pub use self::map_config::{map_config, unit_config};
-pub use self::pipeline::{pipeline, pipeline_factory, Pipeline, PipelineFactory};
pub use self::transform::{apply, ApplyTransform, Transform};

#[allow(unused_imports)]
@@ -53,8 +53,14 @@ use self::ready::{err, ok, ready, Ready};
/// async fn(Request) -> Result<Response, Err>
/// ```
///
-/// The `Service` trait just generalizes this form where each parameter is described as an
-/// associated type on the trait. Services can also have mutable state that influence computation.
+/// The `Service` trait just generalizes this form. Requests are defined as a generic type
+/// parameter and responses and other details are defined as associated types on the trait impl.
+/// Notice that this design means that services can receive many request types and converge them to
+/// a single response type.
+///
+/// Services can also have mutable state that influences computation by using a `Cell`, `RefCell`,
+/// or `Mutex`. Services intentionally do not take `&mut self` to reduce overhead in the
+/// common cases.
///
/// `Service` provides a symmetric and uniform API; the same abstractions can be used to represent
/// both clients and servers. Services describe only _transformation_ operations which encourage
@@ -64,11 +70,10 @@ use self::ready::{err, ok, ready, Ready};
/// ```ignore
/// struct MyService;
///
-/// impl Service for MyService {
-///     type Request = u8;
+/// impl Service<u8> for MyService {
///     type Response = u64;
///     type Error = MyError;
-///     type Future = Pin<Box<Future<Output=Result<Self::Response, Self::Error>>>>;
+///     type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
///
///     fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { ... }
///
@@ -77,10 +82,13 @@ use self::ready::{err, ok, ready, Ready};
/// ```
///
/// Sometimes it is not necessary to implement the Service trait. For example, the above service
-/// could be rewritten as a simple function and passed to [fn_service](fn_service()).
+/// could be rewritten as a simple function and passed to [`fn_service`](fn_service()).
///
/// ```ignore
/// async fn my_service(req: u8) -> Result<u64, MyError>;
+///
+/// let svc = fn_service(my_service);
+/// svc.call(123).await;
/// ```
pub trait Service<Req> {
    /// Responses given by the service.
@@ -94,13 +102,12 @@ pub trait Service<Req> {
    /// Returns `Ready` when the service is able to process requests.
    ///
-    /// If the service is at capacity, then `Pending` is returned and the task
-    /// is notified when the service becomes ready again. This function is
-    /// expected to be called while on a task.
+    /// If the service is at capacity, then `Pending` is returned and the task is notified when the
+    /// service becomes ready again. This function is expected to be called while on a task.
    ///
-    /// This is a **best effort** implementation. False positives are permitted.
-    /// It is permitted for the service to return `Ready` from a `poll_ready`
-    /// call and the next invocation of `call` results in an error.
+    /// This is a best effort implementation. False positives are permitted. It is permitted for
+    /// the service to return `Ready` from a `poll_ready` call and the next invocation of `call`
+    /// results in an error.
    ///
    /// # Notes
    /// 1. `poll_ready` might be called on a different task to `call`.
@@ -109,25 +116,26 @@
    /// Process the request and return the response asynchronously.
    ///
-    /// This function is expected to be callable off task. As such,
-    /// implementations should take care to not call `poll_ready`. If the
-    /// service is at capacity and the request is unable to be handled, the
-    /// returned `Future` should resolve to an error.
+    /// This function is expected to be callable off-task. As such, implementations of `call`
+    /// should take care to not call `poll_ready`. If the service is at capacity and the request is
+    /// unable to be handled, the returned `Future` should resolve to an error.
    ///
-    /// Calling `call` without calling `poll_ready` is permitted. The
-    /// implementation must be resilient to this fact.
+    /// Invoking `call` without first invoking `poll_ready` is permitted. Implementations must be
+    /// resilient to this fact.
    fn call(&self, req: Req) -> Self::Future;
}
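As a concrete counterpart to the `ignore`-marked sketch in the doc comment above, a hand-rolled service of that shape might look like the following (a minimal sketch; `MyError`, the doubling behaviour, and the actix-rt runtime macro are assumptions for illustration):

use std::{
    future::Future,
    pin::Pin,
    task::{Context, Poll},
};

use actix_service::Service;

#[derive(Debug)]
struct MyError;

struct MyService;

impl Service<u8> for MyService {
    type Response = u64;
    type Error = MyError;
    type Future = Pin<Box<dyn Future<Output = Result<u64, MyError>>>>;

    // Always ready; a real service might gate readiness on some resource.
    fn poll_ready(&self, _cx: &mut Context<'_>) -> Poll<Result<(), MyError>> {
        Poll::Ready(Ok(()))
    }

    fn call(&self, req: u8) -> Self::Future {
        Box::pin(async move { Ok(u64::from(req) * 2) })
    }
}

#[actix_rt::main]
async fn main() {
    let svc = MyService;
    assert_eq!(svc.call(21).await.unwrap(), 42);
}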
/// Factory for creating `Service`s.
///
-/// Acts as a service factory. This is useful for cases where new `Service`s
-/// must be produced. One case is a TCP server listener. The listener
-/// accepts new TCP streams, obtains a new `Service` using the
-/// `ServiceFactory` trait, and uses the new `Service` to process inbound
-/// requests on that new TCP stream.
+/// This is useful for cases where new `Service`s must be produced. One case is a TCP
+/// server listener: a listener accepts new connections, constructs a new `Service` for each using
+/// the `ServiceFactory` trait, and uses the new `Service` to process inbound requests on that new
+/// connection.
///
/// `Config` is a service factory configuration type.
+///
+/// Simple factories may be able to use [`fn_factory`] or [`fn_factory_with_config`] to
+/// reduce boilerplate.
pub trait ServiceFactory<Req> {
    /// Responses given by the created services.
    type Response;
@@ -144,7 +152,7 @@ pub trait ServiceFactory<Req> {
    /// Errors potentially raised while building a service.
    type InitError;

    /// The future of the `Service` instance.
    type Future: Future<Output = Result<Self::Service, Self::InitError>>;

    /// Create and return a new service asynchronously.
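For the factory side, a small sketch using the `fn_factory` helper mentioned above (the increment-by-one service and the actix-rt runtime macro are assumptions for illustration):

use actix_service::{fn_factory, fn_service, Service, ServiceFactory};

#[actix_rt::main]
async fn main() -> Result<(), ()> {
    // A factory that builds a fresh service on every `new_service` call.
    let factory = fn_factory(|| async {
        Ok::<_, ()>(fn_service(|req: u8| async move { Ok::<_, ()>(u64::from(req) + 1) }))
    });

    // e.g. a listener would call `new_service` once per accepted connection.
    let svc = factory.new_service(()).await?;
    assert_eq!(svc.call(41).await?, 42);
    Ok(())
}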


@@ -1,6 +1,6 @@
-/// A boilerplate implementation of [`Service::poll_ready`] that always signals readiness.
+/// An implementation of [`poll_ready`] that always signals readiness.
///
-/// [`Service::poll_ready`]: crate::Service::poll_ready
+/// [`poll_ready`]: crate::Service::poll_ready
///
/// # Examples
/// ```no_run
@@ -34,12 +34,12 @@ macro_rules! always_ready {
    };
}

-/// A boilerplate implementation of [`Service::poll_ready`] that forwards readiness checks to a
+/// An implementation of [`poll_ready`] that forwards readiness checks to a
/// named struct field.
///
/// Tuple structs are not supported.
///
-/// [`Service::poll_ready`]: crate::Service::poll_ready
+/// [`poll_ready`]: crate::Service::poll_ready
///
/// # Examples
/// ```no_run


@@ -1,3 +1,6 @@
+// TODO: see if pipeline is necessary
+#![allow(dead_code)]
+
use core::{
    marker::PhantomData,
    task::{Context, Poll},
@@ -11,7 +14,7 @@ use crate::then::{ThenService, ThenServiceFactory};
use crate::{IntoService, IntoServiceFactory, Service, ServiceFactory};

/// Construct new pipeline with one service in pipeline chain.
-pub fn pipeline<I, S, Req>(service: I) -> Pipeline<S, Req>
+pub(crate) fn pipeline<I, S, Req>(service: I) -> Pipeline<S, Req>
where
    I: IntoService<S, Req>,
    S: Service<Req>,
@@ -23,7 +26,7 @@ where
}

/// Construct new pipeline factory with one service factory.
-pub fn pipeline_factory<I, SF, Req>(factory: I) -> PipelineFactory<SF, Req>
+pub(crate) fn pipeline_factory<I, SF, Req>(factory: I) -> PipelineFactory<SF, Req>
where
    I: IntoServiceFactory<SF, Req>,
    SF: ServiceFactory<Req>,
@@ -35,7 +38,7 @@ where
}

/// Pipeline service - pipeline allows to compose multiple service into one service.
-pub struct Pipeline<S, Req> {
+pub(crate) struct Pipeline<S, Req> {
    service: S,
    _phantom: PhantomData<Req>,
}
@@ -157,7 +160,7 @@ impl<S: Service<Req>, Req> Service<Req> for Pipeline<S, Req> {
}

/// Pipeline factory
-pub struct PipelineFactory<SF, Req> {
+pub(crate) struct PipelineFactory<SF, Req> {
    factory: SF,
    _phantom: PhantomData<Req>,
}


@@ -246,7 +246,11 @@ mod tests {
    use futures_util::future::lazy;

-    use crate::{err, ok, pipeline, pipeline_factory, ready, Ready, Service, ServiceFactory};
+    use crate::{
+        err, ok,
+        pipeline::{pipeline, pipeline_factory},
+        ready, Ready, Service, ServiceFactory,
+    };

    #[derive(Clone)]
    struct Srv1(Rc<Cell<usize>>);


@@ -21,13 +21,12 @@ where
    ApplyTransform::new(t, factory.into_factory())
}

-/// The `Transform` trait defines the interface of a service factory that wraps inner service
-/// during construction.
+/// Defines the interface of a service factory that wraps inner service during construction.
///
-/// Transform(middleware) wraps inner service and runs during inbound and/or outbound processing in
-/// the request/response lifecycle. It may modify request and/or response.
+/// Transformers wrap an inner service and run during inbound and/or outbound processing in the
+/// service lifecycle. They may modify the request and/or response.
///
-/// For example, timeout transform:
+/// For example, a timeout service wrapper:
///
/// ```ignore
/// pub struct Timeout<S> {
@@ -35,11 +34,7 @@ where
///     timeout: Duration,
/// }
///
-/// impl<S> Service for Timeout<S>
-/// where
-///     S: Service,
-/// {
-///     type Request = S::Request;
+/// impl<S: Service<Req>, Req> Service<Req> for Timeout<S> {
///     type Response = S::Response;
///     type Error = TimeoutError<S::Error>;
///     type Future = TimeoutServiceResponse<S>;
@@ -55,26 +50,22 @@ where
/// }
/// ```
///
-/// Timeout service in above example is decoupled from underlying service implementation and could
-/// be applied to any service.
+/// This wrapper service is decoupled from the underlying service implementation and could be
+/// applied to any service.
///
-/// The `Transform` trait defines the interface of a Service factory. `Transform` is often
+/// The `Transform` trait defines the interface of a service wrapper. `Transform` is often
/// implemented for middleware, defining how to construct a middleware Service. A Service that is
/// constructed by the factory takes the Service that follows it during execution as a parameter,
/// assuming ownership of the next Service.
///
-/// Factory for `Timeout` middleware from the above example could look like this:
+/// A transform for the `Timeout` middleware could look like this:
///
/// ```ignore
/// pub struct TimeoutTransform {
///     timeout: Duration,
/// }
///
-/// impl<S> Transform<S> for TimeoutTransform
-/// where
-///     S: Service,
-/// {
-///     type Request = S::Request;
+/// impl<S: Service<Req>, Req> Transform<S, Req> for TimeoutTransform {
///     type Response = S::Response;
///     type Error = TimeoutError<S::Error>;
///     type InitError = S::Error;
@@ -82,7 +73,7 @@ where
///     type Future = Ready<Result<Self::Transform, Self::InitError>>;
///
///     fn new_transform(&self, service: S) -> Self::Future {
-///         ready(Ok(TimeoutService {
+///         ready(Ok(Timeout {
///             service,
///             timeout: self.timeout,
///         }))
@@ -227,3 +218,53 @@ where
        }
    }
}
#[cfg(test)]
mod tests {
use core::time::Duration;
use actix_utils::future::{ready, Ready};
use super::*;
use crate::Service;
// pseudo-doctest for Transform trait
pub struct TimeoutTransform {
timeout: Duration,
}
// pseudo-doctest for Transform trait
impl<S: Service<Req>, Req> Transform<S, Req> for TimeoutTransform {
type Response = S::Response;
type Error = S::Error;
type InitError = S::Error;
type Transform = Timeout<S>;
type Future = Ready<Result<Self::Transform, Self::InitError>>;
fn new_transform(&self, service: S) -> Self::Future {
ready(Ok(Timeout {
service,
_timeout: self.timeout,
}))
}
}
// pseudo-doctest for Transform trait
pub struct Timeout<S> {
service: S,
_timeout: Duration,
}
// pseudo-doctest for Transform trait
impl<S: Service<Req>, Req> Service<Req> for Timeout<S> {
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
crate::forward_ready!(service);
fn call(&self, req: Req) -> Self::Future {
self.service.call(req)
}
}
}


@@ -42,8 +42,8 @@ uri = ["http"]
[dependencies]
actix-codec = "0.4.0-beta.1"
actix-rt = { version = "2.2.0", default-features = false }
-actix-service = "2.0.0-beta.5"
-actix-utils = "3.0.0-beta.2"
+actix-service = "2.0.0"
+actix-utils = "3.0.0"
derive_more = "0.99.5"
futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] }


@@ -31,7 +31,7 @@ use std::{
use actix_rt::net::TcpStream;
use actix_server::Server;
-use actix_service::pipeline_factory;
+use actix_service::ServiceFactoryExt as _;
use actix_tls::accept::rustls::{Acceptor as RustlsAcceptor, TlsStream};
use futures_util::future::ok;
use log::info;
@@ -39,14 +39,9 @@ use rustls::{
    internal::pemfile::certs, internal::pemfile::rsa_private_keys, NoClientAuth, ServerConfig,
};

-#[derive(Debug)]
-struct ServiceState {
-    num: Arc<AtomicUsize>,
-}
-
#[actix_rt::main]
async fn main() -> io::Result<()> {
-    env::set_var("RUST_LOG", "actix=trace,basic=trace");
+    env::set_var("RUST_LOG", "info");
    env_logger::init();

    let mut tls_config = ServerConfig::new(NoClientAuth::new());
@@ -73,7 +68,8 @@
            let count = Arc::clone(&count);

            // Set up TLS service factory
-            pipeline_factory(tls_acceptor.clone())
+            tls_acceptor
+                .clone()
                .map_err(|err| println!("Rustls error: {:?}", err))
                .and_then(move |stream: TlsStream<TcpStream>| {
                    let num = count.fetch_add(1, Ordering::Relaxed);


@@ -56,7 +56,7 @@ pub enum Resolver {
/// An interface for custom async DNS resolvers.
///
/// # Usage
-/// ```rust
+/// ```
/// use std::net::SocketAddr;
///
/// use actix_tls::connect::{Resolve, Resolver};


@@ -16,9 +16,9 @@ name = "actix_tracing"
path = "src/lib.rs" path = "src/lib.rs"
[dependencies] [dependencies]
actix-service = "2.0.0-beta.5" actix-service = "2.0.0"
actix-utils = "3.0.0"
futures-util = { version = "0.3.7", default-features = false }
tracing = "0.1" tracing = "0.1"
tracing-futures = "0.2" tracing-futures = "0.2"


@@ -9,7 +9,7 @@ use core::marker::PhantomData;
use actix_service::{
    apply, ApplyTransform, IntoServiceFactory, Service, ServiceFactory, Transform,
};
-use futures_util::future::{ok, Either, Ready};
+use actix_utils::future::{ok, Either, Ready};
use tracing_futures::{Instrument, Instrumented};

/// A `Service` implementation that automatically enters/exits tracing spans
@@ -48,9 +48,9 @@ where
            .clone()
            .map(|span| tracing::span!(parent: &span, tracing::Level::INFO, "future"))
        {
-            Either::Right(fut.instrument(span))
+            Either::right(fut.instrument(span))
        } else {
-            Either::Left(fut)
+            Either::left(fut)
        }
    }
}


@@ -3,6 +3,16 @@
## Unreleased - 2021-xx-xx

+## 3.0.0 - 2021-04-16
+* No significant changes from `3.0.0-beta.4`.
+
+## 3.0.0-beta.4 - 2021-04-01
+* Add `future::Either` type. [#305]
+
+[#305]: https://github.com/actix/actix-net/pull/305
+
## 3.0.0-beta.3 - 2021-04-01
* Moved `mpsc` to own crate `local-channel`. [#301]
* Moved `task::LocalWaker` to own crate `local-waker`. [#301]


@@ -1,14 +1,14 @@
[package]
name = "actix-utils"
-version = "3.0.0-beta.3"
+version = "3.0.0"
authors = [
    "Nikolay Kim <fafhrd91@gmail.com>",
    "Rob Ede <robjtede@icloud.com>",
]
-description = "Utilities for the Actix ecosystem"
+description = "Various utilities used in the Actix ecosystem"
keywords = ["network", "framework", "async", "futures"]
-repository = "https://github.com/actix/actix-net.git"
categories = ["network-programming", "asynchronous"]
+repository = "https://github.com/actix/actix-net"
license = "MIT OR Apache-2.0"
edition = "2018"
@@ -17,6 +17,7 @@ name = "actix_utils"
path = "src/lib.rs"

[dependencies]
+pin-project-lite = "0.2"
local-waker = "0.1"

[dev-dependencies]


@@ -0,0 +1,91 @@
//! A symmetric either future.
use core::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use pin_project_lite::pin_project;
pin_project! {
/// Combines two different futures that have the same output type.
///
/// Construct variants with [`Either::left`] and [`Either::right`].
///
/// # Examples
/// ```
/// use actix_utils::future::{ready, Ready, Either};
///
/// # async fn run() {
/// let res = Either::<_, Ready<usize>>::left(ready(42));
/// assert_eq!(res.await, 42);
///
/// let res = Either::<Ready<usize>, _>::right(ready(43));
/// assert_eq!(res.await, 43);
/// # }
/// ```
#[project = EitherProj]
#[derive(Debug, Clone)]
pub enum Either<L, R> {
/// A value of type `L`.
#[allow(missing_docs)]
Left { #[pin] value: L },
/// A value of type `R`.
#[allow(missing_docs)]
Right { #[pin] value: R },
}
}
impl<L, R> Either<L, R> {
/// Creates new `Either` using left variant.
pub fn left(value: L) -> Either<L, R> {
Either::Left { value }
}
/// Creates new `Either` using right variant.
pub fn right(value: R) -> Either<L, R> {
Either::Right { value }
}
}
impl<T> Either<T, T> {
/// Unwraps into inner value when left and right have a common type.
pub fn into_inner(self) -> T {
match self {
Either::Left { value } => value,
Either::Right { value } => value,
}
}
}
impl<L, R> Future for Either<L, R>
where
L: Future,
R: Future<Output = L::Output>,
{
type Output = L::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.project() {
EitherProj::Left { value } => value.poll(cx),
EitherProj::Right { value } => value.poll(cx),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::future::{ready, Ready};
#[actix_rt::test]
async fn test_either() {
let res = Either::<_, Ready<usize>>::left(ready(42));
assert_eq!(res.await, 42);
let res = Either::<Ready<usize>, _>::right(ready(43));
assert_eq!(res.await, 43);
}
}


@@ -1,7 +1,9 @@
//! Asynchronous values.

+mod either;
mod poll_fn;
mod ready;

+pub use self::either::Either;
pub use self::poll_fn::{poll_fn, PollFn};
pub use self::ready::{err, ok, ready, Ready};


@@ -7,7 +7,7 @@ use core::{
    task::{Context, Poll},
};

-/// Create a future driven by the provided function that receives a task context.
+/// Creates a future driven by the provided function that receives a task context.
pub fn poll_fn<F, T>(f: F) -> PollFn<F>
where
    F: FnMut(&mut Context<'_>) -> Poll<T>,
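For instance, a minimal sketch of driving a future with `poll_fn` (the actix-rt runtime macro is assumed for the async entry point):

use std::task::Poll;

use actix_utils::future::poll_fn;

#[actix_rt::main]
async fn main() {
    // A one-shot future that is immediately ready with a value.
    let answer = poll_fn(|_cx| Poll::Ready(42)).await;
    assert_eq!(answer, 42);
}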


@@ -69,7 +69,7 @@ pub fn ready<T>(val: T) -> Ready<T> {
    Ready { val: Some(val) }
}

-/// Create a future that is immediately ready with a success value.
+/// Creates a future that is immediately ready with a success value.
///
/// # Examples
/// ```no_run
@@ -84,7 +84,7 @@ pub fn ok<T, E>(val: T) -> Ready<Result<T, E>> {
    Ready { val: Some(Ok(val)) }
}

-/// Create a future that is immediately ready with an error value.
+/// Creates a future that is immediately ready with an error value.
///
/// # Examples
/// ```no_run


@@ -1,4 +1,4 @@
-//! Various utilities for the Actix ecosystem.
+//! Various utilities used in the Actix ecosystem.
#![deny(rust_2018_idioms, nonstandard_style)]
#![warn(missing_docs)]