From 56859e14d2ca44025a07ce3ae799a3325d5ba04e Mon Sep 17 00:00:00 2001 From: Sven-Hendrik Haase Date: Wed, 18 Sep 2019 04:37:35 +0200 Subject: [PATCH 1/2] Add middleware to log the response body --- middleware/Cargo.toml | 2 +- middleware/README.md | 6 +- middleware/src/main.rs | 7 +- .../{read_body.rs => read_request_body.rs} | 0 middleware/src/read_response_body.rs | 70 +++++++++++++++++++ 5 files changed, 81 insertions(+), 4 deletions(-) rename middleware/src/{read_body.rs => read_request_body.rs} (100%) create mode 100644 middleware/src/read_response_body.rs diff --git a/middleware/Cargo.toml b/middleware/Cargo.toml index da90471..84e6643 100644 --- a/middleware/Cargo.toml +++ b/middleware/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "middleware-example" version = "0.1.0" -authors = ["Gorm Casper "] +authors = ["Gorm Casper ", "Sven-Hendrik Haase "] edition = "2018" workspace = ".." diff --git a/middleware/README.md b/middleware/README.md index 401b9fd..68a13c8 100644 --- a/middleware/README.md +++ b/middleware/README.md @@ -19,10 +19,14 @@ they function. A middleware implementing a request guard which sketches a rough approximation of what a login could look like. -### read_body::Logging +### read_request_body::Logging A middleware demonstrating how to read out the incoming request body. +### read_response_body::Logging + +A middleware demonstrating how to read out the outgoing response body. + ### simple::SayHi A minimal middleware demonstrating the sequence of operations in an actix middleware. diff --git a/middleware/src/main.rs b/middleware/src/main.rs index 49dfb98..d4ff9e3 100644 --- a/middleware/src/main.rs +++ b/middleware/src/main.rs @@ -5,7 +5,9 @@ use futures::future::Future; #[allow(dead_code)] mod redirect; #[allow(dead_code)] -mod read_body; +mod read_request_body; +#[allow(dead_code)] +mod read_response_body; #[allow(dead_code)] mod simple; @@ -16,7 +18,8 @@ fn main() -> std::io::Result<()> { HttpServer::new(|| { App::new() .wrap(redirect::CheckLogin) - .wrap(read_body::Logging) + .wrap(read_request_body::Logging) + .wrap(read_response_body::Logging) .wrap(simple::SayHi) .wrap_fn(|req, srv| { println!("Hi from start. You requested: {}", req.path()); diff --git a/middleware/src/read_body.rs b/middleware/src/read_request_body.rs similarity index 100% rename from middleware/src/read_body.rs rename to middleware/src/read_request_body.rs diff --git a/middleware/src/read_response_body.rs b/middleware/src/read_response_body.rs new file mode 100644 index 0000000..f20be35 --- /dev/null +++ b/middleware/src/read_response_body.rs @@ -0,0 +1,70 @@ +use actix_service::{Service, Transform}; +use actix_web::body::{Body, MessageBody, ResponseBody}; +use actix_web::error::PayloadError; +use actix_web::{dev::ServiceRequest, dev::ServiceResponse, Error}; +use bytes::BytesMut; +use futures::future::{ok, FutureResult}; +use futures::stream::Stream; +use futures::{Future, Poll}; + +pub struct Logging; + +impl Transform for Logging +where + S: Service, Error = Error>, + S::Future: 'static, + B: MessageBody + 'static, +{ + type Request = ServiceRequest; + type Response = ServiceResponse; + type Error = Error; + type InitError = (); + type Transform = LoggingMiddleware; + type Future = FutureResult; + + fn new_transform(&self, service: S) -> Self::Future { + ok(LoggingMiddleware { + service, + }) + } +} + +pub struct LoggingMiddleware { + service: S, +} + +impl Service for LoggingMiddleware +where + S: Service, Error = Error> + + 'static, + S::Future: 'static, + B: MessageBody + 'static, +{ + type Request = ServiceRequest; + type Response = ServiceResponse; + type Error = Error; + type Future = Box>; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + self.service.poll_ready() + } + + fn call(&mut self, req: ServiceRequest) -> Self::Future { + Box::new(self.service.call(req).and_then(|res| { + Ok(res.map_body(|_, body| { + ResponseBody::Other(Body::Bytes( + body.fold(BytesMut::new(), move |mut body, chunk| { + body.extend_from_slice(&chunk); + Ok::<_, PayloadError>(body) + }) + .and_then(move |bytes| { + println!("response body: {:?}", &bytes); + ok(bytes.freeze()) + }) + .wait() + .unwrap(), + )) + })) + })) + } +} From b2f4f90454c8f1befdebaa1792f7cdbbaa4722d4 Mon Sep 17 00:00:00 2001 From: Sven-Hendrik Haase Date: Wed, 18 Sep 2019 06:16:47 +0200 Subject: [PATCH 2/2] Use a Stream wrapper instead for async bodies --- middleware/src/read_response_body.rs | 97 ++++++++++++++++++++-------- 1 file changed, 69 insertions(+), 28 deletions(-) diff --git a/middleware/src/read_response_body.rs b/middleware/src/read_response_body.rs index f20be35..26497be 100644 --- a/middleware/src/read_response_body.rs +++ b/middleware/src/read_response_body.rs @@ -1,10 +1,10 @@ use actix_service::{Service, Transform}; -use actix_web::body::{Body, MessageBody, ResponseBody}; -use actix_web::error::PayloadError; +use actix_web::body::{BodySize, MessageBody, ResponseBody}; +use std::marker::PhantomData; use actix_web::{dev::ServiceRequest, dev::ServiceResponse, Error}; -use bytes::BytesMut; +use bytes::{Bytes, BytesMut}; +use futures::Async; use futures::future::{ok, FutureResult}; -use futures::stream::Stream; use futures::{Future, Poll}; pub struct Logging; @@ -12,11 +12,10 @@ pub struct Logging; impl Transform for Logging where S: Service, Error = Error>, - S::Future: 'static, B: MessageBody + 'static, { type Request = ServiceRequest; - type Response = ServiceResponse; + type Response = ServiceResponse>; type Error = Error; type InitError = (); type Transform = LoggingMiddleware; @@ -35,36 +34,78 @@ pub struct LoggingMiddleware { impl Service for LoggingMiddleware where - S: Service, Error = Error> - + 'static, - S::Future: 'static, - B: MessageBody + 'static, + S: Service, Error = Error>, + B: MessageBody, { type Request = ServiceRequest; - type Response = ServiceResponse; + type Response = ServiceResponse>; type Error = Error; - type Future = Box>; + type Future = WrapperStream; fn poll_ready(&mut self) -> Poll<(), Self::Error> { self.service.poll_ready() } fn call(&mut self, req: ServiceRequest) -> Self::Future { - Box::new(self.service.call(req).and_then(|res| { - Ok(res.map_body(|_, body| { - ResponseBody::Other(Body::Bytes( - body.fold(BytesMut::new(), move |mut body, chunk| { - body.extend_from_slice(&chunk); - Ok::<_, PayloadError>(body) - }) - .and_then(move |bytes| { - println!("response body: {:?}", &bytes); - ok(bytes.freeze()) - }) - .wait() - .unwrap(), - )) - })) - })) + WrapperStream { + fut: self.service.call(req), + _t: PhantomData, + } + } +} + +pub struct WrapperStream +where + B: MessageBody, + S: Service, +{ + fut: S::Future, + _t: PhantomData<(B,)>, +} + +impl Future for WrapperStream +where + B: MessageBody, + S: Service, Error = Error>, +{ + type Item = ServiceResponse>; + type Error = Error; + + fn poll(&mut self) -> Poll { + let res = futures::try_ready!(self.fut.poll()); + + Ok(Async::Ready(res.map_body(move |_, body| { + ResponseBody::Body(BodyLogger { + body, + body_accum: BytesMut::new(), + }) + }))) + } +} + +pub struct BodyLogger { + body: ResponseBody, + body_accum: BytesMut, +} + +impl Drop for BodyLogger { + fn drop(&mut self) { + println!("response body: {:?}", self.body_accum); + } +} + +impl MessageBody for BodyLogger { + fn size(&self) -> BodySize { + self.body.size() + } + + fn poll_next(&mut self) -> Poll, Error> { + match self.body.poll_next()? { + Async::Ready(Some(chunk)) => { + self.body_accum.extend_from_slice(&chunk); + Ok(Async::Ready(Some(chunk))) + } + val => Ok(val), + } } }