use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use actix_codec::{AsyncRead, AsyncWrite, Framed}; use crate::body::{BodySize, MessageBody, ResponseBody}; use crate::error::Error; use crate::h1::{Codec, Message}; use crate::response::Response; /// Send http/1 response #[pin_project::pin_project] pub struct SendResponse { res: Option, BodySize)>>, #[pin] body: Option>, framed: Option>, } impl SendResponse where B: MessageBody, { pub fn new(framed: Framed, response: Response) -> Self { let (res, body) = response.into_parts(); SendResponse { res: Some((res, body.size()).into()), body: Some(body), framed: Some(framed), } } } impl Future for SendResponse where T: AsyncRead + AsyncWrite, B: MessageBody + Unpin, { type Output = Result, Error>; // TODO: rethink if we need loops in polls fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut this = self.project(); let mut body_done = this.body.is_none(); loop { let mut body_ready = !body_done; let framed = this.framed.as_mut().unwrap(); // send body if this.res.is_none() && body_ready { while body_ready && !body_done && !framed.is_write_buf_full() { match this.body.as_mut().as_pin_mut().unwrap().poll_next(cx)? { Poll::Ready(item) => { // body is done when item is None body_done = item.is_none(); if body_done { let _ = this.body.take(); } framed.write(Message::Chunk(item))?; } Poll::Pending => body_ready = false, } } } // flush write buffer if !framed.is_write_buf_empty() { match framed.flush(cx)? { Poll::Ready(_) => { if body_ready { continue; } else { return Poll::Pending; } } Poll::Pending => return Poll::Pending, } } // send response if let Some(res) = this.res.take() { framed.write(res)?; continue; } if body_done { if body_ready { continue; } else { return Poll::Pending; } } else { break; } } Poll::Ready(Ok(this.framed.take().unwrap())) } }