use std::fmt; use std::io::{self, Read}; use bytes::BytesMut; use futures::{Async, AsyncSink, Poll, Sink, StartSend, Stream}; use tokio_codec::{Decoder, Encoder}; use tokio_io::{AsyncRead, AsyncWrite}; use super::framed::Fuse; /// A `Sink` of frames encoded to an `AsyncWrite`. pub struct FramedWrite { inner: FramedWrite2>, } pub struct FramedWrite2 { inner: T, buffer: BytesMut, } const INITIAL_CAPACITY: usize = 8 * 1024; const BACKPRESSURE_BOUNDARY: usize = INITIAL_CAPACITY; impl FramedWrite where T: AsyncWrite, E: Encoder, { /// Creates a new `FramedWrite` with the given `encoder`. pub fn new(inner: T, encoder: E) -> FramedWrite { FramedWrite { inner: framed_write2(Fuse(inner, encoder)), } } } impl FramedWrite { /// Returns a reference to the underlying I/O stream wrapped by /// `FramedWrite`. /// /// Note that care should be taken to not tamper with the underlying stream /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn get_ref(&self) -> &T { &self.inner.inner.0 } /// Returns a mutable reference to the underlying I/O stream wrapped by /// `FramedWrite`. /// /// Note that care should be taken to not tamper with the underlying stream /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn get_mut(&mut self) -> &mut T { &mut self.inner.inner.0 } /// Consumes the `FramedWrite`, returning its underlying I/O stream. /// /// Note that care should be taken to not tamper with the underlying stream /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn into_inner(self) -> T { self.inner.inner.0 } /// Returns a reference to the underlying decoder. pub fn encoder(&self) -> &E { &self.inner.inner.1 } /// Returns a mutable reference to the underlying decoder. pub fn encoder_mut(&mut self) -> &mut E { &mut self.inner.inner.1 } } impl Sink for FramedWrite where T: AsyncWrite, E: Encoder, { type SinkItem = E::Item; type SinkError = E::Error; fn start_send(&mut self, item: E::Item) -> StartSend { self.inner.start_send(item) } fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { self.inner.poll_complete() } fn close(&mut self) -> Poll<(), Self::SinkError> { Ok(try!(self.inner.close())) } } impl Stream for FramedWrite where T: Stream, { type Item = T::Item; type Error = T::Error; fn poll(&mut self) -> Poll, Self::Error> { self.inner.inner.0.poll() } } impl fmt::Debug for FramedWrite where T: fmt::Debug, U: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("FramedWrite") .field("inner", &self.inner.get_ref().0) .field("encoder", &self.inner.get_ref().1) .field("buffer", &self.inner.buffer) .finish() } } // ===== impl FramedWrite2 ===== pub fn framed_write2(inner: T) -> FramedWrite2 { FramedWrite2 { inner: inner, buffer: BytesMut::with_capacity(INITIAL_CAPACITY), } } pub fn framed_write2_with_buffer(inner: T, mut buf: BytesMut) -> FramedWrite2 { if buf.capacity() < INITIAL_CAPACITY { let bytes_to_reserve = INITIAL_CAPACITY - buf.capacity(); buf.reserve(bytes_to_reserve); } FramedWrite2 { inner: inner, buffer: buf, } } impl FramedWrite2 { pub fn get_ref(&self) -> &T { &self.inner } pub fn into_inner(self) -> T { self.inner } pub fn into_parts(self) -> (T, BytesMut) { (self.inner, self.buffer) } pub fn get_mut(&mut self) -> &mut T { &mut self.inner } } impl Sink for FramedWrite2 where T: AsyncWrite + Encoder, { type SinkItem = T::Item; type SinkError = T::Error; fn start_send(&mut self, item: T::Item) -> StartSend { // If the buffer is already over 8KiB, then attempt to flush it. If after // flushing it's *still* over 8KiB, then apply backpressure (reject the // send). if self.buffer.len() >= BACKPRESSURE_BOUNDARY { try!(self.poll_complete()); if self.buffer.len() >= BACKPRESSURE_BOUNDARY { return Ok(AsyncSink::NotReady(item)); } } try!(self.inner.encode(item, &mut self.buffer)); Ok(AsyncSink::Ready) } fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { trace!("flushing framed transport"); while !self.buffer.is_empty() { trace!("writing; remaining={}", self.buffer.len()); let n = try_ready!(self.inner.poll_write(&self.buffer)); if n == 0 { return Err(io::Error::new( io::ErrorKind::WriteZero, "failed to \ write frame to transport", ) .into()); } // TODO: Add a way to `bytes` to do this w/o returning the drained // data. let _ = self.buffer.split_to(n); } // Try flushing the underlying IO try_ready!(self.inner.poll_flush()); trace!("framed transport flushed"); return Ok(Async::Ready(())); } fn close(&mut self) -> Poll<(), Self::SinkError> { try_ready!(self.poll_complete()); Ok(try!(self.inner.shutdown())) } } impl Decoder for FramedWrite2 { type Item = T::Item; type Error = T::Error; fn decode(&mut self, src: &mut BytesMut) -> Result, T::Error> { self.inner.decode(src) } fn decode_eof(&mut self, src: &mut BytesMut) -> Result, T::Error> { self.inner.decode_eof(src) } } impl Read for FramedWrite2 { fn read(&mut self, dst: &mut [u8]) -> io::Result { self.inner.read(dst) } } impl AsyncRead for FramedWrite2 { unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { self.inner.prepare_uninitialized_buffer(buf) } }