mirror of
https://github.com/fafhrd91/actix-web
synced 2025-07-01 08:45:10 +02:00
simplify error handling
This commit is contained in:
@ -1,15 +1,15 @@
|
||||
use std::collections::VecDeque;
|
||||
use std::fmt::{Debug, Display};
|
||||
use std::time::Instant;
|
||||
|
||||
use actix_net::codec::Framed;
|
||||
use actix_net::service::Service;
|
||||
|
||||
use futures::{Async, AsyncSink, Future, Poll, Sink, Stream};
|
||||
use log::Level::Debug;
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
use tokio_timer::Delay;
|
||||
|
||||
use error::{Error, ParseError, PayloadError};
|
||||
use error::{ParseError, PayloadError};
|
||||
use payload::{Payload, PayloadSender, PayloadStatus, PayloadWriter};
|
||||
|
||||
use body::Body;
|
||||
@ -38,7 +38,7 @@ bitflags! {
|
||||
/// Dispatcher for HTTP/1.1 protocol
|
||||
pub struct Dispatcher<T, S: Service>
|
||||
where
|
||||
S::Error: Into<Error>,
|
||||
S::Error: Debug + Display,
|
||||
{
|
||||
service: S,
|
||||
flags: Flags,
|
||||
@ -61,7 +61,7 @@ enum Message {
|
||||
|
||||
enum State<S: Service> {
|
||||
None,
|
||||
Response(S::Future),
|
||||
ServiceCall(S::Future),
|
||||
SendResponse(Option<OutMessage>),
|
||||
SendResponseWithPayload(Option<(OutMessage, Body)>),
|
||||
Payload(Body),
|
||||
@ -81,7 +81,7 @@ impl<T, S> Dispatcher<T, S>
|
||||
where
|
||||
T: AsyncRead + AsyncWrite,
|
||||
S: Service<Request = Request, Response = Response>,
|
||||
S::Error: Into<Error>,
|
||||
S::Error: Debug + Display,
|
||||
{
|
||||
/// Create http/1 dispatcher.
|
||||
pub fn new(stream: T, config: ServiceConfig, service: S) -> Self {
|
||||
@ -177,7 +177,7 @@ where
|
||||
State::None => loop {
|
||||
break if let Some(msg) = self.messages.pop_front() {
|
||||
match msg {
|
||||
Message::Item(req) => Some(self.handle_request(req)),
|
||||
Message::Item(req) => Some(self.handle_request(req)?),
|
||||
Message::Error(res) => Some(State::SendResponse(Some(
|
||||
OutMessage::Response(res),
|
||||
))),
|
||||
@ -187,23 +187,23 @@ where
|
||||
};
|
||||
},
|
||||
State::Payload(ref mut _body) => unimplemented!(),
|
||||
State::Response(ref mut fut) => match fut.poll() {
|
||||
Ok(Async::Ready(mut res)) => {
|
||||
State::ServiceCall(ref mut fut) => match fut
|
||||
.poll()
|
||||
.map_err(DispatchError::Service)?
|
||||
{
|
||||
Async::Ready(mut res) => {
|
||||
self.framed.get_codec_mut().prepare_te(&mut res);
|
||||
if res.body().is_streaming() {
|
||||
unimplemented!()
|
||||
} else {
|
||||
let body = res.replace_body(Body::Empty);
|
||||
if body.is_empty() {
|
||||
Some(State::SendResponse(Some(OutMessage::Response(res))))
|
||||
} else {
|
||||
Some(State::SendResponseWithPayload(Some((
|
||||
OutMessage::Response(res),
|
||||
body,
|
||||
))))
|
||||
}
|
||||
}
|
||||
Ok(Async::NotReady) => None,
|
||||
Err(err) => {
|
||||
let err = err.into();
|
||||
if log_enabled!(Debug) {
|
||||
debug!("{:?}", err);
|
||||
}
|
||||
Some(State::SendResponse(Some(OutMessage::Response(err.into()))))
|
||||
}
|
||||
Async::NotReady => None,
|
||||
},
|
||||
State::SendResponse(ref mut item) => {
|
||||
let msg = item.take().expect("SendResponse is empty");
|
||||
@ -273,25 +273,24 @@ where
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_request(&mut self, req: Request) -> State<S> {
|
||||
fn handle_request(
|
||||
&mut self, req: Request,
|
||||
) -> Result<State<S>, DispatchError<S::Error>> {
|
||||
let mut task = self.service.call(req);
|
||||
match task.poll() {
|
||||
Ok(Async::Ready(mut res)) => {
|
||||
match task.poll().map_err(DispatchError::Service)? {
|
||||
Async::Ready(mut res) => {
|
||||
self.framed.get_codec_mut().prepare_te(&mut res);
|
||||
if res.body().is_streaming() {
|
||||
unimplemented!()
|
||||
let body = res.replace_body(Body::Empty);
|
||||
if body.is_empty() {
|
||||
Ok(State::SendResponse(Some(OutMessage::Response(res))))
|
||||
} else {
|
||||
State::SendResponse(Some(OutMessage::Response(res)))
|
||||
Ok(State::SendResponseWithPayload(Some((
|
||||
OutMessage::Response(res),
|
||||
body,
|
||||
))))
|
||||
}
|
||||
}
|
||||
Ok(Async::NotReady) => State::Response(task),
|
||||
Err(err) => {
|
||||
let err = err.into();
|
||||
if log_enabled!(Debug) {
|
||||
debug!("{:?}", err);
|
||||
}
|
||||
State::SendResponse(Some(OutMessage::Response(err.into())))
|
||||
}
|
||||
Async::NotReady => Ok(State::ServiceCall(task)),
|
||||
}
|
||||
}
|
||||
|
||||
@ -302,7 +301,7 @@ where
|
||||
InMessage::Message(msg) => {
|
||||
// handle request early
|
||||
if self.state.is_empty() {
|
||||
self.state = self.handle_request(msg);
|
||||
self.state = self.handle_request(msg)?;
|
||||
} else {
|
||||
self.messages.push_back(Message::Item(msg));
|
||||
}
|
||||
@ -445,7 +444,7 @@ impl<T, S> Future for Dispatcher<T, S>
|
||||
where
|
||||
T: AsyncRead + AsyncWrite,
|
||||
S: Service<Request = Request, Response = Response>,
|
||||
S::Error: Into<Error>,
|
||||
S::Error: Debug + Display,
|
||||
{
|
||||
type Item = ();
|
||||
type Error = DispatchError<S::Error>;
|
||||
|
@ -1,3 +1,4 @@
|
||||
use std::fmt::{Debug, Display};
|
||||
use std::marker::PhantomData;
|
||||
|
||||
use actix_net::codec::Framed;
|
||||
@ -6,7 +7,7 @@ use futures::{future, Async, Future, Poll, Stream};
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
|
||||
use config::ServiceConfig;
|
||||
use error::{DispatchError, Error, ParseError};
|
||||
use error::{DispatchError, ParseError};
|
||||
use request::Request;
|
||||
use response::Response;
|
||||
|
||||
@ -24,7 +25,7 @@ impl<T, S> H1Service<T, S>
|
||||
where
|
||||
S: NewService,
|
||||
S::Service: Clone,
|
||||
S::Error: Into<Error>,
|
||||
S::Error: Debug + Display,
|
||||
{
|
||||
/// Create new `HttpService` instance.
|
||||
pub fn new<F: IntoNewService<S>>(cfg: ServiceConfig, service: F) -> Self {
|
||||
@ -41,7 +42,7 @@ where
|
||||
T: AsyncRead + AsyncWrite,
|
||||
S: NewService<Request = Request, Response = Response> + Clone,
|
||||
S::Service: Clone,
|
||||
S::Error: Into<Error>,
|
||||
S::Error: Debug + Display,
|
||||
{
|
||||
type Request = T;
|
||||
type Response = ();
|
||||
@ -70,7 +71,7 @@ where
|
||||
T: AsyncRead + AsyncWrite,
|
||||
S: NewService<Request = Request, Response = Response>,
|
||||
S::Service: Clone,
|
||||
S::Error: Into<Error>,
|
||||
S::Error: Debug + Display,
|
||||
{
|
||||
type Item = H1ServiceHandler<T, S::Service>;
|
||||
type Error = S::InitError;
|
||||
@ -94,7 +95,7 @@ pub struct H1ServiceHandler<T, S> {
|
||||
impl<T, S> H1ServiceHandler<T, S>
|
||||
where
|
||||
S: Service<Request = Request, Response = Response> + Clone,
|
||||
S::Error: Into<Error>,
|
||||
S::Error: Debug + Display,
|
||||
{
|
||||
fn new(cfg: ServiceConfig, srv: S) -> H1ServiceHandler<T, S> {
|
||||
H1ServiceHandler {
|
||||
@ -109,7 +110,7 @@ impl<T, S> Service for H1ServiceHandler<T, S>
|
||||
where
|
||||
T: AsyncRead + AsyncWrite,
|
||||
S: Service<Request = Request, Response = Response> + Clone,
|
||||
S::Error: Into<Error>,
|
||||
S::Error: Debug + Display,
|
||||
{
|
||||
type Request = T;
|
||||
type Response = ();
|
||||
|
Reference in New Issue
Block a user