1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-06-26 19:47:43 +02:00

refactor server configuration and tls support

This commit is contained in:
Nikolay Kim
2019-12-02 11:30:27 +06:00
parent 16ff283fb2
commit 9fbe6a1f6d
21 changed files with 366 additions and 526 deletions

View File

@ -1,6 +1,6 @@
[package]
name = "actix-server"
version = "0.8.0-alpha.1"
version = "0.8.0-alpha.2"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix server - General purpose tcp server"
keywords = ["network", "framework", "async", "futures"]
@ -13,23 +13,16 @@ exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"]
edition = "2018"
workspace = ".."
[package.metadata.docs.rs]
features = ["nativetls", "openssl", "rustls", "uds"]
[lib]
name = "actix_server"
path = "src/lib.rs"
[features]
default = []
nativetls = ["native-tls", "tokio-tls"]
openssl = ["open-ssl", "tokio-openssl", "actix-server-config/openssl"]
# rustls = ["rust-tls", "tokio-rustls", "webpki", "webpki-roots", "actix-server-config/rustls"]
[dependencies]
actix-service = "1.0.0-alpha.1"
actix-server-config = "0.3.0-alpha.1"
actix-rt = "1.0.0-alpha.1"
actix-rt = "1.0.0-alpha.2"
actix-codec = "0.2.0-alpha.1"
log = "0.4"
@ -38,28 +31,13 @@ mio = "0.6.19"
net2 = "0.2"
futures = "0.3.1"
slab = "0.4"
tokio-net = { version = "0.2.0-alpha.6", features = ["signal", "tcp", "uds"] }
tokio-net = { version = "0.2.0-alpha.6", features = ["signal", "tcp", "uds"] }
futures-core-preview = "0.3.0-alpha.19"
# unix domain sockets
mio-uds = { version = "0.6.7" }
# nativetls
native-tls = { version = "0.2", optional = true }
tokio-tls = { version = "0.3.0-alpha.6", optional = true }
# openssl
open-ssl = { version = "0.10", package = "openssl", optional = true }
tokio-openssl = { version = "0.4.0-alpha.6", optional = true }
# rustls
rust-tls = { version = "0.16.0", package = "rustls", optional = true }
# tokio-rustls = { version = "0.12.0-alpha.2", optional = true }
# tokio-rustls = { git = "https://github.com/quininer/tokio-rustls.git", branch = "tokio-0.2", optional = true }
webpki = { version = "0.21", optional = true }
webpki-roots = { version = "0.17", optional = true }
[dev-dependencies]
bytes = "0.4"
actix-codec = "0.2.0-alpha.1"

View File

@ -3,6 +3,7 @@ use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use std::{io, mem, net};
use actix_rt::net::TcpStream;
use actix_rt::{spawn, time::delay, Arbiter, System};
use futures::channel::mpsc::{unbounded, UnboundedReceiver};
use futures::channel::oneshot;
@ -12,7 +13,6 @@ use futures::{ready, Future, FutureExt, Stream, StreamExt};
use log::{error, info};
use net2::TcpBuilder;
use num_cpus;
use tokio_net::tcp::TcpStream;
use crate::accept::{AcceptLoop, AcceptNotify, Command};
use crate::config::{ConfiguredService, ServiceConfig};
@ -21,7 +21,7 @@ use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService};
use crate::signals::{Signal, Signals};
use crate::socket::StdListener;
use crate::worker::{self, Worker, WorkerAvailability, WorkerClient};
use crate::{ssl, Token};
use crate::Token;
/// Server builder
pub struct ServerBuilder {
@ -104,17 +104,6 @@ impl ServerBuilder {
self
}
/// Sets the maximum per-worker concurrent connection establish process.
///
/// All listeners will stop accepting connections when this limit is reached. It
/// can be used to limit the global SSL CPU usage.
///
/// By default max connections is set to a 256.
pub fn maxconnrate(self, num: usize) -> Self {
ssl::max_concurrent_ssl_connect(num);
self
}
/// Stop actix system.
pub fn system_exit(mut self) -> Self {
self.exit = true;
@ -191,7 +180,7 @@ impl ServerBuilder {
/// Add new unix domain service to the server.
pub fn bind_uds<F, U, N>(self, name: N, addr: U, factory: F) -> io::Result<Self>
where
F: ServiceFactory<tokio_net::uds::UnixStream>,
F: ServiceFactory<actix_rt::net::UnixStream>,
N: AsRef<str>,
U: AsRef<std::path::Path>,
{
@ -221,7 +210,7 @@ impl ServerBuilder {
factory: F,
) -> io::Result<Self>
where
F: ServiceFactory<tokio_net::uds::UnixStream>,
F: ServiceFactory<actix_rt::net::UnixStream>,
{
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
let token = self.token.next();

View File

@ -1,11 +1,10 @@
use std::collections::HashMap;
use std::{fmt, io, net};
use actix_server_config::{Io, ServerConfig};
use actix_rt::net::TcpStream;
use actix_service as actix;
use futures::future::{Future, FutureExt, LocalBoxFuture};
use log::error;
use tokio_net::tcp::TcpStream;
use super::builder::bind_addr;
use super::service::{
@ -113,8 +112,6 @@ impl InternalServiceFactory for ConfiguredService {
self.rt.configure(&mut rt);
rt.validate();
let names = self.names.clone();
// construct services
async move {
let services = rt.services;
@ -124,9 +121,7 @@ impl InternalServiceFactory for ConfiguredService {
}
let mut res = vec![];
for (token, ns) in services.into_iter() {
let config = ServerConfig::new(names[&token].1);
let newserv = ns.new_service(&config);
let newserv = ns.new_service(&());
match newserv.await {
Ok(serv) => {
res.push((token, serv));
@ -196,7 +191,7 @@ impl ServiceRuntime {
pub fn service<T, F>(&mut self, name: &str, service: F)
where
F: actix::IntoServiceFactory<T>,
T: actix::ServiceFactory<Config = ServerConfig, Request = Io<TcpStream>> + 'static,
T: actix::ServiceFactory<Config = (), Request = TcpStream> + 'static,
T::Future: 'static,
T::Service: 'static,
T::InitError: fmt::Debug,
@ -229,7 +224,7 @@ type BoxedNewService = Box<
Response = (),
Error = (),
InitError = (),
Config = ServerConfig,
Config = (),
Service = BoxedServerService,
Future = LocalBoxFuture<'static, Result<BoxedServerService, ()>>,
>,
@ -241,7 +236,7 @@ struct ServiceFactory<T> {
impl<T> actix::ServiceFactory for ServiceFactory<T>
where
T: actix::ServiceFactory<Config = ServerConfig, Request = Io<TcpStream>>,
T: actix::ServiceFactory<Config = (), Request = TcpStream>,
T::Future: 'static,
T::Service: 'static,
T::Error: 'static,
@ -251,11 +246,11 @@ where
type Response = ();
type Error = ();
type InitError = ();
type Config = ServerConfig;
type Config = ();
type Service = BoxedServerService;
type Future = LocalBoxFuture<'static, Result<BoxedServerService, ()>>;
fn new_service(&self, cfg: &ServerConfig) -> Self::Future {
fn new_service(&self, cfg: &()) -> Self::Future {
let fut = self.inner.new_service(cfg);
async move {
return match fut.await {

View File

@ -8,11 +8,8 @@ mod server;
mod service;
mod signals;
mod socket;
pub mod ssl;
mod worker;
pub use actix_server_config::{Io, IoStream, Protocol, ServerConfig};
pub use self::builder::ServerBuilder;
pub use self::config::{ServiceConfig, ServiceRuntime};
pub use self::server::Server;

View File

@ -4,7 +4,6 @@ use std::task::{Context, Poll};
use std::time::Duration;
use actix_rt::spawn;
use actix_server_config::{Io, ServerConfig};
use actix_service::{self as actix, Service, ServiceFactory as ActixServiceFactory};
use futures::future::{err, ok, LocalBoxFuture, Ready};
use futures::{FutureExt, TryFutureExt};
@ -25,7 +24,7 @@ pub(crate) enum ServerMessage {
}
pub trait ServiceFactory<Stream: FromStream>: Send + Clone + 'static {
type Factory: actix::ServiceFactory<Config = ServerConfig, Request = Io<Stream>>;
type Factory: actix::ServiceFactory<Config = (), Request = Stream>;
fn create(&self) -> Self::Factory;
}
@ -59,7 +58,7 @@ impl<T> StreamService<T> {
impl<T, I> Service for StreamService<T>
where
T: Service<Request = Io<I>>,
T: Service<Request = I>,
T::Future: 'static,
T::Error: 'static,
I: FromStream,
@ -81,7 +80,7 @@ where
});
if let Ok(stream) = stream {
let f = self.service.call(Io::new(stream));
let f = self.service.call(stream);
spawn(
async move {
let _ = f.await;
@ -149,11 +148,9 @@ where
fn create(&self) -> LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>> {
let token = self.token;
let config = ServerConfig::new(self.addr);
self.inner
.create()
.new_service(&config)
.new_service(&())
.map_err(|_| ())
.map_ok(move |inner| {
let service: BoxedServerService = Box::new(StreamService::new(inner));
@ -180,7 +177,7 @@ impl InternalServiceFactory for Box<dyn InternalServiceFactory> {
impl<F, T, I> ServiceFactory<I> for F
where
F: Fn() -> T + Send + Clone + 'static,
T: actix::ServiceFactory<Config = ServerConfig, Request = Io<I>>,
T: actix::ServiceFactory<Config = (), Request = I>,
I: FromStream,
{
type Factory = T;

View File

@ -23,9 +23,9 @@ pub(crate) enum Signal {
pub(crate) struct Signals {
srv: Server,
#[cfg(not(unix))]
stream: tokio_net::signal::CtrlC,
stream: actix_rt::signal::CtrlC,
#[cfg(unix)]
streams: Vec<(Signal, tokio_net::signal::unix::Signal)>,
streams: Vec<(Signal, actix_rt::signal::unix::Signal)>,
}
impl Signals {
@ -33,13 +33,13 @@ impl Signals {
actix_rt::spawn({
#[cfg(not(unix))]
{
let stream = tokio_net::signal::ctrl_c()?;
let stream = actix_rt::signal::ctrl_c()?;
Signals { srv, stream }
}
#[cfg(unix)]
{
use tokio_net::signal::unix;
use actix_rt::signal::unix;
let mut streams = Vec::new();

View File

@ -1,8 +1,9 @@
use std::{fmt, io, net};
use actix_codec::{AsyncRead, AsyncWrite};
use actix_rt::net::TcpStream;
use tokio_net::driver::Handle;
use tokio_net::tcp::TcpStream;
pub(crate) enum StdListener {
Tcp(net::TcpListener),
@ -161,12 +162,12 @@ impl FromStream for TcpStream {
}
#[cfg(all(unix))]
impl FromStream for tokio_net::uds::UnixStream {
impl FromStream for actix_rt::net::UnixStream {
fn from_stdstream(sock: StdStream) -> io::Result<Self> {
match sock {
StdStream::Tcp(_) => panic!("Should not happen, bug in server impl"),
StdStream::Uds(stream) => {
tokio_net::uds::UnixStream::from_std(stream, &Handle::default())
actix_rt::net::UnixStream::from_std(stream, &Handle::default())
}
}
}

View File

@ -1,138 +0,0 @@
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use actix_codec::{AsyncRead, AsyncWrite};
use actix_service::{Service, ServiceFactory};
use futures::future::{ok, FutureExt, LocalBoxFuture, Ready};
use open_ssl::ssl::SslAcceptor;
use tokio_openssl::{HandshakeError, SslStream};
use crate::counter::{Counter, CounterGuard};
use crate::ssl::MAX_CONN_COUNTER;
use crate::{Io, Protocol, ServerConfig};
/// Support `SSL` connections via openssl package
///
/// `ssl` feature enables `OpensslAcceptor` type
pub struct OpensslAcceptor<T: AsyncRead + AsyncWrite, P = ()> {
acceptor: SslAcceptor,
io: PhantomData<(T, P)>,
}
impl<T: AsyncRead + AsyncWrite, P> OpensslAcceptor<T, P> {
/// Create default `OpensslAcceptor`
pub fn new(acceptor: SslAcceptor) -> Self {
OpensslAcceptor {
acceptor,
io: PhantomData,
}
}
}
impl<T: AsyncRead + AsyncWrite, P> Clone for OpensslAcceptor<T, P> {
fn clone(&self) -> Self {
Self {
acceptor: self.acceptor.clone(),
io: PhantomData,
}
}
}
impl<T: AsyncRead + AsyncWrite + Unpin + 'static, P> ServiceFactory for OpensslAcceptor<T, P> {
type Request = Io<T, P>;
type Response = Io<SslStream<T>, P>;
type Error = HandshakeError<T>;
type Config = ServerConfig;
type Service = OpensslAcceptorService<T, P>;
type InitError = ();
type Future = Ready<Result<Self::Service, Self::InitError>>;
fn new_service(&self, cfg: &ServerConfig) -> Self::Future {
cfg.set_secure();
MAX_CONN_COUNTER.with(|conns| {
ok(OpensslAcceptorService {
acceptor: self.acceptor.clone(),
conns: conns.clone(),
io: PhantomData,
})
})
}
}
pub struct OpensslAcceptorService<T, P> {
acceptor: SslAcceptor,
conns: Counter,
io: PhantomData<(T, P)>,
}
impl<T: AsyncRead + AsyncWrite + Unpin + 'static, P> Service for OpensslAcceptorService<T, P> {
type Request = Io<T, P>;
type Response = Io<SslStream<T>, P>;
type Error = HandshakeError<T>;
type Future = OpensslAcceptorServiceFut<T, P>;
fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.conns.available(ctx) {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
}
fn call(&mut self, req: Self::Request) -> Self::Future {
let (io, params, _) = req.into_parts();
let acc = self.acceptor.clone();
OpensslAcceptorServiceFut {
_guard: self.conns.get(),
fut: async move {
let acc = acc;
tokio_openssl::accept(&acc, io).await
}
.boxed_local(),
params: Some(params),
}
}
}
pub struct OpensslAcceptorServiceFut<T, P>
where
T: AsyncRead + AsyncWrite,
{
fut: LocalBoxFuture<'static, Result<SslStream<T>, HandshakeError<T>>>,
params: Option<P>,
_guard: CounterGuard,
}
impl<T: AsyncRead + AsyncWrite + Unpin, P> Unpin for OpensslAcceptorServiceFut<T, P> {}
impl<T: AsyncRead + AsyncWrite + Unpin, P> Future for OpensslAcceptorServiceFut<T, P> {
type Output = Result<Io<SslStream<T>, P>, HandshakeError<T>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
let io = futures::ready!(Pin::new(&mut this.fut).poll(cx))?;
let proto = if let Some(protos) = io.ssl().selected_alpn_protocol() {
const H2: &[u8] = b"\x02h2";
const HTTP10: &[u8] = b"\x08http/1.0";
const HTTP11: &[u8] = b"\x08http/1.1";
if protos.windows(3).any(|window| window == H2) {
Protocol::Http2
} else if protos.windows(9).any(|window| window == HTTP11) {
Protocol::Http11
} else if protos.windows(9).any(|window| window == HTTP10) {
Protocol::Http10
} else {
Protocol::Unknown
}
} else {
Protocol::Unknown
};
Poll::Ready(Ok(Io::from_parts(io, this.params.take().unwrap(), proto)))
}
}