From 8ad5f58d38767d3038efb1d65ebef1b87bf76a38 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Tue, 27 Apr 2021 15:58:02 -0700 Subject: [PATCH] Remove ServerBuilder::configure (#349) --- actix-server/CHANGES.md | 4 + actix-server/src/builder.rs | 27 --- actix-server/src/config.rs | 287 ------------------------------ actix-server/src/lib.rs | 2 - actix-server/src/service.rs | 6 +- actix-server/src/worker.rs | 21 +-- actix-server/tests/test_server.rs | 126 ------------- 7 files changed, 13 insertions(+), 460 deletions(-) delete mode 100644 actix-server/src/config.rs diff --git a/actix-server/CHANGES.md b/actix-server/CHANGES.md index 58b1bd38..28c7b206 100644 --- a/actix-server/CHANGES.md +++ b/actix-server/CHANGES.md @@ -1,6 +1,10 @@ # Changes ## Unreleased - 2021-xx-xx +* Remove `config` module. `ServiceConfig`, `ServiceRuntime` public types are removed due to this change. [#349] +* Remove `ServerBuilder::configure` [#349] + +[#349]: https://github.com/actix/actix-net/pull/349 ## 2.0.0-beta.5 - 2021-04-20 diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index 66aba10c..932e8f83 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -12,7 +12,6 @@ use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; use tokio::sync::oneshot; use crate::accept::AcceptLoop; -use crate::config::{ConfiguredService, ServiceConfig}; use crate::server::{Server, ServerCommand}; use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService}; use crate::signals::{Signal, Signals}; @@ -149,32 +148,6 @@ impl ServerBuilder { self } - /// Execute external configuration as part of the server building process. - /// - /// This function is useful for moving parts of configuration to a different module or - /// even library. - pub fn configure(mut self, f: F) -> io::Result - where - F: Fn(&mut ServiceConfig) -> io::Result<()>, - { - let mut cfg = ServiceConfig::new(self.threads, self.backlog); - - f(&mut cfg)?; - - if let Some(apply) = cfg.apply { - let mut srv = ConfiguredService::new(apply); - for (name, lst) in cfg.services { - let token = self.token.next(); - srv.stream(token, name.clone(), lst.local_addr()?); - self.sockets.push((token, name, MioListener::Tcp(lst))); - } - self.services.push(Box::new(srv)); - } - self.threads = cfg.threads; - - Ok(self) - } - /// Add new service to the server. pub fn bind>(mut self, name: N, addr: U, factory: F) -> io::Result where diff --git a/actix-server/src/config.rs b/actix-server/src/config.rs deleted file mode 100644 index c5e63630..00000000 --- a/actix-server/src/config.rs +++ /dev/null @@ -1,287 +0,0 @@ -use std::collections::HashMap; -use std::future::Future; -use std::{fmt, io}; - -use actix_rt::net::TcpStream; -use actix_service::{ - fn_service, IntoServiceFactory as IntoBaseServiceFactory, - ServiceFactory as BaseServiceFactory, -}; -use actix_utils::{counter::CounterGuard, future::ready}; -use futures_core::future::LocalBoxFuture; -use log::error; - -use crate::builder::bind_addr; -use crate::service::{BoxedServerService, InternalServiceFactory, StreamService}; -use crate::socket::{MioStream, MioTcpListener, StdSocketAddr, StdTcpListener, ToSocketAddrs}; -use crate::Token; - -pub struct ServiceConfig { - pub(crate) services: Vec<(String, MioTcpListener)>, - pub(crate) apply: Option>, - pub(crate) threads: usize, - pub(crate) backlog: u32, -} - -impl ServiceConfig { - pub(super) fn new(threads: usize, backlog: u32) -> ServiceConfig { - ServiceConfig { - threads, - backlog, - services: Vec::new(), - apply: None, - } - } - - /// Set number of workers to start. - /// - /// By default server uses number of available logical cpu as workers - /// count. - pub fn workers(&mut self, num: usize) { - self.threads = num; - } - - /// Add new service to server - pub fn bind>(&mut self, name: N, addr: U) -> io::Result<&mut Self> - where - U: ToSocketAddrs, - { - let sockets = bind_addr(addr, self.backlog)?; - - for lst in sockets { - self._listen(name.as_ref(), lst); - } - - Ok(self) - } - - /// Add new service to server - pub fn listen>(&mut self, name: N, lst: StdTcpListener) -> &mut Self { - self._listen(name, MioTcpListener::from_std(lst)) - } - - /// Register service configuration function. This function get called - /// during worker runtime configuration. It get executed in worker thread. - pub fn apply(&mut self, f: F) -> io::Result<()> - where - F: Fn(&mut ServiceRuntime) + Send + Clone + 'static, - { - self.apply = Some(Box::new(f)); - Ok(()) - } - - fn _listen>(&mut self, name: N, lst: MioTcpListener) -> &mut Self { - if self.apply.is_none() { - self.apply = Some(Box::new(not_configured)); - } - self.services.push((name.as_ref().to_string(), lst)); - self - } -} - -pub(super) struct ConfiguredService { - rt: Box, - names: HashMap, - topics: HashMap, - services: Vec, -} - -impl ConfiguredService { - pub(super) fn new(rt: Box) -> Self { - ConfiguredService { - rt, - names: HashMap::new(), - topics: HashMap::new(), - services: Vec::new(), - } - } - - pub(super) fn stream(&mut self, token: Token, name: String, addr: StdSocketAddr) { - self.names.insert(token, (name.clone(), addr)); - self.topics.insert(name, token); - self.services.push(token); - } -} - -impl InternalServiceFactory for ConfiguredService { - fn name(&self, token: Token) -> &str { - &self.names[&token].0 - } - - fn clone_factory(&self) -> Box { - Box::new(Self { - rt: self.rt.clone(), - names: self.names.clone(), - topics: self.topics.clone(), - services: self.services.clone(), - }) - } - - fn create(&self) -> LocalBoxFuture<'static, Result, ()>> { - // configure services - let mut rt = ServiceRuntime::new(self.topics.clone()); - self.rt.configure(&mut rt); - rt.validate(); - let mut names = self.names.clone(); - let tokens = self.services.clone(); - - // construct services - Box::pin(async move { - let mut services = rt.services; - // TODO: Proper error handling here - for f in rt.onstart.into_iter() { - f.await; - } - let mut res = vec![]; - for token in tokens { - if let Some(srv) = services.remove(&token) { - let newserv = srv.new_service(()); - match newserv.await { - Ok(serv) => { - res.push((token, serv)); - } - Err(_) => { - error!("Can not construct service"); - return Err(()); - } - } - } else { - let name = names.remove(&token).unwrap().0; - res.push(( - token, - Box::new(StreamService::new(fn_service(move |_: TcpStream| { - error!("Service {:?} is not configured", name); - ready::>(Ok(())) - }))), - )); - }; - } - Ok(res) - }) - } -} - -pub(super) trait ServiceRuntimeConfiguration: Send { - fn clone(&self) -> Box; - - fn configure(&self, rt: &mut ServiceRuntime); -} - -impl ServiceRuntimeConfiguration for F -where - F: Fn(&mut ServiceRuntime) + Send + Clone + 'static, -{ - fn clone(&self) -> Box { - Box::new(self.clone()) - } - - fn configure(&self, rt: &mut ServiceRuntime) { - (self)(rt) - } -} - -fn not_configured(_: &mut ServiceRuntime) { - error!("Service is not configured"); -} - -pub struct ServiceRuntime { - names: HashMap, - services: HashMap, - onstart: Vec>, -} - -impl ServiceRuntime { - fn new(names: HashMap) -> Self { - ServiceRuntime { - names, - services: HashMap::new(), - onstart: Vec::new(), - } - } - - fn validate(&self) { - for (name, token) in &self.names { - if !self.services.contains_key(&token) { - error!("Service {:?} is not configured", name); - } - } - } - - /// Register service. - /// - /// Name of the service must be registered during configuration stage with - /// *ServiceConfig::bind()* or *ServiceConfig::listen()* methods. - pub fn service(&mut self, name: &str, service: F) - where - F: IntoBaseServiceFactory, - T: BaseServiceFactory + 'static, - T::Future: 'static, - T::Service: 'static, - T::InitError: fmt::Debug, - { - // let name = name.to_owned(); - if let Some(token) = self.names.get(name) { - self.services.insert( - *token, - Box::new(ServiceFactory { - inner: service.into_factory(), - }), - ); - } else { - panic!("Unknown service: {:?}", name); - } - } - - /// Execute future before services initialization. - pub fn on_start(&mut self, fut: F) - where - F: Future + 'static, - { - self.onstart.push(Box::pin(fut)) - } -} - -type BoxedNewService = Box< - dyn BaseServiceFactory< - (CounterGuard, MioStream), - Response = (), - Error = (), - InitError = (), - Config = (), - Service = BoxedServerService, - Future = LocalBoxFuture<'static, Result>, - >, ->; - -struct ServiceFactory { - inner: T, -} - -impl BaseServiceFactory<(CounterGuard, MioStream)> for ServiceFactory -where - T: BaseServiceFactory, - T::Future: 'static, - T::Service: 'static, - T::Error: 'static, - T::InitError: fmt::Debug + 'static, -{ - type Response = (); - type Error = (); - type Config = (); - type Service = BoxedServerService; - type InitError = (); - type Future = LocalBoxFuture<'static, Result>; - - fn new_service(&self, _: ()) -> Self::Future { - let fut = self.inner.new_service(()); - Box::pin(async move { - match fut.await { - Ok(s) => Ok(Box::new(StreamService::new(s)) as BoxedServerService), - Err(e) => { - error!("Can not construct service: {:?}", e); - Err(()) - } - } - }) - } -} diff --git a/actix-server/src/lib.rs b/actix-server/src/lib.rs index af9ab0b0..cf484f10 100644 --- a/actix-server/src/lib.rs +++ b/actix-server/src/lib.rs @@ -6,7 +6,6 @@ mod accept; mod builder; -mod config; mod server; mod service; mod signals; @@ -16,7 +15,6 @@ mod waker_queue; mod worker; pub use self::builder::ServerBuilder; -pub use self::config::{ServiceConfig, ServiceRuntime}; pub use self::server::Server; pub use self::service::ServiceFactory; pub use self::test_server::TestServer; diff --git a/actix-server/src/service.rs b/actix-server/src/service.rs index da57af67..d0eea966 100644 --- a/actix-server/src/service.rs +++ b/actix-server/src/service.rs @@ -24,7 +24,7 @@ pub(crate) trait InternalServiceFactory: Send { fn clone_factory(&self) -> Box; - fn create(&self) -> LocalBoxFuture<'static, Result, ()>>; + fn create(&self) -> LocalBoxFuture<'static, Result<(Token, BoxedServerService), ()>>; } pub(crate) type BoxedServerService = Box< @@ -131,14 +131,14 @@ where }) } - fn create(&self) -> LocalBoxFuture<'static, Result, ()>> { + fn create(&self) -> LocalBoxFuture<'static, Result<(Token, BoxedServerService), ()>> { let token = self.token; let fut = self.inner.create().new_service(()); Box::pin(async move { match fut.await { Ok(inner) => { let service = Box::new(StreamService::new(inner)) as _; - Ok(vec![(token, service)]) + Ok((token, service)) } Err(_) => Err(()), } diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 7bc211b1..3d799370 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -231,11 +231,7 @@ impl ServerWorker { .enumerate() .map(|(idx, factory)| { let fut = factory.create(); - async move { - fut.await.map(|r| { - r.into_iter().map(|(t, s)| (idx, t, s)).collect::>() - }) - } + async move { fut.await.map(|(t, s)| (idx, t, s)) } }) .collect::>(); @@ -248,7 +244,6 @@ impl ServerWorker { let services = match res { Ok(res) => res .into_iter() - .flatten() .fold(Vec::new(), |mut services, (factory, token, service)| { assert_eq!(token.0, services.len()); services.push(WorkerService { @@ -360,7 +355,7 @@ enum WorkerState { struct Restart { factory_id: usize, token: Token, - fut: LocalBoxFuture<'static, Result, ()>>, + fut: LocalBoxFuture<'static, Result<(Token, BoxedServerService), ()>>, } // Shutdown keep states necessary for server shutdown: @@ -440,19 +435,15 @@ impl Future for ServerWorker { let factory_id = restart.factory_id; let token = restart.token; - let service = ready!(restart.fut.as_mut().poll(cx)) + let (token_new, service) = ready!(restart.fut.as_mut().poll(cx)) .unwrap_or_else(|_| { panic!( "Can not restart {:?} service", this.factories[factory_id].name(token) ) - }) - .into_iter() - // Find the same token from vector. There should be only one - // So the first match would be enough. - .find(|(t, _)| *t == token) - .map(|(_, service)| service) - .expect("No BoxedServerService found"); + }); + + assert_eq!(token, token_new); trace!( "Service {:?} has been restarted", diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index cc9f8190..78894816 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -142,57 +142,6 @@ fn test_start() { let _ = h.join(); } -#[test] -fn test_configure() { - let addr1 = unused_addr(); - let addr2 = unused_addr(); - let addr3 = unused_addr(); - let (tx, rx) = mpsc::channel(); - let num = Arc::new(AtomicUsize::new(0)); - let num2 = num.clone(); - - let h = thread::spawn(move || { - let num = num2.clone(); - let sys = actix_rt::System::new(); - let srv = sys.block_on(lazy(|_| { - Server::build() - .disable_signals() - .configure(move |cfg| { - let num = num.clone(); - let lst = net::TcpListener::bind(addr3).unwrap(); - cfg.bind("addr1", addr1) - .unwrap() - .bind("addr2", addr2) - .unwrap() - .listen("addr3", lst) - .apply(move |rt| { - let num = num.clone(); - rt.service("addr1", fn_service(|_| ok::<_, ()>(()))); - rt.service("addr3", fn_service(|_| ok::<_, ()>(()))); - rt.on_start(lazy(move |_| { - let _ = num.fetch_add(1, Ordering::Relaxed); - })) - }) - }) - .unwrap() - .workers(1) - .run() - })); - - let _ = tx.send((srv, actix_rt::System::current())); - let _ = sys.run(); - }); - let (_, sys) = rx.recv().unwrap(); - thread::sleep(Duration::from_millis(500)); - - assert!(net::TcpStream::connect(addr1).is_ok()); - assert!(net::TcpStream::connect(addr2).is_ok()); - assert!(net::TcpStream::connect(addr3).is_ok()); - assert_eq!(num.load(Ordering::Relaxed), 1); - sys.stop(); - let _ = h.join(); -} - #[actix_rt::test] async fn test_max_concurrent_connections() { // Note: @@ -305,81 +254,6 @@ async fn test_service_restart() { let num_clone = num.clone(); let num2_clone = num2.clone(); - let h = thread::spawn(move || { - actix_rt::System::new().block_on(async { - let server = Server::build() - .backlog(1) - .disable_signals() - .configure(move |cfg| { - let num = num.clone(); - let num2 = num2.clone(); - cfg.bind("addr1", addr1) - .unwrap() - .bind("addr2", addr2) - .unwrap() - .apply(move |rt| { - let num = num.clone(); - let num2 = num2.clone(); - rt.service( - "addr1", - fn_factory(move || { - let num = num.clone(); - async move { Ok::<_, ()>(TestService(num)) } - }), - ); - rt.service( - "addr2", - fn_factory(move || { - let num2 = num2.clone(); - async move { Ok::<_, ()>(TestService(num2)) } - }), - ); - }) - }) - .unwrap() - .workers(1) - .run(); - - let _ = tx.send((server.clone(), actix_rt::System::current())); - server.await - }) - }); - - let (server, sys) = rx.recv().unwrap(); - - for _ in 0..5 { - TcpStream::connect(addr1) - .await - .unwrap() - .shutdown() - .await - .unwrap(); - TcpStream::connect(addr2) - .await - .unwrap() - .shutdown() - .await - .unwrap(); - } - - sleep(Duration::from_secs(3)).await; - - assert!(num_clone.load(Ordering::SeqCst) > 5); - assert!(num2_clone.load(Ordering::SeqCst) > 5); - - sys.stop(); - let _ = server.stop(false); - let _ = h.join().unwrap(); - - let addr1 = unused_addr(); - let addr2 = unused_addr(); - let (tx, rx) = mpsc::channel(); - let num = Arc::new(AtomicUsize::new(0)); - let num2 = Arc::new(AtomicUsize::new(0)); - - let num_clone = num.clone(); - let num2_clone = num2.clone(); - let h = thread::spawn(move || { let num = num.clone(); actix_rt::System::new().block_on(async {