1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-28 09:42:40 +01:00

make Reply generic over error too

This commit is contained in:
Nikolay Kim 2018-05-02 16:33:29 -07:00
parent 32a2866449
commit 7036656ae4
6 changed files with 112 additions and 138 deletions

View File

@ -1,5 +1,4 @@
use std::marker::PhantomData;
use std::mem;
use std::ops::Deref;
use futures::future::{err, ok, Future};
@ -189,77 +188,74 @@ where
/// * Message(T) - ready item
/// * Error(Error) - error happen during reply process
/// * Future<T, Error> - reply process completes in the future
pub struct Reply<T>(ReplyItem<T>);
pub struct Reply<I, E = Error>(Option<ReplyResult<I, E>>);
impl<T> Future for Reply<T> {
type Item = T;
type Error = Error;
impl<I, E> Future for Reply<I, E> {
type Item = I;
type Error = E;
fn poll(&mut self) -> Poll<T, Error> {
let item = mem::replace(&mut self.0, ReplyItem::None);
match item {
ReplyItem::Error(err) => Err(err),
ReplyItem::Message(msg) => Ok(Async::Ready(msg)),
ReplyItem::Future(mut fut) => match fut.poll() {
fn poll(&mut self) -> Poll<I, E> {
let res = self.0.take().expect("use after resolve");
match res {
ReplyResult::Ok(msg) => Ok(Async::Ready(msg)),
ReplyResult::Err(err) => Err(err),
ReplyResult::Future(mut fut) => match fut.poll() {
Ok(Async::NotReady) => {
self.0 = ReplyItem::Future(fut);
self.0 = Some(ReplyResult::Future(fut));
Ok(Async::NotReady)
}
Ok(Async::Ready(msg)) => Ok(Async::Ready(msg)),
Err(err) => Err(err),
},
ReplyItem::None => panic!("use after resolve"),
}
}
}
pub(crate) enum ReplyItem<T> {
None,
Error(Error),
Message(T),
Future(Box<Future<Item = T, Error = Error>>),
pub(crate) enum ReplyResult<I, E> {
Ok(I),
Err(E),
Future(Box<Future<Item = I, Error = E>>),
}
impl<T> Reply<T> {
impl<I, E> Reply<I, E> {
/// Create async response
#[inline]
pub fn async<F>(fut: F) -> Reply<T>
pub fn async<F>(fut: F) -> Reply<I, E>
where
F: Future<Item = T, Error = Error> + 'static,
F: Future<Item = I, Error = E> + 'static,
{
Reply(ReplyItem::Future(Box::new(fut)))
Reply(Some(ReplyResult::Future(Box::new(fut))))
}
/// Send response
#[inline]
pub fn response<R: Into<T>>(response: R) -> Reply<T> {
Reply(ReplyItem::Message(response.into()))
pub fn response<R: Into<I>>(response: R) -> Reply<I, E> {
Reply(Some(ReplyResult::Ok(response.into())))
}
/// Send error
#[inline]
pub fn error<R: Into<Error>>(err: R) -> Reply<T> {
Reply(ReplyItem::Error(err.into()))
pub fn error<R: Into<E>>(err: R) -> Reply<I, E> {
Reply(Some(ReplyResult::Err(err.into())))
}
#[inline]
pub(crate) fn into(self) -> ReplyItem<T> {
self.0
pub(crate) fn into(self) -> ReplyResult<I, E> {
self.0.expect("use after resolve")
}
#[cfg(test)]
pub(crate) fn as_msg(&self) -> &T {
match self.0 {
ReplyItem::Message(ref resp) => resp,
ReplyResult::Ok(ref resp) => resp,
_ => panic!(),
}
}
#[cfg(test)]
pub(crate) fn as_err(&self) -> Option<&Error> {
pub(crate) fn as_err(&self) -> Option<&E> {
match self.0 {
ReplyItem::Error(ref err) => Some(err),
ReplyResult::Err(ref err) => Some(err),
_ => None,
}
}
@ -280,14 +276,14 @@ impl Responder for HttpResponse {
#[inline]
fn respond_to(self, _: HttpRequest) -> Result<Reply<HttpResponse>, Error> {
Ok(Reply(ReplyItem::Message(self)))
Ok(Reply(Some(ReplyResult::Ok(self))))
}
}
impl<T> From<T> for Reply<T> {
#[inline]
fn from(resp: T) -> Reply<T> {
Reply(ReplyItem::Message(resp))
Reply(Some(ReplyResult::Ok(resp)))
}
}
@ -311,7 +307,7 @@ impl<T, E: Into<Error>> From<Result<Reply<T>, E>> for Reply<T> {
fn from(res: Result<Reply<T>, E>) -> Self {
match res {
Ok(val) => val,
Err(err) => Reply(ReplyItem::Error(err.into())),
Err(err) => Reply(Some(ReplyResult::Err(err.into()))),
}
}
}
@ -320,8 +316,8 @@ impl<T, E: Into<Error>> From<Result<T, E>> for Reply<T> {
#[inline]
fn from(res: Result<T, E>) -> Self {
match res {
Ok(val) => Reply(ReplyItem::Message(val)),
Err(err) => Reply(ReplyItem::Error(err.into())),
Ok(val) => Reply(Some(ReplyResult::Ok(val))),
Err(err) => Reply(Some(ReplyResult::Err(err.into()))),
}
}
}
@ -332,8 +328,8 @@ impl<T, E: Into<Error>> From<Result<Box<Future<Item = T, Error = Error>>, E>>
#[inline]
fn from(res: Result<Box<Future<Item = T, Error = Error>>, E>) -> Self {
match res {
Ok(fut) => Reply(ReplyItem::Future(fut)),
Err(err) => Reply(ReplyItem::Error(err.into())),
Ok(fut) => Reply(Some(ReplyResult::Future(fut))),
Err(err) => Reply(Some(ReplyResult::Err(err.into()))),
}
}
}
@ -341,7 +337,7 @@ impl<T, E: Into<Error>> From<Result<Box<Future<Item = T, Error = Error>>, E>>
impl<T> From<Box<Future<Item = T, Error = Error>>> for Reply<T> {
#[inline]
fn from(fut: Box<Future<Item = T, Error = Error>>) -> Reply<T> {
Reply(ReplyItem::Future(fut))
Reply(Some(ReplyResult::Future(fut)))
}
}
@ -360,8 +356,8 @@ where
fn respond_to(self, req: HttpRequest) -> Result<Reply<HttpResponse>, Error> {
let fut = self.map_err(|e| e.into())
.then(move |r| match r.respond_to(req) {
Ok(reply) => match reply.into().0 {
ReplyItem::Message(resp) => ok(resp),
Ok(reply) => match reply.into().into() {
ReplyResult::Ok(resp) => ok(resp),
_ => panic!("Nested async replies are not supported"),
},
Err(e) => err(e),
@ -456,8 +452,8 @@ where
let req2 = req.drop_state();
let fut = (self.h)(req).map_err(|e| e.into()).then(move |r| {
match r.respond_to(req2) {
Ok(reply) => match reply.into().0 {
ReplyItem::Message(resp) => ok(resp),
Ok(reply) => match reply.into().into() {
ReplyResult::Ok(resp) => ok(resp),
_ => panic!("Nested async replies are not supported"),
},
Err(e) => err(e),

View File

@ -11,7 +11,7 @@ use application::Inner;
use body::{Body, BodyStream};
use context::{ActorHttpContext, Frame};
use error::Error;
use handler::{Reply, ReplyItem};
use handler::{Reply, ReplyResult};
use header::ContentEncoding;
use httprequest::HttpRequest;
use httpresponse::HttpResponse;
@ -324,14 +324,13 @@ impl<S: 'static, H> WaitingResponse<S, H> {
info: &mut PipelineInfo<S>, reply: Reply<HttpResponse>,
) -> PipelineState<S, H> {
match reply.into() {
ReplyItem::Error(err) => RunMiddlewares::init(info, err.into()),
ReplyItem::Message(resp) => RunMiddlewares::init(info, resp),
ReplyItem::Future(fut) => PipelineState::Handler(WaitingResponse {
ReplyResult::Err(err) => RunMiddlewares::init(info, err.into()),
ReplyResult::Ok(resp) => RunMiddlewares::init(info, resp),
ReplyResult::Future(fut) => PipelineState::Handler(WaitingResponse {
fut,
_s: PhantomData,
_h: PhantomData,
}),
ReplyItem::None => panic!("use after resolve"),
}
}

View File

@ -5,7 +5,7 @@ use std::rc::Rc;
use futures::{Async, Future, Poll};
use error::Error;
use handler::{AsyncHandler, FromRequest, Handler, Reply, ReplyItem, Responder,
use handler::{AsyncHandler, FromRequest, Handler, Reply, ReplyResult, Responder,
RouteHandler, WrapHandler};
use http::StatusCode;
use httprequest::HttpRequest;
@ -417,13 +417,12 @@ impl<S: 'static> WaitingResponse<S> {
#[inline]
fn init(info: &mut ComposeInfo<S>, reply: Reply<HttpResponse>) -> ComposeState<S> {
match reply.into() {
ReplyItem::Error(err) => RunMiddlewares::init(info, err.into()),
ReplyItem::Message(resp) => RunMiddlewares::init(info, resp),
ReplyItem::Future(fut) => ComposeState::Handler(WaitingResponse {
ReplyResult::Err(err) => RunMiddlewares::init(info, err.into()),
ReplyResult::Ok(resp) => RunMiddlewares::init(info, resp),
ReplyResult::Future(fut) => ComposeState::Handler(WaitingResponse {
fut,
_s: PhantomData,
}),
ReplyItem::None => panic!("use after resolve"),
}
}

View File

@ -5,7 +5,7 @@ use std::rc::Rc;
use futures::{Async, Future, Poll};
use error::Error;
use handler::{FromRequest, Reply, ReplyItem, Responder, RouteHandler};
use handler::{FromRequest, Reply, ReplyResult, Responder, RouteHandler};
use http::Method;
use httprequest::HttpRequest;
use httpresponse::HttpResponse;
@ -523,13 +523,12 @@ impl<S: 'static> WaitingResponse<S> {
#[inline]
fn init(info: &mut ComposeInfo<S>, reply: Reply<HttpResponse>) -> ComposeState<S> {
match reply.into() {
ReplyItem::Message(resp) => RunMiddlewares::init(info, resp),
ReplyItem::Error(err) => RunMiddlewares::init(info, err.into()),
ReplyItem::Future(fut) => ComposeState::Handler(WaitingResponse {
ReplyResult::Ok(resp) => RunMiddlewares::init(info, resp),
ReplyResult::Err(err) => RunMiddlewares::init(info, err.into()),
ReplyResult::Future(fut) => ComposeState::Handler(WaitingResponse {
fut,
_s: PhantomData,
}),
ReplyItem::None => panic!("use after resolve"),
}
}

View File

@ -21,7 +21,7 @@ use application::{App, HttpApplication};
use body::Binary;
use client::{ClientConnector, ClientRequest, ClientRequestBuilder};
use error::Error;
use handler::{Handler, ReplyItem, Responder};
use handler::{Handler, ReplyResult, Responder};
use header::{Header, IntoHeaderValue};
use httprequest::HttpRequest;
use httpresponse::HttpResponse;
@ -601,10 +601,9 @@ impl<S> TestRequest<S> {
match resp.respond_to(req.drop_state()) {
Ok(resp) => match resp.into().into() {
ReplyItem::Message(resp) => Ok(resp),
ReplyItem::Error(err) => Ok(err.into()),
ReplyItem::Future(_) => panic!("Async handler is not supported."),
ReplyItem::None => panic!("use after resolve"),
ReplyResult::Ok(resp) => Ok(resp),
ReplyResult::Err(err) => Ok(err.into()),
ReplyResult::Future(_) => panic!("Async handler is not supported."),
},
Err(err) => Err(err),
}
@ -628,7 +627,7 @@ impl<S> TestRequest<S> {
match core.run(fut) {
Ok(r) => match r.respond_to(req.drop_state()) {
Ok(reply) => match reply.into().into() {
ReplyItem::Message(resp) => Ok(resp),
ReplyResult::Ok(resp) => Ok(resp),
_ => panic!("Nested async replies are not supported"),
},
Err(e) => Err(e),

View File

@ -5,7 +5,7 @@ use std::ops::{Deref, DerefMut};
use std::rc::Rc;
use error::Error;
use handler::{FromRequest, Handler, Reply, ReplyItem, Responder};
use handler::{FromRequest, Handler, Reply, ReplyResult, Responder};
use httprequest::HttpRequest;
use httpresponse::HttpResponse;
@ -136,13 +136,12 @@ where
self.started = true;
let reply = T::from_request(&self.req, self.cfg.as_ref()).into();
match reply.into() {
ReplyItem::Error(err) => return Err(err),
ReplyItem::Message(msg) => msg,
ReplyItem::Future(fut) => {
ReplyResult::Err(err) => return Err(err),
ReplyResult::Ok(msg) => msg,
ReplyResult::Future(fut) => {
self.fut1 = Some(fut);
return self.poll();
}
ReplyItem::None => panic!("use after resolve"),
}
} else {
match self.fut1.as_mut().unwrap().poll()? {
@ -158,13 +157,12 @@ where
};
match item.into() {
ReplyItem::Error(err) => Err(err),
ReplyItem::Message(resp) => Ok(Async::Ready(resp)),
ReplyItem::Future(fut) => {
ReplyResult::Err(err) => Err(err),
ReplyResult::Ok(resp) => Ok(Async::Ready(resp)),
ReplyResult::Future(fut) => {
self.fut2 = Some(fut);
self.poll()
}
ReplyItem::None => panic!("use after resolve"),
}
}
}
@ -270,37 +268,34 @@ where
self.started = true;
let reply = T1::from_request(&self.req, self.cfg1.as_ref()).into();
let item1 = match reply.into() {
ReplyItem::Error(err) => return Err(err),
ReplyItem::Message(msg) => msg,
ReplyItem::Future(fut) => {
ReplyResult::Err(err) => return Err(err),
ReplyResult::Ok(msg) => msg,
ReplyResult::Future(fut) => {
self.fut1 = Some(fut);
return self.poll();
}
ReplyItem::None => panic!("use after resolve"),
};
let reply = T2::from_request(&self.req, self.cfg2.as_ref()).into();
let item2 = match reply.into() {
ReplyItem::Error(err) => return Err(err),
ReplyItem::Message(msg) => msg,
ReplyItem::Future(fut) => {
ReplyResult::Err(err) => return Err(err),
ReplyResult::Ok(msg) => msg,
ReplyResult::Future(fut) => {
self.item = Some(item1);
self.fut2 = Some(fut);
return self.poll();
}
ReplyItem::None => panic!("use after resolve"),
};
let hnd: &mut F = unsafe { &mut *self.hnd.get() };
match (*hnd)(item1, item2).respond_to(self.req.drop_state()) {
Ok(item) => match item.into().into() {
ReplyItem::Error(err) => return Err(err),
ReplyItem::Message(resp) => return Ok(Async::Ready(resp)),
ReplyItem::Future(fut) => {
ReplyResult::Err(err) => return Err(err),
ReplyResult::Ok(resp) => return Ok(Async::Ready(resp)),
ReplyResult::Future(fut) => {
self.fut3 = Some(fut);
return self.poll();
}
ReplyItem::None => panic!("use after resolve"),
},
Err(e) => return Err(e.into()),
}
@ -311,26 +306,24 @@ where
Async::Ready(item) => {
let reply = T2::from_request(&self.req, self.cfg2.as_ref()).into();
let item2 = match reply.into() {
ReplyItem::Error(err) => return Err(err),
ReplyItem::Message(msg) => msg,
ReplyItem::Future(fut) => {
ReplyResult::Err(err) => return Err(err),
ReplyResult::Ok(msg) => msg,
ReplyResult::Future(fut) => {
self.item = Some(item);
self.fut2 = Some(fut);
return self.poll();
}
ReplyItem::None => panic!("use after resolve"),
};
let hnd: &mut F = unsafe { &mut *self.hnd.get() };
match (*hnd)(item, item2).respond_to(self.req.drop_state()) {
Ok(item) => match item.into().into() {
ReplyItem::Error(err) => return Err(err),
ReplyItem::Message(resp) => return Ok(Async::Ready(resp)),
ReplyItem::Future(fut) => {
ReplyResult::Err(err) => return Err(err),
ReplyResult::Ok(resp) => return Ok(Async::Ready(resp)),
ReplyResult::Future(fut) => {
self.fut3 = Some(fut);
return self.poll();
}
ReplyItem::None => panic!("use after resolve"),
},
Err(e) => return Err(e.into()),
}
@ -353,10 +346,9 @@ where
};
match item.into() {
ReplyItem::Error(err) => return Err(err),
ReplyItem::Message(resp) => return Ok(Async::Ready(resp)),
ReplyItem::Future(fut) => self.fut3 = Some(fut),
ReplyItem::None => panic!("use after resolve"),
ReplyResult::Err(err) => return Err(err),
ReplyResult::Ok(resp) => return Ok(Async::Ready(resp)),
ReplyResult::Future(fut) => self.fut3 = Some(fut),
}
self.poll()
@ -481,50 +473,46 @@ where
self.started = true;
let reply = T1::from_request(&self.req, self.cfg1.as_ref()).into();
let item1 = match reply.into() {
ReplyItem::Error(err) => return Err(err),
ReplyItem::Message(msg) => msg,
ReplyItem::Future(fut) => {
ReplyResult::Err(err) => return Err(err),
ReplyResult::Ok(msg) => msg,
ReplyResult::Future(fut) => {
self.fut1 = Some(fut);
return self.poll();
}
ReplyItem::None => panic!("use after resolve"),
};
let reply = T2::from_request(&self.req, self.cfg2.as_ref()).into();
let item2 = match reply.into() {
ReplyItem::Error(err) => return Err(err),
ReplyItem::Message(msg) => msg,
ReplyItem::Future(fut) => {
ReplyResult::Err(err) => return Err(err),
ReplyResult::Ok(msg) => msg,
ReplyResult::Future(fut) => {
self.item1 = Some(item1);
self.fut2 = Some(fut);
return self.poll();
}
ReplyItem::None => panic!("use after resolve"),
};
let reply = T3::from_request(&self.req, self.cfg3.as_ref()).into();
let item3 = match reply.into() {
ReplyItem::Error(err) => return Err(err),
ReplyItem::Message(msg) => msg,
ReplyItem::Future(fut) => {
ReplyResult::Err(err) => return Err(err),
ReplyResult::Ok(msg) => msg,
ReplyResult::Future(fut) => {
self.item1 = Some(item1);
self.item2 = Some(item2);
self.fut3 = Some(fut);
return self.poll();
}
ReplyItem::None => panic!("use after resolve"),
};
let hnd: &mut F = unsafe { &mut *self.hnd.get() };
match (*hnd)(item1, item2, item3).respond_to(self.req.drop_state()) {
Ok(item) => match item.into().into() {
ReplyItem::Error(err) => return Err(err),
ReplyItem::Message(resp) => return Ok(Async::Ready(resp)),
ReplyItem::Future(fut) => {
ReplyResult::Err(err) => return Err(err),
ReplyResult::Ok(resp) => return Ok(Async::Ready(resp)),
ReplyResult::Future(fut) => {
self.fut4 = Some(fut);
return self.poll();
}
ReplyItem::None => panic!("use after resolve"),
},
Err(e) => return Err(e.into()),
}
@ -537,38 +525,35 @@ where
self.fut1.take();
let reply = T2::from_request(&self.req, self.cfg2.as_ref()).into();
let item2 = match reply.into() {
ReplyItem::Error(err) => return Err(err),
ReplyItem::Message(msg) => msg,
ReplyItem::Future(fut) => {
ReplyResult::Err(err) => return Err(err),
ReplyResult::Ok(msg) => msg,
ReplyResult::Future(fut) => {
self.fut2 = Some(fut);
return self.poll();
}
ReplyItem::None => panic!("use after resolve"),
};
let reply = T3::from_request(&self.req, self.cfg3.as_ref()).into();
let item3 = match reply.into() {
ReplyItem::Error(err) => return Err(err),
ReplyItem::Message(msg) => msg,
ReplyItem::Future(fut) => {
ReplyResult::Err(err) => return Err(err),
ReplyResult::Ok(msg) => msg,
ReplyResult::Future(fut) => {
self.item2 = Some(item2);
self.fut3 = Some(fut);
return self.poll();
}
ReplyItem::None => panic!("use after resolve"),
};
let hnd: &mut F = unsafe { &mut *self.hnd.get() };
match (*hnd)(self.item1.take().unwrap(), item2, item3)
.respond_to(self.req.drop_state())
{
Ok(item) => match item.into().into() {
ReplyItem::Error(err) => return Err(err),
ReplyItem::Message(resp) => return Ok(Async::Ready(resp)),
ReplyItem::Future(fut) => {
ReplyResult::Err(err) => return Err(err),
ReplyResult::Ok(resp) => return Ok(Async::Ready(resp)),
ReplyResult::Future(fut) => {
self.fut4 = Some(fut);
return self.poll();
}
ReplyItem::None => panic!("use after resolve"),
},
Err(e) => return Err(e.into()),
}
@ -583,27 +568,25 @@ where
self.fut2.take();
let reply = T3::from_request(&self.req, self.cfg3.as_ref()).into();
let item3 = match reply.into() {
ReplyItem::Error(err) => return Err(err),
ReplyItem::Message(msg) => msg,
ReplyItem::Future(fut) => {
ReplyResult::Err(err) => return Err(err),
ReplyResult::Ok(msg) => msg,
ReplyResult::Future(fut) => {
self.item2 = Some(item);
self.fut3 = Some(fut);
return self.poll();
}
ReplyItem::None => panic!("use after resolve"),
};
let hnd: &mut F = unsafe { &mut *self.hnd.get() };
match (*hnd)(self.item1.take().unwrap(), item, item3)
.respond_to(self.req.drop_state())
{
Ok(item) => match item.into().into() {
ReplyItem::Error(err) => return Err(err),
ReplyItem::Message(resp) => return Ok(Async::Ready(resp)),
ReplyItem::Future(fut) => {
ReplyResult::Err(err) => return Err(err),
ReplyResult::Ok(resp) => return Ok(Async::Ready(resp)),
ReplyResult::Future(fut) => {
self.fut4 = Some(fut);
return self.poll();
}
ReplyItem::None => panic!("use after resolve"),
},
Err(e) => return Err(e.into()),
}
@ -629,10 +612,9 @@ where
};
match item.into() {
ReplyItem::Error(err) => return Ok(Async::Ready(err.into())),
ReplyItem::Message(resp) => return Ok(Async::Ready(resp)),
ReplyItem::Future(fut) => self.fut4 = Some(fut),
ReplyItem::None => panic!("use after resolve"),
ReplyResult::Err(err) => return Ok(Async::Ready(err.into())),
ReplyResult::Ok(resp) => return Ok(Async::Ready(resp)),
ReplyResult::Future(fut) => self.fut4 = Some(fut),
}
self.poll()