use std::collections::VecDeque; use actix_net::codec::Framed; use actix_net::service::Service; use bytes::Bytes; use futures::future::{err, ok, Either}; use futures::{Async, Future, Poll, Sink, Stream}; use tokio_io::{AsyncRead, AsyncWrite}; use super::error::{ConnectorError, SendRequestError}; use super::request::RequestHead; use super::response::ClientResponse; use super::{Connect, Connection}; use body::{BodyStream, BodyType, MessageBody}; use error::Error; use h1; pub fn send_request( head: RequestHead, body: B, connector: &mut T, ) -> impl Future where T: Service, Error = ConnectorError>, B: MessageBody, Io: AsyncRead + AsyncWrite + 'static, { let tp = body.tp(); connector .call(Connect::new(head.uri.clone())) .from_err() .map(|io| Framed::new(io, h1::ClientCodec::default())) .and_then(|framed| framed.send((head, tp).into()).from_err()) .and_then(move |framed| match body.tp() { BodyType::None | BodyType::Zero => Either::A(ok(framed)), _ => Either::B(SendBody::new(body, framed)), }).and_then(|framed| { framed .into_future() .map_err(|(e, _)| SendRequestError::from(e)) .and_then(|(item, framed)| { if let Some(item) = item { let mut res = item.into_item().unwrap(); match framed.get_codec().message_type() { h1::MessageType::None => release_connection(framed), _ => res.payload = Some(Payload::stream(framed)), } ok(res) } else { err(ConnectorError::Disconnected.into()) } }) }) } struct SendBody { body: Option, framed: Option, h1::ClientCodec>>, write_buf: VecDeque>, flushed: bool, } impl SendBody where Io: AsyncRead + AsyncWrite + 'static, B: MessageBody, { fn new(body: B, framed: Framed, h1::ClientCodec>) -> Self { SendBody { body: Some(body), framed: Some(framed), write_buf: VecDeque::new(), flushed: true, } } } impl Future for SendBody where Io: AsyncRead + AsyncWrite + 'static, B: MessageBody, { type Item = Framed, h1::ClientCodec>; type Error = SendRequestError; fn poll(&mut self) -> Poll { let mut body_ready = true; loop { while body_ready && self.body.is_some() && !self.framed.as_ref().unwrap().is_write_buf_full() { match self.body.as_mut().unwrap().poll_next()? { Async::Ready(None) => { self.flushed = false; self.framed .as_mut() .unwrap() .start_send(h1::Message::Chunk(None))?; break; } Async::Ready(Some(chunk)) => { self.flushed = false; self.framed .as_mut() .unwrap() .start_send(h1::Message::Chunk(Some(chunk)))?; } Async::NotReady => body_ready = false, } } if !self.flushed { match self.framed.as_mut().unwrap().poll_complete()? { Async::Ready(_) => { self.flushed = true; continue; } Async::NotReady => return Ok(Async::NotReady), } } if self.body.is_none() { return Ok(Async::Ready(self.framed.take().unwrap())); } return Ok(Async::NotReady); } } } struct Payload { framed: Option, h1::ClientCodec>>, } impl Payload { fn stream(framed: Framed, h1::ClientCodec>) -> BodyStream { Box::new(Payload { framed: Some(framed), }) } } impl Stream for Payload { type Item = Bytes; type Error = Error; fn poll(&mut self) -> Poll, Error> { match self.framed.as_mut().unwrap().poll()? { Async::NotReady => Ok(Async::NotReady), Async::Ready(Some(chunk)) => match chunk { h1::Message::Chunk(Some(chunk)) => Ok(Async::Ready(Some(chunk))), h1::Message::Chunk(None) => { release_connection(self.framed.take().unwrap()); Ok(Async::Ready(None)) } h1::Message::Item(_) => unreachable!(), }, Async::Ready(None) => Ok(Async::Ready(None)), } } } fn release_connection(framed: Framed, h1::ClientCodec>) where Io: AsyncRead + AsyncWrite + 'static, { let parts = framed.into_parts(); if parts.read_buf.is_empty() && parts.write_buf.is_empty() { parts.io.release() } else { parts.io.close() } }