1
0
mirror of https://github.com/actix/examples synced 2025-02-02 09:39:03 +01:00

Use a Stream wrapper instead for async bodies

This commit is contained in:
Sven-Hendrik Haase 2019-09-18 06:16:47 +02:00
parent 56859e14d2
commit b2f4f90454

View File

@ -1,10 +1,10 @@
use actix_service::{Service, Transform}; use actix_service::{Service, Transform};
use actix_web::body::{Body, MessageBody, ResponseBody}; use actix_web::body::{BodySize, MessageBody, ResponseBody};
use actix_web::error::PayloadError; use std::marker::PhantomData;
use actix_web::{dev::ServiceRequest, dev::ServiceResponse, Error}; use actix_web::{dev::ServiceRequest, dev::ServiceResponse, Error};
use bytes::BytesMut; use bytes::{Bytes, BytesMut};
use futures::Async;
use futures::future::{ok, FutureResult}; use futures::future::{ok, FutureResult};
use futures::stream::Stream;
use futures::{Future, Poll}; use futures::{Future, Poll};
pub struct Logging; pub struct Logging;
@ -12,11 +12,10 @@ pub struct Logging;
impl<S: 'static, B> Transform<S> for Logging impl<S: 'static, B> Transform<S> for Logging
where where
S: Service<Request = ServiceRequest, Response = ServiceResponse<B>, Error = Error>, S: Service<Request = ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
S::Future: 'static,
B: MessageBody + 'static, B: MessageBody + 'static,
{ {
type Request = ServiceRequest; type Request = ServiceRequest;
type Response = ServiceResponse<B>; type Response = ServiceResponse<BodyLogger<B>>;
type Error = Error; type Error = Error;
type InitError = (); type InitError = ();
type Transform = LoggingMiddleware<S>; type Transform = LoggingMiddleware<S>;
@ -35,36 +34,78 @@ pub struct LoggingMiddleware<S> {
impl<S, B> Service for LoggingMiddleware<S> impl<S, B> Service for LoggingMiddleware<S>
where where
S: Service<Request = ServiceRequest, Response = ServiceResponse<B>, Error = Error> S: Service<Request = ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
+ 'static, B: MessageBody,
S::Future: 'static,
B: MessageBody + 'static,
{ {
type Request = ServiceRequest; type Request = ServiceRequest;
type Response = ServiceResponse<B>; type Response = ServiceResponse<BodyLogger<B>>;
type Error = Error; type Error = Error;
type Future = Box<dyn Future<Item = Self::Response, Error = Self::Error>>; type Future = WrapperStream<S, B>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> { fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.service.poll_ready() self.service.poll_ready()
} }
fn call(&mut self, req: ServiceRequest) -> Self::Future { fn call(&mut self, req: ServiceRequest) -> Self::Future {
Box::new(self.service.call(req).and_then(|res| { WrapperStream {
Ok(res.map_body(|_, body| { fut: self.service.call(req),
ResponseBody::Other(Body::Bytes( _t: PhantomData,
body.fold(BytesMut::new(), move |mut body, chunk| { }
body.extend_from_slice(&chunk); }
Ok::<_, PayloadError>(body) }
})
.and_then(move |bytes| { pub struct WrapperStream<S, B>
println!("response body: {:?}", &bytes); where
ok(bytes.freeze()) B: MessageBody,
}) S: Service,
.wait() {
.unwrap(), fut: S::Future,
)) _t: PhantomData<(B,)>,
})) }
}))
impl<S, B> Future for WrapperStream<S, B>
where
B: MessageBody,
S: Service<Request = ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
{
type Item = ServiceResponse<BodyLogger<B>>;
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let res = futures::try_ready!(self.fut.poll());
Ok(Async::Ready(res.map_body(move |_, body| {
ResponseBody::Body(BodyLogger {
body,
body_accum: BytesMut::new(),
})
})))
}
}
pub struct BodyLogger<B> {
body: ResponseBody<B>,
body_accum: BytesMut,
}
impl<B> Drop for BodyLogger<B> {
fn drop(&mut self) {
println!("response body: {:?}", self.body_accum);
}
}
impl<B: MessageBody> MessageBody for BodyLogger<B> {
fn size(&self) -> BodySize {
self.body.size()
}
fn poll_next(&mut self) -> Poll<Option<Bytes>, Error> {
match self.body.poll_next()? {
Async::Ready(Some(chunk)) => {
self.body_accum.extend_from_slice(&chunk);
Ok(Async::Ready(Some(chunk)))
}
val => Ok(val),
}
} }
} }