From e6c90a1729fc4339b26afff0586e3582cd8f6eba Mon Sep 17 00:00:00 2001 From: Riley Date: Sun, 10 Oct 2021 07:20:24 -0500 Subject: [PATCH] Hold root span across polls in streamed body (#40) * Hold root span across polls in streamed body * Satisfy clippy * Remove Unpin bound * Add documentation about Compat middlware * Don't use fully qualified MessageBody * Satisfy clippy --- Cargo.toml | 1 + src/middleware.rs | 111 ++++++++++++++++++++++++++++++++++++++++------ 2 files changed, 98 insertions(+), 14 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index df4e10de3..9016503e8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ emit_event_on_error = [] [dependencies] actix-web = { version = "=4.0.0-beta.9", default-features = false } +pin-project = "1.0.0" tracing = "0.1.19" tracing-futures = "0.2.4" uuid = { version = "0.8.1", features = ["v4"] } diff --git a/src/middleware.rs b/src/middleware.rs index 7477320af..37a8fb5c2 100644 --- a/src/middleware.rs +++ b/src/middleware.rs @@ -1,10 +1,12 @@ use crate::{DefaultRootSpanBuilder, RequestId, RootSpan, RootSpanBuilder}; +use actix_web::body::{BodySize, MessageBody}; use actix_web::dev::{Service, ServiceRequest, ServiceResponse, Transform}; +use actix_web::web::Bytes; use actix_web::{Error, HttpMessage, ResponseError}; use std::future::{ready, Future, Ready}; use std::pin::Pin; +use std::task::{Context, Poll}; use tracing::Span; -use tracing_futures::Instrument; /// `TracingLogger` is a middleware to capture structured diagnostic when processing an HTTP request. /// Check the crate-level documentation for an in-depth introduction. @@ -17,7 +19,6 @@ use tracing_futures::Instrument; /// In this example we add a [`tracing::Subscriber`] to output structured logs to the console. /// /// ```rust -/// use actix_web::middleware::Logger; /// use actix_web::App; /// use tracing::{Subscriber, subscriber::set_global_default}; /// use tracing_actix_web::TracingLogger; @@ -58,8 +59,24 @@ use tracing_futures::Instrument; /// } /// ``` /// +/// Like [`actix-web`]'s [`Logger`], in order to use `TracingLogger` inside a Scope, Resource, or +/// Condition, the [`Compat`] middleware must be used. +/// +/// ```rust +/// use actix_web::middleware::Compat; +/// use actix_web::{web, App}; +/// use tracing_actix_web::TracingLogger; +/// +/// let app = App::new() +/// .service( +/// web::scope("/some/route") +/// .wrap(Compat::new(TracingLogger::default())), +/// ); +/// ``` +/// /// [`actix-web`]: https://docs.rs/actix-web -/// [`Logger`]: https://docs.rs/actix-web/3.0.2/actix_web/middleware/struct.Logger.html +/// [`Logger`]: https://docs.rs/actix-web/4.0.0-beta.9/actix_web/middleware/struct.Logger.html +/// [`Compat`]: https://docs.rs/actix-web/4.0.0-beta.9/actix_web/middleware/struct.Compat.html /// [`tracing`]: https://docs.rs/tracing pub struct TracingLogger { root_span_builder: std::marker::PhantomData, @@ -89,10 +106,10 @@ impl Transform for TracingLogger where S: Service, Error = Error>, S::Future: 'static, - B: 'static, + B: MessageBody + 'static, RootSpan: RootSpanBuilder, { - type Response = ServiceResponse; + type Response = ServiceResponse>; type Error = Error; type Transform = TracingLoggerMiddleware; type InitError = (); @@ -117,12 +134,12 @@ impl Service for TracingLoggerMiddleware, Error = Error>, S::Future: 'static, - B: 'static, + B: MessageBody + 'static, RootSpanType: RootSpanBuilder, { - type Response = ServiceResponse; + type Response = ServiceResponse>; type Error = Error; - type Future = Pin>>>; + type Future = TracingResponse; actix_web::dev::forward_ready!(service); @@ -134,9 +151,49 @@ where req.extensions_mut().insert(root_span_wrapper); let fut = root_span.in_scope(|| self.service.call(req)); - Box::pin( - async move { - let outcome = fut.await; + + TracingResponse { + fut, + span: root_span, + _root_span_type: std::marker::PhantomData, + } + } +} + +#[doc(hidden)] +#[pin_project::pin_project] +pub struct TracingResponse { + #[pin] + fut: F, + span: Span, + _root_span_type: std::marker::PhantomData, +} + +#[doc(hidden)] +#[pin_project::pin_project] +pub struct StreamSpan { + #[pin] + body: B, + span: Span, +} + +impl Future for TracingResponse +where + F: Future, Error>>, + B: MessageBody + 'static, + RootSpanType: RootSpanBuilder, +{ + type Output = Result>, Error>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + + let fut = this.fut; + let span = this.span; + + span.in_scope(|| match fut.poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(outcome) => { RootSpanType::on_request_end(Span::current(), &outcome); #[cfg(feature = "emit_event_on_error")] @@ -144,10 +201,36 @@ where emit_event_on_error(&outcome); } - outcome + Poll::Ready(outcome.map(|service_response| { + service_response.map_body(|_, body| StreamSpan { + body, + span: span.clone(), + }) + })) } - .instrument(root_span), - ) + }) + } +} + +impl MessageBody for StreamSpan +where + B: MessageBody, +{ + type Error = B::Error; + + fn size(&self) -> BodySize { + self.body.size() + } + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + let this = self.project(); + + let body = this.body; + let span = this.span; + span.in_scope(|| body.poll_next(cx)) } }