From dba86fbbf87a2b68979fd4eb3ee220f457e8f06b Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Wed, 14 Nov 2018 10:26:49 -0800 Subject: [PATCH] allow to force write to Framed object --- CHANGES.md | 6 ++++++ src/codec/framed.rs | 25 +++++++++++++++++++++++++ src/codec/framed_write.rs | 26 +++++++++++++++++++++++++- 3 files changed, 56 insertions(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 23476c88..229d2d74 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,11 @@ # Changes +## [0.2.2] - 2018-11-14 + +* Refactor Connector and Resolver services + +* Add low/high caps to Framed + ## [0.2.0] - 2018-11-08 ### Added diff --git a/src/codec/framed.rs b/src/codec/framed.rs index b05d7abd..fcf5d2da 100644 --- a/src/codec/framed.rs +++ b/src/codec/framed.rs @@ -54,10 +54,19 @@ where /// Same as `Framed::new()` with ability to specify write buffer low/high capacity watermarks. pub fn new_with_caps(inner: T, codec: U, lw: usize, hw: usize) -> Framed { + debug_assert!((lw < hw) && hw != 0); Framed { inner: framed_read2(framed_write2(Fuse(inner, codec), lw, hw)), } } + + /// Force send item + pub fn force_send( + &mut self, + item: ::Item, + ) -> Result<(), ::Error> { + self.inner.get_mut().force_send(item) + } } impl Framed { @@ -153,6 +162,22 @@ impl Framed { } } + /// Consume the `Frame`, returning `Frame` with different codec. + pub fn map_codec(self, f: F) -> Framed + where + F: Fn(U) -> U2, + { + let (inner, read_buf) = self.inner.into_parts(); + let (inner, write_buf, lw, hw) = inner.into_parts(); + + Framed { + inner: framed_read2_with_buffer( + framed_write2_with_buffer(Fuse(inner.0, f(inner.1)), write_buf, lw, hw), + read_buf, + ), + } + } + /// Consumes the `Frame`, returning its underlying I/O stream, the buffer /// with unprocessed data, and the codec. /// diff --git a/src/codec/framed_write.rs b/src/codec/framed_write.rs index 337bb4eb..02ea5738 100644 --- a/src/codec/framed_write.rs +++ b/src/codec/framed_write.rs @@ -79,6 +79,16 @@ impl FramedWrite { } } +impl FramedWrite +where + E: Encoder, +{ + /// Force send item + pub fn force_send(&mut self, item: E::Item) -> Result<(), E::Error> { + self.inner.force_send(item) + } +} + impl Sink for FramedWrite where T: AsyncWrite, @@ -186,6 +196,20 @@ impl FramedWrite2 { } } +impl FramedWrite2 +where + T: Encoder, +{ + pub fn force_send(&mut self, item: T::Item) -> Result<(), T::Error> { + let len = self.buffer.len(); + if len < self.low_watermark { + self.buffer.reserve(self.high_watermark - len) + } + self.inner.encode(item, &mut self.buffer)?; + Ok(()) + } +} + impl Sink for FramedWrite2 where T: AsyncWrite + Encoder, @@ -203,7 +227,7 @@ where self.buffer.reserve(self.high_watermark - len) } - try!(self.inner.encode(item, &mut self.buffer)); + self.inner.encode(item, &mut self.buffer)?; Ok(AsyncSink::Ready) }