1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-08-13 16:38:22 +02:00

Compare commits

..

15 Commits

Author SHA1 Message Date
Nikolay Kim
9fa2a36b4e prepare actix-rt release 2019-11-14 17:33:28 +06:00
Ivan Ladelshchikov
ed5023128b store the thread's handle with arbiter (#60) 2019-11-14 15:07:33 +06:00
Nikolay Kim
2e8c2c7733 Re-register task on every future poll 2019-10-14 17:55:52 +06:00
Nikolay Kim
115e82329f fix arbiter thread panic message 2019-10-14 11:19:08 +06:00
Nikolay Kim
0b0060fe47 update deps 2019-10-14 10:37:48 +06:00
Nikolay Kim
35e32d8e55 prepare actix-testing release 2019-10-14 10:30:27 +06:00
Nikolay Kim
9982a9498d register current task in counters available method. 2019-10-08 15:02:43 +06:00
Nikolay Kim
fa72975f34 extra trace logging 2019-10-08 14:46:22 +06:00
Sven-Hendrik Haase
fe5de2510d Merge pull request #56 from actix/fix-52
Add an error message if we receive a non-hostname-based dest
2019-10-04 13:48:20 +02:00
Yuki Okushi
e3155957a8 Prepare actix-server release (#55) 2019-10-04 17:36:23 +09:00
Sven-Hendrik Haase
f6f9e1fcdb Add an error message if we receive a non-hostname-based dest
This is more helpful than an unwrap and at least points users at the right location.
Upstream issue is https://github.com/briansmith/webpki/issues/54
2019-10-04 07:30:13 +02:00
Yuki Okushi
2667850d60 Prepare actix-server-config release (#54)
* Prepare actix-server-config release

* Bump up actix-server-config to 0.2.0
2019-10-04 06:13:33 +06:00
Yuki Okushi
fba2002702 Prepare actix-connect release (#53) 2019-10-04 06:21:59 +09:00
Jerome Gravel-Niquet
e733c562d9 Update rustls, tokio-rustls and webpki across the board (#42)
* Update rustls, tokio-rustls and webpki across the board

* bump minimum rust version to 1.37

* updated readme and changelogs to reflect changes and minimum required rust version
2019-10-04 03:32:32 +09:00
Yuki Okushi
8f05986a9f Use map() instead of and_then() (#51) 2019-10-03 14:55:44 +09:00
29 changed files with 154 additions and 92 deletions

View File

@@ -10,7 +10,7 @@ matrix:
include:
- rust: stable
- rust: beta
- rust: 1.36.0
- rust: 1.37.0
- rust: nightly-2019-06-15
allow_failures:
- rust: nightly-2019-06-15

View File

@@ -1,18 +1,3 @@
[package]
name = "actix-net"
version = "0.3.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix net - framework for the composable network services for Rust"
readme = "README.md"
keywords = ["network", "framework", "async", "futures"]
homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/actix-net/"
categories = ["network-programming", "asynchronous"]
license = "MIT/Apache-2.0"
exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"]
edition = "2018"
[workspace]
members = [
"actix-codec",
@@ -28,17 +13,6 @@ members = [
"router",
]
[dev-dependencies]
actix-service = "0.4.0"
actix-codec = "0.1.1"
actix-rt = "0.2.0"
actix-server = { version="0.5.0", features=["ssl"] }
env_logger = "0.6"
futures = "0.1.25"
openssl = "0.10"
tokio-tcp = "0.1"
tokio-openssl = "0.3"
[patch.crates-io]
actix-codec = { path = "actix-codec" }
actix-connect = { path = "actix-connect" }

View File

@@ -7,7 +7,7 @@ Actix net - framework for composable network services
* [API Documentation (Development)](https://actix.rs/actix-net/actix_net/)
* [Chat on gitter](https://gitter.im/actix/actix)
* Cargo package: [actix-net](https://crates.io/crates/actix-net)
* Minimum supported Rust version: 1.36 or later
* Minimum supported Rust version: 1.37 or later
## Example

View File

@@ -209,6 +209,7 @@ where
// get a spurious 0 that looks like EOF
self.buffer.reserve(1);
if 0 == try_ready!(self.inner.read_buf(&mut self.buffer)) {
trace!("read 0 bytes, mark stream as eof");
self.eof = true;
}

View File

@@ -1,5 +1,12 @@
# Changes
## [0.3.0] - 2019-10-03
### Changed
* Update `rustls` to 0.16
* Minimum required Rust version upped to 1.37.0
## [0.2.5] - 2019-09-05
* Add `TcpConnectService`

View File

@@ -1,6 +1,6 @@
[package]
name = "actix-connect"
version = "0.2.5"
version = "0.3.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix Connector - tcp connector service"
keywords = ["network", "framework", "async", "futures"]
@@ -51,11 +51,11 @@ openssl = { version="0.10", optional = true }
tokio-openssl = { version="0.3", optional = true }
#rustls
rustls = { version = "0.15.2", optional = true }
tokio-rustls = { version = "0.9.1", optional = true }
webpki = { version = "0.19", optional = true }
rustls = { version = "0.16.0", optional = true }
tokio-rustls = { version = "0.10.0", optional = true }
webpki = { version = "0.21", optional = true }
[dev-dependencies]
bytes = "0.4"
actix-testing = { version="0.1.0" }
actix-server-config = "0.1.0"
actix-testing = { version="0.2.0" }
actix-server-config = "0.2.0"

View File

@@ -5,10 +5,7 @@ use actix_codec::{AsyncRead, AsyncWrite};
use actix_service::{NewService, Service};
use futures::{future::ok, future::FutureResult, Async, Future, Poll};
use std::sync::Arc;
use tokio_rustls::{
rustls::{ClientConfig, ClientSession},
Connect, TlsConnector, TlsStream,
};
use tokio_rustls::{client::TlsStream, rustls::ClientConfig, Connect, TlsConnector};
use webpki::DNSNameRef;
use crate::{Address, Connection};
@@ -37,7 +34,7 @@ where
connector: Arc<ClientConfig>,
) -> impl Service<
Request = Connection<T, U>,
Response = Connection<T, TlsStream<U, ClientSession>>,
Response = Connection<T, TlsStream<U>>,
Error = std::io::Error,
> {
RustlsConnectorService {
@@ -61,7 +58,7 @@ where
U: AsyncRead + AsyncWrite + fmt::Debug,
{
type Request = Connection<T, U>;
type Response = Connection<T, TlsStream<U, ClientSession>>;
type Response = Connection<T, TlsStream<U>>;
type Error = std::io::Error;
type Config = ();
type Service = RustlsConnectorService<T, U>;
@@ -86,7 +83,7 @@ where
U: AsyncRead + AsyncWrite + fmt::Debug,
{
type Request = Connection<T, U>;
type Response = Connection<T, TlsStream<U, ClientSession>>;
type Response = Connection<T, TlsStream<U>>;
type Error = std::io::Error;
type Future = ConnectAsyncExt<T, U>;
@@ -97,7 +94,8 @@ where
fn call(&mut self, stream: Connection<T, U>) -> Self::Future {
trace!("SSL Handshake start for: {:?}", stream.host());
let (io, stream) = stream.replace(());
let host = DNSNameRef::try_from_ascii_str(stream.host()).unwrap();
let host = DNSNameRef::try_from_ascii_str(stream.host())
.expect("rustls currently only handles hostname-based connections. See https://github.com/briansmith/webpki/issues/54");
ConnectAsyncExt {
fut: TlsConnector::from(self.connector.clone()).connect(host, io),
stream: Some(stream),
@@ -114,7 +112,7 @@ impl<T: Address, U> Future for ConnectAsyncExt<T, U>
where
U: AsyncRead + AsyncWrite + fmt::Debug,
{
type Item = Connection<T, TlsStream<U, ClientSession>>;
type Item = Connection<T, TlsStream<U>>;
type Error = std::io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {

View File

@@ -42,6 +42,7 @@ fn test_rustls_string() {
let con = test::call_service(&mut conn, addr.into());
assert_eq!(con.peer_addr().unwrap(), srv.addr());
}
#[test]
fn test_static_str() {
let srv = TestServer::with(|| {

View File

@@ -1,5 +1,11 @@
# Changes
## [0.1.1] - 2019-10-14
* Re-register task on every dispatcher poll.
## [0.1.0] - 2019-09-25
* Initial release

View File

@@ -1,6 +1,6 @@
[package]
name = "actix-ioframe"
version = "0.1.0"
version = "0.1.1"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix framed service"
keywords = ["network", "framework", "async", "futures"]
@@ -28,8 +28,8 @@ log = "0.4"
[dev-dependencies]
actix-rt = "0.2.2"
actix-connect = "0.2.0"
actix-testing = "0.1.0"
actix-server-config = "0.1.1"
actix-connect = "0.3.0"
actix-testing = "0.2.0"
actix-server-config = "0.2.0"
tokio-tcp = "0.1"
tokio-timer = "0.2"

View File

@@ -29,6 +29,10 @@ impl<T> Cell<T> {
}
}
pub(crate) unsafe fn get_ref(&mut self) -> &T {
&*self.inner.as_ref().get()
}
pub(crate) unsafe fn get_mut(&mut self) -> &mut T {
&mut *self.inner.as_ref().get()
}

View File

@@ -147,13 +147,13 @@ where
}
Ok(Async::NotReady) => return false,
Ok(Async::Ready(None)) => {
log::trace!("Client disconnected");
self.dispatch_state = FramedState::Stopping;
return true;
}
};
let mut cell = self.inner.clone();
unsafe { cell.get_mut().task.register() };
tokio_current_thread::spawn(
self.service
.call(Item::new(self.state.clone(), self.sink.clone(), item))
@@ -274,6 +274,8 @@ where
type Error = ServiceError<S::Error, U>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
unsafe { self.inner.get_ref().task.register() };
match mem::replace(&mut self.dispatch_state, FramedState::Processing) {
FramedState::Processing => {
if self.poll_read() || self.poll_write() {

View File

@@ -1,5 +1,16 @@
# Changes
## [0.2.6] - 2019-11-14
### Fixed
* Fix arbiter's thread panic message.
### Added
* Allow to join arbiter's thread. #60
## [0.2.5] - 2019-09-02
### Added

View File

@@ -1,6 +1,6 @@
[package]
name = "actix-rt"
version = "0.2.5"
version = "0.2.6"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix runtime"
keywords = ["network", "framework", "async", "futures"]

View File

@@ -39,11 +39,20 @@ impl fmt::Debug for ArbiterCommand {
}
}
#[derive(Debug, Clone)]
#[derive(Debug)]
/// Arbiters provide an asynchronous execution environment for actors, functions
/// and futures. When an Arbiter is created, they spawn a new OS thread, and
/// host an event loop. Some Arbiter functions execute on the current thread.
pub struct Arbiter(UnboundedSender<ArbiterCommand>);
pub struct Arbiter {
sender: UnboundedSender<ArbiterCommand>,
thread_handle: Option<thread::JoinHandle<()>>,
}
impl Clone for Arbiter {
fn clone(&self) -> Self {
Self::with_sender(self.sender.clone())
}
}
impl Default for Arbiter {
fn default() -> Self {
@@ -55,7 +64,7 @@ impl Arbiter {
pub(crate) fn new_system() -> Self {
let (tx, rx) = unbounded();
let arb = Arbiter(tx);
let arb = Arbiter::with_sender(tx);
ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
RUNNING.with(|cell| cell.set(false));
STORAGE.with(|cell| cell.borrow_mut().clear());
@@ -75,7 +84,7 @@ impl Arbiter {
/// Stop arbiter from continuing it's event loop.
pub fn stop(&self) {
let _ = self.0.unbounded_send(ArbiterCommand::Stop);
let _ = self.sender.unbounded_send(ArbiterCommand::Stop);
}
/// Spawn new thread and run event loop in spawned thread.
@@ -87,9 +96,9 @@ impl Arbiter {
let (arb_tx, arb_rx) = unbounded();
let arb_tx2 = arb_tx.clone();
let _ = thread::Builder::new().name(name.clone()).spawn(move || {
let handle = thread::Builder::new().name(name.clone()).spawn(move || {
let mut rt = Builder::new().build_rt().expect("Can not create Runtime");
let arb = Arbiter(arb_tx);
let arb = Arbiter::with_sender(arb_tx);
let (stop, stop_rx) = channel();
RUNNING.with(|cell| cell.set(true));
@@ -119,9 +128,9 @@ impl Arbiter {
let _ = System::current()
.sys()
.unbounded_send(SystemCommand::UnregisterArbiter(id));
});
}).unwrap_or_else(|err| panic!("Cannot spawn an arbiter's thread {:?}: {:?}", &name, err));
Arbiter(arb_tx2)
Arbiter{sender: arb_tx2, thread_handle: Some(handle)}
}
pub(crate) fn run_system() {
@@ -171,7 +180,7 @@ impl Arbiter {
F: Future<Item = (), Error = ()> + Send + 'static,
{
let _ = self
.0
.sender
.unbounded_send(ArbiterCommand::Execute(Box::new(future)));
}
@@ -182,7 +191,7 @@ impl Arbiter {
F: FnOnce() + Send + 'static,
{
let _ = self
.0
.sender
.unbounded_send(ArbiterCommand::ExecuteFn(Box::new(move || {
f();
})));
@@ -198,7 +207,7 @@ impl Arbiter {
{
let (tx, rx) = channel();
let _ = self
.0
.sender
.unbounded_send(ArbiterCommand::ExecuteFn(Box::new(move || {
if !tx.is_canceled() {
let _ = tx.send(f());
@@ -250,6 +259,20 @@ impl Arbiter {
f(item)
})
}
fn with_sender(sender: UnboundedSender<ArbiterCommand>) -> Self {
Self{sender, thread_handle: None}
}
/// Wait for the event loop to stop by joining the underlying thread (if have Some).
pub fn join(&mut self) -> thread::Result<()>{
if let Some(thread_handle) = self.thread_handle.take() {
thread_handle.join()
}
else {
Ok(())
}
}
}
struct ArbiterController {
@@ -260,9 +283,11 @@ struct ArbiterController {
impl Drop for ArbiterController {
fn drop(&mut self) {
if thread::panicking() {
eprintln!("Panic in Arbiter thread, shutting down system.");
if System::current().stop_on_panic() {
eprintln!("Panic in Arbiter thread, shutting down system.");
System::current().stop_with_code(1)
} else {
eprintln!("Panic in Arbiter thread.");
}
}
}

View File

@@ -1,6 +1,6 @@
[package]
name = "actix-server-config"
version = "0.1.2"
version = "0.2.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix server config utils"
homepage = "https://actix.rs"
@@ -33,6 +33,6 @@ futures = "0.1.25"
tokio-io = "0.1.12"
tokio-tcp = "0.1"
tokio-openssl = { version="0.3.0", optional = true }
rustls = { version = "0.15.2", optional = true }
tokio-rustls = { version = "0.9.1", optional = true }
rustls = { version = "0.16.0", optional = true }
tokio-rustls = { version = "0.10.0", optional = true }
tokio-uds = { version="0.2.5", optional = true }

View File

@@ -1,5 +1,12 @@
# Changes
## [0.2.0] - 2019-10-03
### Changed
* Update `rustls` to 0.16
* Minimum required Rust version upped to 1.37.0
## [0.1.2] - 2019-07-18
### Added

View File

@@ -195,7 +195,7 @@ impl<T: IoStream> IoStream for tokio_openssl::SslStream<T> {
}
#[cfg(any(feature = "rust-tls"))]
impl<T: IoStream> IoStream for tokio_rustls::TlsStream<T, rustls::ServerSession> {
impl<T: IoStream> IoStream for tokio_rustls::server::TlsStream<T> {
#[inline]
fn peer_addr(&self) -> Option<net::SocketAddr> {
self.get_ref().0.peer_addr()

View File

@@ -1,5 +1,12 @@
# Changes
## [0.7.0] - 2019-10-04
### Changed
* Update `rustls` to 0.16
* Minimum required Rust version upped to 1.37.0
## [0.6.1] - 2019-09-25
### Added

View File

@@ -1,6 +1,6 @@
[package]
name = "actix-server"
version = "0.6.1"
version = "0.7.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix server - General purpose tcp server"
keywords = ["network", "framework", "async", "futures"]
@@ -38,7 +38,7 @@ uds = ["mio-uds", "tokio-uds", "actix-server-config/uds"]
[dependencies]
actix-rt = "0.2.2"
actix-service = "0.4.1"
actix-server-config = "0.1.2"
actix-server-config = "0.2.0"
log = "0.4"
num_cpus = "1.0"
@@ -65,10 +65,10 @@ openssl = { version="0.10", optional = true }
tokio-openssl = { version="0.3", optional = true }
# rustls
rustls = { version = "0.15.2", optional = true }
tokio-rustls = { version = "0.9.1", optional = true }
webpki = { version = "0.19", optional = true }
webpki-roots = { version = "0.16", optional = true }
rustls = { version = "0.16.0", optional = true }
tokio-rustls = { version = "0.10.0", optional = true }
webpki = { version = "0.21", optional = true }
webpki-roots = { version = "0.17", optional = true }
[dev-dependencies]
bytes = "0.4"

View File

@@ -4,9 +4,9 @@ use std::sync::Arc;
use actix_service::{NewService, Service};
use futures::{future::ok, future::FutureResult, Async, Future, Poll};
use rustls::{ServerConfig, ServerSession};
use rustls::ServerConfig;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_rustls::{Accept, TlsAcceptor, TlsStream};
use tokio_rustls::{server::TlsStream, Accept, TlsAcceptor};
use crate::counter::{Counter, CounterGuard};
use crate::ssl::MAX_CONN_COUNTER;
@@ -41,7 +41,7 @@ impl<T, P> Clone for RustlsAcceptor<T, P> {
impl<T: AsyncRead + AsyncWrite, P> NewService for RustlsAcceptor<T, P> {
type Request = Io<T, P>;
type Response = Io<TlsStream<T, ServerSession>, P>;
type Response = Io<TlsStream<T>, P>;
type Error = io::Error;
type Config = SrvConfig;
@@ -70,7 +70,7 @@ pub struct RustlsAcceptorService<T, P> {
impl<T: AsyncRead + AsyncWrite, P> Service for RustlsAcceptorService<T, P> {
type Request = Io<T, P>;
type Response = Io<TlsStream<T, ServerSession>, P>;
type Response = Io<TlsStream<T>, P>;
type Error = io::Error;
type Future = RustlsAcceptorServiceFut<T, P>;
@@ -102,7 +102,7 @@ where
}
impl<T: AsyncRead + AsyncWrite, P> Future for RustlsAcceptorServiceFut<T, P> {
type Item = Io<TlsStream<T, ServerSession>, P>;
type Item = Io<TlsStream<T>, P>;
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {

View File

@@ -19,7 +19,7 @@ path = "src/lib.rs"
[dependencies]
actix-rt = "0.2.1"
actix-server = "0.5.0"
actix-server-config = "0.1.0"
actix-server-config = "0.2.0"
actix-testing = "0.1.0"
log = "0.4"

View File

@@ -1,5 +1,10 @@
# Changes
## [0.2.0] - 2019-10-14
* Upgrade actix-server and actix-server-config deps
## [0.1.0] - 2019-09-25
* Initial impl

View File

@@ -1,6 +1,6 @@
[package]
name = "actix-testing"
version = "0.1.0"
version = "0.2.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix testing utils"
keywords = ["network", "framework", "async", "futures"]
@@ -17,10 +17,10 @@ name = "actix_testing"
path = "src/lib.rs"
[dependencies]
actix-rt = "0.2.1"
actix-server = "0.6.0"
actix-server-config = "0.1.0"
actix-service = "0.4.0"
actix-rt = "0.2.5"
actix-server = "0.7.0"
actix-server-config = "0.2.0"
actix-service = "0.4.2"
log = "0.4"
net2 = "0.2"

View File

@@ -1,5 +1,15 @@
# Changes
## [0.4.7] - 2019-10-14
* Re-register task on every framed transport poll.
## [0.4.6] - 2019-10-08
* Refactor `Counter` type. register current task in available method.
## [0.4.5] - 2019-07-19
### Removed

View File

@@ -1,6 +1,6 @@
[package]
name = "actix-utils"
version = "0.4.5"
version = "0.4.7"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix utils - various actix net related services"
keywords = ["network", "framework", "async", "futures"]

View File

@@ -25,11 +25,13 @@ impl Counter {
}))
}
/// Get counter guard.
pub fn get(&self) -> CounterGuard {
CounterGuard::new(self.0.clone())
}
/// Check if counter is not at capacity
/// Check if counter is not at capacity. If counter at capacity
/// it registers notification for current task.
pub fn available(&self) -> bool {
self.0.available()
}
@@ -57,11 +59,7 @@ impl Drop for CounterGuard {
impl CounterInner {
fn inc(&self) {
let num = self.count.get() + 1;
self.count.set(num);
if num == self.capacity {
self.task.register();
}
self.count.set(self.count.get() + 1);
}
fn dec(&self) {
@@ -73,6 +71,11 @@ impl CounterInner {
}
fn available(&self) -> bool {
self.count.get() < self.capacity
if self.count.get() < self.capacity {
true
} else {
self.task.register();
false
}
}
}

View File

@@ -129,7 +129,6 @@ where
};
let mut cell = self.inner.clone();
cell.get_mut().task.register();
tokio_current_thread::spawn(self.service.call(item).then(move |item| {
let inner = cell.get_mut();
inner.buf.push_back(item);
@@ -293,6 +292,8 @@ where
type Error = FramedTransportError<S::Error, U>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.inner.get_ref().task.register();
match mem::replace(&mut self.state, TransportState::Processing) {
TransportState::Processing => {
if self.poll_read() || self.poll_write() {

View File

@@ -195,7 +195,7 @@ fn from_hex(v: u8) -> Option<u8> {
#[inline]
fn restore_ch(d1: u8, d2: u8) -> Option<u8> {
from_hex(d1).and_then(|d1| from_hex(d2).and_then(move |d2| Some(d1 << 4 | d2)))
from_hex(d1).and_then(|d1| from_hex(d2).map(move |d2| d1 << 4 | d2))
}
#[cfg(test)]