From aed5fecc8a0b994cc871483bcf15b3c5fbb350df Mon Sep 17 00:00:00 2001 From: Rajasekharan Vengalil Date: Wed, 15 Jan 2020 11:43:53 -0800 Subject: [PATCH] Add support for tokio tracing for actix Service. (#86) * Add support for tokio tracing for actix Service. * Address comments * Change trace's return type to ApplyTransform * Remove redundant type args * Remove reference to MakeSpan from docs --- Cargo.toml | 1 + actix-tracing/Cargo.toml | 27 ++++ actix-tracing/src/lib.rs | 261 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 289 insertions(+) create mode 100644 actix-tracing/Cargo.toml create mode 100644 actix-tracing/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index 1557e0f2..1e6643da 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ members = [ "actix-testing", "actix-threadpool", "actix-tls", + "actix-tracing", "actix-utils", "router", "string", diff --git a/actix-tracing/Cargo.toml b/actix-tracing/Cargo.toml new file mode 100644 index 00000000..60927c12 --- /dev/null +++ b/actix-tracing/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "actix-tracing" +version = "0.1.0" +authors = ["Rajasekharan Vengalil "] +description = "Support for tokio tracing with Actix services" +keywords = ["network", "framework", "tracing"] +homepage = "https://actix.rs" +repository = "https://github.com/actix/actix-net.git" +documentation = "https://docs.rs/actix-tracing/" +categories = ["network-programming", "asynchronous"] +license = "MIT/Apache-2.0" +edition = "2018" +workspace = ".." + +[lib] +name = "actix_tracing" +path = "src/lib.rs" + +[dependencies] +actix-service = "1.0" +futures-util = "0.3" +tracing = "0.1" +tracing-futures = "0.2" + +[dev_dependencies] +actix-rt = "1.0" +slab = "0.4" \ No newline at end of file diff --git a/actix-tracing/src/lib.rs b/actix-tracing/src/lib.rs new file mode 100644 index 00000000..35c85286 --- /dev/null +++ b/actix-tracing/src/lib.rs @@ -0,0 +1,261 @@ +//! Actix tracing - support for tokio tracing with Actix services. +#![deny(rust_2018_idioms, warnings)] + +use std::marker::PhantomData; +use std::task::{Context, Poll}; + +use actix_service::{ + apply, dev::ApplyTransform, IntoServiceFactory, Service, ServiceFactory, Transform, +}; +use futures_util::future::{ok, Either, Ready}; +use tracing_futures::{Instrument, Instrumented}; + +/// A `Service` implementation that automatically enters/exits tracing spans +/// for the wrapped inner service. +#[derive(Clone)] +pub struct TracingService { + inner: S, + make_span: F, +} + +impl TracingService { + pub fn new(inner: S, make_span: F) -> Self { + TracingService { inner, make_span } + } +} + +impl Service for TracingService +where + S: Service, + F: Fn(&S::Request) -> Option, +{ + type Request = S::Request; + type Response = S::Response; + type Error = S::Error; + type Future = Either>; + + fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(ctx) + } + + fn call(&mut self, req: Self::Request) -> Self::Future { + let span = (self.make_span)(&req); + let _enter = span.as_ref().map(|s| s.enter()); + + let fut = self.inner.call(req); + + // make a child span to track the future's execution + if let Some(span) = span + .clone() + .map(|span| tracing::span!(parent: &span, tracing::Level::INFO, "future")) + { + Either::Right(fut.instrument(span)) + } else { + Either::Left(fut) + } + } +} + +/// A `Transform` implementation that wraps services with a [`TracingService`]. +/// +/// [`TracingService`]: struct.TracingService.html +pub struct TracingTransform { + make_span: F, + _p: PhantomData, +} + +impl TracingTransform { + pub fn new(make_span: F) -> Self { + TracingTransform { + make_span, + _p: PhantomData, + } + } +} + +impl Transform for TracingTransform +where + S: Service, + U: ServiceFactory< + Request = S::Request, + Response = S::Response, + Error = S::Error, + Service = S, + >, + F: Fn(&S::Request) -> Option + Clone, +{ + type Request = S::Request; + type Response = S::Response; + type Error = S::Error; + type Transform = TracingService; + type InitError = U::InitError; + type Future = Ready>; + + fn new_transform(&self, service: S) -> Self::Future { + ok(TracingService::new(service, self.make_span.clone())) + } +} + +/// Wraps the provided service factory with a transform that automatically +/// enters/exits the given span. +/// +/// The span to be entered/exited can be provided via a closure. The closure +/// is passed in a reference to the request being handled by the service. +/// +/// For example: +/// ```rust,ignore +/// let traced_service = trace( +/// web_service, +/// |req: &Request| Some(span!(Level::INFO, "request", req.id)) +/// ); +/// ``` +pub fn trace( + service_factory: U, + make_span: F, +) -> ApplyTransform, S> +where + S: ServiceFactory, + F: Fn(&S::Request) -> Option + Clone, + U: IntoServiceFactory, +{ + apply( + TracingTransform::new(make_span), + service_factory.into_factory(), + ) +} + +#[cfg(test)] +mod test { + use super::*; + + use std::cell::RefCell; + use std::collections::{BTreeMap, BTreeSet}; + use std::sync::{Arc, RwLock}; + + use actix_service::{fn_factory, fn_service}; + use slab::Slab; + use tracing::{span, Event, Level, Metadata, Subscriber}; + + thread_local! { + static SPAN: RefCell> = RefCell::new(Vec::new()); + } + + #[derive(Default)] + struct Stats { + entered_spans: BTreeSet, + exited_spans: BTreeSet, + events_count: BTreeMap, + } + + #[derive(Default)] + struct Inner { + spans: Slab<&'static Metadata<'static>>, + stats: Stats, + } + + #[derive(Clone, Default)] + struct TestSubscriber { + inner: Arc>, + } + + impl Subscriber for TestSubscriber { + fn enabled(&self, _metadata: &Metadata<'_>) -> bool { + true + } + + fn new_span(&self, span: &span::Attributes<'_>) -> span::Id { + let id = self.inner.write().unwrap().spans.insert(span.metadata()); + span::Id::from_u64(id as u64 + 1) + } + + fn record(&self, _span: &span::Id, _values: &span::Record<'_>) {} + + fn record_follows_from(&self, _span: &span::Id, _follows: &span::Id) {} + + fn event(&self, event: &Event<'_>) { + let id = event + .parent() + .cloned() + .or_else(|| SPAN.with(|current_span| current_span.borrow().last().cloned())) + .unwrap(); + + *self + .inner + .write() + .unwrap() + .stats + .events_count + .entry(id.into_u64()) + .or_insert(0) += 1; + } + + fn enter(&self, span: &span::Id) { + self.inner + .write() + .unwrap() + .stats + .entered_spans + .insert(span.into_u64()); + + SPAN.with(|current_span| { + current_span.borrow_mut().push(span.clone()); + }); + } + + fn exit(&self, span: &span::Id) { + self.inner + .write() + .unwrap() + .stats + .exited_spans + .insert(span.into_u64()); + + // we are guaranteed that on any given thread, spans are exited in reverse order + SPAN.with(|current_span| { + let leaving = current_span + .borrow_mut() + .pop() + .expect("told to exit span when not in span"); + assert_eq!( + &leaving, span, + "told to exit span that was not most recently entered" + ); + }); + } + } + + #[actix_rt::test] + async fn service_call() { + let service_factory = fn_factory(|| { + ok::<_, ()>(fn_service(|req: &'static str| { + tracing::event!(Level::TRACE, "It's happening - {}!", req); + ok::<_, ()>(()) + })) + }); + + let subscriber = TestSubscriber::default(); + let _guard = tracing::subscriber::set_default(subscriber.clone()); + + let span_svc = span!(Level::TRACE, "span_svc"); + let trace_service_factory = trace(service_factory, |_: &&str| Some(span_svc.clone())); + let mut service = trace_service_factory.new_service(()).await.unwrap(); + service.call("boo").await.unwrap(); + + let id = span_svc.id().unwrap().into_u64(); + assert!(subscriber + .inner + .read() + .unwrap() + .stats + .entered_spans + .contains(&id)); + assert!(subscriber + .inner + .read() + .unwrap() + .stats + .exited_spans + .contains(&id)); + assert_eq!(subscriber.inner.read().unwrap().stats.events_count[&id], 1); + } +}