From 8becb0db70765401d4c608cfcf385f426ca4a020 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Tue, 30 Mar 2021 13:39:10 +0100 Subject: [PATCH] refactor crates for better api stability (#301) --- .cargo/config.toml | 3 + Cargo.toml | 4 + actix-macros/Cargo.toml | 2 +- actix-server/Cargo.toml | 3 +- actix-server/tests/test_server.rs | 5 +- actix-service/src/macros.rs | 4 +- actix-tracing/Cargo.toml | 2 +- actix-utils/CHANGES.md | 14 +- actix-utils/Cargo.toml | 9 +- actix-utils/src/counter.rs | 6 +- actix-utils/src/dispatcher.rs | 336 ------------------ actix-utils/src/future/mod.rs | 7 + actix-utils/src/{ => future}/poll_fn.rs | 14 +- actix-utils/src/future/ready.rs | 122 +++++++ actix-utils/src/lib.rs | 8 +- actix-utils/src/timeout.rs | 255 ------------- local-channel/CHANGES.md | 7 + local-channel/Cargo.toml | 22 ++ local-channel/src/lib.rs | 3 + {actix-utils => local-channel}/src/mpsc.rs | 18 +- local-waker/CHANGES.md | 7 + local-waker/Cargo.toml | 16 + .../src/task.rs => local-waker/src/lib.rs | 6 + 23 files changed, 235 insertions(+), 638 deletions(-) create mode 100644 .cargo/config.toml delete mode 100644 actix-utils/src/dispatcher.rs create mode 100644 actix-utils/src/future/mod.rs rename actix-utils/src/{ => future}/poll_fn.rs (76%) create mode 100644 actix-utils/src/future/ready.rs delete mode 100644 actix-utils/src/timeout.rs create mode 100644 local-channel/CHANGES.md create mode 100644 local-channel/Cargo.toml create mode 100644 local-channel/src/lib.rs rename {actix-utils => local-channel}/src/mpsc.rs (95%) create mode 100644 local-waker/CHANGES.md create mode 100644 local-waker/Cargo.toml rename actix-utils/src/task.rs => local-waker/src/lib.rs (95%) diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 00000000..77788410 --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,3 @@ +[alias] +lint = "hack --clean-per-run clippy --workspace --tests --examples" +chk = "hack check --workspace --tests --examples" diff --git a/Cargo.toml b/Cargo.toml index 78e54d35..5bf72300 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,8 @@ members = [ "actix-tracing", "actix-utils", "bytestring", + "local-channel", + "local-waker", ] [patch.crates-io] @@ -23,3 +25,5 @@ actix-tls = { path = "actix-tls" } actix-tracing = { path = "actix-tracing" } actix-utils = { path = "actix-utils" } bytestring = { path = "bytestring" } +local-channel = { path = "local-channel" } +local-waker = { path = "local-waker" } diff --git a/actix-macros/Cargo.toml b/actix-macros/Cargo.toml index 0555f990..1664fc27 100644 --- a/actix-macros/Cargo.toml +++ b/actix-macros/Cargo.toml @@ -19,5 +19,5 @@ syn = { version = "^1", features = ["full"] } [dev-dependencies] actix-rt = "2.0.0" -futures-util = { version = "0.3", default-features = false } +futures-util = { version = "0.3.7", default-features = false } trybuild = "1" diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index 6d763d79..620cbf51 100755 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -22,7 +22,6 @@ path = "src/lib.rs" default = [] [dependencies] -actix-codec = "0.4.0-beta.1" actix-rt = { version = "2.0.0", default-features = false } actix-service = "2.0.0-beta.5" actix-utils = "3.0.0-beta.2" @@ -35,7 +34,9 @@ slab = "0.4" tokio = { version = "1.2", features = ["sync"] } [dev-dependencies] +actix-codec = "0.4.0-beta.1" actix-rt = "2.0.0" + bytes = "1" env_logger = "0.8" futures-util = { version = "0.3.7", default-features = false, features = ["sink"] } diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index 86ec25e6..6d413eea 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -4,7 +4,8 @@ use std::{net, thread, time}; use actix_server::Server; use actix_service::fn_service; -use futures_util::future::{lazy, ok}; +use actix_utils::future::ok; +use futures_util::future::lazy; fn unused_addr() -> net::SocketAddr { let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap(); @@ -30,6 +31,7 @@ fn test_bind() { .unwrap() .run() })); + let _ = tx.send((srv, actix_rt::System::current())); let _ = sys.run(); }); @@ -175,6 +177,7 @@ fn test_configure() { .workers(1) .run() })); + let _ = tx.send((srv, actix_rt::System::current())); let _ = sys.run(); }); diff --git a/actix-service/src/macros.rs b/actix-service/src/macros.rs index 4a083895..d2ae9dbf 100644 --- a/actix-service/src/macros.rs +++ b/actix-service/src/macros.rs @@ -147,8 +147,8 @@ mod tests { forward_ready!(inner); - fn call(&self, req: ()) -> Self::Future { - self.inner.call(req) + fn call(&self, _: ()) -> Self::Future { + self.inner.call(()) } } diff --git a/actix-tracing/Cargo.toml b/actix-tracing/Cargo.toml index 7f043f4b..992edbf4 100644 --- a/actix-tracing/Cargo.toml +++ b/actix-tracing/Cargo.toml @@ -18,7 +18,7 @@ path = "src/lib.rs" [dependencies] actix-service = "2.0.0-beta.5" -futures-util = { version = "0.3.4", default-features = false } +futures-util = { version = "0.3.7", default-features = false } tracing = "0.1" tracing-futures = "0.2" diff --git a/actix-utils/CHANGES.md b/actix-utils/CHANGES.md index 8d97b741..c911a211 100644 --- a/actix-utils/CHANGES.md +++ b/actix-utils/CHANGES.md @@ -1,15 +1,13 @@ # Changes ## Unreleased - 2021-xx-xx -* Add `async fn mpsc::Receiver::recv`. [#286] -* `SendError` inner field is now public. [#286] -* Rename `Dispatcher::{get_sink => tx}`. [#286] -* Rename `Dispatcher::{get_ref => service}`. [#286] -* Rename `Dispatcher::{get_mut => service_mut}`. [#286] -* Rename `Dispatcher::{get_framed => framed}`. [#286] -* Rename `Dispatcher::{get_framed_mut => framed_mut}`. [#286] +* Moved `mpsc` to own crate `local-channel`. [#301] +* Moved `task::LocalWaker` to own crate `local-waker`. [#301] +* Remove `timeout` module. [#301] +* Remove `dispatcher` module. [#301] +* Expose `future` mod with `ready` and `poll_fn` helpers. [#301] -[#286]: https://github.com/actix/actix-net/pull/286 +[#301]: https://github.com/actix/actix-net/pull/301 ## 3.0.0-beta.2 - 2021-02-06 diff --git a/actix-utils/Cargo.toml b/actix-utils/Cargo.toml index 9c21dd1b..02bc3114 100644 --- a/actix-utils/Cargo.toml +++ b/actix-utils/Cargo.toml @@ -16,14 +16,7 @@ name = "actix_utils" path = "src/lib.rs" [dependencies] -actix-codec = "0.4.0-beta.1" -actix-rt = { version = "2.0.0", default-features = false } -actix-service = "2.0.0-beta.5" - -futures-core = { version = "0.3.7", default-features = false } -futures-sink = { version = "0.3.7", default-features = false } -log = "0.4" -pin-project-lite = "0.2.0" +local-waker = "0.1" [dev-dependencies] actix-rt = "2.0.0" diff --git a/actix-utils/src/counter.rs b/actix-utils/src/counter.rs index 0b5984d2..c0926b73 100644 --- a/actix-utils/src/counter.rs +++ b/actix-utils/src/counter.rs @@ -1,9 +1,9 @@ -use core::cell::Cell; -use core::task; +//! Task-notifying counter. +use core::{cell::Cell, task}; use std::rc::Rc; -use crate::task::LocalWaker; +use local_waker::LocalWaker; #[derive(Clone)] /// Simple counter with ability to notify task on reaching specific number diff --git a/actix-utils/src/dispatcher.rs b/actix-utils/src/dispatcher.rs deleted file mode 100644 index 94ac9971..00000000 --- a/actix-utils/src/dispatcher.rs +++ /dev/null @@ -1,336 +0,0 @@ -//! Framed dispatcher service and related utilities. - -#![allow(type_alias_bounds)] - -use core::future::Future; -use core::pin::Pin; -use core::task::{Context, Poll}; -use core::{fmt, mem}; - -use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed}; -use actix_service::{IntoService, Service}; -use futures_core::stream::Stream; -use log::debug; -use pin_project_lite::pin_project; - -use crate::mpsc; - -/// Framed transport errors -pub enum DispatcherError + Decoder, I> { - Service(E), - Encoder(>::Error), - Decoder(::Error), -} - -impl + Decoder, I> From for DispatcherError { - fn from(err: E) -> Self { - DispatcherError::Service(err) - } -} - -impl + Decoder, I> fmt::Debug for DispatcherError -where - E: fmt::Debug, - >::Error: fmt::Debug, - ::Error: fmt::Debug, -{ - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - match *self { - DispatcherError::Service(ref e) => write!(fmt, "DispatcherError::Service({:?})", e), - DispatcherError::Encoder(ref e) => write!(fmt, "DispatcherError::Encoder({:?})", e), - DispatcherError::Decoder(ref e) => write!(fmt, "DispatcherError::Decoder({:?})", e), - } - } -} - -impl + Decoder, I> fmt::Display for DispatcherError -where - E: fmt::Display, - >::Error: fmt::Debug, - ::Error: fmt::Debug, -{ - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - match *self { - DispatcherError::Service(ref e) => write!(fmt, "{}", e), - DispatcherError::Encoder(ref e) => write!(fmt, "{:?}", e), - DispatcherError::Decoder(ref e) => write!(fmt, "{:?}", e), - } - } -} - -pub enum Message { - Item(T), - Close, -} - -pin_project! { - /// Dispatcher is a future that reads frames from Framed object - /// and passes them to the service. - pub struct Dispatcher - where - S: Service<::Item, Response = I>, - S::Error: 'static, - S::Future: 'static, - T: AsyncRead, - T: AsyncWrite, - U: Encoder, - U: Decoder, - I: 'static, - >::Error: fmt::Debug, - { - service: S, - state: State, - #[pin] - framed: Framed, - rx: mpsc::Receiver, S::Error>>, - tx: mpsc::Sender, S::Error>>, - } -} - -enum State -where - S: Service<::Item>, - U: Encoder + Decoder, -{ - Processing, - Error(DispatcherError), - FramedError(DispatcherError), - FlushAndStop, - Stopping, -} - -impl State -where - S: Service<::Item>, - U: Encoder + Decoder, -{ - fn take_error(&mut self) -> DispatcherError { - match mem::replace(self, State::Processing) { - State::Error(err) => err, - _ => panic!(), - } - } - - fn take_framed_error(&mut self) -> DispatcherError { - match mem::replace(self, State::Processing) { - State::FramedError(err) => err, - _ => panic!(), - } - } -} - -impl Dispatcher -where - S: Service<::Item, Response = I>, - S::Error: 'static, - S::Future: 'static, - T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, - I: 'static, - ::Error: fmt::Debug, - >::Error: fmt::Debug, -{ - pub fn new(framed: Framed, service: F) -> Self - where - F: IntoService::Item>, - { - let (tx, rx) = mpsc::channel(); - Dispatcher { - framed, - rx, - tx, - service: service.into_service(), - state: State::Processing, - } - } - - /// Construct new `Dispatcher` instance with customer `mpsc::Receiver` - pub fn with_rx( - framed: Framed, - service: F, - rx: mpsc::Receiver, S::Error>>, - ) -> Self - where - F: IntoService::Item>, - { - let tx = rx.sender(); - Dispatcher { - framed, - rx, - tx, - service: service.into_service(), - state: State::Processing, - } - } - - /// Get sender handle. - pub fn tx(&self) -> mpsc::Sender, S::Error>> { - self.tx.clone() - } - - /// Get reference to a service wrapped by `Dispatcher` instance. - pub fn service(&self) -> &S { - &self.service - } - - /// Get mutable reference to a service wrapped by `Dispatcher` instance. - pub fn service_mut(&mut self) -> &mut S { - &mut self.service - } - - /// Get reference to a framed instance wrapped by `Dispatcher` instance. - pub fn framed(&self) -> &Framed { - &self.framed - } - - /// Get mutable reference to a framed instance wrapped by `Dispatcher` instance. - pub fn framed_mut(&mut self) -> &mut Framed { - &mut self.framed - } - - fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> bool - where - S: Service<::Item, Response = I>, - S::Error: 'static, - S::Future: 'static, - T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, - I: 'static, - >::Error: fmt::Debug, - { - loop { - let this = self.as_mut().project(); - match this.service.poll_ready(cx) { - Poll::Ready(Ok(_)) => { - let item = match this.framed.next_item(cx) { - Poll::Ready(Some(Ok(el))) => el, - Poll::Ready(Some(Err(err))) => { - *this.state = State::FramedError(DispatcherError::Decoder(err)); - return true; - } - Poll::Pending => return false, - Poll::Ready(None) => { - *this.state = State::Stopping; - return true; - } - }; - - let tx = this.tx.clone(); - let fut = this.service.call(item); - actix_rt::spawn(async move { - let item = fut.await; - let _ = tx.send(item.map(Message::Item)); - }); - } - Poll::Pending => return false, - Poll::Ready(Err(err)) => { - *this.state = State::Error(DispatcherError::Service(err)); - return true; - } - } - } - } - - /// write to framed object - fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> bool - where - S: Service<::Item, Response = I>, - S::Error: 'static, - S::Future: 'static, - T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, - I: 'static, - >::Error: fmt::Debug, - { - loop { - let mut this = self.as_mut().project(); - while !this.framed.is_write_buf_full() { - match Pin::new(&mut this.rx).poll_next(cx) { - Poll::Ready(Some(Ok(Message::Item(msg)))) => { - if let Err(err) = this.framed.as_mut().write(msg) { - *this.state = State::FramedError(DispatcherError::Encoder(err)); - return true; - } - } - Poll::Ready(Some(Ok(Message::Close))) => { - *this.state = State::FlushAndStop; - return true; - } - Poll::Ready(Some(Err(err))) => { - *this.state = State::Error(DispatcherError::Service(err)); - return true; - } - Poll::Ready(None) | Poll::Pending => break, - } - } - - if !this.framed.is_write_buf_empty() { - match this.framed.flush(cx) { - Poll::Pending => break, - Poll::Ready(Ok(_)) => {} - Poll::Ready(Err(err)) => { - debug!("Error sending data: {:?}", err); - *this.state = State::FramedError(DispatcherError::Encoder(err)); - return true; - } - } - } else { - break; - } - } - - false - } -} - -impl Future for Dispatcher -where - S: Service<::Item, Response = I>, - S::Error: 'static, - S::Future: 'static, - T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, - I: 'static, - >::Error: fmt::Debug, - ::Error: fmt::Debug, -{ - type Output = Result<(), DispatcherError>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - loop { - let this = self.as_mut().project(); - - return match this.state { - State::Processing => { - if self.as_mut().poll_read(cx) || self.as_mut().poll_write(cx) { - continue; - } else { - Poll::Pending - } - } - State::Error(_) => { - // flush write buffer - if !this.framed.is_write_buf_empty() && this.framed.flush(cx).is_pending() { - return Poll::Pending; - } - Poll::Ready(Err(this.state.take_error())) - } - State::FlushAndStop => { - if !this.framed.is_write_buf_empty() { - this.framed.flush(cx).map(|res| { - if let Err(err) = res { - debug!("Error sending data: {:?}", err); - } - - Ok(()) - }) - } else { - Poll::Ready(Ok(())) - } - } - State::FramedError(_) => Poll::Ready(Err(this.state.take_framed_error())), - State::Stopping => Poll::Ready(Ok(())), - }; - } - } -} diff --git a/actix-utils/src/future/mod.rs b/actix-utils/src/future/mod.rs new file mode 100644 index 00000000..0ad84ec7 --- /dev/null +++ b/actix-utils/src/future/mod.rs @@ -0,0 +1,7 @@ +//! Asynchronous values. + +mod poll_fn; +mod ready; + +pub use self::poll_fn::{poll_fn, PollFn}; +pub use self::ready::{err, ok, ready, Ready}; diff --git a/actix-utils/src/poll_fn.rs b/actix-utils/src/future/poll_fn.rs similarity index 76% rename from actix-utils/src/poll_fn.rs rename to actix-utils/src/future/poll_fn.rs index 2180f4a4..2e5285d8 100644 --- a/actix-utils/src/poll_fn.rs +++ b/actix-utils/src/future/poll_fn.rs @@ -3,20 +3,20 @@ use core::{ fmt, future::Future, - task::{self, Poll}, + pin::Pin, + task::{Context, Poll}, }; -use std::pin::Pin; /// Create a future driven by the provided function that receives a task context. -pub(crate) fn poll_fn(f: F) -> PollFn +pub fn poll_fn(f: F) -> PollFn where - F: FnMut(&mut task::Context<'_>) -> Poll, + F: FnMut(&mut Context<'_>) -> Poll, { PollFn { f } } /// A Future driven by the inner function. -pub(crate) struct PollFn { +pub struct PollFn { f: F, } @@ -30,11 +30,11 @@ impl fmt::Debug for PollFn { impl Future for PollFn where - F: FnMut(&mut task::Context<'_>) -> task::Poll, + F: FnMut(&mut Context<'_>) -> Poll, { type Output = T; - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { (self.f)(cx) } } diff --git a/actix-utils/src/future/ready.rs b/actix-utils/src/future/ready.rs new file mode 100644 index 00000000..be2ee146 --- /dev/null +++ b/actix-utils/src/future/ready.rs @@ -0,0 +1,122 @@ +//! When MSRV is 1.48, replace with `core::future::Ready` and `core::future::ready()`. + +use core::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +/// Future for the [`ready`](ready()) function. +/// +/// Panic will occur if polled more than once. +/// +/// # Examples +/// ``` +/// use actix_utils::future::ready; +/// +/// // async +/// # async fn run() { +/// let a = ready(1); +/// assert_eq!(a.await, 1); +/// # } +/// +/// // sync +/// let a = ready(1); +/// assert_eq!(a.into_inner(), 1); +/// ``` +#[derive(Debug, Clone)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Ready { + val: Option, +} + +impl Ready { + /// Unwraps the value from this immediately ready future. + #[inline] + pub fn into_inner(mut self) -> T { + self.val.take().unwrap() + } +} + +impl Unpin for Ready {} + +impl Future for Ready { + type Output = T; + + #[inline] + fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { + let val = self.val.take().expect("Ready polled after completion"); + Poll::Ready(val) + } +} + +/// Creates a future that is immediately ready with a value. +/// +/// # Examples +/// ```no_run +/// use actix_utils::future::ready; +/// +/// # async fn run() { +/// let a = ready(1); +/// assert_eq!(a.await, 1); +/// # } +/// +/// // sync +/// let a = ready(1); +/// assert_eq!(a.into_inner(), 1); +/// ``` +pub fn ready(val: T) -> Ready { + Ready { val: Some(val) } +} + +/// Create a future that is immediately ready with a success value. +/// +/// # Examples +/// ```no_run +/// use actix_utils::future::ok; +/// +/// # async fn run() { +/// let a = ok::<_, ()>(1); +/// assert_eq!(a.await, Ok(1)); +/// # } +/// ``` +pub fn ok(val: T) -> Ready> { + Ready { val: Some(Ok(val)) } +} + +/// Create a future that is immediately ready with an error value. +/// +/// # Examples +/// ```no_run +/// use actix_utils::future::err; +/// +/// # async fn run() { +/// let a = err::<(), _>(1); +/// assert_eq!(a.await, Err(1)); +/// # } +/// ``` +pub fn err(err: E) -> Ready> { + Ready { + val: Some(Err(err)), + } +} + +#[cfg(test)] +mod tests { + use futures_util::task::noop_waker; + + use super::*; + + #[test] + #[should_panic] + fn multiple_poll_panics() { + let waker = noop_waker(); + let mut cx = Context::from_waker(&waker); + + let mut ready = ready(1); + assert_eq!(Pin::new(&mut ready).poll(&mut cx), Poll::Ready(1)); + + // panic! + let _ = Pin::new(&mut ready).poll(&mut cx); + } +} diff --git a/actix-utils/src/lib.rs b/actix-utils/src/lib.rs index 6658cba8..d0e057ff 100644 --- a/actix-utils/src/lib.rs +++ b/actix-utils/src/lib.rs @@ -6,10 +6,4 @@ #![doc(html_favicon_url = "https://actix.rs/favicon.ico")] pub mod counter; -pub mod dispatcher; -pub mod mpsc; -mod poll_fn; -pub mod task; -pub mod timeout; - -use self::poll_fn::poll_fn; +pub mod future; diff --git a/actix-utils/src/timeout.rs b/actix-utils/src/timeout.rs deleted file mode 100644 index f13c7ffa..00000000 --- a/actix-utils/src/timeout.rs +++ /dev/null @@ -1,255 +0,0 @@ -//! Service that applies a timeout to requests. -//! -//! If the response does not complete within the specified timeout, the response will be aborted. - -use core::future::Future; -use core::marker::PhantomData; -use core::pin::Pin; -use core::task::{Context, Poll}; -use core::{fmt, time}; - -use actix_rt::time::{sleep, Sleep}; -use actix_service::{IntoService, Service, Transform}; -use pin_project_lite::pin_project; - -/// Applies a timeout to requests. -#[derive(Debug)] -pub struct Timeout { - timeout: time::Duration, - _t: PhantomData, -} - -/// Timeout error -pub enum TimeoutError { - /// Service error - Service(E), - /// Service call timeout - Timeout, -} - -impl From for TimeoutError { - fn from(err: E) -> Self { - TimeoutError::Service(err) - } -} - -impl fmt::Debug for TimeoutError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - TimeoutError::Service(e) => write!(f, "TimeoutError::Service({:?})", e), - TimeoutError::Timeout => write!(f, "TimeoutError::Timeout"), - } - } -} - -impl fmt::Display for TimeoutError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - TimeoutError::Service(e) => e.fmt(f), - TimeoutError::Timeout => write!(f, "Service call timeout"), - } - } -} - -impl PartialEq for TimeoutError { - fn eq(&self, other: &TimeoutError) -> bool { - match self { - TimeoutError::Service(e1) => match other { - TimeoutError::Service(e2) => e1 == e2, - TimeoutError::Timeout => false, - }, - TimeoutError::Timeout => matches!(other, TimeoutError::Timeout), - } - } -} - -impl Timeout { - pub fn new(timeout: time::Duration) -> Self { - Timeout { - timeout, - _t: PhantomData, - } - } -} - -impl Clone for Timeout { - fn clone(&self) -> Self { - Timeout::new(self.timeout) - } -} - -impl Transform for Timeout -where - S: Service, -{ - type Response = S::Response; - type Error = TimeoutError; - type Transform = TimeoutService; - type InitError = E; - type Future = TimeoutFuture; - - fn new_transform(&self, service: S) -> Self::Future { - let service = TimeoutService { - service, - timeout: self.timeout, - _phantom: PhantomData, - }; - - TimeoutFuture { - service: Some(service), - _err: PhantomData, - } - } -} - -pub struct TimeoutFuture { - service: Option, - _err: PhantomData, -} - -impl Unpin for TimeoutFuture {} - -impl Future for TimeoutFuture { - type Output = Result; - - fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll { - Poll::Ready(Ok(self.get_mut().service.take().unwrap())) - } -} - -/// Applies a timeout to requests. -#[derive(Debug, Clone)] -pub struct TimeoutService { - service: S, - timeout: time::Duration, - _phantom: PhantomData, -} - -impl TimeoutService -where - S: Service, -{ - pub fn new(timeout: time::Duration, service: U) -> Self - where - U: IntoService, - { - TimeoutService { - timeout, - service: service.into_service(), - _phantom: PhantomData, - } - } -} - -impl Service for TimeoutService -where - S: Service, -{ - type Response = S::Response; - type Error = TimeoutError; - type Future = TimeoutServiceResponse; - - actix_service::forward_ready!(service); - - fn call(&self, request: Req) -> Self::Future { - TimeoutServiceResponse { - fut: self.service.call(request), - sleep: sleep(self.timeout), - } - } -} - -pin_project! { - /// `TimeoutService` response future - #[derive(Debug)] - pub struct TimeoutServiceResponse - where - S: Service - { - #[pin] - fut: S::Future, - #[pin] - sleep: Sleep, - } -} - -impl Future for TimeoutServiceResponse -where - S: Service, -{ - type Output = Result>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - - // First, try polling the future - if let Poll::Ready(res) = this.fut.poll(cx) { - return match res { - Ok(v) => Poll::Ready(Ok(v)), - Err(e) => Poll::Ready(Err(TimeoutError::Service(e))), - }; - } - - // Now check the sleep - this.sleep.poll(cx).map(|_| Err(TimeoutError::Timeout)) - } -} - -#[cfg(test)] -mod tests { - use core::time::Duration; - - use super::*; - use actix_service::{apply, fn_factory, Service, ServiceFactory}; - use futures_core::future::LocalBoxFuture; - - struct SleepService(Duration); - - impl Service<()> for SleepService { - type Response = (); - type Error = (); - type Future = LocalBoxFuture<'static, Result<(), ()>>; - - actix_service::always_ready!(); - - fn call(&self, _: ()) -> Self::Future { - let sleep = actix_rt::time::sleep(self.0); - Box::pin(async move { - sleep.await; - Ok(()) - }) - } - } - - #[actix_rt::test] - async fn test_success() { - let resolution = Duration::from_millis(100); - let wait_time = Duration::from_millis(50); - - let timeout = TimeoutService::new(resolution, SleepService(wait_time)); - assert_eq!(timeout.call(()).await, Ok(())); - } - - #[actix_rt::test] - async fn test_timeout() { - let resolution = Duration::from_millis(100); - let wait_time = Duration::from_millis(500); - - let timeout = TimeoutService::new(resolution, SleepService(wait_time)); - assert_eq!(timeout.call(()).await, Err(TimeoutError::Timeout)); - } - - #[actix_rt::test] - async fn test_timeout_new_service() { - let resolution = Duration::from_millis(100); - let wait_time = Duration::from_millis(500); - - let timeout = apply( - Timeout::new(resolution), - fn_factory(|| async { Ok::<_, ()>(SleepService(wait_time)) }), - ); - let srv = timeout.new_service(&()).await.unwrap(); - - assert_eq!(srv.call(()).await, Err(TimeoutError::Timeout)); - } -} diff --git a/local-channel/CHANGES.md b/local-channel/CHANGES.md new file mode 100644 index 00000000..cccf9609 --- /dev/null +++ b/local-channel/CHANGES.md @@ -0,0 +1,7 @@ +# Changes + +## Unreleased - 2021-xx-xx + + +## 0.1.1 - 2021-03-29 +* Move local mpsc channel to it's own crate. diff --git a/local-channel/Cargo.toml b/local-channel/Cargo.toml new file mode 100644 index 00000000..a9d3691e --- /dev/null +++ b/local-channel/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "local-channel" +version = "0.1.1" +description = "A non-threadsafe multi-producer, single-consumer, futures-aware, FIFO queue" +authors = [ + "Nikolay Kim ", + "Rob Ede ", +] +edition = "2018" +license = "MIT OR Apache-2.0" +repository = "https://github.com/actix/actix-net.git" +documentation = "https://docs.rs/actix-server" +keywords = ["channel", "local", "futures"] + +[dependencies] +futures-core = { version = "0.3.7", default-features = false } +futures-sink = { version = "0.3.7", default-features = false } +futures-util = { version = "0.3.7", default-features = false } +local-waker = "0.1" + +[dev-dependencies] +tokio = { version = "1", features = ["rt", "macros"] } diff --git a/local-channel/src/lib.rs b/local-channel/src/lib.rs new file mode 100644 index 00000000..b88fd98a --- /dev/null +++ b/local-channel/src/lib.rs @@ -0,0 +1,3 @@ +//! Non-thread-safe channels. + +pub mod mpsc; diff --git a/actix-utils/src/mpsc.rs b/local-channel/src/mpsc.rs similarity index 95% rename from actix-utils/src/mpsc.rs rename to local-channel/src/mpsc.rs index 9c7a5a0e..627d7db0 100644 --- a/actix-utils/src/mpsc.rs +++ b/local-channel/src/mpsc.rs @@ -1,4 +1,4 @@ -//! A multi-producer, single-consumer, futures-aware, FIFO queue. +//! A non-thread-safe multi-producer, single-consumer, futures-aware, FIFO queue. use core::{ cell::RefCell, @@ -11,8 +11,8 @@ use std::{collections::VecDeque, error::Error, rc::Rc}; use futures_core::stream::Stream; use futures_sink::Sink; - -use crate::{poll_fn, task::LocalWaker}; +use futures_util::future::poll_fn; +use local_waker::LocalWaker; /// Creates a unbounded in-memory channel with buffered storage. /// @@ -174,6 +174,8 @@ impl Drop for Receiver { } /// Error returned when attempting to send after the channels' [Receiver] is dropped or closed. +/// +/// Allows access to message that failed to send with [`into_inner`](Self::into_inner). pub struct SendError(pub T); impl SendError { @@ -199,11 +201,11 @@ impl Error for SendError {} #[cfg(test)] mod tests { - use super::*; - use futures_util::future::lazy; - use futures_util::{stream::Stream, StreamExt}; + use futures_util::{future::lazy, StreamExt as _}; - #[actix_rt::test] + use super::*; + + #[tokio::test] async fn test_mpsc() { let (tx, mut rx) = channel(); tx.send("test").unwrap(); @@ -237,7 +239,7 @@ mod tests { assert!(tx2.send("test").is_err()); } - #[actix_rt::test] + #[tokio::test] async fn test_recv() { let (tx, mut rx) = channel(); tx.send("test").unwrap(); diff --git a/local-waker/CHANGES.md b/local-waker/CHANGES.md new file mode 100644 index 00000000..edb5aa3e --- /dev/null +++ b/local-waker/CHANGES.md @@ -0,0 +1,7 @@ +# Changes + +## Unreleased - 2021-xx-xx + + +## 0.1.1 - 2021-03-29 +* Move `LocalWaker` to it's own crate. diff --git a/local-waker/Cargo.toml b/local-waker/Cargo.toml new file mode 100644 index 00000000..df1f9ab8 --- /dev/null +++ b/local-waker/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "local-waker" +version = "0.1.1" +description = "A synchronization primitive for thread-local task wakeup" +authors = [ + "Nikolay Kim ", + "Rob Ede ", +] +keywords = ["waker", "local", "futures", "no-std"] +repository = "https://github.com/actix/actix-net.git" +documentation = "https://docs.rs/local-waker" +categories = ["asynchronous", "no-std"] +license = "MIT OR Apache-2.0" +edition = "2018" + +[dependencies] diff --git a/actix-utils/src/task.rs b/local-waker/src/lib.rs similarity index 95% rename from actix-utils/src/task.rs rename to local-waker/src/lib.rs index 507bfc14..c76badee 100644 --- a/actix-utils/src/task.rs +++ b/local-waker/src/lib.rs @@ -1,3 +1,9 @@ +//! A synchronization primitive for thread-local task wakeup. +//! +//! See docs for [`LocalWaker`]. + +#![no_std] + use core::{cell::Cell, fmt, marker::PhantomData, task::Waker}; /// A synchronization primitive for task wakeup.