1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-04-10 23:42:54 +02:00

Compare commits

...

2 Commits

Author SHA1 Message Date
Nikolay Kim
ebf8d7fa34 Fix back-pressure handling for concurrent connections 2018-12-21 10:43:18 -08:00
Nikolay Kim
298727dcbd back port bug fixes 2018-12-12 19:01:59 -08:00
11 changed files with 50 additions and 29 deletions

View File

@ -1,5 +1,21 @@
# Changes
## [0.2.6] - 2018-12-21
### Fixed
* Fix back-pressure handling for concurrent connections
## [0.2.5] - 2018-12-12
### Fixed
* Fix back-pressure for concurrent ssl handshakes
* Drop completed future for .then and .and_then combinators
## [0.2.4] - 2018-11-21
### Added

View File

@ -1,6 +1,6 @@
[package]
name = "actix-net"
version = "0.2.4"
version = "0.2.6"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix net - framework for the compisible network services for Rust (experimental)"
readme = "README.md"
@ -58,8 +58,7 @@ tokio-timer = "0.2"
tokio-reactor = "0.1"
tokio-current-thread = "0.1"
tower-service = "0.1"
trust-dns-proto = "^0.5.0"
trust-dns-resolver = "^0.10.0"
trust-dns-resolver = "^0.10.2"
# native-tls
native-tls = { version="0.2", optional = true }

View File

@ -80,7 +80,8 @@ fn main() {
future::ok(())
})
},
).unwrap()
)
.unwrap()
.start();
sys.run();

View File

@ -61,7 +61,8 @@ fn main() {
println!("got ssl connection {:?}", num);
future::ok(())
})
}).unwrap()
})
.unwrap()
.start();
sys.run();

View File

@ -254,7 +254,8 @@ where
io::ErrorKind::WriteZero,
"failed to \
write frame to transport",
).into());
)
.into());
}
// TODO: Add a way to `bytes` to do this w/o returning the drained

View File

@ -9,6 +9,7 @@ use futures::task::AtomicTask;
/// Counter could be cloned, total ncount is shared across all clones.
pub struct Counter(Rc<CounterInner>);
#[derive(Debug)]
struct CounterInner {
count: Cell<usize>,
capacity: usize,
@ -40,6 +41,7 @@ impl Counter {
}
}
#[derive(Debug)]
pub struct CounterGuard(Rc<CounterInner>);
impl CounterGuard {
@ -57,11 +59,7 @@ impl Drop for CounterGuard {
impl CounterInner {
fn inc(&self) {
let num = self.count.get() + 1;
self.count.set(num);
if num == self.capacity {
self.task.register();
}
self.count.set(self.count.get() + 1);
}
fn dec(&self) {
@ -73,6 +71,10 @@ impl CounterInner {
}
fn available(&self) -> bool {
self.count.get() < self.capacity
let avail = self.count.get() < self.capacity;
if !avail {
self.task.register();
}
avail
}
}

View File

@ -9,10 +9,7 @@
#![cfg_attr(
feature = "cargo-clippy",
allow(
declare_interior_mutable_const,
borrow_interior_mutable_const
)
allow(declare_interior_mutable_const, borrow_interior_mutable_const)
)]
#[macro_use]

View File

@ -83,9 +83,9 @@ where
});
if let Ok(stream) = stream {
spawn(self.service.call(stream).map_err(|_| ()).map(move |val| {
spawn(self.service.call(stream).then(move |res| {
drop(guard);
val
res.map_err(|_| ())
}));
ok(())
} else {
@ -123,9 +123,9 @@ where
}
fn call(&mut self, (guard, req): (Option<CounterGuard>, ServerMessage)) -> Self::Future {
spawn(self.service.call(req).map_err(|_| ()).map(move |val| {
spawn(self.service.call(req).then(move |res| {
drop(guard);
val
res.map_err(|_| ())
}));
ok(())
}

View File

@ -167,7 +167,8 @@ impl Worker {
.map_err(|e| {
error!("Can not start worker: {:?}", e);
Arbiter::current().do_send(StopArbiter(0));
}).and_then(move |services| {
})
.and_then(move |services| {
for item in services {
for (idx, token, service) in item {
while token.0 >= wrk.services.len() {

View File

@ -63,7 +63,7 @@ where
{
b: Cell<B>,
fut_b: Option<B::Future>,
fut_a: A::Future,
fut_a: Option<A::Future>,
}
impl<A, B> AndThenFuture<A, B>
@ -71,10 +71,10 @@ where
A: Service,
B: Service<Request = A::Response, Error = A::Error>,
{
fn new(fut_a: A::Future, b: Cell<B>) -> Self {
fn new(a: A::Future, b: Cell<B>) -> Self {
AndThenFuture {
b,
fut_a,
fut_a: Some(a),
fut_b: None,
}
}
@ -94,8 +94,9 @@ where
return fut.poll();
}
match self.fut_a.poll() {
match self.fut_a.as_mut().expect("actix-net bug").poll() {
Ok(Async::Ready(resp)) => {
let _ = self.fut_a.take();
self.fut_b = Some(self.b.borrow_mut().call(resp));
self.poll()
}

View File

@ -63,7 +63,7 @@ where
{
b: Cell<B>,
fut_b: Option<B::Future>,
fut_a: A::Future,
fut_a: Option<A::Future>,
}
impl<A, B> ThenFuture<A, B>
@ -71,10 +71,10 @@ where
A: Service,
B: Service<Request = Result<A::Response, A::Error>>,
{
fn new(fut_a: A::Future, b: Cell<B>) -> Self {
fn new(a: A::Future, b: Cell<B>) -> Self {
ThenFuture {
b,
fut_a,
fut_a: Some(a),
fut_b: None,
}
}
@ -93,12 +93,14 @@ where
return fut.poll();
}
match self.fut_a.poll() {
match self.fut_a.as_mut().expect("actix-net bug").poll() {
Ok(Async::Ready(resp)) => {
let _ = self.fut_a.take();
self.fut_b = Some(self.b.borrow_mut().call(Ok(resp)));
self.poll()
}
Err(err) => {
let _ = self.fut_a.take();
self.fut_b = Some(self.b.borrow_mut().call(Err(err)));
self.poll()
}