1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-11-24 00:01:11 +01:00

remove pipeline from public api (#335)

This commit is contained in:
Rob Ede 2021-04-16 00:00:02 +01:00 committed by GitHub
parent 7a82288066
commit 47fba25d67
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 85 additions and 35 deletions

View File

@ -9,15 +9,17 @@
//! Start typing. When you press enter the typed line will be echoed back. The server will log //! Start typing. When you press enter the typed line will be echoed back. The server will log
//! the length of each line it echos and the total size of data sent when the connection is closed. //! the length of each line it echos and the total size of data sent when the connection is closed.
use std::sync::{ use std::{
atomic::{AtomicUsize, Ordering}, env, io,
Arc, sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
}; };
use std::{env, io};
use actix_rt::net::TcpStream; use actix_rt::net::TcpStream;
use actix_server::Server; use actix_server::Server;
use actix_service::pipeline_factory; use actix_service::{fn_service, ServiceFactoryExt as _};
use bytes::BytesMut; use bytes::BytesMut;
use futures_util::future::ok; use futures_util::future::ok;
use log::{error, info}; use log::{error, info};
@ -25,7 +27,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[actix_rt::main] #[actix_rt::main]
async fn main() -> io::Result<()> { async fn main() -> io::Result<()> {
env::set_var("RUST_LOG", "actix=trace,basic=trace"); env::set_var("RUST_LOG", "info");
env_logger::init(); env_logger::init();
let count = Arc::new(AtomicUsize::new(0)); let count = Arc::new(AtomicUsize::new(0));
@ -41,7 +43,7 @@ async fn main() -> io::Result<()> {
let count = Arc::clone(&count); let count = Arc::clone(&count);
let num2 = Arc::clone(&count); let num2 = Arc::clone(&count);
pipeline_factory(move |mut stream: TcpStream| { fn_service(move |mut stream: TcpStream| {
let count = Arc::clone(&count); let count = Arc::clone(&count);
async move { async move {

View File

@ -1,6 +1,9 @@
# Changes # Changes
## Unreleased - 2021-xx-xx ## Unreleased - 2021-xx-xx
* Removed pipeline and related structs/functions. [#335]
[#335]: https://github.com/actix/actix-net/pull/335
## 2.0.0-beta.5 - 2021-03-15 ## 2.0.0-beta.5 - 2021-03-15

View File

@ -11,11 +11,11 @@ use pin_project_lite::pin_project;
use super::{Service, ServiceFactory}; use super::{Service, ServiceFactory};
/// Service for the `and_then` combinator, chaining a computation onto the end /// Service for the `and_then` combinator, chaining a computation onto the end of another service
/// of another service which completes successfully. /// which completes successfully.
/// ///
/// This is created by the `Pipeline::and_then` method. /// This is created by the `Pipeline::and_then` method.
pub(crate) struct AndThenService<A, B, Req>(Rc<(A, B)>, PhantomData<Req>); pub struct AndThenService<A, B, Req>(Rc<(A, B)>, PhantomData<Req>);
impl<A, B, Req> AndThenService<A, B, Req> { impl<A, B, Req> AndThenService<A, B, Req> {
/// Create new `AndThen` combinator /// Create new `AndThen` combinator
@ -64,7 +64,7 @@ where
} }
pin_project! { pin_project! {
pub(crate) struct AndThenServiceResponse<A, B, Req> pub struct AndThenServiceResponse<A, B, Req>
where where
A: Service<Req>, A: Service<Req>,
B: Service<A::Response, Error = A::Error>, B: Service<A::Response, Error = A::Error>,
@ -117,7 +117,7 @@ where
} }
/// `.and_then()` service factory combinator /// `.and_then()` service factory combinator
pub(crate) struct AndThenServiceFactory<A, B, Req> pub struct AndThenServiceFactory<A, B, Req>
where where
A: ServiceFactory<Req>, A: ServiceFactory<Req>,
A::Config: Clone, A::Config: Clone,
@ -200,7 +200,7 @@ where
} }
pin_project! { pin_project! {
pub(crate) struct AndThenServiceFactoryResponse<A, B, Req> pub struct AndThenServiceFactoryResponse<A, B, Req>
where where
A: ServiceFactory<Req>, A: ServiceFactory<Req>,
B: ServiceFactory<A::Response>, B: ServiceFactory<A::Response>,
@ -272,7 +272,9 @@ mod tests {
use futures_util::future::lazy; use futures_util::future::lazy;
use crate::{ use crate::{
fn_factory, ok, pipeline, pipeline_factory, ready, Ready, Service, ServiceFactory, fn_factory, ok,
pipeline::{pipeline, pipeline_factory},
ready, Ready, Service, ServiceFactory,
}; };
struct Srv1(Rc<Cell<usize>>); struct Srv1(Rc<Cell<usize>>);

View File

@ -214,7 +214,11 @@ mod tests {
use futures_util::future::lazy; use futures_util::future::lazy;
use super::*; use super::*;
use crate::{ok, pipeline, pipeline_factory, Ready, Service, ServiceFactory}; use crate::{
ok,
pipeline::{pipeline, pipeline_factory},
Ready, Service, ServiceFactory,
};
#[derive(Clone)] #[derive(Clone)]
struct Srv; struct Srv;

View File

@ -1,8 +1,12 @@
use crate::{ use crate::{
map::Map, map_err::MapErr, transform_err::TransformMapInitErr, Service, ServiceFactory, and_then::{AndThenService, AndThenServiceFactory},
Transform, map::Map,
map_err::MapErr,
transform_err::TransformMapInitErr,
IntoService, IntoServiceFactory, Service, ServiceFactory, Transform,
}; };
/// An extension trait for [`Service`]s that provides a variety of convenient adapters.
pub trait ServiceExt<Req>: Service<Req> { pub trait ServiceExt<Req>: Service<Req> {
/// Map this service's output to a different type, returning a new service /// Map this service's output to a different type, returning a new service
/// of the resulting type. /// of the resulting type.
@ -36,10 +40,27 @@ pub trait ServiceExt<Req>: Service<Req> {
{ {
MapErr::new(self, f) MapErr::new(self, f)
} }
/// Call another service after call to this one has resolved successfully.
///
/// This function can be used to chain two services together and ensure that the second service
/// isn't called until call to the fist service have finished. Result of the call to the first
/// service is used as an input parameter for the second service's call.
///
/// Note that this function consumes the receiving service and returns a wrapped version of it.
fn and_then<I, S1>(self, service: I) -> AndThenService<Self, S1, Req>
where
Self: Sized,
I: IntoService<S1, Self::Response>,
S1: Service<Self::Response, Error = Self::Error>,
{
AndThenService::new(self, service.into_service())
}
} }
impl<S, Req> ServiceExt<Req> for S where S: Service<Req> {} impl<S, Req> ServiceExt<Req> for S where S: Service<Req> {}
/// An extension trait for [`ServiceFactory`]s that provides a variety of convenient adapters.
pub trait ServiceFactoryExt<Req>: ServiceFactory<Req> { pub trait ServiceFactoryExt<Req>: ServiceFactory<Req> {
/// Map this service's output to a different type, returning a new service /// Map this service's output to a different type, returning a new service
/// of the resulting type. /// of the resulting type.
@ -68,10 +89,27 @@ pub trait ServiceFactoryExt<Req>: ServiceFactory<Req> {
{ {
crate::map_init_err::MapInitErr::new(self, f) crate::map_init_err::MapInitErr::new(self, f)
} }
/// Call another service after call to this one has resolved successfully.
fn and_then<I, SF1>(self, factory: I) -> AndThenServiceFactory<Self, SF1, Req>
where
Self: Sized,
Self::Config: Clone,
I: IntoServiceFactory<SF1, Self::Response>,
SF1: ServiceFactory<
Self::Response,
Config = Self::Config,
Error = Self::Error,
InitError = Self::InitError,
>,
{
AndThenServiceFactory::new(self, factory.into_factory())
}
} }
impl<SF, Req> ServiceFactoryExt<Req> for SF where SF: ServiceFactory<Req> {} impl<SF, Req> ServiceFactoryExt<Req> for SF where SF: ServiceFactory<Req> {}
/// An extension trait for [`Transform`]s that provides a variety of convenient adapters.
pub trait TransformExt<S, Req>: Transform<S, Req> { pub trait TransformExt<S, Req>: Transform<S, Req> {
/// Return a new `Transform` whose init error is mapped to to a different type. /// Return a new `Transform` whose init error is mapped to to a different type.
fn map_init_err<F, E>(self, f: F) -> TransformMapInitErr<Self, S, Req, F, E> fn map_init_err<F, E>(self, f: F) -> TransformMapInitErr<Self, S, Req, F, E>

View File

@ -38,7 +38,6 @@ pub use self::apply_cfg::{apply_cfg, apply_cfg_factory};
pub use self::ext::{ServiceExt, ServiceFactoryExt, TransformExt}; pub use self::ext::{ServiceExt, ServiceFactoryExt, TransformExt};
pub use self::fn_service::{fn_factory, fn_factory_with_config, fn_service}; pub use self::fn_service::{fn_factory, fn_factory_with_config, fn_service};
pub use self::map_config::{map_config, unit_config}; pub use self::map_config::{map_config, unit_config};
pub use self::pipeline::{pipeline, pipeline_factory, Pipeline, PipelineFactory};
pub use self::transform::{apply, ApplyTransform, Transform}; pub use self::transform::{apply, ApplyTransform, Transform};
#[allow(unused_imports)] #[allow(unused_imports)]

View File

@ -1,3 +1,6 @@
// TODO: see if pipeline is necessary
#![allow(dead_code)]
use core::{ use core::{
marker::PhantomData, marker::PhantomData,
task::{Context, Poll}, task::{Context, Poll},
@ -11,7 +14,7 @@ use crate::then::{ThenService, ThenServiceFactory};
use crate::{IntoService, IntoServiceFactory, Service, ServiceFactory}; use crate::{IntoService, IntoServiceFactory, Service, ServiceFactory};
/// Construct new pipeline with one service in pipeline chain. /// Construct new pipeline with one service in pipeline chain.
pub fn pipeline<I, S, Req>(service: I) -> Pipeline<S, Req> pub(crate) fn pipeline<I, S, Req>(service: I) -> Pipeline<S, Req>
where where
I: IntoService<S, Req>, I: IntoService<S, Req>,
S: Service<Req>, S: Service<Req>,
@ -23,7 +26,7 @@ where
} }
/// Construct new pipeline factory with one service factory. /// Construct new pipeline factory with one service factory.
pub fn pipeline_factory<I, SF, Req>(factory: I) -> PipelineFactory<SF, Req> pub(crate) fn pipeline_factory<I, SF, Req>(factory: I) -> PipelineFactory<SF, Req>
where where
I: IntoServiceFactory<SF, Req>, I: IntoServiceFactory<SF, Req>,
SF: ServiceFactory<Req>, SF: ServiceFactory<Req>,
@ -35,7 +38,7 @@ where
} }
/// Pipeline service - pipeline allows to compose multiple service into one service. /// Pipeline service - pipeline allows to compose multiple service into one service.
pub struct Pipeline<S, Req> { pub(crate) struct Pipeline<S, Req> {
service: S, service: S,
_phantom: PhantomData<Req>, _phantom: PhantomData<Req>,
} }
@ -157,7 +160,7 @@ impl<S: Service<Req>, Req> Service<Req> for Pipeline<S, Req> {
} }
/// Pipeline factory /// Pipeline factory
pub struct PipelineFactory<SF, Req> { pub(crate) struct PipelineFactory<SF, Req> {
factory: SF, factory: SF,
_phantom: PhantomData<Req>, _phantom: PhantomData<Req>,
} }

View File

@ -246,7 +246,11 @@ mod tests {
use futures_util::future::lazy; use futures_util::future::lazy;
use crate::{err, ok, pipeline, pipeline_factory, ready, Ready, Service, ServiceFactory}; use crate::{
err, ok,
pipeline::{pipeline, pipeline_factory},
ready, Ready, Service, ServiceFactory,
};
#[derive(Clone)] #[derive(Clone)]
struct Srv1(Rc<Cell<usize>>); struct Srv1(Rc<Cell<usize>>);

View File

@ -21,11 +21,10 @@ where
ApplyTransform::new(t, factory.into_factory()) ApplyTransform::new(t, factory.into_factory())
} }
/// The `Transform` trait defines the interface of a service factory that wraps inner service /// Defines the interface of a service factory that wraps inner service during construction.
/// during construction.
/// ///
/// Transform(middleware) wraps inner service and runs during inbound and/or outbound processing in /// Transformers wrap an inner service and runs during inbound and/or outbound processing in the
/// the request/response lifecycle. It may modify request and/or response. /// service lifecycle. It may modify request and/or response.
/// ///
/// For example, a timeout service wrapper: /// For example, a timeout service wrapper:
/// ///

View File

@ -31,7 +31,7 @@ use std::{
use actix_rt::net::TcpStream; use actix_rt::net::TcpStream;
use actix_server::Server; use actix_server::Server;
use actix_service::pipeline_factory; use actix_service::ServiceFactoryExt as _;
use actix_tls::accept::rustls::{Acceptor as RustlsAcceptor, TlsStream}; use actix_tls::accept::rustls::{Acceptor as RustlsAcceptor, TlsStream};
use futures_util::future::ok; use futures_util::future::ok;
use log::info; use log::info;
@ -39,14 +39,9 @@ use rustls::{
internal::pemfile::certs, internal::pemfile::rsa_private_keys, NoClientAuth, ServerConfig, internal::pemfile::certs, internal::pemfile::rsa_private_keys, NoClientAuth, ServerConfig,
}; };
#[derive(Debug)]
struct ServiceState {
num: Arc<AtomicUsize>,
}
#[actix_rt::main] #[actix_rt::main]
async fn main() -> io::Result<()> { async fn main() -> io::Result<()> {
env::set_var("RUST_LOG", "actix=trace,basic=trace"); env::set_var("RUST_LOG", "info");
env_logger::init(); env_logger::init();
let mut tls_config = ServerConfig::new(NoClientAuth::new()); let mut tls_config = ServerConfig::new(NoClientAuth::new());
@ -73,7 +68,8 @@ async fn main() -> io::Result<()> {
let count = Arc::clone(&count); let count = Arc::clone(&count);
// Set up TLS service factory // Set up TLS service factory
pipeline_factory(tls_acceptor.clone()) tls_acceptor
.clone()
.map_err(|err| println!("Rustls error: {:?}", err)) .map_err(|err| println!("Rustls error: {:?}", err))
.and_then(move |stream: TlsStream<TcpStream>| { .and_then(move |stream: TlsStream<TcpStream>| {
let num = count.fetch_add(1, Ordering::Relaxed); let num = count.fetch_add(1, Ordering::Relaxed);