mirror of
https://github.com/actix/actix-extras.git
synced 2025-02-22 18:33:18 +01:00
Hold root span across polls in streamed body (#40)
* Hold root span across polls in streamed body * Satisfy clippy * Remove Unpin bound * Add documentation about Compat middlware * Don't use fully qualified MessageBody * Satisfy clippy
This commit is contained in:
parent
dd57aa7157
commit
e6c90a1729
@ -28,6 +28,7 @@ emit_event_on_error = []
|
||||
|
||||
[dependencies]
|
||||
actix-web = { version = "=4.0.0-beta.9", default-features = false }
|
||||
pin-project = "1.0.0"
|
||||
tracing = "0.1.19"
|
||||
tracing-futures = "0.2.4"
|
||||
uuid = { version = "0.8.1", features = ["v4"] }
|
||||
|
@ -1,10 +1,12 @@
|
||||
use crate::{DefaultRootSpanBuilder, RequestId, RootSpan, RootSpanBuilder};
|
||||
use actix_web::body::{BodySize, MessageBody};
|
||||
use actix_web::dev::{Service, ServiceRequest, ServiceResponse, Transform};
|
||||
use actix_web::web::Bytes;
|
||||
use actix_web::{Error, HttpMessage, ResponseError};
|
||||
use std::future::{ready, Future, Ready};
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
use tracing::Span;
|
||||
use tracing_futures::Instrument;
|
||||
|
||||
/// `TracingLogger` is a middleware to capture structured diagnostic when processing an HTTP request.
|
||||
/// Check the crate-level documentation for an in-depth introduction.
|
||||
@ -17,7 +19,6 @@ use tracing_futures::Instrument;
|
||||
/// In this example we add a [`tracing::Subscriber`] to output structured logs to the console.
|
||||
///
|
||||
/// ```rust
|
||||
/// use actix_web::middleware::Logger;
|
||||
/// use actix_web::App;
|
||||
/// use tracing::{Subscriber, subscriber::set_global_default};
|
||||
/// use tracing_actix_web::TracingLogger;
|
||||
@ -58,8 +59,24 @@ use tracing_futures::Instrument;
|
||||
/// }
|
||||
/// ```
|
||||
///
|
||||
/// Like [`actix-web`]'s [`Logger`], in order to use `TracingLogger` inside a Scope, Resource, or
|
||||
/// Condition, the [`Compat`] middleware must be used.
|
||||
///
|
||||
/// ```rust
|
||||
/// use actix_web::middleware::Compat;
|
||||
/// use actix_web::{web, App};
|
||||
/// use tracing_actix_web::TracingLogger;
|
||||
///
|
||||
/// let app = App::new()
|
||||
/// .service(
|
||||
/// web::scope("/some/route")
|
||||
/// .wrap(Compat::new(TracingLogger::default())),
|
||||
/// );
|
||||
/// ```
|
||||
///
|
||||
/// [`actix-web`]: https://docs.rs/actix-web
|
||||
/// [`Logger`]: https://docs.rs/actix-web/3.0.2/actix_web/middleware/struct.Logger.html
|
||||
/// [`Logger`]: https://docs.rs/actix-web/4.0.0-beta.9/actix_web/middleware/struct.Logger.html
|
||||
/// [`Compat`]: https://docs.rs/actix-web/4.0.0-beta.9/actix_web/middleware/struct.Compat.html
|
||||
/// [`tracing`]: https://docs.rs/tracing
|
||||
pub struct TracingLogger<RootSpan: RootSpanBuilder> {
|
||||
root_span_builder: std::marker::PhantomData<RootSpan>,
|
||||
@ -89,10 +106,10 @@ impl<S, B, RootSpan> Transform<S, ServiceRequest> for TracingLogger<RootSpan>
|
||||
where
|
||||
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
|
||||
S::Future: 'static,
|
||||
B: 'static,
|
||||
B: MessageBody + 'static,
|
||||
RootSpan: RootSpanBuilder,
|
||||
{
|
||||
type Response = ServiceResponse<B>;
|
||||
type Response = ServiceResponse<StreamSpan<B>>;
|
||||
type Error = Error;
|
||||
type Transform = TracingLoggerMiddleware<S, RootSpan>;
|
||||
type InitError = ();
|
||||
@ -117,12 +134,12 @@ impl<S, B, RootSpanType> Service<ServiceRequest> for TracingLoggerMiddleware<S,
|
||||
where
|
||||
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
|
||||
S::Future: 'static,
|
||||
B: 'static,
|
||||
B: MessageBody + 'static,
|
||||
RootSpanType: RootSpanBuilder,
|
||||
{
|
||||
type Response = ServiceResponse<B>;
|
||||
type Response = ServiceResponse<StreamSpan<B>>;
|
||||
type Error = Error;
|
||||
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
|
||||
type Future = TracingResponse<S::Future, RootSpanType>;
|
||||
|
||||
actix_web::dev::forward_ready!(service);
|
||||
|
||||
@ -134,9 +151,49 @@ where
|
||||
req.extensions_mut().insert(root_span_wrapper);
|
||||
|
||||
let fut = root_span.in_scope(|| self.service.call(req));
|
||||
Box::pin(
|
||||
async move {
|
||||
let outcome = fut.await;
|
||||
|
||||
TracingResponse {
|
||||
fut,
|
||||
span: root_span,
|
||||
_root_span_type: std::marker::PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
#[pin_project::pin_project]
|
||||
pub struct TracingResponse<F, RootSpanType> {
|
||||
#[pin]
|
||||
fut: F,
|
||||
span: Span,
|
||||
_root_span_type: std::marker::PhantomData<RootSpanType>,
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
#[pin_project::pin_project]
|
||||
pub struct StreamSpan<B> {
|
||||
#[pin]
|
||||
body: B,
|
||||
span: Span,
|
||||
}
|
||||
|
||||
impl<F, B, RootSpanType> Future for TracingResponse<F, RootSpanType>
|
||||
where
|
||||
F: Future<Output = Result<ServiceResponse<B>, Error>>,
|
||||
B: MessageBody + 'static,
|
||||
RootSpanType: RootSpanBuilder,
|
||||
{
|
||||
type Output = Result<ServiceResponse<StreamSpan<B>>, Error>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.project();
|
||||
|
||||
let fut = this.fut;
|
||||
let span = this.span;
|
||||
|
||||
span.in_scope(|| match fut.poll(cx) {
|
||||
Poll::Pending => Poll::Pending,
|
||||
Poll::Ready(outcome) => {
|
||||
RootSpanType::on_request_end(Span::current(), &outcome);
|
||||
|
||||
#[cfg(feature = "emit_event_on_error")]
|
||||
@ -144,10 +201,36 @@ where
|
||||
emit_event_on_error(&outcome);
|
||||
}
|
||||
|
||||
outcome
|
||||
Poll::Ready(outcome.map(|service_response| {
|
||||
service_response.map_body(|_, body| StreamSpan {
|
||||
body,
|
||||
span: span.clone(),
|
||||
})
|
||||
}))
|
||||
}
|
||||
.instrument(root_span),
|
||||
)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<B> MessageBody for StreamSpan<B>
|
||||
where
|
||||
B: MessageBody,
|
||||
{
|
||||
type Error = B::Error;
|
||||
|
||||
fn size(&self) -> BodySize {
|
||||
self.body.size()
|
||||
}
|
||||
|
||||
fn poll_next(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Option<Result<Bytes, Self::Error>>> {
|
||||
let this = self.project();
|
||||
|
||||
let body = this.body;
|
||||
let span = this.span;
|
||||
span.in_scope(|| body.poll_next(cx))
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user