1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-08-17 00:08:59 +02:00

Compare commits

...

11 Commits

Author SHA1 Message Date
Rob Ede
681eeb497d prepare server release 1.0.4 (#188) 2020-09-12 15:28:17 +01:00
Igor Aleksanov
3e04b87311 actix-service: Fix broken link in readme (#189) 2020-09-12 15:08:03 +01:00
Rob Ede
77b7826658 prepare tls v2 release (#186) 2020-09-08 18:00:07 +01:00
Igor Aleksanov
b7a9cb7bb4 actix-rt: Make the process of running System in existing Runtime more clear (#173) 2020-09-06 11:01:24 +01:00
Robert Gabriel Jakabosky
88d99ac89c Fix clippy errors. (#187) 2020-09-06 10:41:42 +01:00
Rob Ede
7632f51509 prepare connect v2 stable release (#185) 2020-09-02 22:14:07 +01:00
Rob Ede
d28687d0d7 promote codec/utils out of beta (#184) 2020-08-24 09:18:37 +01:00
Rob Ede
27c6be9881 remove unused type parameter from Framed::replace_codec (#183) 2020-08-20 00:30:26 +01:00
Rob Ede
119dc39f5b prepare codec and utils betas (#182) 2020-08-19 11:00:12 +01:00
Rob Ede
b3010c13e0 solve framed integration with actix-http (#179) 2020-08-18 23:27:37 +01:00
Adrian Wechner
fecdfcd8d4 assert workers greater than zero (#167) 2020-08-18 16:44:22 +01:00
41 changed files with 399 additions and 322 deletions

View File

@@ -1,10 +1,21 @@
# Changes # Changes
## Unreleased - 2020-xx-xx ## Unreleased - 2020-xx-xx
## 0.3.0 - 2020-08-23
* No changes from beta 2.
## 0.3.0-beta.2 - 2020-08-19
* Remove unused type parameter from `Framed::replace_codec`.
## 0.3.0-beta.1 - 2020-08-19
* Use `.advance()` instead of `.split_to()`. * Use `.advance()` instead of `.split_to()`.
* Upgrade `tokio-util` to `0.3`. * Upgrade `tokio-util` to `0.3`.
* Improve `BytesCodec` `.encode()` performance * Improve `BytesCodec` `.encode()` performance
* Simplify `BytesCodec` `.decode()` * Simplify `BytesCodec` `.decode()`
* Rename methods on `Framed` to better describe their use.
* Add method on `Framed` to get a pinned reference to the underlying I/O.
* Add method on `Framed` check emptiness of read buffer.
## [0.2.0] - 2019-12-10 ## [0.2.0] - 2019-12-10

View File

@@ -1,8 +1,8 @@
[package] [package]
name = "actix-codec" name = "actix-codec"
version = "0.2.0" version = "0.3.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Utilities for encoding and decoding frames" description = "Codec utilities for working with framed protocols."
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]
homepage = "https://actix.rs" homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git" repository = "https://github.com/actix/actix-net.git"
@@ -10,7 +10,6 @@ documentation = "https://docs.rs/actix-codec/"
categories = ["network-programming", "asynchronous"] categories = ["network-programming", "asynchronous"]
license = "MIT OR Apache-2.0" license = "MIT OR Apache-2.0"
edition = "2018" edition = "2018"
workspace = ".."
[lib] [lib]
name = "actix_codec" name = "actix_codec"
@@ -21,7 +20,7 @@ bitflags = "1.2.1"
bytes = "0.5.2" bytes = "0.5.2"
futures-core = { version = "0.3.4", default-features = false } futures-core = { version = "0.3.4", default-features = false }
futures-sink = { version = "0.3.4", default-features = false } futures-sink = { version = "0.3.4", default-features = false }
tokio = { version = "0.2.5", default-features = false }
tokio-util = { version = "0.3.1", default-features = false, features = ["codec"] }
log = "0.4" log = "0.4"
pin-project = "0.4.17" pin-project = "0.4.17"
tokio = { version = "0.2.5", default-features = false }
tokio-util = { version = "0.3.1", default-features = false, features = ["codec"] }

View File

@@ -23,6 +23,12 @@ bitflags::bitflags! {
/// A unified `Stream` and `Sink` interface to an underlying I/O object, using /// A unified `Stream` and `Sink` interface to an underlying I/O object, using
/// the `Encoder` and `Decoder` traits to encode and decode frames. /// the `Encoder` and `Decoder` traits to encode and decode frames.
///
/// Raw I/O objects work with byte sequences, but higher-level code usually
/// wants to batch these into meaningful chunks, called "frames". This
/// method layers framing on top of an I/O object, by using the `Encoder`/`Decoder`
/// traits to handle encoding and decoding of message frames. Note that
/// the incoming and outgoing frame types may be distinct.
#[pin_project] #[pin_project]
pub struct Framed<T, U> { pub struct Framed<T, U> {
#[pin] #[pin]
@@ -38,15 +44,6 @@ where
T: AsyncRead + AsyncWrite, T: AsyncRead + AsyncWrite,
U: Decoder, U: Decoder,
{ {
/// Provides a `Stream` and `Sink` interface for reading and writing to this
/// `Io` object, using `Decode` and `Encode` to read and write the raw data.
///
/// Raw I/O objects work with byte sequences, but higher-level code usually
/// wants to batch these into meaningful chunks, called "frames". This
/// method layers framing on top of an I/O object, by using the `Codec`
/// traits to handle encoding and decoding of messages frames. Note that
/// the incoming and outgoing frame types may be distinct.
///
/// This function returns a *single* object that is both `Stream` and /// This function returns a *single* object that is both `Stream` and
/// `Sink`; grouping this into a single object is often useful for layering /// `Sink`; grouping this into a single object is often useful for layering
/// things like gzip or TLS, which require both read and write access to the /// things like gzip or TLS, which require both read and write access to the
@@ -63,40 +60,13 @@ where
} }
impl<T, U> Framed<T, U> { impl<T, U> Framed<T, U> {
/// Provides a `Stream` and `Sink` interface for reading and writing to this
/// `Io` object, using `Decode` and `Encode` to read and write the raw data.
///
/// Raw I/O objects work with byte sequences, but higher-level code usually
/// wants to batch these into meaningful chunks, called "frames". This
/// method layers framing on top of an I/O object, by using the `Codec`
/// traits to handle encoding and decoding of messages frames. Note that
/// the incoming and outgoing frame types may be distinct.
///
/// This function returns a *single* object that is both `Stream` and
/// `Sink`; grouping this into a single object is often useful for layering
/// things like gzip or TLS, which require both read and write access to the
/// underlying object.
///
/// This objects takes a stream and a readbuffer and a writebuffer. These
/// field can be obtained from an existing `Framed` with the
/// `into_parts` method.
pub fn from_parts(parts: FramedParts<T, U>) -> Framed<T, U> {
Framed {
io: parts.io,
codec: parts.codec,
flags: parts.flags,
write_buf: parts.write_buf,
read_buf: parts.read_buf,
}
}
/// Returns a reference to the underlying codec. /// Returns a reference to the underlying codec.
pub fn get_codec(&self) -> &U { pub fn codec_ref(&self) -> &U {
&self.codec &self.codec
} }
/// Returns a mutable reference to the underlying codec. /// Returns a mutable reference to the underlying codec.
pub fn get_codec_mut(&mut self) -> &mut U { pub fn codec_mut(&mut self) -> &mut U {
&mut self.codec &mut self.codec
} }
@@ -106,20 +76,29 @@ impl<T, U> Framed<T, U> {
/// Note that care should be taken to not tamper with the underlying stream /// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise /// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with. /// being worked with.
pub fn get_ref(&self) -> &T { pub fn io_ref(&self) -> &T {
&self.io &self.io
} }
/// Returns a mutable reference to the underlying I/O stream wrapped by /// Returns a mutable reference to the underlying I/O stream.
/// `Frame`.
/// ///
/// Note that care should be taken to not tamper with the underlying stream /// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise /// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with. /// being worked with.
pub fn get_mut(&mut self) -> &mut T { pub fn io_mut(&mut self) -> &mut T {
&mut self.io &mut self.io
} }
/// Returns a `Pin` of a mutable reference to the underlying I/O stream.
pub fn io_pin(self: Pin<&mut Self>) -> Pin<&mut T> {
self.project().io
}
/// Check if read buffer is empty.
pub fn is_read_buf_empty(&self) -> bool {
self.read_buf.is_empty()
}
/// Check if write buffer is empty. /// Check if write buffer is empty.
pub fn is_write_buf_empty(&self) -> bool { pub fn is_write_buf_empty(&self) -> bool {
self.write_buf.is_empty() self.write_buf.is_empty()
@@ -130,8 +109,15 @@ impl<T, U> Framed<T, U> {
self.write_buf.len() >= HW self.write_buf.len() >= HW
} }
/// Check if framed is able to write more data.
///
/// `Framed` object considers ready if there is free space in write buffer.
pub fn is_write_ready(&self) -> bool {
self.write_buf.len() < HW
}
/// Consume the `Frame`, returning `Frame` with different codec. /// Consume the `Frame`, returning `Frame` with different codec.
pub fn into_framed<U2, I2>(self, codec: U2) -> Framed<T, U2> { pub fn replace_codec<U2>(self, codec: U2) -> Framed<T, U2> {
Framed { Framed {
codec, codec,
io: self.io, io: self.io,
@@ -142,7 +128,7 @@ impl<T, U> Framed<T, U> {
} }
/// Consume the `Frame`, returning `Frame` with different io. /// Consume the `Frame`, returning `Frame` with different io.
pub fn map_io<F, T2, I2>(self, f: F) -> Framed<T2, U> pub fn into_map_io<F, T2>(self, f: F) -> Framed<T2, U>
where where
F: Fn(T) -> T2, F: Fn(T) -> T2,
{ {
@@ -156,7 +142,7 @@ impl<T, U> Framed<T, U> {
} }
/// Consume the `Frame`, returning `Frame` with different codec. /// Consume the `Frame`, returning `Frame` with different codec.
pub fn map_codec<F, U2, I2>(self, f: F) -> Framed<T, U2> pub fn into_map_codec<F, U2>(self, f: F) -> Framed<T, U2>
where where
F: Fn(U) -> U2, F: Fn(U) -> U2,
{ {
@@ -168,22 +154,6 @@ impl<T, U> Framed<T, U> {
write_buf: self.write_buf, write_buf: self.write_buf,
} }
} }
/// Consumes the `Frame`, returning its underlying I/O stream, the buffer
/// with unprocessed data, and the codec.
///
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
pub fn into_parts(self) -> FramedParts<T, U> {
FramedParts {
io: self.io,
codec: self.codec,
flags: self.flags,
read_buf: self.read_buf,
write_buf: self.write_buf,
}
}
} }
impl<T, U> Framed<T, U> { impl<T, U> Framed<T, U> {
@@ -203,13 +173,6 @@ impl<T, U> Framed<T, U> {
Ok(()) Ok(())
} }
/// Check if framed is able to write more data.
///
/// `Framed` object considers ready if there is free space in write buffer.
pub fn is_write_ready(&self) -> bool {
self.write_buf.len() < HW
}
/// Try to read underlying I/O stream and decode item. /// Try to read underlying I/O stream and decode item.
pub fn next_item( pub fn next_item(
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
@@ -376,6 +339,41 @@ where
} }
} }
impl<T, U> Framed<T, U> {
/// This function returns a *single* object that is both `Stream` and
/// `Sink`; grouping this into a single object is often useful for layering
/// things like gzip or TLS, which require both read and write access to the
/// underlying object.
///
/// These objects take a stream, a read buffer and a write buffer. These
/// fields can be obtained from an existing `Framed` with the `into_parts` method.
pub fn from_parts(parts: FramedParts<T, U>) -> Framed<T, U> {
Framed {
io: parts.io,
codec: parts.codec,
flags: parts.flags,
write_buf: parts.write_buf,
read_buf: parts.read_buf,
}
}
/// Consumes the `Frame`, returning its underlying I/O stream, the buffer
/// with unprocessed data, and the codec.
///
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
pub fn into_parts(self) -> FramedParts<T, U> {
FramedParts {
io: self.io,
codec: self.codec,
flags: self.flags,
read_buf: self.read_buf,
write_buf: self.write_buf,
}
}
}
/// `FramedParts` contains an export of the data of a Framed transport. /// `FramedParts` contains an export of the data of a Framed transport.
/// It can be used to construct a new `Framed` with a different codec. /// It can be used to construct a new `Framed` with a different codec.
/// It contains all current buffers and the inner transport. /// It contains all current buffers and the inner transport.

View File

@@ -8,7 +8,9 @@
//! [`AsyncWrite`]: AsyncWrite //! [`AsyncWrite`]: AsyncWrite
//! [`Sink`]: futures_sink::Sink //! [`Sink`]: futures_sink::Sink
//! [`Stream`]: futures_core::Stream //! [`Stream`]: futures_core::Stream
#![deny(rust_2018_idioms)] #![deny(rust_2018_idioms)]
#![warn(missing_docs)]
mod bcodec; mod bcodec;
mod framed; mod framed;

View File

@@ -1,8 +1,11 @@
# Changes # Changes
## Unreleased ## Unreleased - 2020-xx-xx
## 2.0.0 - 2020-09-02
- No significant changes from `2.0.0-alpha.4`.
## 2.0.0-alpha.4 - 2020-08-17 ## 2.0.0-alpha.4 - 2020-08-17
### Changed ### Changed

View File

@@ -1,8 +1,8 @@
[package] [package]
name = "actix-connect" name = "actix-connect"
version = "2.0.0-alpha.4" version = "2.0.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix connect - tcp connector service" description = "TCP connector service for Actix ecosystem."
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]
homepage = "https://actix.rs" homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git" repository = "https://github.com/actix/actix-net.git"
@@ -31,10 +31,11 @@ rustls = ["rust-tls", "tokio-rustls", "webpki"]
uri = ["http"] uri = ["http"]
[dependencies] [dependencies]
actix-service = "1.0.3" actix-service = "1.0.6"
actix-codec = "0.2.0" actix-codec = "0.3.0"
actix-utils = "1.0.6" actix-utils = "2.0.0"
actix-rt = "1.0.0" actix-rt = "1.1.1"
derive_more = "0.99.2" derive_more = "0.99.2"
either = "1.5.3" either = "1.5.3"
futures-util = { version = "0.3.4", default-features = false } futures-util = { version = "0.3.4", default-features = false }
@@ -44,14 +45,14 @@ trust-dns-proto = { version = "0.19", default-features = false, features = ["tok
trust-dns-resolver = { version = "0.19", default-features = false, features = ["tokio-runtime", "system-config"] } trust-dns-resolver = { version = "0.19", default-features = false, features = ["tokio-runtime", "system-config"] }
# openssl # openssl
open-ssl = { version="0.10", package = "openssl", optional = true } open-ssl = { package = "openssl", version = "0.10", optional = true }
tokio-openssl = { version = "0.4.0", optional = true } tokio-openssl = { version = "0.4.0", optional = true }
# rustls # rustls
rust-tls = { version = "0.18.0", package = "rustls", optional = true } rust-tls = { package = "rustls", version = "0.18.0", optional = true }
tokio-rustls = { version = "0.14.0", optional = true } tokio-rustls = { version = "0.14.0", optional = true }
webpki = { version = "0.21", optional = true } webpki = { version = "0.21", optional = true }
[dev-dependencies] [dev-dependencies]
bytes = "0.5.3" bytes = "0.5.3"
actix-testing = { version="1.0.0" } actix-testing = "1.0.0"

View File

@@ -43,7 +43,7 @@ pub struct Connect<T> {
} }
impl<T: Address> Connect<T> { impl<T: Address> Connect<T> {
/// Create `Connect` instance by spliting the string by ':' and convert the second part to u16 /// Create `Connect` instance by splitting the string by ':' and convert the second part to u16
pub fn new(req: T) -> Connect<T> { pub fn new(req: T) -> Connect<T> {
let (_, port) = parse(req.host()); let (_, port) = parse(req.host());
Connect { Connect {
@@ -53,7 +53,8 @@ impl<T: Address> Connect<T> {
} }
} }
/// Create new `Connect` instance from host and address. Connector skips name resolution stage for such connect messages. /// Create new `Connect` instance from host and address. Connector skips name resolution stage
/// for such connect messages.
pub fn with(req: T, addr: SocketAddr) -> Connect<T> { pub fn with(req: T, addr: SocketAddr) -> Connect<T> {
Connect { Connect {
req, req,
@@ -102,7 +103,7 @@ impl<T: Address> Connect<T> {
self.req.port().unwrap_or(self.port) self.req.port().unwrap_or(self.port)
} }
/// Preresolved addresses of the request. /// Pre-resolved addresses of the request.
pub fn addrs(&self) -> ConnectAddrsIter<'_> { pub fn addrs(&self) -> ConnectAddrsIter<'_> {
let inner = match self.addr { let inner = match self.addr {
None => Either::Left(None), None => Either::Left(None),
@@ -113,7 +114,7 @@ impl<T: Address> Connect<T> {
ConnectAddrsIter { inner } ConnectAddrsIter { inner }
} }
/// Takes preresolved addresses of the request. /// Takes pre-resolved addresses of the request.
pub fn take_addrs(&mut self) -> ConnectTakeAddrsIter { pub fn take_addrs(&mut self) -> ConnectTakeAddrsIter {
let inner = match self.addr.take() { let inner = match self.addr.take() {
None => Either::Left(None), None => Either::Left(None),

View File

@@ -13,7 +13,7 @@ use futures_util::future::{err, ok, BoxFuture, Either, FutureExt, Ready};
use super::connect::{Address, Connect, Connection}; use super::connect::{Address, Connect, Connection};
use super::error::ConnectError; use super::error::ConnectError;
/// Tcp connector service factory /// TCP connector service factory
#[derive(Debug)] #[derive(Debug)]
pub struct TcpConnectorFactory<T>(PhantomData<T>); pub struct TcpConnectorFactory<T>(PhantomData<T>);
@@ -22,7 +22,7 @@ impl<T> TcpConnectorFactory<T> {
TcpConnectorFactory(PhantomData) TcpConnectorFactory(PhantomData)
} }
/// Create tcp connector service /// Create TCP connector service
pub fn service(&self) -> TcpConnector<T> { pub fn service(&self) -> TcpConnector<T> {
TcpConnector(PhantomData) TcpConnector(PhantomData)
} }
@@ -54,7 +54,7 @@ impl<T: Address> ServiceFactory for TcpConnectorFactory<T> {
} }
} }
/// Tcp connector service /// TCP connector service
#[derive(Default, Debug)] #[derive(Default, Debug)]
pub struct TcpConnector<T>(PhantomData<T>); pub struct TcpConnector<T>(PhantomData<T>);
@@ -74,6 +74,7 @@ impl<T: Address> Service for TcpConnector<T> {
type Request = Connect<T>; type Request = Connect<T>;
type Response = Connection<T, TcpStream>; type Response = Connection<T, TcpStream>;
type Error = ConnectError; type Error = ConnectError;
#[allow(clippy::type_complexity)]
type Future = Either<TcpConnectorResponse<T>, Ready<Result<Self::Response, Self::Error>>>; type Future = Either<TcpConnectorResponse<T>, Ready<Result<Self::Response, Self::Error>>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
@@ -94,7 +95,7 @@ impl<T: Address> Service for TcpConnector<T> {
} }
#[doc(hidden)] #[doc(hidden)]
/// Tcp stream connector response future /// TCP stream connector response future
pub struct TcpConnectorResponse<T> { pub struct TcpConnectorResponse<T> {
req: Option<T>, req: Option<T>,
port: u16, port: u16,

View File

@@ -20,7 +20,7 @@ pub enum ConnectError {
#[display(fmt = "Connector received `Connect` method with unresolved host")] #[display(fmt = "Connector received `Connect` method with unresolved host")]
Unresolved, Unresolved,
/// Connection io error /// Connection IO error
#[display(fmt = "{}", _0)] #[display(fmt = "{}", _0)]
Io(io::Error), Io(io::Error),
} }

View File

@@ -1,11 +1,11 @@
//! Actix connect - tcp connector service //! TCP connector service for Actix ecosystem.
//! //!
//! ## Package feature //! ## Package feature
//! //!
//! * `openssl` - enables ssl support via `openssl` crate //! * `openssl` - enables TLS support via `openssl` crate
//! * `rustls` - enables ssl support via `rustls` crate //! * `rustls` - enables TLS support via `rustls` crate
#![deny(rust_2018_idioms, warnings)]
#![allow(clippy::type_complexity)] #![deny(rust_2018_idioms)]
#![recursion_limit = "128"] #![recursion_limit = "128"]
#[macro_use] #[macro_use]
@@ -71,7 +71,7 @@ pub async fn start_default_resolver() -> Result<AsyncResolver, ConnectError> {
get_default_resolver().await get_default_resolver().await
} }
/// Create tcp connector service /// Create TCP connector service.
pub fn new_connector<T: Address + 'static>( pub fn new_connector<T: Address + 'static>(
resolver: AsyncResolver, resolver: AsyncResolver,
) -> impl Service<Request = Connect<T>, Response = Connection<T, TcpStream>, Error = ConnectError> ) -> impl Service<Request = Connect<T>, Response = Connection<T, TcpStream>, Error = ConnectError>
@@ -79,7 +79,7 @@ pub fn new_connector<T: Address + 'static>(
pipeline(Resolver::new(resolver)).and_then(TcpConnector::new()) pipeline(Resolver::new(resolver)).and_then(TcpConnector::new())
} }
/// Create tcp connector service /// Create TCP connector service factory.
pub fn new_connector_factory<T: Address + 'static>( pub fn new_connector_factory<T: Address + 'static>(
resolver: AsyncResolver, resolver: AsyncResolver,
) -> impl ServiceFactory< ) -> impl ServiceFactory<
@@ -92,14 +92,14 @@ pub fn new_connector_factory<T: Address + 'static>(
pipeline_factory(ResolverFactory::new(resolver)).and_then(TcpConnectorFactory::new()) pipeline_factory(ResolverFactory::new(resolver)).and_then(TcpConnectorFactory::new())
} }
/// Create connector service with default parameters /// Create connector service with default parameters.
pub fn default_connector<T: Address + 'static>( pub fn default_connector<T: Address + 'static>(
) -> impl Service<Request = Connect<T>, Response = Connection<T, TcpStream>, Error = ConnectError> ) -> impl Service<Request = Connect<T>, Response = Connection<T, TcpStream>, Error = ConnectError>
+ Clone { + Clone {
pipeline(Resolver::default()).and_then(TcpConnector::new()) pipeline(Resolver::default()).and_then(TcpConnector::new())
} }
/// Create connector service factory with default parameters /// Create connector service factory with default parameters.
pub fn default_connector_factory<T: Address + 'static>() -> impl ServiceFactory< pub fn default_connector_factory<T: Address + 'static>() -> impl ServiceFactory<
Config = (), Config = (),
Request = Connect<T>, Request = Connect<T>,

View File

@@ -106,6 +106,7 @@ impl<T: Address> Service for Resolver<T> {
type Request = Connect<T>; type Request = Connect<T>;
type Response = Connect<T>; type Response = Connect<T>;
type Error = ConnectError; type Error = ConnectError;
#[allow(clippy::type_complexity)]
type Future = Either< type Future = Either<
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>, Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>,
Ready<Result<Connect<T>, Self::Error>>, Ready<Result<Connect<T>, Self::Error>>,

View File

@@ -114,6 +114,7 @@ enum ConnectState<T: Address> {
} }
impl<T: Address> ConnectState<T> { impl<T: Address> ConnectState<T> {
#[allow(clippy::type_complexity)]
fn poll( fn poll(
&mut self, &mut self,
cx: &mut Context<'_>, cx: &mut Context<'_>,

View File

@@ -17,7 +17,7 @@ use crate::{
Address, Connect, ConnectError, ConnectService, ConnectServiceFactory, Connection, Address, Connect, ConnectError, ConnectService, ConnectServiceFactory, Connection,
}; };
/// Openssl connector factory /// OpenSSL connector factory
pub struct OpensslConnector<T, U> { pub struct OpensslConnector<T, U> {
connector: SslConnector, connector: SslConnector,
_t: PhantomData<(T, U)>, _t: PhantomData<(T, U)>,
@@ -97,6 +97,7 @@ where
type Request = Connection<T, U>; type Request = Connection<T, U>;
type Response = Connection<T, SslStream<U>>; type Response = Connection<T, SslStream<U>>;
type Error = io::Error; type Error = io::Error;
#[allow(clippy::type_complexity)]
type Future = Either<ConnectAsyncExt<T, U>, Ready<Result<Self::Response, Self::Error>>>; type Future = Either<ConnectAsyncExt<T, U>, Ready<Result<Self::Response, Self::Error>>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
@@ -164,7 +165,7 @@ impl<T> OpensslConnectServiceFactory<T> {
} }
} }
/// Construct new connect service with custom dns resolver /// Construct new connect service with custom DNS resolver
pub fn with_resolver(connector: SslConnector, resolver: AsyncResolver) -> Self { pub fn with_resolver(connector: SslConnector, resolver: AsyncResolver) -> Self {
OpensslConnectServiceFactory { OpensslConnectServiceFactory {
tcp: ConnectServiceFactory::with_resolver(resolver), tcp: ConnectServiceFactory::with_resolver(resolver),
@@ -172,7 +173,7 @@ impl<T> OpensslConnectServiceFactory<T> {
} }
} }
/// Construct openssl connect service /// Construct OpenSSL connect service
pub fn service(&self) -> OpensslConnectService<T> { pub fn service(&self) -> OpensslConnectService<T> {
OpensslConnectService { OpensslConnectService {
tcp: self.tcp.service(), tcp: self.tcp.service(),

View File

@@ -88,9 +88,9 @@ async fn test_new_service() {
assert_eq!(con.peer_addr().unwrap(), srv.addr()); assert_eq!(con.peer_addr().unwrap(), srv.addr());
} }
#[cfg(feature = "openssl")] #[cfg(all(feature = "openssl", feature = "uri"))]
#[actix_rt::test] #[actix_rt::test]
async fn test_uri() { async fn test_openssl_uri() {
use std::convert::TryFrom; use std::convert::TryFrom;
let srv = TestServer::with(|| { let srv = TestServer::with(|| {
@@ -107,7 +107,7 @@ async fn test_uri() {
assert_eq!(con.peer_addr().unwrap(), srv.addr()); assert_eq!(con.peer_addr().unwrap(), srv.addr());
} }
#[cfg(feature = "rustls")] #[cfg(all(feature = "rustls", feature = "uri"))]
#[actix_rt::test] #[actix_rt::test]
async fn test_rustls_uri() { async fn test_rustls_uri() {
use std::convert::TryFrom; use std::convert::TryFrom;

View File

@@ -1,5 +1,11 @@
# Changes # Changes
## Unreleased - 2020-xx-xx
### Added
* Add `System::attach_to_tokio` method. [#173]
## [1.1.1] - 2020-04-30 ## [1.1.1] - 2020-04-30
### Fixed ### Fixed

View File

@@ -23,3 +23,6 @@ futures-util = { version = "0.3.4", default-features = false, features = ["alloc
copyless = "0.1.4" copyless = "0.1.4"
smallvec = "1" smallvec = "1"
tokio = { version = "0.2.6", default-features = false, features = ["rt-core", "rt-util", "io-driver", "tcp", "uds", "udp", "time", "signal", "stream"] } tokio = { version = "0.2.6", default-features = false, features = ["rt-core", "rt-util", "io-driver", "tcp", "uds", "udp", "time", "signal", "stream"] }
[dev-dependencies]
tokio = { version = "0.2.6", features = ["full"] }

View File

@@ -137,7 +137,7 @@ impl AsyncSystemRunner {
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
}; };
Arbiter::stop_system(); Arbiter::stop_system();
return res; res
} }
}) })
.flatten() .flatten()

View File

@@ -57,10 +57,59 @@ impl System {
Self::builder().name(name).build() Self::builder().name(name).build()
} }
#[allow(clippy::new_ret_no_self)] /// Create new system using provided tokio `LocalSet`.
/// Create new system using provided tokio Handle.
/// ///
/// This method panics if it can not spawn system arbiter /// This method panics if it can not spawn system arbiter
///
/// Note: This method uses provided `LocalSet` to create a `System` future only.
/// All the [`Arbiter`]s will be started in separate threads using their own tokio `Runtime`s.
/// It means that using this method currently it is impossible to make `actix-rt` work in the
/// alternative `tokio` `Runtime`s (e.g. provided by [`tokio_compat`]).
///
/// [`Arbiter`]: struct.Arbiter.html
/// [`tokio_compat`]: https://crates.io/crates/tokio-compat
///
/// # Examples
///
/// ```
/// use tokio::{runtime::Runtime, task::LocalSet};
/// use actix_rt::System;
/// use futures_util::future::try_join_all;
///
/// async fn run_application() {
/// let first_task = tokio::spawn(async {
/// // ...
/// # println!("One task");
/// # Ok::<(),()>(())
/// });
///
/// let second_task = tokio::spawn(async {
/// // ...
/// # println!("Another task");
/// # Ok::<(),()>(())
/// });
///
/// try_join_all(vec![first_task, second_task])
/// .await
/// .expect("Some of the futures finished unexpectedly");
/// }
///
///
/// let mut runtime = tokio::runtime::Builder::new()
/// .core_threads(2)
/// .enable_all()
/// .threaded_scheduler()
/// .build()
/// .unwrap();
///
///
/// let actix_system_task = LocalSet::new();
/// let sys = System::run_in_tokio("actix-main-system", &actix_system_task);
/// actix_system_task.spawn_local(sys);
///
/// let rest_operations = run_application();
/// runtime.block_on(actix_system_task.run_until(rest_operations));
/// ```
pub fn run_in_tokio<T: Into<String>>( pub fn run_in_tokio<T: Into<String>>(
name: T, name: T,
local: &LocalSet, local: &LocalSet,
@@ -71,6 +120,77 @@ impl System {
.run_nonblocking() .run_nonblocking()
} }
/// Consume the provided tokio Runtime and start the `System` in it.
/// This method will create a `LocalSet` object and occupy the current thread
/// for the created `System` exclusively. All the other asynchronous tasks that
/// should be executed as well must be aggregated into one future, provided as the last
/// argument to this method.
///
/// Note: This method uses provided `Runtime` to create a `System` future only.
/// All the [`Arbiter`]s will be started in separate threads using their own tokio `Runtime`s.
/// It means that using this method currently it is impossible to make `actix-rt` work in the
/// alternative `tokio` `Runtime`s (e.g. provided by `tokio_compat`).
///
/// [`Arbiter`]: struct.Arbiter.html
/// [`tokio_compat`]: https://crates.io/crates/tokio-compat
///
/// # Arguments
///
/// - `name`: Name of the System
/// - `runtime`: A tokio Runtime to run the system in.
/// - `rest_operations`: A future to be executed in the runtime along with the System.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Runtime;
/// use actix_rt::System;
/// use futures_util::future::try_join_all;
///
/// async fn run_application() {
/// let first_task = tokio::spawn(async {
/// // ...
/// # println!("One task");
/// # Ok::<(),()>(())
/// });
///
/// let second_task = tokio::spawn(async {
/// // ...
/// # println!("Another task");
/// # Ok::<(),()>(())
/// });
///
/// try_join_all(vec![first_task, second_task])
/// .await
/// .expect("Some of the futures finished unexpectedly");
/// }
///
///
/// let runtime = tokio::runtime::Builder::new()
/// .core_threads(2)
/// .enable_all()
/// .threaded_scheduler()
/// .build()
/// .unwrap();
///
/// let rest_operations = run_application();
/// System::attach_to_tokio("actix-main-system", runtime, rest_operations);
/// ```
pub fn attach_to_tokio<Fut, R>(
name: impl Into<String>,
mut runtime: tokio::runtime::Runtime,
rest_operations: Fut,
) -> R
where
Fut: std::future::Future<Output = R>,
{
let actix_system_task = LocalSet::new();
let sys = System::run_in_tokio(name.into(), &actix_system_task);
actix_system_task.spawn_local(sys);
runtime.block_on(actix_system_task.run_until(rest_operations))
}
/// Get current running system. /// Get current running system.
pub fn current() -> System { pub fn current() -> System {
CURRENT.with(|cell| match *cell.borrow() { CURRENT.with(|cell| match *cell.borrow() {

View File

@@ -1,210 +1,128 @@
# Changes # Changes
## [1.0.3] - 2020-05-19 ## Unreleased - 2020-xx-xx
### Changed
## 1.0.4 - 2020-09-12
* Update actix-codec to 0.3.0.
* Workers must be greater than 0. [#167]
[#167]: https://github.com/actix/actix-net/pull/167
## 1.0.3 - 2020-05-19
* Replace deprecated `net2` crate with `socket2` [#140] * Replace deprecated `net2` crate with `socket2` [#140]
[#140]: https://github.com/actix/actix-net/pull/140 [#140]: https://github.com/actix/actix-net/pull/140
## [1.0.2] - 2020-02-26
### Fixed
## 1.0.2 - 2020-02-26
* Avoid error by calling `reregister()` on Windows [#103] * Avoid error by calling `reregister()` on Windows [#103]
[#103]: https://github.com/actix/actix-net/pull/103 [#103]: https://github.com/actix/actix-net/pull/103
## [1.0.1] - 2019-12-29
### Changed
## 1.0.1 - 2019-12-29
* Rename `.start()` method to `.run()` * Rename `.start()` method to `.run()`
## [1.0.0] - 2019-12-11
### Changed
## 1.0.0 - 2019-12-11
* Use actix-net releases * Use actix-net releases
## [1.0.0-alpha.4] - 2019-12-08 ## 1.0.0-alpha.4 - 2019-12-08
### Changed
* Use actix-service 1.0.0-alpha.4 * Use actix-service 1.0.0-alpha.4
## [1.0.0-alpha.3] - 2019-12-07
### Changed
## 1.0.0-alpha.3 - 2019-12-07
* Migrate to tokio 0.2 * Migrate to tokio 0.2
### Fixed
* Fix compilation on non-unix platforms * Fix compilation on non-unix platforms
* Better handling server configuration * Better handling server configuration
## [1.0.0-alpha.2] - 2019-12-02 ## 1.0.0-alpha.2 - 2019-12-02
### Changed
* Simplify server service (remove actix-server-config) * Simplify server service (remove actix-server-config)
* Allow to wait on `Server` until server stops * Allow to wait on `Server` until server stops
## [0.8.0-alpha.1] - 2019-11-22 ## 0.8.0-alpha.1 - 2019-11-22
### Changed
* Migrate to `std::future` * Migrate to `std::future`
## [0.7.0] - 2019-10-04 ## 0.7.0 - 2019-10-04
### Changed
* Update `rustls` to 0.16 * Update `rustls` to 0.16
* Minimum required Rust version upped to 1.37.0 * Minimum required Rust version upped to 1.37.0
## [0.6.1] - 2019-09-25 ## 0.6.1 - 2019-09-25
### Added
* Add UDS listening support to `ServerBuilder` * Add UDS listening support to `ServerBuilder`
## [0.6.0] - 2019-07-18 ## 0.6.0 - 2019-07-18
### Added
* Support Unix domain sockets #3 * Support Unix domain sockets #3
## [0.5.1] - 2019-05-18 ## 0.5.1 - 2019-05-18
### Changed
* ServerBuilder::shutdown_timeout() accepts u64 * ServerBuilder::shutdown_timeout() accepts u64
## [0.5.0] - 2019-05-12 ## 0.5.0 - 2019-05-12
### Added
* Add `Debug` impl for `SslError` * Add `Debug` impl for `SslError`
* Derive debug for `Server` and `ServerCommand` * Derive debug for `Server` and `ServerCommand`
### Changed
* Upgrade to actix-service 0.4 * Upgrade to actix-service 0.4
## [0.4.3] - 2019-04-16 ## 0.4.3 - 2019-04-16
### Added
* Re-export `IoStream` trait * Re-export `IoStream` trait
* Depend on `ssl` and `rust-tls` features from actix-server-config
### Changed
* Deppend on `ssl` and `rust-tls` features from actix-server-config
## [0.4.2] - 2019-03-30 ## 0.4.2 - 2019-03-30
### Fixed
* Fix SIGINT force shutdown * Fix SIGINT force shutdown
## [0.4.1] - 2019-03-14 ## 0.4.1 - 2019-03-14
### Added
* `SystemRuntime::on_start()` - allow to run future before server service initialization * `SystemRuntime::on_start()` - allow to run future before server service initialization
## [0.4.0] - 2019-03-12 ## 0.4.0 - 2019-03-12
### Changed
* Use `ServerConfig` for service factory * Use `ServerConfig` for service factory
* Wrap tcp socket to `Io` type * Wrap tcp socket to `Io` type
* Upgrade actix-service * Upgrade actix-service
## [0.3.1] - 2019-03-04 ## 0.3.1 - 2019-03-04
### Added
* Add `ServerBuilder::maxconnrate` sets the maximum per-worker number of concurrent connections * Add `ServerBuilder::maxconnrate` sets the maximum per-worker number of concurrent connections
* Add helper ssl error `SslError` * Add helper ssl error `SslError`
### Changed
* Rename `StreamServiceFactory` to `ServiceFactory` * Rename `StreamServiceFactory` to `ServiceFactory`
* Deprecate `StreamServiceFactory` * Deprecate `StreamServiceFactory`
## [0.3.0] - 2019-03-02 ## 0.3.0 - 2019-03-02
### Changed
* Use new `NewService` trait * Use new `NewService` trait
## [0.2.1] - 2019-02-09 ## 0.2.1 - 2019-02-09
### Changed
* Drop service response * Drop service response
## [0.2.0] - 2019-02-01 ## 0.2.0 - 2019-02-01
### Changed
* Migrate to actix-service 0.2 * Migrate to actix-service 0.2
* Updated rustls dependency * Updated rustls dependency
## [0.1.3] - 2018-12-21 ## 0.1.3 - 2018-12-21
### Fixed
* Fix max concurrent connections handling * Fix max concurrent connections handling
## [0.1.2] - 2018-12-12 ## 0.1.2 - 2018-12-12
### Changed
* rename ServiceConfig::rt() to ServiceConfig::apply() * rename ServiceConfig::rt() to ServiceConfig::apply()
### Fixed
* Fix back-pressure for concurrent ssl handshakes * Fix back-pressure for concurrent ssl handshakes
## [0.1.1] - 2018-12-11 ## 0.1.1 - 2018-12-11
* Fix signal handling on windows * Fix signal handling on windows
## [0.1.0] - 2018-12-09 ## 0.1.0 - 2018-12-09
* Move server to separate crate * Move server to separate crate

View File

@@ -1,8 +1,8 @@
[package] [package]
name = "actix-server" name = "actix-server"
version = "1.0.3" version = "1.0.4"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix server - General purpose tcp server" description = "General purpose TCP server built for the Actix ecosystem"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]
homepage = "https://actix.rs" homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git" repository = "https://github.com/actix/actix-net.git"
@@ -11,7 +11,6 @@ categories = ["network-programming", "asynchronous"]
license = "MIT OR Apache-2.0" license = "MIT OR Apache-2.0"
exclude = [".gitignore", ".cargo/config"] exclude = [".gitignore", ".cargo/config"]
edition = "2018" edition = "2018"
workspace = ".."
[lib] [lib]
name = "actix_server" name = "actix_server"
@@ -21,13 +20,13 @@ path = "src/lib.rs"
default = [] default = []
[dependencies] [dependencies]
actix-service = "1.0.1" actix-service = "1.0.6"
actix-rt = "1.0.0" actix-rt = "1.1.1"
actix-codec = "0.2.0" actix-codec = "0.3.0"
actix-utils = "1.0.4" actix-utils = "2.0.0"
log = "0.4" log = "0.4"
num_cpus = "1.11" num_cpus = "1.13"
mio = "0.6.19" mio = "0.6.19"
socket2 = "0.3" socket2 = "0.3"
futures-channel = { version = "0.3.4", default-features = false } futures-channel = { version = "0.3.4", default-features = false }

View File

@@ -72,8 +72,9 @@ impl ServerBuilder {
/// Set number of workers to start. /// Set number of workers to start.
/// ///
/// By default server uses number of available logical cpu as workers /// By default server uses number of available logical cpu as workers
/// count. /// count. Workers must be greater than 0.
pub fn workers(mut self, num: usize) -> Self { pub fn workers(mut self, num: usize) -> Self {
assert_ne!(num, 0, "workers must be greater than 0");
self.threads = num; self.threads = num;
self self
} }

View File

@@ -151,7 +151,7 @@ impl InternalServiceFactory for ConfiguredService {
)); ));
}; };
} }
return Ok(res); Ok(res)
} }
.boxed_local() .boxed_local()
} }
@@ -272,13 +272,13 @@ where
fn new_service(&self, _: ()) -> Self::Future { fn new_service(&self, _: ()) -> Self::Future {
let fut = self.inner.new_service(()); let fut = self.inner.new_service(());
async move { async move {
return match fut.await { match fut.await {
Ok(s) => Ok(Box::new(StreamService::new(s)) as BoxedServerService), Ok(s) => Ok(Box::new(StreamService::new(s)) as BoxedServerService),
Err(e) => { Err(e) => {
error!("Can not construct service: {:?}", e); error!("Can not construct service: {:?}", e);
Err(()) Err(())
} }
}; }
} }
.boxed_local() .boxed_local()
} }

View File

@@ -2,6 +2,6 @@
> Service trait and combinators for representing asynchronous request/response operations. > Service trait and combinators for representing asynchronous request/response operations.
See documentation for detailed explanations these components: [https://docs.rs/actix-service](docs). See documentation for detailed explanations these components: [https://docs.rs/actix-service][docs].
[docs]: https://docs.rs/actix-service [docs]: https://docs.rs/actix-service

View File

@@ -1,39 +1,37 @@
# Changes # Changes
## Unreleased ## Unreleased - 2020-xx-xx
## 2.0.0 - 2020-09-03
* `nativetls::NativeTlsAcceptor` is renamed to `nativetls::Acceptor`.
* Where possible, "SSL" terminology is replaced with "TLS".
* `SslError` is renamed to `TlsError`.
* `TlsError::Ssl` enum variant is renamed to `TlsError::Tls`.
* `max_concurrent_ssl_connect` is renamed to `max_concurrent_tls_connect`.
## 2.0.0-alpha.2 - 2020-08-17 ## 2.0.0-alpha.2 - 2020-08-17
### Changed
* Update `rustls` dependency to 0.18 * Update `rustls` dependency to 0.18
* Update `tokio-rustls` dependency to 0.14 * Update `tokio-rustls` dependency to 0.14
* Update `webpki-roots` dependency to 0.20 * Update `webpki-roots` dependency to 0.20
## [2.0.0-alpha.1] - 2020-03-03 ## [2.0.0-alpha.1] - 2020-03-03
### Changed
* Update `rustls` dependency to 0.17 * Update `rustls` dependency to 0.17
* Update `tokio-rustls` dependency to 0.13 * Update `tokio-rustls` dependency to 0.13
* Update `webpki-roots` dependency to 0.19 * Update `webpki-roots` dependency to 0.19
## [1.0.0] - 2019-12-11
## [1.0.0] - 2019-12-11
* 1.0.0 release * 1.0.0 release
## [1.0.0-alpha.3] - 2019-12-07 ## [1.0.0-alpha.3] - 2019-12-07
### Changed
* Migrate to tokio 0.2 * Migrate to tokio 0.2
* Enable rustls acceptor service * Enable rustls acceptor service
* Enable native-tls acceptor service * Enable native-tls acceptor service
## [1.0.0-alpha.1] - 2019-12-02
* Split openssl accetor from actix-server package ## [1.0.0-alpha.1] - 2019-12-02
* Split openssl acceptor from actix-server package

View File

@@ -1,16 +1,15 @@
[package] [package]
name = "actix-tls" name = "actix-tls"
version = "2.0.0-alpha.2" version = "2.0.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix tls services" description = "TLS acceptor services for Actix ecosystem."
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "tls", "ssl"]
homepage = "https://actix.rs" homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git" repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/actix-tls/" documentation = "https://docs.rs/actix-tls/"
categories = ["network-programming", "asynchronous"] categories = ["network-programming", "asynchronous"]
license = "MIT OR Apache-2.0" license = "MIT OR Apache-2.0"
edition = "2018" edition = "2018"
workspace = ".."
[package.metadata.docs.rs] [package.metadata.docs.rs]
features = ["openssl", "rustls", "nativetls"] features = ["openssl", "rustls", "nativetls"]
@@ -33,20 +32,17 @@ nativetls = ["native-tls", "tokio-tls"]
[dependencies] [dependencies]
actix-service = "1.0.0" actix-service = "1.0.0"
actix-codec = "0.2.0" actix-codec = "0.3.0"
actix-utils = "1.0.0" actix-utils = "2.0.0"
actix-rt = "1.0.0"
derive_more = "0.99.2"
either = "1.5.2"
futures-util = { version = "0.3.4", default-features = false } futures-util = { version = "0.3.4", default-features = false }
log = "0.4"
# openssl # openssl
open-ssl = { version="0.10", package = "openssl", optional = true } open-ssl = { package = "openssl", version = "0.10", optional = true }
tokio-openssl = { version = "0.4.0", optional = true } tokio-openssl = { version = "0.4.0", optional = true }
# rustls # rustls
rust-tls = { version = "0.18.0", package = "rustls", optional = true } rust-tls = { package = "rustls", version = "0.18.0", optional = true }
webpki = { version = "0.21", optional = true } webpki = { version = "0.21", optional = true }
webpki-roots = { version = "0.20", optional = true } webpki-roots = { version = "0.20", optional = true }
tokio-rustls = { version = "0.14.0", optional = true } tokio-rustls = { version = "0.14.0", optional = true }
@@ -57,4 +53,4 @@ tokio-tls = { version="0.3", optional = true }
[dev-dependencies] [dev-dependencies]
bytes = "0.5" bytes = "0.5"
actix-testing = { version="1.0.0" } actix-testing = "1.0.0"

View File

@@ -1,6 +1,11 @@
//! SSL Services //! TLS acceptor services for Actix ecosystem.
#![deny(rust_2018_idioms, warnings)] //!
#![allow(clippy::type_complexity)] //! ## Crate Features
//! * `openssl` - TLS acceptor using the `openssl` crate.
//! * `rustls` - TLS acceptor using the `rustls` crate.
//! * `nativetls` - TLS acceptor using the `native-tls` crate.
#![deny(rust_2018_idioms)]
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
@@ -15,25 +20,25 @@ pub mod rustls;
#[cfg(feature = "nativetls")] #[cfg(feature = "nativetls")]
pub mod nativetls; pub mod nativetls;
/// Sets the maximum per-worker concurrent ssl connection establish process.
///
/// All listeners will stop accepting connections when this limit is
/// reached. It can be used to limit the global SSL CPU usage.
///
/// By default max connections is set to a 256.
pub fn max_concurrent_ssl_connect(num: usize) {
MAX_CONN.store(num, Ordering::Relaxed);
}
pub(crate) static MAX_CONN: AtomicUsize = AtomicUsize::new(256); pub(crate) static MAX_CONN: AtomicUsize = AtomicUsize::new(256);
thread_local! { thread_local! {
static MAX_CONN_COUNTER: Counter = Counter::new(MAX_CONN.load(Ordering::Relaxed)); static MAX_CONN_COUNTER: Counter = Counter::new(MAX_CONN.load(Ordering::Relaxed));
} }
/// Ssl error combinded with service error. /// Sets the maximum per-worker concurrent TLS connection limit.
///
/// All listeners will stop accepting connections when this limit is reached.
/// It can be used to regulate the global TLS CPU usage.
///
/// By default, the connection limit is 256.
pub fn max_concurrent_tls_connect(num: usize) {
MAX_CONN.store(num, Ordering::Relaxed);
}
/// TLS error combined with service error.
#[derive(Debug)] #[derive(Debug)]
pub enum SslError<E1, E2> { pub enum TlsError<E1, E2> {
Ssl(E1), Tls(E1),
Service(E2), Service(E2),
} }

View File

@@ -5,34 +5,35 @@ use actix_codec::{AsyncRead, AsyncWrite};
use actix_service::{Service, ServiceFactory}; use actix_service::{Service, ServiceFactory};
use actix_utils::counter::Counter; use actix_utils::counter::Counter;
use futures_util::future::{self, FutureExt, LocalBoxFuture, TryFutureExt}; use futures_util::future::{self, FutureExt, LocalBoxFuture, TryFutureExt};
pub use native_tls::Error; pub use native_tls::Error;
pub use tokio_tls::{TlsAcceptor, TlsStream}; pub use tokio_tls::{TlsAcceptor, TlsStream};
use crate::MAX_CONN_COUNTER; use crate::MAX_CONN_COUNTER;
/// Support `SSL` connections via native-tls package /// Accept TLS connections via `native-tls` package.
/// ///
/// `tls` feature enables `NativeTlsAcceptor` type /// `nativetls` feature enables this `Acceptor` type.
pub struct NativeTlsAcceptor<T> { pub struct Acceptor<T> {
acceptor: TlsAcceptor, acceptor: TlsAcceptor,
io: PhantomData<T>, io: PhantomData<T>,
} }
impl<T> NativeTlsAcceptor<T> impl<T> Acceptor<T>
where where
T: AsyncRead + AsyncWrite + Unpin, T: AsyncRead + AsyncWrite + Unpin,
{ {
/// Create `NativeTlsAcceptor` instance /// Create `native-tls` based `Acceptor` service factory.
#[inline] #[inline]
pub fn new(acceptor: TlsAcceptor) -> Self { pub fn new(acceptor: TlsAcceptor) -> Self {
NativeTlsAcceptor { Acceptor {
acceptor, acceptor,
io: PhantomData, io: PhantomData,
} }
} }
} }
impl<T> Clone for NativeTlsAcceptor<T> { impl<T> Clone for Acceptor<T> {
#[inline] #[inline]
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self { Self {
@@ -42,7 +43,7 @@ impl<T> Clone for NativeTlsAcceptor<T> {
} }
} }
impl<T> ServiceFactory for NativeTlsAcceptor<T> impl<T> ServiceFactory for Acceptor<T>
where where
T: AsyncRead + AsyncWrite + Unpin + 'static, T: AsyncRead + AsyncWrite + Unpin + 'static,
{ {
@@ -104,8 +105,7 @@ where
let this = self.clone(); let this = self.clone();
async move { this.acceptor.accept(req).await } async move { this.acceptor.accept(req).await }
.map_ok(move |io| { .map_ok(move |io| {
// Required to preserve `CounterGuard` until `Self::Future` // Required to preserve `CounterGuard` until `Self::Future` is completely resolved.
// is completely resolved.
let _ = guard; let _ = guard;
io io
}) })

View File

@@ -3,26 +3,27 @@ use std::marker::PhantomData;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
pub use open_ssl::ssl::{AlpnError, SslAcceptor, SslAcceptorBuilder};
pub use tokio_openssl::{HandshakeError, SslStream};
use actix_codec::{AsyncRead, AsyncWrite}; use actix_codec::{AsyncRead, AsyncWrite};
use actix_service::{Service, ServiceFactory}; use actix_service::{Service, ServiceFactory};
use actix_utils::counter::{Counter, CounterGuard}; use actix_utils::counter::{Counter, CounterGuard};
use futures_util::future::{ok, FutureExt, LocalBoxFuture, Ready}; use futures_util::future::{ok, FutureExt, LocalBoxFuture, Ready};
pub use open_ssl::ssl::{AlpnError, SslAcceptor, SslAcceptorBuilder};
pub use tokio_openssl::{HandshakeError, SslStream};
use crate::MAX_CONN_COUNTER; use crate::MAX_CONN_COUNTER;
/// Support `TLS` server connections via openssl package /// Accept TLS connections via `openssl` package.
/// ///
/// `openssl` feature enables `Acceptor` type /// `openssl` feature enables this `Acceptor` type.
pub struct Acceptor<T: AsyncRead + AsyncWrite> { pub struct Acceptor<T: AsyncRead + AsyncWrite> {
acceptor: SslAcceptor, acceptor: SslAcceptor,
io: PhantomData<T>, io: PhantomData<T>,
} }
impl<T: AsyncRead + AsyncWrite> Acceptor<T> { impl<T: AsyncRead + AsyncWrite> Acceptor<T> {
/// Create default `OpensslAcceptor` /// Create OpenSSL based `Acceptor` service factory.
#[inline]
pub fn new(acceptor: SslAcceptor) -> Self { pub fn new(acceptor: SslAcceptor) -> Self {
Acceptor { Acceptor {
acceptor, acceptor,
@@ -32,6 +33,7 @@ impl<T: AsyncRead + AsyncWrite> Acceptor<T> {
} }
impl<T: AsyncRead + AsyncWrite> Clone for Acceptor<T> { impl<T: AsyncRead + AsyncWrite> Clone for Acceptor<T> {
#[inline]
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self { Self {
acceptor: self.acceptor.clone(), acceptor: self.acceptor.clone(),

View File

@@ -17,16 +17,17 @@ pub use webpki_roots::TLS_SERVER_ROOTS;
use crate::MAX_CONN_COUNTER; use crate::MAX_CONN_COUNTER;
/// Support `SSL` connections via rustls package /// Accept TLS connections via `rustls` package.
/// ///
/// `rust-tls` feature enables `RustlsAcceptor` type /// `rustls` feature enables this `Acceptor` type.
pub struct Acceptor<T> { pub struct Acceptor<T> {
config: Arc<ServerConfig>, config: Arc<ServerConfig>,
io: PhantomData<T>, io: PhantomData<T>,
} }
impl<T: AsyncRead + AsyncWrite> Acceptor<T> { impl<T: AsyncRead + AsyncWrite> Acceptor<T> {
/// Create rustls based `Acceptor` service factory /// Create Rustls based `Acceptor` service factory.
#[inline]
pub fn new(config: ServerConfig) -> Self { pub fn new(config: ServerConfig) -> Self {
Acceptor { Acceptor {
config: Arc::new(config), config: Arc::new(config),
@@ -36,6 +37,7 @@ impl<T: AsyncRead + AsyncWrite> Acceptor<T> {
} }
impl<T> Clone for Acceptor<T> { impl<T> Clone for Acceptor<T> {
#[inline]
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self { Self {
config: self.config.clone(), config: self.config.clone(),
@@ -65,7 +67,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> ServiceFactory for Acceptor<T> {
} }
} }
/// RusTLS based `Acceptor` service /// Rustls based `Acceptor` service
pub struct AcceptorService<T> { pub struct AcceptorService<T> {
acceptor: TlsAcceptor, acceptor: TlsAcceptor,
io: PhantomData<T>, io: PhantomData<T>,

View File

@@ -2,8 +2,13 @@
## Unreleased - 2020-xx-xx ## Unreleased - 2020-xx-xx
## 2.0.0 - 2020-08-23
* No changes from beta 1.
## 2.0.0-beta.1 - 2020-08-19
* Upgrade `tokio-util` to `0.3`. * Upgrade `tokio-util` to `0.3`.
* Remove unsound custom Cell and use `std::cell::RefCell` instead, as well as `actix-service`. * Remove unsound custom Cell and use `std::cell::RefCell` instead, as well as `actix-service`.
* Rename method to correctly spelled `LocalWaker::is_registered`.
## [1.0.6] - 2020-01-08 ## [1.0.6] - 2020-01-08

View File

@@ -1,8 +1,8 @@
[package] [package]
name = "actix-utils" name = "actix-utils"
version = "1.0.6" version = "2.0.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix utils - various actix net related services" description = "Various network related services and utilities for the Actix ecosystem."
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]
homepage = "https://actix.rs" homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git" repository = "https://github.com/actix/actix-net.git"
@@ -16,15 +16,15 @@ name = "actix_utils"
path = "src/lib.rs" path = "src/lib.rs"
[dependencies] [dependencies]
actix-service = "1.0.1" actix-codec = "0.3.0"
actix-rt = "1.0.0" actix-rt = "1.1.1"
actix-codec = "0.2.0" actix-service = "1.0.6"
bitflags = "1.2" bitflags = "1.2.1"
bytes = "0.5.3" bytes = "0.5.3"
either = "1.5.3" either = "1.5.3"
futures-channel = { version = "0.3.4", default-features = false } futures-channel = { version = "0.3.4", default-features = false }
futures-sink = { version = "0.3.4", default-features = false } futures-sink = { version = "0.3.4", default-features = false }
futures-util = { version = "0.3.4", default-features = false } futures-util = { version = "0.3.4", default-features = false }
pin-project = "0.4.17"
log = "0.4" log = "0.4"
pin-project = "0.4.17"
slab = "0.4" slab = "0.4"

View File

@@ -7,7 +7,7 @@ use crate::task::LocalWaker;
#[derive(Clone)] #[derive(Clone)]
/// Simple counter with ability to notify task on reaching specific number /// Simple counter with ability to notify task on reaching specific number
/// ///
/// Counter could be cloned, total ncount is shared across all clones. /// Counter could be cloned, total n-count is shared across all clones.
pub struct Counter(Rc<CounterInner>); pub struct Counter(Rc<CounterInner>);
struct CounterInner { struct CounterInner {

View File

@@ -1,5 +1,7 @@
//! Framed dispatcher service and related utilities //! Framed dispatcher service and related utilities
#![allow(type_alias_bounds)] #![allow(type_alias_bounds)]
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::{fmt, mem}; use std::{fmt, mem};

View File

@@ -152,7 +152,7 @@ mod tests {
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_newtransform() { async fn test_new_transform() {
let wait_time = Duration::from_millis(50); let wait_time = Duration::from_millis(50);
let srv = apply(InFlight::new(1), fn_factory(|| ok(SleepService(wait_time)))); let srv = apply(InFlight::new(1), fn_factory(|| ok(SleepService(wait_time))));

View File

@@ -1,11 +1,12 @@
//! Actix utils - various helper services //! Actix utils - various helper services
#![deny(rust_2018_idioms)] #![deny(rust_2018_idioms)]
#![allow(clippy::type_complexity)] #![allow(clippy::type_complexity)]
pub mod condition; pub mod condition;
pub mod counter; pub mod counter;
pub mod dispatcher;
pub mod either; pub mod either;
pub mod framed;
pub mod inflight; pub mod inflight;
pub mod keepalive; pub mod keepalive;
pub mod mpsc; pub mod mpsc;

View File

@@ -170,7 +170,7 @@ pub struct PReceiver<T> {
inner: Rc<RefCell<Slab<PoolInner<T>>>>, inner: Rc<RefCell<Slab<PoolInner<T>>>>,
} }
// The oneshots do not ever project Pin to the inner T // The one-shots do not ever project Pin to the inner T
impl<T> Unpin for PReceiver<T> {} impl<T> Unpin for PReceiver<T> {}
impl<T> Unpin for PSender<T> {} impl<T> Unpin for PSender<T> {}

View File

@@ -231,7 +231,7 @@ mod tests {
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_inorder() { async fn test_in_order() {
let (tx1, rx1) = oneshot::channel(); let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel(); let (tx2, rx2) = oneshot::channel();
let (tx3, rx3) = oneshot::channel(); let (tx3, rx3) = oneshot::channel();

View File

@@ -36,7 +36,7 @@ impl LocalWaker {
#[inline] #[inline]
/// Check if waker has been registered. /// Check if waker has been registered.
pub fn is_registed(&self) -> bool { pub fn is_registered(&self) -> bool {
unsafe { (*self.waker.get()).is_some() } unsafe { (*self.waker.get()).is_some() }
} }

View File

@@ -173,7 +173,7 @@ mod tests {
/// ///
/// Expected Behavior: Two back-to-back calls of `LowResTimeService::now()` return the same value. /// Expected Behavior: Two back-to-back calls of `LowResTimeService::now()` return the same value.
#[actix_rt::test] #[actix_rt::test]
async fn lowres_time_service_time_does_not_immediately_change() { async fn low_res_time_service_time_does_not_immediately_change() {
let resolution = Duration::from_millis(50); let resolution = Duration::from_millis(50);
let time_service = LowResTimeService::with(resolution); let time_service = LowResTimeService::with(resolution);
assert_eq!(time_service.now(), time_service.now()); assert_eq!(time_service.now(), time_service.now());
@@ -210,7 +210,7 @@ mod tests {
/// Expected Behavior: Two calls of `LowResTimeService::now()` made in subsequent resolution interval return different values /// Expected Behavior: Two calls of `LowResTimeService::now()` made in subsequent resolution interval return different values
/// and second value is greater than the first one at least by a resolution interval. /// and second value is greater than the first one at least by a resolution interval.
#[actix_rt::test] #[actix_rt::test]
async fn lowres_time_service_time_updates_after_resolution_interval() { async fn low_res_time_service_time_updates_after_resolution_interval() {
let resolution = Duration::from_millis(100); let resolution = Duration::from_millis(100);
let wait_time = Duration::from_millis(300); let wait_time = Duration::from_millis(300);
let time_service = LowResTimeService::with(resolution); let time_service = LowResTimeService::with(resolution);

View File

@@ -220,7 +220,7 @@ mod tests {
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_timeout_newservice() { async fn test_timeout_new_service() {
let resolution = Duration::from_millis(100); let resolution = Duration::from_millis(100);
let wait_time = Duration::from_millis(500); let wait_time = Duration::from_millis(500);

View File

@@ -624,7 +624,7 @@ impl ResourceDef {
} }
if !for_prefix { if !for_prefix {
re.push_str("$"); re.push('$');
} }
(re, elems, true, pattern.chars().count()) (re, elems, true, pattern.chars().count())
} }