1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-04-18 12:55:19 +02:00

Compare commits

...

2 Commits

Author SHA1 Message Date
Nikolay Kim
ebf8d7fa34 Fix back-pressure handling for concurrent connections 2018-12-21 10:43:18 -08:00
Nikolay Kim
298727dcbd back port bug fixes 2018-12-12 19:01:59 -08:00
11 changed files with 50 additions and 29 deletions

View File

@ -1,5 +1,21 @@
# Changes # Changes
## [0.2.6] - 2018-12-21
### Fixed
* Fix back-pressure handling for concurrent connections
## [0.2.5] - 2018-12-12
### Fixed
* Fix back-pressure for concurrent ssl handshakes
* Drop completed future for .then and .and_then combinators
## [0.2.4] - 2018-11-21 ## [0.2.4] - 2018-11-21
### Added ### Added

View File

@ -1,6 +1,6 @@
[package] [package]
name = "actix-net" name = "actix-net"
version = "0.2.4" version = "0.2.6"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix net - framework for the compisible network services for Rust (experimental)" description = "Actix net - framework for the compisible network services for Rust (experimental)"
readme = "README.md" readme = "README.md"
@ -58,8 +58,7 @@ tokio-timer = "0.2"
tokio-reactor = "0.1" tokio-reactor = "0.1"
tokio-current-thread = "0.1" tokio-current-thread = "0.1"
tower-service = "0.1" tower-service = "0.1"
trust-dns-proto = "^0.5.0" trust-dns-resolver = "^0.10.2"
trust-dns-resolver = "^0.10.0"
# native-tls # native-tls
native-tls = { version="0.2", optional = true } native-tls = { version="0.2", optional = true }

View File

@ -80,7 +80,8 @@ fn main() {
future::ok(()) future::ok(())
}) })
}, },
).unwrap() )
.unwrap()
.start(); .start();
sys.run(); sys.run();

View File

@ -61,7 +61,8 @@ fn main() {
println!("got ssl connection {:?}", num); println!("got ssl connection {:?}", num);
future::ok(()) future::ok(())
}) })
}).unwrap() })
.unwrap()
.start(); .start();
sys.run(); sys.run();

View File

@ -254,7 +254,8 @@ where
io::ErrorKind::WriteZero, io::ErrorKind::WriteZero,
"failed to \ "failed to \
write frame to transport", write frame to transport",
).into()); )
.into());
} }
// TODO: Add a way to `bytes` to do this w/o returning the drained // TODO: Add a way to `bytes` to do this w/o returning the drained

View File

@ -9,6 +9,7 @@ use futures::task::AtomicTask;
/// Counter could be cloned, total ncount is shared across all clones. /// Counter could be cloned, total ncount is shared across all clones.
pub struct Counter(Rc<CounterInner>); pub struct Counter(Rc<CounterInner>);
#[derive(Debug)]
struct CounterInner { struct CounterInner {
count: Cell<usize>, count: Cell<usize>,
capacity: usize, capacity: usize,
@ -40,6 +41,7 @@ impl Counter {
} }
} }
#[derive(Debug)]
pub struct CounterGuard(Rc<CounterInner>); pub struct CounterGuard(Rc<CounterInner>);
impl CounterGuard { impl CounterGuard {
@ -57,11 +59,7 @@ impl Drop for CounterGuard {
impl CounterInner { impl CounterInner {
fn inc(&self) { fn inc(&self) {
let num = self.count.get() + 1; self.count.set(self.count.get() + 1);
self.count.set(num);
if num == self.capacity {
self.task.register();
}
} }
fn dec(&self) { fn dec(&self) {
@ -73,6 +71,10 @@ impl CounterInner {
} }
fn available(&self) -> bool { fn available(&self) -> bool {
self.count.get() < self.capacity let avail = self.count.get() < self.capacity;
if !avail {
self.task.register();
}
avail
} }
} }

View File

@ -9,10 +9,7 @@
#![cfg_attr( #![cfg_attr(
feature = "cargo-clippy", feature = "cargo-clippy",
allow( allow(declare_interior_mutable_const, borrow_interior_mutable_const)
declare_interior_mutable_const,
borrow_interior_mutable_const
)
)] )]
#[macro_use] #[macro_use]

View File

@ -83,9 +83,9 @@ where
}); });
if let Ok(stream) = stream { if let Ok(stream) = stream {
spawn(self.service.call(stream).map_err(|_| ()).map(move |val| { spawn(self.service.call(stream).then(move |res| {
drop(guard); drop(guard);
val res.map_err(|_| ())
})); }));
ok(()) ok(())
} else { } else {
@ -123,9 +123,9 @@ where
} }
fn call(&mut self, (guard, req): (Option<CounterGuard>, ServerMessage)) -> Self::Future { fn call(&mut self, (guard, req): (Option<CounterGuard>, ServerMessage)) -> Self::Future {
spawn(self.service.call(req).map_err(|_| ()).map(move |val| { spawn(self.service.call(req).then(move |res| {
drop(guard); drop(guard);
val res.map_err(|_| ())
})); }));
ok(()) ok(())
} }

View File

@ -167,7 +167,8 @@ impl Worker {
.map_err(|e| { .map_err(|e| {
error!("Can not start worker: {:?}", e); error!("Can not start worker: {:?}", e);
Arbiter::current().do_send(StopArbiter(0)); Arbiter::current().do_send(StopArbiter(0));
}).and_then(move |services| { })
.and_then(move |services| {
for item in services { for item in services {
for (idx, token, service) in item { for (idx, token, service) in item {
while token.0 >= wrk.services.len() { while token.0 >= wrk.services.len() {

View File

@ -63,7 +63,7 @@ where
{ {
b: Cell<B>, b: Cell<B>,
fut_b: Option<B::Future>, fut_b: Option<B::Future>,
fut_a: A::Future, fut_a: Option<A::Future>,
} }
impl<A, B> AndThenFuture<A, B> impl<A, B> AndThenFuture<A, B>
@ -71,10 +71,10 @@ where
A: Service, A: Service,
B: Service<Request = A::Response, Error = A::Error>, B: Service<Request = A::Response, Error = A::Error>,
{ {
fn new(fut_a: A::Future, b: Cell<B>) -> Self { fn new(a: A::Future, b: Cell<B>) -> Self {
AndThenFuture { AndThenFuture {
b, b,
fut_a, fut_a: Some(a),
fut_b: None, fut_b: None,
} }
} }
@ -94,8 +94,9 @@ where
return fut.poll(); return fut.poll();
} }
match self.fut_a.poll() { match self.fut_a.as_mut().expect("actix-net bug").poll() {
Ok(Async::Ready(resp)) => { Ok(Async::Ready(resp)) => {
let _ = self.fut_a.take();
self.fut_b = Some(self.b.borrow_mut().call(resp)); self.fut_b = Some(self.b.borrow_mut().call(resp));
self.poll() self.poll()
} }

View File

@ -63,7 +63,7 @@ where
{ {
b: Cell<B>, b: Cell<B>,
fut_b: Option<B::Future>, fut_b: Option<B::Future>,
fut_a: A::Future, fut_a: Option<A::Future>,
} }
impl<A, B> ThenFuture<A, B> impl<A, B> ThenFuture<A, B>
@ -71,10 +71,10 @@ where
A: Service, A: Service,
B: Service<Request = Result<A::Response, A::Error>>, B: Service<Request = Result<A::Response, A::Error>>,
{ {
fn new(fut_a: A::Future, b: Cell<B>) -> Self { fn new(a: A::Future, b: Cell<B>) -> Self {
ThenFuture { ThenFuture {
b, b,
fut_a, fut_a: Some(a),
fut_b: None, fut_b: None,
} }
} }
@ -93,12 +93,14 @@ where
return fut.poll(); return fut.poll();
} }
match self.fut_a.poll() { match self.fut_a.as_mut().expect("actix-net bug").poll() {
Ok(Async::Ready(resp)) => { Ok(Async::Ready(resp)) => {
let _ = self.fut_a.take();
self.fut_b = Some(self.b.borrow_mut().call(Ok(resp))); self.fut_b = Some(self.b.borrow_mut().call(Ok(resp)));
self.poll() self.poll()
} }
Err(err) => { Err(err) => {
let _ = self.fut_a.take();
self.fut_b = Some(self.b.borrow_mut().call(Err(err))); self.fut_b = Some(self.b.borrow_mut().call(Err(err)));
self.poll() self.poll()
} }