mirror of
https://github.com/fafhrd91/actix-net
synced 2025-02-17 13:33:31 +01:00
solve framed integration with actix-http (#179)
This commit is contained in:
parent
fecdfcd8d4
commit
b3010c13e0
@ -5,6 +5,9 @@
|
|||||||
* Upgrade `tokio-util` to `0.3`.
|
* Upgrade `tokio-util` to `0.3`.
|
||||||
* Improve `BytesCodec` `.encode()` performance
|
* Improve `BytesCodec` `.encode()` performance
|
||||||
* Simplify `BytesCodec` `.decode()`
|
* Simplify `BytesCodec` `.decode()`
|
||||||
|
* Rename methods on `Framed` to better describe their use.
|
||||||
|
* Add method on `Framed` to get a pinned reference to the underlying I/O.
|
||||||
|
* Add method on `Framed` check emptiness of read buffer.
|
||||||
|
|
||||||
## [0.2.0] - 2019-12-10
|
## [0.2.0] - 2019-12-10
|
||||||
|
|
||||||
|
@ -23,6 +23,12 @@ bitflags::bitflags! {
|
|||||||
|
|
||||||
/// A unified `Stream` and `Sink` interface to an underlying I/O object, using
|
/// A unified `Stream` and `Sink` interface to an underlying I/O object, using
|
||||||
/// the `Encoder` and `Decoder` traits to encode and decode frames.
|
/// the `Encoder` and `Decoder` traits to encode and decode frames.
|
||||||
|
///
|
||||||
|
/// Raw I/O objects work with byte sequences, but higher-level code usually
|
||||||
|
/// wants to batch these into meaningful chunks, called "frames". This
|
||||||
|
/// method layers framing on top of an I/O object, by using the `Encoder`/`Decoder`
|
||||||
|
/// traits to handle encoding and decoding of message frames. Note that
|
||||||
|
/// the incoming and outgoing frame types may be distinct.
|
||||||
#[pin_project]
|
#[pin_project]
|
||||||
pub struct Framed<T, U> {
|
pub struct Framed<T, U> {
|
||||||
#[pin]
|
#[pin]
|
||||||
@ -38,15 +44,6 @@ where
|
|||||||
T: AsyncRead + AsyncWrite,
|
T: AsyncRead + AsyncWrite,
|
||||||
U: Decoder,
|
U: Decoder,
|
||||||
{
|
{
|
||||||
/// Provides a `Stream` and `Sink` interface for reading and writing to this
|
|
||||||
/// `Io` object, using `Decode` and `Encode` to read and write the raw data.
|
|
||||||
///
|
|
||||||
/// Raw I/O objects work with byte sequences, but higher-level code usually
|
|
||||||
/// wants to batch these into meaningful chunks, called "frames". This
|
|
||||||
/// method layers framing on top of an I/O object, by using the `Codec`
|
|
||||||
/// traits to handle encoding and decoding of messages frames. Note that
|
|
||||||
/// the incoming and outgoing frame types may be distinct.
|
|
||||||
///
|
|
||||||
/// This function returns a *single* object that is both `Stream` and
|
/// This function returns a *single* object that is both `Stream` and
|
||||||
/// `Sink`; grouping this into a single object is often useful for layering
|
/// `Sink`; grouping this into a single object is often useful for layering
|
||||||
/// things like gzip or TLS, which require both read and write access to the
|
/// things like gzip or TLS, which require both read and write access to the
|
||||||
@ -63,40 +60,13 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<T, U> Framed<T, U> {
|
impl<T, U> Framed<T, U> {
|
||||||
/// Provides a `Stream` and `Sink` interface for reading and writing to this
|
|
||||||
/// `Io` object, using `Decode` and `Encode` to read and write the raw data.
|
|
||||||
///
|
|
||||||
/// Raw I/O objects work with byte sequences, but higher-level code usually
|
|
||||||
/// wants to batch these into meaningful chunks, called "frames". This
|
|
||||||
/// method layers framing on top of an I/O object, by using the `Codec`
|
|
||||||
/// traits to handle encoding and decoding of messages frames. Note that
|
|
||||||
/// the incoming and outgoing frame types may be distinct.
|
|
||||||
///
|
|
||||||
/// This function returns a *single* object that is both `Stream` and
|
|
||||||
/// `Sink`; grouping this into a single object is often useful for layering
|
|
||||||
/// things like gzip or TLS, which require both read and write access to the
|
|
||||||
/// underlying object.
|
|
||||||
///
|
|
||||||
/// This objects takes a stream and a readbuffer and a writebuffer. These
|
|
||||||
/// field can be obtained from an existing `Framed` with the
|
|
||||||
/// `into_parts` method.
|
|
||||||
pub fn from_parts(parts: FramedParts<T, U>) -> Framed<T, U> {
|
|
||||||
Framed {
|
|
||||||
io: parts.io,
|
|
||||||
codec: parts.codec,
|
|
||||||
flags: parts.flags,
|
|
||||||
write_buf: parts.write_buf,
|
|
||||||
read_buf: parts.read_buf,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns a reference to the underlying codec.
|
/// Returns a reference to the underlying codec.
|
||||||
pub fn get_codec(&self) -> &U {
|
pub fn codec_ref(&self) -> &U {
|
||||||
&self.codec
|
&self.codec
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns a mutable reference to the underlying codec.
|
/// Returns a mutable reference to the underlying codec.
|
||||||
pub fn get_codec_mut(&mut self) -> &mut U {
|
pub fn codec_mut(&mut self) -> &mut U {
|
||||||
&mut self.codec
|
&mut self.codec
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -106,20 +76,29 @@ impl<T, U> Framed<T, U> {
|
|||||||
/// Note that care should be taken to not tamper with the underlying stream
|
/// Note that care should be taken to not tamper with the underlying stream
|
||||||
/// of data coming in as it may corrupt the stream of frames otherwise
|
/// of data coming in as it may corrupt the stream of frames otherwise
|
||||||
/// being worked with.
|
/// being worked with.
|
||||||
pub fn get_ref(&self) -> &T {
|
pub fn io_ref(&self) -> &T {
|
||||||
&self.io
|
&self.io
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns a mutable reference to the underlying I/O stream wrapped by
|
/// Returns a mutable reference to the underlying I/O stream.
|
||||||
/// `Frame`.
|
|
||||||
///
|
///
|
||||||
/// Note that care should be taken to not tamper with the underlying stream
|
/// Note that care should be taken to not tamper with the underlying stream
|
||||||
/// of data coming in as it may corrupt the stream of frames otherwise
|
/// of data coming in as it may corrupt the stream of frames otherwise
|
||||||
/// being worked with.
|
/// being worked with.
|
||||||
pub fn get_mut(&mut self) -> &mut T {
|
pub fn io_mut(&mut self) -> &mut T {
|
||||||
&mut self.io
|
&mut self.io
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns a `Pin` of a mutable reference to the underlying I/O stream.
|
||||||
|
pub fn io_pin(self: Pin<&mut Self>) -> Pin<&mut T> {
|
||||||
|
self.project().io
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Check if read buffer is empty.
|
||||||
|
pub fn is_read_buf_empty(&self) -> bool {
|
||||||
|
self.read_buf.is_empty()
|
||||||
|
}
|
||||||
|
|
||||||
/// Check if write buffer is empty.
|
/// Check if write buffer is empty.
|
||||||
pub fn is_write_buf_empty(&self) -> bool {
|
pub fn is_write_buf_empty(&self) -> bool {
|
||||||
self.write_buf.is_empty()
|
self.write_buf.is_empty()
|
||||||
@ -130,8 +109,15 @@ impl<T, U> Framed<T, U> {
|
|||||||
self.write_buf.len() >= HW
|
self.write_buf.len() >= HW
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Check if framed is able to write more data.
|
||||||
|
///
|
||||||
|
/// `Framed` object considers ready if there is free space in write buffer.
|
||||||
|
pub fn is_write_ready(&self) -> bool {
|
||||||
|
self.write_buf.len() < HW
|
||||||
|
}
|
||||||
|
|
||||||
/// Consume the `Frame`, returning `Frame` with different codec.
|
/// Consume the `Frame`, returning `Frame` with different codec.
|
||||||
pub fn into_framed<U2, I2>(self, codec: U2) -> Framed<T, U2> {
|
pub fn replace_codec<U2, I2>(self, codec: U2) -> Framed<T, U2> {
|
||||||
Framed {
|
Framed {
|
||||||
codec,
|
codec,
|
||||||
io: self.io,
|
io: self.io,
|
||||||
@ -142,7 +128,7 @@ impl<T, U> Framed<T, U> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Consume the `Frame`, returning `Frame` with different io.
|
/// Consume the `Frame`, returning `Frame` with different io.
|
||||||
pub fn map_io<F, T2, I2>(self, f: F) -> Framed<T2, U>
|
pub fn into_map_io<F, T2>(self, f: F) -> Framed<T2, U>
|
||||||
where
|
where
|
||||||
F: Fn(T) -> T2,
|
F: Fn(T) -> T2,
|
||||||
{
|
{
|
||||||
@ -156,7 +142,7 @@ impl<T, U> Framed<T, U> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Consume the `Frame`, returning `Frame` with different codec.
|
/// Consume the `Frame`, returning `Frame` with different codec.
|
||||||
pub fn map_codec<F, U2, I2>(self, f: F) -> Framed<T, U2>
|
pub fn into_map_codec<F, U2>(self, f: F) -> Framed<T, U2>
|
||||||
where
|
where
|
||||||
F: Fn(U) -> U2,
|
F: Fn(U) -> U2,
|
||||||
{
|
{
|
||||||
@ -168,22 +154,6 @@ impl<T, U> Framed<T, U> {
|
|||||||
write_buf: self.write_buf,
|
write_buf: self.write_buf,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Consumes the `Frame`, returning its underlying I/O stream, the buffer
|
|
||||||
/// with unprocessed data, and the codec.
|
|
||||||
///
|
|
||||||
/// Note that care should be taken to not tamper with the underlying stream
|
|
||||||
/// of data coming in as it may corrupt the stream of frames otherwise
|
|
||||||
/// being worked with.
|
|
||||||
pub fn into_parts(self) -> FramedParts<T, U> {
|
|
||||||
FramedParts {
|
|
||||||
io: self.io,
|
|
||||||
codec: self.codec,
|
|
||||||
flags: self.flags,
|
|
||||||
read_buf: self.read_buf,
|
|
||||||
write_buf: self.write_buf,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T, U> Framed<T, U> {
|
impl<T, U> Framed<T, U> {
|
||||||
@ -203,13 +173,6 @@ impl<T, U> Framed<T, U> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Check if framed is able to write more data.
|
|
||||||
///
|
|
||||||
/// `Framed` object considers ready if there is free space in write buffer.
|
|
||||||
pub fn is_write_ready(&self) -> bool {
|
|
||||||
self.write_buf.len() < HW
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Try to read underlying I/O stream and decode item.
|
/// Try to read underlying I/O stream and decode item.
|
||||||
pub fn next_item(
|
pub fn next_item(
|
||||||
mut self: Pin<&mut Self>,
|
mut self: Pin<&mut Self>,
|
||||||
@ -376,6 +339,41 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<T, U> Framed<T, U> {
|
||||||
|
/// This function returns a *single* object that is both `Stream` and
|
||||||
|
/// `Sink`; grouping this into a single object is often useful for layering
|
||||||
|
/// things like gzip or TLS, which require both read and write access to the
|
||||||
|
/// underlying object.
|
||||||
|
///
|
||||||
|
/// These objects take a stream, a read buffer and a write buffer. These
|
||||||
|
/// fields can be obtained from an existing `Framed` with the `into_parts` method.
|
||||||
|
pub fn from_parts(parts: FramedParts<T, U>) -> Framed<T, U> {
|
||||||
|
Framed {
|
||||||
|
io: parts.io,
|
||||||
|
codec: parts.codec,
|
||||||
|
flags: parts.flags,
|
||||||
|
write_buf: parts.write_buf,
|
||||||
|
read_buf: parts.read_buf,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Consumes the `Frame`, returning its underlying I/O stream, the buffer
|
||||||
|
/// with unprocessed data, and the codec.
|
||||||
|
///
|
||||||
|
/// Note that care should be taken to not tamper with the underlying stream
|
||||||
|
/// of data coming in as it may corrupt the stream of frames otherwise
|
||||||
|
/// being worked with.
|
||||||
|
pub fn into_parts(self) -> FramedParts<T, U> {
|
||||||
|
FramedParts {
|
||||||
|
io: self.io,
|
||||||
|
codec: self.codec,
|
||||||
|
flags: self.flags,
|
||||||
|
read_buf: self.read_buf,
|
||||||
|
write_buf: self.write_buf,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// `FramedParts` contains an export of the data of a Framed transport.
|
/// `FramedParts` contains an export of the data of a Framed transport.
|
||||||
/// It can be used to construct a new `Framed` with a different codec.
|
/// It can be used to construct a new `Framed` with a different codec.
|
||||||
/// It contains all current buffers and the inner transport.
|
/// It contains all current buffers and the inner transport.
|
||||||
|
@ -8,7 +8,9 @@
|
|||||||
//! [`AsyncWrite`]: AsyncWrite
|
//! [`AsyncWrite`]: AsyncWrite
|
||||||
//! [`Sink`]: futures_sink::Sink
|
//! [`Sink`]: futures_sink::Sink
|
||||||
//! [`Stream`]: futures_core::Stream
|
//! [`Stream`]: futures_core::Stream
|
||||||
|
|
||||||
#![deny(rust_2018_idioms)]
|
#![deny(rust_2018_idioms)]
|
||||||
|
#![warn(missing_docs)]
|
||||||
|
|
||||||
mod bcodec;
|
mod bcodec;
|
||||||
mod framed;
|
mod framed;
|
||||||
|
@ -4,6 +4,7 @@
|
|||||||
|
|
||||||
* Upgrade `tokio-util` to `0.3`.
|
* Upgrade `tokio-util` to `0.3`.
|
||||||
* Remove unsound custom Cell and use `std::cell::RefCell` instead, as well as `actix-service`.
|
* Remove unsound custom Cell and use `std::cell::RefCell` instead, as well as `actix-service`.
|
||||||
|
* Rename method to correctly spelled `LocalWaker::is_registered`.
|
||||||
|
|
||||||
## [1.0.6] - 2020-01-08
|
## [1.0.6] - 2020-01-08
|
||||||
|
|
||||||
|
@ -7,7 +7,7 @@ use crate::task::LocalWaker;
|
|||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
/// Simple counter with ability to notify task on reaching specific number
|
/// Simple counter with ability to notify task on reaching specific number
|
||||||
///
|
///
|
||||||
/// Counter could be cloned, total ncount is shared across all clones.
|
/// Counter could be cloned, total n-count is shared across all clones.
|
||||||
pub struct Counter(Rc<CounterInner>);
|
pub struct Counter(Rc<CounterInner>);
|
||||||
|
|
||||||
struct CounterInner {
|
struct CounterInner {
|
||||||
|
@ -1,5 +1,7 @@
|
|||||||
//! Framed dispatcher service and related utilities
|
//! Framed dispatcher service and related utilities
|
||||||
|
|
||||||
#![allow(type_alias_bounds)]
|
#![allow(type_alias_bounds)]
|
||||||
|
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
use std::{fmt, mem};
|
use std::{fmt, mem};
|
@ -152,7 +152,7 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_newtransform() {
|
async fn test_new_transform() {
|
||||||
let wait_time = Duration::from_millis(50);
|
let wait_time = Duration::from_millis(50);
|
||||||
|
|
||||||
let srv = apply(InFlight::new(1), fn_factory(|| ok(SleepService(wait_time))));
|
let srv = apply(InFlight::new(1), fn_factory(|| ok(SleepService(wait_time))));
|
||||||
|
@ -1,11 +1,12 @@
|
|||||||
//! Actix utils - various helper services
|
//! Actix utils - various helper services
|
||||||
|
|
||||||
#![deny(rust_2018_idioms)]
|
#![deny(rust_2018_idioms)]
|
||||||
#![allow(clippy::type_complexity)]
|
#![allow(clippy::type_complexity)]
|
||||||
|
|
||||||
pub mod condition;
|
pub mod condition;
|
||||||
pub mod counter;
|
pub mod counter;
|
||||||
|
pub mod dispatcher;
|
||||||
pub mod either;
|
pub mod either;
|
||||||
pub mod framed;
|
|
||||||
pub mod inflight;
|
pub mod inflight;
|
||||||
pub mod keepalive;
|
pub mod keepalive;
|
||||||
pub mod mpsc;
|
pub mod mpsc;
|
||||||
|
@ -170,7 +170,7 @@ pub struct PReceiver<T> {
|
|||||||
inner: Rc<RefCell<Slab<PoolInner<T>>>>,
|
inner: Rc<RefCell<Slab<PoolInner<T>>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
// The oneshots do not ever project Pin to the inner T
|
// The one-shots do not ever project Pin to the inner T
|
||||||
impl<T> Unpin for PReceiver<T> {}
|
impl<T> Unpin for PReceiver<T> {}
|
||||||
impl<T> Unpin for PSender<T> {}
|
impl<T> Unpin for PSender<T> {}
|
||||||
|
|
||||||
|
@ -231,7 +231,7 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_inorder() {
|
async fn test_in_order() {
|
||||||
let (tx1, rx1) = oneshot::channel();
|
let (tx1, rx1) = oneshot::channel();
|
||||||
let (tx2, rx2) = oneshot::channel();
|
let (tx2, rx2) = oneshot::channel();
|
||||||
let (tx3, rx3) = oneshot::channel();
|
let (tx3, rx3) = oneshot::channel();
|
||||||
|
@ -36,7 +36,7 @@ impl LocalWaker {
|
|||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
/// Check if waker has been registered.
|
/// Check if waker has been registered.
|
||||||
pub fn is_registed(&self) -> bool {
|
pub fn is_registered(&self) -> bool {
|
||||||
unsafe { (*self.waker.get()).is_some() }
|
unsafe { (*self.waker.get()).is_some() }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -173,7 +173,7 @@ mod tests {
|
|||||||
///
|
///
|
||||||
/// Expected Behavior: Two back-to-back calls of `LowResTimeService::now()` return the same value.
|
/// Expected Behavior: Two back-to-back calls of `LowResTimeService::now()` return the same value.
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn lowres_time_service_time_does_not_immediately_change() {
|
async fn low_res_time_service_time_does_not_immediately_change() {
|
||||||
let resolution = Duration::from_millis(50);
|
let resolution = Duration::from_millis(50);
|
||||||
let time_service = LowResTimeService::with(resolution);
|
let time_service = LowResTimeService::with(resolution);
|
||||||
assert_eq!(time_service.now(), time_service.now());
|
assert_eq!(time_service.now(), time_service.now());
|
||||||
@ -210,7 +210,7 @@ mod tests {
|
|||||||
/// Expected Behavior: Two calls of `LowResTimeService::now()` made in subsequent resolution interval return different values
|
/// Expected Behavior: Two calls of `LowResTimeService::now()` made in subsequent resolution interval return different values
|
||||||
/// and second value is greater than the first one at least by a resolution interval.
|
/// and second value is greater than the first one at least by a resolution interval.
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn lowres_time_service_time_updates_after_resolution_interval() {
|
async fn low_res_time_service_time_updates_after_resolution_interval() {
|
||||||
let resolution = Duration::from_millis(100);
|
let resolution = Duration::from_millis(100);
|
||||||
let wait_time = Duration::from_millis(300);
|
let wait_time = Duration::from_millis(300);
|
||||||
let time_service = LowResTimeService::with(resolution);
|
let time_service = LowResTimeService::with(resolution);
|
||||||
|
@ -220,7 +220,7 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_timeout_newservice() {
|
async fn test_timeout_new_service() {
|
||||||
let resolution = Duration::from_millis(100);
|
let resolution = Duration::from_millis(100);
|
||||||
let wait_time = Duration::from_millis(500);
|
let wait_time = Duration::from_millis(500);
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user