mirror of
https://github.com/fafhrd91/actix-net
synced 2025-08-14 00:48:23 +02:00
Compare commits
6 Commits
server-0.1
...
service-v0
Author | SHA1 | Date | |
---|---|---|---|
|
bf9bd97173 | ||
|
61939c7af2 | ||
|
e8a1664c15 | ||
|
d1bfae7414 | ||
|
5ca00dc798 | ||
|
fd3e77ea83 |
@@ -1,5 +1,22 @@
|
|||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
|
## [0.1.2] - 2018-12-12
|
||||||
|
|
||||||
|
## Changed
|
||||||
|
|
||||||
|
* rename ServiceConfig::rt() to ServiceConfig::apply()
|
||||||
|
|
||||||
|
|
||||||
|
### Fixed
|
||||||
|
|
||||||
|
* Fix back-pressure for concurrent ssl handshakes
|
||||||
|
|
||||||
|
|
||||||
|
## [0.1.1] - 2018-12-11
|
||||||
|
|
||||||
|
* Fix signal handling on windows
|
||||||
|
|
||||||
|
|
||||||
## [0.1.0] - 2018-12-09
|
## [0.1.0] - 2018-12-09
|
||||||
|
|
||||||
* Move server to separate crate
|
* Move server to separate crate
|
||||||
|
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "actix-server"
|
name = "actix-server"
|
||||||
version = "0.1.0"
|
version = "0.1.2"
|
||||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||||
description = "Actix server - General purpose tcp server"
|
description = "Actix server - General purpose tcp server"
|
||||||
keywords = ["network", "framework", "async", "futures"]
|
keywords = ["network", "framework", "async", "futures"]
|
||||||
@@ -66,3 +66,6 @@ webpki-roots = { version = "0.15", optional = true }
|
|||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
env_logger = "0.5"
|
env_logger = "0.5"
|
||||||
|
actix-service = "0.1.1"
|
||||||
|
actix-codec = "0.1.0"
|
||||||
|
actix-rt = "0.1.0"
|
||||||
|
@@ -64,7 +64,7 @@ impl ServerBuilder {
|
|||||||
|
|
||||||
/// Set number of workers to start.
|
/// Set number of workers to start.
|
||||||
///
|
///
|
||||||
/// By default server uses number of available logical cpu as threads
|
/// By default server uses number of available logical cpu as workers
|
||||||
/// count.
|
/// count.
|
||||||
pub fn workers(mut self, num: usize) -> Self {
|
pub fn workers(mut self, num: usize) -> Self {
|
||||||
self.threads = num;
|
self.threads = num;
|
||||||
@@ -108,8 +108,8 @@ impl ServerBuilder {
|
|||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Run external configuration as part of the server building
|
/// Execute external configuration as part of the server building
|
||||||
/// process
|
/// process.
|
||||||
///
|
///
|
||||||
/// This function is useful for moving parts of configuration to a
|
/// This function is useful for moving parts of configuration to a
|
||||||
/// different module or even library.
|
/// different module or even library.
|
||||||
@@ -117,17 +117,20 @@ impl ServerBuilder {
|
|||||||
where
|
where
|
||||||
F: Fn(&mut ServiceConfig) -> io::Result<()>,
|
F: Fn(&mut ServiceConfig) -> io::Result<()>,
|
||||||
{
|
{
|
||||||
let mut cfg = ServiceConfig::new();
|
let mut cfg = ServiceConfig::new(self.threads);
|
||||||
|
|
||||||
f(&mut cfg)?;
|
f(&mut cfg)?;
|
||||||
|
|
||||||
let mut srv = ConfiguredService::new(cfg.rt);
|
if let Some(apply) = cfg.apply {
|
||||||
for (name, lst) in cfg.services {
|
let mut srv = ConfiguredService::new(apply);
|
||||||
let token = self.token.next();
|
for (name, lst) in cfg.services {
|
||||||
srv.stream(token, name);
|
let token = self.token.next();
|
||||||
self.sockets.push((token, lst));
|
srv.stream(token, name);
|
||||||
|
self.sockets.push((token, lst));
|
||||||
|
}
|
||||||
|
self.services.push(Box::new(srv));
|
||||||
}
|
}
|
||||||
self.services.push(Box::new(srv));
|
self.threads = cfg.threads;
|
||||||
|
|
||||||
Ok(self)
|
Ok(self)
|
||||||
}
|
}
|
||||||
|
@@ -15,18 +15,28 @@ use super::services::{
|
|||||||
use super::Token;
|
use super::Token;
|
||||||
|
|
||||||
pub struct ServiceConfig {
|
pub struct ServiceConfig {
|
||||||
pub(super) services: Vec<(String, net::TcpListener)>,
|
pub(crate) services: Vec<(String, net::TcpListener)>,
|
||||||
pub(super) rt: Box<ServiceRuntimeConfiguration>,
|
pub(crate) apply: Option<Box<ServiceRuntimeConfiguration>>,
|
||||||
|
pub(crate) threads: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ServiceConfig {
|
impl ServiceConfig {
|
||||||
pub(super) fn new() -> ServiceConfig {
|
pub(super) fn new(threads: usize) -> ServiceConfig {
|
||||||
ServiceConfig {
|
ServiceConfig {
|
||||||
|
threads,
|
||||||
services: Vec::new(),
|
services: Vec::new(),
|
||||||
rt: Box::new(not_configured),
|
apply: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Set number of workers to start.
|
||||||
|
///
|
||||||
|
/// By default server uses number of available logical cpu as workers
|
||||||
|
/// count.
|
||||||
|
pub fn workers(&mut self, num: usize) {
|
||||||
|
self.threads = num;
|
||||||
|
}
|
||||||
|
|
||||||
/// Add new service to server
|
/// Add new service to server
|
||||||
pub fn bind<U, N: AsRef<str>>(&mut self, name: N, addr: U) -> io::Result<&mut Self>
|
pub fn bind<U, N: AsRef<str>>(&mut self, name: N, addr: U) -> io::Result<&mut Self>
|
||||||
where
|
where
|
||||||
@@ -43,16 +53,20 @@ impl ServiceConfig {
|
|||||||
|
|
||||||
/// Add new service to server
|
/// Add new service to server
|
||||||
pub fn listen<N: AsRef<str>>(&mut self, name: N, lst: net::TcpListener) -> &mut Self {
|
pub fn listen<N: AsRef<str>>(&mut self, name: N, lst: net::TcpListener) -> &mut Self {
|
||||||
|
if self.apply.is_none() {
|
||||||
|
self.apply = Some(Box::new(not_configured));
|
||||||
|
}
|
||||||
self.services.push((name.as_ref().to_string(), lst));
|
self.services.push((name.as_ref().to_string(), lst));
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Register service configuration function
|
/// Register service configuration function. This function get called
|
||||||
pub fn rt<F>(&mut self, f: F) -> io::Result<()>
|
/// during worker runtime configuration. It get executed in worker thread.
|
||||||
|
pub fn apply<F>(&mut self, f: F) -> io::Result<()>
|
||||||
where
|
where
|
||||||
F: Fn(&mut ServiceRuntime) + Send + Clone + 'static,
|
F: Fn(&mut ServiceRuntime) + Send + Clone + 'static,
|
||||||
{
|
{
|
||||||
self.rt = Box::new(f);
|
self.apply = Some(Box::new(f));
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -9,6 +9,7 @@ use futures::task::AtomicTask;
|
|||||||
/// Counter could be cloned, total ncount is shared across all clones.
|
/// Counter could be cloned, total ncount is shared across all clones.
|
||||||
pub struct Counter(Rc<CounterInner>);
|
pub struct Counter(Rc<CounterInner>);
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
struct CounterInner {
|
struct CounterInner {
|
||||||
count: Cell<usize>,
|
count: Cell<usize>,
|
||||||
capacity: usize,
|
capacity: usize,
|
||||||
@@ -40,6 +41,7 @@ impl Counter {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
pub struct CounterGuard(Rc<CounterInner>);
|
pub struct CounterGuard(Rc<CounterInner>);
|
||||||
|
|
||||||
impl CounterGuard {
|
impl CounterGuard {
|
||||||
@@ -57,11 +59,7 @@ impl Drop for CounterGuard {
|
|||||||
|
|
||||||
impl CounterInner {
|
impl CounterInner {
|
||||||
fn inc(&self) {
|
fn inc(&self) {
|
||||||
let num = self.count.get() + 1;
|
self.count.set(self.count.get() + 1);
|
||||||
self.count.set(num);
|
|
||||||
if num == self.capacity {
|
|
||||||
self.task.register();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn dec(&self) {
|
fn dec(&self) {
|
||||||
@@ -73,6 +71,10 @@ impl CounterInner {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn available(&self) -> bool {
|
fn available(&self) -> bool {
|
||||||
self.count.get() < self.capacity
|
let avail = self.count.get() < self.capacity;
|
||||||
|
if !avail {
|
||||||
|
self.task.register();
|
||||||
|
}
|
||||||
|
avail
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -34,10 +34,12 @@ impl Signals {
|
|||||||
let fut = {
|
let fut = {
|
||||||
#[cfg(not(unix))]
|
#[cfg(not(unix))]
|
||||||
{
|
{
|
||||||
tokio_signal::ctrl_c().and_then(move |stream| Signals {
|
tokio_signal::ctrl_c()
|
||||||
srv,
|
.map_err(|_| ())
|
||||||
stream: Box::new(stream.map(|_| Signal::Int)),
|
.and_then(move |stream| Signals {
|
||||||
})
|
srv,
|
||||||
|
stream: Box::new(stream.map(|_| Signal::Int)),
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(unix)]
|
#[cfg(unix)]
|
||||||
|
@@ -398,7 +398,7 @@ impl Future for Worker {
|
|||||||
let guard = self.conns.get();
|
let guard = self.conns.get();
|
||||||
let _ = self.services[msg.token.0]
|
let _ = self.services[msg.token.0]
|
||||||
.as_mut()
|
.as_mut()
|
||||||
.expect("actix net bug")
|
.expect("actix-server bug")
|
||||||
.1
|
.1
|
||||||
.call((Some(guard), ServerMessage::Connect(msg.io)));
|
.call((Some(guard), ServerMessage::Connect(msg.io)));
|
||||||
continue;
|
continue;
|
||||||
|
@@ -1,5 +1,19 @@
|
|||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
|
## [0.1.3] - 2018-12-12
|
||||||
|
|
||||||
|
## Changed
|
||||||
|
|
||||||
|
* Split service combinators to separate trait
|
||||||
|
|
||||||
|
|
||||||
|
## [0.1.2] - 2018-12-12
|
||||||
|
|
||||||
|
### Fixed
|
||||||
|
|
||||||
|
* Release future early for `.and_then()` and `.then()` combinators
|
||||||
|
|
||||||
|
|
||||||
## [0.1.1] - 2018-12-09
|
## [0.1.1] - 2018-12-09
|
||||||
|
|
||||||
### Added
|
### Added
|
||||||
|
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "actix-service"
|
name = "actix-service"
|
||||||
version = "0.1.1"
|
version = "0.1.3"
|
||||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||||
description = "Actix Service"
|
description = "Actix Service"
|
||||||
keywords = ["network", "framework", "async", "futures"]
|
keywords = ["network", "framework", "async", "futures"]
|
||||||
|
@@ -61,7 +61,7 @@ where
|
|||||||
{
|
{
|
||||||
b: Cell<B>,
|
b: Cell<B>,
|
||||||
fut_b: Option<B::Future>,
|
fut_b: Option<B::Future>,
|
||||||
fut_a: A::Future,
|
fut_a: Option<A::Future>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<A, B, Request> AndThenFuture<A, B, Request>
|
impl<A, B, Request> AndThenFuture<A, B, Request>
|
||||||
@@ -69,10 +69,10 @@ where
|
|||||||
A: Service<Request>,
|
A: Service<Request>,
|
||||||
B: Service<A::Response, Error = A::Error>,
|
B: Service<A::Response, Error = A::Error>,
|
||||||
{
|
{
|
||||||
fn new(fut_a: A::Future, b: Cell<B>) -> Self {
|
fn new(a: A::Future, b: Cell<B>) -> Self {
|
||||||
AndThenFuture {
|
AndThenFuture {
|
||||||
b,
|
b,
|
||||||
fut_a,
|
fut_a: Some(a),
|
||||||
fut_b: None,
|
fut_b: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -91,8 +91,9 @@ where
|
|||||||
return fut.poll();
|
return fut.poll();
|
||||||
}
|
}
|
||||||
|
|
||||||
match self.fut_a.poll() {
|
match self.fut_a.as_mut().expect("Bug in actix-service").poll() {
|
||||||
Ok(Async::Ready(resp)) => {
|
Ok(Async::Ready(resp)) => {
|
||||||
|
let _ = self.fut_a.take();
|
||||||
self.fut_b = Some(self.b.get_mut().call(resp));
|
self.fut_b = Some(self.b.get_mut().call(resp));
|
||||||
self.poll()
|
self.poll()
|
||||||
}
|
}
|
||||||
|
@@ -50,7 +50,11 @@ pub trait Service<Request> {
|
|||||||
/// Calling `call` without calling `poll_ready` is permitted. The
|
/// Calling `call` without calling `poll_ready` is permitted. The
|
||||||
/// implementation must be resilient to this fact.
|
/// implementation must be resilient to this fact.
|
||||||
fn call(&mut self, req: Request) -> Self::Future;
|
fn call(&mut self, req: Request) -> Self::Future;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// An extension trait for `Service`s that provides a variety of convenient
|
||||||
|
/// adapters
|
||||||
|
pub trait ServiceExt<Request>: Service<Request> {
|
||||||
/// Apply function to specified service and use it as a next service in
|
/// Apply function to specified service and use it as a next service in
|
||||||
/// chain.
|
/// chain.
|
||||||
fn apply<T, I, F, Out, Req>(
|
fn apply<T, I, F, Out, Req>(
|
||||||
@@ -146,6 +150,8 @@ pub trait Service<Request> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<T: ?Sized, Request> ServiceExt<Request> for T where T: Service<Request> {}
|
||||||
|
|
||||||
/// Creates new `Service` values.
|
/// Creates new `Service` values.
|
||||||
///
|
///
|
||||||
/// Acts as a service factory. This is useful for cases where new `Service`
|
/// Acts as a service factory. This is useful for cases where new `Service`
|
||||||
|
@@ -61,7 +61,7 @@ where
|
|||||||
{
|
{
|
||||||
b: Cell<B>,
|
b: Cell<B>,
|
||||||
fut_b: Option<B::Future>,
|
fut_b: Option<B::Future>,
|
||||||
fut_a: A::Future,
|
fut_a: Option<A::Future>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<A, B, Request> ThenFuture<A, B, Request>
|
impl<A, B, Request> ThenFuture<A, B, Request>
|
||||||
@@ -69,10 +69,10 @@ where
|
|||||||
A: Service<Request>,
|
A: Service<Request>,
|
||||||
B: Service<Result<A::Response, A::Error>>,
|
B: Service<Result<A::Response, A::Error>>,
|
||||||
{
|
{
|
||||||
fn new(fut_a: A::Future, b: Cell<B>) -> Self {
|
fn new(a: A::Future, b: Cell<B>) -> Self {
|
||||||
ThenFuture {
|
ThenFuture {
|
||||||
b,
|
b,
|
||||||
fut_a,
|
fut_a: Some(a),
|
||||||
fut_b: None,
|
fut_b: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -91,12 +91,14 @@ where
|
|||||||
return fut.poll();
|
return fut.poll();
|
||||||
}
|
}
|
||||||
|
|
||||||
match self.fut_a.poll() {
|
match self.fut_a.as_mut().expect("bug in actix-service").poll() {
|
||||||
Ok(Async::Ready(resp)) => {
|
Ok(Async::Ready(resp)) => {
|
||||||
|
let _ = self.fut_a.take();
|
||||||
self.fut_b = Some(self.b.get_mut().call(Ok(resp)));
|
self.fut_b = Some(self.b.get_mut().call(Ok(resp)));
|
||||||
self.poll()
|
self.poll()
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
|
let _ = self.fut_a.take();
|
||||||
self.fut_b = Some(self.b.get_mut().call(Err(err)));
|
self.fut_b = Some(self.b.get_mut().call(Err(err)));
|
||||||
self.poll()
|
self.poll()
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user