1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-11-23 22:51:07 +01:00

move server to separate crate

This commit is contained in:
Nikolay Kim 2018-12-09 22:14:29 -08:00
parent ffb07c8884
commit 8ad93f4838
21 changed files with 308 additions and 149 deletions

View File

@ -33,7 +33,10 @@ script:
if [[ "$TRAVIS_RUST_VERSION" != "nightly" ]]; then if [[ "$TRAVIS_RUST_VERSION" != "nightly" ]]; then
cargo clean cargo clean
cargo test --features="ssl,tls,rust-tls" -- --nocapture cargo test --features="ssl,tls,rust-tls" -- --nocapture
cd actix-codec && cargo test
cd actix-service && cargo test cd actix-service && cargo test
cd actix-server && cargo test --features="ssl,tls,rust-tls" -- --nocapture
cd actix-rt && cargo test
fi fi
- | - |
if [[ "$TRAVIS_RUST_VERSION" == "nightly" ]]; then if [[ "$TRAVIS_RUST_VERSION" == "nightly" ]]; then

View File

@ -18,6 +18,7 @@ members = [
"./", "./",
"actix-codec", "actix-codec",
"actix-service", "actix-service",
"actix-server",
"actix-rt", "actix-rt",
] ]
@ -36,15 +37,9 @@ path = "src/lib.rs"
[features] [features]
default = [] default = []
# tls
tls = ["native-tls"]
# openssl # openssl
ssl = ["openssl", "tokio-openssl"] ssl = ["openssl", "tokio-openssl"]
# rustls
rust-tls = ["rustls", "tokio-rustls", "webpki", "webpki-roots"]
cell = [] cell = []
[dependencies] [dependencies]
@ -70,18 +65,9 @@ tokio-signal = "0.2"
trust-dns-proto = "^0.5.0" trust-dns-proto = "^0.5.0"
trust-dns-resolver = "^0.10.0" trust-dns-resolver = "^0.10.0"
# native-tls
native-tls = { version="0.2", optional = true }
# openssl # openssl
openssl = { version="0.10", optional = true } openssl = { version="0.10", optional = true }
tokio-openssl = { version="0.2", optional = true } tokio-openssl = { version="0.3", optional = true }
#rustls
rustls = { version = "^0.14", optional = true }
tokio-rustls = { version = "^0.8", optional = true }
webpki = { version = "0.18", optional = true }
webpki-roots = { version = "0.15", optional = true }
[dev-dependencies] [dev-dependencies]
env_logger = "0.5" env_logger = "0.5"

View File

@ -2,4 +2,4 @@
## [0.1.0] - 2018-12-09 ## [0.1.0] - 2018-12-09
* Move codec to separate crate * Initial release

View File

@ -1,7 +1,3 @@
#![allow(
clippy::borrow_interior_mutable_const,
clippy::declare_interior_mutable_const
)]
use std::cell::{Cell, RefCell}; use std::cell::{Cell, RefCell};
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
@ -21,7 +17,7 @@ thread_local!(
static Q: RefCell<Vec<Box<Future<Item = (), Error = ()>>>> = RefCell::new(Vec::new()); static Q: RefCell<Vec<Box<Future<Item = (), Error = ()>>>> = RefCell::new(Vec::new());
); );
pub(crate) const COUNT: AtomicUsize = AtomicUsize::new(0); pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0);
pub(crate) enum ArbiterCommand { pub(crate) enum ArbiterCommand {
Stop, Stop,

5
actix-server/CHANGES.md Normal file
View File

@ -0,0 +1,5 @@
# Changes
## [0.1.0] - 2018-12-09
* Move server to separate crate

74
actix-server/Cargo.toml Normal file
View File

@ -0,0 +1,74 @@
[package]
name = "actix-server"
version = "0.1.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix server - General purpose tcp server"
readme = "README.md"
keywords = ["network", "framework", "async", "futures"]
homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/actix-server/"
categories = ["network-programming", "asynchronous"]
license = "MIT/Apache-2.0"
exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"]
edition = "2018"
workspace = "../"
[package.metadata.docs.rs]
features = ["ssl", "tls", "rust-tls"]
[lib]
name = "actix_server"
path = "src/lib.rs"
[features]
default = []
# tls
tls = ["native-tls"]
# openssl
ssl = ["openssl", "tokio-openssl"]
# rustls
rust-tls = ["rustls", "tokio-rustls", "webpki", "webpki-roots"]
[dependencies]
actix-service = "0.1.1"
actix-rt = { path = "../actix-rt" }
log = "0.4"
num_cpus = "1.0"
# io
mio = "^0.6.13"
net2 = "0.2"
bytes = "0.4"
futures = "0.1"
slab = "0.4"
tokio-io = "0.1"
tokio-tcp = "0.1"
tokio-timer = "0.2"
tokio-reactor = "0.1"
tokio-signal = "0.2"
# native-tls
native-tls = { version="0.2", optional = true }
# openssl
openssl = { version="0.10", optional = true }
tokio-openssl = { version="0.3", optional = true }
#rustls
rustls = { version = "^0.14", optional = true }
tokio-rustls = { version = "^0.8", optional = true }
webpki = { version = "0.18", optional = true }
webpki-roots = { version = "0.15", optional = true }
[dev-dependencies]
env_logger = "0.5"
[profile.release]
lto = true
opt-level = 3
codegen-units = 1

View File

@ -371,7 +371,7 @@ impl Accept {
match self.workers[self.next].send(msg) { match self.workers[self.next].send(msg) {
Ok(_) => (), Ok(_) => (),
Err(tmp) => { Err(tmp) => {
let _ = self.srv.worker_died(self.workers[self.next].idx); self.srv.worker_died(self.workers[self.next].idx);
msg = tmp; msg = tmp;
self.workers.swap_remove(self.next); self.workers.swap_remove(self.next);
if self.workers.is_empty() { if self.workers.is_empty() {
@ -397,7 +397,7 @@ impl Accept {
return; return;
} }
Err(tmp) => { Err(tmp) => {
let _ = self.srv.worker_died(self.workers[self.next].idx); self.srv.worker_died(self.workers[self.next].idx);
msg = tmp; msg = tmp;
self.workers.swap_remove(self.next); self.workers.swap_remove(self.next);
if self.workers.is_empty() { if self.workers.is_empty() {

View File

@ -0,0 +1,78 @@
use std::cell::Cell;
use std::rc::Rc;
use futures::task::AtomicTask;
#[derive(Clone)]
/// Simple counter with ability to notify task on reaching specific number
///
/// Counter could be cloned, total ncount is shared across all clones.
pub struct Counter(Rc<CounterInner>);
struct CounterInner {
count: Cell<usize>,
capacity: usize,
task: AtomicTask,
}
impl Counter {
/// Create `Counter` instance and set max value.
pub fn new(capacity: usize) -> Self {
Counter(Rc::new(CounterInner {
capacity,
count: Cell::new(0),
task: AtomicTask::new(),
}))
}
pub fn get(&self) -> CounterGuard {
CounterGuard::new(self.0.clone())
}
/// Check if counter is not at capacity
pub fn available(&self) -> bool {
self.0.available()
}
/// Get total number of acquired counts
pub fn total(&self) -> usize {
self.0.count.get()
}
}
pub struct CounterGuard(Rc<CounterInner>);
impl CounterGuard {
fn new(inner: Rc<CounterInner>) -> Self {
inner.inc();
CounterGuard(inner)
}
}
impl Drop for CounterGuard {
fn drop(&mut self) {
self.0.dec();
}
}
impl CounterInner {
fn inc(&self) {
let num = self.count.get() + 1;
self.count.set(num);
if num == self.capacity {
self.task.register();
}
}
fn dec(&self) {
let num = self.count.get();
self.count.set(num - 1);
if num == self.capacity {
self.task.notify();
}
}
fn available(&self) -> bool {
self.count.get() < self.capacity
}
}

View File

@ -3,8 +3,10 @@
mod accept; mod accept;
mod builder; mod builder;
mod config; mod config;
mod counter;
mod server; mod server;
mod services; mod services;
pub mod ssl;
mod worker; mod worker;
pub use self::builder::ServerBuilder; pub use self::builder::ServerBuilder;

View File

@ -0,0 +1,35 @@
//! SSL Services
use std::sync::atomic::{AtomicUsize, Ordering};
use crate::counter::Counter;
#[cfg(feature = "ssl")]
mod openssl;
#[cfg(feature = "ssl")]
pub use self::openssl::OpensslAcceptor;
#[cfg(feature = "tls")]
mod nativetls;
#[cfg(feature = "tls")]
pub use self::nativetls::{NativeTlsAcceptor, TlsStream};
#[cfg(feature = "rust-tls")]
mod rustls;
#[cfg(feature = "rust-tls")]
pub use self::rustls::RustlsAcceptor;
/// Sets the maximum per-worker concurrent ssl connection establish process.
///
/// All listeners will stop accepting connections when this limit is
/// reached. It can be used to limit the global SSL CPU usage.
///
/// By default max connections is set to a 256.
pub fn max_concurrent_ssl_connect(num: usize) {
MAX_CONN.store(num, Ordering::Relaxed);
}
pub(crate) static MAX_CONN: AtomicUsize = AtomicUsize::new(256);
thread_local! {
static MAX_CONN_COUNTER: Counter = Counter::new(MAX_CONN.load(Ordering::Relaxed));
}

View File

@ -21,7 +21,7 @@ impl<T: AsyncRead + AsyncWrite> NativeTlsAcceptor<T> {
/// Create `NativeTlsAcceptor` instance /// Create `NativeTlsAcceptor` instance
pub fn new(acceptor: TlsAcceptor) -> Self { pub fn new(acceptor: TlsAcceptor) -> Self {
NativeTlsAcceptor { NativeTlsAcceptor {
acceptor: acceptor.into(), acceptor,
io: PhantomData, io: PhantomData,
} }
} }

View File

@ -0,0 +1,99 @@
use std::marker::PhantomData;
use actix_service::{NewService, Service};
use futures::{future::ok, future::FutureResult, Async, Future, Poll};
use openssl::ssl::{HandshakeError, SslAcceptor};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_openssl::{AcceptAsync, SslAcceptorExt, SslStream};
use super::MAX_CONN_COUNTER;
use crate::counter::{Counter, CounterGuard};
/// Support `SSL` connections via openssl package
///
/// `ssl` feature enables `OpensslAcceptor` type
pub struct OpensslAcceptor<T> {
acceptor: SslAcceptor,
io: PhantomData<T>,
}
impl<T> OpensslAcceptor<T> {
/// Create default `OpensslAcceptor`
pub fn new(acceptor: SslAcceptor) -> Self {
OpensslAcceptor {
acceptor,
io: PhantomData,
}
}
}
impl<T: AsyncRead + AsyncWrite> Clone for OpensslAcceptor<T> {
fn clone(&self) -> Self {
Self {
acceptor: self.acceptor.clone(),
io: PhantomData,
}
}
}
impl<T: AsyncRead + AsyncWrite> NewService<T> for OpensslAcceptor<T> {
type Response = SslStream<T>;
type Error = HandshakeError<T>;
type Service = OpensslAcceptorService<T>;
type InitError = ();
type Future = FutureResult<Self::Service, Self::InitError>;
fn new_service(&self) -> Self::Future {
MAX_CONN_COUNTER.with(|conns| {
ok(OpensslAcceptorService {
acceptor: self.acceptor.clone(),
conns: conns.clone(),
io: PhantomData,
})
})
}
}
pub struct OpensslAcceptorService<T> {
acceptor: SslAcceptor,
io: PhantomData<T>,
conns: Counter,
}
impl<T: AsyncRead + AsyncWrite> Service<T> for OpensslAcceptorService<T> {
type Response = SslStream<T>;
type Error = HandshakeError<T>;
type Future = OpensslAcceptorServiceFut<T>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
if self.conns.available() {
Ok(Async::Ready(()))
} else {
Ok(Async::NotReady)
}
}
fn call(&mut self, req: T) -> Self::Future {
OpensslAcceptorServiceFut {
_guard: self.conns.get(),
fut: SslAcceptorExt::accept_async(&self.acceptor, req),
}
}
}
pub struct OpensslAcceptorServiceFut<T>
where
T: AsyncRead + AsyncWrite,
{
fut: AcceptAsync<T>,
_guard: CounterGuard,
}
impl<T: AsyncRead + AsyncWrite> Future for OpensslAcceptorServiceFut<T> {
type Item = SslStream<T>;
type Error = HandshakeError<T>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.fut.poll()
}
}

View File

@ -9,10 +9,10 @@ use futures::{future, Async, Future, Poll, Stream};
use log::{error, info, trace}; use log::{error, info, trace};
use tokio_timer::{sleep, Delay}; use tokio_timer::{sleep, Delay};
use super::accept::AcceptNotify; use crate::accept::AcceptNotify;
use super::services::{BoxedServerService, InternalServiceFactory, ServerMessage};
use super::Token;
use crate::counter::Counter; use crate::counter::Counter;
use crate::services::{BoxedServerService, InternalServiceFactory, ServerMessage};
use crate::Token;
pub(crate) struct WorkerCommand(Conn); pub(crate) struct WorkerCommand(Conn);
@ -30,7 +30,7 @@ pub(crate) struct Conn {
pub peer: Option<net::SocketAddr>, pub peer: Option<net::SocketAddr>,
} }
const MAX_CONNS: AtomicUsize = AtomicUsize::new(25600); static MAX_CONNS: AtomicUsize = AtomicUsize::new(25600);
/// Sets the maximum per-worker number of concurrent connections. /// Sets the maximum per-worker number of concurrent connections.
/// ///

View File

@ -21,7 +21,6 @@ pub mod framed;
pub mod inflight; pub mod inflight;
pub mod keepalive; pub mod keepalive;
pub mod resolver; pub mod resolver;
pub mod server;
pub mod ssl; pub mod ssl;
pub mod stream; pub mod stream;
pub mod time; pub mod time;

View File

@ -1,35 +1,6 @@
//! SSL Services //! SSL Services
use std::sync::atomic::{AtomicUsize, Ordering};
use super::counter::Counter;
#[cfg(feature = "ssl")] #[cfg(feature = "ssl")]
mod openssl; mod openssl;
#[cfg(feature = "ssl")] #[cfg(feature = "ssl")]
pub use self::openssl::{OpensslAcceptor, OpensslConnector}; pub use self::openssl::OpensslConnector;
#[cfg(feature = "tls")]
mod nativetls;
#[cfg(feature = "tls")]
pub use self::nativetls::{NativeTlsAcceptor, TlsStream};
#[cfg(feature = "rust-tls")]
mod rustls;
#[cfg(feature = "rust-tls")]
pub use self::rustls::RustlsAcceptor;
/// Sets the maximum per-worker concurrent ssl connection establish process.
///
/// All listeners will stop accepting connections when this limit is
/// reached. It can be used to limit the global SSL CPU usage.
///
/// By default max connections is set to a 256.
pub fn max_concurrent_ssl_connect(num: usize) {
MAX_CONN.store(num, Ordering::Relaxed);
}
pub(crate) const MAX_CONN: AtomicUsize = AtomicUsize::new(256);
thread_local! {
static MAX_CONN_COUNTER: Counter = Counter::new(MAX_CONN.load(Ordering::Relaxed));
}

View File

@ -10,95 +10,6 @@ use super::MAX_CONN_COUNTER;
use crate::counter::{Counter, CounterGuard}; use crate::counter::{Counter, CounterGuard};
use crate::resolver::RequestHost; use crate::resolver::RequestHost;
/// Support `SSL` connections via openssl package
///
/// `ssl` feature enables `OpensslAcceptor` type
pub struct OpensslAcceptor<T> {
acceptor: SslAcceptor,
io: PhantomData<T>,
}
impl<T> OpensslAcceptor<T> {
/// Create default `OpensslAcceptor`
pub fn new(acceptor: SslAcceptor) -> Self {
OpensslAcceptor {
acceptor,
io: PhantomData,
}
}
}
impl<T: AsyncRead + AsyncWrite> Clone for OpensslAcceptor<T> {
fn clone(&self) -> Self {
Self {
acceptor: self.acceptor.clone(),
io: PhantomData,
}
}
}
impl<T: AsyncRead + AsyncWrite> NewService<T> for OpensslAcceptor<T> {
type Response = SslStream<T>;
type Error = Error;
type Service = OpensslAcceptorService<T>;
type InitError = ();
type Future = FutureResult<Self::Service, Self::InitError>;
fn new_service(&self) -> Self::Future {
MAX_CONN_COUNTER.with(|conns| {
ok(OpensslAcceptorService {
acceptor: self.acceptor.clone(),
conns: conns.clone(),
io: PhantomData,
})
})
}
}
pub struct OpensslAcceptorService<T> {
acceptor: SslAcceptor,
io: PhantomData<T>,
conns: Counter,
}
impl<T: AsyncRead + AsyncWrite> Service<T> for OpensslAcceptorService<T> {
type Response = SslStream<T>;
type Error = Error;
type Future = OpensslAcceptorServiceFut<T>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
if self.conns.available() {
Ok(Async::Ready(()))
} else {
Ok(Async::NotReady)
}
}
fn call(&mut self, req: T) -> Self::Future {
OpensslAcceptorServiceFut {
_guard: self.conns.get(),
fut: SslAcceptorExt::accept_async(&self.acceptor, req),
}
}
}
pub struct OpensslAcceptorServiceFut<T>
where
T: AsyncRead + AsyncWrite,
{
fut: AcceptAsync<T>,
_guard: CounterGuard,
}
impl<T: AsyncRead + AsyncWrite> Future for OpensslAcceptorServiceFut<T> {
type Item = SslStream<T>;
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.fut.poll()
}
}
/// Openssl connector factory /// Openssl connector factory
pub struct OpensslConnector<R, T, E> { pub struct OpensslConnector<R, T, E> {
connector: SslConnector, connector: SslConnector,