diff --git a/actix-server/CHANGES.md b/actix-server/CHANGES.md index 8cf8d448..84818261 100644 --- a/actix-server/CHANGES.md +++ b/actix-server/CHANGES.md @@ -6,6 +6,8 @@ * Fix compilation on non-unix platforms +* Better handling server configuration + ## [1.0.0-alpha.2] - 2019-12-02 diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index e61eb8e1..9c7724d0 100644 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -42,3 +42,4 @@ mio-uds = { version = "0.6.7" } [dev-dependencies] bytes = "0.4" env_logger = "0.6" +actix-testing = "1.0.0-alpha.2" \ No newline at end of file diff --git a/actix-server/src/config.rs b/actix-server/src/config.rs index eb633ac2..df5fa805 100644 --- a/actix-server/src/config.rs +++ b/actix-server/src/config.rs @@ -4,7 +4,7 @@ use std::{fmt, io, net}; use actix_rt::net::TcpStream; use actix_service as actix; use actix_utils::counter::CounterGuard; -use futures::future::{Future, FutureExt, LocalBoxFuture}; +use futures::future::{ok, Future, FutureExt, LocalBoxFuture}; use log::error; use super::builder::bind_addr; @@ -75,7 +75,8 @@ impl ServiceConfig { pub(super) struct ConfiguredService { rt: Box, names: HashMap, - services: HashMap, + topics: HashMap, + services: Vec, } impl ConfiguredService { @@ -83,13 +84,15 @@ impl ConfiguredService { ConfiguredService { rt, names: HashMap::new(), - services: HashMap::new(), + topics: HashMap::new(), + services: Vec::new(), } } pub(super) fn stream(&mut self, token: Token, name: String, addr: net::SocketAddr) { self.names.insert(token, (name.clone(), addr)); - self.services.insert(name, token); + self.topics.insert(name.clone(), token); + self.services.push(token); } } @@ -102,34 +105,50 @@ impl InternalServiceFactory for ConfiguredService { Box::new(Self { rt: self.rt.clone(), names: self.names.clone(), + topics: self.topics.clone(), services: self.services.clone(), }) } fn create(&self) -> LocalBoxFuture<'static, Result, ()>> { // configure services - let mut rt = ServiceRuntime::new(self.services.clone()); + let mut rt = ServiceRuntime::new(self.topics.clone()); self.rt.configure(&mut rt); rt.validate(); + let mut names = self.names.clone(); + let tokens = self.services.clone(); // construct services async move { - let services = rt.services; + let mut services = rt.services; // TODO: Proper error handling here for f in rt.onstart.into_iter() { f.await; } let mut res = vec![]; - for (token, ns) in services.into_iter() { - let newserv = ns.new_service(()); - match newserv.await { - Ok(serv) => { - res.push((token, serv)); - } - Err(_) => { - error!("Can not construct service"); - return Err(()); + for token in tokens { + if let Some(srv) = services.remove(&token) { + let newserv = srv.new_service(()); + match newserv.await { + Ok(serv) => { + res.push((token, serv)); + } + Err(_) => { + error!("Can not construct service"); + return Err(()); + } } + } else { + let name = names.remove(&token).unwrap().0; + res.push(( + token, + Box::new(StreamService::new(actix::service_fn2( + move |_: TcpStream| { + error!("Service {:?} is not configured", name); + ok::<_, ()>(()) + }, + ))), + )); }; } return Ok(res); diff --git a/actix-server/src/service.rs b/actix-server/src/service.rs index 5c2fa5dd..c46126a8 100644 --- a/actix-server/src/service.rs +++ b/actix-server/src/service.rs @@ -81,12 +81,10 @@ where if let Ok(stream) = stream { let f = self.service.call(stream); - spawn( - async move { - let _ = f.await; - drop(guard); - } - ); + spawn(async move { + let _ = f.await; + drop(guard); + }); ok(()) } else { err(()) diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 2965e3af..a64a3f05 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -188,31 +188,29 @@ impl Worker { })); } - spawn( - async move { - let res = join_all(fut).await; - let res: Result, _> = res.into_iter().collect(); - match res { - Ok(services) => { - for item in services { - for (factory, token, service) in item { - assert_eq!(token.0, wrk.services.len()); - wrk.services.push(WorkerService { - factory, - service, - status: WorkerServiceStatus::Unavailable, - }); - } + spawn(async move { + let res = join_all(fut).await; + let res: Result, _> = res.into_iter().collect(); + match res { + Ok(services) => { + for item in services { + for (factory, token, service) in item { + assert_eq!(token.0, wrk.services.len()); + wrk.services.push(WorkerService { + factory, + service, + status: WorkerServiceStatus::Unavailable, + }); } } - Err(e) => { - error!("Can not start worker: {:?}", e); - Arbiter::current().stop(); - } } - wrk.await + Err(e) => { + error!("Can not start worker: {:?}", e); + Arbiter::current().stop(); + } } - ); + wrk.await + }); } fn shutdown(&mut self, force: bool) { diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index 32761ef0..26ad9ff9 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -1,5 +1,6 @@ use std::io::Read; -use std::sync::mpsc; +use std::sync::atomic::{AtomicUsize, Ordering::Relaxed}; +use std::sync::{mpsc, Arc}; use std::{net, thread, time}; use actix_codec::{BytesCodec, Framed}; @@ -7,7 +8,8 @@ use actix_rt::net::TcpStream; use actix_server::Server; use actix_service::service_fn; use bytes::Bytes; -use futures::{future::ok, SinkExt}; +use futures::future::{lazy, ok}; +use futures::SinkExt; use net2::TcpBuilder; fn unused_addr() -> net::SocketAddr { @@ -126,3 +128,50 @@ fn test_start() { let _ = sys.stop(); let _ = h.join(); } + +#[test] +fn test_configure() { + let addr1 = unused_addr(); + let addr2 = unused_addr(); + let addr3 = unused_addr(); + let (tx, rx) = mpsc::channel(); + let num = Arc::new(AtomicUsize::new(0)); + let num2 = num.clone(); + + let h = thread::spawn(move || { + let num = num2.clone(); + let sys = actix_rt::System::new("test"); + let srv = Server::build() + .configure(move |cfg| { + let num = num.clone(); + let lst = net::TcpListener::bind(addr3).unwrap(); + cfg.bind("addr1", addr1) + .unwrap() + .bind("addr2", addr2) + .unwrap() + .listen("addr3", lst) + .apply(move |rt| { + let num = num.clone(); + rt.service("addr1", service_fn(|_| ok::<_, ()>(()))); + rt.service("addr3", service_fn(|_| ok::<_, ()>(()))); + rt.on_start(lazy(move |_| { + let _ = num.fetch_add(1, Relaxed); + })) + }) + }) + .unwrap() + .workers(1) + .start(); + let _ = tx.send((srv, actix_rt::System::current())); + let _ = sys.run(); + }); + let (_, sys) = rx.recv().unwrap(); + thread::sleep(time::Duration::from_millis(500)); + + assert!(net::TcpStream::connect(addr1).is_ok()); + assert!(net::TcpStream::connect(addr2).is_ok()); + assert!(net::TcpStream::connect(addr3).is_ok()); + assert_eq!(num.load(Relaxed), 1); + let _ = sys.stop(); + let _ = h.join(); +}