1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-08-18 04:35:32 +02:00

Compare commits

...

7 Commits

Author SHA1 Message Date
Nikolay Kim
dbfa13d6be Fixed unsoundness in .and_then()/.then() service combinators 2020-01-16 16:58:11 -08:00
Nikolay Kim
e7c2439543 prep release 2020-01-15 13:35:07 -08:00
Nikolay Kim
3116db5168 revert 1.0.3 changes 2020-01-15 13:24:38 -08:00
Nikolay Kim
5940731ef0 Fix actix-service 1.0.3 compatibility 2020-01-15 11:58:06 -08:00
Rajasekharan Vengalil
aed5fecc8a Add support for tokio tracing for actix Service. (#86)
* Add support for tokio tracing for actix Service.

* Address comments

* Change trace's return type to ApplyTransform

* Remove redundant type args

* Remove reference to MakeSpan from docs
2020-01-15 11:43:52 -08:00
Nikolay Kim
a751899aad Fixed unsoundness in AndThenService impl #83 2020-01-15 11:40:15 -08:00
Nikolay Kim
fa800aeba3 Fix AsRef<str> impl 2020-01-14 15:06:02 -08:00
20 changed files with 490 additions and 122 deletions

View File

@@ -10,6 +10,7 @@ members = [
"actix-testing",
"actix-threadpool",
"actix-tls",
"actix-tracing",
"actix-utils",
"router",
"string",
@@ -26,6 +27,7 @@ actix-service = { path = "actix-service" }
actix-testing = { path = "actix-testing" }
actix-threadpool = { path = "actix-threadpool" }
actix-tls = { path = "actix-tls" }
actix-tracing = { path = "actix-tracing" }
actix-utils = { path = "actix-utils" }
actix-router = { path = "router" }
bytestring = { path = "string" }

View File

@@ -1,5 +1,9 @@
# Changes
## [1.0.2] - 2020-01-15
* Fix actix-service 1.0.3 compatibility
## [1.0.1] - 2019-12-15
* Fix trust-dns-resolver compilation

View File

@@ -1,6 +1,6 @@
[package]
name = "actix-connect"
version = "1.0.1"
version = "1.0.2"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix connect - tcp connector service"
keywords = ["network", "framework", "async", "futures"]
@@ -10,7 +10,6 @@ documentation = "https://docs.rs/actix-connect/"
categories = ["network-programming", "asynchronous"]
license = "MIT/Apache-2.0"
edition = "2018"
workspace = ".."
[package.metadata.docs.rs]
features = ["openssl", "rustls", "uri"]
@@ -32,12 +31,12 @@ rustls = ["rust-tls", "tokio-rustls", "webpki"]
uri = ["http"]
[dependencies]
actix-service = "1.0.0"
actix-service = "1.0.3"
actix-codec = "0.2.0"
actix-utils = "1.0.3"
actix-utils = "1.0.6"
actix-rt = "1.0.0"
derive_more = "0.99.2"
either = "1.5.2"
either = "1.5.3"
futures = "0.3.1"
http = { version = "0.2.0", optional = true }
log = "0.4"

View File

@@ -72,7 +72,7 @@ pub fn start_default_resolver() -> AsyncResolver {
}
/// Create tcp connector service
pub fn new_connector<T: Address>(
pub fn new_connector<T: Address + 'static>(
resolver: AsyncResolver,
) -> impl Service<Request = Connect<T>, Response = Connection<T, TcpStream>, Error = ConnectError>
+ Clone {
@@ -80,7 +80,7 @@ pub fn new_connector<T: Address>(
}
/// Create tcp connector service
pub fn new_connector_factory<T: Address>(
pub fn new_connector_factory<T: Address + 'static>(
resolver: AsyncResolver,
) -> impl ServiceFactory<
Config = (),
@@ -93,14 +93,14 @@ pub fn new_connector_factory<T: Address>(
}
/// Create connector service with default parameters
pub fn default_connector<T: Address>(
pub fn default_connector<T: Address + 'static>(
) -> impl Service<Request = Connect<T>, Response = Connection<T, TcpStream>, Error = ConnectError>
+ Clone {
pipeline(Resolver::default()).and_then(TcpConnector::new())
}
/// Create connector service factory with default parameters
pub fn default_connector_factory<T: Address>() -> impl ServiceFactory<
pub fn default_connector_factory<T: Address + 'static>() -> impl ServiceFactory<
Config = (),
Request = Connect<T>,
Response = Connection<T, TcpStream>,

View File

@@ -1,5 +1,23 @@
# Changes
## [1.0.5] - 2020-01-16
### Fixed
* Fixed unsoundness in .and_then()/.then() service combinators
## [1.0.4] - 2020-01-15
### Fixed
* Revert 1.0.3 change
## [1.0.3] - 2020-01-15
### Fixed
* Fixed unsoundness in `AndThenService` impl
## [1.0.2] - 2020-01-08
### Added

View File

@@ -1,6 +1,6 @@
[package]
name = "actix-service"
version = "1.0.2"
version = "1.0.5"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix service"
keywords = ["network", "framework", "async", "futures"]
@@ -13,7 +13,6 @@ edition = "2018"
[badges]
travis-ci = { repository = "actix/actix-service", branch = "master" }
appveyor = { repository = "actix/actix-net" }
codecov = { repository = "actix/actix-service", branch = "master", service = "github" }
[lib]

View File

@@ -1,5 +1,6 @@
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
use super::{Service, ServiceFactory};
@@ -9,7 +10,7 @@ use crate::cell::Cell;
/// of another service which completes successfully.
///
/// This is created by the `ServiceExt::and_then` method.
pub struct AndThenService<A, B>(Cell<(A, B)>);
pub(crate) struct AndThenService<A, B>(Cell<(A, B)>);
impl<A, B> AndThenService<A, B> {
/// Create new `AndThen` combinator
@@ -40,7 +41,6 @@ where
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let srv = self.0.get_mut();
let not_ready = !srv.0.poll_ready(cx)?.is_ready();
if !srv.1.poll_ready(cx)?.is_ready() || not_ready {
Poll::Pending
@@ -57,7 +57,7 @@ where
}
#[pin_project::pin_project]
pub struct AndThenServiceResponse<A, B>
pub(crate) struct AndThenServiceResponse<A, B>
where
A: Service,
B: Service<Request = A::Response, Error = A::Error>,
@@ -110,7 +110,7 @@ where
}
/// `.and_then()` service factory combinator
pub struct AndThenServiceFactory<A, B>
pub(crate) struct AndThenServiceFactory<A, B>
where
A: ServiceFactory,
A::Config: Clone,
@@ -121,8 +121,7 @@ where
InitError = A::InitError,
>,
{
a: A,
b: B,
inner: Rc<(A, B)>,
}
impl<A, B> AndThenServiceFactory<A, B>
@@ -138,7 +137,9 @@ where
{
/// Create new `AndThenFactory` combinator
pub(crate) fn new(a: A, b: B) -> Self {
Self { a, b }
Self {
inner: Rc::new((a, b)),
}
}
}
@@ -163,34 +164,34 @@ where
type Future = AndThenServiceFactoryResponse<A, B>;
fn new_service(&self, cfg: A::Config) -> Self::Future {
let inner = &*self.inner;
AndThenServiceFactoryResponse::new(
self.a.new_service(cfg.clone()),
self.b.new_service(cfg),
inner.0.new_service(cfg.clone()),
inner.1.new_service(cfg),
)
}
}
impl<A, B> Clone for AndThenServiceFactory<A, B>
where
A: ServiceFactory + Clone,
A: ServiceFactory,
A::Config: Clone,
B: ServiceFactory<
Config = A::Config,
Request = A::Response,
Error = A::Error,
InitError = A::InitError,
> + Clone,
Config = A::Config,
Request = A::Response,
Error = A::Error,
InitError = A::InitError,
>,
{
fn clone(&self) -> Self {
Self {
a: self.a.clone(),
b: self.b.clone(),
inner: self.inner.clone(),
}
}
}
#[pin_project::pin_project]
pub struct AndThenServiceFactoryResponse<A, B>
pub(crate) struct AndThenServiceFactoryResponse<A, B>
where
A: ServiceFactory,
B: ServiceFactory<Request = A::Response>,

View File

@@ -1,13 +1,14 @@
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
use crate::cell::Cell;
use crate::{Service, ServiceFactory};
/// `Apply` service combinator
pub struct AndThenApplyFn<A, B, F, Fut, Res, Err>
pub(crate) struct AndThenApplyFn<A, B, F, Fut, Res, Err>
where
A: Service,
B: Service,
@@ -15,8 +16,7 @@ where
Fut: Future<Output = Result<Res, Err>>,
Err: From<A::Error> + From<B::Error>,
{
a: A,
b: Cell<(B, F)>,
srv: Cell<(A, B, F)>,
r: PhantomData<(Fut, Res, Err)>,
}
@@ -31,8 +31,7 @@ where
/// Create new `Apply` combinator
pub(crate) fn new(a: A, b: B, f: F) -> Self {
Self {
a,
b: Cell::new((b, f)),
srv: Cell::new((a, b, f)),
r: PhantomData,
}
}
@@ -40,7 +39,7 @@ where
impl<A, B, F, Fut, Res, Err> Clone for AndThenApplyFn<A, B, F, Fut, Res, Err>
where
A: Service + Clone,
A: Service,
B: Service,
F: FnMut(A::Response, &mut B) -> Fut,
Fut: Future<Output = Result<Res, Err>>,
@@ -48,8 +47,7 @@ where
{
fn clone(&self) -> Self {
AndThenApplyFn {
a: self.a.clone(),
b: self.b.clone(),
srv: self.srv.clone(),
r: PhantomData,
}
}
@@ -69,8 +67,9 @@ where
type Future = AndThenApplyFnFuture<A, B, F, Fut, Res, Err>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let not_ready = self.a.poll_ready(cx)?.is_pending();
if self.b.get_mut().0.poll_ready(cx)?.is_pending() || not_ready {
let inner = self.srv.get_mut();
let not_ready = inner.0.poll_ready(cx)?.is_pending();
if inner.1.poll_ready(cx)?.is_pending() || not_ready {
Poll::Pending
} else {
Poll::Ready(Ok(()))
@@ -78,14 +77,15 @@ where
}
fn call(&mut self, req: A::Request) -> Self::Future {
let fut = self.srv.get_mut().0.call(req);
AndThenApplyFnFuture {
state: State::A(self.a.call(req), Some(self.b.clone())),
state: State::A(fut, Some(self.srv.clone())),
}
}
}
#[pin_project::pin_project]
pub struct AndThenApplyFnFuture<A, B, F, Fut, Res, Err>
pub(crate) struct AndThenApplyFnFuture<A, B, F, Fut, Res, Err>
where
A: Service,
B: Service,
@@ -108,7 +108,7 @@ where
Err: From<A::Error>,
Err: From<B::Error>,
{
A(#[pin] A::Future, Option<Cell<(B, F)>>),
A(#[pin] A::Future, Option<Cell<(A, B, F)>>),
B(#[pin] Fut),
Empty,
}
@@ -134,7 +134,7 @@ where
let mut b = b.take().unwrap();
this.state.set(State::Empty);
let b = b.get_mut();
let fut = (&mut b.1)(res, &mut b.0);
let fut = (&mut b.2)(res, &mut b.1);
this.state.set(State::B(fut));
self.poll(cx)
}
@@ -150,10 +150,8 @@ where
}
/// `AndThenApplyFn` service factory
pub struct AndThenApplyFnFactory<A, B, F, Fut, Res, Err> {
a: A,
b: B,
f: F,
pub(crate) struct AndThenApplyFnFactory<A, B, F, Fut, Res, Err> {
srv: Rc<(A, B, F)>,
r: PhantomData<(Fut, Res, Err)>,
}
@@ -168,25 +166,16 @@ where
/// Create new `ApplyNewService` new service instance
pub(crate) fn new(a: A, b: B, f: F) -> Self {
Self {
a: a,
b: b,
f: f,
srv: Rc::new((a, b, f)),
r: PhantomData,
}
}
}
impl<A, B, F, Fut, Res, Err> Clone for AndThenApplyFnFactory<A, B, F, Fut, Res, Err>
where
A: Clone,
B: Clone,
F: Clone,
{
impl<A, B, F, Fut, Res, Err> Clone for AndThenApplyFnFactory<A, B, F, Fut, Res, Err> {
fn clone(&self) -> Self {
Self {
a: self.a.clone(),
b: self.b.clone(),
f: self.f.clone(),
srv: self.srv.clone(),
r: PhantomData,
}
}
@@ -210,18 +199,19 @@ where
type Future = AndThenApplyFnFactoryResponse<A, B, F, Fut, Res, Err>;
fn new_service(&self, cfg: A::Config) -> Self::Future {
let srv = &*self.srv;
AndThenApplyFnFactoryResponse {
a: None,
b: None,
f: self.f.clone(),
fut_a: self.a.new_service(cfg.clone()),
fut_b: self.b.new_service(cfg),
f: srv.2.clone(),
fut_a: srv.0.new_service(cfg.clone()),
fut_b: srv.1.new_service(cfg),
}
}
}
#[pin_project::pin_project]
pub struct AndThenApplyFnFactoryResponse<A, B, F, Fut, Res, Err>
pub(crate) struct AndThenApplyFnFactoryResponse<A, B, F, Fut, Res, Err>
where
A: ServiceFactory,
B: ServiceFactory<Config = A::Config, InitError = A::InitError>,
@@ -267,8 +257,11 @@ where
if this.a.is_some() && this.b.is_some() {
Poll::Ready(Ok(AndThenApplyFn {
a: this.a.take().unwrap(),
b: Cell::new((this.b.take().unwrap(), this.f.clone())),
srv: Cell::new((
this.a.take().unwrap(),
this.b.take().unwrap(),
this.f.clone(),
)),
r: PhantomData,
}))
} else {

View File

@@ -7,7 +7,18 @@ use crate::cell::Cell;
use crate::{Service, ServiceFactory};
/// Convert `Fn(Config, &mut Service1) -> Future<Service2>` fn to a service factory
pub fn apply_cfg<F, C, T, R, S, E>(srv: T, f: F) -> ApplyConfigService<F, C, T, R, S, E>
pub fn apply_cfg<F, C, T, R, S, E>(
srv: T,
f: F,
) -> impl ServiceFactory<
Config = C,
Request = S::Request,
Response = S::Response,
Error = S::Error,
Service = S,
InitError = E,
Future = R,
> + Clone
where
F: FnMut(C, &mut T) -> R,
T: Service,
@@ -26,7 +37,14 @@ where
pub fn apply_cfg_factory<F, C, T, R, S>(
factory: T,
f: F,
) -> ApplyConfigServiceFactory<F, C, T, R, S>
) -> impl ServiceFactory<
Config = C,
Request = S::Request,
Response = S::Response,
Error = S::Error,
Service = S,
InitError = T::InitError,
> + Clone
where
F: FnMut(C, &mut T::Service) -> R,
T: ServiceFactory<Config = ()>,
@@ -41,7 +59,7 @@ where
}
/// Convert `Fn(Config, &mut Server) -> Future<Service>` fn to NewService\
pub struct ApplyConfigService<F, C, T, R, S, E>
struct ApplyConfigService<F, C, T, R, S, E>
where
F: FnMut(C, &mut T) -> R,
T: Service,
@@ -92,7 +110,7 @@ where
}
/// Convert `Fn(&Config) -> Future<Service>` fn to NewService
pub struct ApplyConfigServiceFactory<F, C, T, R, S>
struct ApplyConfigServiceFactory<F, C, T, R, S>
where
F: FnMut(C, &mut T::Service) -> R,
T: ServiceFactory<Config = ()>,
@@ -145,7 +163,7 @@ where
}
#[pin_project::pin_project]
pub struct ApplyConfigServiceFactoryResponse<F, C, T, R, S>
struct ApplyConfigServiceFactoryResponse<F, C, T, R, S>
where
F: FnMut(C, &mut T::Service) -> R,
T: ServiceFactory<Config = ()>,

View File

@@ -361,10 +361,7 @@ where
}
pub mod dev {
pub use crate::and_then::{AndThenService, AndThenServiceFactory};
pub use crate::and_then_apply_fn::{AndThenApplyFn, AndThenApplyFnFactory};
pub use crate::apply::{Apply, ApplyServiceFactory};
pub use crate::apply_cfg::{ApplyConfigService, ApplyConfigServiceFactory};
pub use crate::fn_service::{
FnService, FnServiceConfig, FnServiceFactory, FnServiceNoConfig,
};
@@ -372,7 +369,6 @@ pub mod dev {
pub use crate::map_config::{MapConfig, UnitConfig};
pub use crate::map_err::{MapErr, MapErrServiceFactory};
pub use crate::map_init_err::MapInitErr;
pub use crate::then::{ThenService, ThenServiceFactory};
pub use crate::transform::ApplyTransform;
pub use crate::transform_err::TransformMapInitErr;
}

View File

@@ -46,7 +46,12 @@ impl<T: Service> Pipeline<T> {
///
/// Note that this function consumes the receiving service and returns a
/// wrapped version of it.
pub fn and_then<F, U>(self, service: F) -> Pipeline<AndThenService<T, U>>
pub fn and_then<F, U>(
self,
service: F,
) -> Pipeline<
impl Service<Request = T::Request, Response = U::Response, Error = T::Error> + Clone,
>
where
Self: Sized,
F: IntoService<U>,
@@ -65,7 +70,7 @@ impl<T: Service> Pipeline<T> {
self,
service: I,
f: F,
) -> Pipeline<AndThenApplyFn<T, U, F, Fut, Res, Err>>
) -> Pipeline<impl Service<Request = T::Request, Response = Res, Error = Err> + Clone>
where
Self: Sized,
I: IntoService<U>,
@@ -84,7 +89,12 @@ impl<T: Service> Pipeline<T> {
///
/// Note that this function consumes the receiving pipeline and returns a
/// wrapped version of it.
pub fn then<F, U>(self, service: F) -> Pipeline<ThenService<T, U>>
pub fn then<F, U>(
self,
service: F,
) -> Pipeline<
impl Service<Request = T::Request, Response = U::Response, Error = T::Error> + Clone,
>
where
Self: Sized,
F: IntoService<U>,
@@ -168,7 +178,23 @@ pub struct PipelineFactory<T> {
impl<T: ServiceFactory> PipelineFactory<T> {
/// Call another service after call to this one has resolved successfully.
pub fn and_then<F, U>(self, factory: F) -> PipelineFactory<AndThenServiceFactory<T, U>>
pub fn and_then<F, U>(
self,
factory: F,
) -> PipelineFactory<
impl ServiceFactory<
Request = T::Request,
Response = U::Response,
Error = T::Error,
Config = T::Config,
InitError = T::InitError,
Service = impl Service<
Request = T::Request,
Response = U::Response,
Error = T::Error,
> + Clone,
> + Clone,
>
where
Self: Sized,
T::Config: Clone,
@@ -193,7 +219,16 @@ impl<T: ServiceFactory> PipelineFactory<T> {
self,
factory: I,
f: F,
) -> PipelineFactory<AndThenApplyFnFactory<T, U, F, Fut, Res, Err>>
) -> PipelineFactory<
impl ServiceFactory<
Request = T::Request,
Response = Res,
Error = Err,
Config = T::Config,
InitError = T::InitError,
Service = impl Service<Request = T::Request, Response = Res, Error = Err> + Clone,
> + Clone,
>
where
Self: Sized,
T::Config: Clone,
@@ -214,7 +249,23 @@ impl<T: ServiceFactory> PipelineFactory<T> {
///
/// Note that this function consumes the receiving pipeline and returns a
/// wrapped version of it.
pub fn then<F, U>(self, factory: F) -> PipelineFactory<ThenServiceFactory<T, U>>
pub fn then<F, U>(
self,
factory: F,
) -> PipelineFactory<
impl ServiceFactory<
Request = T::Request,
Response = U::Response,
Error = T::Error,
Config = T::Config,
InitError = T::InitError,
Service = impl Service<
Request = T::Request,
Response = U::Response,
Error = T::Error,
> + Clone,
> + Clone,
>
where
Self: Sized,
T::Config: Clone,

View File

@@ -1,5 +1,6 @@
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
use super::{Service, ServiceFactory};
@@ -8,11 +9,8 @@ use crate::cell::Cell;
/// Service for the `then` combinator, chaining a computation onto the end of
/// another service.
///
/// This is created by the `ServiceExt::then` method.
pub struct ThenService<A, B> {
a: A,
b: Cell<B>,
}
/// This is created by the `Pipeline::then` method.
pub(crate) struct ThenService<A, B>(Cell<(A, B)>);
impl<A, B> ThenService<A, B> {
/// Create new `.then()` combinator
@@ -21,19 +19,13 @@ impl<A, B> ThenService<A, B> {
A: Service,
B: Service<Request = Result<A::Response, A::Error>, Error = A::Error>,
{
Self { a, b: Cell::new(b) }
Self(Cell::new((a, b)))
}
}
impl<A, B> Clone for ThenService<A, B>
where
A: Clone,
{
impl<A, B> Clone for ThenService<A, B> {
fn clone(&self) -> Self {
ThenService {
a: self.a.clone(),
b: self.b.clone(),
}
ThenService(self.0.clone())
}
}
@@ -47,9 +39,10 @@ where
type Error = B::Error;
type Future = ThenServiceResponse<A, B>;
fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let not_ready = !self.a.poll_ready(ctx)?.is_ready();
if !self.b.get_mut().poll_ready(ctx)?.is_ready() || not_ready {
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let srv = self.0.get_mut();
let not_ready = !srv.0.poll_ready(cx)?.is_ready();
if !srv.1.poll_ready(cx)?.is_ready() || not_ready {
Poll::Pending
} else {
Poll::Ready(Ok(()))
@@ -58,13 +51,13 @@ where
fn call(&mut self, req: A::Request) -> Self::Future {
ThenServiceResponse {
state: State::A(self.a.call(req), Some(self.b.clone())),
state: State::A(self.0.get_mut().0.call(req), Some(self.0.clone())),
}
}
}
#[pin_project::pin_project]
pub struct ThenServiceResponse<A, B>
pub(crate) struct ThenServiceResponse<A, B>
where
A: Service,
B: Service<Request = Result<A::Response, A::Error>>,
@@ -79,7 +72,7 @@ where
A: Service,
B: Service<Request = Result<A::Response, A::Error>>,
{
A(#[pin] A::Future, Option<Cell<B>>),
A(#[pin] A::Future, Option<Cell<(A, B)>>),
B(#[pin] B::Future),
Empty,
}
@@ -101,7 +94,7 @@ where
Poll::Ready(res) => {
let mut b = b.take().unwrap();
this.state.set(State::Empty); // drop fut A
let fut = b.get_mut().call(res);
let fut = b.get_mut().1.call(res);
this.state.set(State::B(fut));
self.poll(cx)
}
@@ -117,10 +110,7 @@ where
}
/// `.then()` service factory combinator
pub struct ThenServiceFactory<A, B> {
a: A,
b: B,
}
pub(crate) struct ThenServiceFactory<A, B>(Rc<(A, B)>);
impl<A, B> ThenServiceFactory<A, B>
where
@@ -135,7 +125,7 @@ where
{
/// Create new `AndThen` combinator
pub(crate) fn new(a: A, b: B) -> Self {
Self { a, b }
Self(Rc::new((a, b)))
}
}
@@ -160,28 +150,19 @@ where
type Future = ThenServiceFactoryResponse<A, B>;
fn new_service(&self, cfg: A::Config) -> Self::Future {
ThenServiceFactoryResponse::new(
self.a.new_service(cfg.clone()),
self.b.new_service(cfg),
)
let srv = &*self.0;
ThenServiceFactoryResponse::new(srv.0.new_service(cfg.clone()), srv.1.new_service(cfg))
}
}
impl<A, B> Clone for ThenServiceFactory<A, B>
where
A: Clone,
B: Clone,
{
impl<A, B> Clone for ThenServiceFactory<A, B> {
fn clone(&self) -> Self {
Self {
a: self.a.clone(),
b: self.b.clone(),
}
Self(self.0.clone())
}
}
#[pin_project::pin_project]
pub struct ThenServiceFactoryResponse<A, B>
pub(crate) struct ThenServiceFactoryResponse<A, B>
where
A: ServiceFactory,
B: ServiceFactory<

5
actix-tracing/CHANGES.md Normal file
View File

@@ -0,0 +1,5 @@
# Changes
## [0.1.0] - 2020-01-15
* Initial release

26
actix-tracing/Cargo.toml Normal file
View File

@@ -0,0 +1,26 @@
[package]
name = "actix-tracing"
version = "0.1.0"
authors = ["Rajasekharan Vengalil <avranju@gmail.com>"]
description = "Support for tokio tracing with Actix services"
keywords = ["network", "framework", "tracing"]
homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/actix-tracing/"
categories = ["network-programming", "asynchronous"]
license = "MIT/Apache-2.0"
edition = "2018"
[lib]
name = "actix_tracing"
path = "src/lib.rs"
[dependencies]
actix-service = "1.0.4"
futures-util = "0.3.1"
tracing = "0.1"
tracing-futures = "0.2"
[dev_dependencies]
actix-rt = "1.0"
slab = "0.4"

View File

@@ -0,0 +1 @@
../LICENSE-APACHE

1
actix-tracing/LICENSE-MIT Symbolic link
View File

@@ -0,0 +1 @@
../LICENSE-MIT

261
actix-tracing/src/lib.rs Normal file
View File

@@ -0,0 +1,261 @@
//! Actix tracing - support for tokio tracing with Actix services.
#![deny(rust_2018_idioms, warnings)]
use std::marker::PhantomData;
use std::task::{Context, Poll};
use actix_service::{
apply, dev::ApplyTransform, IntoServiceFactory, Service, ServiceFactory, Transform,
};
use futures_util::future::{ok, Either, Ready};
use tracing_futures::{Instrument, Instrumented};
/// A `Service` implementation that automatically enters/exits tracing spans
/// for the wrapped inner service.
#[derive(Clone)]
pub struct TracingService<S, F> {
inner: S,
make_span: F,
}
impl<S, F> TracingService<S, F> {
pub fn new(inner: S, make_span: F) -> Self {
TracingService { inner, make_span }
}
}
impl<S, F> Service for TracingService<S, F>
where
S: Service,
F: Fn(&S::Request) -> Option<tracing::Span>,
{
type Request = S::Request;
type Response = S::Response;
type Error = S::Error;
type Future = Either<S::Future, Instrumented<S::Future>>;
fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(ctx)
}
fn call(&mut self, req: Self::Request) -> Self::Future {
let span = (self.make_span)(&req);
let _enter = span.as_ref().map(|s| s.enter());
let fut = self.inner.call(req);
// make a child span to track the future's execution
if let Some(span) = span
.clone()
.map(|span| tracing::span!(parent: &span, tracing::Level::INFO, "future"))
{
Either::Right(fut.instrument(span))
} else {
Either::Left(fut)
}
}
}
/// A `Transform` implementation that wraps services with a [`TracingService`].
///
/// [`TracingService`]: struct.TracingService.html
pub struct TracingTransform<S, U, F> {
make_span: F,
_p: PhantomData<fn(S, U)>,
}
impl<S, U, F> TracingTransform<S, U, F> {
pub fn new(make_span: F) -> Self {
TracingTransform {
make_span,
_p: PhantomData,
}
}
}
impl<S, U, F> Transform<S> for TracingTransform<S, U, F>
where
S: Service,
U: ServiceFactory<
Request = S::Request,
Response = S::Response,
Error = S::Error,
Service = S,
>,
F: Fn(&S::Request) -> Option<tracing::Span> + Clone,
{
type Request = S::Request;
type Response = S::Response;
type Error = S::Error;
type Transform = TracingService<S, F>;
type InitError = U::InitError;
type Future = Ready<Result<Self::Transform, Self::InitError>>;
fn new_transform(&self, service: S) -> Self::Future {
ok(TracingService::new(service, self.make_span.clone()))
}
}
/// Wraps the provided service factory with a transform that automatically
/// enters/exits the given span.
///
/// The span to be entered/exited can be provided via a closure. The closure
/// is passed in a reference to the request being handled by the service.
///
/// For example:
/// ```rust,ignore
/// let traced_service = trace(
/// web_service,
/// |req: &Request| Some(span!(Level::INFO, "request", req.id))
/// );
/// ```
pub fn trace<S, U, F>(
service_factory: U,
make_span: F,
) -> ApplyTransform<TracingTransform<S::Service, S, F>, S>
where
S: ServiceFactory,
F: Fn(&S::Request) -> Option<tracing::Span> + Clone,
U: IntoServiceFactory<S>,
{
apply(
TracingTransform::new(make_span),
service_factory.into_factory(),
)
}
#[cfg(test)]
mod test {
use super::*;
use std::cell::RefCell;
use std::collections::{BTreeMap, BTreeSet};
use std::sync::{Arc, RwLock};
use actix_service::{fn_factory, fn_service};
use slab::Slab;
use tracing::{span, Event, Level, Metadata, Subscriber};
thread_local! {
static SPAN: RefCell<Vec<span::Id>> = RefCell::new(Vec::new());
}
#[derive(Default)]
struct Stats {
entered_spans: BTreeSet<u64>,
exited_spans: BTreeSet<u64>,
events_count: BTreeMap<u64, usize>,
}
#[derive(Default)]
struct Inner {
spans: Slab<&'static Metadata<'static>>,
stats: Stats,
}
#[derive(Clone, Default)]
struct TestSubscriber {
inner: Arc<RwLock<Inner>>,
}
impl Subscriber for TestSubscriber {
fn enabled(&self, _metadata: &Metadata<'_>) -> bool {
true
}
fn new_span(&self, span: &span::Attributes<'_>) -> span::Id {
let id = self.inner.write().unwrap().spans.insert(span.metadata());
span::Id::from_u64(id as u64 + 1)
}
fn record(&self, _span: &span::Id, _values: &span::Record<'_>) {}
fn record_follows_from(&self, _span: &span::Id, _follows: &span::Id) {}
fn event(&self, event: &Event<'_>) {
let id = event
.parent()
.cloned()
.or_else(|| SPAN.with(|current_span| current_span.borrow().last().cloned()))
.unwrap();
*self
.inner
.write()
.unwrap()
.stats
.events_count
.entry(id.into_u64())
.or_insert(0) += 1;
}
fn enter(&self, span: &span::Id) {
self.inner
.write()
.unwrap()
.stats
.entered_spans
.insert(span.into_u64());
SPAN.with(|current_span| {
current_span.borrow_mut().push(span.clone());
});
}
fn exit(&self, span: &span::Id) {
self.inner
.write()
.unwrap()
.stats
.exited_spans
.insert(span.into_u64());
// we are guaranteed that on any given thread, spans are exited in reverse order
SPAN.with(|current_span| {
let leaving = current_span
.borrow_mut()
.pop()
.expect("told to exit span when not in span");
assert_eq!(
&leaving, span,
"told to exit span that was not most recently entered"
);
});
}
}
#[actix_rt::test]
async fn service_call() {
let service_factory = fn_factory(|| {
ok::<_, ()>(fn_service(|req: &'static str| {
tracing::event!(Level::TRACE, "It's happening - {}!", req);
ok::<_, ()>(())
}))
});
let subscriber = TestSubscriber::default();
let _guard = tracing::subscriber::set_default(subscriber.clone());
let span_svc = span!(Level::TRACE, "span_svc");
let trace_service_factory = trace(service_factory, |_: &&str| Some(span_svc.clone()));
let mut service = trace_service_factory.new_service(()).await.unwrap();
service.call("boo").await.unwrap();
let id = span_svc.id().unwrap().into_u64();
assert!(subscriber
.inner
.read()
.unwrap()
.stats
.entered_spans
.contains(&id));
assert!(subscriber
.inner
.read()
.unwrap()
.stats
.exited_spans
.contains(&id));
assert_eq!(subscriber.inner.read().unwrap().stats.events_count[&id], 1);
}
}

View File

@@ -1,5 +1,9 @@
# Changes
## [0.1.4] - 2020-01-14
* Fix `AsRef<str>` impl
## [0.1.3] - 2020-01-13
* Add `PartialEq<T: AsRef<str>>`, `AsRef<[u8]>` impls

View File

@@ -1,6 +1,6 @@
[package]
name = "bytestring"
version = "0.1.3"
version = "0.1.4"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "A UTF-8 encoded string with Bytes as a storage"
keywords = ["actix"]

View File

@@ -7,7 +7,7 @@ use bytes::Bytes;
/// A utf-8 encoded string with [`Bytes`] as a storage.
///
/// [`Bytes`]: https://docs.rs/bytes/0.5.3/bytes/struct.Bytes.html
#[derive(Clone, Eq, PartialEq, Ord, PartialOrd, Default)]
#[derive(Clone, Eq, Ord, PartialOrd, Default)]
pub struct ByteString(Bytes);
impl ByteString {
@@ -55,6 +55,12 @@ impl AsRef<[u8]> for ByteString {
}
}
impl AsRef<str> for ByteString {
fn as_ref(&self) -> &str {
&*self
}
}
impl hash::Hash for ByteString {
fn hash<H: hash::Hasher>(&self, state: &mut H) {
(**self).hash(state);
@@ -187,6 +193,8 @@ mod test {
fn test_from_string() {
let s: ByteString = "hello".to_string().into();
assert_eq!(&s, "hello");
let t: &str = s.as_ref();
assert_eq!(t, "hello");
}
#[test]