use std::{ future::Future, pin::Pin, task::{Context, Poll}, }; use actix_codec::{AsyncRead, AsyncWrite, Framed}; use pin_project_lite::pin_project; use crate::{ body::{BodySize, MessageBody}, h1::{Codec, Message}, Error, Response, }; pin_project! { /// Send HTTP/1 response pub struct SendResponse { res: Option, BodySize)>>, #[pin] body: Option, #[pin] framed: Option>, } } impl SendResponse where B: MessageBody, B::Error: Into, { pub fn new(framed: Framed, response: Response) -> Self { let (res, body) = response.into_parts(); SendResponse { res: Some((res, body.size()).into()), body: Some(body), framed: Some(framed), } } } impl Future for SendResponse where T: AsyncRead + AsyncWrite + Unpin, B: MessageBody, B::Error: Into, { type Output = Result, Error>; // TODO: rethink if we need loops in polls fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut this = self.as_mut().project(); let mut body_done = this.body.is_none(); loop { let mut body_ready = !body_done; // send body if this.res.is_none() && body_ready { while body_ready && !body_done && !this .framed .as_ref() .as_pin_ref() .unwrap() .is_write_buf_full() { let next = match this.body.as_mut().as_pin_mut().unwrap().poll_next(cx) { Poll::Ready(Some(Ok(item))) => Poll::Ready(Some(item)), Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err.into())), Poll::Ready(None) => Poll::Ready(None), Poll::Pending => Poll::Pending, }; match next { Poll::Ready(item) => { // body is done when item is None body_done = item.is_none(); if body_done { this.body.set(None); } let framed = this.framed.as_mut().as_pin_mut().unwrap(); framed .write(Message::Chunk(item)) .map_err(|err| Error::new_send_response().with_cause(err))?; } Poll::Pending => body_ready = false, } } } let framed = this.framed.as_mut().as_pin_mut().unwrap(); // flush write buffer if !framed.is_write_buf_empty() { match framed .flush(cx) .map_err(|err| Error::new_send_response().with_cause(err))? { Poll::Ready(_) => { if body_ready { continue; } else { return Poll::Pending; } } Poll::Pending => return Poll::Pending, } } // send response if let Some(res) = this.res.take() { framed .write(res) .map_err(|err| Error::new_send_response().with_cause(err))?; continue; } if !body_done { if body_ready { continue; } else { return Poll::Pending; } } else { break; } } let framed = this.framed.take().unwrap(); Poll::Ready(Ok(framed)) } }