diff --git a/src/service.rs b/src/service.rs index c76530320..774efb74e 100644 --- a/src/service.rs +++ b/src/service.rs @@ -1,13 +1,13 @@ -use std::io; use std::marker::PhantomData; use actix_net::codec::Framed; use actix_net::service::{NewService, Service}; use futures::future::{ok, Either, FutureResult}; use futures::{Async, AsyncSink, Future, Poll, Sink}; -use tokio_io::AsyncWrite; +use tokio_io::{AsyncRead, AsyncWrite}; -use error::ResponseError; +use body::Body; +use error::{Error, ResponseError}; use h1::{Codec, Message}; use response::Response; @@ -113,20 +113,43 @@ pub struct SendResponse(PhantomData<(T,)>); impl Default for SendResponse where - T: AsyncWrite, + T: AsyncRead + AsyncWrite, { fn default() -> Self { SendResponse(PhantomData) } } +impl SendResponse +where + T: AsyncRead + AsyncWrite, +{ + pub fn send( + mut framed: Framed, + mut res: Response, + ) -> impl Future, Error = Error> { + // init codec + framed.get_codec_mut().prepare_te(&mut res); + + // extract body from response + let body = res.replace_body(Body::Empty); + + // write response + SendResponseFut { + res: Some(Message::Item(res)), + body: Some(body), + framed: Some(framed), + } + } +} + impl NewService for SendResponse where - T: AsyncWrite, + T: AsyncRead + AsyncWrite, { type Request = (Response, Framed); type Response = Framed; - type Error = io::Error; + type Error = Error; type InitError = (); type Service = SendResponse; type Future = FutureResult; @@ -138,20 +161,23 @@ where impl Service for SendResponse where - T: AsyncWrite, + T: AsyncRead + AsyncWrite, { type Request = (Response, Framed); type Response = Framed; - type Error = io::Error; + type Error = Error; type Future = SendResponseFut; fn poll_ready(&mut self) -> Poll<(), Self::Error> { Ok(Async::Ready(())) } - fn call(&mut self, (res, framed): Self::Request) -> Self::Future { + fn call(&mut self, (mut res, mut framed): Self::Request) -> Self::Future { + framed.get_codec_mut().prepare_te(&mut res); + let body = res.replace_body(Body::Empty); SendResponseFut { res: Some(Message::Item(res)), + body: Some(body), framed: Some(framed), } } @@ -159,29 +185,72 @@ where pub struct SendResponseFut { res: Option>, + body: Option, framed: Option>, } impl Future for SendResponseFut where - T: AsyncWrite, + T: AsyncRead + AsyncWrite, { type Item = Framed; - type Error = io::Error; + type Error = Error; fn poll(&mut self) -> Poll { - if let Some(res) = self.res.take() { - match self.framed.as_mut().unwrap().start_send(res)? { - AsyncSink::Ready => (), - AsyncSink::NotReady(res) => { - self.res = Some(res); - return Ok(Async::NotReady); + // send response + if self.res.is_some() { + let framed = self.framed.as_mut().unwrap(); + if !framed.is_write_buf_full() { + if let Some(res) = self.res.take() { + println!("SEND RESP: {:?}", res); + framed.force_send(res)?; } } } - match self.framed.as_mut().unwrap().poll_complete()? { - Async::Ready(_) => Ok(Async::Ready(self.framed.take().unwrap())), - Async::NotReady => Ok(Async::NotReady), + + // send body + if self.res.is_none() && self.body.is_some() { + let framed = self.framed.as_mut().unwrap(); + if !framed.is_write_buf_full() { + let body = self.body.take().unwrap(); + match body { + Body::Empty => (), + Body::Streaming(mut stream) => loop { + match stream.poll()? { + Async::Ready(item) => { + let done = item.is_none(); + framed.force_send(Message::Chunk(item.into()))?; + if !done { + if !framed.is_write_buf_full() { + continue; + } else { + self.body = Some(Body::Streaming(stream)); + break; + } + } + } + Async::NotReady => { + self.body = Some(Body::Streaming(stream)); + break; + } + } + }, + Body::Binary(mut bin) => { + framed.force_send(Message::Chunk(Some(bin.take())))?; + framed.force_send(Message::Chunk(None))?; + } + } + } } + + // flush + match self.framed.as_mut().unwrap().poll_complete()? { + Async::Ready(_) => if self.res.is_some() || self.body.is_some() { + return self.poll(); + }, + Async::NotReady => return Ok(Async::NotReady), + } + + Ok(Async::Ready(self.framed.take().unwrap())) } }