mirror of
https://github.com/fafhrd91/actix-net
synced 2025-04-18 12:55:19 +02:00
Compare commits
2 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
ebf8d7fa34 | ||
|
298727dcbd |
16
CHANGES.md
16
CHANGES.md
@ -1,5 +1,21 @@
|
|||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
|
## [0.2.6] - 2018-12-21
|
||||||
|
|
||||||
|
### Fixed
|
||||||
|
|
||||||
|
* Fix back-pressure handling for concurrent connections
|
||||||
|
|
||||||
|
|
||||||
|
## [0.2.5] - 2018-12-12
|
||||||
|
|
||||||
|
### Fixed
|
||||||
|
|
||||||
|
* Fix back-pressure for concurrent ssl handshakes
|
||||||
|
|
||||||
|
* Drop completed future for .then and .and_then combinators
|
||||||
|
|
||||||
|
|
||||||
## [0.2.4] - 2018-11-21
|
## [0.2.4] - 2018-11-21
|
||||||
|
|
||||||
### Added
|
### Added
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "actix-net"
|
name = "actix-net"
|
||||||
version = "0.2.4"
|
version = "0.2.6"
|
||||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||||
description = "Actix net - framework for the compisible network services for Rust (experimental)"
|
description = "Actix net - framework for the compisible network services for Rust (experimental)"
|
||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
@ -58,8 +58,7 @@ tokio-timer = "0.2"
|
|||||||
tokio-reactor = "0.1"
|
tokio-reactor = "0.1"
|
||||||
tokio-current-thread = "0.1"
|
tokio-current-thread = "0.1"
|
||||||
tower-service = "0.1"
|
tower-service = "0.1"
|
||||||
trust-dns-proto = "^0.5.0"
|
trust-dns-resolver = "^0.10.2"
|
||||||
trust-dns-resolver = "^0.10.0"
|
|
||||||
|
|
||||||
# native-tls
|
# native-tls
|
||||||
native-tls = { version="0.2", optional = true }
|
native-tls = { version="0.2", optional = true }
|
||||||
|
@ -80,7 +80,8 @@ fn main() {
|
|||||||
future::ok(())
|
future::ok(())
|
||||||
})
|
})
|
||||||
},
|
},
|
||||||
).unwrap()
|
)
|
||||||
|
.unwrap()
|
||||||
.start();
|
.start();
|
||||||
|
|
||||||
sys.run();
|
sys.run();
|
||||||
|
@ -61,7 +61,8 @@ fn main() {
|
|||||||
println!("got ssl connection {:?}", num);
|
println!("got ssl connection {:?}", num);
|
||||||
future::ok(())
|
future::ok(())
|
||||||
})
|
})
|
||||||
}).unwrap()
|
})
|
||||||
|
.unwrap()
|
||||||
.start();
|
.start();
|
||||||
|
|
||||||
sys.run();
|
sys.run();
|
||||||
|
@ -254,7 +254,8 @@ where
|
|||||||
io::ErrorKind::WriteZero,
|
io::ErrorKind::WriteZero,
|
||||||
"failed to \
|
"failed to \
|
||||||
write frame to transport",
|
write frame to transport",
|
||||||
).into());
|
)
|
||||||
|
.into());
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Add a way to `bytes` to do this w/o returning the drained
|
// TODO: Add a way to `bytes` to do this w/o returning the drained
|
||||||
|
@ -9,6 +9,7 @@ use futures::task::AtomicTask;
|
|||||||
/// Counter could be cloned, total ncount is shared across all clones.
|
/// Counter could be cloned, total ncount is shared across all clones.
|
||||||
pub struct Counter(Rc<CounterInner>);
|
pub struct Counter(Rc<CounterInner>);
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
struct CounterInner {
|
struct CounterInner {
|
||||||
count: Cell<usize>,
|
count: Cell<usize>,
|
||||||
capacity: usize,
|
capacity: usize,
|
||||||
@ -40,6 +41,7 @@ impl Counter {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
pub struct CounterGuard(Rc<CounterInner>);
|
pub struct CounterGuard(Rc<CounterInner>);
|
||||||
|
|
||||||
impl CounterGuard {
|
impl CounterGuard {
|
||||||
@ -57,11 +59,7 @@ impl Drop for CounterGuard {
|
|||||||
|
|
||||||
impl CounterInner {
|
impl CounterInner {
|
||||||
fn inc(&self) {
|
fn inc(&self) {
|
||||||
let num = self.count.get() + 1;
|
self.count.set(self.count.get() + 1);
|
||||||
self.count.set(num);
|
|
||||||
if num == self.capacity {
|
|
||||||
self.task.register();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn dec(&self) {
|
fn dec(&self) {
|
||||||
@ -73,6 +71,10 @@ impl CounterInner {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn available(&self) -> bool {
|
fn available(&self) -> bool {
|
||||||
self.count.get() < self.capacity
|
let avail = self.count.get() < self.capacity;
|
||||||
|
if !avail {
|
||||||
|
self.task.register();
|
||||||
|
}
|
||||||
|
avail
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -9,10 +9,7 @@
|
|||||||
|
|
||||||
#![cfg_attr(
|
#![cfg_attr(
|
||||||
feature = "cargo-clippy",
|
feature = "cargo-clippy",
|
||||||
allow(
|
allow(declare_interior_mutable_const, borrow_interior_mutable_const)
|
||||||
declare_interior_mutable_const,
|
|
||||||
borrow_interior_mutable_const
|
|
||||||
)
|
|
||||||
)]
|
)]
|
||||||
|
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
|
@ -83,9 +83,9 @@ where
|
|||||||
});
|
});
|
||||||
|
|
||||||
if let Ok(stream) = stream {
|
if let Ok(stream) = stream {
|
||||||
spawn(self.service.call(stream).map_err(|_| ()).map(move |val| {
|
spawn(self.service.call(stream).then(move |res| {
|
||||||
drop(guard);
|
drop(guard);
|
||||||
val
|
res.map_err(|_| ())
|
||||||
}));
|
}));
|
||||||
ok(())
|
ok(())
|
||||||
} else {
|
} else {
|
||||||
@ -123,9 +123,9 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn call(&mut self, (guard, req): (Option<CounterGuard>, ServerMessage)) -> Self::Future {
|
fn call(&mut self, (guard, req): (Option<CounterGuard>, ServerMessage)) -> Self::Future {
|
||||||
spawn(self.service.call(req).map_err(|_| ()).map(move |val| {
|
spawn(self.service.call(req).then(move |res| {
|
||||||
drop(guard);
|
drop(guard);
|
||||||
val
|
res.map_err(|_| ())
|
||||||
}));
|
}));
|
||||||
ok(())
|
ok(())
|
||||||
}
|
}
|
||||||
|
@ -167,7 +167,8 @@ impl Worker {
|
|||||||
.map_err(|e| {
|
.map_err(|e| {
|
||||||
error!("Can not start worker: {:?}", e);
|
error!("Can not start worker: {:?}", e);
|
||||||
Arbiter::current().do_send(StopArbiter(0));
|
Arbiter::current().do_send(StopArbiter(0));
|
||||||
}).and_then(move |services| {
|
})
|
||||||
|
.and_then(move |services| {
|
||||||
for item in services {
|
for item in services {
|
||||||
for (idx, token, service) in item {
|
for (idx, token, service) in item {
|
||||||
while token.0 >= wrk.services.len() {
|
while token.0 >= wrk.services.len() {
|
||||||
|
@ -63,7 +63,7 @@ where
|
|||||||
{
|
{
|
||||||
b: Cell<B>,
|
b: Cell<B>,
|
||||||
fut_b: Option<B::Future>,
|
fut_b: Option<B::Future>,
|
||||||
fut_a: A::Future,
|
fut_a: Option<A::Future>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<A, B> AndThenFuture<A, B>
|
impl<A, B> AndThenFuture<A, B>
|
||||||
@ -71,10 +71,10 @@ where
|
|||||||
A: Service,
|
A: Service,
|
||||||
B: Service<Request = A::Response, Error = A::Error>,
|
B: Service<Request = A::Response, Error = A::Error>,
|
||||||
{
|
{
|
||||||
fn new(fut_a: A::Future, b: Cell<B>) -> Self {
|
fn new(a: A::Future, b: Cell<B>) -> Self {
|
||||||
AndThenFuture {
|
AndThenFuture {
|
||||||
b,
|
b,
|
||||||
fut_a,
|
fut_a: Some(a),
|
||||||
fut_b: None,
|
fut_b: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -94,8 +94,9 @@ where
|
|||||||
return fut.poll();
|
return fut.poll();
|
||||||
}
|
}
|
||||||
|
|
||||||
match self.fut_a.poll() {
|
match self.fut_a.as_mut().expect("actix-net bug").poll() {
|
||||||
Ok(Async::Ready(resp)) => {
|
Ok(Async::Ready(resp)) => {
|
||||||
|
let _ = self.fut_a.take();
|
||||||
self.fut_b = Some(self.b.borrow_mut().call(resp));
|
self.fut_b = Some(self.b.borrow_mut().call(resp));
|
||||||
self.poll()
|
self.poll()
|
||||||
}
|
}
|
||||||
|
@ -63,7 +63,7 @@ where
|
|||||||
{
|
{
|
||||||
b: Cell<B>,
|
b: Cell<B>,
|
||||||
fut_b: Option<B::Future>,
|
fut_b: Option<B::Future>,
|
||||||
fut_a: A::Future,
|
fut_a: Option<A::Future>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<A, B> ThenFuture<A, B>
|
impl<A, B> ThenFuture<A, B>
|
||||||
@ -71,10 +71,10 @@ where
|
|||||||
A: Service,
|
A: Service,
|
||||||
B: Service<Request = Result<A::Response, A::Error>>,
|
B: Service<Request = Result<A::Response, A::Error>>,
|
||||||
{
|
{
|
||||||
fn new(fut_a: A::Future, b: Cell<B>) -> Self {
|
fn new(a: A::Future, b: Cell<B>) -> Self {
|
||||||
ThenFuture {
|
ThenFuture {
|
||||||
b,
|
b,
|
||||||
fut_a,
|
fut_a: Some(a),
|
||||||
fut_b: None,
|
fut_b: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -93,12 +93,14 @@ where
|
|||||||
return fut.poll();
|
return fut.poll();
|
||||||
}
|
}
|
||||||
|
|
||||||
match self.fut_a.poll() {
|
match self.fut_a.as_mut().expect("actix-net bug").poll() {
|
||||||
Ok(Async::Ready(resp)) => {
|
Ok(Async::Ready(resp)) => {
|
||||||
|
let _ = self.fut_a.take();
|
||||||
self.fut_b = Some(self.b.borrow_mut().call(Ok(resp)));
|
self.fut_b = Some(self.b.borrow_mut().call(Ok(resp)));
|
||||||
self.poll()
|
self.poll()
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
|
let _ = self.fut_a.take();
|
||||||
self.fut_b = Some(self.b.borrow_mut().call(Err(err)));
|
self.fut_b = Some(self.b.borrow_mut().call(Err(err)));
|
||||||
self.poll()
|
self.poll()
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user