diff --git a/src/body.rs b/src/body.rs index c10b067a..c78ea817 100644 --- a/src/body.rs +++ b/src/body.rs @@ -69,10 +69,10 @@ impl Body { /// Is this binary body. #[inline] - pub(crate) fn binary(self) -> Binary { + pub(crate) fn into_binary(self) -> Option { match self { - Body::Binary(b) => b, - _ => panic!(), + Body::Binary(b) => Some(b), + _ => None, } } } diff --git a/src/error.rs b/src/error.rs index 277814d2..1e60c348 100644 --- a/src/error.rs +++ b/src/error.rs @@ -374,7 +374,7 @@ impl ResponseError for cookie::ParseError { #[derive(Debug)] /// A set of errors that can occur during dispatching http requests -pub enum DispatchError { +pub enum DispatchError { /// Service error // #[fail(display = "Application specific error: {}", _0)] Service(E), @@ -413,13 +413,13 @@ pub enum DispatchError { Unknown, } -impl From for DispatchError { +impl From for DispatchError { fn from(err: ParseError) -> Self { DispatchError::Parse(err) } } -impl From for DispatchError { +impl From for DispatchError { fn from(err: io::Error) -> Self { DispatchError::Io(err) } diff --git a/src/h1/dispatcher.rs b/src/h1/dispatcher.rs index 7b6d31fe..3bf17a8b 100644 --- a/src/h1/dispatcher.rs +++ b/src/h1/dispatcher.rs @@ -1,15 +1,15 @@ use std::collections::VecDeque; +use std::fmt::{Debug, Display}; use std::time::Instant; use actix_net::codec::Framed; use actix_net::service::Service; use futures::{Async, AsyncSink, Future, Poll, Sink, Stream}; -use log::Level::Debug; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_timer::Delay; -use error::{Error, ParseError, PayloadError}; +use error::{ParseError, PayloadError}; use payload::{Payload, PayloadSender, PayloadStatus, PayloadWriter}; use body::Body; @@ -38,7 +38,7 @@ bitflags! { /// Dispatcher for HTTP/1.1 protocol pub struct Dispatcher where - S::Error: Into, + S::Error: Debug + Display, { service: S, flags: Flags, @@ -61,7 +61,7 @@ enum Message { enum State { None, - Response(S::Future), + ServiceCall(S::Future), SendResponse(Option), SendResponseWithPayload(Option<(OutMessage, Body)>), Payload(Body), @@ -81,7 +81,7 @@ impl Dispatcher where T: AsyncRead + AsyncWrite, S: Service, - S::Error: Into, + S::Error: Debug + Display, { /// Create http/1 dispatcher. pub fn new(stream: T, config: ServiceConfig, service: S) -> Self { @@ -177,7 +177,7 @@ where State::None => loop { break if let Some(msg) = self.messages.pop_front() { match msg { - Message::Item(req) => Some(self.handle_request(req)), + Message::Item(req) => Some(self.handle_request(req)?), Message::Error(res) => Some(State::SendResponse(Some( OutMessage::Response(res), ))), @@ -187,23 +187,23 @@ where }; }, State::Payload(ref mut _body) => unimplemented!(), - State::Response(ref mut fut) => match fut.poll() { - Ok(Async::Ready(mut res)) => { + State::ServiceCall(ref mut fut) => match fut + .poll() + .map_err(DispatchError::Service)? + { + Async::Ready(mut res) => { self.framed.get_codec_mut().prepare_te(&mut res); - if res.body().is_streaming() { - unimplemented!() - } else { + let body = res.replace_body(Body::Empty); + if body.is_empty() { Some(State::SendResponse(Some(OutMessage::Response(res)))) + } else { + Some(State::SendResponseWithPayload(Some(( + OutMessage::Response(res), + body, + )))) } } - Ok(Async::NotReady) => None, - Err(err) => { - let err = err.into(); - if log_enabled!(Debug) { - debug!("{:?}", err); - } - Some(State::SendResponse(Some(OutMessage::Response(err.into())))) - } + Async::NotReady => None, }, State::SendResponse(ref mut item) => { let msg = item.take().expect("SendResponse is empty"); @@ -273,25 +273,24 @@ where Ok(()) } - fn handle_request(&mut self, req: Request) -> State { + fn handle_request( + &mut self, req: Request, + ) -> Result, DispatchError> { let mut task = self.service.call(req); - match task.poll() { - Ok(Async::Ready(mut res)) => { + match task.poll().map_err(DispatchError::Service)? { + Async::Ready(mut res) => { self.framed.get_codec_mut().prepare_te(&mut res); - if res.body().is_streaming() { - unimplemented!() + let body = res.replace_body(Body::Empty); + if body.is_empty() { + Ok(State::SendResponse(Some(OutMessage::Response(res)))) } else { - State::SendResponse(Some(OutMessage::Response(res))) + Ok(State::SendResponseWithPayload(Some(( + OutMessage::Response(res), + body, + )))) } } - Ok(Async::NotReady) => State::Response(task), - Err(err) => { - let err = err.into(); - if log_enabled!(Debug) { - debug!("{:?}", err); - } - State::SendResponse(Some(OutMessage::Response(err.into()))) - } + Async::NotReady => Ok(State::ServiceCall(task)), } } @@ -302,7 +301,7 @@ where InMessage::Message(msg) => { // handle request early if self.state.is_empty() { - self.state = self.handle_request(msg); + self.state = self.handle_request(msg)?; } else { self.messages.push_back(Message::Item(msg)); } @@ -445,7 +444,7 @@ impl Future for Dispatcher where T: AsyncRead + AsyncWrite, S: Service, - S::Error: Into, + S::Error: Debug + Display, { type Item = (); type Error = DispatchError; diff --git a/src/h1/service.rs b/src/h1/service.rs index 3ac073ad..aa59614d 100644 --- a/src/h1/service.rs +++ b/src/h1/service.rs @@ -1,3 +1,4 @@ +use std::fmt::{Debug, Display}; use std::marker::PhantomData; use actix_net::codec::Framed; @@ -6,7 +7,7 @@ use futures::{future, Async, Future, Poll, Stream}; use tokio_io::{AsyncRead, AsyncWrite}; use config::ServiceConfig; -use error::{DispatchError, Error, ParseError}; +use error::{DispatchError, ParseError}; use request::Request; use response::Response; @@ -24,7 +25,7 @@ impl H1Service where S: NewService, S::Service: Clone, - S::Error: Into, + S::Error: Debug + Display, { /// Create new `HttpService` instance. pub fn new>(cfg: ServiceConfig, service: F) -> Self { @@ -41,7 +42,7 @@ where T: AsyncRead + AsyncWrite, S: NewService + Clone, S::Service: Clone, - S::Error: Into, + S::Error: Debug + Display, { type Request = T; type Response = (); @@ -70,7 +71,7 @@ where T: AsyncRead + AsyncWrite, S: NewService, S::Service: Clone, - S::Error: Into, + S::Error: Debug + Display, { type Item = H1ServiceHandler; type Error = S::InitError; @@ -94,7 +95,7 @@ pub struct H1ServiceHandler { impl H1ServiceHandler where S: Service + Clone, - S::Error: Into, + S::Error: Debug + Display, { fn new(cfg: ServiceConfig, srv: S) -> H1ServiceHandler { H1ServiceHandler { @@ -109,7 +110,7 @@ impl Service for H1ServiceHandler where T: AsyncRead + AsyncWrite, S: Service + Clone, - S::Error: Into, + S::Error: Debug + Display, { type Request = T; type Response = ();