1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-08-13 01:37:05 +02:00

Compare commits

...

5 Commits

Author SHA1 Message Date
Rob Ede
d28687d0d7 promote codec/utils out of beta (#184) 2020-08-24 09:18:37 +01:00
Rob Ede
27c6be9881 remove unused type parameter from Framed::replace_codec (#183) 2020-08-20 00:30:26 +01:00
Rob Ede
119dc39f5b prepare codec and utils betas (#182) 2020-08-19 11:00:12 +01:00
Rob Ede
b3010c13e0 solve framed integration with actix-http (#179) 2020-08-18 23:27:37 +01:00
Adrian Wechner
fecdfcd8d4 assert workers greater than zero (#167) 2020-08-18 16:44:22 +01:00
20 changed files with 121 additions and 96 deletions

View File

@@ -1,10 +1,21 @@
# Changes
## Unreleased - 2020-xx-xx
## 0.3.0 - 2020-08-23
* No changes from beta 2.
## 0.3.0-beta.2 - 2020-08-19
* Remove unused type parameter from `Framed::replace_codec`.
## 0.3.0-beta.1 - 2020-08-19
* Use `.advance()` instead of `.split_to()`.
* Upgrade `tokio-util` to `0.3`.
* Improve `BytesCodec` `.encode()` performance
* Simplify `BytesCodec` `.decode()`
* Rename methods on `Framed` to better describe their use.
* Add method on `Framed` to get a pinned reference to the underlying I/O.
* Add method on `Framed` check emptiness of read buffer.
## [0.2.0] - 2019-12-10

View File

@@ -1,8 +1,8 @@
[package]
name = "actix-codec"
version = "0.2.0"
version = "0.3.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Utilities for encoding and decoding frames"
description = "Codec utilities for working with framed protocols."
keywords = ["network", "framework", "async", "futures"]
homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git"
@@ -10,7 +10,6 @@ documentation = "https://docs.rs/actix-codec/"
categories = ["network-programming", "asynchronous"]
license = "MIT OR Apache-2.0"
edition = "2018"
workspace = ".."
[lib]
name = "actix_codec"
@@ -21,7 +20,7 @@ bitflags = "1.2.1"
bytes = "0.5.2"
futures-core = { version = "0.3.4", default-features = false }
futures-sink = { version = "0.3.4", default-features = false }
tokio = { version = "0.2.5", default-features = false }
tokio-util = { version = "0.3.1", default-features = false, features = ["codec"] }
log = "0.4"
pin-project = "0.4.17"
tokio = { version = "0.2.5", default-features = false }
tokio-util = { version = "0.3.1", default-features = false, features = ["codec"] }

View File

@@ -23,6 +23,12 @@ bitflags::bitflags! {
/// A unified `Stream` and `Sink` interface to an underlying I/O object, using
/// the `Encoder` and `Decoder` traits to encode and decode frames.
///
/// Raw I/O objects work with byte sequences, but higher-level code usually
/// wants to batch these into meaningful chunks, called "frames". This
/// method layers framing on top of an I/O object, by using the `Encoder`/`Decoder`
/// traits to handle encoding and decoding of message frames. Note that
/// the incoming and outgoing frame types may be distinct.
#[pin_project]
pub struct Framed<T, U> {
#[pin]
@@ -38,15 +44,6 @@ where
T: AsyncRead + AsyncWrite,
U: Decoder,
{
/// Provides a `Stream` and `Sink` interface for reading and writing to this
/// `Io` object, using `Decode` and `Encode` to read and write the raw data.
///
/// Raw I/O objects work with byte sequences, but higher-level code usually
/// wants to batch these into meaningful chunks, called "frames". This
/// method layers framing on top of an I/O object, by using the `Codec`
/// traits to handle encoding and decoding of messages frames. Note that
/// the incoming and outgoing frame types may be distinct.
///
/// This function returns a *single* object that is both `Stream` and
/// `Sink`; grouping this into a single object is often useful for layering
/// things like gzip or TLS, which require both read and write access to the
@@ -63,40 +60,13 @@ where
}
impl<T, U> Framed<T, U> {
/// Provides a `Stream` and `Sink` interface for reading and writing to this
/// `Io` object, using `Decode` and `Encode` to read and write the raw data.
///
/// Raw I/O objects work with byte sequences, but higher-level code usually
/// wants to batch these into meaningful chunks, called "frames". This
/// method layers framing on top of an I/O object, by using the `Codec`
/// traits to handle encoding and decoding of messages frames. Note that
/// the incoming and outgoing frame types may be distinct.
///
/// This function returns a *single* object that is both `Stream` and
/// `Sink`; grouping this into a single object is often useful for layering
/// things like gzip or TLS, which require both read and write access to the
/// underlying object.
///
/// This objects takes a stream and a readbuffer and a writebuffer. These
/// field can be obtained from an existing `Framed` with the
/// `into_parts` method.
pub fn from_parts(parts: FramedParts<T, U>) -> Framed<T, U> {
Framed {
io: parts.io,
codec: parts.codec,
flags: parts.flags,
write_buf: parts.write_buf,
read_buf: parts.read_buf,
}
}
/// Returns a reference to the underlying codec.
pub fn get_codec(&self) -> &U {
pub fn codec_ref(&self) -> &U {
&self.codec
}
/// Returns a mutable reference to the underlying codec.
pub fn get_codec_mut(&mut self) -> &mut U {
pub fn codec_mut(&mut self) -> &mut U {
&mut self.codec
}
@@ -106,20 +76,29 @@ impl<T, U> Framed<T, U> {
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
pub fn get_ref(&self) -> &T {
pub fn io_ref(&self) -> &T {
&self.io
}
/// Returns a mutable reference to the underlying I/O stream wrapped by
/// `Frame`.
/// Returns a mutable reference to the underlying I/O stream.
///
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
pub fn get_mut(&mut self) -> &mut T {
pub fn io_mut(&mut self) -> &mut T {
&mut self.io
}
/// Returns a `Pin` of a mutable reference to the underlying I/O stream.
pub fn io_pin(self: Pin<&mut Self>) -> Pin<&mut T> {
self.project().io
}
/// Check if read buffer is empty.
pub fn is_read_buf_empty(&self) -> bool {
self.read_buf.is_empty()
}
/// Check if write buffer is empty.
pub fn is_write_buf_empty(&self) -> bool {
self.write_buf.is_empty()
@@ -130,8 +109,15 @@ impl<T, U> Framed<T, U> {
self.write_buf.len() >= HW
}
/// Check if framed is able to write more data.
///
/// `Framed` object considers ready if there is free space in write buffer.
pub fn is_write_ready(&self) -> bool {
self.write_buf.len() < HW
}
/// Consume the `Frame`, returning `Frame` with different codec.
pub fn into_framed<U2, I2>(self, codec: U2) -> Framed<T, U2> {
pub fn replace_codec<U2>(self, codec: U2) -> Framed<T, U2> {
Framed {
codec,
io: self.io,
@@ -142,7 +128,7 @@ impl<T, U> Framed<T, U> {
}
/// Consume the `Frame`, returning `Frame` with different io.
pub fn map_io<F, T2, I2>(self, f: F) -> Framed<T2, U>
pub fn into_map_io<F, T2>(self, f: F) -> Framed<T2, U>
where
F: Fn(T) -> T2,
{
@@ -156,7 +142,7 @@ impl<T, U> Framed<T, U> {
}
/// Consume the `Frame`, returning `Frame` with different codec.
pub fn map_codec<F, U2, I2>(self, f: F) -> Framed<T, U2>
pub fn into_map_codec<F, U2>(self, f: F) -> Framed<T, U2>
where
F: Fn(U) -> U2,
{
@@ -168,22 +154,6 @@ impl<T, U> Framed<T, U> {
write_buf: self.write_buf,
}
}
/// Consumes the `Frame`, returning its underlying I/O stream, the buffer
/// with unprocessed data, and the codec.
///
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
pub fn into_parts(self) -> FramedParts<T, U> {
FramedParts {
io: self.io,
codec: self.codec,
flags: self.flags,
read_buf: self.read_buf,
write_buf: self.write_buf,
}
}
}
impl<T, U> Framed<T, U> {
@@ -203,13 +173,6 @@ impl<T, U> Framed<T, U> {
Ok(())
}
/// Check if framed is able to write more data.
///
/// `Framed` object considers ready if there is free space in write buffer.
pub fn is_write_ready(&self) -> bool {
self.write_buf.len() < HW
}
/// Try to read underlying I/O stream and decode item.
pub fn next_item(
mut self: Pin<&mut Self>,
@@ -376,6 +339,41 @@ where
}
}
impl<T, U> Framed<T, U> {
/// This function returns a *single* object that is both `Stream` and
/// `Sink`; grouping this into a single object is often useful for layering
/// things like gzip or TLS, which require both read and write access to the
/// underlying object.
///
/// These objects take a stream, a read buffer and a write buffer. These
/// fields can be obtained from an existing `Framed` with the `into_parts` method.
pub fn from_parts(parts: FramedParts<T, U>) -> Framed<T, U> {
Framed {
io: parts.io,
codec: parts.codec,
flags: parts.flags,
write_buf: parts.write_buf,
read_buf: parts.read_buf,
}
}
/// Consumes the `Frame`, returning its underlying I/O stream, the buffer
/// with unprocessed data, and the codec.
///
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
pub fn into_parts(self) -> FramedParts<T, U> {
FramedParts {
io: self.io,
codec: self.codec,
flags: self.flags,
read_buf: self.read_buf,
write_buf: self.write_buf,
}
}
}
/// `FramedParts` contains an export of the data of a Framed transport.
/// It can be used to construct a new `Framed` with a different codec.
/// It contains all current buffers and the inner transport.

View File

@@ -8,7 +8,9 @@
//! [`AsyncWrite`]: AsyncWrite
//! [`Sink`]: futures_sink::Sink
//! [`Stream`]: futures_core::Stream
#![deny(rust_2018_idioms)]
#![warn(missing_docs)]
mod bcodec;
mod framed;

View File

@@ -32,8 +32,8 @@ uri = ["http"]
[dependencies]
actix-service = "1.0.3"
actix-codec = "0.2.0"
actix-utils = "1.0.6"
actix-codec = "0.3.0"
actix-utils = "2.0.0"
actix-rt = "1.0.0"
derive_more = "0.99.2"
either = "1.5.3"

View File

@@ -1,5 +1,11 @@
# Changes
## Unreleased
### Changed
* workers must be greater than 0
## [1.0.3] - 2020-05-19
### Changed

View File

@@ -23,8 +23,8 @@ default = []
[dependencies]
actix-service = "1.0.1"
actix-rt = "1.0.0"
actix-codec = "0.2.0"
actix-utils = "1.0.4"
actix-codec = "0.3.0"
actix-utils = "2.0.0"
log = "0.4"
num_cpus = "1.11"

View File

@@ -72,8 +72,9 @@ impl ServerBuilder {
/// Set number of workers to start.
///
/// By default server uses number of available logical cpu as workers
/// count.
/// count. Workers must be greater than 0.
pub fn workers(mut self, num: usize) -> Self {
assert_ne!(num, 0, "workers must be greater than 0");
self.threads = num;
self
}

View File

@@ -33,8 +33,8 @@ nativetls = ["native-tls", "tokio-tls"]
[dependencies]
actix-service = "1.0.0"
actix-codec = "0.2.0"
actix-utils = "1.0.0"
actix-codec = "0.3.0"
actix-utils = "2.0.0"
actix-rt = "1.0.0"
derive_more = "0.99.2"
either = "1.5.2"

View File

@@ -2,8 +2,13 @@
## Unreleased - 2020-xx-xx
## 2.0.0 - 2020-08-23
* No changes from beta 1.
## 2.0.0-beta.1 - 2020-08-19
* Upgrade `tokio-util` to `0.3`.
* Remove unsound custom Cell and use `std::cell::RefCell` instead, as well as `actix-service`.
* Rename method to correctly spelled `LocalWaker::is_registered`.
## [1.0.6] - 2020-01-08

View File

@@ -1,8 +1,8 @@
[package]
name = "actix-utils"
version = "1.0.6"
version = "2.0.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix utils - various actix net related services"
description = "Various network related services and utilities for the Actix ecosystem."
keywords = ["network", "framework", "async", "futures"]
homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git"
@@ -16,15 +16,15 @@ name = "actix_utils"
path = "src/lib.rs"
[dependencies]
actix-service = "1.0.1"
actix-rt = "1.0.0"
actix-codec = "0.2.0"
bitflags = "1.2"
actix-codec = "0.3.0"
actix-rt = "1.1.1"
actix-service = "1.0.6"
bitflags = "1.2.1"
bytes = "0.5.3"
either = "1.5.3"
futures-channel = { version = "0.3.4", default-features = false }
futures-sink = { version = "0.3.4", default-features = false }
futures-util = { version = "0.3.4", default-features = false }
pin-project = "0.4.17"
log = "0.4"
pin-project = "0.4.17"
slab = "0.4"

View File

@@ -7,7 +7,7 @@ use crate::task::LocalWaker;
#[derive(Clone)]
/// Simple counter with ability to notify task on reaching specific number
///
/// Counter could be cloned, total ncount is shared across all clones.
/// Counter could be cloned, total n-count is shared across all clones.
pub struct Counter(Rc<CounterInner>);
struct CounterInner {

View File

@@ -1,5 +1,7 @@
//! Framed dispatcher service and related utilities
#![allow(type_alias_bounds)]
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{fmt, mem};

View File

@@ -152,7 +152,7 @@ mod tests {
}
#[actix_rt::test]
async fn test_newtransform() {
async fn test_new_transform() {
let wait_time = Duration::from_millis(50);
let srv = apply(InFlight::new(1), fn_factory(|| ok(SleepService(wait_time))));

View File

@@ -1,11 +1,12 @@
//! Actix utils - various helper services
#![deny(rust_2018_idioms)]
#![allow(clippy::type_complexity)]
pub mod condition;
pub mod counter;
pub mod dispatcher;
pub mod either;
pub mod framed;
pub mod inflight;
pub mod keepalive;
pub mod mpsc;

View File

@@ -170,7 +170,7 @@ pub struct PReceiver<T> {
inner: Rc<RefCell<Slab<PoolInner<T>>>>,
}
// The oneshots do not ever project Pin to the inner T
// The one-shots do not ever project Pin to the inner T
impl<T> Unpin for PReceiver<T> {}
impl<T> Unpin for PSender<T> {}

View File

@@ -231,7 +231,7 @@ mod tests {
}
#[actix_rt::test]
async fn test_inorder() {
async fn test_in_order() {
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
let (tx3, rx3) = oneshot::channel();

View File

@@ -36,7 +36,7 @@ impl LocalWaker {
#[inline]
/// Check if waker has been registered.
pub fn is_registed(&self) -> bool {
pub fn is_registered(&self) -> bool {
unsafe { (*self.waker.get()).is_some() }
}

View File

@@ -173,7 +173,7 @@ mod tests {
///
/// Expected Behavior: Two back-to-back calls of `LowResTimeService::now()` return the same value.
#[actix_rt::test]
async fn lowres_time_service_time_does_not_immediately_change() {
async fn low_res_time_service_time_does_not_immediately_change() {
let resolution = Duration::from_millis(50);
let time_service = LowResTimeService::with(resolution);
assert_eq!(time_service.now(), time_service.now());
@@ -210,7 +210,7 @@ mod tests {
/// Expected Behavior: Two calls of `LowResTimeService::now()` made in subsequent resolution interval return different values
/// and second value is greater than the first one at least by a resolution interval.
#[actix_rt::test]
async fn lowres_time_service_time_updates_after_resolution_interval() {
async fn low_res_time_service_time_updates_after_resolution_interval() {
let resolution = Duration::from_millis(100);
let wait_time = Duration::from_millis(300);
let time_service = LowResTimeService::with(resolution);

View File

@@ -220,7 +220,7 @@ mod tests {
}
#[actix_rt::test]
async fn test_timeout_newservice() {
async fn test_timeout_new_service() {
let resolution = Duration::from_millis(100);
let wait_time = Duration::from_millis(500);