mirror of
https://github.com/fafhrd91/actix-net
synced 2025-08-12 07:37:06 +02:00
Compare commits
8 Commits
rt-v2.2.0
...
server-v2.
Author | SHA1 | Date | |
---|---|---|---|
|
b068ea16f8 | ||
|
4eebdf4070 | ||
|
b09e7cd417 | ||
|
2c5c9167a5 | ||
|
ee3a548a85 | ||
|
f21eaa954f | ||
|
8becb0db70 | ||
|
26a5af70cb |
3
.cargo/config.toml
Normal file
3
.cargo/config.toml
Normal file
@@ -0,0 +1,3 @@
|
||||
[alias]
|
||||
chk = "hack check --workspace --all-features --tests --examples"
|
||||
lint = "hack --clean-per-run clippy --workspace --tests --examples"
|
@@ -10,6 +10,8 @@ members = [
|
||||
"actix-tracing",
|
||||
"actix-utils",
|
||||
"bytestring",
|
||||
"local-channel",
|
||||
"local-waker",
|
||||
]
|
||||
|
||||
[patch.crates-io]
|
||||
@@ -23,3 +25,5 @@ actix-tls = { path = "actix-tls" }
|
||||
actix-tracing = { path = "actix-tracing" }
|
||||
actix-utils = { path = "actix-utils" }
|
||||
bytestring = { path = "bytestring" }
|
||||
local-channel = { path = "local-channel" }
|
||||
local-waker = { path = "local-waker" }
|
||||
|
@@ -19,5 +19,5 @@ syn = { version = "^1", features = ["full"] }
|
||||
[dev-dependencies]
|
||||
actix-rt = "2.0.0"
|
||||
|
||||
futures-util = { version = "0.3", default-features = false }
|
||||
futures-util = { version = "0.3.7", default-features = false }
|
||||
trybuild = "1"
|
||||
|
@@ -1,7 +1,10 @@
|
||||
# Changes
|
||||
|
||||
## Unreleased - 2021-xx-xx
|
||||
* Prevent panic when shutdown_timeout is very large. [f9262db]
|
||||
|
||||
|
||||
## 2.0.0-beta.4 - 2021-04-01
|
||||
* Prevent panic when `shutdown_timeout` is very large. [f9262db]
|
||||
|
||||
[f9262db]: https://github.com/actix/actix-net/commit/f9262db
|
||||
|
||||
|
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "actix-server"
|
||||
version = "2.0.0-beta.3"
|
||||
version = "2.0.0-beta.4"
|
||||
authors = [
|
||||
"Nikolay Kim <fafhrd91@gmail.com>",
|
||||
"fakeshadow <24548779@qq.com>",
|
||||
@@ -9,7 +9,6 @@ description = "General purpose TCP server built for the Actix ecosystem"
|
||||
keywords = ["network", "framework", "async", "futures"]
|
||||
homepage = "https://actix.rs"
|
||||
repository = "https://github.com/actix/actix-net.git"
|
||||
documentation = "https://docs.rs/actix-server"
|
||||
categories = ["network-programming", "asynchronous"]
|
||||
license = "MIT OR Apache-2.0"
|
||||
edition = "2018"
|
||||
@@ -22,7 +21,6 @@ path = "src/lib.rs"
|
||||
default = []
|
||||
|
||||
[dependencies]
|
||||
actix-codec = "0.4.0-beta.1"
|
||||
actix-rt = { version = "2.0.0", default-features = false }
|
||||
actix-service = "2.0.0-beta.5"
|
||||
actix-utils = "3.0.0-beta.2"
|
||||
@@ -35,7 +33,9 @@ slab = "0.4"
|
||||
tokio = { version = "1.2", features = ["sync"] }
|
||||
|
||||
[dev-dependencies]
|
||||
actix-codec = "0.4.0-beta.1"
|
||||
actix-rt = "2.0.0"
|
||||
|
||||
bytes = "1"
|
||||
env_logger = "0.8"
|
||||
futures-util = { version = "0.3.7", default-features = false, features = ["sink"] }
|
||||
|
@@ -187,38 +187,26 @@ impl Accept {
|
||||
let mut guard = self.waker.guard();
|
||||
match guard.pop_front() {
|
||||
// worker notify it becomes available. we may want to recover
|
||||
// from backpressure.
|
||||
// from backpressure.
|
||||
Some(WakerInterest::WorkerAvailable) => {
|
||||
drop(guard);
|
||||
self.maybe_backpressure(&mut sockets, false);
|
||||
}
|
||||
// a new worker thread is made and it's handle would be added
|
||||
// to Accept
|
||||
// a new worker thread is made and it's handle would be added to Accept
|
||||
Some(WakerInterest::Worker(handle)) => {
|
||||
drop(guard);
|
||||
// maybe we want to recover from a backpressure.
|
||||
self.maybe_backpressure(&mut sockets, false);
|
||||
self.handles.push(handle);
|
||||
}
|
||||
// got timer interest and it's time to try register socket(s)
|
||||
// again.
|
||||
// got timer interest and it's time to try register socket(s) again
|
||||
Some(WakerInterest::Timer) => {
|
||||
drop(guard);
|
||||
self.process_timer(&mut sockets)
|
||||
}
|
||||
Some(WakerInterest::Pause) => {
|
||||
drop(guard);
|
||||
sockets.iter_mut().for_each(|(_, info)| {
|
||||
match self.deregister(info) {
|
||||
Ok(_) => info!(
|
||||
"Paused accepting connections on {}",
|
||||
info.addr
|
||||
),
|
||||
Err(e) => {
|
||||
error!("Can not deregister server socket {}", e)
|
||||
}
|
||||
}
|
||||
});
|
||||
self.deregister_all(&mut sockets);
|
||||
}
|
||||
Some(WakerInterest::Resume) => {
|
||||
drop(guard);
|
||||
@@ -248,16 +236,23 @@ impl Accept {
|
||||
|
||||
fn process_timer(&self, sockets: &mut Slab<ServerSocketInfo>) {
|
||||
let now = Instant::now();
|
||||
sockets.iter_mut().for_each(|(token, info)| {
|
||||
// only the ServerSocketInfo have an associate timeout value was de registered.
|
||||
if let Some(inst) = info.timeout.take() {
|
||||
if now > inst {
|
||||
self.register_logged(token, info);
|
||||
} else {
|
||||
sockets
|
||||
.iter_mut()
|
||||
// Only sockets that had an associated timeout were deregistered.
|
||||
.filter(|(_, info)| info.timeout.is_some())
|
||||
.for_each(|(token, info)| {
|
||||
let inst = info.timeout.take().unwrap();
|
||||
|
||||
if now < inst {
|
||||
info.timeout = Some(inst);
|
||||
} else if !self.backpressure {
|
||||
self.register_logged(token, info);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Drop the timeout if server is in backpressure and socket timeout is expired.
|
||||
// When server recovers from backpressure it will register all sockets without
|
||||
// a timeout value so this socket register will be delayed till then.
|
||||
});
|
||||
}
|
||||
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
@@ -295,136 +290,145 @@ impl Accept {
|
||||
self.poll.registry().deregister(&mut info.lst)
|
||||
}
|
||||
|
||||
fn deregister_logged(&self, info: &mut ServerSocketInfo) {
|
||||
match self.deregister(info) {
|
||||
Ok(_) => info!("Paused accepting connections on {}", info.addr),
|
||||
Err(e) => {
|
||||
error!("Can not deregister server socket {}", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn deregister_all(&self, sockets: &mut Slab<ServerSocketInfo>) {
|
||||
sockets.iter_mut().for_each(|(_, info)| {
|
||||
info!("Accepting connections on {} has been paused", info.addr);
|
||||
let _ = self.deregister(info);
|
||||
self.deregister_logged(info);
|
||||
});
|
||||
}
|
||||
|
||||
fn maybe_backpressure(&mut self, sockets: &mut Slab<ServerSocketInfo>, on: bool) {
|
||||
if self.backpressure {
|
||||
if !on {
|
||||
// Only operate when server is in a different backpressure than the given flag.
|
||||
if self.backpressure != on {
|
||||
if on {
|
||||
self.backpressure = true;
|
||||
// TODO: figure out if timing out sockets can be safely de-registered twice.
|
||||
self.deregister_all(sockets);
|
||||
} else {
|
||||
self.backpressure = false;
|
||||
for (token, info) in sockets.iter_mut() {
|
||||
if info.timeout.is_some() {
|
||||
// socket will attempt to re-register itself when its timeout completes
|
||||
continue;
|
||||
}
|
||||
self.register_logged(token, info);
|
||||
}
|
||||
sockets
|
||||
.iter_mut()
|
||||
// Only operate on sockets without associated timeout.
|
||||
// Sockets with it will attempt to re-register when their timeout expires.
|
||||
.filter(|(_, info)| info.timeout.is_none())
|
||||
.for_each(|(token, info)| self.register_logged(token, info));
|
||||
}
|
||||
} else if on {
|
||||
self.backpressure = true;
|
||||
self.deregister_all(sockets);
|
||||
}
|
||||
}
|
||||
|
||||
fn accept_one(&mut self, sockets: &mut Slab<ServerSocketInfo>, mut msg: Conn) {
|
||||
fn accept_one(&mut self, sockets: &mut Slab<ServerSocketInfo>, mut conn: Conn) {
|
||||
if self.backpressure {
|
||||
// send_connection would remove fault worker from handles.
|
||||
// worst case here is conn get dropped after all handles are gone.
|
||||
while !self.handles.is_empty() {
|
||||
match self.handles[self.next].send(msg) {
|
||||
Ok(_) => {
|
||||
self.set_next();
|
||||
break;
|
||||
}
|
||||
Err(tmp) => {
|
||||
// worker lost contact and could be gone. a message is sent to
|
||||
// `ServerBuilder` future to notify it a new worker should be made
|
||||
// after that remove the fault worker
|
||||
self.srv.worker_faulted(self.handles[self.next].idx);
|
||||
msg = tmp;
|
||||
self.handles.swap_remove(self.next);
|
||||
if self.handles.is_empty() {
|
||||
error!("No workers");
|
||||
return;
|
||||
} else if self.handles.len() <= self.next {
|
||||
self.next = 0;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
match self.send_connection(sockets, conn) {
|
||||
Ok(_) => return,
|
||||
Err(c) => conn = c,
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Do one round and try to send conn to all workers until it succeed.
|
||||
// Start from self.next.
|
||||
let mut idx = 0;
|
||||
while idx < self.handles.len() {
|
||||
idx += 1;
|
||||
if self.handles[self.next].available() {
|
||||
match self.handles[self.next].send(msg) {
|
||||
Ok(_) => {
|
||||
self.set_next();
|
||||
return;
|
||||
}
|
||||
// worker lost contact and could be gone. a message is sent to
|
||||
// `ServerBuilder` future to notify it a new worker should be made.
|
||||
// after that remove the fault worker and enter backpressure if necessary.
|
||||
Err(tmp) => {
|
||||
self.srv.worker_faulted(self.handles[self.next].idx);
|
||||
msg = tmp;
|
||||
self.handles.swap_remove(self.next);
|
||||
if self.handles.is_empty() {
|
||||
error!("No workers");
|
||||
self.maybe_backpressure(sockets, true);
|
||||
return;
|
||||
} else if self.handles.len() <= self.next {
|
||||
self.next = 0;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
match self.send_connection(sockets, conn) {
|
||||
Ok(_) => return,
|
||||
Err(c) => conn = c,
|
||||
}
|
||||
} else {
|
||||
self.set_next();
|
||||
}
|
||||
self.set_next();
|
||||
}
|
||||
// enable backpressure
|
||||
// Sending Conn failed due to either all workers are in error or not available.
|
||||
// Enter backpressure state and try again.
|
||||
self.maybe_backpressure(sockets, true);
|
||||
self.accept_one(sockets, msg);
|
||||
self.accept_one(sockets, conn);
|
||||
}
|
||||
}
|
||||
|
||||
// set next worker handle that would accept work.
|
||||
// Set next worker handle that would accept work.
|
||||
fn set_next(&mut self) {
|
||||
self.next = (self.next + 1) % self.handles.len();
|
||||
}
|
||||
|
||||
// Send connection to worker and handle error.
|
||||
fn send_connection(
|
||||
&mut self,
|
||||
sockets: &mut Slab<ServerSocketInfo>,
|
||||
conn: Conn,
|
||||
) -> Result<(), Conn> {
|
||||
match self.handles[self.next].send(conn) {
|
||||
Ok(_) => {
|
||||
self.set_next();
|
||||
Ok(())
|
||||
}
|
||||
Err(conn) => {
|
||||
// worker lost contact and could be gone. a message is sent to
|
||||
// `ServerBuilder` future to notify it a new worker should be made.
|
||||
// after that remove the fault worker and enter backpressure if necessary.
|
||||
self.srv.worker_faulted(self.handles[self.next].idx);
|
||||
self.handles.swap_remove(self.next);
|
||||
if self.handles.is_empty() {
|
||||
error!("No workers");
|
||||
self.maybe_backpressure(sockets, true);
|
||||
// All workers are gone and Conn is nowhere to be sent.
|
||||
// Treat this situation as Ok and drop Conn.
|
||||
return Ok(());
|
||||
} else if self.handles.len() <= self.next {
|
||||
self.next = 0;
|
||||
}
|
||||
Err(conn)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn accept(&mut self, sockets: &mut Slab<ServerSocketInfo>, token: usize) {
|
||||
loop {
|
||||
let msg = if let Some(info) = sockets.get_mut(token) {
|
||||
match info.lst.accept() {
|
||||
Ok(Some((io, addr))) => Conn {
|
||||
let info = sockets
|
||||
.get_mut(token)
|
||||
.expect("ServerSocketInfo is removed from Slab");
|
||||
|
||||
match info.lst.accept() {
|
||||
Ok(io) => {
|
||||
let msg = Conn {
|
||||
io,
|
||||
token: info.token,
|
||||
peer: Some(addr),
|
||||
},
|
||||
Ok(None) => return,
|
||||
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return,
|
||||
Err(ref e) if connection_error(e) => continue,
|
||||
Err(e) => {
|
||||
// deregister listener temporary
|
||||
error!("Error accepting connection: {}", e);
|
||||
if let Err(err) = self.deregister(info) {
|
||||
error!("Can not deregister server socket {}", err);
|
||||
}
|
||||
|
||||
// sleep after error. write the timeout to socket info as later
|
||||
// the poll would need it mark which socket and when it's
|
||||
// listener should be registered
|
||||
info.timeout = Some(Instant::now() + Duration::from_millis(500));
|
||||
|
||||
// after the sleep a Timer interest is sent to Accept Poll
|
||||
let waker = self.waker.clone();
|
||||
System::current().arbiter().spawn(async move {
|
||||
sleep(Duration::from_millis(510)).await;
|
||||
waker.wake(WakerInterest::Timer);
|
||||
});
|
||||
|
||||
return;
|
||||
}
|
||||
};
|
||||
self.accept_one(sockets, msg);
|
||||
}
|
||||
} else {
|
||||
return;
|
||||
};
|
||||
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return,
|
||||
Err(ref e) if connection_error(e) => continue,
|
||||
Err(e) => {
|
||||
error!("Error accepting connection: {}", e);
|
||||
|
||||
self.accept_one(sockets, msg);
|
||||
// deregister listener temporary
|
||||
self.deregister_logged(info);
|
||||
|
||||
// sleep after error. write the timeout to socket info as later
|
||||
// the poll would need it mark which socket and when it's
|
||||
// listener should be registered
|
||||
info.timeout = Some(Instant::now() + Duration::from_millis(500));
|
||||
|
||||
// after the sleep a Timer interest is sent to Accept Poll
|
||||
let waker = self.waker.clone();
|
||||
System::current().arbiter().spawn(async move {
|
||||
sleep(Duration::from_millis(510)).await;
|
||||
waker.wake(WakerInterest::Timer);
|
||||
});
|
||||
|
||||
return;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -40,15 +40,11 @@ impl MioListener {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn accept(&self) -> io::Result<Option<(MioStream, SocketAddr)>> {
|
||||
pub(crate) fn accept(&self) -> io::Result<MioStream> {
|
||||
match *self {
|
||||
MioListener::Tcp(ref lst) => lst
|
||||
.accept()
|
||||
.map(|(stream, addr)| Some((MioStream::Tcp(stream), SocketAddr::Tcp(addr)))),
|
||||
MioListener::Tcp(ref lst) => lst.accept().map(|(stream, _)| MioStream::Tcp(stream)),
|
||||
#[cfg(unix)]
|
||||
MioListener::Uds(ref lst) => lst
|
||||
.accept()
|
||||
.map(|(stream, addr)| Some((MioStream::Uds(stream), SocketAddr::Uds(addr)))),
|
||||
MioListener::Uds(ref lst) => lst.accept().map(|(stream, _)| MioStream::Uds(stream)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -14,7 +14,7 @@ use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
use crate::service::{BoxedServerService, InternalServiceFactory};
|
||||
use crate::socket::{MioStream, SocketAddr};
|
||||
use crate::socket::MioStream;
|
||||
use crate::waker_queue::{WakerInterest, WakerQueue};
|
||||
use crate::{join_all, Token};
|
||||
|
||||
@@ -31,7 +31,6 @@ pub(crate) struct StopCommand {
|
||||
pub(crate) struct Conn {
|
||||
pub io: MioStream,
|
||||
pub token: Token,
|
||||
pub peer: Option<SocketAddr>,
|
||||
}
|
||||
|
||||
static MAX_CONNS: AtomicUsize = AtomicUsize::new(25600);
|
||||
|
@@ -4,7 +4,8 @@ use std::{net, thread, time};
|
||||
|
||||
use actix_server::Server;
|
||||
use actix_service::fn_service;
|
||||
use futures_util::future::{lazy, ok};
|
||||
use actix_utils::future::ok;
|
||||
use futures_util::future::lazy;
|
||||
|
||||
fn unused_addr() -> net::SocketAddr {
|
||||
let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap();
|
||||
@@ -30,6 +31,7 @@ fn test_bind() {
|
||||
.unwrap()
|
||||
.run()
|
||||
}));
|
||||
|
||||
let _ = tx.send((srv, actix_rt::System::current()));
|
||||
let _ = sys.run();
|
||||
});
|
||||
@@ -175,6 +177,7 @@ fn test_configure() {
|
||||
.workers(1)
|
||||
.run()
|
||||
}));
|
||||
|
||||
let _ = tx.send((srv, actix_rt::System::current()));
|
||||
let _ = sys.run();
|
||||
});
|
||||
|
@@ -147,8 +147,8 @@ mod tests {
|
||||
|
||||
forward_ready!(inner);
|
||||
|
||||
fn call(&self, req: ()) -> Self::Future {
|
||||
self.inner.call(req)
|
||||
fn call(&self, _: ()) -> Self::Future {
|
||||
self.inner.call(())
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -18,7 +18,7 @@ path = "src/lib.rs"
|
||||
[dependencies]
|
||||
actix-service = "2.0.0-beta.5"
|
||||
|
||||
futures-util = { version = "0.3.4", default-features = false }
|
||||
futures-util = { version = "0.3.7", default-features = false }
|
||||
tracing = "0.1"
|
||||
tracing-futures = "0.2"
|
||||
|
||||
|
@@ -1,15 +1,16 @@
|
||||
# Changes
|
||||
|
||||
## Unreleased - 2021-xx-xx
|
||||
* Add `async fn mpsc::Receiver::recv`. [#286]
|
||||
* `SendError` inner field is now public. [#286]
|
||||
* Rename `Dispatcher::{get_sink => tx}`. [#286]
|
||||
* Rename `Dispatcher::{get_ref => service}`. [#286]
|
||||
* Rename `Dispatcher::{get_mut => service_mut}`. [#286]
|
||||
* Rename `Dispatcher::{get_framed => framed}`. [#286]
|
||||
* Rename `Dispatcher::{get_framed_mut => framed_mut}`. [#286]
|
||||
|
||||
[#286]: https://github.com/actix/actix-net/pull/286
|
||||
|
||||
## 3.0.0-beta.3 - 2021-04-01
|
||||
* Moved `mpsc` to own crate `local-channel`. [#301]
|
||||
* Moved `task::LocalWaker` to own crate `local-waker`. [#301]
|
||||
* Remove `timeout` module. [#301]
|
||||
* Remove `dispatcher` module. [#301]
|
||||
* Expose `future` mod with `ready` and `poll_fn` helpers. [#301]
|
||||
|
||||
[#301]: https://github.com/actix/actix-net/pull/301
|
||||
|
||||
|
||||
## 3.0.0-beta.2 - 2021-02-06
|
||||
|
@@ -1,12 +1,13 @@
|
||||
[package]
|
||||
name = "actix-utils"
|
||||
version = "3.0.0-beta.2"
|
||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||
description = "Various network related services and utilities for the Actix ecosystem"
|
||||
version = "3.0.0-beta.3"
|
||||
authors = [
|
||||
"Nikolay Kim <fafhrd91@gmail.com>",
|
||||
"Rob Ede <robjtede@icloud.com>",
|
||||
]
|
||||
description = "Utilities for the Actix ecosystem"
|
||||
keywords = ["network", "framework", "async", "futures"]
|
||||
homepage = "https://actix.rs"
|
||||
repository = "https://github.com/actix/actix-net.git"
|
||||
documentation = "https://docs.rs/actix-utils"
|
||||
categories = ["network-programming", "asynchronous"]
|
||||
license = "MIT OR Apache-2.0"
|
||||
edition = "2018"
|
||||
@@ -16,14 +17,7 @@ name = "actix_utils"
|
||||
path = "src/lib.rs"
|
||||
|
||||
[dependencies]
|
||||
actix-codec = "0.4.0-beta.1"
|
||||
actix-rt = { version = "2.0.0", default-features = false }
|
||||
actix-service = "2.0.0-beta.5"
|
||||
|
||||
futures-core = { version = "0.3.7", default-features = false }
|
||||
futures-sink = { version = "0.3.7", default-features = false }
|
||||
log = "0.4"
|
||||
pin-project-lite = "0.2.0"
|
||||
local-waker = "0.1"
|
||||
|
||||
[dev-dependencies]
|
||||
actix-rt = "2.0.0"
|
||||
|
@@ -1,24 +1,18 @@
|
||||
use core::cell::Cell;
|
||||
use core::task;
|
||||
//! Task-notifying counter.
|
||||
|
||||
use core::{cell::Cell, fmt, task};
|
||||
use std::rc::Rc;
|
||||
|
||||
use crate::task::LocalWaker;
|
||||
use local_waker::LocalWaker;
|
||||
|
||||
#[derive(Clone)]
|
||||
/// Simple counter with ability to notify task on reaching specific number
|
||||
///
|
||||
/// Counter could be cloned, total n-count is shared across all clones.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Counter(Rc<CounterInner>);
|
||||
|
||||
struct CounterInner {
|
||||
count: Cell<usize>,
|
||||
capacity: usize,
|
||||
task: LocalWaker,
|
||||
}
|
||||
|
||||
impl Counter {
|
||||
/// Create `Counter` instance and set max value.
|
||||
/// Create `Counter` instance with max value.
|
||||
pub fn new(capacity: usize) -> Self {
|
||||
Counter(Rc::new(CounterInner {
|
||||
capacity,
|
||||
@@ -27,38 +21,26 @@ impl Counter {
|
||||
}))
|
||||
}
|
||||
|
||||
/// Get counter guard.
|
||||
/// Create new counter guard, incrementing the counter.
|
||||
pub fn get(&self) -> CounterGuard {
|
||||
CounterGuard::new(self.0.clone())
|
||||
}
|
||||
|
||||
/// Check if counter is not at capacity. If counter at capacity
|
||||
/// it registers notification for current task.
|
||||
/// Notify current task and return true if counter is at capacity.
|
||||
pub fn available(&self, cx: &mut task::Context<'_>) -> bool {
|
||||
self.0.available(cx)
|
||||
}
|
||||
|
||||
/// Get total number of acquired counts
|
||||
/// Get total number of acquired guards.
|
||||
pub fn total(&self) -> usize {
|
||||
self.0.count.get()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct CounterGuard(Rc<CounterInner>);
|
||||
|
||||
impl CounterGuard {
|
||||
fn new(inner: Rc<CounterInner>) -> Self {
|
||||
inner.inc();
|
||||
CounterGuard(inner)
|
||||
}
|
||||
}
|
||||
|
||||
impl Unpin for CounterGuard {}
|
||||
|
||||
impl Drop for CounterGuard {
|
||||
fn drop(&mut self) {
|
||||
self.0.dec();
|
||||
}
|
||||
struct CounterInner {
|
||||
count: Cell<usize>,
|
||||
capacity: usize,
|
||||
task: LocalWaker,
|
||||
}
|
||||
|
||||
impl CounterInner {
|
||||
@@ -83,3 +65,32 @@ impl CounterInner {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for CounterInner {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("Counter")
|
||||
.field("count", &self.count.get())
|
||||
.field("capacity", &self.capacity)
|
||||
.field("task", &self.task)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
/// An RAII structure that keeps the underlying counter incremented until this guard is dropped.
|
||||
#[derive(Debug)]
|
||||
pub struct CounterGuard(Rc<CounterInner>);
|
||||
|
||||
impl CounterGuard {
|
||||
fn new(inner: Rc<CounterInner>) -> Self {
|
||||
inner.inc();
|
||||
CounterGuard(inner)
|
||||
}
|
||||
}
|
||||
|
||||
impl Unpin for CounterGuard {}
|
||||
|
||||
impl Drop for CounterGuard {
|
||||
fn drop(&mut self) {
|
||||
self.0.dec();
|
||||
}
|
||||
}
|
||||
|
@@ -1,336 +0,0 @@
|
||||
//! Framed dispatcher service and related utilities.
|
||||
|
||||
#![allow(type_alias_bounds)]
|
||||
|
||||
use core::future::Future;
|
||||
use core::pin::Pin;
|
||||
use core::task::{Context, Poll};
|
||||
use core::{fmt, mem};
|
||||
|
||||
use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed};
|
||||
use actix_service::{IntoService, Service};
|
||||
use futures_core::stream::Stream;
|
||||
use log::debug;
|
||||
use pin_project_lite::pin_project;
|
||||
|
||||
use crate::mpsc;
|
||||
|
||||
/// Framed transport errors
|
||||
pub enum DispatcherError<E, U: Encoder<I> + Decoder, I> {
|
||||
Service(E),
|
||||
Encoder(<U as Encoder<I>>::Error),
|
||||
Decoder(<U as Decoder>::Error),
|
||||
}
|
||||
|
||||
impl<E, U: Encoder<I> + Decoder, I> From<E> for DispatcherError<E, U, I> {
|
||||
fn from(err: E) -> Self {
|
||||
DispatcherError::Service(err)
|
||||
}
|
||||
}
|
||||
|
||||
impl<E, U: Encoder<I> + Decoder, I> fmt::Debug for DispatcherError<E, U, I>
|
||||
where
|
||||
E: fmt::Debug,
|
||||
<U as Encoder<I>>::Error: fmt::Debug,
|
||||
<U as Decoder>::Error: fmt::Debug,
|
||||
{
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match *self {
|
||||
DispatcherError::Service(ref e) => write!(fmt, "DispatcherError::Service({:?})", e),
|
||||
DispatcherError::Encoder(ref e) => write!(fmt, "DispatcherError::Encoder({:?})", e),
|
||||
DispatcherError::Decoder(ref e) => write!(fmt, "DispatcherError::Decoder({:?})", e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<E, U: Encoder<I> + Decoder, I> fmt::Display for DispatcherError<E, U, I>
|
||||
where
|
||||
E: fmt::Display,
|
||||
<U as Encoder<I>>::Error: fmt::Debug,
|
||||
<U as Decoder>::Error: fmt::Debug,
|
||||
{
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match *self {
|
||||
DispatcherError::Service(ref e) => write!(fmt, "{}", e),
|
||||
DispatcherError::Encoder(ref e) => write!(fmt, "{:?}", e),
|
||||
DispatcherError::Decoder(ref e) => write!(fmt, "{:?}", e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub enum Message<T> {
|
||||
Item(T),
|
||||
Close,
|
||||
}
|
||||
|
||||
pin_project! {
|
||||
/// Dispatcher is a future that reads frames from Framed object
|
||||
/// and passes them to the service.
|
||||
pub struct Dispatcher<S, T, U, I>
|
||||
where
|
||||
S: Service<<U as Decoder>::Item, Response = I>,
|
||||
S::Error: 'static,
|
||||
S::Future: 'static,
|
||||
T: AsyncRead,
|
||||
T: AsyncWrite,
|
||||
U: Encoder<I>,
|
||||
U: Decoder,
|
||||
I: 'static,
|
||||
<U as Encoder<I>>::Error: fmt::Debug,
|
||||
{
|
||||
service: S,
|
||||
state: State<S, U, I>,
|
||||
#[pin]
|
||||
framed: Framed<T, U>,
|
||||
rx: mpsc::Receiver<Result<Message<I>, S::Error>>,
|
||||
tx: mpsc::Sender<Result<Message<I>, S::Error>>,
|
||||
}
|
||||
}
|
||||
|
||||
enum State<S, U, I>
|
||||
where
|
||||
S: Service<<U as Decoder>::Item>,
|
||||
U: Encoder<I> + Decoder,
|
||||
{
|
||||
Processing,
|
||||
Error(DispatcherError<S::Error, U, I>),
|
||||
FramedError(DispatcherError<S::Error, U, I>),
|
||||
FlushAndStop,
|
||||
Stopping,
|
||||
}
|
||||
|
||||
impl<S, U, I> State<S, U, I>
|
||||
where
|
||||
S: Service<<U as Decoder>::Item>,
|
||||
U: Encoder<I> + Decoder,
|
||||
{
|
||||
fn take_error(&mut self) -> DispatcherError<S::Error, U, I> {
|
||||
match mem::replace(self, State::Processing) {
|
||||
State::Error(err) => err,
|
||||
_ => panic!(),
|
||||
}
|
||||
}
|
||||
|
||||
fn take_framed_error(&mut self) -> DispatcherError<S::Error, U, I> {
|
||||
match mem::replace(self, State::Processing) {
|
||||
State::FramedError(err) => err,
|
||||
_ => panic!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, T, U, I> Dispatcher<S, T, U, I>
|
||||
where
|
||||
S: Service<<U as Decoder>::Item, Response = I>,
|
||||
S::Error: 'static,
|
||||
S::Future: 'static,
|
||||
T: AsyncRead + AsyncWrite,
|
||||
U: Decoder + Encoder<I>,
|
||||
I: 'static,
|
||||
<U as Decoder>::Error: fmt::Debug,
|
||||
<U as Encoder<I>>::Error: fmt::Debug,
|
||||
{
|
||||
pub fn new<F>(framed: Framed<T, U>, service: F) -> Self
|
||||
where
|
||||
F: IntoService<S, <U as Decoder>::Item>,
|
||||
{
|
||||
let (tx, rx) = mpsc::channel();
|
||||
Dispatcher {
|
||||
framed,
|
||||
rx,
|
||||
tx,
|
||||
service: service.into_service(),
|
||||
state: State::Processing,
|
||||
}
|
||||
}
|
||||
|
||||
/// Construct new `Dispatcher` instance with customer `mpsc::Receiver`
|
||||
pub fn with_rx<F>(
|
||||
framed: Framed<T, U>,
|
||||
service: F,
|
||||
rx: mpsc::Receiver<Result<Message<I>, S::Error>>,
|
||||
) -> Self
|
||||
where
|
||||
F: IntoService<S, <U as Decoder>::Item>,
|
||||
{
|
||||
let tx = rx.sender();
|
||||
Dispatcher {
|
||||
framed,
|
||||
rx,
|
||||
tx,
|
||||
service: service.into_service(),
|
||||
state: State::Processing,
|
||||
}
|
||||
}
|
||||
|
||||
/// Get sender handle.
|
||||
pub fn tx(&self) -> mpsc::Sender<Result<Message<I>, S::Error>> {
|
||||
self.tx.clone()
|
||||
}
|
||||
|
||||
/// Get reference to a service wrapped by `Dispatcher` instance.
|
||||
pub fn service(&self) -> &S {
|
||||
&self.service
|
||||
}
|
||||
|
||||
/// Get mutable reference to a service wrapped by `Dispatcher` instance.
|
||||
pub fn service_mut(&mut self) -> &mut S {
|
||||
&mut self.service
|
||||
}
|
||||
|
||||
/// Get reference to a framed instance wrapped by `Dispatcher` instance.
|
||||
pub fn framed(&self) -> &Framed<T, U> {
|
||||
&self.framed
|
||||
}
|
||||
|
||||
/// Get mutable reference to a framed instance wrapped by `Dispatcher` instance.
|
||||
pub fn framed_mut(&mut self) -> &mut Framed<T, U> {
|
||||
&mut self.framed
|
||||
}
|
||||
|
||||
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> bool
|
||||
where
|
||||
S: Service<<U as Decoder>::Item, Response = I>,
|
||||
S::Error: 'static,
|
||||
S::Future: 'static,
|
||||
T: AsyncRead + AsyncWrite,
|
||||
U: Decoder + Encoder<I>,
|
||||
I: 'static,
|
||||
<U as Encoder<I>>::Error: fmt::Debug,
|
||||
{
|
||||
loop {
|
||||
let this = self.as_mut().project();
|
||||
match this.service.poll_ready(cx) {
|
||||
Poll::Ready(Ok(_)) => {
|
||||
let item = match this.framed.next_item(cx) {
|
||||
Poll::Ready(Some(Ok(el))) => el,
|
||||
Poll::Ready(Some(Err(err))) => {
|
||||
*this.state = State::FramedError(DispatcherError::Decoder(err));
|
||||
return true;
|
||||
}
|
||||
Poll::Pending => return false,
|
||||
Poll::Ready(None) => {
|
||||
*this.state = State::Stopping;
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
let tx = this.tx.clone();
|
||||
let fut = this.service.call(item);
|
||||
actix_rt::spawn(async move {
|
||||
let item = fut.await;
|
||||
let _ = tx.send(item.map(Message::Item));
|
||||
});
|
||||
}
|
||||
Poll::Pending => return false,
|
||||
Poll::Ready(Err(err)) => {
|
||||
*this.state = State::Error(DispatcherError::Service(err));
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// write to framed object
|
||||
fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> bool
|
||||
where
|
||||
S: Service<<U as Decoder>::Item, Response = I>,
|
||||
S::Error: 'static,
|
||||
S::Future: 'static,
|
||||
T: AsyncRead + AsyncWrite,
|
||||
U: Decoder + Encoder<I>,
|
||||
I: 'static,
|
||||
<U as Encoder<I>>::Error: fmt::Debug,
|
||||
{
|
||||
loop {
|
||||
let mut this = self.as_mut().project();
|
||||
while !this.framed.is_write_buf_full() {
|
||||
match Pin::new(&mut this.rx).poll_next(cx) {
|
||||
Poll::Ready(Some(Ok(Message::Item(msg)))) => {
|
||||
if let Err(err) = this.framed.as_mut().write(msg) {
|
||||
*this.state = State::FramedError(DispatcherError::Encoder(err));
|
||||
return true;
|
||||
}
|
||||
}
|
||||
Poll::Ready(Some(Ok(Message::Close))) => {
|
||||
*this.state = State::FlushAndStop;
|
||||
return true;
|
||||
}
|
||||
Poll::Ready(Some(Err(err))) => {
|
||||
*this.state = State::Error(DispatcherError::Service(err));
|
||||
return true;
|
||||
}
|
||||
Poll::Ready(None) | Poll::Pending => break,
|
||||
}
|
||||
}
|
||||
|
||||
if !this.framed.is_write_buf_empty() {
|
||||
match this.framed.flush(cx) {
|
||||
Poll::Pending => break,
|
||||
Poll::Ready(Ok(_)) => {}
|
||||
Poll::Ready(Err(err)) => {
|
||||
debug!("Error sending data: {:?}", err);
|
||||
*this.state = State::FramedError(DispatcherError::Encoder(err));
|
||||
return true;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, T, U, I> Future for Dispatcher<S, T, U, I>
|
||||
where
|
||||
S: Service<<U as Decoder>::Item, Response = I>,
|
||||
S::Error: 'static,
|
||||
S::Future: 'static,
|
||||
T: AsyncRead + AsyncWrite,
|
||||
U: Decoder + Encoder<I>,
|
||||
I: 'static,
|
||||
<U as Encoder<I>>::Error: fmt::Debug,
|
||||
<U as Decoder>::Error: fmt::Debug,
|
||||
{
|
||||
type Output = Result<(), DispatcherError<S::Error, U, I>>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
loop {
|
||||
let this = self.as_mut().project();
|
||||
|
||||
return match this.state {
|
||||
State::Processing => {
|
||||
if self.as_mut().poll_read(cx) || self.as_mut().poll_write(cx) {
|
||||
continue;
|
||||
} else {
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
State::Error(_) => {
|
||||
// flush write buffer
|
||||
if !this.framed.is_write_buf_empty() && this.framed.flush(cx).is_pending() {
|
||||
return Poll::Pending;
|
||||
}
|
||||
Poll::Ready(Err(this.state.take_error()))
|
||||
}
|
||||
State::FlushAndStop => {
|
||||
if !this.framed.is_write_buf_empty() {
|
||||
this.framed.flush(cx).map(|res| {
|
||||
if let Err(err) = res {
|
||||
debug!("Error sending data: {:?}", err);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
})
|
||||
} else {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
}
|
||||
State::FramedError(_) => Poll::Ready(Err(this.state.take_framed_error())),
|
||||
State::Stopping => Poll::Ready(Ok(())),
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
7
actix-utils/src/future/mod.rs
Normal file
7
actix-utils/src/future/mod.rs
Normal file
@@ -0,0 +1,7 @@
|
||||
//! Asynchronous values.
|
||||
|
||||
mod poll_fn;
|
||||
mod ready;
|
||||
|
||||
pub use self::poll_fn::{poll_fn, PollFn};
|
||||
pub use self::ready::{err, ok, ready, Ready};
|
@@ -3,20 +3,20 @@
|
||||
use core::{
|
||||
fmt,
|
||||
future::Future,
|
||||
task::{self, Poll},
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use std::pin::Pin;
|
||||
|
||||
/// Create a future driven by the provided function that receives a task context.
|
||||
pub(crate) fn poll_fn<F, T>(f: F) -> PollFn<F>
|
||||
pub fn poll_fn<F, T>(f: F) -> PollFn<F>
|
||||
where
|
||||
F: FnMut(&mut task::Context<'_>) -> Poll<T>,
|
||||
F: FnMut(&mut Context<'_>) -> Poll<T>,
|
||||
{
|
||||
PollFn { f }
|
||||
}
|
||||
|
||||
/// A Future driven by the inner function.
|
||||
pub(crate) struct PollFn<F> {
|
||||
pub struct PollFn<F> {
|
||||
f: F,
|
||||
}
|
||||
|
||||
@@ -30,11 +30,11 @@ impl<F> fmt::Debug for PollFn<F> {
|
||||
|
||||
impl<F, T> Future for PollFn<F>
|
||||
where
|
||||
F: FnMut(&mut task::Context<'_>) -> task::Poll<T>,
|
||||
F: FnMut(&mut Context<'_>) -> Poll<T>,
|
||||
{
|
||||
type Output = T;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
(self.f)(cx)
|
||||
}
|
||||
}
|
122
actix-utils/src/future/ready.rs
Normal file
122
actix-utils/src/future/ready.rs
Normal file
@@ -0,0 +1,122 @@
|
||||
//! When MSRV is 1.48, replace with `core::future::Ready` and `core::future::ready()`.
|
||||
|
||||
use core::{
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
/// Future for the [`ready`](ready()) function.
|
||||
///
|
||||
/// Panic will occur if polled more than once.
|
||||
///
|
||||
/// # Examples
|
||||
/// ```
|
||||
/// use actix_utils::future::ready;
|
||||
///
|
||||
/// // async
|
||||
/// # async fn run() {
|
||||
/// let a = ready(1);
|
||||
/// assert_eq!(a.await, 1);
|
||||
/// # }
|
||||
///
|
||||
/// // sync
|
||||
/// let a = ready(1);
|
||||
/// assert_eq!(a.into_inner(), 1);
|
||||
/// ```
|
||||
#[derive(Debug, Clone)]
|
||||
#[must_use = "futures do nothing unless you `.await` or poll them"]
|
||||
pub struct Ready<T> {
|
||||
val: Option<T>,
|
||||
}
|
||||
|
||||
impl<T> Ready<T> {
|
||||
/// Unwraps the value from this immediately ready future.
|
||||
#[inline]
|
||||
pub fn into_inner(mut self) -> T {
|
||||
self.val.take().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Unpin for Ready<T> {}
|
||||
|
||||
impl<T> Future for Ready<T> {
|
||||
type Output = T;
|
||||
|
||||
#[inline]
|
||||
fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<T> {
|
||||
let val = self.val.take().expect("Ready polled after completion");
|
||||
Poll::Ready(val)
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a future that is immediately ready with a value.
|
||||
///
|
||||
/// # Examples
|
||||
/// ```no_run
|
||||
/// use actix_utils::future::ready;
|
||||
///
|
||||
/// # async fn run() {
|
||||
/// let a = ready(1);
|
||||
/// assert_eq!(a.await, 1);
|
||||
/// # }
|
||||
///
|
||||
/// // sync
|
||||
/// let a = ready(1);
|
||||
/// assert_eq!(a.into_inner(), 1);
|
||||
/// ```
|
||||
pub fn ready<T>(val: T) -> Ready<T> {
|
||||
Ready { val: Some(val) }
|
||||
}
|
||||
|
||||
/// Create a future that is immediately ready with a success value.
|
||||
///
|
||||
/// # Examples
|
||||
/// ```no_run
|
||||
/// use actix_utils::future::ok;
|
||||
///
|
||||
/// # async fn run() {
|
||||
/// let a = ok::<_, ()>(1);
|
||||
/// assert_eq!(a.await, Ok(1));
|
||||
/// # }
|
||||
/// ```
|
||||
pub fn ok<T, E>(val: T) -> Ready<Result<T, E>> {
|
||||
Ready { val: Some(Ok(val)) }
|
||||
}
|
||||
|
||||
/// Create a future that is immediately ready with an error value.
|
||||
///
|
||||
/// # Examples
|
||||
/// ```no_run
|
||||
/// use actix_utils::future::err;
|
||||
///
|
||||
/// # async fn run() {
|
||||
/// let a = err::<(), _>(1);
|
||||
/// assert_eq!(a.await, Err(1));
|
||||
/// # }
|
||||
/// ```
|
||||
pub fn err<T, E>(err: E) -> Ready<Result<T, E>> {
|
||||
Ready {
|
||||
val: Some(Err(err)),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use futures_util::task::noop_waker;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
#[should_panic]
|
||||
fn multiple_poll_panics() {
|
||||
let waker = noop_waker();
|
||||
let mut cx = Context::from_waker(&waker);
|
||||
|
||||
let mut ready = ready(1);
|
||||
assert_eq!(Pin::new(&mut ready).poll(&mut cx), Poll::Ready(1));
|
||||
|
||||
// panic!
|
||||
let _ = Pin::new(&mut ready).poll(&mut cx);
|
||||
}
|
||||
}
|
@@ -1,15 +1,9 @@
|
||||
//! Various network related services and utilities for the Actix ecosystem.
|
||||
//! Various utilities for the Actix ecosystem.
|
||||
|
||||
#![deny(rust_2018_idioms, nonstandard_style)]
|
||||
#![allow(clippy::type_complexity)]
|
||||
#![warn(missing_docs)]
|
||||
#![doc(html_logo_url = "https://actix.rs/img/logo.png")]
|
||||
#![doc(html_favicon_url = "https://actix.rs/favicon.ico")]
|
||||
|
||||
pub mod counter;
|
||||
pub mod dispatcher;
|
||||
pub mod mpsc;
|
||||
mod poll_fn;
|
||||
pub mod task;
|
||||
pub mod timeout;
|
||||
|
||||
use self::poll_fn::poll_fn;
|
||||
pub mod future;
|
||||
|
@@ -1,255 +0,0 @@
|
||||
//! Service that applies a timeout to requests.
|
||||
//!
|
||||
//! If the response does not complete within the specified timeout, the response will be aborted.
|
||||
|
||||
use core::future::Future;
|
||||
use core::marker::PhantomData;
|
||||
use core::pin::Pin;
|
||||
use core::task::{Context, Poll};
|
||||
use core::{fmt, time};
|
||||
|
||||
use actix_rt::time::{sleep, Sleep};
|
||||
use actix_service::{IntoService, Service, Transform};
|
||||
use pin_project_lite::pin_project;
|
||||
|
||||
/// Applies a timeout to requests.
|
||||
#[derive(Debug)]
|
||||
pub struct Timeout<E = ()> {
|
||||
timeout: time::Duration,
|
||||
_t: PhantomData<E>,
|
||||
}
|
||||
|
||||
/// Timeout error
|
||||
pub enum TimeoutError<E> {
|
||||
/// Service error
|
||||
Service(E),
|
||||
/// Service call timeout
|
||||
Timeout,
|
||||
}
|
||||
|
||||
impl<E> From<E> for TimeoutError<E> {
|
||||
fn from(err: E) -> Self {
|
||||
TimeoutError::Service(err)
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: fmt::Debug> fmt::Debug for TimeoutError<E> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
TimeoutError::Service(e) => write!(f, "TimeoutError::Service({:?})", e),
|
||||
TimeoutError::Timeout => write!(f, "TimeoutError::Timeout"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: fmt::Display> fmt::Display for TimeoutError<E> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
TimeoutError::Service(e) => e.fmt(f),
|
||||
TimeoutError::Timeout => write!(f, "Service call timeout"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: PartialEq> PartialEq for TimeoutError<E> {
|
||||
fn eq(&self, other: &TimeoutError<E>) -> bool {
|
||||
match self {
|
||||
TimeoutError::Service(e1) => match other {
|
||||
TimeoutError::Service(e2) => e1 == e2,
|
||||
TimeoutError::Timeout => false,
|
||||
},
|
||||
TimeoutError::Timeout => matches!(other, TimeoutError::Timeout),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<E> Timeout<E> {
|
||||
pub fn new(timeout: time::Duration) -> Self {
|
||||
Timeout {
|
||||
timeout,
|
||||
_t: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<E> Clone for Timeout<E> {
|
||||
fn clone(&self) -> Self {
|
||||
Timeout::new(self.timeout)
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, E, Req> Transform<S, Req> for Timeout<E>
|
||||
where
|
||||
S: Service<Req>,
|
||||
{
|
||||
type Response = S::Response;
|
||||
type Error = TimeoutError<S::Error>;
|
||||
type Transform = TimeoutService<S, Req>;
|
||||
type InitError = E;
|
||||
type Future = TimeoutFuture<Self::Transform, Self::InitError>;
|
||||
|
||||
fn new_transform(&self, service: S) -> Self::Future {
|
||||
let service = TimeoutService {
|
||||
service,
|
||||
timeout: self.timeout,
|
||||
_phantom: PhantomData,
|
||||
};
|
||||
|
||||
TimeoutFuture {
|
||||
service: Some(service),
|
||||
_err: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct TimeoutFuture<T, E> {
|
||||
service: Option<T>,
|
||||
_err: PhantomData<E>,
|
||||
}
|
||||
|
||||
impl<T, E> Unpin for TimeoutFuture<T, E> {}
|
||||
|
||||
impl<T, E> Future for TimeoutFuture<T, E> {
|
||||
type Output = Result<T, E>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
Poll::Ready(Ok(self.get_mut().service.take().unwrap()))
|
||||
}
|
||||
}
|
||||
|
||||
/// Applies a timeout to requests.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct TimeoutService<S, Req> {
|
||||
service: S,
|
||||
timeout: time::Duration,
|
||||
_phantom: PhantomData<Req>,
|
||||
}
|
||||
|
||||
impl<S, Req> TimeoutService<S, Req>
|
||||
where
|
||||
S: Service<Req>,
|
||||
{
|
||||
pub fn new<U>(timeout: time::Duration, service: U) -> Self
|
||||
where
|
||||
U: IntoService<S, Req>,
|
||||
{
|
||||
TimeoutService {
|
||||
timeout,
|
||||
service: service.into_service(),
|
||||
_phantom: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, Req> Service<Req> for TimeoutService<S, Req>
|
||||
where
|
||||
S: Service<Req>,
|
||||
{
|
||||
type Response = S::Response;
|
||||
type Error = TimeoutError<S::Error>;
|
||||
type Future = TimeoutServiceResponse<S, Req>;
|
||||
|
||||
actix_service::forward_ready!(service);
|
||||
|
||||
fn call(&self, request: Req) -> Self::Future {
|
||||
TimeoutServiceResponse {
|
||||
fut: self.service.call(request),
|
||||
sleep: sleep(self.timeout),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pin_project! {
|
||||
/// `TimeoutService` response future
|
||||
#[derive(Debug)]
|
||||
pub struct TimeoutServiceResponse<S, Req>
|
||||
where
|
||||
S: Service<Req>
|
||||
{
|
||||
#[pin]
|
||||
fut: S::Future,
|
||||
#[pin]
|
||||
sleep: Sleep,
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, Req> Future for TimeoutServiceResponse<S, Req>
|
||||
where
|
||||
S: Service<Req>,
|
||||
{
|
||||
type Output = Result<S::Response, TimeoutError<S::Error>>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.project();
|
||||
|
||||
// First, try polling the future
|
||||
if let Poll::Ready(res) = this.fut.poll(cx) {
|
||||
return match res {
|
||||
Ok(v) => Poll::Ready(Ok(v)),
|
||||
Err(e) => Poll::Ready(Err(TimeoutError::Service(e))),
|
||||
};
|
||||
}
|
||||
|
||||
// Now check the sleep
|
||||
this.sleep.poll(cx).map(|_| Err(TimeoutError::Timeout))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use core::time::Duration;
|
||||
|
||||
use super::*;
|
||||
use actix_service::{apply, fn_factory, Service, ServiceFactory};
|
||||
use futures_core::future::LocalBoxFuture;
|
||||
|
||||
struct SleepService(Duration);
|
||||
|
||||
impl Service<()> for SleepService {
|
||||
type Response = ();
|
||||
type Error = ();
|
||||
type Future = LocalBoxFuture<'static, Result<(), ()>>;
|
||||
|
||||
actix_service::always_ready!();
|
||||
|
||||
fn call(&self, _: ()) -> Self::Future {
|
||||
let sleep = actix_rt::time::sleep(self.0);
|
||||
Box::pin(async move {
|
||||
sleep.await;
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn test_success() {
|
||||
let resolution = Duration::from_millis(100);
|
||||
let wait_time = Duration::from_millis(50);
|
||||
|
||||
let timeout = TimeoutService::new(resolution, SleepService(wait_time));
|
||||
assert_eq!(timeout.call(()).await, Ok(()));
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn test_timeout() {
|
||||
let resolution = Duration::from_millis(100);
|
||||
let wait_time = Duration::from_millis(500);
|
||||
|
||||
let timeout = TimeoutService::new(resolution, SleepService(wait_time));
|
||||
assert_eq!(timeout.call(()).await, Err(TimeoutError::Timeout));
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn test_timeout_new_service() {
|
||||
let resolution = Duration::from_millis(100);
|
||||
let wait_time = Duration::from_millis(500);
|
||||
|
||||
let timeout = apply(
|
||||
Timeout::new(resolution),
|
||||
fn_factory(|| async { Ok::<_, ()>(SleepService(wait_time)) }),
|
||||
);
|
||||
let srv = timeout.new_service(&()).await.unwrap();
|
||||
|
||||
assert_eq!(srv.call(()).await, Err(TimeoutError::Timeout));
|
||||
}
|
||||
}
|
7
local-channel/CHANGES.md
Normal file
7
local-channel/CHANGES.md
Normal file
@@ -0,0 +1,7 @@
|
||||
# Changes
|
||||
|
||||
## Unreleased - 2021-xx-xx
|
||||
|
||||
|
||||
## 0.1.1 - 2021-03-29
|
||||
* Move local mpsc channel to it's own crate.
|
21
local-channel/Cargo.toml
Normal file
21
local-channel/Cargo.toml
Normal file
@@ -0,0 +1,21 @@
|
||||
[package]
|
||||
name = "local-channel"
|
||||
version = "0.1.2"
|
||||
description = "A non-threadsafe multi-producer, single-consumer, futures-aware, FIFO queue"
|
||||
authors = [
|
||||
"Nikolay Kim <fafhrd91@gmail.com>",
|
||||
"Rob Ede <robjtede@icloud.com>",
|
||||
]
|
||||
repository = "https://github.com/actix/actix-net.git"
|
||||
keywords = ["channel", "local", "futures"]
|
||||
license = "MIT OR Apache-2.0"
|
||||
edition = "2018"
|
||||
|
||||
[dependencies]
|
||||
futures-core = { version = "0.3.7", default-features = false }
|
||||
futures-sink = { version = "0.3.7", default-features = false }
|
||||
futures-util = { version = "0.3.7", default-features = false }
|
||||
local-waker = "0.1"
|
||||
|
||||
[dev-dependencies]
|
||||
tokio = { version = "1", features = ["rt", "macros"] }
|
3
local-channel/src/lib.rs
Normal file
3
local-channel/src/lib.rs
Normal file
@@ -0,0 +1,3 @@
|
||||
//! Non-thread-safe channels.
|
||||
|
||||
pub mod mpsc;
|
@@ -1,4 +1,4 @@
|
||||
//! A multi-producer, single-consumer, futures-aware, FIFO queue.
|
||||
//! A non-thread-safe multi-producer, single-consumer, futures-aware, FIFO queue.
|
||||
|
||||
use core::{
|
||||
cell::RefCell,
|
||||
@@ -11,8 +11,8 @@ use std::{collections::VecDeque, error::Error, rc::Rc};
|
||||
|
||||
use futures_core::stream::Stream;
|
||||
use futures_sink::Sink;
|
||||
|
||||
use crate::{poll_fn, task::LocalWaker};
|
||||
use futures_util::future::poll_fn;
|
||||
use local_waker::LocalWaker;
|
||||
|
||||
/// Creates a unbounded in-memory channel with buffered storage.
|
||||
///
|
||||
@@ -174,6 +174,8 @@ impl<T> Drop for Receiver<T> {
|
||||
}
|
||||
|
||||
/// Error returned when attempting to send after the channels' [Receiver] is dropped or closed.
|
||||
///
|
||||
/// Allows access to message that failed to send with [`into_inner`](Self::into_inner).
|
||||
pub struct SendError<T>(pub T);
|
||||
|
||||
impl<T> SendError<T> {
|
||||
@@ -199,11 +201,11 @@ impl<T> Error for SendError<T> {}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use futures_util::future::lazy;
|
||||
use futures_util::{stream::Stream, StreamExt};
|
||||
use futures_util::{future::lazy, StreamExt as _};
|
||||
|
||||
#[actix_rt::test]
|
||||
use super::*;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_mpsc() {
|
||||
let (tx, mut rx) = channel();
|
||||
tx.send("test").unwrap();
|
||||
@@ -237,7 +239,7 @@ mod tests {
|
||||
assert!(tx2.send("test").is_err());
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
#[tokio::test]
|
||||
async fn test_recv() {
|
||||
let (tx, mut rx) = channel();
|
||||
tx.send("test").unwrap();
|
11
local-waker/CHANGES.md
Normal file
11
local-waker/CHANGES.md
Normal file
@@ -0,0 +1,11 @@
|
||||
# Changes
|
||||
|
||||
## Unreleased - 2021-xx-xx
|
||||
|
||||
|
||||
## 0.1.2 - 2021-04-01
|
||||
* Fix crate metadata.
|
||||
|
||||
|
||||
## 0.1.1 - 2021-03-29
|
||||
* Move `LocalWaker` to it's own crate.
|
15
local-waker/Cargo.toml
Normal file
15
local-waker/Cargo.toml
Normal file
@@ -0,0 +1,15 @@
|
||||
[package]
|
||||
name = "local-waker"
|
||||
version = "0.1.1"
|
||||
description = "A synchronization primitive for thread-local task wakeup"
|
||||
authors = [
|
||||
"Nikolay Kim <fafhrd91@gmail.com>",
|
||||
"Rob Ede <robjtede@icloud.com>",
|
||||
]
|
||||
keywords = ["waker", "local", "futures", "no-std"]
|
||||
repository = "https://github.com/actix/actix-net.git"
|
||||
categories = ["asynchronous", "no-std"]
|
||||
license = "MIT OR Apache-2.0"
|
||||
edition = "2018"
|
||||
|
||||
[dependencies]
|
@@ -1,3 +1,9 @@
|
||||
//! A synchronization primitive for thread-local task wakeup.
|
||||
//!
|
||||
//! See docs for [`LocalWaker`].
|
||||
|
||||
#![no_std]
|
||||
|
||||
use core::{cell::Cell, fmt, marker::PhantomData, task::Waker};
|
||||
|
||||
/// A synchronization primitive for task wakeup.
|
Reference in New Issue
Block a user