diff --git a/middleware/src/read_response_body.rs b/middleware/src/read_response_body.rs index f20be35d..26497bea 100644 --- a/middleware/src/read_response_body.rs +++ b/middleware/src/read_response_body.rs @@ -1,10 +1,10 @@ use actix_service::{Service, Transform}; -use actix_web::body::{Body, MessageBody, ResponseBody}; -use actix_web::error::PayloadError; +use actix_web::body::{BodySize, MessageBody, ResponseBody}; +use std::marker::PhantomData; use actix_web::{dev::ServiceRequest, dev::ServiceResponse, Error}; -use bytes::BytesMut; +use bytes::{Bytes, BytesMut}; +use futures::Async; use futures::future::{ok, FutureResult}; -use futures::stream::Stream; use futures::{Future, Poll}; pub struct Logging; @@ -12,11 +12,10 @@ pub struct Logging; impl Transform for Logging where S: Service, Error = Error>, - S::Future: 'static, B: MessageBody + 'static, { type Request = ServiceRequest; - type Response = ServiceResponse; + type Response = ServiceResponse>; type Error = Error; type InitError = (); type Transform = LoggingMiddleware; @@ -35,36 +34,78 @@ pub struct LoggingMiddleware { impl Service for LoggingMiddleware where - S: Service, Error = Error> - + 'static, - S::Future: 'static, - B: MessageBody + 'static, + S: Service, Error = Error>, + B: MessageBody, { type Request = ServiceRequest; - type Response = ServiceResponse; + type Response = ServiceResponse>; type Error = Error; - type Future = Box>; + type Future = WrapperStream; fn poll_ready(&mut self) -> Poll<(), Self::Error> { self.service.poll_ready() } fn call(&mut self, req: ServiceRequest) -> Self::Future { - Box::new(self.service.call(req).and_then(|res| { - Ok(res.map_body(|_, body| { - ResponseBody::Other(Body::Bytes( - body.fold(BytesMut::new(), move |mut body, chunk| { - body.extend_from_slice(&chunk); - Ok::<_, PayloadError>(body) - }) - .and_then(move |bytes| { - println!("response body: {:?}", &bytes); - ok(bytes.freeze()) - }) - .wait() - .unwrap(), - )) - })) - })) + WrapperStream { + fut: self.service.call(req), + _t: PhantomData, + } + } +} + +pub struct WrapperStream +where + B: MessageBody, + S: Service, +{ + fut: S::Future, + _t: PhantomData<(B,)>, +} + +impl Future for WrapperStream +where + B: MessageBody, + S: Service, Error = Error>, +{ + type Item = ServiceResponse>; + type Error = Error; + + fn poll(&mut self) -> Poll { + let res = futures::try_ready!(self.fut.poll()); + + Ok(Async::Ready(res.map_body(move |_, body| { + ResponseBody::Body(BodyLogger { + body, + body_accum: BytesMut::new(), + }) + }))) + } +} + +pub struct BodyLogger { + body: ResponseBody, + body_accum: BytesMut, +} + +impl Drop for BodyLogger { + fn drop(&mut self) { + println!("response body: {:?}", self.body_accum); + } +} + +impl MessageBody for BodyLogger { + fn size(&self) -> BodySize { + self.body.size() + } + + fn poll_next(&mut self) -> Poll, Error> { + match self.body.poll_next()? { + Async::Ready(Some(chunk)) => { + self.body_accum.extend_from_slice(&chunk); + Ok(Async::Ready(Some(chunk))) + } + val => Ok(val), + } } }