diff --git a/actix-codec/CHANGES.md b/actix-codec/CHANGES.md index e2193abe..ae76143a 100644 --- a/actix-codec/CHANGES.md +++ b/actix-codec/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.2.0-alpha.3] + +* Use tokio 0.2 + * Fix low/high watermark for write/read buffers ## [0.2.0-alpha.2] diff --git a/actix-codec/Cargo.toml b/actix-codec/Cargo.toml index 3084aee8..bd3572b8 100644 --- a/actix-codec/Cargo.toml +++ b/actix-codec/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-codec" -version = "0.2.0-alpha.2" +version = "0.2.0-alpha.3" authors = ["Nikolay Kim "] description = "Utilities for encoding and decoding frames" keywords = ["network", "framework", "async", "futures"] @@ -18,9 +18,10 @@ name = "actix_codec" path = "src/lib.rs" [dependencies] -bytes = "0.4.12" +bitflags = "1.2.1" +bytes = "0.5.2" futures = "0.3.1" -pin-project = "0.4.5" -tokio-io = "=0.2.0-alpha.6" -tokio-codec = "=0.2.0-alpha.6" +pin-project = "0.4.6" +tokio = { version = "0.2.2", default-features=false } +tokio-util = { version = "0.2.0", default-features=false, features=["codec"] } log = "0.4" \ No newline at end of file diff --git a/actix-codec/src/bcodec.rs b/actix-codec/src/bcodec.rs index d7523bf0..c71c0fa4 100644 --- a/actix-codec/src/bcodec.rs +++ b/actix-codec/src/bcodec.rs @@ -1,7 +1,7 @@ +use bytes::{BufMut, Bytes, BytesMut}; use std::io; -use bytes::{Bytes, BytesMut}; -use tokio_codec::{Decoder, Encoder}; +use super::{Decoder, Encoder}; /// Bytes codec. /// @@ -14,7 +14,8 @@ impl Encoder for BytesCodec { type Error = io::Error; fn encode(&mut self, item: Bytes, dst: &mut BytesMut) -> Result<(), Self::Error> { - dst.extend_from_slice(&item[..]); + dst.reserve(item.len()); + dst.put(item); Ok(()) } } @@ -27,7 +28,8 @@ impl Decoder for BytesCodec { if src.is_empty() { Ok(None) } else { - Ok(Some(src.take())) + let len = src.len(); + Ok(Some(src.split_to(len))) } } } diff --git a/actix-codec/src/framed.rs b/actix-codec/src/framed.rs index 682f446c..a82863d6 100644 --- a/actix-codec/src/framed.rs +++ b/actix-codec/src/framed.rs @@ -1,19 +1,22 @@ -#![allow(deprecated)] - -use std::fmt; -use std::io::{self}; use std::pin::Pin; use std::task::{Context, Poll}; +use std::{fmt, io}; use bytes::{BufMut, BytesMut}; use futures::{ready, Sink, Stream}; use pin_project::pin_project; -use tokio_codec::{Decoder, Encoder}; -use tokio_io::{AsyncRead, AsyncWrite}; + +use crate::{AsyncRead, AsyncWrite, Decoder, Encoder}; const LW: usize = 1024; const HW: usize = 8 * 1024; -const INITIAL_CAPACITY: usize = 8 * 1024; + +bitflags::bitflags! { + struct Flags: u8 { + const EOF = 0b0001; + const READABLE = 0b0010; + } +} /// A unified `Stream` and `Sink` interface to an underlying I/O object, using /// the `Encoder` and `Decoder` traits to encode and decode frames. @@ -23,12 +26,9 @@ const INITIAL_CAPACITY: usize = 8 * 1024; pub struct Framed { io: T, codec: U, - eof: bool, - is_readable: bool, + flags: Flags, read_buf: BytesMut, write_buf: BytesMut, - write_lw: usize, - write_hw: usize, } impl Framed @@ -57,27 +57,9 @@ where Framed { io, codec, - eof: false, - is_readable: false, - read_buf: BytesMut::with_capacity(INITIAL_CAPACITY), + flags: Flags::empty(), + read_buf: BytesMut::with_capacity(HW), write_buf: BytesMut::with_capacity(HW), - write_lw: LW, - write_hw: HW, - } - } - - /// Same as `Framed::new()` with ability to specify write buffer low/high capacity watermarks. - pub fn new_with_caps(io: T, codec: U, lw: usize, hw: usize) -> Framed { - debug_assert!((lw < hw) && hw != 0); - Framed { - io, - codec, - eof: false, - is_readable: false, - read_buf: BytesMut::with_capacity(INITIAL_CAPACITY), - write_buf: BytesMut::with_capacity(hw), - write_lw: lw, - write_hw: hw, } } } @@ -108,11 +90,8 @@ impl Framed { Framed { io: parts.io, codec: parts.codec, - eof: false, - is_readable: false, + flags: parts.flags, write_buf: parts.write_buf, - write_lw: parts.write_buf_lw, - write_hw: parts.write_buf_hw, read_buf: parts.read_buf, } } @@ -154,7 +133,7 @@ impl Framed { /// Check if write buffer is full. pub fn is_write_buf_full(&self) -> bool { - self.write_buf.len() >= self.write_hw + self.write_buf.len() >= HW } /// Consumes the `Frame`, returning its underlying I/O stream. @@ -169,14 +148,11 @@ impl Framed { /// Consume the `Frame`, returning `Frame` with different codec. pub fn into_framed(self, codec: U2) -> Framed { Framed { - io: self.io, codec, - eof: self.eof, - is_readable: self.is_readable, + io: self.io, + flags: self.flags, read_buf: self.read_buf, write_buf: self.write_buf, - write_lw: self.write_lw, - write_hw: self.write_hw, } } @@ -188,12 +164,9 @@ impl Framed { Framed { io: f(self.io), codec: self.codec, - eof: self.eof, - is_readable: self.is_readable, + flags: self.flags, read_buf: self.read_buf, write_buf: self.write_buf, - write_lw: self.write_lw, - write_hw: self.write_hw, } } @@ -205,12 +178,9 @@ impl Framed { Framed { io: self.io, codec: f(self.codec), - eof: self.eof, - is_readable: self.is_readable, + flags: self.flags, read_buf: self.read_buf, write_buf: self.write_buf, - write_lw: self.write_lw, - write_hw: self.write_hw, } } @@ -224,11 +194,9 @@ impl Framed { FramedParts { io: self.io, codec: self.codec, + flags: self.flags, read_buf: self.read_buf, write_buf: self.write_buf, - write_buf_lw: self.write_lw, - write_buf_hw: self.write_hw, - _priv: (), } } } @@ -241,8 +209,8 @@ impl Framed { U: Encoder, { let remaining = self.write_buf.remaining_mut(); - if remaining < self.write_lw { - self.write_buf.reserve(self.write_hw - remaining); + if remaining < LW { + self.write_buf.reserve(HW - remaining); } self.codec.encode(item, &mut self.write_buf)?; @@ -251,7 +219,7 @@ impl Framed { /// Check if framed is able to write more data pub fn is_ready(&self) -> bool { - self.write_buf.len() < self.write_hw + self.write_buf.len() < HW } pub fn next_item(&mut self, cx: &mut Context<'_>) -> Poll>> @@ -266,8 +234,8 @@ impl Framed { // readable, it can be assumed that the decoder will never become // readable again, at which point the stream is terminated. - if self.is_readable { - if self.eof { + if self.flags.contains(Flags::READABLE) { + if self.flags.contains(Flags::EOF) { match self.codec.decode_eof(&mut self.read_buf) { Ok(Some(frame)) => return Poll::Ready(Some(Ok(frame))), Ok(None) => return Poll::Ready(None), @@ -288,10 +256,10 @@ impl Framed { } } - self.is_readable = false; + self.flags.remove(Flags::READABLE); } - assert!(!self.eof); + debug_assert!(!self.flags.contains(Flags::EOF)); // Otherwise, try to read more data and try again. Make sure we've got room let remaining = self.read_buf.remaining_mut(); @@ -307,9 +275,9 @@ impl Framed { }; if cnt == 0 { - self.eof = true; + self.flags.insert(Flags::EOF); } - self.is_readable = true; + self.flags.insert(Flags::READABLE); } } @@ -441,15 +409,7 @@ pub struct FramedParts { /// A buffer with unprocessed data which are not written yet. pub write_buf: BytesMut, - /// A buffer low watermark capacity - pub write_buf_lw: usize, - - /// A buffer high watermark capacity - pub write_buf_hw: usize, - - /// This private field allows us to add additional fields in the future in a - /// backwards compatible way. - _priv: (), + flags: Flags, } impl FramedParts { @@ -458,11 +418,9 @@ impl FramedParts { FramedParts { io, codec, + flags: Flags::empty(), read_buf: BytesMut::new(), write_buf: BytesMut::new(), - write_buf_lw: LW, - write_buf_hw: HW, - _priv: (), } } @@ -472,10 +430,8 @@ impl FramedParts { io, codec, read_buf, + flags: Flags::empty(), write_buf: BytesMut::new(), - write_buf_lw: LW, - write_buf_hw: HW, - _priv: (), } } } diff --git a/actix-codec/src/lib.rs b/actix-codec/src/lib.rs index 233fa0dc..6348b58f 100644 --- a/actix-codec/src/lib.rs +++ b/actix-codec/src/lib.rs @@ -6,9 +6,6 @@ //! //! [`AsyncRead`]: # //! [`AsyncWrite`]: # -//! [`Sink`]: # -//! [`Stream`]: # -//! [transports]: # #![deny(rust_2018_idioms, warnings)] #![allow(clippy::type_complexity)] @@ -18,5 +15,5 @@ mod framed; pub use self::bcodec::BytesCodec; pub use self::framed::{Framed, FramedParts}; -pub use tokio_codec::{Decoder, Encoder}; -pub use tokio_io::{AsyncRead, AsyncWrite}; +pub use tokio::io::{AsyncRead, AsyncWrite}; +pub use tokio_util::codec::{Decoder, Encoder};