diff --git a/actix-codec/CHANGES.md b/actix-codec/CHANGES.md index 8959d057..1101b6de 100644 --- a/actix-codec/CHANGES.md +++ b/actix-codec/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.2.0] - 2019-12-10 + +* Use specific futures dependencies + ## [0.2.0-alpha.4] * Fix buffer remaining capacity calcualtion @@ -14,17 +18,14 @@ * Migrated to `std::future` - ## [0.1.2] - 2019-03-27 * Added `Framed::map_io()` method. - ## [0.1.1] - 2019-03-06 * Added `FramedParts::with_read_buffer()` method. - ## [0.1.0] - 2018-12-09 * Move codec to separate crate diff --git a/actix-codec/Cargo.toml b/actix-codec/Cargo.toml index 9f2f64b9..9decfdc1 100644 --- a/actix-codec/Cargo.toml +++ b/actix-codec/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-codec" -version = "0.2.0-alpha.4" +version = "0.2.0" authors = ["Nikolay Kim "] description = "Utilities for encoding and decoding frames" keywords = ["network", "framework", "async", "futures"] @@ -9,7 +9,6 @@ repository = "https://github.com/actix/actix-net.git" documentation = "https://docs.rs/actix-codec/" categories = ["network-programming", "asynchronous"] license = "MIT/Apache-2.0" -exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"] edition = "2018" workspace = ".." @@ -20,7 +19,8 @@ path = "src/lib.rs" [dependencies] bitflags = "1.2.1" bytes = "0.5.2" -futures = "0.3.1" +futures-core = "0.3.1" +futures-sink = "0.3.1" tokio = { version = "0.2.4", default-features=false } tokio-util = { version = "0.2.0", default-features=false, features=["codec"] } log = "0.4" \ No newline at end of file diff --git a/actix-codec/src/framed.rs b/actix-codec/src/framed.rs index ed1ce7a1..e5d45804 100644 --- a/actix-codec/src/framed.rs +++ b/actix-codec/src/framed.rs @@ -3,7 +3,8 @@ use std::task::{Context, Poll}; use std::{fmt, io}; use bytes::BytesMut; -use futures::{ready, Sink, Stream}; +use futures_core::{ready, Stream}; +use futures_sink::Sink; use crate::{AsyncRead, AsyncWrite, Decoder, Encoder}; @@ -19,8 +20,6 @@ bitflags::bitflags! { /// A unified `Stream` and `Sink` interface to an underlying I/O object, using /// the `Encoder` and `Decoder` traits to encode and decode frames. -/// -/// You can create a `Framed` instance by using the `AsyncRead::framed` adapter. pub struct Framed { io: T, codec: U, @@ -49,10 +48,6 @@ where /// `Sink`; grouping this into a single object is often useful for layering /// things like gzip or TLS, which require both read and write access to the /// underlying object. - /// - /// If you want to work more directly with the streams and sink, consider - /// calling `split` on the `Framed` returned by this method, which will - /// break them into separate objects, allowing them to interact more easily. pub fn new(io: T, codec: U) -> Framed { Framed { io, @@ -82,10 +77,6 @@ impl Framed { /// This objects takes a stream and a readbuffer and a writebuffer. These /// field can be obtained from an existing `Framed` with the /// `into_parts` method. - /// - /// If you want to work more directly with the streams and sink, consider - /// calling `split` on the `Framed` returned by this method, which will - /// break them into separate objects, allowing them to interact more easily. pub fn from_parts(parts: FramedParts) -> Framed { Framed { io: parts.io, @@ -136,15 +127,6 @@ impl Framed { self.write_buf.len() >= HW } - /// Consumes the `Frame`, returning its underlying I/O stream. - /// - /// Note that care should be taken to not tamper with the underlying stream - /// of data coming in as it may corrupt the stream of frames otherwise - /// being worked with. - pub fn into_inner(self) -> T { - self.io - } - /// Consume the `Frame`, returning `Frame` with different codec. pub fn into_framed(self, codec: U2) -> Framed { Framed { @@ -217,11 +199,14 @@ impl Framed { Ok(()) } - /// Check if framed is able to write more data - pub fn is_ready(&self) -> bool { + /// Check if framed is able to write more data. + /// + /// `Framed` object considers ready if there is free space in write buffer. + pub fn is_write_ready(&self) -> bool { self.write_buf.len() < HW } + /// Try to read underlying I/O stream and decode item. pub fn next_item(&mut self, cx: &mut Context<'_>) -> Poll>> where T: AsyncRead, @@ -251,9 +236,7 @@ impl Framed { return Poll::Ready(Some(Ok(frame))); } Err(e) => return Poll::Ready(Some(Err(e))), - _ => { - // Need more data - } + _ => (), // Need more data } self.flags.remove(Flags::READABLE); @@ -281,6 +264,7 @@ impl Framed { } } + /// Flush write buffer to underlying I/O stream. pub fn flush(&mut self, cx: &mut Context<'_>) -> Poll> where T: AsyncWrite, @@ -298,14 +282,12 @@ impl Framed { if n == 0 { return Poll::Ready(Err(io::Error::new( io::ErrorKind::WriteZero, - "failed to \ - write frame to transport", + "failed to write frame to transport", ) .into())); } - // TODO: Add a way to `bytes` to do this w/o returning the drained - // data. + // remove written data let _ = self.write_buf.split_to(n); } @@ -316,6 +298,7 @@ impl Framed { Poll::Ready(Ok(())) } + /// Flush write buffer and shutdown underlying I/O stream. pub fn close(&mut self, cx: &mut Context<'_>) -> Poll> where T: AsyncWrite, @@ -350,7 +333,7 @@ where type Error = U::Error; fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - if self.is_ready() { + if self.is_write_ready() { Poll::Ready(Ok(())) } else { Poll::Pending