1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-08-14 07:40:31 +02:00

Compare commits

...

3 Commits

Author SHA1 Message Date
Nikolay Kim
e8a1664c15 prepare release 2018-12-12 14:24:46 -08:00
Nikolay Kim
d1bfae7414 fix backpressure for ssl concurrent handshakes 2018-12-12 14:24:24 -08:00
Nikolay Kim
5ca00dc798 rename ServiceConfig::rt to ServiceConfig::apply 2018-12-12 14:16:16 -08:00
6 changed files with 59 additions and 25 deletions

View File

@@ -1,5 +1,17 @@
# Changes # Changes
## [0.1.2] - 2018-12-12
## Changed
* rename ServiceConfig::rt() to ServiceConfig::apply()
### Fixed
* Fix back-pressure for concurrent ssl handshakes
## [0.1.1] - 2018-12-11 ## [0.1.1] - 2018-12-11
* Fix signal handling on windows * Fix signal handling on windows

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "actix-server" name = "actix-server"
version = "0.1.1" version = "0.1.2"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix server - General purpose tcp server" description = "Actix server - General purpose tcp server"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]
@@ -66,3 +66,6 @@ webpki-roots = { version = "0.15", optional = true }
[dev-dependencies] [dev-dependencies]
env_logger = "0.5" env_logger = "0.5"
actix-service = "0.1.1"
actix-codec = "0.1.0"
actix-rt = "0.1.0"

View File

@@ -64,7 +64,7 @@ impl ServerBuilder {
/// Set number of workers to start. /// Set number of workers to start.
/// ///
/// By default server uses number of available logical cpu as threads /// By default server uses number of available logical cpu as workers
/// count. /// count.
pub fn workers(mut self, num: usize) -> Self { pub fn workers(mut self, num: usize) -> Self {
self.threads = num; self.threads = num;
@@ -108,8 +108,8 @@ impl ServerBuilder {
self self
} }
/// Run external configuration as part of the server building /// Execute external configuration as part of the server building
/// process /// process.
/// ///
/// This function is useful for moving parts of configuration to a /// This function is useful for moving parts of configuration to a
/// different module or even library. /// different module or even library.
@@ -117,17 +117,20 @@ impl ServerBuilder {
where where
F: Fn(&mut ServiceConfig) -> io::Result<()>, F: Fn(&mut ServiceConfig) -> io::Result<()>,
{ {
let mut cfg = ServiceConfig::new(); let mut cfg = ServiceConfig::new(self.threads);
f(&mut cfg)?; f(&mut cfg)?;
let mut srv = ConfiguredService::new(cfg.rt); if let Some(apply) = cfg.apply {
for (name, lst) in cfg.services { let mut srv = ConfiguredService::new(apply);
let token = self.token.next(); for (name, lst) in cfg.services {
srv.stream(token, name); let token = self.token.next();
self.sockets.push((token, lst)); srv.stream(token, name);
self.sockets.push((token, lst));
}
self.services.push(Box::new(srv));
} }
self.services.push(Box::new(srv)); self.threads = cfg.threads;
Ok(self) Ok(self)
} }

View File

@@ -15,18 +15,28 @@ use super::services::{
use super::Token; use super::Token;
pub struct ServiceConfig { pub struct ServiceConfig {
pub(super) services: Vec<(String, net::TcpListener)>, pub(crate) services: Vec<(String, net::TcpListener)>,
pub(super) rt: Box<ServiceRuntimeConfiguration>, pub(crate) apply: Option<Box<ServiceRuntimeConfiguration>>,
pub(crate) threads: usize,
} }
impl ServiceConfig { impl ServiceConfig {
pub(super) fn new() -> ServiceConfig { pub(super) fn new(threads: usize) -> ServiceConfig {
ServiceConfig { ServiceConfig {
threads,
services: Vec::new(), services: Vec::new(),
rt: Box::new(not_configured), apply: None,
} }
} }
/// Set number of workers to start.
///
/// By default server uses number of available logical cpu as workers
/// count.
pub fn workers(&mut self, num: usize) {
self.threads = num;
}
/// Add new service to server /// Add new service to server
pub fn bind<U, N: AsRef<str>>(&mut self, name: N, addr: U) -> io::Result<&mut Self> pub fn bind<U, N: AsRef<str>>(&mut self, name: N, addr: U) -> io::Result<&mut Self>
where where
@@ -43,16 +53,20 @@ impl ServiceConfig {
/// Add new service to server /// Add new service to server
pub fn listen<N: AsRef<str>>(&mut self, name: N, lst: net::TcpListener) -> &mut Self { pub fn listen<N: AsRef<str>>(&mut self, name: N, lst: net::TcpListener) -> &mut Self {
if self.apply.is_none() {
self.apply = Some(Box::new(not_configured));
}
self.services.push((name.as_ref().to_string(), lst)); self.services.push((name.as_ref().to_string(), lst));
self self
} }
/// Register service configuration function /// Register service configuration function. This function get called
pub fn rt<F>(&mut self, f: F) -> io::Result<()> /// during worker runtime configuration. It get executed in worker thread.
pub fn apply<F>(&mut self, f: F) -> io::Result<()>
where where
F: Fn(&mut ServiceRuntime) + Send + Clone + 'static, F: Fn(&mut ServiceRuntime) + Send + Clone + 'static,
{ {
self.rt = Box::new(f); self.apply = Some(Box::new(f));
Ok(()) Ok(())
} }
} }

View File

@@ -9,6 +9,7 @@ use futures::task::AtomicTask;
/// Counter could be cloned, total ncount is shared across all clones. /// Counter could be cloned, total ncount is shared across all clones.
pub struct Counter(Rc<CounterInner>); pub struct Counter(Rc<CounterInner>);
#[derive(Debug)]
struct CounterInner { struct CounterInner {
count: Cell<usize>, count: Cell<usize>,
capacity: usize, capacity: usize,
@@ -40,6 +41,7 @@ impl Counter {
} }
} }
#[derive(Debug)]
pub struct CounterGuard(Rc<CounterInner>); pub struct CounterGuard(Rc<CounterInner>);
impl CounterGuard { impl CounterGuard {
@@ -57,11 +59,7 @@ impl Drop for CounterGuard {
impl CounterInner { impl CounterInner {
fn inc(&self) { fn inc(&self) {
let num = self.count.get() + 1; self.count.set(self.count.get() + 1);
self.count.set(num);
if num == self.capacity {
self.task.register();
}
} }
fn dec(&self) { fn dec(&self) {
@@ -73,6 +71,10 @@ impl CounterInner {
} }
fn available(&self) -> bool { fn available(&self) -> bool {
self.count.get() < self.capacity let avail = self.count.get() < self.capacity;
if !avail {
self.task.register();
}
avail
} }
} }

View File

@@ -398,7 +398,7 @@ impl Future for Worker {
let guard = self.conns.get(); let guard = self.conns.get();
let _ = self.services[msg.token.0] let _ = self.services[msg.token.0]
.as_mut() .as_mut()
.expect("actix net bug") .expect("actix-server bug")
.1 .1
.call((Some(guard), ServerMessage::Connect(msg.io))); .call((Some(guard), ServerMessage::Connect(msg.io)));
continue; continue;