mirror of
https://github.com/fafhrd91/actix-net
synced 2025-08-16 07:59:00 +02:00
Compare commits
11 Commits
server-con
...
rt-0.2.6
Author | SHA1 | Date | |
---|---|---|---|
|
9fa2a36b4e | ||
|
ed5023128b | ||
|
2e8c2c7733 | ||
|
115e82329f | ||
|
0b0060fe47 | ||
|
35e32d8e55 | ||
|
9982a9498d | ||
|
fa72975f34 | ||
|
fe5de2510d | ||
|
e3155957a8 | ||
|
f6f9e1fcdb |
26
Cargo.toml
26
Cargo.toml
@@ -1,18 +1,3 @@
|
||||
[package]
|
||||
name = "actix-net"
|
||||
version = "0.3.0"
|
||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||
description = "Actix net - framework for the composable network services for Rust"
|
||||
readme = "README.md"
|
||||
keywords = ["network", "framework", "async", "futures"]
|
||||
homepage = "https://actix.rs"
|
||||
repository = "https://github.com/actix/actix-net.git"
|
||||
documentation = "https://docs.rs/actix-net/"
|
||||
categories = ["network-programming", "asynchronous"]
|
||||
license = "MIT/Apache-2.0"
|
||||
exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"]
|
||||
edition = "2018"
|
||||
|
||||
[workspace]
|
||||
members = [
|
||||
"actix-codec",
|
||||
@@ -28,17 +13,6 @@ members = [
|
||||
"router",
|
||||
]
|
||||
|
||||
[dev-dependencies]
|
||||
actix-service = "0.4.0"
|
||||
actix-codec = "0.1.1"
|
||||
actix-rt = "0.2.0"
|
||||
actix-server = { version="0.5.0", features=["ssl"] }
|
||||
env_logger = "0.6"
|
||||
futures = "0.1.25"
|
||||
openssl = "0.10"
|
||||
tokio-tcp = "0.1"
|
||||
tokio-openssl = "0.3"
|
||||
|
||||
[patch.crates-io]
|
||||
actix-codec = { path = "actix-codec" }
|
||||
actix-connect = { path = "actix-connect" }
|
||||
|
@@ -209,6 +209,7 @@ where
|
||||
// get a spurious 0 that looks like EOF
|
||||
self.buffer.reserve(1);
|
||||
if 0 == try_ready!(self.inner.read_buf(&mut self.buffer)) {
|
||||
trace!("read 0 bytes, mark stream as eof");
|
||||
self.eof = true;
|
||||
}
|
||||
|
||||
|
@@ -57,5 +57,5 @@ webpki = { version = "0.21", optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
bytes = "0.4"
|
||||
actix-testing = { version="0.1.0" }
|
||||
actix-testing = { version="0.2.0" }
|
||||
actix-server-config = "0.2.0"
|
||||
|
@@ -94,7 +94,8 @@ where
|
||||
fn call(&mut self, stream: Connection<T, U>) -> Self::Future {
|
||||
trace!("SSL Handshake start for: {:?}", stream.host());
|
||||
let (io, stream) = stream.replace(());
|
||||
let host = DNSNameRef::try_from_ascii_str(stream.host()).unwrap();
|
||||
let host = DNSNameRef::try_from_ascii_str(stream.host())
|
||||
.expect("rustls currently only handles hostname-based connections. See https://github.com/briansmith/webpki/issues/54");
|
||||
ConnectAsyncExt {
|
||||
fut: TlsConnector::from(self.connector.clone()).connect(host, io),
|
||||
stream: Some(stream),
|
||||
|
@@ -42,6 +42,7 @@ fn test_rustls_string() {
|
||||
let con = test::call_service(&mut conn, addr.into());
|
||||
assert_eq!(con.peer_addr().unwrap(), srv.addr());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_static_str() {
|
||||
let srv = TestServer::with(|| {
|
||||
|
@@ -1,5 +1,11 @@
|
||||
# Changes
|
||||
|
||||
|
||||
## [0.1.1] - 2019-10-14
|
||||
|
||||
* Re-register task on every dispatcher poll.
|
||||
|
||||
|
||||
## [0.1.0] - 2019-09-25
|
||||
|
||||
* Initial release
|
||||
|
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "actix-ioframe"
|
||||
version = "0.1.0"
|
||||
version = "0.1.1"
|
||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||
description = "Actix framed service"
|
||||
keywords = ["network", "framework", "async", "futures"]
|
||||
@@ -28,8 +28,8 @@ log = "0.4"
|
||||
|
||||
[dev-dependencies]
|
||||
actix-rt = "0.2.2"
|
||||
actix-connect = "0.2.0"
|
||||
actix-testing = "0.1.0"
|
||||
actix-connect = "0.3.0"
|
||||
actix-testing = "0.2.0"
|
||||
actix-server-config = "0.2.0"
|
||||
tokio-tcp = "0.1"
|
||||
tokio-timer = "0.2"
|
||||
|
@@ -29,6 +29,10 @@ impl<T> Cell<T> {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) unsafe fn get_ref(&mut self) -> &T {
|
||||
&*self.inner.as_ref().get()
|
||||
}
|
||||
|
||||
pub(crate) unsafe fn get_mut(&mut self) -> &mut T {
|
||||
&mut *self.inner.as_ref().get()
|
||||
}
|
||||
|
@@ -147,13 +147,13 @@ where
|
||||
}
|
||||
Ok(Async::NotReady) => return false,
|
||||
Ok(Async::Ready(None)) => {
|
||||
log::trace!("Client disconnected");
|
||||
self.dispatch_state = FramedState::Stopping;
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
let mut cell = self.inner.clone();
|
||||
unsafe { cell.get_mut().task.register() };
|
||||
tokio_current_thread::spawn(
|
||||
self.service
|
||||
.call(Item::new(self.state.clone(), self.sink.clone(), item))
|
||||
@@ -274,6 +274,8 @@ where
|
||||
type Error = ServiceError<S::Error, U>;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
unsafe { self.inner.get_ref().task.register() };
|
||||
|
||||
match mem::replace(&mut self.dispatch_state, FramedState::Processing) {
|
||||
FramedState::Processing => {
|
||||
if self.poll_read() || self.poll_write() {
|
||||
|
@@ -1,5 +1,16 @@
|
||||
# Changes
|
||||
|
||||
## [0.2.6] - 2019-11-14
|
||||
|
||||
### Fixed
|
||||
|
||||
* Fix arbiter's thread panic message.
|
||||
|
||||
### Added
|
||||
|
||||
* Allow to join arbiter's thread. #60
|
||||
|
||||
|
||||
## [0.2.5] - 2019-09-02
|
||||
|
||||
### Added
|
||||
|
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "actix-rt"
|
||||
version = "0.2.5"
|
||||
version = "0.2.6"
|
||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||
description = "Actix runtime"
|
||||
keywords = ["network", "framework", "async", "futures"]
|
||||
|
@@ -39,11 +39,20 @@ impl fmt::Debug for ArbiterCommand {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
#[derive(Debug)]
|
||||
/// Arbiters provide an asynchronous execution environment for actors, functions
|
||||
/// and futures. When an Arbiter is created, they spawn a new OS thread, and
|
||||
/// host an event loop. Some Arbiter functions execute on the current thread.
|
||||
pub struct Arbiter(UnboundedSender<ArbiterCommand>);
|
||||
pub struct Arbiter {
|
||||
sender: UnboundedSender<ArbiterCommand>,
|
||||
thread_handle: Option<thread::JoinHandle<()>>,
|
||||
}
|
||||
|
||||
impl Clone for Arbiter {
|
||||
fn clone(&self) -> Self {
|
||||
Self::with_sender(self.sender.clone())
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for Arbiter {
|
||||
fn default() -> Self {
|
||||
@@ -55,7 +64,7 @@ impl Arbiter {
|
||||
pub(crate) fn new_system() -> Self {
|
||||
let (tx, rx) = unbounded();
|
||||
|
||||
let arb = Arbiter(tx);
|
||||
let arb = Arbiter::with_sender(tx);
|
||||
ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
|
||||
RUNNING.with(|cell| cell.set(false));
|
||||
STORAGE.with(|cell| cell.borrow_mut().clear());
|
||||
@@ -75,7 +84,7 @@ impl Arbiter {
|
||||
|
||||
/// Stop arbiter from continuing it's event loop.
|
||||
pub fn stop(&self) {
|
||||
let _ = self.0.unbounded_send(ArbiterCommand::Stop);
|
||||
let _ = self.sender.unbounded_send(ArbiterCommand::Stop);
|
||||
}
|
||||
|
||||
/// Spawn new thread and run event loop in spawned thread.
|
||||
@@ -87,9 +96,9 @@ impl Arbiter {
|
||||
let (arb_tx, arb_rx) = unbounded();
|
||||
let arb_tx2 = arb_tx.clone();
|
||||
|
||||
let _ = thread::Builder::new().name(name.clone()).spawn(move || {
|
||||
let handle = thread::Builder::new().name(name.clone()).spawn(move || {
|
||||
let mut rt = Builder::new().build_rt().expect("Can not create Runtime");
|
||||
let arb = Arbiter(arb_tx);
|
||||
let arb = Arbiter::with_sender(arb_tx);
|
||||
|
||||
let (stop, stop_rx) = channel();
|
||||
RUNNING.with(|cell| cell.set(true));
|
||||
@@ -119,9 +128,9 @@ impl Arbiter {
|
||||
let _ = System::current()
|
||||
.sys()
|
||||
.unbounded_send(SystemCommand::UnregisterArbiter(id));
|
||||
});
|
||||
}).unwrap_or_else(|err| panic!("Cannot spawn an arbiter's thread {:?}: {:?}", &name, err));
|
||||
|
||||
Arbiter(arb_tx2)
|
||||
Arbiter{sender: arb_tx2, thread_handle: Some(handle)}
|
||||
}
|
||||
|
||||
pub(crate) fn run_system() {
|
||||
@@ -171,7 +180,7 @@ impl Arbiter {
|
||||
F: Future<Item = (), Error = ()> + Send + 'static,
|
||||
{
|
||||
let _ = self
|
||||
.0
|
||||
.sender
|
||||
.unbounded_send(ArbiterCommand::Execute(Box::new(future)));
|
||||
}
|
||||
|
||||
@@ -182,7 +191,7 @@ impl Arbiter {
|
||||
F: FnOnce() + Send + 'static,
|
||||
{
|
||||
let _ = self
|
||||
.0
|
||||
.sender
|
||||
.unbounded_send(ArbiterCommand::ExecuteFn(Box::new(move || {
|
||||
f();
|
||||
})));
|
||||
@@ -198,7 +207,7 @@ impl Arbiter {
|
||||
{
|
||||
let (tx, rx) = channel();
|
||||
let _ = self
|
||||
.0
|
||||
.sender
|
||||
.unbounded_send(ArbiterCommand::ExecuteFn(Box::new(move || {
|
||||
if !tx.is_canceled() {
|
||||
let _ = tx.send(f());
|
||||
@@ -250,6 +259,20 @@ impl Arbiter {
|
||||
f(item)
|
||||
})
|
||||
}
|
||||
|
||||
fn with_sender(sender: UnboundedSender<ArbiterCommand>) -> Self {
|
||||
Self{sender, thread_handle: None}
|
||||
}
|
||||
|
||||
/// Wait for the event loop to stop by joining the underlying thread (if have Some).
|
||||
pub fn join(&mut self) -> thread::Result<()>{
|
||||
if let Some(thread_handle) = self.thread_handle.take() {
|
||||
thread_handle.join()
|
||||
}
|
||||
else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct ArbiterController {
|
||||
@@ -260,9 +283,11 @@ struct ArbiterController {
|
||||
impl Drop for ArbiterController {
|
||||
fn drop(&mut self) {
|
||||
if thread::panicking() {
|
||||
eprintln!("Panic in Arbiter thread, shutting down system.");
|
||||
if System::current().stop_on_panic() {
|
||||
eprintln!("Panic in Arbiter thread, shutting down system.");
|
||||
System::current().stop_with_code(1)
|
||||
} else {
|
||||
eprintln!("Panic in Arbiter thread.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -1,6 +1,6 @@
|
||||
# Changes
|
||||
|
||||
## Next
|
||||
## [0.7.0] - 2019-10-04
|
||||
|
||||
### Changed
|
||||
|
||||
|
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "actix-server"
|
||||
version = "0.6.1"
|
||||
version = "0.7.0"
|
||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||
description = "Actix server - General purpose tcp server"
|
||||
keywords = ["network", "framework", "async", "futures"]
|
||||
|
@@ -1,5 +1,10 @@
|
||||
# Changes
|
||||
|
||||
## [0.2.0] - 2019-10-14
|
||||
|
||||
* Upgrade actix-server and actix-server-config deps
|
||||
|
||||
|
||||
## [0.1.0] - 2019-09-25
|
||||
|
||||
* Initial impl
|
||||
|
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "actix-testing"
|
||||
version = "0.1.0"
|
||||
version = "0.2.0"
|
||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||
description = "Actix testing utils"
|
||||
keywords = ["network", "framework", "async", "futures"]
|
||||
@@ -17,10 +17,10 @@ name = "actix_testing"
|
||||
path = "src/lib.rs"
|
||||
|
||||
[dependencies]
|
||||
actix-rt = "0.2.1"
|
||||
actix-server = "0.6.0"
|
||||
actix-rt = "0.2.5"
|
||||
actix-server = "0.7.0"
|
||||
actix-server-config = "0.2.0"
|
||||
actix-service = "0.4.0"
|
||||
actix-service = "0.4.2"
|
||||
|
||||
log = "0.4"
|
||||
net2 = "0.2"
|
||||
|
@@ -1,5 +1,15 @@
|
||||
# Changes
|
||||
|
||||
## [0.4.7] - 2019-10-14
|
||||
|
||||
* Re-register task on every framed transport poll.
|
||||
|
||||
|
||||
## [0.4.6] - 2019-10-08
|
||||
|
||||
* Refactor `Counter` type. register current task in available method.
|
||||
|
||||
|
||||
## [0.4.5] - 2019-07-19
|
||||
|
||||
### Removed
|
||||
|
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "actix-utils"
|
||||
version = "0.4.5"
|
||||
version = "0.4.7"
|
||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||
description = "Actix utils - various actix net related services"
|
||||
keywords = ["network", "framework", "async", "futures"]
|
||||
|
@@ -25,11 +25,13 @@ impl Counter {
|
||||
}))
|
||||
}
|
||||
|
||||
/// Get counter guard.
|
||||
pub fn get(&self) -> CounterGuard {
|
||||
CounterGuard::new(self.0.clone())
|
||||
}
|
||||
|
||||
/// Check if counter is not at capacity
|
||||
/// Check if counter is not at capacity. If counter at capacity
|
||||
/// it registers notification for current task.
|
||||
pub fn available(&self) -> bool {
|
||||
self.0.available()
|
||||
}
|
||||
@@ -57,11 +59,7 @@ impl Drop for CounterGuard {
|
||||
|
||||
impl CounterInner {
|
||||
fn inc(&self) {
|
||||
let num = self.count.get() + 1;
|
||||
self.count.set(num);
|
||||
if num == self.capacity {
|
||||
self.task.register();
|
||||
}
|
||||
self.count.set(self.count.get() + 1);
|
||||
}
|
||||
|
||||
fn dec(&self) {
|
||||
@@ -73,6 +71,11 @@ impl CounterInner {
|
||||
}
|
||||
|
||||
fn available(&self) -> bool {
|
||||
self.count.get() < self.capacity
|
||||
if self.count.get() < self.capacity {
|
||||
true
|
||||
} else {
|
||||
self.task.register();
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -129,7 +129,6 @@ where
|
||||
};
|
||||
|
||||
let mut cell = self.inner.clone();
|
||||
cell.get_mut().task.register();
|
||||
tokio_current_thread::spawn(self.service.call(item).then(move |item| {
|
||||
let inner = cell.get_mut();
|
||||
inner.buf.push_back(item);
|
||||
@@ -293,6 +292,8 @@ where
|
||||
type Error = FramedTransportError<S::Error, U>;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
self.inner.get_ref().task.register();
|
||||
|
||||
match mem::replace(&mut self.state, TransportState::Processing) {
|
||||
TransportState::Processing => {
|
||||
if self.poll_read() || self.poll_write() {
|
||||
|
Reference in New Issue
Block a user