1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-08-12 06:27:09 +02:00

Compare commits

..

25 Commits

Author SHA1 Message Date
Rob Ede
a5a6b6704c prepare actix-service 1.0.6 release (#175) 2020-08-09 16:10:58 +01:00
Igor Aleksanov
afb0a3c9fc actix-service: Fix clippy warning in benches (#174) 2020-08-07 17:16:45 +09:00
Miloas
02aaa75591 fix actix-service doc error (#172) 2020-08-06 11:21:51 +01:00
Yuki Okushi
ed4b708c66 Fix CI on MSRV check (#171) 2020-08-05 09:02:41 +09:00
Yuki Okushi
235a76dcd4 GHA: Switch action to the official setup-msys2 (#169) 2020-07-29 08:47:32 +09:00
Matt Kantor
0c5f1da625 Remove garbled doc comment for actix_router::IntoPattern::is_single (#168) 2020-07-29 05:46:53 +09:00
Yuki Okushi
8ace9264b7 Check code style with rustfmt on CI (#164) 2020-07-22 12:32:13 +09:00
Yuki Okushi
0dca1a705a actix-utils: Remove unsound custom Cell as well (#161) 2020-07-22 01:14:32 +01:00
Juan Aguilar
5d6d309e66 Simplify bcodec decode (#162) 2020-07-20 23:09:24 +09:00
Juan Aguilar
8d0bd7ce1c Improve bcodec encode performance (#157) 2020-07-19 22:36:51 +01:00
Sergey "Shnatsel" Davidoff
a67e38b4a0 Remove unsound custom Cell (#158) 2020-07-20 06:05:36 +09:00
Rob Ede
334c98575a Upgrade tokio utils to 0.3 (#138) 2020-07-20 05:44:26 +09:00
Rob Ede
a9b5a7b070 Create PULL_REQUEST_TEMPLATE.md (#159) 2020-07-20 03:01:09 +09:00
Yuki Okushi
61176f6410 Update rustls-related dependencies (#154) 2020-07-14 11:14:06 +01:00
Yuki Okushi
10b4c30a06 Use OR instead of deprecated / in license field (#155) 2020-07-14 11:11:30 +01:00
Yuki Okushi
7f550bcf0f threadpool: Bump up to 0.3.3 (#156) 2020-07-14 11:10:15 +01:00
Yuki Okushi
887f11f787 Merge pull request #153 from actix/tweak-actions
Tweak actions trigger events
2020-07-08 09:04:05 +09:00
Yuki Okushi
e2a6d352b0 Tweak actions trigger events 2020-07-08 08:38:24 +09:00
Yuki Okushi
f6c697a2dd Merge pull request #152 from paolobarbolini/pl-011
Update parking_lot to 0.11
2020-07-04 03:20:08 +09:00
Paolo Barbolini
5ecdfd684a Update parking_lot to 0.11 2020-07-03 17:37:10 +02:00
Yuki Okushi
7140c04c44 Merge pull request #149 from taiki-e/pin-project
Remove uses of pin_project::project attribute
2020-06-07 02:01:08 +09:00
Taiki Endo
9528df4486 Remove uses of pin_project::project attribute
pin-project will deprecate the project attribute due to some unfixable
limitations.

Refs: https://github.com/taiki-e/pin-project/issues/225
2020-06-06 06:42:45 +09:00
Pen Tree
755a8bb9d1 fix codec doc links (#148) 2020-06-02 18:05:39 +01:00
Yuki Okushi
f3cb6efc30 Merge pull request #146 from actix/cache-v2
Update `actions/cache` to v2
2020-05-28 04:59:34 +09:00
Yuki Okushi
87b857705c Update actions/cache to v2 2020-05-28 03:14:01 +09:00
65 changed files with 746 additions and 1681 deletions

24
.github/PULL_REQUEST_TEMPLATE.md vendored Normal file
View File

@@ -0,0 +1,24 @@
## PR Type
<!-- What kind of change does this PR make? -->
<!-- Bug Fix / Feature / Refactor / Code Style / Other -->
INSERT_PR_TYPE
## PR Checklist
Check your PR fulfills the following:
<!-- For draft PRs check the boxes as you complete them. -->
- [ ] Tests for the changes have been added / updated.
- [ ] Documentation comments have been added / updated.
- [ ] A changelog entry has been made for the appropriate packages.
- [ ] Format code with the latest stable rustfmt
## Overview
<!-- Describe the current and new behavior. -->
<!-- Emphasize any breaking changes. -->
<!-- If this PR fixes or closes an issue, reference it here. -->
<!-- Closes #000 -->

View File

@@ -1,13 +1,19 @@
name: Benchmark (Linux)
on: [push, pull_request]
on:
pull_request:
types: [opened, synchronize, reopened]
push:
branches:
- master
- '1.0'
jobs:
check_benchmark:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@master
- uses: actions/checkout@v2
- name: Install Rust
uses: actions-rs/toolchain@v1

34
.github/workflows/clippy-fmt.yml vendored Normal file
View File

@@ -0,0 +1,34 @@
on:
pull_request:
types: [opened, synchronize, reopened]
name: Clippy and rustfmt Check
jobs:
clippy_check:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions-rs/toolchain@v1
with:
toolchain: stable
components: rustfmt
profile: minimal
override: true
- name: Check with rustfmt
uses: actions-rs/cargo@v1
with:
command: fmt
args: --all -- --check
- uses: actions-rs/toolchain@v1
with:
toolchain: nightly
components: clippy
profile: minimal
override: true
- name: Check with Clippy
uses: actions-rs/clippy-check@v1
with:
token: ${{ secrets.GITHUB_TOKEN }}
args: --all-features --all --tests

View File

@@ -1,18 +0,0 @@
on: pull_request
name: Clippy Check
jobs:
clippy_check:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@master
- uses: actions-rs/toolchain@v1
with:
toolchain: nightly
components: clippy
profile: minimal
override: true
- uses: actions-rs/clippy-check@v1
with:
token: ${{ secrets.GITHUB_TOKEN }}
args: --all-features --all --tests

View File

@@ -1,6 +1,12 @@
name: CI (Linux)
on: [push, pull_request]
on:
pull_request:
types: [opened, synchronize, reopened]
push:
branches:
- master
- '1.0'
jobs:
build_and_test:
@@ -16,7 +22,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@master
- uses: actions/checkout@v2
- name: Install ${{ matrix.version }}
uses: actions-rs/toolchain@v1
@@ -29,18 +35,16 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: generate-lockfile
- name: Cache cargo registry
uses: actions/cache@v1
- name: Cache cargo dirs
uses: actions/cache@v2
with:
path: ~/.cargo/registry
key: ${{ matrix.version }}-x86_64-unknown-linux-gnu-cargo-registry-trimmed-${{ hashFiles('**/Cargo.lock') }}
- name: Cache cargo index
uses: actions/cache@v1
with:
path: ~/.cargo/git
key: ${{ matrix.version }}-x86_64-unknown-linux-gnu-cargo-index-trimmed-${{ hashFiles('**/Cargo.lock') }}
path:
~/.cargo/registry
~/.cargo/git
~/.cargo/bin
key: ${{ matrix.version }}-x86_64-unknown-linux-gnu-cargo-trimmed-${{ hashFiles('**/Cargo.lock') }}
- name: Cache cargo build
uses: actions/cache@v1
uses: actions/cache@v2
with:
path: target
key: ${{ matrix.version }}-x86_64-unknown-linux-gnu-cargo-build-trimmed-${{ hashFiles('**/Cargo.lock') }}
@@ -72,5 +76,7 @@ jobs:
- name: Clear the cargo caches
run: |
rustup update stable
rustup override set stable
cargo install cargo-cache --no-default-features --features ci-autoclean
cargo-cache

View File

@@ -1,6 +1,12 @@
name: CI (macOS)
on: [push, pull_request]
on:
pull_request:
types: [opened, synchronize, reopened]
push:
branches:
- master
- '1.0'
jobs:
build_and_test:
@@ -15,7 +21,7 @@ jobs:
runs-on: macos-latest
steps:
- uses: actions/checkout@master
- uses: actions/checkout@v2
- name: Install ${{ matrix.version }}
uses: actions-rs/toolchain@v1

View File

@@ -1,6 +1,12 @@
name: CI (Windows-mingw)
on: [push, pull_request]
on:
pull_request:
types: [opened, synchronize, reopened]
push:
branches:
- master
- '1.0'
jobs:
build_and_test:
@@ -15,7 +21,7 @@ jobs:
runs-on: windows-latest
steps:
- uses: actions/checkout@master
- uses: actions/checkout@v2
- name: Install ${{ matrix.version }}
uses: actions-rs/toolchain@v1
@@ -25,12 +31,12 @@ jobs:
override: true
- name: Install MSYS2
uses: numworks/setup-msys2@v1
uses: msys2/setup-msys2@v2
- name: Install packages
run: |
msys2do pacman -Sy --noconfirm pacman
msys2do pacman --noconfirm -S base-devel pkg-config
msys2 -c 'pacman -Sy --noconfirm pacman'
msys2 -c 'pacman --noconfirm -S base-devel pkg-config'
- name: check build
uses: actions-rs/cargo@v1

View File

@@ -1,6 +1,12 @@
name: CI (Windows)
on: [push, pull_request]
on:
pull_request:
types: [opened, synchronize, reopened]
push:
branches:
- master
- '1.0'
env:
VCPKGRS_DYNAMIC: 1
@@ -21,7 +27,7 @@ jobs:
runs-on: windows-latest
steps:
- uses: actions/checkout@master
- uses: actions/checkout@v2
- name: Install ${{ matrix.version }}
uses: actions-rs/toolchain@v1

View File

@@ -2,7 +2,6 @@
members = [
"actix-codec",
"actix-connect",
"actix-ioframe",
"actix-rt",
"actix-macros",
"actix-service",
@@ -19,7 +18,6 @@ members = [
[patch.crates-io]
actix-codec = { path = "actix-codec" }
actix-connect = { path = "actix-connect" }
actix-ioframe = { path = "actix-ioframe" }
actix-rt = { path = "actix-rt" }
actix-macros = { path = "actix-macros" }
actix-server = { path = "actix-server" }

View File

@@ -1,6 +1,10 @@
# Changes
* Use `.advance()` intead of `.split_to()`
## Unreleased - 2020-xx-xx
* Use `.advance()` instead of `.split_to()`.
* Upgrade `tokio-util` to `0.3`.
* Improve `BytesCodec` `.encode()` performance
* Simplify `BytesCodec` `.decode()`
## [0.2.0] - 2019-12-10
@@ -8,7 +12,7 @@
## [0.2.0-alpha.4]
* Fix buffer remaining capacity calcualtion
* Fix buffer remaining capacity calculation
## [0.2.0-alpha.3]

View File

@@ -8,7 +8,7 @@ homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/actix-codec/"
categories = ["network-programming", "asynchronous"]
license = "MIT/Apache-2.0"
license = "MIT OR Apache-2.0"
edition = "2018"
workspace = ".."
@@ -21,7 +21,7 @@ bitflags = "1.2.1"
bytes = "0.5.2"
futures-core = { version = "0.3.4", default-features = false }
futures-sink = { version = "0.3.4", default-features = false }
tokio = { version = "0.2.4", default-features=false }
tokio-util = { version = "0.2.0", default-features=false, features=["codec"] }
tokio = { version = "0.2.5", default-features = false }
tokio-util = { version = "0.3.1", default-features = false, features = ["codec"] }
log = "0.4"
pin-project = "0.4.8"
pin-project = "0.4.17"

View File

@@ -1,4 +1,4 @@
use bytes::{BufMut, Bytes, BytesMut};
use bytes::{Buf, Bytes, BytesMut};
use std::io;
use super::{Decoder, Encoder};
@@ -9,13 +9,12 @@ use super::{Decoder, Encoder};
#[derive(Debug, Copy, Clone)]
pub struct BytesCodec;
impl Encoder for BytesCodec {
type Item = Bytes;
impl Encoder<Bytes> for BytesCodec {
type Error = io::Error;
#[inline]
fn encode(&mut self, item: Bytes, dst: &mut BytesMut) -> Result<(), Self::Error> {
dst.reserve(item.len());
dst.put(item);
dst.extend_from_slice(item.bytes());
Ok(())
}
}
@@ -28,8 +27,7 @@ impl Decoder for BytesCodec {
if src.is_empty() {
Ok(None)
} else {
let len = src.len();
Ok(Some(src.split_to(len)))
Ok(Some(src.split()))
}
}
}

View File

@@ -9,7 +9,9 @@ use pin_project::pin_project;
use crate::{AsyncRead, AsyncWrite, Decoder, Encoder};
/// Low-water mark
const LW: usize = 1024;
/// High-water mark
const HW: usize = 8 * 1024;
bitflags::bitflags! {
@@ -34,7 +36,7 @@ pub struct Framed<T, U> {
impl<T, U> Framed<T, U>
where
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
U: Decoder,
{
/// Provides a `Stream` and `Sink` interface for reading and writing to this
/// `Io` object, using `Decode` and `Encode` to read and write the raw data.
@@ -129,7 +131,7 @@ impl<T, U> Framed<T, U> {
}
/// Consume the `Frame`, returning `Frame` with different codec.
pub fn into_framed<U2>(self, codec: U2) -> Framed<T, U2> {
pub fn into_framed<U2, I2>(self, codec: U2) -> Framed<T, U2> {
Framed {
codec,
io: self.io,
@@ -140,7 +142,7 @@ impl<T, U> Framed<T, U> {
}
/// Consume the `Frame`, returning `Frame` with different io.
pub fn map_io<F, T2>(self, f: F) -> Framed<T2, U>
pub fn map_io<F, T2, I2>(self, f: F) -> Framed<T2, U>
where
F: Fn(T) -> T2,
{
@@ -154,7 +156,7 @@ impl<T, U> Framed<T, U> {
}
/// Consume the `Frame`, returning `Frame` with different codec.
pub fn map_codec<F, U2>(self, f: F) -> Framed<T, U2>
pub fn map_codec<F, U2, I2>(self, f: F) -> Framed<T, U2>
where
F: Fn(U) -> U2,
{
@@ -186,10 +188,10 @@ impl<T, U> Framed<T, U> {
impl<T, U> Framed<T, U> {
/// Serialize item and Write to the inner buffer
pub fn write(mut self: Pin<&mut Self>, item: <U as Encoder>::Item) -> Result<(), <U as Encoder>::Error>
pub fn write<I>(mut self: Pin<&mut Self>, item: I) -> Result<(), <U as Encoder<I>>::Error>
where
T: AsyncWrite,
U: Encoder,
U: Encoder<I>,
{
let this = self.as_mut().project();
let remaining = this.write_buf.capacity() - this.write_buf.len();
@@ -209,7 +211,10 @@ impl<T, U> Framed<T, U> {
}
/// Try to read underlying I/O stream and decode item.
pub fn next_item(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<U::Item, U::Error>>>
pub fn next_item(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<<U as Decoder>::Item, U::Error>>>
where
T: AsyncRead,
U: Decoder,
@@ -266,10 +271,13 @@ impl<T, U> Framed<T, U> {
}
/// Flush write buffer to underlying I/O stream.
pub fn flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), U::Error>>
pub fn flush<I>(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), U::Error>>
where
T: AsyncWrite,
U: Encoder,
U: Encoder<I>,
{
let mut this = self.as_mut().project();
log::trace!("flushing framed transport");
@@ -277,9 +285,7 @@ impl<T, U> Framed<T, U> {
while !this.write_buf.is_empty() {
log::trace!("writing; remaining={}", this.write_buf.len());
let n = ready!(
this.io.as_mut().poll_write(cx, this.write_buf)
)?;
let n = ready!(this.io.as_mut().poll_write(cx, this.write_buf))?;
if n == 0 {
return Poll::Ready(Err(io::Error::new(
@@ -301,10 +307,13 @@ impl<T, U> Framed<T, U> {
}
/// Flush write buffer and shutdown underlying I/O stream.
pub fn close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), U::Error>>
pub fn close<I>(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), U::Error>>
where
T: AsyncWrite,
U: Encoder,
U: Encoder<I>,
{
let mut this = self.as_mut().project();
ready!(this.io.as_mut().poll_flush(cx))?;
@@ -325,10 +334,10 @@ where
}
}
impl<T, U> Sink<U::Item> for Framed<T, U>
impl<T, U, I> Sink<I> for Framed<T, U>
where
T: AsyncWrite,
U: Encoder,
U: Encoder<I>,
U::Error: From<io::Error>,
{
type Error = U::Error;
@@ -341,24 +350,15 @@ where
}
}
fn start_send(
self: Pin<&mut Self>,
item: <U as Encoder>::Item,
) -> Result<(), Self::Error> {
fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
self.write(item)
}
fn poll_flush(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.flush(cx)
}
fn poll_close(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.close(cx)
}
}

View File

@@ -2,11 +2,13 @@
//!
//! Contains adapters to go from streams of bytes, [`AsyncRead`] and
//! [`AsyncWrite`], to framed streams implementing [`Sink`] and [`Stream`].
//! Framed streams are also known as [transports].
//! Framed streams are also known as `transports`.
//!
//! [`AsyncRead`]: #
//! [`AsyncWrite`]: #
#![deny(rust_2018_idioms, warnings)]
//! [`AsyncRead`]: AsyncRead
//! [`AsyncWrite`]: AsyncWrite
//! [`Sink`]: futures_sink::Sink
//! [`Stream`]: futures_core::Stream
#![deny(rust_2018_idioms)]
mod bcodec;
mod framed;

View File

@@ -1,5 +1,12 @@
# Changes
## [unreleased]
### Changed
* Update `rustls` dependency to 0.18
* Update `tokio-rustls` dependency to 0.14
## [2.0.0-alpha.3] - 2020-05-08
### Fixed

View File

@@ -8,7 +8,7 @@ homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/actix-connect/"
categories = ["network-programming", "asynchronous"]
license = "MIT/Apache-2.0"
license = "MIT OR Apache-2.0"
edition = "2018"
[package.metadata.docs.rs]
@@ -48,8 +48,8 @@ open-ssl = { version="0.10", package = "openssl", optional = true }
tokio-openssl = { version = "0.4.0", optional = true }
# rustls
rust-tls = { version = "0.17.0", package = "rustls", optional = true }
tokio-rustls = { version = "0.13.0", optional = true }
rust-tls = { version = "0.18.0", package = "rustls", optional = true }
tokio-rustls = { version = "0.14.0", optional = true }
webpki = { version = "0.21", optional = true }
[dev-dependencies]

View File

@@ -1,33 +0,0 @@
# Changes
## [0.5.0] - 2019-12-29
* Simplify state management
* Allow to set custom output stream
* Removed disconnect callback
## [0.4.1] - 2019-12-11
* Disconnect callback accepts owned state
## [0.4.0] - 2019-12-11
* Remove `E` param
## [0.3.0-alpha.3] - 2019-12-07
* Migrate to tokio 0.2
## [0.3.0-alpha.2] - 2019-12-02
* Migrate to `std::future`
## [0.1.1] - 2019-10-14
* Re-register task on every dispatcher poll.
## [0.1.0] - 2019-09-25
* Initial release

View File

@@ -1,33 +0,0 @@
[package]
name = "actix-ioframe"
version = "0.5.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix framed service"
keywords = ["network", "framework", "async", "futures"]
homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/actix-ioframe/"
categories = ["network-programming", "asynchronous"]
license = "MIT/Apache-2.0"
edition = "2018"
[lib]
name = "actix_ioframe"
path = "src/lib.rs"
[dependencies]
actix-service = "1.0.1"
actix-codec = "0.2.0"
actix-utils = "1.0.4"
actix-rt = "1.0.0"
bytes = "0.5.3"
either = "1.5.3"
futures-sink = { version = "0.3.4", default-features = false }
futures-core = { version = "0.3.4", default-features = false }
pin-project = "0.4.6"
log = "0.4"
[dev-dependencies]
actix-connect = "2.0.0-alpha.2"
actix-testing = "1.0.0"
futures-util = { version = "0.3.4", default-features = false }

View File

@@ -1 +0,0 @@
../LICENSE-APACHE

View File

@@ -1 +0,0 @@
../LICENSE-MIT

3
actix-ioframe/README.md Normal file
View File

@@ -0,0 +1,3 @@
# actix-ioframe
**This crate has been deprecated and removed.**

View File

@@ -1,123 +0,0 @@
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed};
use actix_utils::mpsc::Receiver;
use futures_core::stream::Stream;
pub struct Connect<Io, Codec>
where
Codec: Encoder + Decoder,
{
io: Io,
_t: PhantomData<Codec>,
}
impl<Io, Codec> Connect<Io, Codec>
where
Io: AsyncRead + AsyncWrite,
Codec: Encoder + Decoder,
{
pub(crate) fn new(io: Io) -> Self {
Self {
io,
_t: PhantomData,
}
}
pub fn codec(
self,
codec: Codec,
) -> ConnectResult<Io, (), Codec, Receiver<<Codec as Encoder>::Item>> {
ConnectResult {
state: (),
out: None,
framed: Framed::new(self.io, codec),
}
}
}
#[pin_project::pin_project]
pub struct ConnectResult<Io, St, Codec: Encoder + Decoder, Out> {
pub(crate) state: St,
pub(crate) out: Option<Out>,
#[pin]
pub(crate) framed: Framed<Io, Codec>,
}
impl<Io, St, Codec: Encoder + Decoder, Out: Unpin> ConnectResult<Io, St, Codec, Out> {
#[inline]
pub fn get_ref(&self) -> &Io {
self.framed.get_ref()
}
#[inline]
pub fn get_mut(&mut self) -> &mut Io {
self.framed.get_mut()
}
pub fn out<U>(self, out: U) -> ConnectResult<Io, St, Codec, U>
where
U: Stream<Item = <Codec as Encoder>::Item> + Unpin,
{
ConnectResult {
state: self.state,
framed: self.framed,
out: Some(out),
}
}
#[inline]
pub fn state<S>(self, state: S) -> ConnectResult<Io, S, Codec, Out> {
ConnectResult {
state,
framed: self.framed,
out: self.out,
}
}
}
impl<Io, St, Codec, Out> Stream for ConnectResult<Io, St, Codec, Out>
where
Io: AsyncRead + AsyncWrite,
Codec: Encoder + Decoder,
{
type Item = Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project().framed.next_item(cx)
}
}
impl<Io, St, Codec, Out> futures_sink::Sink<<Codec as Encoder>::Item>
for ConnectResult<Io, St, Codec, Out>
where
Io: AsyncRead + AsyncWrite,
Codec: Encoder + Decoder,
{
type Error = <Codec as Encoder>::Error;
fn poll_ready(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.as_mut().project().framed.is_write_ready() {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
}
fn start_send(
self: Pin<&mut Self>,
item: <Codec as Encoder>::Item,
) -> Result<(), Self::Error> {
self.project().framed.write(item)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.as_mut().project().framed.flush(cx)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.as_mut().project().framed.close(cx)
}
}

View File

@@ -1,248 +0,0 @@
//! Framed dispatcher service and related utilities
use std::pin::Pin;
use std::task::{Context, Poll};
use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed};
use actix_service::Service;
use actix_utils::mpsc;
use futures_core::stream::Stream;
use pin_project::pin_project;
use log::debug;
use crate::error::ServiceError;
type Request<U> = <U as Decoder>::Item;
type Response<U> = <U as Encoder>::Item;
/// FramedTransport - is a future that reads frames from Framed object
/// and pass then to the service.
#[pin_project]
pub(crate) struct Dispatcher<S, T, U, Out>
where
S: Service<Request = Request<U>, Response = Option<Response<U>>>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Encoder + Decoder,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
Out: Stream<Item = <U as Encoder>::Item> + Unpin,
{
service: S,
sink: Option<Out>,
state: FramedState<S, U>,
#[pin]
framed: Framed<T, U>,
rx: mpsc::Receiver<Result<<U as Encoder>::Item, S::Error>>,
}
impl<S, T, U, Out> Dispatcher<S, T, U, Out>
where
S: Service<Request = Request<U>, Response = Option<Response<U>>>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
Out: Stream<Item = <U as Encoder>::Item> + Unpin,
{
pub(crate) fn new(framed: Framed<T, U>, service: S, sink: Option<Out>) -> Self {
Dispatcher {
sink,
service,
framed,
rx: mpsc::channel().1,
state: FramedState::Processing,
}
}
}
enum FramedState<S: Service, U: Encoder + Decoder> {
Processing,
Error(ServiceError<S::Error, U>),
FramedError(ServiceError<S::Error, U>),
FlushAndStop,
Stopping,
}
impl<S: Service, U: Encoder + Decoder> FramedState<S, U> {
fn take_error(&mut self) -> ServiceError<S::Error, U> {
match std::mem::replace(self, FramedState::Processing) {
FramedState::Error(err) => err,
_ => panic!(),
}
}
fn take_framed_error(&mut self) -> ServiceError<S::Error, U> {
match std::mem::replace(self, FramedState::Processing) {
FramedState::FramedError(err) => err,
_ => panic!(),
}
}
}
impl<S, T, U, Out> Dispatcher<S, T, U, Out>
where
S: Service<Request = Request<U>, Response = Option<Response<U>>>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
Out: Stream<Item = <U as Encoder>::Item> + Unpin,
{
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> bool {
loop {
let this = self.as_mut().project();
match this.service.poll_ready(cx) {
Poll::Ready(Ok(_)) => {
let item = match this.framed.next_item(cx) {
Poll::Ready(Some(Ok(el))) => el,
Poll::Ready(Some(Err(err))) => {
*this.state = FramedState::FramedError(ServiceError::Decoder(err));
return true;
}
Poll::Pending => return false,
Poll::Ready(None) => {
log::trace!("Client disconnected");
*this.state = FramedState::Stopping;
return true;
}
};
let tx = this.rx.sender();
let fut = this.service.call(item);
actix_rt::spawn(async move {
let item = fut.await;
let item = match item {
Ok(Some(item)) => Ok(item),
Ok(None) => return,
Err(err) => Err(err),
};
let _ = tx.send(item);
});
}
Poll::Pending => return false,
Poll::Ready(Err(err)) => {
*this.state = FramedState::Error(ServiceError::Service(err));
return true;
}
}
}
}
/// write to framed object
fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> bool {
loop {
let mut this = self.as_mut().project();
while !this.framed.is_write_buf_full() {
match Pin::new(&mut this.rx).poll_next(cx) {
Poll::Ready(Some(Ok(msg))) => {
if let Err(err) = this.framed.as_mut().write(msg) {
*this.state = FramedState::FramedError(ServiceError::Encoder(err));
return true;
}
continue;
}
Poll::Ready(Some(Err(err))) => {
*this.state = FramedState::Error(ServiceError::Service(err));
return true;
}
Poll::Ready(None) | Poll::Pending => (),
}
if this.sink.is_some() {
match Pin::new(this.sink.as_mut().unwrap()).poll_next(cx) {
Poll::Ready(Some(msg)) => {
if let Err(err) = this.framed.as_mut().write(msg) {
*this.state =
FramedState::FramedError(ServiceError::Encoder(err));
return true;
}
continue;
}
Poll::Ready(None) => {
let _ = this.sink.take();
*this.state = FramedState::FlushAndStop;
return true;
}
Poll::Pending => (),
}
}
break;
}
if !this.framed.is_write_buf_empty() {
match this.framed.as_mut().flush(cx) {
Poll::Pending => break,
Poll::Ready(Ok(_)) => (),
Poll::Ready(Err(err)) => {
debug!("Error sending data: {:?}", err);
*this.state = FramedState::FramedError(ServiceError::Encoder(err));
return true;
}
}
} else {
break;
}
}
false
}
pub(crate) fn poll(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), ServiceError<S::Error, U>>> {
let mut this = self.as_mut().project();
match this.state {
FramedState::Processing => loop {
let read = self.as_mut().poll_read(cx);
let write = self.as_mut().poll_write(cx);
if read || write {
continue;
} else {
return Poll::Pending;
}
},
FramedState::Error(_) => {
// flush write buffer
if !this.framed.is_write_buf_empty() {
if let Poll::Pending = this.framed.flush(cx) {
return Poll::Pending;
}
}
Poll::Ready(Err(this.state.take_error()))
}
FramedState::FlushAndStop => {
// drain service responses
match Pin::new(this.rx).poll_next(cx) {
Poll::Ready(Some(Ok(msg))) => {
if this.framed.as_mut().write(msg).is_err() {
return Poll::Ready(Ok(()));
}
}
Poll::Ready(Some(Err(_))) => return Poll::Ready(Ok(())),
Poll::Ready(None) | Poll::Pending => (),
}
// flush io
if !this.framed.is_write_buf_empty() {
match this.framed.flush(cx) {
Poll::Ready(Err(err)) => {
debug!("Error sending data: {:?}", err);
}
Poll::Pending => {
return Poll::Pending;
}
Poll::Ready(_) => (),
}
};
Poll::Ready(Ok(()))
}
FramedState::FramedError(_) => Poll::Ready(Err(this.state.take_framed_error())),
FramedState::Stopping => Poll::Ready(Ok(())),
}
}
}

View File

@@ -1,49 +0,0 @@
use std::fmt;
use actix_codec::{Decoder, Encoder};
/// Framed service errors
pub enum ServiceError<E, U: Encoder + Decoder> {
/// Inner service error
Service(E),
/// Encoder parse error
Encoder(<U as Encoder>::Error),
/// Decoder parse error
Decoder(<U as Decoder>::Error),
}
impl<E, U: Encoder + Decoder> From<E> for ServiceError<E, U> {
fn from(err: E) -> Self {
ServiceError::Service(err)
}
}
impl<E, U: Encoder + Decoder> fmt::Debug for ServiceError<E, U>
where
E: fmt::Debug,
<U as Encoder>::Error: fmt::Debug,
<U as Decoder>::Error: fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
ServiceError::Service(ref e) => write!(fmt, "ServiceError::Service({:?})", e),
ServiceError::Encoder(ref e) => write!(fmt, "ServiceError::Encoder({:?})", e),
ServiceError::Decoder(ref e) => write!(fmt, "ServiceError::Encoder({:?})", e),
}
}
}
impl<E, U: Encoder + Decoder> fmt::Display for ServiceError<E, U>
where
E: fmt::Display,
<U as Encoder>::Error: fmt::Debug,
<U as Decoder>::Error: fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
ServiceError::Service(ref e) => write!(fmt, "{}", e),
ServiceError::Encoder(ref e) => write!(fmt, "{:?}", e),
ServiceError::Decoder(ref e) => write!(fmt, "{:?}", e),
}
}
}

View File

@@ -1,11 +0,0 @@
// #![deny(rust_2018_idioms, warnings)]
#![allow(clippy::type_complexity, clippy::too_many_arguments)]
mod connect;
mod dispatcher;
mod error;
mod service;
pub use self::connect::{Connect, ConnectResult};
pub use self::error::ServiceError;
pub use self::service::{Builder, FactoryBuilder};

View File

@@ -1,416 +0,0 @@
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed};
use actix_service::{IntoService, IntoServiceFactory, Service, ServiceFactory};
use either::Either;
use futures_core::{ready, stream::Stream};
use pin_project::project;
use crate::connect::{Connect, ConnectResult};
use crate::dispatcher::Dispatcher;
use crate::error::ServiceError;
type RequestItem<U> = <U as Decoder>::Item;
type ResponseItem<U> = Option<<U as Encoder>::Item>;
/// Service builder - structure that follows the builder pattern
/// for building instances for framed services.
pub struct Builder<St, C, Io, Codec, Out> {
connect: C,
_t: PhantomData<(St, Io, Codec, Out)>,
}
impl<St, C, Io, Codec, Out> Builder<St, C, Io, Codec, Out>
where
C: Service<Request = Connect<Io, Codec>, Response = ConnectResult<Io, St, Codec, Out>>,
Io: AsyncRead + AsyncWrite,
Codec: Decoder + Encoder,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
Out: Stream<Item = <Codec as Encoder>::Item> + Unpin,
{
/// Construct framed handler service with specified connect service
pub fn new<F>(connect: F) -> Builder<St, C, Io, Codec, Out>
where
F: IntoService<C>,
Io: AsyncRead + AsyncWrite,
C: Service<Request = Connect<Io, Codec>, Response = ConnectResult<Io, St, Codec, Out>>,
Codec: Decoder + Encoder,
Out: Stream<Item = <Codec as Encoder>::Item>,
{
Builder {
connect: connect.into_service(),
_t: PhantomData,
}
}
/// Provide stream items handler service and construct service factory.
pub fn build<F, T>(self, service: F) -> FramedServiceImpl<St, C, T, Io, Codec, Out>
where
F: IntoServiceFactory<T>,
T: ServiceFactory<
Config = St,
Request = RequestItem<Codec>,
Response = ResponseItem<Codec>,
Error = C::Error,
InitError = C::Error,
>,
{
FramedServiceImpl {
connect: self.connect,
handler: Rc::new(service.into_factory()),
_t: PhantomData,
}
}
}
/// Service builder - structure that follows the builder pattern
/// for building instances for framed services.
pub struct FactoryBuilder<St, C, Io, Codec, Out> {
connect: C,
_t: PhantomData<(St, Io, Codec, Out)>,
}
impl<St, C, Io, Codec, Out> FactoryBuilder<St, C, Io, Codec, Out>
where
Io: AsyncRead + AsyncWrite,
C: ServiceFactory<
Config = (),
Request = Connect<Io, Codec>,
Response = ConnectResult<Io, St, Codec, Out>,
>,
Codec: Decoder + Encoder,
<Codec as Encoder>::Error: std::fmt::Debug,
Out: Stream<Item = <Codec as Encoder>::Item> + Unpin,
{
/// Construct framed handler new service with specified connect service
pub fn new<F>(connect: F) -> FactoryBuilder<St, C, Io, Codec, Out>
where
F: IntoServiceFactory<C>,
Io: AsyncRead + AsyncWrite,
C: ServiceFactory<
Config = (),
Request = Connect<Io, Codec>,
Response = ConnectResult<Io, St, Codec, Out>,
>,
Codec: Decoder + Encoder,
Out: Stream<Item = <Codec as Encoder>::Item> + Unpin,
{
FactoryBuilder {
connect: connect.into_factory(),
_t: PhantomData,
}
}
pub fn build<F, T, Cfg>(self, service: F) -> FramedService<St, C, T, Io, Codec, Out, Cfg>
where
F: IntoServiceFactory<T>,
T: ServiceFactory<
Config = St,
Request = RequestItem<Codec>,
Response = ResponseItem<Codec>,
Error = C::Error,
InitError = C::Error,
>,
{
FramedService {
connect: self.connect,
handler: Rc::new(service.into_factory()),
_t: PhantomData,
}
}
}
pub struct FramedService<St, C, T, Io, Codec, Out, Cfg> {
connect: C,
handler: Rc<T>,
_t: PhantomData<(St, Io, Codec, Out, Cfg)>,
}
impl<St, C, T, Io, Codec, Out, Cfg> ServiceFactory
for FramedService<St, C, T, Io, Codec, Out, Cfg>
where
Io: AsyncRead + AsyncWrite,
C: ServiceFactory<
Config = (),
Request = Connect<Io, Codec>,
Response = ConnectResult<Io, St, Codec, Out>,
>,
T: ServiceFactory<
Config = St,
Request = RequestItem<Codec>,
Response = ResponseItem<Codec>,
Error = C::Error,
InitError = C::Error,
>,
<T::Service as Service>::Error: 'static,
<T::Service as Service>::Future: 'static,
Codec: Decoder + Encoder,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
Out: Stream<Item = <Codec as Encoder>::Item> + Unpin,
{
type Config = Cfg;
type Request = Io;
type Response = ();
type Error = ServiceError<C::Error, Codec>;
type InitError = C::InitError;
type Service = FramedServiceImpl<St, C::Service, T, Io, Codec, Out>;
type Future = FramedServiceResponse<St, C, T, Io, Codec, Out>;
fn new_service(&self, _: Cfg) -> Self::Future {
// create connect service and then create service impl
FramedServiceResponse {
fut: self.connect.new_service(()),
handler: self.handler.clone(),
}
}
}
#[pin_project::pin_project]
pub struct FramedServiceResponse<St, C, T, Io, Codec, Out>
where
Io: AsyncRead + AsyncWrite,
C: ServiceFactory<
Config = (),
Request = Connect<Io, Codec>,
Response = ConnectResult<Io, St, Codec, Out>,
>,
T: ServiceFactory<
Config = St,
Request = RequestItem<Codec>,
Response = ResponseItem<Codec>,
Error = C::Error,
InitError = C::Error,
>,
<T::Service as Service>::Error: 'static,
<T::Service as Service>::Future: 'static,
Codec: Decoder + Encoder,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
Out: Stream<Item = <Codec as Encoder>::Item> + Unpin,
{
#[pin]
fut: C::Future,
handler: Rc<T>,
}
impl<St, C, T, Io, Codec, Out> Future for FramedServiceResponse<St, C, T, Io, Codec, Out>
where
Io: AsyncRead + AsyncWrite,
C: ServiceFactory<
Config = (),
Request = Connect<Io, Codec>,
Response = ConnectResult<Io, St, Codec, Out>,
>,
T: ServiceFactory<
Config = St,
Request = RequestItem<Codec>,
Response = ResponseItem<Codec>,
Error = C::Error,
InitError = C::Error,
>,
<T::Service as Service>::Error: 'static,
<T::Service as Service>::Future: 'static,
Codec: Decoder + Encoder,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
Out: Stream<Item = <Codec as Encoder>::Item> + Unpin,
{
type Output = Result<FramedServiceImpl<St, C::Service, T, Io, Codec, Out>, C::InitError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let connect = ready!(this.fut.poll(cx))?;
Poll::Ready(Ok(FramedServiceImpl {
connect,
handler: this.handler.clone(),
_t: PhantomData,
}))
}
}
pub struct FramedServiceImpl<St, C, T, Io, Codec, Out> {
connect: C,
handler: Rc<T>,
_t: PhantomData<(St, Io, Codec, Out)>,
}
impl<St, C, T, Io, Codec, Out> Service for FramedServiceImpl<St, C, T, Io, Codec, Out>
where
Io: AsyncRead + AsyncWrite,
C: Service<Request = Connect<Io, Codec>, Response = ConnectResult<Io, St, Codec, Out>>,
T: ServiceFactory<
Config = St,
Request = RequestItem<Codec>,
Response = ResponseItem<Codec>,
Error = C::Error,
InitError = C::Error,
>,
<T::Service as Service>::Error: 'static,
<T::Service as Service>::Future: 'static,
Codec: Decoder + Encoder,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
Out: Stream<Item = <Codec as Encoder>::Item> + Unpin,
{
type Request = Io;
type Response = ();
type Error = ServiceError<C::Error, Codec>;
type Future = FramedServiceImplResponse<St, Io, Codec, Out, C, T>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.connect.poll_ready(cx).map_err(|e| e.into())
}
fn call(&mut self, req: Io) -> Self::Future {
FramedServiceImplResponse {
inner: FramedServiceImplResponseInner::Connect(
self.connect.call(Connect::new(req)),
self.handler.clone(),
),
}
}
}
#[pin_project::pin_project]
pub struct FramedServiceImplResponse<St, Io, Codec, Out, C, T>
where
C: Service<Request = Connect<Io, Codec>, Response = ConnectResult<Io, St, Codec, Out>>,
T: ServiceFactory<
Config = St,
Request = RequestItem<Codec>,
Response = ResponseItem<Codec>,
Error = C::Error,
InitError = C::Error,
>,
<T::Service as Service>::Error: 'static,
<T::Service as Service>::Future: 'static,
Io: AsyncRead + AsyncWrite,
Codec: Encoder + Decoder,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
Out: Stream<Item = <Codec as Encoder>::Item> + Unpin,
{
#[pin]
inner: FramedServiceImplResponseInner<St, Io, Codec, Out, C, T>,
}
impl<St, Io, Codec, Out, C, T> Future for FramedServiceImplResponse<St, Io, Codec, Out, C, T>
where
C: Service<Request = Connect<Io, Codec>, Response = ConnectResult<Io, St, Codec, Out>>,
T: ServiceFactory<
Config = St,
Request = RequestItem<Codec>,
Response = ResponseItem<Codec>,
Error = C::Error,
InitError = C::Error,
>,
<T::Service as Service>::Error: 'static,
<T::Service as Service>::Future: 'static,
Io: AsyncRead + AsyncWrite,
Codec: Encoder + Decoder,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
Out: Stream<Item = <Codec as Encoder>::Item> + Unpin,
{
type Output = Result<(), ServiceError<C::Error, Codec>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut().project();
loop {
match this.inner.poll(cx) {
Either::Left(new) => {
this = self.as_mut().project();
this.inner.set(new)
}
Either::Right(poll) => return poll,
};
}
}
}
#[pin_project::pin_project]
enum FramedServiceImplResponseInner<St, Io, Codec, Out, C, T>
where
C: Service<Request = Connect<Io, Codec>, Response = ConnectResult<Io, St, Codec, Out>>,
T: ServiceFactory<
Config = St,
Request = RequestItem<Codec>,
Response = ResponseItem<Codec>,
Error = C::Error,
InitError = C::Error,
>,
<T::Service as Service>::Error: 'static,
<T::Service as Service>::Future: 'static,
Io: AsyncRead + AsyncWrite,
Codec: Encoder + Decoder,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
Out: Stream<Item = <Codec as Encoder>::Item> + Unpin,
{
Connect(#[pin] C::Future, Rc<T>),
Handler(#[pin] T::Future, Option<Framed<Io, Codec>>, Option<Out>),
Dispatcher(#[pin] Dispatcher<T::Service, Io, Codec, Out>),
}
impl<St, Io, Codec, Out, C, T> FramedServiceImplResponseInner<St, Io, Codec, Out, C, T>
where
C: Service<Request = Connect<Io, Codec>, Response = ConnectResult<Io, St, Codec, Out>>,
T: ServiceFactory<
Config = St,
Request = RequestItem<Codec>,
Response = ResponseItem<Codec>,
Error = C::Error,
InitError = C::Error,
>,
<T::Service as Service>::Error: 'static,
<T::Service as Service>::Future: 'static,
Io: AsyncRead + AsyncWrite,
Codec: Encoder + Decoder,
<Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug,
Out: Stream<Item = <Codec as Encoder>::Item> + Unpin,
{
#[project]
fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Either<
FramedServiceImplResponseInner<St, Io, Codec, Out, C, T>,
Poll<Result<(), ServiceError<C::Error, Codec>>>,
> {
#[project]
match self.project() {
FramedServiceImplResponseInner::Connect(fut, handler) => match fut.poll(cx) {
Poll::Ready(Ok(res)) => Either::Left(FramedServiceImplResponseInner::Handler(
handler.new_service(res.state),
Some(res.framed),
res.out,
)),
Poll::Pending => Either::Right(Poll::Pending),
Poll::Ready(Err(e)) => Either::Right(Poll::Ready(Err(e.into()))),
},
FramedServiceImplResponseInner::Handler(fut, framed, out) => {
match fut.poll(cx) {
Poll::Ready(Ok(handler)) => {
Either::Left(FramedServiceImplResponseInner::Dispatcher(
Dispatcher::new(framed.take().unwrap(), handler, out.take()),
))
}
Poll::Pending => Either::Right(Poll::Pending),
Poll::Ready(Err(e)) => Either::Right(Poll::Ready(Err(e.into()))),
}
}
FramedServiceImplResponseInner::Dispatcher(fut) => {
Either::Right(fut.poll(cx))
}
}
}
}

View File

@@ -1,55 +0,0 @@
use std::cell::Cell;
use std::rc::Rc;
use actix_codec::BytesCodec;
use actix_service::{fn_factory_with_config, fn_service, IntoService, Service};
use actix_testing::TestServer;
use actix_utils::mpsc;
use bytes::{Bytes, BytesMut};
use futures_util::future::ok;
use actix_ioframe::{Builder, Connect, FactoryBuilder};
#[derive(Clone)]
struct State(Option<mpsc::Sender<Bytes>>);
#[actix_rt::test]
async fn test_basic() {
let client_item = Rc::new(Cell::new(false));
let srv = TestServer::with(move || {
FactoryBuilder::new(fn_service(|conn: Connect<_, _>| {
ok(conn.codec(BytesCodec).state(State(None)))
}))
// echo
.build(fn_service(|t: BytesMut| ok(Some(t.freeze()))))
});
let item = client_item.clone();
let mut client = Builder::new(fn_service(move |conn: Connect<_, _>| {
async move {
let (tx, rx) = mpsc::channel();
let _ = tx.send(Bytes::from_static(b"Hello"));
Ok(conn.codec(BytesCodec).out(rx).state(State(Some(tx))))
}
}))
.build(fn_factory_with_config(move |mut cfg: State| {
let item = item.clone();
ok((move |t: BytesMut| {
assert_eq!(t.freeze(), Bytes::from_static(b"Hello"));
item.set(true);
// drop Sender, which will close connection
cfg.0.take();
ok::<_, ()>(None)
})
.into_service())
}));
let conn = actix_connect::default_connector()
.call(actix_connect::Connect::with(String::new(), srv.addr()))
.await
.unwrap();
client.call(conn.into_parts().0).await.unwrap();
assert!(client_item.get());
}

View File

@@ -6,7 +6,7 @@ description = "Actix runtime macros"
repository = "https://github.com/actix/actix-net"
documentation = "https://docs.rs/actix-macros/"
categories = ["network-programming", "asynchronous"]
license = "MIT/Apache-2.0"
license = "MIT OR Apache-2.0"
edition = "2018"
[lib]

View File

@@ -8,7 +8,7 @@ homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/actix-rt/"
categories = ["network-programming", "asynchronous"]
license = "MIT/Apache-2.0"
license = "MIT OR Apache-2.0"
edition = "2018"
[lib]

View File

@@ -8,7 +8,7 @@ homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/actix-server/"
categories = ["network-programming", "asynchronous"]
license = "MIT/Apache-2.0"
license = "MIT OR Apache-2.0"
exclude = [".gitignore", ".cargo/config"]
edition = "2018"
workspace = ".."

View File

@@ -1,5 +1,14 @@
# Changes
## Unreleased - 2020-xx-xx
## 1.0.6 - 2020-08-09
### Fixed
* Removed unsound custom Cell implementation that allowed obtaining several mutable references to the same data, which is undefined behavior in Rust and could lead to violations of memory safety. External code could obtain several mutable references to the same data through service combinators. Attempts to acquire several mutable references to the same data will instead result in a panic.
## [1.0.5] - 2020-01-16
### Fixed

View File

@@ -1,14 +1,15 @@
[package]
name = "actix-service"
version = "1.0.5"
version = "1.0.6"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix service"
keywords = ["network", "framework", "async", "futures"]
description = "Service trait and combinators for representing asynchronous request/response operations."
keywords = ["network", "framework", "async", "futures", "service"]
homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/actix-service/"
documentation = "https://docs.rs/actix-service"
readme = "actix-service/README.md"
categories = ["network-programming", "asynchronous"]
license = "MIT/Apache-2.0"
license = "MIT OR Apache-2.0"
edition = "2018"
[lib]
@@ -17,7 +18,7 @@ path = "src/lib.rs"
[dependencies]
futures-util = "0.3.1"
pin-project = "0.4.6"
pin-project = "0.4.17"
[dev-dependencies]
actix-rt = "1.0.0"

7
actix-service/README.md Normal file
View File

@@ -0,0 +1,7 @@
# actix-service
> Service trait and combinators for representing asynchronous request/response operations.
See documentation for detailed explanations these components: [https://docs.rs/actix-service](docs).
[docs]: https://docs.rs/actix-service

View File

@@ -1,28 +1,26 @@
use actix_service::boxed::BoxFuture;
use actix_service::IntoService;
use actix_service::Service;
/// Benchmark various implementations of and_then
use criterion::{criterion_main, Criterion};
use futures_util::future::join_all;
use futures_util::future::TryFutureExt;
use std::cell::{RefCell, UnsafeCell};
use std::task::{Context, Poll};
use std::rc::Rc;
use actix_service::{Service};
use actix_service::IntoService;
use std::future::Future;
use std::pin::Pin;
use futures_util::future::TryFutureExt;
use actix_service::boxed::BoxFuture;
use std::rc::Rc;
use std::task::{Context, Poll};
/*
* Test services A,B for AndThen service implementations
*/
async fn svc1(_: ()) -> Result<usize, ()> {
Ok(1)
Ok(1)
}
async fn svc2(req: usize) -> Result<usize, ()> {
Ok(req + 1)
Ok(req + 1)
}
/*
@@ -30,45 +28,44 @@ async fn svc2(req: usize) -> Result<usize, ()> {
* Cut down version of actix_service::AndThenService based on actix-service::Cell
*/
struct AndThenUC<A, B>(Rc<UnsafeCell<(A, B)>>);
struct AndThenUC<A,B>(Rc<UnsafeCell<(A, B)>>);
impl<A,B> AndThenUC<A,B> {
fn new(a: A, b: B) -> Self
where
A: Service,
B: Service<Request = A::Response, Error = A::Error>,
{
Self(Rc::new(UnsafeCell::new((a,b))))
}
impl<A, B> AndThenUC<A, B> {
fn new(a: A, b: B) -> Self
where
A: Service,
B: Service<Request = A::Response, Error = A::Error>,
{
Self(Rc::new(UnsafeCell::new((a, b))))
}
}
impl<A,B> Clone for AndThenUC<A,B> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
impl<A, B> Clone for AndThenUC<A, B> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl<A,B> Service for AndThenUC<A,B>
impl<A, B> Service for AndThenUC<A, B>
where
A: Service,
B: Service<Request = A::Response, Error = A::Error>
B: Service<Request = A::Response, Error = A::Error>,
{
type Request = A::Request;
type Response = B::Response;
type Error = A::Error;
type Future = AndThenServiceResponse<A,B>;
type Request = A::Request;
type Response = B::Response;
type Error = A::Error;
type Future = AndThenServiceResponse<A, B>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: A::Request) -> Self::Future {
let fut = unsafe { &mut *(*self.0).get() }.0.call(req);
AndThenServiceResponse {
state: State::A(fut, Some(self.0.clone()))
}
}
fn call(&mut self, req: A::Request) -> Self::Future {
let fut = unsafe { &mut *(*self.0).get() }.0.call(req);
AndThenServiceResponse {
state: State::A(fut, Some(self.0.clone())),
}
}
}
#[pin_project::pin_project]
@@ -81,7 +78,7 @@ where
state: State<A, B>,
}
#[pin_project::pin_project]
#[pin_project::pin_project(project = StateProj)]
enum State<A, B>
where
A: Service,
@@ -99,13 +96,11 @@ where
{
type Output = Result<B::Response, A::Error>;
#[pin_project::project]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut().project();
#[project]
match this.state.as_mut().project() {
State::A(fut, b) => match fut.poll(cx)? {
StateProj::A(fut, b) => match fut.poll(cx)? {
Poll::Ready(res) => {
let b = b.take().unwrap();
this.state.set(State::Empty); // drop fut A
@@ -115,163 +110,160 @@ where
}
Poll::Pending => Poll::Pending,
},
State::B(fut) => fut.poll(cx).map(|r| {
StateProj::B(fut) => fut.poll(cx).map(|r| {
this.state.set(State::Empty);
r
}),
State::Empty => panic!("future must not be polled after it returned `Poll::Ready`"),
StateProj::Empty => {
panic!("future must not be polled after it returned `Poll::Ready`")
}
}
}
}
/*
* AndThenRC - AndThen service based on RefCell
*/
struct AndThenRC<A,B>(Rc<RefCell<(A, B)>>);
struct AndThenRC<A, B>(Rc<RefCell<(A, B)>>);
impl<A,B> AndThenRC<A,B> {
fn new(a: A, b: B) -> Self
where
A: Service,
B: Service<Request = A::Response, Error = A::Error>,
{
Self(Rc::new(RefCell::new((a,b))))
}
}
impl<A,B> Clone for AndThenRC<A,B> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl<A,B> Service for AndThenRC<A,B>
where
A: Service,
B: Service<Request = A::Response, Error = A::Error>
{
type Request = A::Request;
type Response = B::Response;
type Error = A::Error;
type Future = AndThenServiceResponseRC<A,B>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: A::Request) -> Self::Future {
let fut = self.0.borrow_mut().0.call(req);
AndThenServiceResponseRC {
state: StateRC::A(fut, Some(self.0.clone()))
}
}
}
#[pin_project::pin_project]
pub(crate) struct AndThenServiceResponseRC<A, B>
where
A: Service,
B: Service<Request = A::Response, Error = A::Error>,
{
#[pin]
state: StateRC<A, B>,
}
#[pin_project::pin_project]
enum StateRC<A, B>
where
A: Service,
B: Service<Request = A::Response, Error = A::Error>,
{
A(#[pin] A::Future, Option<Rc<RefCell<(A, B)>>>),
B(#[pin] B::Future),
Empty,
}
impl<A, B> Future for AndThenServiceResponseRC<A, B>
where
A: Service,
B: Service<Request = A::Response, Error = A::Error>,
{
type Output = Result<B::Response, A::Error>;
#[pin_project::project]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut().project();
#[project]
match this.state.as_mut().project() {
StateRC::A(fut, b) => match fut.poll(cx)? {
Poll::Ready(res) => {
let b = b.take().unwrap();
this.state.set(StateRC::Empty); // drop fut A
let fut = b.borrow_mut().1.call(res);
this.state.set(StateRC::B(fut));
self.poll(cx)
}
Poll::Pending => Poll::Pending,
},
StateRC::B(fut) => fut.poll(cx).map(|r| {
this.state.set(StateRC::Empty);
r
}),
StateRC::Empty => panic!("future must not be polled after it returned `Poll::Ready`"),
}
}
}
impl<A, B> AndThenRC<A, B> {
fn new(a: A, b: B) -> Self
where
A: Service,
B: Service<Request = A::Response, Error = A::Error>,
{
Self(Rc::new(RefCell::new((a, b))))
}
}
/*
* AndThenRCFuture - AndThen service based on RefCell
impl<A, B> Clone for AndThenRC<A, B> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl<A, B> Service for AndThenRC<A, B>
where
A: Service,
B: Service<Request = A::Response, Error = A::Error>,
{
type Request = A::Request;
type Response = B::Response;
type Error = A::Error;
type Future = AndThenServiceResponseRC<A, B>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: A::Request) -> Self::Future {
let fut = self.0.borrow_mut().0.call(req);
AndThenServiceResponseRC {
state: StateRC::A(fut, Some(self.0.clone())),
}
}
}
#[pin_project::pin_project]
pub(crate) struct AndThenServiceResponseRC<A, B>
where
A: Service,
B: Service<Request = A::Response, Error = A::Error>,
{
#[pin]
state: StateRC<A, B>,
}
#[pin_project::pin_project(project = StateRCProj)]
enum StateRC<A, B>
where
A: Service,
B: Service<Request = A::Response, Error = A::Error>,
{
A(#[pin] A::Future, Option<Rc<RefCell<(A, B)>>>),
B(#[pin] B::Future),
Empty,
}
impl<A, B> Future for AndThenServiceResponseRC<A, B>
where
A: Service,
B: Service<Request = A::Response, Error = A::Error>,
{
type Output = Result<B::Response, A::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut().project();
match this.state.as_mut().project() {
StateRCProj::A(fut, b) => match fut.poll(cx)? {
Poll::Ready(res) => {
let b = b.take().unwrap();
this.state.set(StateRC::Empty); // drop fut A
let fut = b.borrow_mut().1.call(res);
this.state.set(StateRC::B(fut));
self.poll(cx)
}
Poll::Pending => Poll::Pending,
},
StateRCProj::B(fut) => fut.poll(cx).map(|r| {
this.state.set(StateRC::Empty);
r
}),
StateRCProj::Empty => {
panic!("future must not be polled after it returned `Poll::Ready`")
}
}
}
}
/*
* AndThenRCFuture - AndThen service based on RefCell
* and standard futures::future::and_then combinator in a Box
*/
struct AndThenRCFuture<A,B>(Rc<RefCell<(A, B)>>);
struct AndThenRCFuture<A, B>(Rc<RefCell<(A, B)>>);
impl<A,B> AndThenRCFuture<A,B> {
fn new(a: A, b: B) -> Self
where
A: Service,
B: Service<Request = A::Response, Error = A::Error>,
{
Self(Rc::new(RefCell::new((a,b))))
}
}
impl<A,B> Clone for AndThenRCFuture<A,B> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl<A,B> Service for AndThenRCFuture<A,B>
where
A: Service + 'static,
A::Future: 'static,
B: Service<Request = A::Response, Error = A::Error> + 'static,
B::Future: 'static
{
type Request = A::Request;
type Response = B::Response;
type Error = A::Error;
type Future = BoxFuture<Self::Response, Self::Error>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: A::Request) -> Self::Future {
let fut = self.0.borrow_mut().0.call(req);
let core = self.0.clone();
let fut2 = move |res| (*core).borrow_mut().1.call(res);
Box::pin(
fut.and_then(fut2)
)
}
}
impl<A, B> AndThenRCFuture<A, B> {
fn new(a: A, b: B) -> Self
where
A: Service,
B: Service<Request = A::Response, Error = A::Error>,
{
Self(Rc::new(RefCell::new((a, b))))
}
}
impl<A, B> Clone for AndThenRCFuture<A, B> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl<A, B> Service for AndThenRCFuture<A, B>
where
A: Service + 'static,
A::Future: 'static,
B: Service<Request = A::Response, Error = A::Error> + 'static,
B::Future: 'static,
{
type Request = A::Request;
type Response = B::Response;
type Error = A::Error;
type Future = BoxFuture<Self::Response, Self::Error>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: A::Request) -> Self::Future {
let fut = self.0.borrow_mut().0.call(req);
let core = self.0.clone();
let fut2 = move |res| (*core).borrow_mut().1.call(res);
Box::pin(fut.and_then(fut2))
}
}
/// Criterion Benchmark for async Service
/// Should be used from within criterion group:
@@ -300,25 +292,41 @@ where
// exclude request generation, it appears it takes significant time vs call (3us vs 1us)
let start = std::time::Instant::now();
// benchmark body
rt.block_on(async move {
join_all(srvs.iter_mut().map(|srv| srv.call(()))).await
});
let elapsed = start.elapsed();
rt.block_on(async move { join_all(srvs.iter_mut().map(|srv| srv.call(()))).await });
// check that at least first request succeeded
elapsed
start.elapsed()
})
});
}
pub fn service_benches() {
let mut criterion: ::criterion::Criterion<_> =
::criterion::Criterion::default().configure_from_args();
bench_async_service(&mut criterion, AndThenUC::new(svc1.into_service(), svc2.into_service()), "AndThen with UnsafeCell");
bench_async_service(&mut criterion, AndThenRC::new(svc1.into_service(), svc2.into_service()), "AndThen with RefCell");
bench_async_service(&mut criterion, AndThenUC::new(svc1.into_service(), svc2.into_service()), "AndThen with UnsafeCell");
bench_async_service(&mut criterion, AndThenRC::new(svc1.into_service(), svc2.into_service()), "AndThen with RefCell");
bench_async_service(&mut criterion, AndThenRCFuture::new(svc1.into_service(), svc2.into_service()), "AndThen with RefCell via future::and_then");
bench_async_service(
&mut criterion,
AndThenUC::new(svc1.into_service(), svc2.into_service()),
"AndThen with UnsafeCell",
);
bench_async_service(
&mut criterion,
AndThenRC::new(svc1.into_service(), svc2.into_service()),
"AndThen with RefCell",
);
bench_async_service(
&mut criterion,
AndThenUC::new(svc1.into_service(), svc2.into_service()),
"AndThen with UnsafeCell",
);
bench_async_service(
&mut criterion,
AndThenRC::new(svc1.into_service(), svc2.into_service()),
"AndThen with RefCell",
);
bench_async_service(
&mut criterion,
AndThenRCFuture::new(svc1.into_service(), svc2.into_service()),
"AndThen with RefCell via future::and_then",
);
}
criterion_main!(service_benches);

View File

@@ -1,73 +1,72 @@
use actix_service::Service;
use criterion::{criterion_main, Criterion};
use futures_util::future::join_all;
use std::cell::{RefCell, UnsafeCell};
use std::task::{Context, Poll};
use std::rc::Rc;
use actix_service::{Service};
use futures_util::future::{ok, Ready};
use std::cell::{RefCell, UnsafeCell};
use std::rc::Rc;
use std::task::{Context, Poll};
struct SrvUC(Rc<UnsafeCell<usize>>);
impl Default for SrvUC {
fn default() -> Self {
Self(Rc::new(UnsafeCell::new(0)))
}
fn default() -> Self {
Self(Rc::new(UnsafeCell::new(0)))
}
}
impl Clone for SrvUC {
fn clone(&self) -> Self {
Self(self.0.clone())
}
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl Service for SrvUC {
type Request = ();
type Response = usize;
type Error = ();
type Future = Ready<Result<Self::Response, ()>>;
type Request = ();
type Response = usize;
type Error = ();
type Future = Ready<Result<Self::Response, ()>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, _: ()) -> Self::Future {
unsafe { *(*self.0).get() = *(*self.0).get() + 1 };
ok(unsafe { *self.0.get() })
}
fn call(&mut self, _: ()) -> Self::Future {
unsafe { *(*self.0).get() = *(*self.0).get() + 1 };
ok(unsafe { *self.0.get() })
}
}
struct SrvRC(Rc<RefCell<usize>>);
impl Default for SrvRC {
fn default() -> Self {
Self(Rc::new(RefCell::new(0)))
}
fn default() -> Self {
Self(Rc::new(RefCell::new(0)))
}
}
impl Clone for SrvRC {
fn clone(&self) -> Self {
Self(self.0.clone())
}
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl Service for SrvRC {
type Request = ();
type Response = usize;
type Error = ();
type Future = Ready<Result<Self::Response, ()>>;
type Request = ();
type Response = usize;
type Error = ();
type Future = Ready<Result<Self::Response, ()>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, _: ()) -> Self::Future {
let prev = *self.0.borrow();
*(*self.0).borrow_mut() = prev + 1;
ok(*self.0.borrow())
}
fn call(&mut self, _: ()) -> Self::Future {
let prev = *self.0.borrow();
*(*self.0).borrow_mut() = prev + 1;
ok(*self.0.borrow())
}
}
/// Criterion Benchmark for async Service
/// Should be used from within criterion group:
/// ```rust,ignore
@@ -95,17 +94,13 @@ where
// exclude request generation, it appears it takes significant time vs call (3us vs 1us)
let start = std::time::Instant::now();
// benchmark body
rt.block_on(async move {
join_all(srvs.iter_mut().map(|srv| srv.call(()))).await
});
let elapsed = start.elapsed();
rt.block_on(async move { join_all(srvs.iter_mut().map(|srv| srv.call(()))).await });
// check that at least first request succeeded
elapsed
start.elapsed()
})
});
}
pub fn service_benches() {
let mut criterion: ::criterion::Criterion<_> =
::criterion::Criterion::default().configure_from_args();

View File

@@ -1,16 +1,16 @@
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
use super::{Service, ServiceFactory};
use crate::cell::Cell;
/// Service for the `and_then` combinator, chaining a computation onto the end
/// of another service which completes successfully.
///
/// This is created by the `ServiceExt::and_then` method.
pub(crate) struct AndThenService<A, B>(Cell<(A, B)>);
pub(crate) struct AndThenService<A, B>(Rc<RefCell<(A, B)>>);
impl<A, B> AndThenService<A, B> {
/// Create new `AndThen` combinator
@@ -19,7 +19,7 @@ impl<A, B> AndThenService<A, B> {
A: Service,
B: Service<Request = A::Response, Error = A::Error>,
{
Self(Cell::new((a, b)))
Self(Rc::new(RefCell::new((a, b))))
}
}
@@ -40,7 +40,7 @@ where
type Future = AndThenServiceResponse<A, B>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let srv = self.0.get_mut();
let mut srv = self.0.borrow_mut();
let not_ready = !srv.0.poll_ready(cx)?.is_ready();
if !srv.1.poll_ready(cx)?.is_ready() || not_ready {
Poll::Pending
@@ -51,7 +51,7 @@ where
fn call(&mut self, req: A::Request) -> Self::Future {
AndThenServiceResponse {
state: State::A(self.0.get_mut().0.call(req), Some(self.0.clone())),
state: State::A(self.0.borrow_mut().0.call(req), Some(self.0.clone())),
}
}
}
@@ -66,13 +66,13 @@ where
state: State<A, B>,
}
#[pin_project::pin_project]
#[pin_project::pin_project(project = StateProj)]
enum State<A, B>
where
A: Service,
B: Service<Request = A::Response, Error = A::Error>,
{
A(#[pin] A::Future, Option<Cell<(A, B)>>),
A(#[pin] A::Future, Option<Rc<RefCell<(A, B)>>>),
B(#[pin] B::Future),
Empty,
}
@@ -84,27 +84,27 @@ where
{
type Output = Result<B::Response, A::Error>;
#[pin_project::project]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut().project();
#[project]
match this.state.as_mut().project() {
State::A(fut, b) => match fut.poll(cx)? {
StateProj::A(fut, b) => match fut.poll(cx)? {
Poll::Ready(res) => {
let mut b = b.take().unwrap();
let b = b.take().unwrap();
this.state.set(State::Empty); // drop fut A
let fut = b.get_mut().1.call(res);
let fut = b.borrow_mut().1.call(res);
this.state.set(State::B(fut));
self.poll(cx)
}
Poll::Pending => Poll::Pending,
},
State::B(fut) => fut.poll(cx).map(|r| {
StateProj::B(fut) => fut.poll(cx).map(|r| {
this.state.set(State::Empty);
r
}),
State::Empty => panic!("future must not be polled after it returned `Poll::Ready`"),
StateProj::Empty => {
panic!("future must not be polled after it returned `Poll::Ready`")
}
}
}
}

View File

@@ -1,10 +1,10 @@
use std::cell::RefCell;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
use crate::cell::Cell;
use crate::{Service, ServiceFactory};
/// `Apply` service combinator
@@ -16,7 +16,7 @@ where
Fut: Future<Output = Result<Res, Err>>,
Err: From<A::Error> + From<B::Error>,
{
srv: Cell<(A, B, F)>,
srv: Rc<RefCell<(A, B, F)>>,
r: PhantomData<(Fut, Res, Err)>,
}
@@ -31,7 +31,7 @@ where
/// Create new `Apply` combinator
pub(crate) fn new(a: A, b: B, f: F) -> Self {
Self {
srv: Cell::new((a, b, f)),
srv: Rc::new(RefCell::new((a, b, f))),
r: PhantomData,
}
}
@@ -67,7 +67,7 @@ where
type Future = AndThenApplyFnFuture<A, B, F, Fut, Res, Err>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let inner = self.srv.get_mut();
let mut inner = self.srv.borrow_mut();
let not_ready = inner.0.poll_ready(cx)?.is_pending();
if inner.1.poll_ready(cx)?.is_pending() || not_ready {
Poll::Pending
@@ -77,7 +77,7 @@ where
}
fn call(&mut self, req: A::Request) -> Self::Future {
let fut = self.srv.get_mut().0.call(req);
let fut = self.srv.borrow_mut().0.call(req);
AndThenApplyFnFuture {
state: State::A(fut, Some(self.srv.clone())),
}
@@ -98,7 +98,7 @@ where
state: State<A, B, F, Fut, Res, Err>,
}
#[pin_project::pin_project]
#[pin_project::pin_project(project = StateProj)]
enum State<A, B, F, Fut, Res, Err>
where
A: Service,
@@ -108,7 +108,7 @@ where
Err: From<A::Error>,
Err: From<B::Error>,
{
A(#[pin] A::Future, Option<Cell<(A, B, F)>>),
A(#[pin] A::Future, Option<Rc<RefCell<(A, B, F)>>>),
B(#[pin] Fut),
Empty,
}
@@ -123,28 +123,28 @@ where
{
type Output = Result<Res, Err>;
#[pin_project::project]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut().project();
#[project]
match this.state.as_mut().project() {
State::A(fut, b) => match fut.poll(cx)? {
StateProj::A(fut, b) => match fut.poll(cx)? {
Poll::Ready(res) => {
let mut b = b.take().unwrap();
let b = b.take().unwrap();
this.state.set(State::Empty);
let b = b.get_mut();
let fut = (&mut b.2)(res, &mut b.1);
let (_, b, f) = &mut *b.borrow_mut();
let fut = f(res, b);
this.state.set(State::B(fut));
self.poll(cx)
}
Poll::Pending => Poll::Pending,
},
State::B(fut) => fut.poll(cx).map(|r| {
StateProj::B(fut) => fut.poll(cx).map(|r| {
this.state.set(State::Empty);
r
}),
State::Empty => panic!("future must not be polled after it returned `Poll::Ready`"),
StateProj::Empty => {
panic!("future must not be polled after it returned `Poll::Ready`")
}
}
}
}
@@ -257,11 +257,11 @@ where
if this.a.is_some() && this.b.is_some() {
Poll::Ready(Ok(AndThenApplyFn {
srv: Cell::new((
srv: Rc::new(RefCell::new((
this.a.take().unwrap(),
this.b.take().unwrap(),
this.f.clone(),
)),
))),
r: PhantomData,
}))
} else {
@@ -298,10 +298,9 @@ mod tests {
#[actix_rt::test]
async fn test_service() {
let mut srv = pipeline(ok)
.and_then_apply_fn(Srv, |req: &'static str, s| {
s.call(()).map_ok(move |res| (req, res))
});
let mut srv = pipeline(ok).and_then_apply_fn(Srv, |req: &'static str, s| {
s.call(()).map_ok(move |res| (req, res))
});
let res = lazy(|cx| srv.poll_ready(cx)).await;
assert_eq!(res, Poll::Ready(Ok(())));
@@ -312,11 +311,10 @@ mod tests {
#[actix_rt::test]
async fn test_service_factory() {
let new_srv = pipeline_factory(|| ok::<_, ()>(fn_service(ok)))
.and_then_apply_fn(
|| ok(Srv),
|req: &'static str, s| s.call(()).map_ok(move |res| (req, res)),
);
let new_srv = pipeline_factory(|| ok::<_, ()>(fn_service(ok))).and_then_apply_fn(
|| ok(Srv),
|req: &'static str, s| s.call(()).map_ok(move |res| (req, res)),
);
let mut srv = new_srv.new_service(()).await.unwrap();
let res = lazy(|cx| srv.poll_ready(cx)).await;
assert_eq!(res, Poll::Ready(Ok(())));

View File

@@ -1,9 +1,10 @@
use std::cell::RefCell;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
use crate::cell::Cell;
use crate::{Service, ServiceFactory};
/// Convert `Fn(Config, &mut Service1) -> Future<Service2>` fn to a service factory
@@ -26,7 +27,7 @@ where
S: Service,
{
ApplyConfigService {
srv: Cell::new((srv, f)),
srv: Rc::new(RefCell::new((srv, f))),
_t: PhantomData,
}
}
@@ -53,7 +54,7 @@ where
S: Service,
{
ApplyConfigServiceFactory {
srv: Cell::new((factory, f)),
srv: Rc::new(RefCell::new((factory, f))),
_t: PhantomData,
}
}
@@ -66,7 +67,7 @@ where
R: Future<Output = Result<S, E>>,
S: Service,
{
srv: Cell<(T, F)>,
srv: Rc<RefCell<(T, F)>>,
_t: PhantomData<(C, R, S)>,
}
@@ -102,10 +103,8 @@ where
type Future = R;
fn new_service(&self, cfg: C) -> Self::Future {
unsafe {
let srv = self.srv.get_mut_unsafe();
(srv.1)(cfg, &mut srv.0)
}
let (t, f) = &mut *self.srv.borrow_mut();
f(cfg, t)
}
}
@@ -117,7 +116,7 @@ where
R: Future<Output = Result<S, T::InitError>>,
S: Service,
{
srv: Cell<(T, F)>,
srv: Rc<RefCell<(T, F)>>,
_t: PhantomData<(C, R, S)>,
}
@@ -157,7 +156,7 @@ where
ApplyConfigServiceFactoryResponse {
cfg: Some(cfg),
store: self.srv.clone(),
state: State::A(self.srv.get_ref().0.new_service(())),
state: State::A(self.srv.borrow().0.new_service(())),
}
}
}
@@ -172,12 +171,12 @@ where
S: Service,
{
cfg: Option<C>,
store: Cell<(T, F)>,
store: Rc<RefCell<(T, F)>>,
#[pin]
state: State<T, R, S>,
}
#[pin_project::pin_project]
#[pin_project::pin_project(project = StateProj)]
enum State<T, R, S>
where
T: ServiceFactory<Config = ()>,
@@ -200,28 +199,29 @@ where
{
type Output = Result<S, T::InitError>;
#[pin_project::project]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut().project();
#[project]
match this.state.as_mut().project() {
State::A(fut) => match fut.poll(cx)? {
StateProj::A(fut) => match fut.poll(cx)? {
Poll::Pending => Poll::Pending,
Poll::Ready(srv) => {
this.state.set(State::B(srv));
self.poll(cx)
}
},
State::B(srv) => match srv.poll_ready(cx)? {
StateProj::B(srv) => match srv.poll_ready(cx)? {
Poll::Ready(_) => {
let fut = (this.store.get_mut().1)(this.cfg.take().unwrap(), srv);
this.state.set(State::C(fut));
{
let (_, f) = &mut *this.store.borrow_mut();
let fut = f(this.cfg.take().unwrap(), srv);
this.state.set(State::C(fut));
}
self.poll(cx)
}
Poll::Pending => Poll::Pending,
},
State::C(fut) => fut.poll(cx),
StateProj::C(fut) => fut.poll(cx),
}
}
}

View File

@@ -1,57 +0,0 @@
//! Custom cell impl, internal use only
use std::task::{Context, Poll};
use std::{cell::UnsafeCell, fmt, rc::Rc};
pub(crate) struct Cell<T> {
inner: Rc<UnsafeCell<T>>,
}
impl<T> Clone for Cell<T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
impl<T: fmt::Debug> fmt::Debug for Cell<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.inner.fmt(f)
}
}
impl<T> Cell<T> {
pub(crate) fn new(inner: T) -> Self {
Self {
inner: Rc::new(UnsafeCell::new(inner)),
}
}
pub(crate) fn get_ref(&self) -> &T {
unsafe { &*self.inner.as_ref().get() }
}
pub(crate) fn get_mut(&mut self) -> &mut T {
unsafe { &mut *self.inner.as_ref().get() }
}
#[allow(clippy::mut_from_ref)]
pub(crate) unsafe fn get_mut_unsafe(&self) -> &mut T {
&mut *self.inner.as_ref().get()
}
}
impl<T: crate::Service> crate::Service for Cell<T> {
type Request = T::Request;
type Response = T::Response;
type Error = T::Error;
type Future = T::Future;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.get_mut().poll_ready(cx)
}
fn call(&mut self, req: Self::Request) -> Self::Future {
self.get_mut().call(req)
}
}

View File

@@ -29,7 +29,7 @@ where
/// /// Service that divides two usize values.
/// async fn div((x, y): (usize, usize)) -> Result<usize, io::Error> {
/// if y == 0 {
/// Err(io::Error::new(io::ErrorKind::Other, "divide by zdro"))
/// Err(io::Error::new(io::ErrorKind::Other, "divide by zero"))
/// } else {
/// Ok(x / y)
/// }

View File

@@ -1,3 +1,5 @@
//! See [`Service`](trait.Service.html) docs for information on this crate's foundational trait.
#![deny(rust_2018_idioms, warnings)]
#![allow(clippy::type_complexity)]
@@ -12,7 +14,6 @@ mod and_then_apply_fn;
mod apply;
mod apply_cfg;
pub mod boxed;
mod cell;
mod fn_service;
mod map;
mod map_config;
@@ -30,21 +31,23 @@ pub use self::map_config::{map_config, unit_config};
pub use self::pipeline::{pipeline, pipeline_factory, Pipeline, PipelineFactory};
pub use self::transform::{apply, Transform};
/// An asynchronous function from `Request` to a `Response`.
/// An asynchronous operation from `Request` to a `Response`.
///
/// `Service` represents a service that represanting interation, taking requests and giving back
/// replies. You can think about service as a function with one argument and result as a return
/// type. In general form it looks like `async fn(Req) -> Result<Res, Err>`. `Service`
/// trait just generalizing form of this function. Each parameter described as an assotiated type.
/// The `Service` trait models a request/response interaction, receiving requests and returning
/// replies. You can think about a service as a function with one argument that returns some result
/// asynchronously. Conceptually, the operation looks like this:
///
/// Services provides a symmetric and uniform API, same abstractions represents
/// clients and servers. Services describe only `transforamtion` operation
/// which encorouge to simplify api surface and phrases `value transformation`.
/// That leads to simplier design of each service. That also allows better testability
/// and better composition.
/// ```rust,ignore
/// async fn(Request) -> Result<Response, Err>
/// ```
///
/// Services could be represented in several different forms. In general,
/// Service is a type that implements `Service` trait.
/// The `Service` trait just generalizes this form where each parameter is described as an
/// associated type on the trait. Services can also have mutable state that influence computation.
///
/// `Service` provides a symmetric and uniform API; the same abstractions can be used to represent
/// both clients and servers. Services describe only _transformation_ operations which encourage
/// simple API surfaces. This leads to simpler design of each service, improves test-ability and
/// makes composition easier.
///
/// ```rust,ignore
/// struct MyService;
@@ -53,7 +56,7 @@ pub use self::transform::{apply, Transform};
/// type Request = u8;
/// type Response = u64;
/// type Error = MyError;
/// type Future = Pin<Box<Future<Output=Result<Self::Response, Self::Error>>>;
/// type Future = Pin<Box<Future<Output=Result<Self::Response, Self::Error>>>>;
///
/// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { ... }
///
@@ -61,8 +64,8 @@ pub use self::transform::{apply, Transform};
/// }
/// ```
///
/// Service can have mutable state that influence computation.
/// This service could be rewritten as a simple function:
/// Sometimes it is not necessary to implement the Service trait. For example, the above service
/// could be rewritten as a simple function and passed to [fn_service](fn.fn_service.html).
///
/// ```rust,ignore
/// async fn my_service(req: u8) -> Result<u64, MyError>;
@@ -90,11 +93,9 @@ pub trait Service {
/// It is permitted for the service to return `Ready` from a `poll_ready`
/// call and the next invocation of `call` results in an error.
///
/// There are several notes to consider:
///
/// # Notes
/// 1. `.poll_ready()` might be called on different task from actual service call.
///
/// 2. In case of chained services, `.poll_ready()` get called for all services at once.
/// 1. In case of chained services, `.poll_ready()` get called for all services at once.
fn poll_ready(&mut self, ctx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>>;
/// Process the request and return the response asynchronously.
@@ -128,7 +129,7 @@ pub trait Service {
/// Map this service's error to a different error, returning a new service.
///
/// This function is similar to the `Result::map_err` where it will change
/// the error type of the underlying service. This is useful for example to
/// the error type of the underlying service. For example, this can be useful to
/// ensure that services have the same error type.
///
/// Note that this function consumes the receiving service and returns a
@@ -142,42 +143,42 @@ pub trait Service {
}
}
/// Creates new `Service` values.
/// Factory for creating `Service`s.
///
/// Acts as a service factory. This is useful for cases where new `Service`
/// values must be produced. One case is a TCP server listener. The listener
/// accepts new TCP streams, obtains a new `Service` value using the
/// `ServiceFactory` trait, and uses that new `Service` value to process inbound
/// Acts as a service factory. This is useful for cases where new `Service`s
/// must be produced. One case is a TCP server listener. The listener
/// accepts new TCP streams, obtains a new `Service` using the
/// `ServiceFactory` trait, and uses the new `Service` to process inbound
/// requests on that new TCP stream.
///
/// `Config` is a service factory configuration type.
pub trait ServiceFactory {
/// Requests handled by the service.
/// Requests handled by the created services.
type Request;
/// Responses given by the service
/// Responses given by the created services.
type Response;
/// Errors produced by the service
/// Errors produced by the created services.
type Error;
/// Service factory configuration
/// Service factory configuration.
type Config;
/// The `Service` value created by this factory
/// The kind of `Service` created by this factory.
type Service: Service<
Request = Self::Request,
Response = Self::Response,
Error = Self::Error,
>;
/// Errors produced while building a service.
/// Errors potentially raised while building a service.
type InitError;
/// The future of the `Service` instance.
type Future: Future<Output = Result<Self::Service, Self::InitError>>;
/// Create and return a new service value asynchronously.
/// Create and return a new service asynchronously.
fn new_service(&self, cfg: Self::Config) -> Self::Future;
/// Map this service's output to a different type, returning a new service

View File

@@ -1,16 +1,16 @@
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
use super::{Service, ServiceFactory};
use crate::cell::Cell;
/// Service for the `then` combinator, chaining a computation onto the end of
/// another service.
///
/// This is created by the `Pipeline::then` method.
pub(crate) struct ThenService<A, B>(Cell<(A, B)>);
pub(crate) struct ThenService<A, B>(Rc<RefCell<(A, B)>>);
impl<A, B> ThenService<A, B> {
/// Create new `.then()` combinator
@@ -19,7 +19,7 @@ impl<A, B> ThenService<A, B> {
A: Service,
B: Service<Request = Result<A::Response, A::Error>, Error = A::Error>,
{
Self(Cell::new((a, b)))
Self(Rc::new(RefCell::new((a, b))))
}
}
@@ -40,7 +40,7 @@ where
type Future = ThenServiceResponse<A, B>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let srv = self.0.get_mut();
let mut srv = self.0.borrow_mut();
let not_ready = !srv.0.poll_ready(cx)?.is_ready();
if !srv.1.poll_ready(cx)?.is_ready() || not_ready {
Poll::Pending
@@ -51,7 +51,7 @@ where
fn call(&mut self, req: A::Request) -> Self::Future {
ThenServiceResponse {
state: State::A(self.0.get_mut().0.call(req), Some(self.0.clone())),
state: State::A(self.0.borrow_mut().0.call(req), Some(self.0.clone())),
}
}
}
@@ -66,13 +66,13 @@ where
state: State<A, B>,
}
#[pin_project::pin_project]
#[pin_project::pin_project(project = StateProj)]
enum State<A, B>
where
A: Service,
B: Service<Request = Result<A::Response, A::Error>>,
{
A(#[pin] A::Future, Option<Cell<(A, B)>>),
A(#[pin] A::Future, Option<Rc<RefCell<(A, B)>>>),
B(#[pin] B::Future),
Empty,
}
@@ -84,27 +84,27 @@ where
{
type Output = Result<B::Response, B::Error>;
#[pin_project::project]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut().project();
#[project]
match this.state.as_mut().project() {
State::A(fut, b) => match fut.poll(cx) {
StateProj::A(fut, b) => match fut.poll(cx) {
Poll::Ready(res) => {
let mut b = b.take().unwrap();
let b = b.take().unwrap();
this.state.set(State::Empty); // drop fut A
let fut = b.get_mut().1.call(res);
let fut = b.borrow_mut().1.call(res);
this.state.set(State::B(fut));
self.poll(cx)
}
Poll::Pending => Poll::Pending,
},
State::B(fut) => fut.poll(cx).map(|r| {
StateProj::B(fut) => fut.poll(cx).map(|r| {
this.state.set(State::Empty);
r
}),
State::Empty => panic!("future must not be polled after it returned `Poll::Ready`"),
StateProj::Empty => {
panic!("future must not be polled after it returned `Poll::Ready`")
}
}
}
}

View File

@@ -211,7 +211,7 @@ where
state: ApplyTransformFutureState<T, S>,
}
#[pin_project::pin_project]
#[pin_project::pin_project(project = ApplyTransformFutureStateProj)]
pub enum ApplyTransformFutureState<T, S>
where
S: ServiceFactory,
@@ -228,13 +228,11 @@ where
{
type Output = Result<T::Transform, T::InitError>;
#[pin_project::project]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut().project();
#[project]
match this.state.as_mut().project() {
ApplyTransformFutureState::A(fut) => match fut.poll(cx)? {
ApplyTransformFutureStateProj::A(fut) => match fut.poll(cx)? {
Poll::Ready(srv) => {
let fut = this.store.0.new_transform(srv);
this.state.set(ApplyTransformFutureState::B(fut));
@@ -242,7 +240,7 @@ where
}
Poll::Pending => Poll::Pending,
},
ApplyTransformFutureState::B(fut) => fut.poll(cx),
ApplyTransformFutureStateProj::B(fut) => fut.poll(cx),
}
}
}

View File

@@ -8,7 +8,7 @@ homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/actix-testing/"
categories = ["network-programming", "asynchronous"]
license = "MIT/Apache-2.0"
license = "MIT OR Apache-2.0"
edition = "2018"
workspace = ".."
readme = "README.md"

View File

@@ -1,5 +1,11 @@
# Changes
## [0.3.3] - 2020-07-14
### Changed
* Update parking_lot to 0.11
## [0.3.2] - 2020-05-20
## Added
@@ -12,7 +18,7 @@
### Changed
* Use parking_lot 0.10
* Update parking_lot to 0.10
## [0.3.0] - 2019-12-02

View File

@@ -1,6 +1,6 @@
[package]
name = "actix-threadpool"
version = "0.3.2"
version = "0.3.3"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix thread pool for sync code"
keywords = ["actix", "network", "framework", "async", "futures"]
@@ -8,7 +8,7 @@ homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/actix-threadpool/"
categories = ["network-programming", "asynchronous"]
license = "MIT/Apache-2.0"
license = "MIT OR Apache-2.0"
exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"]
edition = "2018"
workspace = ".."
@@ -20,7 +20,7 @@ path = "src/lib.rs"
[dependencies]
derive_more = "0.99.2"
futures-channel = "0.3.1"
parking_lot = "0.10"
parking_lot = "0.11"
lazy_static = "1.3"
log = "0.4"
num_cpus = "1.10"

View File

@@ -1,5 +1,13 @@
# Changes
## [unreleased]
### Changed
* Update `rustls` dependency to 0.18
* Update `tokio-rustls` dependency to 0.14
* Update `webpki-roots` dependency to 0.20
## [2.0.0-alpha.1] - 2020-03-03
### Changed

View File

@@ -8,7 +8,7 @@ homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/actix-tls/"
categories = ["network-programming", "asynchronous"]
license = "MIT/Apache-2.0"
license = "MIT OR Apache-2.0"
edition = "2018"
workspace = ".."
@@ -46,10 +46,10 @@ open-ssl = { version="0.10", package = "openssl", optional = true }
tokio-openssl = { version = "0.4.0", optional = true }
# rustls
rust-tls = { version = "0.17.0", package = "rustls", optional = true }
rust-tls = { version = "0.18.0", package = "rustls", optional = true }
webpki = { version = "0.21", optional = true }
webpki-roots = { version = "0.19", optional = true }
tokio-rustls = { version = "0.13.0", optional = true }
webpki-roots = { version = "0.20", optional = true }
tokio-rustls = { version = "0.14.0", optional = true }
# native-tls
native-tls = { version="0.2", optional = true }

View File

@@ -8,7 +8,7 @@ homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/actix-tracing/"
categories = ["network-programming", "asynchronous"]
license = "MIT/Apache-2.0"
license = "MIT OR Apache-2.0"
edition = "2018"
[lib]

View File

@@ -1,5 +1,10 @@
# Changes
## Unreleased - 2020-xx-xx
* Upgrade `tokio-util` to `0.3`.
* Remove unsound custom Cell and use `std::cell::RefCell` instead, as well as `actix-service`.
## [1.0.6] - 2020-01-08
* Add `Clone` impl for `condition::Waiter`

View File

@@ -8,7 +8,7 @@ homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/actix-utils/"
categories = ["network-programming", "asynchronous"]
license = "MIT/Apache-2.0"
license = "MIT OR Apache-2.0"
edition = "2018"
[lib]
@@ -25,6 +25,6 @@ either = "1.5.3"
futures-channel = { version = "0.3.4", default-features = false }
futures-sink = { version = "0.3.4", default-features = false }
futures-util = { version = "0.3.4", default-features = false }
pin-project = "0.4.6"
pin-project = "0.4.17"
log = "0.4"
slab = "0.4"

View File

@@ -1,48 +0,0 @@
//! Custom cell impl
use std::cell::UnsafeCell;
use std::fmt;
use std::rc::Rc;
pub(crate) struct Cell<T> {
pub(crate) inner: Rc<UnsafeCell<T>>,
}
impl<T> Clone for Cell<T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
impl<T: fmt::Debug> fmt::Debug for Cell<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.inner.fmt(f)
}
}
impl<T> Cell<T> {
pub(crate) fn new(inner: T) -> Self {
Self {
inner: Rc::new(UnsafeCell::new(inner)),
}
}
pub(crate) fn strong_count(&self) -> usize {
Rc::strong_count(&self.inner)
}
pub(crate) fn get_ref(&self) -> &T {
unsafe { &*self.inner.as_ref().get() }
}
pub(crate) fn get_mut(&mut self) -> &mut T {
unsafe { &mut *self.inner.as_ref().get() }
}
#[allow(clippy::mut_from_ref)]
pub(crate) unsafe fn get_mut_unsafe(&self) -> &mut T {
&mut *self.inner.as_ref().get()
}
}

View File

@@ -1,14 +1,15 @@
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
use slab::Slab;
use crate::cell::Cell;
use crate::task::LocalWaker;
/// Condition allows to notify multiple receivers at the same time
pub struct Condition(Cell<Inner>);
pub struct Condition(Rc<RefCell<Inner>>);
struct Inner {
data: Slab<Option<LocalWaker>>,
@@ -22,12 +23,12 @@ impl Default for Condition {
impl Condition {
pub fn new() -> Condition {
Condition(Cell::new(Inner { data: Slab::new() }))
Condition(Rc::new(RefCell::new(Inner { data: Slab::new() })))
}
/// Get condition waiter
pub fn wait(&mut self) -> Waiter {
let token = self.0.get_mut().data.insert(None);
let token = self.0.borrow_mut().data.insert(None);
Waiter {
token,
inner: self.0.clone(),
@@ -36,7 +37,7 @@ impl Condition {
/// Notify all waiters
pub fn notify(&self) {
let inner = self.0.get_ref();
let inner = self.0.borrow();
for item in inner.data.iter() {
if let Some(waker) = item.1 {
waker.wake();
@@ -54,12 +55,12 @@ impl Drop for Condition {
#[must_use = "Waiter do nothing unless polled"]
pub struct Waiter {
token: usize,
inner: Cell<Inner>,
inner: Rc<RefCell<Inner>>,
}
impl Clone for Waiter {
fn clone(&self) -> Self {
let token = unsafe { self.inner.get_mut_unsafe() }.data.insert(None);
let token = self.inner.borrow_mut().data.insert(None);
Waiter {
token,
inner: self.inner.clone(),
@@ -73,7 +74,8 @@ impl Future for Waiter {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
let inner = unsafe { this.inner.get_mut().data.get_unchecked_mut(this.token) };
let mut inner = this.inner.borrow_mut();
let inner = unsafe { inner.data.get_unchecked_mut(this.token) };
if inner.is_none() {
let waker = LocalWaker::default();
waker.register(cx.waker());
@@ -89,7 +91,7 @@ impl Future for Waiter {
impl Drop for Waiter {
fn drop(&mut self) {
self.inner.get_mut().data.remove(self.token);
self.inner.borrow_mut().data.remove(self.token);
}
}

View File

@@ -3,7 +3,7 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use actix_service::{Service, ServiceFactory};
use futures_util::{future, ready, future::Future};
use futures_util::{future, future::Future, ready};
/// Combine two different service types into a single type.
///

View File

@@ -6,31 +6,28 @@ use std::{fmt, mem};
use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed};
use actix_service::{IntoService, Service};
use futures_util::{future::Future, FutureExt, stream::Stream};
use futures_util::{future::Future, stream::Stream, FutureExt};
use log::debug;
use crate::mpsc;
type Request<U> = <U as Decoder>::Item;
type Response<U> = <U as Encoder>::Item;
/// Framed transport errors
pub enum DispatcherError<E, U: Encoder + Decoder> {
pub enum DispatcherError<E, U: Encoder<I> + Decoder, I> {
Service(E),
Encoder(<U as Encoder>::Error),
Encoder(<U as Encoder<I>>::Error),
Decoder(<U as Decoder>::Error),
}
impl<E, U: Encoder + Decoder> From<E> for DispatcherError<E, U> {
impl<E, U: Encoder<I> + Decoder, I> From<E> for DispatcherError<E, U, I> {
fn from(err: E) -> Self {
DispatcherError::Service(err)
}
}
impl<E, U: Encoder + Decoder> fmt::Debug for DispatcherError<E, U>
impl<E, U: Encoder<I> + Decoder, I> fmt::Debug for DispatcherError<E, U, I>
where
E: fmt::Debug,
<U as Encoder>::Error: fmt::Debug,
<U as Encoder<I>>::Error: fmt::Debug,
<U as Decoder>::Error: fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
@@ -42,10 +39,10 @@ where
}
}
impl<E, U: Encoder + Decoder> fmt::Display for DispatcherError<E, U>
impl<E, U: Encoder<I> + Decoder, I> fmt::Display for DispatcherError<E, U, I>
where
E: fmt::Display,
<U as Encoder>::Error: fmt::Debug,
<U as Encoder<I>>::Error: fmt::Debug,
<U as Decoder>::Error: fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
@@ -62,44 +59,44 @@ pub enum Message<T> {
Close,
}
/// FramedTransport - is a future that reads frames from Framed object
/// and pass then to the service.
/// Dispatcher is a future that reads frames from Framed object
/// and passes them to the service.
#[pin_project::pin_project]
pub struct Dispatcher<S, T, U>
pub struct Dispatcher<S, T, U, I>
where
S: Service<Request = Request<U>, Response = Response<U>>,
S: Service<Request = <U as Decoder>::Item, Response = I>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Encoder + Decoder,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
U: Encoder<I> + Decoder,
I: 'static,
<U as Encoder<I>>::Error: std::fmt::Debug,
{
service: S,
state: State<S, U>,
state: State<S, U, I>,
#[pin]
framed: Framed<T, U>,
rx: mpsc::Receiver<Result<Message<<U as Encoder>::Item>, S::Error>>,
tx: mpsc::Sender<Result<Message<<U as Encoder>::Item>, S::Error>>,
rx: mpsc::Receiver<Result<Message<I>, S::Error>>,
tx: mpsc::Sender<Result<Message<I>, S::Error>>,
}
enum State<S: Service, U: Encoder + Decoder> {
enum State<S: Service, U: Encoder<I> + Decoder, I> {
Processing,
Error(DispatcherError<S::Error, U>),
FramedError(DispatcherError<S::Error, U>),
Error(DispatcherError<S::Error, U, I>),
FramedError(DispatcherError<S::Error, U, I>),
FlushAndStop,
Stopping,
}
impl<S: Service, U: Encoder + Decoder> State<S, U> {
fn take_error(&mut self) -> DispatcherError<S::Error, U> {
impl<S: Service, U: Encoder<I> + Decoder, I> State<S, U, I> {
fn take_error(&mut self) -> DispatcherError<S::Error, U, I> {
match mem::replace(self, State::Processing) {
State::Error(err) => err,
_ => panic!(),
}
}
fn take_framed_error(&mut self) -> DispatcherError<S::Error, U> {
fn take_framed_error(&mut self) -> DispatcherError<S::Error, U, I> {
match mem::replace(self, State::Processing) {
State::FramedError(err) => err,
_ => panic!(),
@@ -107,15 +104,16 @@ impl<S: Service, U: Encoder + Decoder> State<S, U> {
}
}
impl<S, T, U> Dispatcher<S, T, U>
impl<S, T, U, I> Dispatcher<S, T, U, I>
where
S: Service<Request = Request<U>, Response = Response<U>>,
S: Service<Request = <U as Decoder>::Item, Response = I>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
U: Decoder + Encoder<I>,
I: 'static,
<U as Decoder>::Error: std::fmt::Debug,
<U as Encoder<I>>::Error: std::fmt::Debug,
{
pub fn new<F: IntoService<S>>(framed: Framed<T, U>, service: F) -> Self {
let (tx, rx) = mpsc::channel();
@@ -132,7 +130,7 @@ where
pub fn with_rx<F: IntoService<S>>(
framed: Framed<T, U>,
service: F,
rx: mpsc::Receiver<Result<Message<<U as Encoder>::Item>, S::Error>>,
rx: mpsc::Receiver<Result<Message<I>, S::Error>>,
) -> Self {
let tx = rx.sender();
Dispatcher {
@@ -145,7 +143,7 @@ where
}
/// Get sink
pub fn get_sink(&self) -> mpsc::Sender<Result<Message<<U as Encoder>::Item>, S::Error>> {
pub fn get_sink(&self) -> mpsc::Sender<Result<Message<I>, S::Error>> {
self.tx.clone()
}
@@ -172,13 +170,13 @@ where
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> bool
where
S: Service<Request = Request<U>, Response = Response<U>>,
S: Service<Request = <U as Decoder>::Item, Response = I>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
U: Decoder + Encoder<I>,
I: 'static,
<U as Encoder<I>>::Error: std::fmt::Debug,
{
loop {
let this = self.as_mut().project();
@@ -214,13 +212,13 @@ where
/// write to framed object
fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> bool
where
S: Service<Request = Request<U>, Response = Response<U>>,
S: Service<Request = <U as Decoder>::Item, Response = I>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
U: Decoder + Encoder<I>,
I: 'static,
<U as Encoder<I>>::Error: std::fmt::Debug,
{
loop {
let mut this = self.as_mut().project();
@@ -263,18 +261,18 @@ where
}
}
impl<S, T, U> Future for Dispatcher<S, T, U>
impl<S, T, U, I> Future for Dispatcher<S, T, U, I>
where
S: Service<Request = Request<U>, Response = Response<U>>,
S: Service<Request = <U as Decoder>::Item, Response = I>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
U: Decoder + Encoder<I>,
I: 'static,
<U as Encoder<I>>::Error: std::fmt::Debug,
<U as Decoder>::Error: std::fmt::Debug,
{
type Output = Result<(), DispatcherError<S::Error, U>>;
type Output = Result<(), DispatcherError<S::Error, U, I>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {

View File

@@ -1,8 +1,7 @@
//! Actix utils - various helper services
#![deny(rust_2018_idioms, warnings)]
#![deny(rust_2018_idioms)]
#![allow(clippy::type_complexity)]
mod cell;
pub mod condition;
pub mod counter;
pub mod either;

View File

@@ -1,24 +1,25 @@
//! A multi-producer, single-consumer, futures-aware, FIFO queue.
use std::any::Any;
use std::cell::RefCell;
use std::collections::VecDeque;
use std::error::Error;
use std::fmt;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
use futures_sink::Sink;
use futures_util::stream::Stream;
use crate::cell::Cell;
use crate::task::LocalWaker;
/// Creates a unbounded in-memory channel with buffered storage.
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let shared = Cell::new(Shared {
let shared = Rc::new(RefCell::new(Shared {
has_receiver: true,
buffer: VecDeque::new(),
blocked_recv: LocalWaker::new(),
});
}));
let sender = Sender {
shared: shared.clone(),
};
@@ -38,7 +39,7 @@ struct Shared<T> {
/// This is created by the `channel` function.
#[derive(Debug)]
pub struct Sender<T> {
shared: Cell<Shared<T>>,
shared: Rc<RefCell<Shared<T>>>,
}
impl<T> Unpin for Sender<T> {}
@@ -46,7 +47,7 @@ impl<T> Unpin for Sender<T> {}
impl<T> Sender<T> {
/// Sends the provided message along this channel.
pub fn send(&self, item: T) -> Result<(), SendError<T>> {
let shared = unsafe { self.shared.get_mut_unsafe() };
let mut shared = self.shared.borrow_mut();
if !shared.has_receiver {
return Err(SendError(item)); // receiver was dropped
};
@@ -60,7 +61,7 @@ impl<T> Sender<T> {
/// This prevents any further messages from being sent on the channel while
/// still enabling the receiver to drain messages that are buffered.
pub fn close(&mut self) {
self.shared.get_mut().has_receiver = false;
self.shared.borrow_mut().has_receiver = false;
}
}
@@ -94,8 +95,8 @@ impl<T> Sink<T> for Sender<T> {
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
let count = self.shared.strong_count();
let shared = self.shared.get_mut();
let count = Rc::strong_count(&self.shared);
let shared = self.shared.borrow_mut();
// check is last sender is about to drop
if shared.has_receiver && count == 2 {
@@ -110,7 +111,7 @@ impl<T> Drop for Sender<T> {
/// This is created by the `channel` function.
#[derive(Debug)]
pub struct Receiver<T> {
shared: Cell<Shared<T>>,
shared: Rc<RefCell<Shared<T>>>,
}
impl<T> Receiver<T> {
@@ -127,15 +128,16 @@ impl<T> Unpin for Receiver<T> {}
impl<T> Stream for Receiver<T> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.shared.strong_count() == 1 {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut shared = self.shared.borrow_mut();
if Rc::strong_count(&self.shared) == 1 {
// All senders have been dropped, so drain the buffer and end the
// stream.
Poll::Ready(self.shared.get_mut().buffer.pop_front())
} else if let Some(msg) = self.shared.get_mut().buffer.pop_front() {
Poll::Ready(shared.buffer.pop_front())
} else if let Some(msg) = shared.buffer.pop_front() {
Poll::Ready(Some(msg))
} else {
self.shared.get_mut().blocked_recv.register(cx.waker());
shared.blocked_recv.register(cx.waker());
Poll::Pending
}
}
@@ -143,7 +145,7 @@ impl<T> Stream for Receiver<T> {
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
let shared = self.shared.get_mut();
let mut shared = self.shared.borrow_mut();
shared.buffer.clear();
shared.has_receiver = false;
}

View File

@@ -1,20 +1,21 @@
//! A one-shot, futures-aware channel.
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
pub use futures_channel::oneshot::Canceled;
use slab::Slab;
use crate::cell::Cell;
use crate::task::LocalWaker;
/// Creates a new futures-aware, one-shot channel.
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let inner = Cell::new(Inner {
let inner = Rc::new(RefCell::new(Inner {
value: None,
rx_task: LocalWaker::new(),
});
}));
let tx = Sender {
inner: inner.clone(),
};
@@ -24,14 +25,14 @@ pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
/// Creates a new futures-aware, pool of one-shot's.
pub fn pool<T>() -> Pool<T> {
Pool(Cell::new(Slab::new()))
Pool(Rc::new(RefCell::new(Slab::new())))
}
/// Represents the completion half of a oneshot through which the result of a
/// computation is signaled.
#[derive(Debug)]
pub struct Sender<T> {
inner: Cell<Inner<T>>,
inner: Rc<RefCell<Inner<T>>>,
}
/// A future representing the completion of a computation happening elsewhere in
@@ -39,7 +40,7 @@ pub struct Sender<T> {
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct Receiver<T> {
inner: Cell<Inner<T>>,
inner: Rc<RefCell<Inner<T>>>,
}
// The channels do not ever project Pin to the inner T
@@ -63,9 +64,9 @@ impl<T> Sender<T> {
/// then `Ok(())` is returned. If the receiving end was dropped before
/// this function was called, however, then `Err` is returned with the value
/// provided.
pub fn send(mut self, val: T) -> Result<(), T> {
if self.inner.strong_count() == 2 {
let inner = self.inner.get_mut();
pub fn send(self, val: T) -> Result<(), T> {
if Rc::strong_count(&self.inner) == 2 {
let mut inner = self.inner.borrow_mut();
inner.value = Some(val);
inner.rx_task.wake();
Ok(())
@@ -77,13 +78,13 @@ impl<T> Sender<T> {
/// Tests to see whether this `Sender`'s corresponding `Receiver`
/// has gone away.
pub fn is_canceled(&self) -> bool {
self.inner.strong_count() == 1
Rc::strong_count(&self.inner) == 1
}
}
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
self.inner.get_ref().rx_task.wake();
self.inner.borrow().rx_task.wake();
}
}
@@ -94,22 +95,22 @@ impl<T> Future for Receiver<T> {
let this = self.get_mut();
// If we've got a value, then skip the logic below as we're done.
if let Some(val) = this.inner.get_mut().value.take() {
if let Some(val) = this.inner.borrow_mut().value.take() {
return Poll::Ready(Ok(val));
}
// Check if sender is dropped and return error if it is.
if this.inner.strong_count() == 1 {
if Rc::strong_count(&this.inner) == 1 {
Poll::Ready(Err(Canceled))
} else {
this.inner.get_ref().rx_task.register(cx.waker());
this.inner.borrow().rx_task.register(cx.waker());
Poll::Pending
}
}
}
/// Futures-aware, pool of one-shot's.
pub struct Pool<T>(Cell<Slab<PoolInner<T>>>);
pub struct Pool<T>(Rc<RefCell<Slab<PoolInner<T>>>>);
bitflags::bitflags! {
pub struct Flags: u8 {
@@ -127,7 +128,7 @@ struct PoolInner<T> {
impl<T> Pool<T> {
pub fn channel(&mut self) -> (PSender<T>, PReceiver<T>) {
let token = self.0.get_mut().insert(PoolInner {
let token = self.0.borrow_mut().insert(PoolInner {
flags: Flags::all(),
value: None,
waker: LocalWaker::default(),
@@ -157,7 +158,7 @@ impl<T> Clone for Pool<T> {
#[derive(Debug)]
pub struct PSender<T> {
token: usize,
inner: Cell<Slab<PoolInner<T>>>,
inner: Rc<RefCell<Slab<PoolInner<T>>>>,
}
/// A future representing the completion of a computation happening elsewhere in
@@ -166,7 +167,7 @@ pub struct PSender<T> {
#[must_use = "futures do nothing unless polled"]
pub struct PReceiver<T> {
token: usize,
inner: Cell<Slab<PoolInner<T>>>,
inner: Rc<RefCell<Slab<PoolInner<T>>>>,
}
// The oneshots do not ever project Pin to the inner T
@@ -184,8 +185,9 @@ impl<T> PSender<T> {
/// then `Ok(())` is returned. If the receiving end was dropped before
/// this function was called, however, then `Err` is returned with the value
/// provided.
pub fn send(mut self, val: T) -> Result<(), T> {
let inner = unsafe { self.inner.get_mut().get_unchecked_mut(self.token) };
pub fn send(self, val: T) -> Result<(), T> {
let mut inner = self.inner.borrow_mut();
let inner = unsafe { inner.get_unchecked_mut(self.token) };
if inner.flags.contains(Flags::RECEIVER) {
inner.value = Some(val);
@@ -199,7 +201,7 @@ impl<T> PSender<T> {
/// Tests to see whether this `Sender`'s corresponding `Receiver`
/// has gone away.
pub fn is_canceled(&self) -> bool {
!unsafe { self.inner.get_ref().get_unchecked(self.token) }
!unsafe { self.inner.borrow().get_unchecked(self.token) }
.flags
.contains(Flags::RECEIVER)
}
@@ -207,23 +209,25 @@ impl<T> PSender<T> {
impl<T> Drop for PSender<T> {
fn drop(&mut self) {
let inner = unsafe { self.inner.get_mut().get_unchecked_mut(self.token) };
if inner.flags.contains(Flags::RECEIVER) {
inner.waker.wake();
inner.flags.remove(Flags::SENDER);
let mut inner = self.inner.borrow_mut();
let inner_token = unsafe { inner.get_unchecked_mut(self.token) };
if inner_token.flags.contains(Flags::RECEIVER) {
inner_token.waker.wake();
inner_token.flags.remove(Flags::SENDER);
} else {
self.inner.get_mut().remove(self.token);
inner.remove(self.token);
}
}
}
impl<T> Drop for PReceiver<T> {
fn drop(&mut self) {
let inner = unsafe { self.inner.get_mut().get_unchecked_mut(self.token) };
if inner.flags.contains(Flags::SENDER) {
inner.flags.remove(Flags::RECEIVER);
let mut inner = self.inner.borrow_mut();
let inner_token = unsafe { inner.get_unchecked_mut(self.token) };
if inner_token.flags.contains(Flags::SENDER) {
inner_token.flags.remove(Flags::RECEIVER);
} else {
self.inner.get_mut().remove(self.token);
inner.remove(self.token);
}
}
}
@@ -233,7 +237,8 @@ impl<T> Future for PReceiver<T> {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
let inner = unsafe { this.inner.get_mut().get_unchecked_mut(this.token) };
let mut inner = this.inner.borrow_mut();
let inner = unsafe { inner.get_unchecked_mut(this.token) };
// If we've got a value, then skip the logic below as we're done.
if let Some(val) = inner.value.take() {

View File

@@ -3,7 +3,7 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use actix_service::{IntoService, Service};
use futures_util::{FutureExt, stream::Stream};
use futures_util::{stream::Stream, FutureExt};
use crate::mpsc;

View File

@@ -1,4 +1,6 @@
use std::cell::RefCell;
use std::convert::Infallible;
use std::rc::Rc;
use std::task::{Context, Poll};
use std::time::{self, Duration, Instant};
@@ -6,10 +8,8 @@ use actix_rt::time::delay_for;
use actix_service::{Service, ServiceFactory};
use futures_util::future::{ok, ready, FutureExt, Ready};
use super::cell::Cell;
#[derive(Clone, Debug)]
pub struct LowResTime(Cell<Inner>);
pub struct LowResTime(Rc<RefCell<Inner>>);
#[derive(Debug)]
struct Inner {
@@ -28,7 +28,7 @@ impl Inner {
impl LowResTime {
pub fn with(resolution: Duration) -> LowResTime {
LowResTime(Cell::new(Inner::new(resolution)))
LowResTime(Rc::new(RefCell::new(Inner::new(resolution))))
}
pub fn timer(&self) -> LowResTimeService {
@@ -38,7 +38,7 @@ impl LowResTime {
impl Default for LowResTime {
fn default() -> Self {
LowResTime(Cell::new(Inner::new(Duration::from_secs(1))))
LowResTime(Rc::new(RefCell::new(Inner::new(Duration::from_secs(1)))))
}
}
@@ -57,30 +57,30 @@ impl ServiceFactory for LowResTime {
}
#[derive(Clone, Debug)]
pub struct LowResTimeService(Cell<Inner>);
pub struct LowResTimeService(Rc<RefCell<Inner>>);
impl LowResTimeService {
pub fn with(resolution: Duration) -> LowResTimeService {
LowResTimeService(Cell::new(Inner::new(resolution)))
LowResTimeService(Rc::new(RefCell::new(Inner::new(resolution))))
}
/// Get current time. This function has to be called from
/// future's poll method, otherwise it panics.
pub fn now(&self) -> Instant {
let cur = self.0.get_ref().current;
let cur = self.0.borrow().current;
if let Some(cur) = cur {
cur
} else {
let now = Instant::now();
let mut inner = self.0.clone();
let inner = self.0.clone();
let interval = {
let mut b = inner.get_mut();
let mut b = inner.borrow_mut();
b.current = Some(now);
b.resolution
};
actix_rt::spawn(delay_for(interval).then(move |_| {
inner.get_mut().current.take();
inner.borrow_mut().current.take();
ready(())
}));
now
@@ -104,7 +104,7 @@ impl Service for LowResTimeService {
}
#[derive(Clone, Debug)]
pub struct SystemTime(Cell<SystemTimeInner>);
pub struct SystemTime(Rc<RefCell<SystemTimeInner>>);
#[derive(Debug)]
struct SystemTimeInner {
@@ -122,30 +122,30 @@ impl SystemTimeInner {
}
#[derive(Clone, Debug)]
pub struct SystemTimeService(Cell<SystemTimeInner>);
pub struct SystemTimeService(Rc<RefCell<SystemTimeInner>>);
impl SystemTimeService {
pub fn with(resolution: Duration) -> SystemTimeService {
SystemTimeService(Cell::new(SystemTimeInner::new(resolution)))
SystemTimeService(Rc::new(RefCell::new(SystemTimeInner::new(resolution))))
}
/// Get current time. This function has to be called from
/// future's poll method, otherwise it panics.
pub fn now(&self) -> time::SystemTime {
let cur = self.0.get_ref().current;
let cur = self.0.borrow().current;
if let Some(cur) = cur {
cur
} else {
let now = time::SystemTime::now();
let mut inner = self.0.clone();
let inner = self.0.clone();
let interval = {
let mut b = inner.get_mut();
let mut b = inner.borrow_mut();
b.current = Some(now);
b.resolution
};
actix_rt::spawn(delay_for(interval).then(move |_| {
inner.get_mut().current.take();
inner.borrow_mut().current.take();
ready(())
}));
now

View File

@@ -7,7 +7,7 @@ keywords = ["actix"]
homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/actix-router/"
license = "MIT/Apache-2.0"
license = "MIT OR Apache-2.0"
edition = "2018"
[lib]

View File

@@ -7,9 +7,13 @@ use crate::ResourcePath;
macro_rules! unsupported_type {
($trait_fn:ident, $name:expr) => {
fn $trait_fn<V>(self, _: V) -> Result<V::Value, Self::Error>
where V: Visitor<'de>
where
V: Visitor<'de>,
{
Err(de::value::Error::custom(concat!("unsupported type: ", $name)))
Err(de::value::Error::custom(concat!(
"unsupported type: ",
$name
)))
}
};
}
@@ -17,20 +21,25 @@ macro_rules! unsupported_type {
macro_rules! parse_single_value {
($trait_fn:ident, $visit_fn:ident, $tp:tt) => {
fn $trait_fn<V>(self, visitor: V) -> Result<V::Value, Self::Error>
where V: Visitor<'de>
where
V: Visitor<'de>,
{
if self.path.len() != 1 {
Err(de::value::Error::custom(
format!("wrong number of parameters: {} expected 1",
self.path.len()).as_str()))
format!("wrong number of parameters: {} expected 1", self.path.len())
.as_str(),
))
} else {
let v = self.path[0].parse().map_err(
|_| de::value::Error::custom(
format!("can not parse {:?} to a {}", &self.path[0], $tp)))?;
let v = self.path[0].parse().map_err(|_| {
de::value::Error::custom(format!(
"can not parse {:?} to a {}",
&self.path[0], $tp
))
})?;
visitor.$visit_fn(v)
}
}
}
};
}
pub struct PathDeserializer<'de, T: ResourcePath + 'de> {
@@ -268,14 +277,15 @@ impl<'de> Deserializer<'de> for Key<'de> {
macro_rules! parse_value {
($trait_fn:ident, $visit_fn:ident, $tp:tt) => {
fn $trait_fn<V>(self, visitor: V) -> Result<V::Value, Self::Error>
where V: Visitor<'de>
where
V: Visitor<'de>,
{
let v = self.value.parse().map_err(
|_| de::value::Error::custom(
format!("can not parse {:?} to a {}", self.value, $tp)))?;
let v = self.value.parse().map_err(|_| {
de::value::Error::custom(format!("can not parse {:?} to a {}", self.value, $tp))
})?;
visitor.$visit_fn(v)
}
}
};
}
struct Value<'de> {

View File

@@ -37,7 +37,6 @@ impl ResourcePath for bytestring::ByteString {
/// Helper trait for type that could be converted to path pattern
pub trait IntoPattern {
/// Signle patter
fn is_single(&self) -> bool;
fn patterns(&self) -> Vec<String>;

View File

@@ -7,7 +7,7 @@ keywords = ["actix"]
homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/bytestring/"
license = "MIT/Apache-2.0"
license = "MIT OR Apache-2.0"
edition = "2018"
[lib]

View File

@@ -33,6 +33,13 @@ impl ByteString {
}
/// Creates a new `ByteString` from a Bytes.
///
/// # Safety
/// This function is unsafe because it does not check the bytes passed to it
/// are valid UTF-8. If this constraint is violated,
/// it may cause memory unsafety issues with future users of the `ByteString`,
/// as we assume that `ByteString`s are valid UTF-8.
/// However, the most likely issue is that the data gets corrupted.
pub const unsafe fn from_bytes_unchecked(src: Bytes) -> ByteString {
Self(src)
}