1
0
mirror of https://github.com/fafhrd91/actix-web synced 2025-07-04 18:06:23 +02:00

Compare commits

...

9 Commits

19 changed files with 154 additions and 104 deletions

View File

@ -1,5 +1,29 @@
# Changes # Changes
## [0.1.3] - 2018-12-21
## Fixed
* Fix max concurrent connections handling
## [0.1.2] - 2018-12-12
## Changed
* rename ServiceConfig::rt() to ServiceConfig::apply()
### Fixed
* Fix back-pressure for concurrent ssl handshakes
## [0.1.1] - 2018-12-11
* Fix signal handling on windows
## [0.1.0] - 2018-12-09 ## [0.1.0] - 2018-12-09
* Move server to separate crate * Move server to separate crate

View File

@ -1,6 +1,6 @@
[package] [package]
name = "actix-server" name = "actix-server"
version = "0.1.0" version = "0.1.3"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix server - General purpose tcp server" description = "Actix server - General purpose tcp server"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]
@ -66,3 +66,6 @@ webpki-roots = { version = "0.15", optional = true }
[dev-dependencies] [dev-dependencies]
env_logger = "0.5" env_logger = "0.5"
actix-service = "0.1.1"
actix-codec = "0.1.0"
actix-rt = "0.1.0"

View File

@ -64,7 +64,7 @@ impl ServerBuilder {
/// Set number of workers to start. /// Set number of workers to start.
/// ///
/// By default server uses number of available logical cpu as threads /// By default server uses number of available logical cpu as workers
/// count. /// count.
pub fn workers(mut self, num: usize) -> Self { pub fn workers(mut self, num: usize) -> Self {
self.threads = num; self.threads = num;
@ -108,8 +108,8 @@ impl ServerBuilder {
self self
} }
/// Run external configuration as part of the server building /// Execute external configuration as part of the server building
/// process /// process.
/// ///
/// This function is useful for moving parts of configuration to a /// This function is useful for moving parts of configuration to a
/// different module or even library. /// different module or even library.
@ -117,17 +117,20 @@ impl ServerBuilder {
where where
F: Fn(&mut ServiceConfig) -> io::Result<()>, F: Fn(&mut ServiceConfig) -> io::Result<()>,
{ {
let mut cfg = ServiceConfig::new(); let mut cfg = ServiceConfig::new(self.threads);
f(&mut cfg)?; f(&mut cfg)?;
let mut srv = ConfiguredService::new(cfg.rt); if let Some(apply) = cfg.apply {
for (name, lst) in cfg.services { let mut srv = ConfiguredService::new(apply);
let token = self.token.next(); for (name, lst) in cfg.services {
srv.stream(token, name); let token = self.token.next();
self.sockets.push((token, lst)); srv.stream(token, name);
self.sockets.push((token, lst));
}
self.services.push(Box::new(srv));
} }
self.services.push(Box::new(srv)); self.threads = cfg.threads;
Ok(self) Ok(self)
} }

View File

@ -15,18 +15,28 @@ use super::services::{
use super::Token; use super::Token;
pub struct ServiceConfig { pub struct ServiceConfig {
pub(super) services: Vec<(String, net::TcpListener)>, pub(crate) services: Vec<(String, net::TcpListener)>,
pub(super) rt: Box<ServiceRuntimeConfiguration>, pub(crate) apply: Option<Box<ServiceRuntimeConfiguration>>,
pub(crate) threads: usize,
} }
impl ServiceConfig { impl ServiceConfig {
pub(super) fn new() -> ServiceConfig { pub(super) fn new(threads: usize) -> ServiceConfig {
ServiceConfig { ServiceConfig {
threads,
services: Vec::new(), services: Vec::new(),
rt: Box::new(not_configured), apply: None,
} }
} }
/// Set number of workers to start.
///
/// By default server uses number of available logical cpu as workers
/// count.
pub fn workers(&mut self, num: usize) {
self.threads = num;
}
/// Add new service to server /// Add new service to server
pub fn bind<U, N: AsRef<str>>(&mut self, name: N, addr: U) -> io::Result<&mut Self> pub fn bind<U, N: AsRef<str>>(&mut self, name: N, addr: U) -> io::Result<&mut Self>
where where
@ -43,16 +53,20 @@ impl ServiceConfig {
/// Add new service to server /// Add new service to server
pub fn listen<N: AsRef<str>>(&mut self, name: N, lst: net::TcpListener) -> &mut Self { pub fn listen<N: AsRef<str>>(&mut self, name: N, lst: net::TcpListener) -> &mut Self {
if self.apply.is_none() {
self.apply = Some(Box::new(not_configured));
}
self.services.push((name.as_ref().to_string(), lst)); self.services.push((name.as_ref().to_string(), lst));
self self
} }
/// Register service configuration function /// Register service configuration function. This function get called
pub fn rt<F>(&mut self, f: F) -> io::Result<()> /// during worker runtime configuration. It get executed in worker thread.
pub fn apply<F>(&mut self, f: F) -> io::Result<()>
where where
F: Fn(&mut ServiceRuntime) + Send + Clone + 'static, F: Fn(&mut ServiceRuntime) + Send + Clone + 'static,
{ {
self.rt = Box::new(f); self.apply = Some(Box::new(f));
Ok(()) Ok(())
} }
} }

View File

@ -9,6 +9,7 @@ use futures::task::AtomicTask;
/// Counter could be cloned, total ncount is shared across all clones. /// Counter could be cloned, total ncount is shared across all clones.
pub struct Counter(Rc<CounterInner>); pub struct Counter(Rc<CounterInner>);
#[derive(Debug)]
struct CounterInner { struct CounterInner {
count: Cell<usize>, count: Cell<usize>,
capacity: usize, capacity: usize,
@ -40,6 +41,7 @@ impl Counter {
} }
} }
#[derive(Debug)]
pub struct CounterGuard(Rc<CounterInner>); pub struct CounterGuard(Rc<CounterInner>);
impl CounterGuard { impl CounterGuard {
@ -57,11 +59,7 @@ impl Drop for CounterGuard {
impl CounterInner { impl CounterInner {
fn inc(&self) { fn inc(&self) {
let num = self.count.get() + 1; self.count.set(self.count.get() + 1);
self.count.set(num);
if num == self.capacity {
self.task.register();
}
} }
fn dec(&self) { fn dec(&self) {
@ -73,6 +71,10 @@ impl CounterInner {
} }
fn available(&self) -> bool { fn available(&self) -> bool {
self.count.get() < self.capacity let avail = self.count.get() < self.capacity;
if !avail {
self.task.register();
}
avail
} }
} }

View File

@ -83,9 +83,9 @@ where
}); });
if let Ok(stream) = stream { if let Ok(stream) = stream {
spawn(self.service.call(stream).map_err(|_| ()).map(move |val| { spawn(self.service.call(stream).then(move |res| {
drop(guard); drop(guard);
val res.map_err(|_| ())
})); }));
ok(()) ok(())
} else { } else {
@ -122,9 +122,9 @@ where
} }
fn call(&mut self, (guard, req): (Option<CounterGuard>, ServerMessage)) -> Self::Future { fn call(&mut self, (guard, req): (Option<CounterGuard>, ServerMessage)) -> Self::Future {
spawn(self.service.call(req).map_err(|_| ()).map(move |val| { spawn(self.service.call(req).then(move |res| {
drop(guard); drop(guard);
val res.map_err(|_| ())
})); }));
ok(()) ok(())
} }

View File

@ -34,10 +34,12 @@ impl Signals {
let fut = { let fut = {
#[cfg(not(unix))] #[cfg(not(unix))]
{ {
tokio_signal::ctrl_c().and_then(move |stream| Signals { tokio_signal::ctrl_c()
srv, .map_err(|_| ())
stream: Box::new(stream.map(|_| Signal::Int)), .and_then(move |stream| Signals {
}) srv,
stream: Box::new(stream.map(|_| Signal::Int)),
})
} }
#[cfg(unix)] #[cfg(unix)]

View File

@ -398,7 +398,7 @@ impl Future for Worker {
let guard = self.conns.get(); let guard = self.conns.get();
let _ = self.services[msg.token.0] let _ = self.services[msg.token.0]
.as_mut() .as_mut()
.expect("actix net bug") .expect("actix-server bug")
.1 .1
.call((Some(guard), ServerMessage::Connect(msg.io))); .call((Some(guard), ServerMessage::Connect(msg.io)));
continue; continue;

View File

@ -1,5 +1,19 @@
# Changes # Changes
## [0.1.3] - 2018-12-12
## Changed
* Split service combinators to separate trait
## [0.1.2] - 2018-12-12
### Fixed
* Release future early for `.and_then()` and `.then()` combinators
## [0.1.1] - 2018-12-09 ## [0.1.1] - 2018-12-09
### Added ### Added

View File

@ -1,6 +1,6 @@
[package] [package]
name = "actix-service" name = "actix-service"
version = "0.1.1" version = "0.1.3"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix Service" description = "Actix Service"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]

View File

@ -61,7 +61,7 @@ where
{ {
b: Cell<B>, b: Cell<B>,
fut_b: Option<B::Future>, fut_b: Option<B::Future>,
fut_a: A::Future, fut_a: Option<A::Future>,
} }
impl<A, B, Request> AndThenFuture<A, B, Request> impl<A, B, Request> AndThenFuture<A, B, Request>
@ -69,10 +69,10 @@ where
A: Service<Request>, A: Service<Request>,
B: Service<A::Response, Error = A::Error>, B: Service<A::Response, Error = A::Error>,
{ {
fn new(fut_a: A::Future, b: Cell<B>) -> Self { fn new(a: A::Future, b: Cell<B>) -> Self {
AndThenFuture { AndThenFuture {
b, b,
fut_a, fut_a: Some(a),
fut_b: None, fut_b: None,
} }
} }
@ -91,8 +91,9 @@ where
return fut.poll(); return fut.poll();
} }
match self.fut_a.poll() { match self.fut_a.as_mut().expect("Bug in actix-service").poll() {
Ok(Async::Ready(resp)) => { Ok(Async::Ready(resp)) => {
let _ = self.fut_a.take();
self.fut_b = Some(self.b.get_mut().call(resp)); self.fut_b = Some(self.b.get_mut().call(resp));
self.poll() self.poll()
} }
@ -218,7 +219,7 @@ mod tests {
use std::rc::Rc; use std::rc::Rc;
use super::*; use super::*;
use crate::{NewService, Service}; use crate::{NewService, Service, ServiceExt};
struct Srv1(Rc<Cell<usize>>); struct Srv1(Rc<Cell<usize>>);
impl Service<&'static str> for Srv1 { impl Service<&'static str> for Srv1 {

View File

@ -171,7 +171,7 @@ mod tests {
use futures::future::{ok, FutureResult}; use futures::future::{ok, FutureResult};
use futures::{Async, Future, Poll}; use futures::{Async, Future, Poll};
use crate::{IntoNewService, IntoService, NewService, Service}; use crate::{IntoNewService, IntoService, NewService, Service, ServiceExt};
#[derive(Clone)] #[derive(Clone)]
struct Srv; struct Srv;

View File

@ -159,7 +159,7 @@ mod tests {
use futures::future::{err, FutureResult}; use futures::future::{err, FutureResult};
use super::*; use super::*;
use crate::{IntoNewService, NewService, Service}; use crate::{IntoNewService, NewService, Service, ServiceExt};
struct Srv; struct Srv;
impl Service<()> for Srv { impl Service<()> for Srv {

View File

@ -50,7 +50,11 @@ pub trait Service<Request> {
/// Calling `call` without calling `poll_ready` is permitted. The /// Calling `call` without calling `poll_ready` is permitted. The
/// implementation must be resilient to this fact. /// implementation must be resilient to this fact.
fn call(&mut self, req: Request) -> Self::Future; fn call(&mut self, req: Request) -> Self::Future;
}
/// An extension trait for `Service`s that provides a variety of convenient
/// adapters
pub trait ServiceExt<Request>: Service<Request> {
/// Apply function to specified service and use it as a next service in /// Apply function to specified service and use it as a next service in
/// chain. /// chain.
fn apply<T, I, F, Out, Req>( fn apply<T, I, F, Out, Req>(
@ -146,6 +150,8 @@ pub trait Service<Request> {
} }
} }
impl<T: ?Sized, Request> ServiceExt<Request> for T where T: Service<Request> {}
/// Creates new `Service` values. /// Creates new `Service` values.
/// ///
/// Acts as a service factory. This is useful for cases where new `Service` /// Acts as a service factory. This is useful for cases where new `Service`

View File

@ -189,7 +189,7 @@ mod tests {
use futures::future::{ok, FutureResult}; use futures::future::{ok, FutureResult};
use super::*; use super::*;
use crate::{IntoNewService, Service}; use crate::{IntoNewService, Service, ServiceExt};
struct Srv; struct Srv;
impl Service<()> for Srv { impl Service<()> for Srv {

View File

@ -190,7 +190,7 @@ mod tests {
use futures::future::{err, FutureResult}; use futures::future::{err, FutureResult};
use super::*; use super::*;
use crate::{IntoNewService, NewService, Service}; use crate::{IntoNewService, NewService, Service, ServiceExt};
struct Srv; struct Srv;

View File

@ -61,7 +61,7 @@ where
{ {
b: Cell<B>, b: Cell<B>,
fut_b: Option<B::Future>, fut_b: Option<B::Future>,
fut_a: A::Future, fut_a: Option<A::Future>,
} }
impl<A, B, Request> ThenFuture<A, B, Request> impl<A, B, Request> ThenFuture<A, B, Request>
@ -69,10 +69,10 @@ where
A: Service<Request>, A: Service<Request>,
B: Service<Result<A::Response, A::Error>>, B: Service<Result<A::Response, A::Error>>,
{ {
fn new(fut_a: A::Future, b: Cell<B>) -> Self { fn new(a: A::Future, b: Cell<B>) -> Self {
ThenFuture { ThenFuture {
b, b,
fut_a, fut_a: Some(a),
fut_b: None, fut_b: None,
} }
} }
@ -91,12 +91,14 @@ where
return fut.poll(); return fut.poll();
} }
match self.fut_a.poll() { match self.fut_a.as_mut().expect("bug in actix-service").poll() {
Ok(Async::Ready(resp)) => { Ok(Async::Ready(resp)) => {
let _ = self.fut_a.take();
self.fut_b = Some(self.b.get_mut().call(Ok(resp))); self.fut_b = Some(self.b.get_mut().call(Ok(resp)));
self.poll() self.poll()
} }
Err(err) => { Err(err) => {
let _ = self.fut_a.take();
self.fut_b = Some(self.b.get_mut().call(Err(err))); self.fut_b = Some(self.b.get_mut().call(Err(err)));
self.poll() self.poll()
} }
@ -221,11 +223,11 @@ where
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use futures::future::{err, ok, FutureResult}; use futures::future::{err, ok, FutureResult};
use futures::{Async, Poll}; use futures::{Async, Future, Poll};
use std::cell::Cell; use std::cell::Cell;
use std::rc::Rc; use std::rc::Rc;
use super::*; use crate::{IntoNewService, NewService, Service, ServiceExt};
#[derive(Clone)] #[derive(Clone)]
struct Srv1(Rc<Cell<usize>>); struct Srv1(Rc<Cell<usize>>);

View File

@ -21,8 +21,7 @@ path = "src/lib.rs"
actix-service = "0.1.1" actix-service = "0.1.1"
actix-codec = "0.1.0" actix-codec = "0.1.0"
actix-rt = "0.1.0" actix-rt = "0.1.0"
# io
bytes = "0.4" bytes = "0.4"
futures = "0.1" futures = "0.1"
tokio-timer = "0.2.8" tokio-timer = "0.2.8"
log = "0.4"

View File

@ -7,7 +7,8 @@ use actix_rt::Arbiter;
use actix_service::{IntoNewService, IntoService, NewService, Service}; use actix_service::{IntoNewService, IntoService, NewService, Service};
use futures::future::{ok, FutureResult}; use futures::future::{ok, FutureResult};
use futures::unsync::mpsc; use futures::unsync::mpsc;
use futures::{Async, AsyncSink, Future, Poll, Sink, Stream}; use futures::{Async, Future, Poll, Sink, Stream};
use log::debug;
type Request<U> = <U as Decoder>::Item; type Request<U> = <U as Decoder>::Item;
type Response<U> = <U as Encoder>::Item; type Response<U> = <U as Encoder>::Item;
@ -179,10 +180,8 @@ where
state: TransportState<S, U>, state: TransportState<S, U>,
framed: Framed<T, U>, framed: Framed<T, U>,
request: Option<Request<U>>, request: Option<Request<U>>,
response: Option<Response<U>>,
write_rx: mpsc::Receiver<Result<Response<U>, S::Error>>, write_rx: mpsc::Receiver<Result<Response<U>, S::Error>>,
write_tx: mpsc::Sender<Result<Response<U>, S::Error>>, write_tx: mpsc::Sender<Result<Response<U>, S::Error>>,
flushed: bool,
} }
enum TransportState<S: Service<Request<U>>, U: Encoder + Decoder> { enum TransportState<S: Service<Request<U>>, U: Encoder + Decoder> {
@ -210,8 +209,6 @@ where
service: service.into_service(), service: service.into_service(),
state: TransportState::Processing, state: TransportState::Processing,
request: None, request: None,
response: None,
flushed: true,
} }
} }
@ -247,7 +244,7 @@ where
S::Future: 'static, S::Future: 'static,
S::Error: 'static, S::Error: 'static,
<U as Encoder>::Item: 'static, <U as Encoder>::Item: 'static,
<U as Encoder>::Error: 'static, <U as Encoder>::Error: std::fmt::Debug + 'static,
{ {
fn poll_service(&mut self) -> bool { fn poll_service(&mut self) -> bool {
match self.service.poll_ready() { match self.service.poll_ready() {
@ -307,59 +304,42 @@ where
/// write to sink /// write to sink
fn poll_response(&mut self) -> bool { fn poll_response(&mut self) -> bool {
let mut item = self.response.take();
loop { loop {
item = if let Some(msg) = item { while !self.framed.is_write_buf_full() {
self.flushed = false; match self.write_rx.poll() {
match self.framed.start_send(msg) { Ok(Async::Ready(Some(msg))) => match msg {
Ok(AsyncSink::Ready) => None, Ok(msg) => {
Ok(AsyncSink::NotReady(item)) => Some(item), if let Err(err) = self.framed.force_send(msg) {
Err(err) => { self.state = TransportState::EncoderError(
self.state = FramedTransportError::Encoder(err),
TransportState::EncoderError(FramedTransportError::Encoder(err)); );
return true; return true;
} }
} }
} else { Err(err) => {
None self.state =
}; TransportState::Error(FramedTransportError::Service(err));
return true;
// flush sink }
if !self.flushed { },
match self.framed.poll_complete() {
Ok(Async::Ready(_)) => {
self.flushed = true;
}
Ok(Async::NotReady) => break, Ok(Async::NotReady) => break,
Err(err) => { Err(_) => panic!("Bug in actix-net code"),
self.state = Ok(Async::Ready(None)) => panic!("Bug in actix-net code"),
TransportState::EncoderError(FramedTransportError::Encoder(err));
return true;
}
} }
} }
// check channel if !self.framed.is_write_buf_empty() {
if self.flushed { match self.framed.poll_complete() {
if item.is_none() { Ok(Async::NotReady) => break,
match self.write_rx.poll() { Err(err) => {
Ok(Async::Ready(Some(msg))) => match msg { debug!("Error sending data: {:?}", err);
Ok(msg) => item = Some(msg), self.state =
Err(err) => { TransportState::EncoderError(FramedTransportError::Encoder(err));
self.state = return true;
TransportState::Error(FramedTransportError::Service(err));
return true;
}
},
Ok(Async::NotReady) => break,
Err(_) => panic!("Bug in gw code"),
Ok(Async::Ready(None)) => panic!("Bug in gw code"),
} }
} else { Ok(Async::Ready(_)) => (),
continue;
} }
} else { } else {
self.response = item;
break; break;
} }
} }
@ -376,7 +356,7 @@ where
S::Future: 'static, S::Future: 'static,
S::Error: 'static, S::Error: 'static,
<U as Encoder>::Item: 'static, <U as Encoder>::Item: 'static,
<U as Encoder>::Error: 'static, <U as Encoder>::Error: std::fmt::Debug + 'static,
{ {
type Item = (); type Item = ();
type Error = FramedTransportError<S::Error, U>; type Error = FramedTransportError<S::Error, U>;
@ -391,7 +371,7 @@ where
} }
} }
TransportState::Error(err) => { TransportState::Error(err) => {
if self.poll_response() || self.flushed { if self.poll_response() || !self.framed.is_write_buf_empty() {
Err(err) Err(err)
} else { } else {
self.state = TransportState::Error(err); self.state = TransportState::Error(err);