diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index 3882c2d5..0389d1f8 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -139,27 +139,14 @@ where fn is_empty(&self) -> bool { matches!(self, State::None) } - - fn is_call(&self) -> bool { - matches!(self, State::ServiceCall(_)) - } } + enum PollResponse { Upgrade(Request), DoNothing, DrainWriteBuf, } -impl PartialEq for PollResponse { - fn eq(&self, other: &PollResponse) -> bool { - match self { - PollResponse::DrainWriteBuf => matches!(other, PollResponse::DrainWriteBuf), - PollResponse::DoNothing => matches!(other, PollResponse::DoNothing), - _ => false, - } - } -} - impl Dispatcher where T: AsyncRead + AsyncWrite + Unpin, @@ -323,9 +310,10 @@ where message: Response<()>, body: ResponseBody, ) -> Result<(), DispatchError> { + let size = body.size(); let mut this = self.project(); this.codec - .encode(Message::Item((message, body.size())), &mut this.write_buf) + .encode(Message::Item((message, size)), &mut this.write_buf) .map_err(|err| { if let Some(mut payload) = this.payload.take() { payload.set_error(PayloadError::Incomplete(None)); @@ -334,7 +322,7 @@ where })?; this.flags.set(Flags::KEEPALIVE, this.codec.keepalive()); - match body.size() { + match size { BodySize::None | BodySize::Empty => this.state.set(State::None), _ => this.state.set(State::SendPayload(body)), }; @@ -351,109 +339,111 @@ where mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Result { - loop { + 'res: loop { let mut this = self.as_mut().project(); - // state is not changed on Poll::Pending. - // other variant and conditions always trigger a state change(or an error). - let state_change = match this.state.project() { + match this.state.as_mut().project() { + // no future is in InnerDispatcher state. pop next message. StateProj::None => match this.messages.pop_front() { + // handle request message. Some(DispatcherMessage::Item(req)) => { - self.as_mut().handle_request(req, cx)?; - true + // Handle `EXPECT: 100-Continue` header + if req.head().expect() { + // set InnerDispatcher state and continue loop to poll it. + let task = this.flow.expect.call(req); + this.state.set(State::ExpectCall(task)); + } else { + // the same as expect call. + let task = this.flow.service.call(req); + this.state.set(State::ServiceCall(task)); + }; } + // handle error message. Some(DispatcherMessage::Error(res)) => { + // send_response would update InnerDispatcher state to SendPayload or + // None(If response body is empty). + // continue loop to poll it. self.as_mut() .send_response(res, ResponseBody::Other(Body::Empty))?; - true } + // return with upgrade request and poll it exclusively. Some(DispatcherMessage::Upgrade(req)) => { return Ok(PollResponse::Upgrade(req)); } - None => false, - }, - StateProj::ExpectCall(fut) => match fut.poll(cx) { - Poll::Ready(Ok(req)) => { - self.as_mut().send_continue(); - this = self.as_mut().project(); - let fut = this.flow.service.call(req); - this.state.set(State::ServiceCall(fut)); - continue; - } - Poll::Ready(Err(e)) => { - let res: Response = e.into().into(); - let (res, body) = res.replace_body(()); - self.as_mut().send_response(res, body.into_body())?; - true - } - Poll::Pending => false, + // all messages are dealt with. + None => return Ok(PollResponse::DoNothing), }, StateProj::ServiceCall(fut) => match fut.poll(cx) { + // service call resolved. send response. Poll::Ready(Ok(res)) => { let (res, body) = res.into().replace_body(()); self.as_mut().send_response(res, body)?; - continue; } + // send service call error as response Poll::Ready(Err(e)) => { let res: Response = e.into().into(); let (res, body) = res.replace_body(()); self.as_mut().send_response(res, body.into_body())?; - true } - Poll::Pending => false, + // service call pending and could be waiting for more chunk messages. + // (pipeline message limit and/or payload can_read limit) + Poll::Pending => { + // no new message is decoded and no new payload is feed. + // nothing to do except waiting for new incoming data from client. + if !self.as_mut().poll_request(cx)? { + return Ok(PollResponse::DoNothing); + } + // otherwise keep loop. + } }, StateProj::SendPayload(mut stream) => { - loop { - if this.write_buf.len() < super::payload::MAX_BUFFER_SIZE { - match stream.as_mut().poll_next(cx) { - Poll::Ready(Some(Ok(item))) => { - this.codec.encode( - Message::Chunk(Some(item)), - &mut this.write_buf, - )?; - continue; - } - Poll::Ready(None) => { - this.codec.encode( - Message::Chunk(None), - &mut this.write_buf, - )?; - this = self.as_mut().project(); - this.state.set(State::None); - } - Poll::Ready(Some(Err(_))) => { - return Err(DispatchError::Unknown) - } - Poll::Pending => return Ok(PollResponse::DoNothing), + // keep populate writer buffer until buffer size limit hit, + // get blocked or finished. + while this.write_buf.len() < super::payload::MAX_BUFFER_SIZE { + match stream.as_mut().poll_next(cx) { + Poll::Ready(Some(Ok(item))) => { + this.codec.encode( + Message::Chunk(Some(item)), + &mut this.write_buf, + )?; } - } else { - return Ok(PollResponse::DrainWriteBuf); + Poll::Ready(None) => { + this.codec + .encode(Message::Chunk(None), &mut this.write_buf)?; + // payload stream finished. + // set state to None and handle next message + this.state.set(State::None); + continue 'res; + } + Poll::Ready(Some(Err(e))) => { + return Err(DispatchError::Service(e)) + } + Poll::Pending => return Ok(PollResponse::DoNothing), } - break; } - continue; + // buffer is beyond max size. + // return and try to write the whole buffer to io stream. + return Ok(PollResponse::DrainWriteBuf); } - }; - - // state is changed and continue when the state is not Empty - if state_change { - if !self.state.is_empty() { - continue; - } - } else { - // if read-backpressure is enabled and we consumed some data. - // we may read more data and retry - if self.state.is_call() { - if self.as_mut().poll_request(cx)? { - continue; + StateProj::ExpectCall(fut) => match fut.poll(cx) { + // expect resolved. write continue to buffer and set InnerDispatcher state + // to service call. + Poll::Ready(Ok(req)) => { + this.write_buf + .extend_from_slice(b"HTTP/1.1 100 Continue\r\n\r\n"); + let fut = this.flow.service.call(req); + this.state.set(State::ServiceCall(fut)); } - } else if !self.messages.is_empty() { - continue; - } + // send expect error as response + Poll::Ready(Err(e)) => { + let res: Response = e.into().into(); + let (res, body) = res.replace_body(()); + self.as_mut().send_response(res, body.into_body())?; + } + // expect must be solved before progress can be made. + Poll::Pending => return Ok(PollResponse::DoNothing), + }, } - break; } - - Ok(PollResponse::DoNothing) } fn handle_request(