From 5097b12b7c816b8d40b151a57987ae18a42da29f Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Mon, 1 Nov 2021 03:19:32 +0000 Subject: [PATCH] remove and_then_send --- actix-server/examples/tcp-echo.rs | 81 ++++++++-------- actix-service/CHANGES.md | 4 +- actix-service/src/and_then.rs | 85 ----------------- actix-service/src/ext.rs | 18 +--- actix-service/src/fn_service.rs | 149 +++++++++++++++++++++++++++++- actix-tls/examples/tcp-rustls.rs | 41 +++++--- 6 files changed, 224 insertions(+), 154 deletions(-) diff --git a/actix-server/examples/tcp-echo.rs b/actix-server/examples/tcp-echo.rs index dfe5e01e..a6786b08 100644 --- a/actix-server/examples/tcp-echo.rs +++ b/actix-server/examples/tcp-echo.rs @@ -19,7 +19,7 @@ use std::{ use actix_rt::net::TcpStream; use actix_server::Server; -use actix_service::{fn_service, ServiceFactoryExt as _}; +use actix_service::{fn_factory, fn_service, ServiceExt as _}; use bytes::BytesMut; use log::{error, info}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; @@ -39,53 +39,62 @@ async fn main() -> io::Result<()> { // to return a service *factory*; so it can be created once per worker. Server::build() .bind("echo", addr, { - let count = Arc::clone(&count); - let num2 = Arc::clone(&count); - - let svc = fn_service(move |mut stream: TcpStream| { + fn_factory::<_, (), _, _, _, _>(move || { let count = Arc::clone(&count); async move { - let num = count.fetch_add(1, Ordering::SeqCst); - let num = num + 1; + let count = Arc::clone(&count); + let count2 = Arc::clone(&count); - let mut size = 0; - let mut buf = BytesMut::new(); + let svc = fn_service(move |mut stream: TcpStream| { + let count = Arc::clone(&count); - loop { - match stream.read_buf(&mut buf).await { - // end of stream; bail from loop - Ok(0) => break, + let num = count.fetch_add(1, Ordering::SeqCst) + 1; - // more bytes to process - Ok(bytes_read) => { - info!("[{}] read {} bytes", num, bytes_read); - stream.write_all(&buf[size..]).await.unwrap(); - size += bytes_read; + info!( + "[{}] accepting connection from: {}", + num, + stream.peer_addr().unwrap() + ); + + async move { + let mut size = 0; + let mut buf = BytesMut::new(); + + loop { + match stream.read_buf(&mut buf).await { + // end of stream; bail from loop + Ok(0) => break, + + // more bytes to process + Ok(bytes_read) => { + info!("[{}] read {} bytes", num, bytes_read); + stream.write_all(&buf[size..]).await.unwrap(); + size += bytes_read; + } + + // stream error; bail from loop with error + Err(err) => { + error!("Stream Error: {:?}", err); + return Err(()); + } + } } - // stream error; bail from loop with error - Err(err) => { - error!("Stream Error: {:?}", err); - return Err(()); - } + // send data down service pipeline + Ok((buf.freeze(), size)) } - } + }) + .map_err(|err| error!("Service Error: {:?}", err)) + .and_then(move |(_, size)| { + let num = count2.load(Ordering::SeqCst); + info!("[{}] total bytes read: {}", num, size); + async move { Ok(size) } + }); - // send data down service pipeline - Ok((buf.freeze(), size)) + Ok::<_, ()>(svc.clone()) } }) - .map_err(|err| error!("Service Error: {:?}", err)) - .and_then_send(move |(_, size)| { - let num = num2.load(Ordering::SeqCst); - info!("[{}] total bytes read: {}", num, size); - async move { Ok(size) } - }); - - let svc2 = svc.clone(); - - svc2 })? .workers(2) .run() diff --git a/actix-service/CHANGES.md b/actix-service/CHANGES.md index 5aaffc06..8a2e98a7 100644 --- a/actix-service/CHANGES.md +++ b/actix-service/CHANGES.md @@ -1,7 +1,9 @@ # Changes ## Unreleased - 2021-xx-xx -* Add `.and_then_send()` & `AndThenSendServiceFactory` for creating `Send`able chained services. [#403] +* `fn_factory[_with_config]` types now impl `Send` even when config, service, request types do not. [#403] + +[#403]: https://github.com/actix/actix-net/pull/403 ## 2.0.1 - 2021-10-11 diff --git a/actix-service/src/and_then.rs b/actix-service/src/and_then.rs index 6a5d3ace..2a7e18d1 100644 --- a/actix-service/src/and_then.rs +++ b/actix-service/src/and_then.rs @@ -261,91 +261,6 @@ where } } -/// `.and_then_send()` service factory combinator -pub struct AndThenSendServiceFactory -where - A: ServiceFactory, - A::Config: Clone, - B: ServiceFactory< - A::Response, - Config = A::Config, - Error = A::Error, - InitError = A::InitError, - >, -{ - inner_a: A, - inner_b: B, - _phantom: PhantomData, -} - -impl AndThenSendServiceFactory -where - A: ServiceFactory, - A::Config: Clone, - B: ServiceFactory< - A::Response, - Config = A::Config, - Error = A::Error, - InitError = A::InitError, - >, -{ - /// Create new `AndThenFactory` combinator - pub(crate) fn new(a: A, b: B) -> Self { - Self { - inner_a: a, - inner_b: b, - _phantom: PhantomData, - } - } -} - -impl ServiceFactory for AndThenSendServiceFactory -where - A: ServiceFactory, - A::Config: Clone, - B: ServiceFactory< - A::Response, - Config = A::Config, - Error = A::Error, - InitError = A::InitError, - >, -{ - type Response = B::Response; - type Error = A::Error; - - type Config = A::Config; - type Service = AndThenService; - type InitError = A::InitError; - type Future = AndThenServiceFactoryResponse; - - fn new_service(&self, cfg: A::Config) -> Self::Future { - AndThenServiceFactoryResponse::new( - self.inner_a.new_service(cfg.clone()), - self.inner_b.new_service(cfg), - ) - } -} - -impl Clone for AndThenSendServiceFactory -where - A: ServiceFactory + Clone, - A::Config: Clone, - B: ServiceFactory< - A::Response, - Config = A::Config, - Error = A::Error, - InitError = A::InitError, - > + Clone, -{ - fn clone(&self) -> Self { - Self { - inner_a: self.inner_a.clone(), - inner_b: self.inner_b.clone(), - _phantom: PhantomData, - } - } -} - #[cfg(test)] mod tests { use alloc::rc::Rc; diff --git a/actix-service/src/ext.rs b/actix-service/src/ext.rs index 0f82dc1b..284b4ecc 100644 --- a/actix-service/src/ext.rs +++ b/actix-service/src/ext.rs @@ -1,5 +1,5 @@ use crate::{ - and_then::{AndThenSendServiceFactory, AndThenService, AndThenServiceFactory}, + and_then::{AndThenService, AndThenServiceFactory}, map::Map, map_err::MapErr, transform_err::TransformMapInitErr, @@ -105,22 +105,6 @@ pub trait ServiceFactoryExt: ServiceFactory { { AndThenServiceFactory::new(self, factory.into_factory()) } - - /// Call another service after call to this one has resolved successfully. - fn and_then_send(self, factory: I) -> AndThenSendServiceFactory - where - Self: Sized, - Self::Config: Clone, - I: IntoServiceFactory, - SF1: ServiceFactory< - Self::Response, - Config = Self::Config, - Error = Self::Error, - InitError = Self::InitError, - >, - { - AndThenSendServiceFactory::new(self, factory.into_factory()) - } } impl ServiceFactoryExt for SF where SF: ServiceFactory {} diff --git a/actix-service/src/fn_service.rs b/actix-service/src/fn_service.rs index f83ef81f..6f71e04e 100644 --- a/actix-service/src/fn_service.rs +++ b/actix-service/src/fn_service.rs @@ -3,6 +3,7 @@ use core::{future::Future, marker::PhantomData}; use crate::{ok, IntoService, IntoServiceFactory, Ready, Service, ServiceFactory}; /// Create `ServiceFactory` for function that can act as a `Service` +// TODO: remove unnecessary Cfg type param pub fn fn_service( f: F, ) -> FnServiceFactory @@ -48,6 +49,7 @@ where /// Ok(()) /// } /// ``` +// TODO: remove unnecessary Cfg type param pub fn fn_factory( f: F, ) -> FnServiceNoConfig @@ -160,7 +162,7 @@ where Fut: Future>, { f: F, - _t: PhantomData<(Req, Cfg)>, + _t: PhantomData, } impl FnServiceFactory @@ -237,7 +239,7 @@ where Srv: Service, { f: F, - _t: PhantomData<(Fut, Cfg, Req, Srv, Err)>, + _t: PhantomData, } impl FnServiceConfig @@ -293,7 +295,7 @@ where Fut: Future>, { f: F, - _t: PhantomData<(Cfg, Req)>, + _t: PhantomData, } impl FnServiceNoConfig @@ -353,10 +355,11 @@ where mod tests { use core::task::Poll; + use alloc::rc::Rc; use futures_util::future::lazy; use super::*; - use crate::{ok, Service, ServiceFactory}; + use crate::{boxed, ok, Service, ServiceExt, ServiceFactory, ServiceFactoryExt}; #[actix_rt::test] async fn test_fn_service() { @@ -391,4 +394,142 @@ mod tests { assert!(res.is_ok()); assert_eq!(res.unwrap(), ("srv", 1)); } + + // these three properties of a service factory are usually important + fn is_static(_t: &T) {} + fn impls_clone(_t: &T) {} + fn impls_send(_t: &T) {} + + #[actix_rt::test] + async fn test_fn_factory_impl_send() { + let svc_fac = fn_factory_with_config(|cfg: usize| { + ok::<_, ()>(fn_service(move |()| ok::<_, ()>(("srv", cfg)))) + }); + is_static(&svc_fac); + impls_clone(&svc_fac); + impls_send(&svc_fac); + + // Cfg type is explicitly !Send + let svc_fac = fn_factory_with_config(|cfg: Rc| { + let cfg = Rc::clone(&cfg); + ok::<_, ()>(fn_service(move |_: ()| ok::<_, ()>(("srv", *cfg)))) + }); + is_static(&svc_fac); + impls_clone(&svc_fac); + impls_send(&svc_fac); + + let svc_fac = fn_factory::<_, (), _, _, _, _>(|| { + ok::<_, ()>(fn_service(move |()| ok::<_, ()>("srv"))) + }); + is_static(&svc_fac); + impls_clone(&svc_fac); + impls_send(&svc_fac); + + // Req type is explicitly !Send + let svc_fac = fn_factory::<_, (), _, _, _, _>(|| { + ok::<_, ()>(fn_service(move |_: Rc<()>| ok::<_, ()>("srv"))) + }); + is_static(&svc_fac); + impls_clone(&svc_fac); + impls_send(&svc_fac); + + // Service type is explicitly !Send + let svc_fac = fn_factory::<_, (), _, _, _, _>(|| { + ok::<_, ()>(boxed::rc_service(fn_service(move |_: ()| { + ok::<_, ()>("srv") + }))) + }); + is_static(&svc_fac); + impls_clone(&svc_fac); + impls_send(&svc_fac); + } + + #[actix_rt::test] + async fn test_service_combinators_impls() { + #[derive(Clone)] + struct Ident; + + impl Service for Ident { + type Response = T; + type Error = (); + type Future = Ready>; + + crate::always_ready!(); + + fn call(&self, req: T) -> Self::Future { + ok(req) + } + } + + let svc = Ident; + is_static(&svc); + impls_clone(&svc); + impls_send(&svc); + + let svc = ServiceExt::map(Ident, core::convert::identity); + impls_send(&svc); + svc.call(()).await.unwrap(); + + let svc = ServiceExt::map_err(Ident, core::convert::identity); + impls_send(&svc); + svc.call(()).await.unwrap(); + + let svc = ServiceExt::and_then(Ident, Ident); + // impls_send(&svc); // fails to compile :( + svc.call(()).await.unwrap(); + + // let svc = ServiceExt::and_then_send(Ident, Ident); + // impls_send(&svc); + // svc.call(()).await.unwrap(); + } + + #[actix_rt::test] + async fn test_factory_combinators_impls() { + #[derive(Clone)] + struct Ident; + + impl ServiceFactory for Ident { + type Response = T; + type Error = (); + type Config = (); + // explicitly !Send result service + type Service = boxed::RcService; + type InitError = (); + type Future = Ready>; + + fn new_service(&self, _cfg: Self::Config) -> Self::Future { + ok(boxed::rc_service(fn_service(ok))) + } + } + + let svc_fac = Ident; + is_static(&svc_fac); + impls_clone(&svc_fac); + impls_send(&svc_fac); + + let svc_fac = ServiceFactoryExt::map(Ident, core::convert::identity); + impls_send(&svc_fac); + let svc = svc_fac.new_service(()).await.unwrap(); + svc.call(()).await.unwrap(); + + let svc_fac = ServiceFactoryExt::map_err(Ident, core::convert::identity); + impls_send(&svc_fac); + let svc = svc_fac.new_service(()).await.unwrap(); + svc.call(()).await.unwrap(); + + let svc_fac = ServiceFactoryExt::map_init_err(Ident, core::convert::identity); + impls_send(&svc_fac); + let svc = svc_fac.new_service(()).await.unwrap(); + svc.call(()).await.unwrap(); + + let svc_fac = ServiceFactoryExt::and_then(Ident, Ident); + // impls_send(&svc_fac); // fails to compile :( + let svc = svc_fac.new_service(()).await.unwrap(); + svc.call(()).await.unwrap(); + + // let svc_fac = ServiceFactoryExt::and_then_send(Ident, Ident); + // impls_send(&svc_fac); + // let svc = svc_fac.new_service(()).await.unwrap(); + // svc.call(()).await.unwrap(); + } } diff --git a/actix-tls/examples/tcp-rustls.rs b/actix-tls/examples/tcp-rustls.rs index 7cf56e6d..39d43b1a 100644 --- a/actix-tls/examples/tcp-rustls.rs +++ b/actix-tls/examples/tcp-rustls.rs @@ -31,21 +31,24 @@ use std::{ use actix_rt::net::TcpStream; use actix_server::Server; -use actix_service::ServiceFactoryExt as _; +use actix_service::{fn_factory, fn_service, ServiceExt as _, ServiceFactory}; use actix_tls::accept::rustls::{Acceptor as RustlsAcceptor, TlsStream}; use futures_util::future::ok; use log::info; use rustls::{server::ServerConfig, Certificate, PrivateKey}; use rustls_pemfile::{certs, rsa_private_keys}; +const CERT_PATH: &str = concat![env!("CARGO_MANIFEST_DIR"), "/examples/cert.pem"]; +const KEY_PATH: &str = concat![env!("CARGO_MANIFEST_DIR"), "/examples/key.pem"]; + #[actix_rt::main] async fn main() -> io::Result<()> { env::set_var("RUST_LOG", "info"); env_logger::init(); // Load TLS key and cert files - let cert_file = &mut BufReader::new(File::open("./examples/cert.pem").unwrap()); - let key_file = &mut BufReader::new(File::open("./examples/key.pem").unwrap()); + let cert_file = &mut BufReader::new(File::open(CERT_PATH).unwrap()); + let key_file = &mut BufReader::new(File::open(KEY_PATH).unwrap()); let cert_chain = certs(cert_file) .unwrap() @@ -72,14 +75,30 @@ async fn main() -> io::Result<()> { let count = Arc::clone(&count); // Set up TLS service factory - tls_acceptor - .clone() - .map_err(|err| println!("Rustls error: {:?}", err)) - .and_then_send(move |stream: TlsStream| { - let num = count.fetch_add(1, Ordering::Relaxed); - info!("[{}] Got TLS connection: {:?}", num, &*stream); - ok(()) - }) + // note: moving rustls acceptor into fn_factory scope + fn_factory(move || { + // manually call new_service so that and_then can be used from ServiceExt + // type annotation for inner stream type is required + let svc = >::new_service( + &tls_acceptor, + (), + ); + + let count = Arc::clone(&count); + + async move { + let svc = svc + .await? + .map_err(|err| println!("Rustls error: {:?}", err)) + .and_then(fn_service(move |stream: TlsStream| { + let num = count.fetch_add(1, Ordering::Relaxed) + 1; + info!("[{}] Got TLS connection: {:?}", num, &*stream); + ok(()) + })); + + Ok::<_, ()>(svc) + } + }) })? .workers(1) .run()