mirror of
https://github.com/fafhrd91/actix-net
synced 2024-11-24 01:11:07 +01:00
Remove ServerBuilder::configure (#349)
This commit is contained in:
parent
613b2be51f
commit
8ad5f58d38
@ -1,6 +1,10 @@
|
|||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
## Unreleased - 2021-xx-xx
|
## Unreleased - 2021-xx-xx
|
||||||
|
* Remove `config` module. `ServiceConfig`, `ServiceRuntime` public types are removed due to this change. [#349]
|
||||||
|
* Remove `ServerBuilder::configure` [#349]
|
||||||
|
|
||||||
|
[#349]: https://github.com/actix/actix-net/pull/349
|
||||||
|
|
||||||
|
|
||||||
## 2.0.0-beta.5 - 2021-04-20
|
## 2.0.0-beta.5 - 2021-04-20
|
||||||
|
@ -12,7 +12,6 @@ use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
|
|||||||
use tokio::sync::oneshot;
|
use tokio::sync::oneshot;
|
||||||
|
|
||||||
use crate::accept::AcceptLoop;
|
use crate::accept::AcceptLoop;
|
||||||
use crate::config::{ConfiguredService, ServiceConfig};
|
|
||||||
use crate::server::{Server, ServerCommand};
|
use crate::server::{Server, ServerCommand};
|
||||||
use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService};
|
use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService};
|
||||||
use crate::signals::{Signal, Signals};
|
use crate::signals::{Signal, Signals};
|
||||||
@ -149,32 +148,6 @@ impl ServerBuilder {
|
|||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Execute external configuration as part of the server building process.
|
|
||||||
///
|
|
||||||
/// This function is useful for moving parts of configuration to a different module or
|
|
||||||
/// even library.
|
|
||||||
pub fn configure<F>(mut self, f: F) -> io::Result<ServerBuilder>
|
|
||||||
where
|
|
||||||
F: Fn(&mut ServiceConfig) -> io::Result<()>,
|
|
||||||
{
|
|
||||||
let mut cfg = ServiceConfig::new(self.threads, self.backlog);
|
|
||||||
|
|
||||||
f(&mut cfg)?;
|
|
||||||
|
|
||||||
if let Some(apply) = cfg.apply {
|
|
||||||
let mut srv = ConfiguredService::new(apply);
|
|
||||||
for (name, lst) in cfg.services {
|
|
||||||
let token = self.token.next();
|
|
||||||
srv.stream(token, name.clone(), lst.local_addr()?);
|
|
||||||
self.sockets.push((token, name, MioListener::Tcp(lst)));
|
|
||||||
}
|
|
||||||
self.services.push(Box::new(srv));
|
|
||||||
}
|
|
||||||
self.threads = cfg.threads;
|
|
||||||
|
|
||||||
Ok(self)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Add new service to the server.
|
/// Add new service to the server.
|
||||||
pub fn bind<F, U, N: AsRef<str>>(mut self, name: N, addr: U, factory: F) -> io::Result<Self>
|
pub fn bind<F, U, N: AsRef<str>>(mut self, name: N, addr: U, factory: F) -> io::Result<Self>
|
||||||
where
|
where
|
||||||
|
@ -1,287 +0,0 @@
|
|||||||
use std::collections::HashMap;
|
|
||||||
use std::future::Future;
|
|
||||||
use std::{fmt, io};
|
|
||||||
|
|
||||||
use actix_rt::net::TcpStream;
|
|
||||||
use actix_service::{
|
|
||||||
fn_service, IntoServiceFactory as IntoBaseServiceFactory,
|
|
||||||
ServiceFactory as BaseServiceFactory,
|
|
||||||
};
|
|
||||||
use actix_utils::{counter::CounterGuard, future::ready};
|
|
||||||
use futures_core::future::LocalBoxFuture;
|
|
||||||
use log::error;
|
|
||||||
|
|
||||||
use crate::builder::bind_addr;
|
|
||||||
use crate::service::{BoxedServerService, InternalServiceFactory, StreamService};
|
|
||||||
use crate::socket::{MioStream, MioTcpListener, StdSocketAddr, StdTcpListener, ToSocketAddrs};
|
|
||||||
use crate::Token;
|
|
||||||
|
|
||||||
pub struct ServiceConfig {
|
|
||||||
pub(crate) services: Vec<(String, MioTcpListener)>,
|
|
||||||
pub(crate) apply: Option<Box<dyn ServiceRuntimeConfiguration>>,
|
|
||||||
pub(crate) threads: usize,
|
|
||||||
pub(crate) backlog: u32,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ServiceConfig {
|
|
||||||
pub(super) fn new(threads: usize, backlog: u32) -> ServiceConfig {
|
|
||||||
ServiceConfig {
|
|
||||||
threads,
|
|
||||||
backlog,
|
|
||||||
services: Vec::new(),
|
|
||||||
apply: None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Set number of workers to start.
|
|
||||||
///
|
|
||||||
/// By default server uses number of available logical cpu as workers
|
|
||||||
/// count.
|
|
||||||
pub fn workers(&mut self, num: usize) {
|
|
||||||
self.threads = num;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Add new service to server
|
|
||||||
pub fn bind<U, N: AsRef<str>>(&mut self, name: N, addr: U) -> io::Result<&mut Self>
|
|
||||||
where
|
|
||||||
U: ToSocketAddrs,
|
|
||||||
{
|
|
||||||
let sockets = bind_addr(addr, self.backlog)?;
|
|
||||||
|
|
||||||
for lst in sockets {
|
|
||||||
self._listen(name.as_ref(), lst);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(self)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Add new service to server
|
|
||||||
pub fn listen<N: AsRef<str>>(&mut self, name: N, lst: StdTcpListener) -> &mut Self {
|
|
||||||
self._listen(name, MioTcpListener::from_std(lst))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Register service configuration function. This function get called
|
|
||||||
/// during worker runtime configuration. It get executed in worker thread.
|
|
||||||
pub fn apply<F>(&mut self, f: F) -> io::Result<()>
|
|
||||||
where
|
|
||||||
F: Fn(&mut ServiceRuntime) + Send + Clone + 'static,
|
|
||||||
{
|
|
||||||
self.apply = Some(Box::new(f));
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn _listen<N: AsRef<str>>(&mut self, name: N, lst: MioTcpListener) -> &mut Self {
|
|
||||||
if self.apply.is_none() {
|
|
||||||
self.apply = Some(Box::new(not_configured));
|
|
||||||
}
|
|
||||||
self.services.push((name.as_ref().to_string(), lst));
|
|
||||||
self
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(super) struct ConfiguredService {
|
|
||||||
rt: Box<dyn ServiceRuntimeConfiguration>,
|
|
||||||
names: HashMap<Token, (String, StdSocketAddr)>,
|
|
||||||
topics: HashMap<String, Token>,
|
|
||||||
services: Vec<Token>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ConfiguredService {
|
|
||||||
pub(super) fn new(rt: Box<dyn ServiceRuntimeConfiguration>) -> Self {
|
|
||||||
ConfiguredService {
|
|
||||||
rt,
|
|
||||||
names: HashMap::new(),
|
|
||||||
topics: HashMap::new(),
|
|
||||||
services: Vec::new(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(super) fn stream(&mut self, token: Token, name: String, addr: StdSocketAddr) {
|
|
||||||
self.names.insert(token, (name.clone(), addr));
|
|
||||||
self.topics.insert(name, token);
|
|
||||||
self.services.push(token);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl InternalServiceFactory for ConfiguredService {
|
|
||||||
fn name(&self, token: Token) -> &str {
|
|
||||||
&self.names[&token].0
|
|
||||||
}
|
|
||||||
|
|
||||||
fn clone_factory(&self) -> Box<dyn InternalServiceFactory> {
|
|
||||||
Box::new(Self {
|
|
||||||
rt: self.rt.clone(),
|
|
||||||
names: self.names.clone(),
|
|
||||||
topics: self.topics.clone(),
|
|
||||||
services: self.services.clone(),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
fn create(&self) -> LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>> {
|
|
||||||
// configure services
|
|
||||||
let mut rt = ServiceRuntime::new(self.topics.clone());
|
|
||||||
self.rt.configure(&mut rt);
|
|
||||||
rt.validate();
|
|
||||||
let mut names = self.names.clone();
|
|
||||||
let tokens = self.services.clone();
|
|
||||||
|
|
||||||
// construct services
|
|
||||||
Box::pin(async move {
|
|
||||||
let mut services = rt.services;
|
|
||||||
// TODO: Proper error handling here
|
|
||||||
for f in rt.onstart.into_iter() {
|
|
||||||
f.await;
|
|
||||||
}
|
|
||||||
let mut res = vec![];
|
|
||||||
for token in tokens {
|
|
||||||
if let Some(srv) = services.remove(&token) {
|
|
||||||
let newserv = srv.new_service(());
|
|
||||||
match newserv.await {
|
|
||||||
Ok(serv) => {
|
|
||||||
res.push((token, serv));
|
|
||||||
}
|
|
||||||
Err(_) => {
|
|
||||||
error!("Can not construct service");
|
|
||||||
return Err(());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
let name = names.remove(&token).unwrap().0;
|
|
||||||
res.push((
|
|
||||||
token,
|
|
||||||
Box::new(StreamService::new(fn_service(move |_: TcpStream| {
|
|
||||||
error!("Service {:?} is not configured", name);
|
|
||||||
ready::<Result<_, ()>>(Ok(()))
|
|
||||||
}))),
|
|
||||||
));
|
|
||||||
};
|
|
||||||
}
|
|
||||||
Ok(res)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(super) trait ServiceRuntimeConfiguration: Send {
|
|
||||||
fn clone(&self) -> Box<dyn ServiceRuntimeConfiguration>;
|
|
||||||
|
|
||||||
fn configure(&self, rt: &mut ServiceRuntime);
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<F> ServiceRuntimeConfiguration for F
|
|
||||||
where
|
|
||||||
F: Fn(&mut ServiceRuntime) + Send + Clone + 'static,
|
|
||||||
{
|
|
||||||
fn clone(&self) -> Box<dyn ServiceRuntimeConfiguration> {
|
|
||||||
Box::new(self.clone())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn configure(&self, rt: &mut ServiceRuntime) {
|
|
||||||
(self)(rt)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn not_configured(_: &mut ServiceRuntime) {
|
|
||||||
error!("Service is not configured");
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct ServiceRuntime {
|
|
||||||
names: HashMap<String, Token>,
|
|
||||||
services: HashMap<Token, BoxedNewService>,
|
|
||||||
onstart: Vec<LocalBoxFuture<'static, ()>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ServiceRuntime {
|
|
||||||
fn new(names: HashMap<String, Token>) -> Self {
|
|
||||||
ServiceRuntime {
|
|
||||||
names,
|
|
||||||
services: HashMap::new(),
|
|
||||||
onstart: Vec::new(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn validate(&self) {
|
|
||||||
for (name, token) in &self.names {
|
|
||||||
if !self.services.contains_key(&token) {
|
|
||||||
error!("Service {:?} is not configured", name);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Register service.
|
|
||||||
///
|
|
||||||
/// Name of the service must be registered during configuration stage with
|
|
||||||
/// *ServiceConfig::bind()* or *ServiceConfig::listen()* methods.
|
|
||||||
pub fn service<T, F>(&mut self, name: &str, service: F)
|
|
||||||
where
|
|
||||||
F: IntoBaseServiceFactory<T, TcpStream>,
|
|
||||||
T: BaseServiceFactory<TcpStream, Config = ()> + 'static,
|
|
||||||
T::Future: 'static,
|
|
||||||
T::Service: 'static,
|
|
||||||
T::InitError: fmt::Debug,
|
|
||||||
{
|
|
||||||
// let name = name.to_owned();
|
|
||||||
if let Some(token) = self.names.get(name) {
|
|
||||||
self.services.insert(
|
|
||||||
*token,
|
|
||||||
Box::new(ServiceFactory {
|
|
||||||
inner: service.into_factory(),
|
|
||||||
}),
|
|
||||||
);
|
|
||||||
} else {
|
|
||||||
panic!("Unknown service: {:?}", name);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Execute future before services initialization.
|
|
||||||
pub fn on_start<F>(&mut self, fut: F)
|
|
||||||
where
|
|
||||||
F: Future<Output = ()> + 'static,
|
|
||||||
{
|
|
||||||
self.onstart.push(Box::pin(fut))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type BoxedNewService = Box<
|
|
||||||
dyn BaseServiceFactory<
|
|
||||||
(CounterGuard, MioStream),
|
|
||||||
Response = (),
|
|
||||||
Error = (),
|
|
||||||
InitError = (),
|
|
||||||
Config = (),
|
|
||||||
Service = BoxedServerService,
|
|
||||||
Future = LocalBoxFuture<'static, Result<BoxedServerService, ()>>,
|
|
||||||
>,
|
|
||||||
>;
|
|
||||||
|
|
||||||
struct ServiceFactory<T> {
|
|
||||||
inner: T,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> BaseServiceFactory<(CounterGuard, MioStream)> for ServiceFactory<T>
|
|
||||||
where
|
|
||||||
T: BaseServiceFactory<TcpStream, Config = ()>,
|
|
||||||
T::Future: 'static,
|
|
||||||
T::Service: 'static,
|
|
||||||
T::Error: 'static,
|
|
||||||
T::InitError: fmt::Debug + 'static,
|
|
||||||
{
|
|
||||||
type Response = ();
|
|
||||||
type Error = ();
|
|
||||||
type Config = ();
|
|
||||||
type Service = BoxedServerService;
|
|
||||||
type InitError = ();
|
|
||||||
type Future = LocalBoxFuture<'static, Result<BoxedServerService, ()>>;
|
|
||||||
|
|
||||||
fn new_service(&self, _: ()) -> Self::Future {
|
|
||||||
let fut = self.inner.new_service(());
|
|
||||||
Box::pin(async move {
|
|
||||||
match fut.await {
|
|
||||||
Ok(s) => Ok(Box::new(StreamService::new(s)) as BoxedServerService),
|
|
||||||
Err(e) => {
|
|
||||||
error!("Can not construct service: {:?}", e);
|
|
||||||
Err(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
@ -6,7 +6,6 @@
|
|||||||
|
|
||||||
mod accept;
|
mod accept;
|
||||||
mod builder;
|
mod builder;
|
||||||
mod config;
|
|
||||||
mod server;
|
mod server;
|
||||||
mod service;
|
mod service;
|
||||||
mod signals;
|
mod signals;
|
||||||
@ -16,7 +15,6 @@ mod waker_queue;
|
|||||||
mod worker;
|
mod worker;
|
||||||
|
|
||||||
pub use self::builder::ServerBuilder;
|
pub use self::builder::ServerBuilder;
|
||||||
pub use self::config::{ServiceConfig, ServiceRuntime};
|
|
||||||
pub use self::server::Server;
|
pub use self::server::Server;
|
||||||
pub use self::service::ServiceFactory;
|
pub use self::service::ServiceFactory;
|
||||||
pub use self::test_server::TestServer;
|
pub use self::test_server::TestServer;
|
||||||
|
@ -24,7 +24,7 @@ pub(crate) trait InternalServiceFactory: Send {
|
|||||||
|
|
||||||
fn clone_factory(&self) -> Box<dyn InternalServiceFactory>;
|
fn clone_factory(&self) -> Box<dyn InternalServiceFactory>;
|
||||||
|
|
||||||
fn create(&self) -> LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>>;
|
fn create(&self) -> LocalBoxFuture<'static, Result<(Token, BoxedServerService), ()>>;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) type BoxedServerService = Box<
|
pub(crate) type BoxedServerService = Box<
|
||||||
@ -131,14 +131,14 @@ where
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn create(&self) -> LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>> {
|
fn create(&self) -> LocalBoxFuture<'static, Result<(Token, BoxedServerService), ()>> {
|
||||||
let token = self.token;
|
let token = self.token;
|
||||||
let fut = self.inner.create().new_service(());
|
let fut = self.inner.create().new_service(());
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
match fut.await {
|
match fut.await {
|
||||||
Ok(inner) => {
|
Ok(inner) => {
|
||||||
let service = Box::new(StreamService::new(inner)) as _;
|
let service = Box::new(StreamService::new(inner)) as _;
|
||||||
Ok(vec![(token, service)])
|
Ok((token, service))
|
||||||
}
|
}
|
||||||
Err(_) => Err(()),
|
Err(_) => Err(()),
|
||||||
}
|
}
|
||||||
|
@ -231,11 +231,7 @@ impl ServerWorker {
|
|||||||
.enumerate()
|
.enumerate()
|
||||||
.map(|(idx, factory)| {
|
.map(|(idx, factory)| {
|
||||||
let fut = factory.create();
|
let fut = factory.create();
|
||||||
async move {
|
async move { fut.await.map(|(t, s)| (idx, t, s)) }
|
||||||
fut.await.map(|r| {
|
|
||||||
r.into_iter().map(|(t, s)| (idx, t, s)).collect::<Vec<_>>()
|
|
||||||
})
|
|
||||||
}
|
|
||||||
})
|
})
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
@ -248,7 +244,6 @@ impl ServerWorker {
|
|||||||
let services = match res {
|
let services = match res {
|
||||||
Ok(res) => res
|
Ok(res) => res
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.flatten()
|
|
||||||
.fold(Vec::new(), |mut services, (factory, token, service)| {
|
.fold(Vec::new(), |mut services, (factory, token, service)| {
|
||||||
assert_eq!(token.0, services.len());
|
assert_eq!(token.0, services.len());
|
||||||
services.push(WorkerService {
|
services.push(WorkerService {
|
||||||
@ -360,7 +355,7 @@ enum WorkerState {
|
|||||||
struct Restart {
|
struct Restart {
|
||||||
factory_id: usize,
|
factory_id: usize,
|
||||||
token: Token,
|
token: Token,
|
||||||
fut: LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>>,
|
fut: LocalBoxFuture<'static, Result<(Token, BoxedServerService), ()>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Shutdown keep states necessary for server shutdown:
|
// Shutdown keep states necessary for server shutdown:
|
||||||
@ -440,19 +435,15 @@ impl Future for ServerWorker {
|
|||||||
let factory_id = restart.factory_id;
|
let factory_id = restart.factory_id;
|
||||||
let token = restart.token;
|
let token = restart.token;
|
||||||
|
|
||||||
let service = ready!(restart.fut.as_mut().poll(cx))
|
let (token_new, service) = ready!(restart.fut.as_mut().poll(cx))
|
||||||
.unwrap_or_else(|_| {
|
.unwrap_or_else(|_| {
|
||||||
panic!(
|
panic!(
|
||||||
"Can not restart {:?} service",
|
"Can not restart {:?} service",
|
||||||
this.factories[factory_id].name(token)
|
this.factories[factory_id].name(token)
|
||||||
)
|
)
|
||||||
})
|
});
|
||||||
.into_iter()
|
|
||||||
// Find the same token from vector. There should be only one
|
assert_eq!(token, token_new);
|
||||||
// So the first match would be enough.
|
|
||||||
.find(|(t, _)| *t == token)
|
|
||||||
.map(|(_, service)| service)
|
|
||||||
.expect("No BoxedServerService found");
|
|
||||||
|
|
||||||
trace!(
|
trace!(
|
||||||
"Service {:?} has been restarted",
|
"Service {:?} has been restarted",
|
||||||
|
@ -142,57 +142,6 @@ fn test_start() {
|
|||||||
let _ = h.join();
|
let _ = h.join();
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_configure() {
|
|
||||||
let addr1 = unused_addr();
|
|
||||||
let addr2 = unused_addr();
|
|
||||||
let addr3 = unused_addr();
|
|
||||||
let (tx, rx) = mpsc::channel();
|
|
||||||
let num = Arc::new(AtomicUsize::new(0));
|
|
||||||
let num2 = num.clone();
|
|
||||||
|
|
||||||
let h = thread::spawn(move || {
|
|
||||||
let num = num2.clone();
|
|
||||||
let sys = actix_rt::System::new();
|
|
||||||
let srv = sys.block_on(lazy(|_| {
|
|
||||||
Server::build()
|
|
||||||
.disable_signals()
|
|
||||||
.configure(move |cfg| {
|
|
||||||
let num = num.clone();
|
|
||||||
let lst = net::TcpListener::bind(addr3).unwrap();
|
|
||||||
cfg.bind("addr1", addr1)
|
|
||||||
.unwrap()
|
|
||||||
.bind("addr2", addr2)
|
|
||||||
.unwrap()
|
|
||||||
.listen("addr3", lst)
|
|
||||||
.apply(move |rt| {
|
|
||||||
let num = num.clone();
|
|
||||||
rt.service("addr1", fn_service(|_| ok::<_, ()>(())));
|
|
||||||
rt.service("addr3", fn_service(|_| ok::<_, ()>(())));
|
|
||||||
rt.on_start(lazy(move |_| {
|
|
||||||
let _ = num.fetch_add(1, Ordering::Relaxed);
|
|
||||||
}))
|
|
||||||
})
|
|
||||||
})
|
|
||||||
.unwrap()
|
|
||||||
.workers(1)
|
|
||||||
.run()
|
|
||||||
}));
|
|
||||||
|
|
||||||
let _ = tx.send((srv, actix_rt::System::current()));
|
|
||||||
let _ = sys.run();
|
|
||||||
});
|
|
||||||
let (_, sys) = rx.recv().unwrap();
|
|
||||||
thread::sleep(Duration::from_millis(500));
|
|
||||||
|
|
||||||
assert!(net::TcpStream::connect(addr1).is_ok());
|
|
||||||
assert!(net::TcpStream::connect(addr2).is_ok());
|
|
||||||
assert!(net::TcpStream::connect(addr3).is_ok());
|
|
||||||
assert_eq!(num.load(Ordering::Relaxed), 1);
|
|
||||||
sys.stop();
|
|
||||||
let _ = h.join();
|
|
||||||
}
|
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_max_concurrent_connections() {
|
async fn test_max_concurrent_connections() {
|
||||||
// Note:
|
// Note:
|
||||||
@ -305,81 +254,6 @@ async fn test_service_restart() {
|
|||||||
let num_clone = num.clone();
|
let num_clone = num.clone();
|
||||||
let num2_clone = num2.clone();
|
let num2_clone = num2.clone();
|
||||||
|
|
||||||
let h = thread::spawn(move || {
|
|
||||||
actix_rt::System::new().block_on(async {
|
|
||||||
let server = Server::build()
|
|
||||||
.backlog(1)
|
|
||||||
.disable_signals()
|
|
||||||
.configure(move |cfg| {
|
|
||||||
let num = num.clone();
|
|
||||||
let num2 = num2.clone();
|
|
||||||
cfg.bind("addr1", addr1)
|
|
||||||
.unwrap()
|
|
||||||
.bind("addr2", addr2)
|
|
||||||
.unwrap()
|
|
||||||
.apply(move |rt| {
|
|
||||||
let num = num.clone();
|
|
||||||
let num2 = num2.clone();
|
|
||||||
rt.service(
|
|
||||||
"addr1",
|
|
||||||
fn_factory(move || {
|
|
||||||
let num = num.clone();
|
|
||||||
async move { Ok::<_, ()>(TestService(num)) }
|
|
||||||
}),
|
|
||||||
);
|
|
||||||
rt.service(
|
|
||||||
"addr2",
|
|
||||||
fn_factory(move || {
|
|
||||||
let num2 = num2.clone();
|
|
||||||
async move { Ok::<_, ()>(TestService(num2)) }
|
|
||||||
}),
|
|
||||||
);
|
|
||||||
})
|
|
||||||
})
|
|
||||||
.unwrap()
|
|
||||||
.workers(1)
|
|
||||||
.run();
|
|
||||||
|
|
||||||
let _ = tx.send((server.clone(), actix_rt::System::current()));
|
|
||||||
server.await
|
|
||||||
})
|
|
||||||
});
|
|
||||||
|
|
||||||
let (server, sys) = rx.recv().unwrap();
|
|
||||||
|
|
||||||
for _ in 0..5 {
|
|
||||||
TcpStream::connect(addr1)
|
|
||||||
.await
|
|
||||||
.unwrap()
|
|
||||||
.shutdown()
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
TcpStream::connect(addr2)
|
|
||||||
.await
|
|
||||||
.unwrap()
|
|
||||||
.shutdown()
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
sleep(Duration::from_secs(3)).await;
|
|
||||||
|
|
||||||
assert!(num_clone.load(Ordering::SeqCst) > 5);
|
|
||||||
assert!(num2_clone.load(Ordering::SeqCst) > 5);
|
|
||||||
|
|
||||||
sys.stop();
|
|
||||||
let _ = server.stop(false);
|
|
||||||
let _ = h.join().unwrap();
|
|
||||||
|
|
||||||
let addr1 = unused_addr();
|
|
||||||
let addr2 = unused_addr();
|
|
||||||
let (tx, rx) = mpsc::channel();
|
|
||||||
let num = Arc::new(AtomicUsize::new(0));
|
|
||||||
let num2 = Arc::new(AtomicUsize::new(0));
|
|
||||||
|
|
||||||
let num_clone = num.clone();
|
|
||||||
let num2_clone = num2.clone();
|
|
||||||
|
|
||||||
let h = thread::spawn(move || {
|
let h = thread::spawn(move || {
|
||||||
let num = num.clone();
|
let num = num.clone();
|
||||||
actix_rt::System::new().block_on(async {
|
actix_rt::System::new().block_on(async {
|
||||||
|
Loading…
Reference in New Issue
Block a user