mirror of
https://github.com/fafhrd91/actix-net
synced 2025-08-12 16:47:05 +02:00
Compare commits
15 Commits
server-con
...
lets-encry
Author | SHA1 | Date | |
---|---|---|---|
|
bfa98627b4 | ||
|
2a26c87c36 | ||
|
e976758d92 | ||
|
e1ee3a1c32 | ||
|
3821d511d0 | ||
|
62e429cb0c | ||
|
a2643d475a | ||
|
34c259a8b5 | ||
|
8b398c3386 | ||
|
0baceb0e56 | ||
|
6be1f37f6c | ||
|
a742768feb | ||
|
f913872159 | ||
|
41145040e1 | ||
|
311bb14d97 |
@@ -2,7 +2,7 @@
|
||||
name = "actix-net"
|
||||
version = "0.3.0"
|
||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||
description = "Actix net - framework for the compisible network services for Rust"
|
||||
description = "Actix net - framework for the composable network services for Rust"
|
||||
readme = "README.md"
|
||||
keywords = ["network", "framework", "async", "futures"]
|
||||
homepage = "https://actix.rs"
|
||||
@@ -17,6 +17,7 @@ edition = "2018"
|
||||
members = [
|
||||
"actix-codec",
|
||||
"actix-connect",
|
||||
"actix-lets-encrypt",
|
||||
"actix-rt",
|
||||
"actix-service",
|
||||
"actix-server",
|
||||
|
@@ -1,5 +1,13 @@
|
||||
# Changes
|
||||
|
||||
## [0.2.3] - 2019-08-05
|
||||
|
||||
* Add `ConnectService` and `OpensslConnectService`
|
||||
|
||||
## [0.2.2] - 2019-07-24
|
||||
|
||||
* Add `rustls` support
|
||||
|
||||
## [0.2.1] - 2019-07-17
|
||||
|
||||
### Added
|
||||
|
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "actix-connect"
|
||||
version = "0.2.1"
|
||||
version = "0.2.3"
|
||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||
description = "Actix Connector - tcp connector service"
|
||||
keywords = ["network", "framework", "async", "futures"]
|
||||
@@ -26,6 +26,9 @@ default = ["uri"]
|
||||
# openssl
|
||||
ssl = ["openssl", "tokio-openssl"]
|
||||
|
||||
#rustls
|
||||
rust-tls = ["rustls", "tokio-rustls", "webpki"]
|
||||
|
||||
# support http::Uri as connect address
|
||||
uri = ["http"]
|
||||
|
||||
@@ -46,6 +49,11 @@ trust-dns-resolver = { version="0.11.0", default-features = false }
|
||||
openssl = { version="0.10", optional = true }
|
||||
tokio-openssl = { version="0.3", optional = true }
|
||||
|
||||
#rustls
|
||||
rustls = { version = "0.15.2", optional = true }
|
||||
tokio-rustls = { version = "0.9.1", optional = true }
|
||||
webpki = { version = "0.19", optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
bytes = "0.4"
|
||||
actix-test-server = { version="0.2.2", features=["ssl"] }
|
||||
|
@@ -18,6 +18,17 @@ impl<T> TcpConnectorFactory<T> {
|
||||
pub fn new() -> Self {
|
||||
TcpConnectorFactory(PhantomData)
|
||||
}
|
||||
|
||||
/// Create tcp connector service
|
||||
pub fn service(&self) -> TcpConnector<T> {
|
||||
TcpConnector(PhantomData)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Default for TcpConnectorFactory<T> {
|
||||
fn default() -> Self {
|
||||
TcpConnectorFactory(PhantomData)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Clone for TcpConnectorFactory<T> {
|
||||
@@ -36,7 +47,7 @@ impl<T: Address> NewService for TcpConnectorFactory<T> {
|
||||
type Future = FutureResult<Self::Service, Self::InitError>;
|
||||
|
||||
fn new_service(&self, _: &()) -> Self::Future {
|
||||
ok(TcpConnector(PhantomData))
|
||||
ok(self.service())
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -16,6 +16,7 @@ mod connect;
|
||||
mod connector;
|
||||
mod error;
|
||||
mod resolver;
|
||||
mod service;
|
||||
pub mod ssl;
|
||||
|
||||
#[cfg(feature = "uri")]
|
||||
@@ -29,6 +30,7 @@ pub use self::connect::{Address, Connect, Connection};
|
||||
pub use self::connector::{TcpConnector, TcpConnectorFactory};
|
||||
pub use self::error::ConnectError;
|
||||
pub use self::resolver::{Resolver, ResolverFactory};
|
||||
pub use self::service::{ConnectService, ConnectServiceFactory};
|
||||
|
||||
use actix_service::{NewService, Service, ServiceExt};
|
||||
use tokio_tcp::TcpStream;
|
||||
|
@@ -25,6 +25,13 @@ impl<T> ResolverFactory<T> {
|
||||
_t: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn service(&self) -> Resolver<T> {
|
||||
Resolver {
|
||||
resolver: self.resolver.clone(),
|
||||
_t: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Default for ResolverFactory<T> {
|
||||
@@ -55,10 +62,7 @@ impl<T: Address> NewService for ResolverFactory<T> {
|
||||
type Future = FutureResult<Self::Service, Self::InitError>;
|
||||
|
||||
fn new_service(&self, _: &()) -> Self::Future {
|
||||
ok(Resolver {
|
||||
resolver: self.resolver.clone(),
|
||||
_t: PhantomData,
|
||||
})
|
||||
ok(self.service())
|
||||
}
|
||||
}
|
||||
|
||||
|
123
actix-connect/src/service.rs
Normal file
123
actix-connect/src/service.rs
Normal file
@@ -0,0 +1,123 @@
|
||||
use actix_service::{NewService, Service};
|
||||
use futures::future::{ok, FutureResult};
|
||||
use futures::{try_ready, Async, Future, Poll};
|
||||
use tokio_tcp::TcpStream;
|
||||
use trust_dns_resolver::AsyncResolver;
|
||||
|
||||
use crate::connect::{Address, Connect, Connection};
|
||||
use crate::connector::{TcpConnector, TcpConnectorFactory};
|
||||
use crate::error::ConnectError;
|
||||
use crate::resolver::{Resolver, ResolverFactory};
|
||||
|
||||
pub struct ConnectServiceFactory<T> {
|
||||
tcp: TcpConnectorFactory<T>,
|
||||
resolver: ResolverFactory<T>,
|
||||
}
|
||||
|
||||
impl<T> ConnectServiceFactory<T> {
|
||||
/// Construct new ConnectService factory
|
||||
pub fn new() -> Self {
|
||||
ConnectServiceFactory {
|
||||
tcp: TcpConnectorFactory::default(),
|
||||
resolver: ResolverFactory::default(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Construct new connect service with custom dns resolver
|
||||
pub fn with_resolver(resolver: AsyncResolver) -> Self {
|
||||
ConnectServiceFactory {
|
||||
tcp: TcpConnectorFactory::default(),
|
||||
resolver: ResolverFactory::new(resolver),
|
||||
}
|
||||
}
|
||||
|
||||
/// Construct new service
|
||||
pub fn service(&self) -> ConnectService<T> {
|
||||
ConnectService {
|
||||
tcp: self.tcp.service(),
|
||||
resolver: self.resolver.service(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Default for ConnectServiceFactory<T> {
|
||||
fn default() -> Self {
|
||||
ConnectServiceFactory {
|
||||
tcp: TcpConnectorFactory::default(),
|
||||
resolver: ResolverFactory::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Clone for ConnectServiceFactory<T> {
|
||||
fn clone(&self) -> Self {
|
||||
ConnectServiceFactory {
|
||||
tcp: self.tcp.clone(),
|
||||
resolver: self.resolver.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Address> NewService for ConnectServiceFactory<T> {
|
||||
type Request = Connect<T>;
|
||||
type Response = Connection<T, TcpStream>;
|
||||
type Error = ConnectError;
|
||||
type Config = ();
|
||||
type Service = ConnectService<T>;
|
||||
type InitError = ();
|
||||
type Future = FutureResult<Self::Service, Self::InitError>;
|
||||
|
||||
fn new_service(&self, _: &()) -> Self::Future {
|
||||
ok(self.service())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ConnectService<T> {
|
||||
tcp: TcpConnector<T>,
|
||||
resolver: Resolver<T>,
|
||||
}
|
||||
|
||||
impl<T: Address> Service for ConnectService<T> {
|
||||
type Request = Connect<T>;
|
||||
type Response = Connection<T, TcpStream>;
|
||||
type Error = ConnectError;
|
||||
type Future = ConnectServiceResponse<T>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
Ok(Async::Ready(()))
|
||||
}
|
||||
|
||||
fn call(&mut self, req: Connect<T>) -> Self::Future {
|
||||
ConnectServiceResponse {
|
||||
fut1: Some(self.resolver.call(req)),
|
||||
fut2: None,
|
||||
tcp: self.tcp.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ConnectServiceResponse<T: Address> {
|
||||
fut1: Option<<Resolver<T> as Service>::Future>,
|
||||
fut2: Option<<TcpConnector<T> as Service>::Future>,
|
||||
tcp: TcpConnector<T>,
|
||||
}
|
||||
|
||||
impl<T: Address> Future for ConnectServiceResponse<T> {
|
||||
type Item = Connection<T, TcpStream>;
|
||||
type Error = ConnectError;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
if let Some(ref mut fut) = self.fut1 {
|
||||
let res = try_ready!(fut.poll());
|
||||
let _ = self.fut1.take();
|
||||
self.fut2 = Some(self.tcp.call(res));
|
||||
}
|
||||
|
||||
if let Some(ref mut fut) = self.fut2 {
|
||||
return fut.poll();
|
||||
}
|
||||
|
||||
Ok(Async::NotReady)
|
||||
}
|
||||
}
|
@@ -3,4 +3,10 @@
|
||||
#[cfg(feature = "ssl")]
|
||||
mod openssl;
|
||||
#[cfg(feature = "ssl")]
|
||||
pub use self::openssl::OpensslConnector;
|
||||
pub use self::openssl::{
|
||||
OpensslConnectService, OpensslConnectServiceFactory, OpensslConnector,
|
||||
};
|
||||
#[cfg(feature = "rust-tls")]
|
||||
mod rustls;
|
||||
#[cfg(feature = "rust-tls")]
|
||||
pub use self::rustls::RustlsConnector;
|
||||
|
@@ -1,13 +1,17 @@
|
||||
use std::fmt;
|
||||
use std::marker::PhantomData;
|
||||
use std::{fmt, io};
|
||||
|
||||
use actix_codec::{AsyncRead, AsyncWrite};
|
||||
use actix_service::{NewService, Service};
|
||||
use futures::{future::ok, future::FutureResult, Async, Future, Poll};
|
||||
use futures::{future::ok, future::FutureResult, try_ready, Async, Future, Poll};
|
||||
use openssl::ssl::{HandshakeError, SslConnector};
|
||||
use tokio_openssl::{ConnectAsync, SslConnectorExt, SslStream};
|
||||
use tokio_tcp::TcpStream;
|
||||
use trust_dns_resolver::AsyncResolver;
|
||||
|
||||
use crate::{Address, Connection};
|
||||
use crate::{
|
||||
Address, Connect, ConnectError, ConnectService, ConnectServiceFactory, Connection,
|
||||
};
|
||||
|
||||
/// Openssl connector factory
|
||||
pub struct OpensslConnector<T, U> {
|
||||
@@ -77,6 +81,15 @@ pub struct OpensslConnectorService<T, U> {
|
||||
_t: PhantomData<(T, U)>,
|
||||
}
|
||||
|
||||
impl<T, U> Clone for OpensslConnectorService<T, U> {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
connector: self.connector.clone(),
|
||||
_t: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Address, U> Service for OpensslConnectorService<T, U>
|
||||
where
|
||||
U: AsyncRead + AsyncWrite + fmt::Debug,
|
||||
@@ -126,3 +139,113 @@ where
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct OpensslConnectServiceFactory<T> {
|
||||
tcp: ConnectServiceFactory<T>,
|
||||
openssl: OpensslConnector<T, TcpStream>,
|
||||
}
|
||||
|
||||
impl<T> OpensslConnectServiceFactory<T> {
|
||||
/// Construct new OpensslConnectService factory
|
||||
pub fn new(connector: SslConnector) -> Self {
|
||||
OpensslConnectServiceFactory {
|
||||
tcp: ConnectServiceFactory::default(),
|
||||
openssl: OpensslConnector::new(connector),
|
||||
}
|
||||
}
|
||||
|
||||
/// Construct new connect service with custom dns resolver
|
||||
pub fn with_resolver(connector: SslConnector, resolver: AsyncResolver) -> Self {
|
||||
OpensslConnectServiceFactory {
|
||||
tcp: ConnectServiceFactory::with_resolver(resolver),
|
||||
openssl: OpensslConnector::new(connector),
|
||||
}
|
||||
}
|
||||
|
||||
/// Construct openssl connect service
|
||||
pub fn service(&self) -> OpensslConnectService<T> {
|
||||
OpensslConnectService {
|
||||
tcp: self.tcp.service(),
|
||||
openssl: OpensslConnectorService {
|
||||
connector: self.openssl.connector.clone(),
|
||||
_t: PhantomData,
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Clone for OpensslConnectServiceFactory<T> {
|
||||
fn clone(&self) -> Self {
|
||||
OpensslConnectServiceFactory {
|
||||
tcp: self.tcp.clone(),
|
||||
openssl: self.openssl.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Address> NewService for OpensslConnectServiceFactory<T> {
|
||||
type Request = Connect<T>;
|
||||
type Response = SslStream<TcpStream>;
|
||||
type Error = ConnectError;
|
||||
type Config = ();
|
||||
type Service = OpensslConnectService<T>;
|
||||
type InitError = ();
|
||||
type Future = FutureResult<Self::Service, Self::InitError>;
|
||||
|
||||
fn new_service(&self, _: &()) -> Self::Future {
|
||||
ok(self.service())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct OpensslConnectService<T> {
|
||||
tcp: ConnectService<T>,
|
||||
openssl: OpensslConnectorService<T, TcpStream>,
|
||||
}
|
||||
|
||||
impl<T: Address> Service for OpensslConnectService<T> {
|
||||
type Request = Connect<T>;
|
||||
type Response = SslStream<TcpStream>;
|
||||
type Error = ConnectError;
|
||||
type Future = OpensslConnectServiceResponse<T>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
Ok(Async::Ready(()))
|
||||
}
|
||||
|
||||
fn call(&mut self, req: Connect<T>) -> Self::Future {
|
||||
OpensslConnectServiceResponse {
|
||||
fut1: Some(self.tcp.call(req)),
|
||||
fut2: None,
|
||||
openssl: self.openssl.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct OpensslConnectServiceResponse<T: Address> {
|
||||
fut1: Option<<ConnectService<T> as Service>::Future>,
|
||||
fut2: Option<<OpensslConnectorService<T, TcpStream> as Service>::Future>,
|
||||
openssl: OpensslConnectorService<T, TcpStream>,
|
||||
}
|
||||
|
||||
impl<T: Address> Future for OpensslConnectServiceResponse<T> {
|
||||
type Item = SslStream<TcpStream>;
|
||||
type Error = ConnectError;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
if let Some(ref mut fut) = self.fut1 {
|
||||
let res = try_ready!(fut.poll());
|
||||
let _ = self.fut1.take();
|
||||
self.fut2 = Some(self.openssl.call(res));
|
||||
}
|
||||
|
||||
if let Some(ref mut fut) = self.fut2 {
|
||||
let connect = try_ready!(fut
|
||||
.poll()
|
||||
.map_err(|e| ConnectError::Io(io::Error::new(io::ErrorKind::Other, e))));
|
||||
Ok(Async::Ready(connect.into_parts().0))
|
||||
} else {
|
||||
Ok(Async::NotReady)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
133
actix-connect/src/ssl/rustls.rs
Normal file
133
actix-connect/src/ssl/rustls.rs
Normal file
@@ -0,0 +1,133 @@
|
||||
use std::fmt;
|
||||
use std::marker::PhantomData;
|
||||
|
||||
use actix_codec::{AsyncRead, AsyncWrite};
|
||||
use actix_service::{NewService, Service};
|
||||
use futures::{future::ok, future::FutureResult, Async, Future, Poll};
|
||||
use std::sync::Arc;
|
||||
use tokio_rustls::{
|
||||
rustls::{ClientConfig, ClientSession},
|
||||
Connect, TlsConnector, TlsStream,
|
||||
};
|
||||
use webpki::DNSNameRef;
|
||||
|
||||
use crate::{Address, Connection};
|
||||
|
||||
/// Rustls connector factory
|
||||
pub struct RustlsConnector<T, U> {
|
||||
connector: Arc<ClientConfig>,
|
||||
_t: PhantomData<(T, U)>,
|
||||
}
|
||||
|
||||
impl<T, U> RustlsConnector<T, U> {
|
||||
pub fn new(connector: Arc<ClientConfig>) -> Self {
|
||||
RustlsConnector {
|
||||
connector,
|
||||
_t: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, U> RustlsConnector<T, U>
|
||||
where
|
||||
T: Address,
|
||||
U: AsyncRead + AsyncWrite + fmt::Debug,
|
||||
{
|
||||
pub fn service(
|
||||
connector: Arc<ClientConfig>,
|
||||
) -> impl Service<
|
||||
Request = Connection<T, U>,
|
||||
Response = Connection<T, TlsStream<U, ClientSession>>,
|
||||
Error = std::io::Error,
|
||||
> {
|
||||
RustlsConnectorService {
|
||||
connector: connector,
|
||||
_t: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, U> Clone for RustlsConnector<T, U> {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
connector: self.connector.clone(),
|
||||
_t: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Address, U> NewService for RustlsConnector<T, U>
|
||||
where
|
||||
U: AsyncRead + AsyncWrite + fmt::Debug,
|
||||
{
|
||||
type Request = Connection<T, U>;
|
||||
type Response = Connection<T, TlsStream<U, ClientSession>>;
|
||||
type Error = std::io::Error;
|
||||
type Config = ();
|
||||
type Service = RustlsConnectorService<T, U>;
|
||||
type InitError = ();
|
||||
type Future = FutureResult<Self::Service, Self::InitError>;
|
||||
|
||||
fn new_service(&self, _: &()) -> Self::Future {
|
||||
ok(RustlsConnectorService {
|
||||
connector: self.connector.clone(),
|
||||
_t: PhantomData,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub struct RustlsConnectorService<T, U> {
|
||||
connector: Arc<ClientConfig>,
|
||||
_t: PhantomData<(T, U)>,
|
||||
}
|
||||
|
||||
impl<T: Address, U> Service for RustlsConnectorService<T, U>
|
||||
where
|
||||
U: AsyncRead + AsyncWrite + fmt::Debug,
|
||||
{
|
||||
type Request = Connection<T, U>;
|
||||
type Response = Connection<T, TlsStream<U, ClientSession>>;
|
||||
type Error = std::io::Error;
|
||||
type Future = ConnectAsyncExt<T, U>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
Ok(Async::Ready(()))
|
||||
}
|
||||
|
||||
fn call(&mut self, stream: Connection<T, U>) -> Self::Future {
|
||||
trace!("SSL Handshake start for: {:?}", stream.host());
|
||||
let (io, stream) = stream.replace(());
|
||||
let host = DNSNameRef::try_from_ascii_str(stream.host()).unwrap();
|
||||
ConnectAsyncExt {
|
||||
fut: TlsConnector::from(self.connector.clone()).connect(host, io),
|
||||
stream: Some(stream),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ConnectAsyncExt<T, U> {
|
||||
fut: Connect<U>,
|
||||
stream: Option<Connection<T, ()>>,
|
||||
}
|
||||
|
||||
impl<T: Address, U> Future for ConnectAsyncExt<T, U>
|
||||
where
|
||||
U: AsyncRead + AsyncWrite + fmt::Debug,
|
||||
{
|
||||
type Item = Connection<T, TlsStream<U, ClientSession>>;
|
||||
type Error = std::io::Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
match self.fut.poll().map_err(|e| {
|
||||
trace!("SSL Handshake error: {:?}", e);
|
||||
e
|
||||
})? {
|
||||
Async::Ready(stream) => {
|
||||
let s = self.stream.take().unwrap();
|
||||
trace!("SSL Handshake success: {:?}", s.host());
|
||||
Ok(Async::Ready(s.replace(stream).1))
|
||||
}
|
||||
Async::NotReady => Ok(Async::NotReady),
|
||||
}
|
||||
}
|
||||
}
|
@@ -26,6 +26,22 @@ fn test_string() {
|
||||
assert_eq!(con.peer_addr().unwrap(), srv.addr());
|
||||
}
|
||||
|
||||
#[cfg(feature = "rust-tls")]
|
||||
#[test]
|
||||
fn test_rustls_string() {
|
||||
let mut srv = TestServer::with(|| {
|
||||
service_fn(|io: Io<tokio_tcp::TcpStream>| {
|
||||
Framed::new(io.into_parts().0, BytesCodec)
|
||||
.send(Bytes::from_static(b"test"))
|
||||
.then(|_| Ok::<_, ()>(()))
|
||||
})
|
||||
});
|
||||
|
||||
let mut conn = default_connector();
|
||||
let addr = format!("localhost:{}", srv.port());
|
||||
let con = srv.run_on(move || conn.call(addr.into())).unwrap();
|
||||
assert_eq!(con.peer_addr().unwrap(), srv.addr());
|
||||
}
|
||||
#[test]
|
||||
fn test_static_str() {
|
||||
let mut srv = TestServer::with(|| {
|
||||
@@ -107,3 +123,20 @@ fn test_uri() {
|
||||
let con = srv.run_on(move || conn.call(addr.into())).unwrap();
|
||||
assert_eq!(con.peer_addr().unwrap(), srv.addr());
|
||||
}
|
||||
|
||||
#[cfg(feature = "rust-tls")]
|
||||
#[test]
|
||||
fn test_rustls_uri() {
|
||||
let mut srv = TestServer::with(|| {
|
||||
service_fn(|io: Io<tokio_tcp::TcpStream>| {
|
||||
Framed::new(io.into_parts().0, BytesCodec)
|
||||
.send(Bytes::from_static(b"test"))
|
||||
.then(|_| Ok::<_, ()>(()))
|
||||
})
|
||||
});
|
||||
|
||||
let mut conn = default_connector();
|
||||
let addr = Uri::try_from(format!("https://localhost:{}", srv.port())).unwrap();
|
||||
let con = srv.run_on(move || conn.call(addr.into())).unwrap();
|
||||
assert_eq!(con.peer_addr().unwrap(), srv.addr());
|
||||
}
|
||||
|
21
actix-lets-encrypt/Cargo.toml
Normal file
21
actix-lets-encrypt/Cargo.toml
Normal file
@@ -0,0 +1,21 @@
|
||||
[package]
|
||||
name = "actix-lets-encrypt"
|
||||
version = "0.1.0"
|
||||
authors = ["Jordan Deitch <jd@rsa.pub>"]
|
||||
description = "Actix Let's Encrypt"
|
||||
keywords = ["network", "framework", "async", "futures"]
|
||||
homepage = "https://actix.rs"
|
||||
repository = "https://github.com/actix/actix-net.git"
|
||||
documentation = "https://docs.rs/actix-lets-encrypt/"
|
||||
categories = ["network-programming", "asynchronous"]
|
||||
license = "MIT/Apache-2.0"
|
||||
exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"]
|
||||
edition = "2018"
|
||||
workspace = ".."
|
||||
|
||||
[lib]
|
||||
name = "actix_lets_encrypt"
|
||||
path = "src/lib.rs"
|
||||
|
||||
[dependencies]
|
||||
acme-client = {version = "0.5", default-features = false}
|
0
actix-lets-encrypt/src/authorization.rs
Normal file
0
actix-lets-encrypt/src/authorization.rs
Normal file
64
actix-lets-encrypt/src/certificate_signer.rs
Normal file
64
actix-lets-encrypt/src/certificate_signer.rs
Normal file
@@ -0,0 +1,64 @@
|
||||
use acme_client::Directory;
|
||||
|
||||
struct CertificateError {
|
||||
message: String,
|
||||
}
|
||||
|
||||
impl std::error::Error for CertificateError {
|
||||
fn description(&self) -> &str { self.message.as_str() }
|
||||
fn cause(&self) -> Option<&dyn std::error::Error> { None }
|
||||
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { None }
|
||||
}
|
||||
|
||||
impl std::fmt::Display for CertificateError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
write!(f, "An Error Occurred, Please Try Again!")
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for CertificateError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
write!(f, "{{ file: {}, line: {} }}", file!(), line!())
|
||||
}
|
||||
}
|
||||
|
||||
impl CertificateError {
|
||||
fn new(message: String) -> Self {
|
||||
CertificateError { message }
|
||||
}
|
||||
}
|
||||
|
||||
impl std::convert::From<acme_client::error::Error> for CertificateError {
|
||||
fn from(e: acme_client::error::Error) -> Self {
|
||||
return CertificateError::new(e.to_string());
|
||||
}
|
||||
}
|
||||
|
||||
struct CertificateRequest<'a> {
|
||||
domain: &'a str,
|
||||
email: &'a str,
|
||||
}
|
||||
|
||||
impl<'a> CertificateRequest<'a> {
|
||||
fn new(email: &'a str, domain: &'a str) -> Self {
|
||||
return CertificateRequest { domain, email };
|
||||
}
|
||||
|
||||
fn sign(self: &Self) -> Result<(), CertificateError> {
|
||||
let directory = Directory::lets_encrypt()?;
|
||||
let account = directory.account_registration()
|
||||
.email(self.email)
|
||||
.register()?;
|
||||
let authorization = account.authorization(self.domain)?;
|
||||
|
||||
let http_challenge = authorization.get_http_challenge().ok_or("HTTP challenge failed")?;
|
||||
http_challenge.save_key_authorization("/var/www")?;
|
||||
http_challenge.validate()?;
|
||||
|
||||
let cert = account.certificate_signer(&[self.domain]).sign_certificate()?;
|
||||
cert.save_signed_certificate("certificate.pem")?;
|
||||
cert.save_private_key("certificate.key")?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
2
actix-lets-encrypt/src/lib.rs
Normal file
2
actix-lets-encrypt/src/lib.rs
Normal file
@@ -0,0 +1,2 @@
|
||||
mod certificate_signer;
|
||||
mod authorization;
|
@@ -1,5 +1,12 @@
|
||||
# Changes
|
||||
|
||||
## [0.6.0] - 2019-07-18
|
||||
|
||||
### Added
|
||||
|
||||
* Support Unix domain sockets #3
|
||||
|
||||
|
||||
## [0.5.1] - 2019-05-18
|
||||
|
||||
### Changed
|
||||
|
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "actix-server"
|
||||
version = "0.5.1"
|
||||
version = "0.6.0"
|
||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||
description = "Actix server - General purpose tcp server"
|
||||
keywords = ["network", "framework", "async", "futures"]
|
||||
@@ -14,7 +14,7 @@ edition = "2018"
|
||||
workspace = ".."
|
||||
|
||||
[package.metadata.docs.rs]
|
||||
features = ["ssl", "tls", "rust-tls"]
|
||||
features = ["ssl", "tls", "rust-tls", "uds"]
|
||||
|
||||
[lib]
|
||||
name = "actix_server"
|
||||
@@ -32,15 +32,18 @@ ssl = ["openssl", "tokio-openssl", "actix-server-config/ssl"]
|
||||
# rustls
|
||||
rust-tls = ["rustls", "tokio-rustls", "webpki", "webpki-roots", "actix-server-config/rust-tls"]
|
||||
|
||||
# uds
|
||||
uds = ["mio-uds", "tokio-uds", "actix-server-config/uds"]
|
||||
|
||||
[dependencies]
|
||||
actix-rt = "0.2.1"
|
||||
actix-service = "0.4.0"
|
||||
actix-server-config = "0.1.1"
|
||||
actix-rt = "0.2.2"
|
||||
actix-service = "0.4.1"
|
||||
actix-server-config = "0.1.2"
|
||||
|
||||
log = "0.4"
|
||||
num_cpus = "1.0"
|
||||
|
||||
mio = "0.6.13"
|
||||
mio = "0.6.19"
|
||||
net2 = "0.2"
|
||||
futures = "0.1"
|
||||
slab = "0.4"
|
||||
@@ -50,6 +53,10 @@ tokio-timer = "0.2.8"
|
||||
tokio-reactor = "0.1"
|
||||
tokio-signal = "0.2"
|
||||
|
||||
# unix domain sockets
|
||||
mio-uds = { version="0.6.7", optional = true }
|
||||
tokio-uds = { version="0.2.5", optional = true }
|
||||
|
||||
# native-tls
|
||||
native-tls = { version="0.2", optional = true }
|
||||
|
||||
@@ -57,7 +64,7 @@ native-tls = { version="0.2", optional = true }
|
||||
openssl = { version="0.10", optional = true }
|
||||
tokio-openssl = { version="0.3", optional = true }
|
||||
|
||||
#rustls
|
||||
# rustls
|
||||
rustls = { version = "0.15.2", optional = true }
|
||||
tokio-rustls = { version = "0.9.1", optional = true }
|
||||
webpki = { version = "0.19", optional = true }
|
||||
|
@@ -1,17 +1,17 @@
|
||||
use std::sync::mpsc as sync_mpsc;
|
||||
use std::time::{Duration, Instant};
|
||||
use std::{io, net, thread};
|
||||
use std::{io, thread};
|
||||
|
||||
use actix_rt::System;
|
||||
use futures::future::{lazy, Future};
|
||||
use log::{error, info};
|
||||
use mio;
|
||||
use slab::Slab;
|
||||
use tokio_timer::Delay;
|
||||
|
||||
use super::server::Server;
|
||||
use super::worker::{Conn, WorkerClient};
|
||||
use super::Token;
|
||||
use crate::server::Server;
|
||||
use crate::socket::{SocketAddr, SocketListener, StdListener};
|
||||
use crate::worker::{Conn, WorkerClient};
|
||||
use crate::Token;
|
||||
|
||||
pub(crate) enum Command {
|
||||
Pause,
|
||||
@@ -21,9 +21,9 @@ pub(crate) enum Command {
|
||||
}
|
||||
|
||||
struct ServerSocketInfo {
|
||||
addr: net::SocketAddr,
|
||||
addr: SocketAddr,
|
||||
token: Token,
|
||||
sock: mio::net::TcpListener,
|
||||
sock: SocketListener,
|
||||
timeout: Option<Instant>,
|
||||
}
|
||||
|
||||
@@ -84,7 +84,7 @@ impl AcceptLoop {
|
||||
|
||||
pub(crate) fn start(
|
||||
&mut self,
|
||||
socks: Vec<(Token, net::TcpListener)>,
|
||||
socks: Vec<(Token, StdListener)>,
|
||||
workers: Vec<WorkerClient>,
|
||||
) {
|
||||
let srv = self.srv.take().expect("Can not re-use AcceptInfo");
|
||||
@@ -135,7 +135,7 @@ impl Accept {
|
||||
rx: sync_mpsc::Receiver<Command>,
|
||||
cmd_reg: mio::Registration,
|
||||
notify_reg: mio::Registration,
|
||||
socks: Vec<(Token, net::TcpListener)>,
|
||||
socks: Vec<(Token, StdListener)>,
|
||||
srv: Server,
|
||||
workers: Vec<WorkerClient>,
|
||||
) {
|
||||
@@ -174,7 +174,7 @@ impl Accept {
|
||||
|
||||
fn new(
|
||||
rx: sync_mpsc::Receiver<Command>,
|
||||
socks: Vec<(Token, net::TcpListener)>,
|
||||
socks: Vec<(Token, StdListener)>,
|
||||
workers: Vec<WorkerClient>,
|
||||
srv: Server,
|
||||
) -> Accept {
|
||||
@@ -187,10 +187,9 @@ impl Accept {
|
||||
// Start accept
|
||||
let mut sockets = Slab::new();
|
||||
for (hnd_token, lst) in socks.into_iter() {
|
||||
let addr = lst.local_addr().unwrap();
|
||||
let server = mio::net::TcpListener::from_std(lst)
|
||||
.expect("Can not create mio::net::TcpListener");
|
||||
let addr = lst.local_addr();
|
||||
|
||||
let server = lst.into_listener();
|
||||
let entry = sockets.vacant_entry();
|
||||
let token = entry.key();
|
||||
|
||||
@@ -422,12 +421,13 @@ impl Accept {
|
||||
fn accept(&mut self, token: usize) {
|
||||
loop {
|
||||
let msg = if let Some(info) = self.sockets.get_mut(token) {
|
||||
match info.sock.accept_std() {
|
||||
Ok((io, addr)) => Conn {
|
||||
match info.sock.accept() {
|
||||
Ok(Some((io, addr))) => Conn {
|
||||
io,
|
||||
token: info.token,
|
||||
peer: Some(addr),
|
||||
},
|
||||
Ok(None) => return,
|
||||
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return,
|
||||
Err(ref e) if connection_error(e) => continue,
|
||||
Err(e) => {
|
||||
|
@@ -9,6 +9,7 @@ use futures::{Async, Future, Poll, Stream};
|
||||
use log::{error, info};
|
||||
use net2::TcpBuilder;
|
||||
use num_cpus;
|
||||
use tokio_tcp::TcpStream;
|
||||
use tokio_timer::sleep;
|
||||
|
||||
use crate::accept::{AcceptLoop, AcceptNotify, Command};
|
||||
@@ -16,6 +17,7 @@ use crate::config::{ConfiguredService, ServiceConfig};
|
||||
use crate::server::{Server, ServerCommand};
|
||||
use crate::services::{InternalServiceFactory, ServiceFactory, StreamNewService};
|
||||
use crate::signals::{Signal, Signals};
|
||||
use crate::socket::StdListener;
|
||||
use crate::worker::{self, Worker, WorkerAvailability, WorkerClient};
|
||||
use crate::{ssl, Token};
|
||||
|
||||
@@ -25,8 +27,8 @@ pub struct ServerBuilder {
|
||||
token: Token,
|
||||
backlog: i32,
|
||||
workers: Vec<(usize, WorkerClient)>,
|
||||
services: Vec<Box<InternalServiceFactory>>,
|
||||
sockets: Vec<(Token, net::TcpListener)>,
|
||||
services: Vec<Box<dyn InternalServiceFactory>>,
|
||||
sockets: Vec<(Token, StdListener)>,
|
||||
accept: AcceptLoop,
|
||||
exit: bool,
|
||||
shutdown_timeout: Duration,
|
||||
@@ -151,7 +153,7 @@ impl ServerBuilder {
|
||||
for (name, lst) in cfg.services {
|
||||
let token = self.token.next();
|
||||
srv.stream(token, name, lst.local_addr()?);
|
||||
self.sockets.push((token, lst));
|
||||
self.sockets.push((token, StdListener::Tcp(lst)));
|
||||
}
|
||||
self.services.push(Box::new(srv));
|
||||
}
|
||||
@@ -163,7 +165,7 @@ impl ServerBuilder {
|
||||
/// Add new service to the server.
|
||||
pub fn bind<F, U, N: AsRef<str>>(mut self, name: N, addr: U, factory: F) -> io::Result<Self>
|
||||
where
|
||||
F: ServiceFactory,
|
||||
F: ServiceFactory<TcpStream>,
|
||||
U: net::ToSocketAddrs,
|
||||
{
|
||||
let sockets = bind_addr(addr, self.backlog)?;
|
||||
@@ -176,11 +178,39 @@ impl ServerBuilder {
|
||||
factory.clone(),
|
||||
lst.local_addr()?,
|
||||
));
|
||||
self.sockets.push((token, lst));
|
||||
self.sockets.push((token, StdListener::Tcp(lst)));
|
||||
}
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
#[cfg(all(unix, feature = "uds"))]
|
||||
/// Add new unix domain service to the server.
|
||||
pub fn bind_uds<F, U, N>(mut self, name: N, addr: U, factory: F) -> io::Result<Self>
|
||||
where
|
||||
F: ServiceFactory<tokio_uds::UnixStream>,
|
||||
N: AsRef<str>,
|
||||
U: AsRef<std::path::Path>,
|
||||
{
|
||||
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
|
||||
use std::os::unix::net::UnixListener;
|
||||
|
||||
// TODO: need to do something with existing paths
|
||||
let _ = std::fs::remove_file(addr.as_ref());
|
||||
|
||||
let lst = UnixListener::bind(addr)?;
|
||||
|
||||
let token = self.token.next();
|
||||
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
|
||||
self.services.push(StreamNewService::create(
|
||||
name.as_ref().to_string(),
|
||||
token,
|
||||
factory.clone(),
|
||||
addr,
|
||||
));
|
||||
self.sockets.push((token, StdListener::Uds(lst)));
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
/// Add new service to the server.
|
||||
pub fn listen<F, N: AsRef<str>>(
|
||||
mut self,
|
||||
@@ -189,7 +219,7 @@ impl ServerBuilder {
|
||||
factory: F,
|
||||
) -> io::Result<Self>
|
||||
where
|
||||
F: ServiceFactory,
|
||||
F: ServiceFactory<TcpStream>,
|
||||
{
|
||||
let token = self.token.next();
|
||||
self.services.push(StreamNewService::create(
|
||||
@@ -198,7 +228,7 @@ impl ServerBuilder {
|
||||
factory,
|
||||
lst.local_addr()?,
|
||||
));
|
||||
self.sockets.push((token, lst));
|
||||
self.sockets.push((token, StdListener::Tcp(lst)));
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
@@ -243,7 +273,7 @@ impl ServerBuilder {
|
||||
|
||||
// start accept thread
|
||||
for sock in &self.sockets {
|
||||
info!("Starting server on {}", sock.1.local_addr().ok().unwrap());
|
||||
info!("Starting server on {}", sock.1);
|
||||
}
|
||||
self.accept
|
||||
.start(mem::replace(&mut self.sockets, Vec::new()), workers);
|
||||
@@ -266,7 +296,7 @@ impl ServerBuilder {
|
||||
let timeout = self.shutdown_timeout;
|
||||
let avail = WorkerAvailability::new(notify);
|
||||
let worker = WorkerClient::new(idx, tx1, tx2, avail.clone());
|
||||
let services: Vec<Box<InternalServiceFactory>> =
|
||||
let services: Vec<Box<dyn InternalServiceFactory>> =
|
||||
self.services.iter().map(|v| v.clone_factory()).collect();
|
||||
|
||||
Arbiter::new().send(lazy(move || {
|
||||
|
@@ -17,7 +17,7 @@ use super::Token;
|
||||
|
||||
pub struct ServiceConfig {
|
||||
pub(crate) services: Vec<(String, net::TcpListener)>,
|
||||
pub(crate) apply: Option<Box<ServiceRuntimeConfiguration>>,
|
||||
pub(crate) apply: Option<Box<dyn ServiceRuntimeConfiguration>>,
|
||||
pub(crate) threads: usize,
|
||||
pub(crate) backlog: i32,
|
||||
}
|
||||
@@ -75,13 +75,13 @@ impl ServiceConfig {
|
||||
}
|
||||
|
||||
pub(super) struct ConfiguredService {
|
||||
rt: Box<ServiceRuntimeConfiguration>,
|
||||
rt: Box<dyn ServiceRuntimeConfiguration>,
|
||||
names: HashMap<Token, (String, net::SocketAddr)>,
|
||||
services: HashMap<String, Token>,
|
||||
}
|
||||
|
||||
impl ConfiguredService {
|
||||
pub(super) fn new(rt: Box<ServiceRuntimeConfiguration>) -> Self {
|
||||
pub(super) fn new(rt: Box<dyn ServiceRuntimeConfiguration>) -> Self {
|
||||
ConfiguredService {
|
||||
rt,
|
||||
names: HashMap::new(),
|
||||
@@ -100,7 +100,7 @@ impl InternalServiceFactory for ConfiguredService {
|
||||
&self.names[&token].0
|
||||
}
|
||||
|
||||
fn clone_factory(&self) -> Box<InternalServiceFactory> {
|
||||
fn clone_factory(&self) -> Box<dyn InternalServiceFactory> {
|
||||
Box::new(Self {
|
||||
rt: self.rt.clone(),
|
||||
names: self.names.clone(),
|
||||
@@ -108,7 +108,7 @@ impl InternalServiceFactory for ConfiguredService {
|
||||
})
|
||||
}
|
||||
|
||||
fn create(&self) -> Box<Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>> {
|
||||
fn create(&self) -> Box<dyn Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>> {
|
||||
// configure services
|
||||
let mut rt = ServiceRuntime::new(self.services.clone());
|
||||
self.rt.configure(&mut rt);
|
||||
@@ -156,7 +156,7 @@ impl InternalServiceFactory for ConfiguredService {
|
||||
}
|
||||
|
||||
pub(super) trait ServiceRuntimeConfiguration: Send {
|
||||
fn clone(&self) -> Box<ServiceRuntimeConfiguration>;
|
||||
fn clone(&self) -> Box<dyn ServiceRuntimeConfiguration>;
|
||||
|
||||
fn configure(&self, rt: &mut ServiceRuntime);
|
||||
}
|
||||
@@ -165,7 +165,7 @@ impl<F> ServiceRuntimeConfiguration for F
|
||||
where
|
||||
F: Fn(&mut ServiceRuntime) + Send + Clone + 'static,
|
||||
{
|
||||
fn clone(&self) -> Box<ServiceRuntimeConfiguration> {
|
||||
fn clone(&self) -> Box<dyn ServiceRuntimeConfiguration> {
|
||||
Box::new(self.clone())
|
||||
}
|
||||
|
||||
@@ -181,7 +181,7 @@ fn not_configured(_: &mut ServiceRuntime) {
|
||||
pub struct ServiceRuntime {
|
||||
names: HashMap<String, Token>,
|
||||
services: HashMap<Token, BoxedNewService>,
|
||||
onstart: Vec<Box<Future<Item = (), Error = ()>>>,
|
||||
onstart: Vec<Box<dyn Future<Item = (), Error = ()>>>,
|
||||
}
|
||||
|
||||
impl ServiceRuntime {
|
||||
@@ -236,14 +236,14 @@ impl ServiceRuntime {
|
||||
}
|
||||
|
||||
type BoxedNewService = Box<
|
||||
NewService<
|
||||
dyn NewService<
|
||||
Request = (Option<CounterGuard>, ServerMessage),
|
||||
Response = (),
|
||||
Error = (),
|
||||
InitError = (),
|
||||
Config = ServerConfig,
|
||||
Service = BoxedServerService,
|
||||
Future = Box<Future<Item = BoxedServerService, Error = ()>>,
|
||||
Future = Box<dyn Future<Item = BoxedServerService, Error = ()>>,
|
||||
>,
|
||||
>;
|
||||
|
||||
@@ -265,7 +265,7 @@ where
|
||||
type InitError = ();
|
||||
type Config = ServerConfig;
|
||||
type Service = BoxedServerService;
|
||||
type Future = Box<Future<Item = BoxedServerService, Error = ()>>;
|
||||
type Future = Box<dyn Future<Item = BoxedServerService, Error = ()>>;
|
||||
|
||||
fn new_service(&self, cfg: &ServerConfig) -> Self::Future {
|
||||
Box::new(self.inner.new_service(cfg).map_err(|_| ()).map(|s| {
|
||||
|
@@ -7,6 +7,7 @@ mod counter;
|
||||
mod server;
|
||||
mod services;
|
||||
mod signals;
|
||||
mod socket;
|
||||
pub mod ssl;
|
||||
mod worker;
|
||||
|
||||
@@ -17,6 +18,9 @@ pub use self::config::{ServiceConfig, ServiceRuntime};
|
||||
pub use self::server::Server;
|
||||
pub use self::services::ServiceFactory;
|
||||
|
||||
#[doc(hidden)]
|
||||
pub use self::socket::FromStream;
|
||||
|
||||
#[doc(hidden)]
|
||||
pub use self::services::ServiceFactory as StreamServiceFactory;
|
||||
|
||||
|
@@ -1,4 +1,5 @@
|
||||
use std::net::{self, SocketAddr};
|
||||
use std::marker::PhantomData;
|
||||
use std::net::SocketAddr;
|
||||
use std::time::Duration;
|
||||
|
||||
use actix_rt::spawn;
|
||||
@@ -7,24 +8,23 @@ use actix_service::{NewService, Service};
|
||||
use futures::future::{err, ok, FutureResult};
|
||||
use futures::{Future, Poll};
|
||||
use log::error;
|
||||
use tokio_reactor::Handle;
|
||||
use tokio_tcp::TcpStream;
|
||||
|
||||
use super::Token;
|
||||
use crate::counter::CounterGuard;
|
||||
use crate::socket::{FromStream, StdStream};
|
||||
|
||||
/// Server message
|
||||
pub(crate) enum ServerMessage {
|
||||
/// New stream
|
||||
Connect(net::TcpStream),
|
||||
Connect(StdStream),
|
||||
/// Gracefull shutdown
|
||||
Shutdown(Duration),
|
||||
/// Force shutdown
|
||||
ForceShutdown,
|
||||
}
|
||||
|
||||
pub trait ServiceFactory: Send + Clone + 'static {
|
||||
type NewService: NewService<Config = ServerConfig, Request = Io<TcpStream>>;
|
||||
pub trait ServiceFactory<Stream: FromStream>: Send + Clone + 'static {
|
||||
type NewService: NewService<Config = ServerConfig, Request = Io<Stream>>;
|
||||
|
||||
fn create(&self) -> Self::NewService;
|
||||
}
|
||||
@@ -32,13 +32,13 @@ pub trait ServiceFactory: Send + Clone + 'static {
|
||||
pub(crate) trait InternalServiceFactory: Send {
|
||||
fn name(&self, token: Token) -> &str;
|
||||
|
||||
fn clone_factory(&self) -> Box<InternalServiceFactory>;
|
||||
fn clone_factory(&self) -> Box<dyn InternalServiceFactory>;
|
||||
|
||||
fn create(&self) -> Box<Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>>;
|
||||
fn create(&self) -> Box<dyn Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>>;
|
||||
}
|
||||
|
||||
pub(crate) type BoxedServerService = Box<
|
||||
Service<
|
||||
dyn Service<
|
||||
Request = (Option<CounterGuard>, ServerMessage),
|
||||
Response = (),
|
||||
Error = (),
|
||||
@@ -56,11 +56,12 @@ impl<T> StreamService<T> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Service for StreamService<T>
|
||||
impl<T, I> Service for StreamService<T>
|
||||
where
|
||||
T: Service<Request = Io<TcpStream>>,
|
||||
T: Service<Request = Io<I>>,
|
||||
T::Future: 'static,
|
||||
T::Error: 'static,
|
||||
I: FromStream,
|
||||
{
|
||||
type Request = (Option<CounterGuard>, ServerMessage);
|
||||
type Response = ();
|
||||
@@ -74,7 +75,7 @@ where
|
||||
fn call(&mut self, (guard, req): (Option<CounterGuard>, ServerMessage)) -> Self::Future {
|
||||
match req {
|
||||
ServerMessage::Connect(stream) => {
|
||||
let stream = TcpStream::from_std(stream, &Handle::default()).map_err(|e| {
|
||||
let stream = FromStream::from_stdstream(stream).map_err(|e| {
|
||||
error!("Can not convert to an async tcp stream: {}", e);
|
||||
});
|
||||
|
||||
@@ -93,50 +94,55 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct StreamNewService<F: ServiceFactory> {
|
||||
pub(crate) struct StreamNewService<F: ServiceFactory<Io>, Io: FromStream> {
|
||||
name: String,
|
||||
inner: F,
|
||||
token: Token,
|
||||
addr: SocketAddr,
|
||||
_t: PhantomData<Io>,
|
||||
}
|
||||
|
||||
impl<F> StreamNewService<F>
|
||||
impl<F, Io> StreamNewService<F, Io>
|
||||
where
|
||||
F: ServiceFactory,
|
||||
F: ServiceFactory<Io>,
|
||||
Io: FromStream + Send + 'static,
|
||||
{
|
||||
pub(crate) fn create(
|
||||
name: String,
|
||||
token: Token,
|
||||
inner: F,
|
||||
addr: SocketAddr,
|
||||
) -> Box<InternalServiceFactory> {
|
||||
) -> Box<dyn InternalServiceFactory> {
|
||||
Box::new(Self {
|
||||
name,
|
||||
token,
|
||||
inner,
|
||||
addr,
|
||||
_t: PhantomData,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<F> InternalServiceFactory for StreamNewService<F>
|
||||
impl<F, Io> InternalServiceFactory for StreamNewService<F, Io>
|
||||
where
|
||||
F: ServiceFactory,
|
||||
F: ServiceFactory<Io>,
|
||||
Io: FromStream + Send + 'static,
|
||||
{
|
||||
fn name(&self, _: Token) -> &str {
|
||||
&self.name
|
||||
}
|
||||
|
||||
fn clone_factory(&self) -> Box<InternalServiceFactory> {
|
||||
fn clone_factory(&self) -> Box<dyn InternalServiceFactory> {
|
||||
Box::new(Self {
|
||||
name: self.name.clone(),
|
||||
inner: self.inner.clone(),
|
||||
token: self.token,
|
||||
addr: self.addr,
|
||||
_t: PhantomData,
|
||||
})
|
||||
}
|
||||
|
||||
fn create(&self) -> Box<Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>> {
|
||||
fn create(&self) -> Box<dyn Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>> {
|
||||
let token = self.token;
|
||||
let config = ServerConfig::new(self.addr);
|
||||
Box::new(
|
||||
@@ -152,24 +158,25 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl InternalServiceFactory for Box<InternalServiceFactory> {
|
||||
impl InternalServiceFactory for Box<dyn InternalServiceFactory> {
|
||||
fn name(&self, token: Token) -> &str {
|
||||
self.as_ref().name(token)
|
||||
}
|
||||
|
||||
fn clone_factory(&self) -> Box<InternalServiceFactory> {
|
||||
fn clone_factory(&self) -> Box<dyn InternalServiceFactory> {
|
||||
self.as_ref().clone_factory()
|
||||
}
|
||||
|
||||
fn create(&self) -> Box<Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>> {
|
||||
fn create(&self) -> Box<dyn Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>> {
|
||||
self.as_ref().create()
|
||||
}
|
||||
}
|
||||
|
||||
impl<F, T> ServiceFactory for F
|
||||
impl<F, T, I> ServiceFactory<I> for F
|
||||
where
|
||||
F: Fn() -> T + Send + Clone + 'static,
|
||||
T: NewService<Config = ServerConfig, Request = Io<TcpStream>>,
|
||||
T: NewService<Config = ServerConfig, Request = Io<I>>,
|
||||
I: FromStream,
|
||||
{
|
||||
type NewService = T;
|
||||
|
||||
|
@@ -27,7 +27,7 @@ pub(crate) struct Signals {
|
||||
streams: Vec<SigStream>,
|
||||
}
|
||||
|
||||
type SigStream = Box<Stream<Item = Signal, Error = io::Error>>;
|
||||
type SigStream = Box<dyn Stream<Item = Signal, Error = io::Error>>;
|
||||
|
||||
impl Signals {
|
||||
pub(crate) fn start(srv: Server) {
|
||||
@@ -46,7 +46,7 @@ impl Signals {
|
||||
{
|
||||
use tokio_signal::unix;
|
||||
|
||||
let mut sigs: Vec<Box<Future<Item = SigStream, Error = io::Error>>> =
|
||||
let mut sigs: Vec<Box<dyn Future<Item = SigStream, Error = io::Error>>> =
|
||||
Vec::new();
|
||||
sigs.push(Box::new(
|
||||
tokio_signal::unix::Signal::new(tokio_signal::unix::SIGINT).map(|stream| {
|
||||
|
173
actix-server/src/socket.rs
Normal file
173
actix-server/src/socket.rs
Normal file
@@ -0,0 +1,173 @@
|
||||
use std::{fmt, io, net};
|
||||
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
use tokio_reactor::Handle;
|
||||
use tokio_tcp::TcpStream;
|
||||
|
||||
pub(crate) enum StdListener {
|
||||
Tcp(net::TcpListener),
|
||||
#[cfg(all(unix, feature = "uds"))]
|
||||
Uds(std::os::unix::net::UnixListener),
|
||||
}
|
||||
|
||||
pub(crate) enum SocketAddr {
|
||||
Tcp(net::SocketAddr),
|
||||
#[cfg(all(unix, feature = "uds"))]
|
||||
Uds(std::os::unix::net::SocketAddr),
|
||||
}
|
||||
|
||||
impl fmt::Display for SocketAddr {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match *self {
|
||||
SocketAddr::Tcp(ref addr) => write!(f, "{}", addr),
|
||||
#[cfg(all(unix, feature = "uds"))]
|
||||
SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for SocketAddr {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match *self {
|
||||
SocketAddr::Tcp(ref addr) => write!(f, "{:?}", addr),
|
||||
#[cfg(all(unix, feature = "uds"))]
|
||||
SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for StdListener {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match *self {
|
||||
StdListener::Tcp(ref lst) => write!(f, "{}", lst.local_addr().ok().unwrap()),
|
||||
#[cfg(all(unix, feature = "uds"))]
|
||||
StdListener::Uds(ref lst) => write!(f, "{:?}", lst.local_addr().ok().unwrap()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl StdListener {
|
||||
pub(crate) fn local_addr(&self) -> SocketAddr {
|
||||
match self {
|
||||
StdListener::Tcp(lst) => SocketAddr::Tcp(lst.local_addr().unwrap()),
|
||||
#[cfg(all(unix, feature = "uds"))]
|
||||
StdListener::Uds(lst) => SocketAddr::Uds(lst.local_addr().unwrap()),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn into_listener(self) -> SocketListener {
|
||||
match self {
|
||||
StdListener::Tcp(lst) => SocketListener::Tcp(
|
||||
mio::net::TcpListener::from_std(lst)
|
||||
.expect("Can not create mio::net::TcpListener"),
|
||||
),
|
||||
#[cfg(all(unix, feature = "uds"))]
|
||||
StdListener::Uds(lst) => SocketListener::Uds(
|
||||
mio_uds::UnixListener::from_listener(lst)
|
||||
.expect("Can not create mio_uds::UnixListener"),
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum StdStream {
|
||||
Tcp(std::net::TcpStream),
|
||||
#[cfg(all(unix, feature = "uds"))]
|
||||
Uds(std::os::unix::net::UnixStream),
|
||||
}
|
||||
|
||||
pub(crate) enum SocketListener {
|
||||
Tcp(mio::net::TcpListener),
|
||||
#[cfg(all(unix, feature = "uds"))]
|
||||
Uds(mio_uds::UnixListener),
|
||||
}
|
||||
|
||||
impl SocketListener {
|
||||
pub(crate) fn accept(&self) -> io::Result<Option<(StdStream, SocketAddr)>> {
|
||||
match *self {
|
||||
SocketListener::Tcp(ref lst) => lst
|
||||
.accept_std()
|
||||
.map(|(stream, addr)| Some((StdStream::Tcp(stream), SocketAddr::Tcp(addr)))),
|
||||
#[cfg(all(unix, feature = "uds"))]
|
||||
SocketListener::Uds(ref lst) => lst.accept_std().map(|res| {
|
||||
res.map(|(stream, addr)| (StdStream::Uds(stream), SocketAddr::Uds(addr)))
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl mio::Evented for SocketListener {
|
||||
fn register(
|
||||
&self,
|
||||
poll: &mio::Poll,
|
||||
token: mio::Token,
|
||||
interest: mio::Ready,
|
||||
opts: mio::PollOpt,
|
||||
) -> io::Result<()> {
|
||||
match *self {
|
||||
SocketListener::Tcp(ref lst) => lst.register(poll, token, interest, opts),
|
||||
#[cfg(all(unix, feature = "uds"))]
|
||||
SocketListener::Uds(ref lst) => lst.register(poll, token, interest, opts),
|
||||
}
|
||||
}
|
||||
|
||||
fn reregister(
|
||||
&self,
|
||||
poll: &mio::Poll,
|
||||
token: mio::Token,
|
||||
interest: mio::Ready,
|
||||
opts: mio::PollOpt,
|
||||
) -> io::Result<()> {
|
||||
match *self {
|
||||
SocketListener::Tcp(ref lst) => lst.reregister(poll, token, interest, opts),
|
||||
#[cfg(all(unix, feature = "uds"))]
|
||||
SocketListener::Uds(ref lst) => lst.reregister(poll, token, interest, opts),
|
||||
}
|
||||
}
|
||||
fn deregister(&self, poll: &mio::Poll) -> io::Result<()> {
|
||||
match *self {
|
||||
SocketListener::Tcp(ref lst) => lst.deregister(poll),
|
||||
#[cfg(all(unix, feature = "uds"))]
|
||||
SocketListener::Uds(ref lst) => {
|
||||
let res = lst.deregister(poll);
|
||||
|
||||
// cleanup file path
|
||||
if let Ok(addr) = lst.local_addr() {
|
||||
if let Some(path) = addr.as_pathname() {
|
||||
let _ = std::fs::remove_file(path);
|
||||
}
|
||||
}
|
||||
res
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub trait FromStream: AsyncRead + AsyncWrite + Sized {
|
||||
fn from_stdstream(sock: StdStream) -> io::Result<Self>;
|
||||
}
|
||||
|
||||
impl FromStream for TcpStream {
|
||||
fn from_stdstream(sock: StdStream) -> io::Result<Self> {
|
||||
match sock {
|
||||
StdStream::Tcp(stream) => TcpStream::from_std(stream, &Handle::default()),
|
||||
#[cfg(all(unix, feature = "uds"))]
|
||||
StdStream::Uds(_) => {
|
||||
panic!("Should not happen, bug in server impl");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(all(unix, feature = "uds"))]
|
||||
impl FromStream for tokio_uds::UnixStream {
|
||||
fn from_stdstream(sock: StdStream) -> io::Result<Self> {
|
||||
match sock {
|
||||
StdStream::Tcp(_) => panic!("Should not happen, bug in server impl"),
|
||||
StdStream::Uds(stream) => {
|
||||
tokio_uds::UnixStream::from_std(stream, &Handle::default())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@@ -1,6 +1,6 @@
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::{mem, net, time};
|
||||
use std::{mem, time};
|
||||
|
||||
use actix_rt::{spawn, Arbiter};
|
||||
use futures::sync::mpsc::{UnboundedReceiver, UnboundedSender};
|
||||
@@ -12,6 +12,7 @@ use tokio_timer::{sleep, Delay};
|
||||
use crate::accept::AcceptNotify;
|
||||
use crate::counter::Counter;
|
||||
use crate::services::{BoxedServerService, InternalServiceFactory, ServerMessage};
|
||||
use crate::socket::{SocketAddr, StdStream};
|
||||
use crate::Token;
|
||||
|
||||
pub(crate) struct WorkerCommand(Conn);
|
||||
@@ -25,9 +26,9 @@ pub(crate) struct StopCommand {
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct Conn {
|
||||
pub io: net::TcpStream,
|
||||
pub io: StdStream,
|
||||
pub token: Token,
|
||||
pub peer: Option<net::SocketAddr>,
|
||||
pub peer: Option<SocketAddr>,
|
||||
}
|
||||
|
||||
static MAX_CONNS: AtomicUsize = AtomicUsize::new(25600);
|
||||
@@ -127,7 +128,7 @@ pub(crate) struct Worker {
|
||||
services: Vec<Option<(usize, BoxedServerService)>>,
|
||||
availability: WorkerAvailability,
|
||||
conns: Counter,
|
||||
factories: Vec<Box<InternalServiceFactory>>,
|
||||
factories: Vec<Box<dyn InternalServiceFactory>>,
|
||||
state: WorkerState,
|
||||
shutdown_timeout: time::Duration,
|
||||
}
|
||||
@@ -136,7 +137,7 @@ impl Worker {
|
||||
pub(crate) fn start(
|
||||
rx: UnboundedReceiver<WorkerCommand>,
|
||||
rx2: UnboundedReceiver<StopCommand>,
|
||||
factories: Vec<Box<InternalServiceFactory>>,
|
||||
factories: Vec<Box<dyn InternalServiceFactory>>,
|
||||
availability: WorkerAvailability,
|
||||
shutdown_timeout: time::Duration,
|
||||
) {
|
||||
@@ -237,7 +238,7 @@ enum WorkerState {
|
||||
Restarting(
|
||||
usize,
|
||||
Token,
|
||||
Box<Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>>,
|
||||
Box<dyn Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>>,
|
||||
),
|
||||
Shutdown(Delay, Delay, oneshot::Sender<bool>),
|
||||
}
|
||||
|
@@ -176,7 +176,7 @@ impl<T: ?Sized> ServiceExt for T where T: Service {}
|
||||
/// Creates new `Service` values.
|
||||
///
|
||||
/// Acts as a service factory. This is useful for cases where new `Service`
|
||||
/// values must be produced. One case is a TCP servier listener. The listner
|
||||
/// values must be produced. One case is a TCP server listener. The listener
|
||||
/// accepts new TCP streams, obtains a new `Service` value using the
|
||||
/// `NewService` trait, and uses that new `Service` value to process inbound
|
||||
/// requests on that new TCP stream.
|
||||
|
@@ -1,9 +1,13 @@
|
||||
# Changes
|
||||
|
||||
## [0.1.2] - 2019-08-05
|
||||
|
||||
### Changed
|
||||
|
||||
* Update `derive_more` to 0.15
|
||||
|
||||
* Update `parking_lot` to 0.9
|
||||
|
||||
## [0.1.1] - 2019-06-05
|
||||
|
||||
* Update parking_lot
|
||||
|
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "actix-threadpool"
|
||||
version = "0.1.1"
|
||||
version = "0.1.2"
|
||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||
description = "Actix thread pool for sync code"
|
||||
keywords = ["actix", "network", "framework", "async", "futures"]
|
||||
@@ -20,7 +20,7 @@ path = "src/lib.rs"
|
||||
[dependencies]
|
||||
derive_more = "0.15"
|
||||
futures = "0.1.25"
|
||||
parking_lot = "0.8"
|
||||
parking_lot = "0.9"
|
||||
lazy_static = "1.2"
|
||||
log = "0.4"
|
||||
num_cpus = "1.10"
|
||||
|
@@ -1,5 +1,12 @@
|
||||
# Changes
|
||||
|
||||
## [0.4.5] - 2019-07-19
|
||||
|
||||
### Removed
|
||||
|
||||
* Deprecated `CloneableService` as it is not safe
|
||||
|
||||
|
||||
## [0.4.4] - 2019-07-17
|
||||
|
||||
### Changed
|
||||
|
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "actix-utils"
|
||||
version = "0.4.4"
|
||||
version = "0.4.5"
|
||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||
description = "Actix utils - various actix net related services"
|
||||
keywords = ["network", "framework", "async", "futures"]
|
||||
|
@@ -1,55 +0,0 @@
|
||||
#![allow(deprecated)]
|
||||
use std::marker::PhantomData;
|
||||
use std::rc::Rc;
|
||||
|
||||
use actix_service::Service;
|
||||
use futures::Poll;
|
||||
|
||||
use super::cell::Cell;
|
||||
|
||||
#[doc(hidden)]
|
||||
#[deprecated(since = "0.4.3", note = "support will be removed in actix-utils 0.4.5")]
|
||||
/// Service that allows to turn non-clone service to a service with `Clone` impl
|
||||
pub struct CloneableService<T> {
|
||||
service: Cell<T>,
|
||||
_t: PhantomData<Rc<()>>,
|
||||
}
|
||||
|
||||
impl<T> CloneableService<T> {
|
||||
pub fn new(service: T) -> Self
|
||||
where
|
||||
T: Service,
|
||||
{
|
||||
Self {
|
||||
service: Cell::new(service),
|
||||
_t: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Clone for CloneableService<T> {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
service: self.service.clone(),
|
||||
_t: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Service for CloneableService<T>
|
||||
where
|
||||
T: Service,
|
||||
{
|
||||
type Request = T::Request;
|
||||
type Response = T::Response;
|
||||
type Error = T::Error;
|
||||
type Future = T::Future;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
self.service.get_mut().poll_ready()
|
||||
}
|
||||
|
||||
fn call(&mut self, req: T::Request) -> Self::Future {
|
||||
self.service.get_mut().call(req)
|
||||
}
|
||||
}
|
@@ -1,6 +1,6 @@
|
||||
//! Actix utils - various helper services
|
||||
|
||||
mod cell;
|
||||
pub mod cloneable;
|
||||
pub mod counter;
|
||||
pub mod either;
|
||||
pub mod framed;
|
||||
|
Reference in New Issue
Block a user