1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-11-24 00:01:11 +01:00

add unix domain sockets support #3

This commit is contained in:
Nikolay Kim 2019-07-18 17:05:40 +06:00
parent 2955e49d78
commit 311bb14d97
10 changed files with 304 additions and 75 deletions

View File

@ -1,5 +1,12 @@
# Changes # Changes
## [0.6.0] - 2019-07-18
### Added
* Support Unix domain sockets #3
## [0.5.1] - 2019-05-18 ## [0.5.1] - 2019-05-18
### Changed ### Changed

View File

@ -1,6 +1,6 @@
[package] [package]
name = "actix-server" name = "actix-server"
version = "0.5.1" version = "0.6.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix server - General purpose tcp server" description = "Actix server - General purpose tcp server"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]
@ -14,7 +14,7 @@ edition = "2018"
workspace = ".." workspace = ".."
[package.metadata.docs.rs] [package.metadata.docs.rs]
features = ["ssl", "tls", "rust-tls"] features = ["ssl", "tls", "rust-tls", "uds"]
[lib] [lib]
name = "actix_server" name = "actix_server"
@ -32,15 +32,18 @@ ssl = ["openssl", "tokio-openssl", "actix-server-config/ssl"]
# rustls # rustls
rust-tls = ["rustls", "tokio-rustls", "webpki", "webpki-roots", "actix-server-config/rust-tls"] rust-tls = ["rustls", "tokio-rustls", "webpki", "webpki-roots", "actix-server-config/rust-tls"]
# uds
uds = ["mio-uds", "tokio-uds", "actix-server-config/uds"]
[dependencies] [dependencies]
actix-rt = "0.2.1" actix-rt = "0.2.2"
actix-service = "0.4.0" actix-service = "0.4.1"
actix-server-config = "0.1.1" actix-server-config = "0.1.2"
log = "0.4" log = "0.4"
num_cpus = "1.0" num_cpus = "1.0"
mio = "0.6.13" mio = "0.6.19"
net2 = "0.2" net2 = "0.2"
futures = "0.1" futures = "0.1"
slab = "0.4" slab = "0.4"
@ -50,6 +53,10 @@ tokio-timer = "0.2.8"
tokio-reactor = "0.1" tokio-reactor = "0.1"
tokio-signal = "0.2" tokio-signal = "0.2"
# unix domain sockets
mio-uds = { version="0.6.7", optional = true }
tokio-uds = { version="0.2.5", optional = true }
# native-tls # native-tls
native-tls = { version="0.2", optional = true } native-tls = { version="0.2", optional = true }

View File

@ -1,17 +1,17 @@
use std::sync::mpsc as sync_mpsc; use std::sync::mpsc as sync_mpsc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use std::{io, net, thread}; use std::{io, thread};
use actix_rt::System; use actix_rt::System;
use futures::future::{lazy, Future}; use futures::future::{lazy, Future};
use log::{error, info}; use log::{error, info};
use mio;
use slab::Slab; use slab::Slab;
use tokio_timer::Delay; use tokio_timer::Delay;
use super::server::Server; use crate::server::Server;
use super::worker::{Conn, WorkerClient}; use crate::socket::{SocketAddr, SocketListener, StdListener};
use super::Token; use crate::worker::{Conn, WorkerClient};
use crate::Token;
pub(crate) enum Command { pub(crate) enum Command {
Pause, Pause,
@ -21,9 +21,9 @@ pub(crate) enum Command {
} }
struct ServerSocketInfo { struct ServerSocketInfo {
addr: net::SocketAddr, addr: SocketAddr,
token: Token, token: Token,
sock: mio::net::TcpListener, sock: SocketListener,
timeout: Option<Instant>, timeout: Option<Instant>,
} }
@ -84,7 +84,7 @@ impl AcceptLoop {
pub(crate) fn start( pub(crate) fn start(
&mut self, &mut self,
socks: Vec<(Token, net::TcpListener)>, socks: Vec<(Token, StdListener)>,
workers: Vec<WorkerClient>, workers: Vec<WorkerClient>,
) { ) {
let srv = self.srv.take().expect("Can not re-use AcceptInfo"); let srv = self.srv.take().expect("Can not re-use AcceptInfo");
@ -135,7 +135,7 @@ impl Accept {
rx: sync_mpsc::Receiver<Command>, rx: sync_mpsc::Receiver<Command>,
cmd_reg: mio::Registration, cmd_reg: mio::Registration,
notify_reg: mio::Registration, notify_reg: mio::Registration,
socks: Vec<(Token, net::TcpListener)>, socks: Vec<(Token, StdListener)>,
srv: Server, srv: Server,
workers: Vec<WorkerClient>, workers: Vec<WorkerClient>,
) { ) {
@ -174,7 +174,7 @@ impl Accept {
fn new( fn new(
rx: sync_mpsc::Receiver<Command>, rx: sync_mpsc::Receiver<Command>,
socks: Vec<(Token, net::TcpListener)>, socks: Vec<(Token, StdListener)>,
workers: Vec<WorkerClient>, workers: Vec<WorkerClient>,
srv: Server, srv: Server,
) -> Accept { ) -> Accept {
@ -187,10 +187,9 @@ impl Accept {
// Start accept // Start accept
let mut sockets = Slab::new(); let mut sockets = Slab::new();
for (hnd_token, lst) in socks.into_iter() { for (hnd_token, lst) in socks.into_iter() {
let addr = lst.local_addr().unwrap(); let addr = lst.local_addr();
let server = mio::net::TcpListener::from_std(lst)
.expect("Can not create mio::net::TcpListener");
let server = lst.into_listener();
let entry = sockets.vacant_entry(); let entry = sockets.vacant_entry();
let token = entry.key(); let token = entry.key();
@ -422,12 +421,13 @@ impl Accept {
fn accept(&mut self, token: usize) { fn accept(&mut self, token: usize) {
loop { loop {
let msg = if let Some(info) = self.sockets.get_mut(token) { let msg = if let Some(info) = self.sockets.get_mut(token) {
match info.sock.accept_std() { match info.sock.accept() {
Ok((io, addr)) => Conn { Ok(Some((io, addr))) => Conn {
io, io,
token: info.token, token: info.token,
peer: Some(addr), peer: Some(addr),
}, },
Ok(None) => return,
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return, Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return,
Err(ref e) if connection_error(e) => continue, Err(ref e) if connection_error(e) => continue,
Err(e) => { Err(e) => {

View File

@ -9,6 +9,7 @@ use futures::{Async, Future, Poll, Stream};
use log::{error, info}; use log::{error, info};
use net2::TcpBuilder; use net2::TcpBuilder;
use num_cpus; use num_cpus;
use tokio_tcp::TcpStream;
use tokio_timer::sleep; use tokio_timer::sleep;
use crate::accept::{AcceptLoop, AcceptNotify, Command}; use crate::accept::{AcceptLoop, AcceptNotify, Command};
@ -16,6 +17,7 @@ use crate::config::{ConfiguredService, ServiceConfig};
use crate::server::{Server, ServerCommand}; use crate::server::{Server, ServerCommand};
use crate::services::{InternalServiceFactory, ServiceFactory, StreamNewService}; use crate::services::{InternalServiceFactory, ServiceFactory, StreamNewService};
use crate::signals::{Signal, Signals}; use crate::signals::{Signal, Signals};
use crate::socket::StdListener;
use crate::worker::{self, Worker, WorkerAvailability, WorkerClient}; use crate::worker::{self, Worker, WorkerAvailability, WorkerClient};
use crate::{ssl, Token}; use crate::{ssl, Token};
@ -25,8 +27,8 @@ pub struct ServerBuilder {
token: Token, token: Token,
backlog: i32, backlog: i32,
workers: Vec<(usize, WorkerClient)>, workers: Vec<(usize, WorkerClient)>,
services: Vec<Box<InternalServiceFactory>>, services: Vec<Box<dyn InternalServiceFactory>>,
sockets: Vec<(Token, net::TcpListener)>, sockets: Vec<(Token, StdListener)>,
accept: AcceptLoop, accept: AcceptLoop,
exit: bool, exit: bool,
shutdown_timeout: Duration, shutdown_timeout: Duration,
@ -151,7 +153,7 @@ impl ServerBuilder {
for (name, lst) in cfg.services { for (name, lst) in cfg.services {
let token = self.token.next(); let token = self.token.next();
srv.stream(token, name, lst.local_addr()?); srv.stream(token, name, lst.local_addr()?);
self.sockets.push((token, lst)); self.sockets.push((token, StdListener::Tcp(lst)));
} }
self.services.push(Box::new(srv)); self.services.push(Box::new(srv));
} }
@ -163,7 +165,7 @@ impl ServerBuilder {
/// Add new service to the server. /// Add new service to the server.
pub fn bind<F, U, N: AsRef<str>>(mut self, name: N, addr: U, factory: F) -> io::Result<Self> pub fn bind<F, U, N: AsRef<str>>(mut self, name: N, addr: U, factory: F) -> io::Result<Self>
where where
F: ServiceFactory, F: ServiceFactory<TcpStream>,
U: net::ToSocketAddrs, U: net::ToSocketAddrs,
{ {
let sockets = bind_addr(addr, self.backlog)?; let sockets = bind_addr(addr, self.backlog)?;
@ -176,11 +178,39 @@ impl ServerBuilder {
factory.clone(), factory.clone(),
lst.local_addr()?, lst.local_addr()?,
)); ));
self.sockets.push((token, lst)); self.sockets.push((token, StdListener::Tcp(lst)));
} }
Ok(self) Ok(self)
} }
#[cfg(all(unix, feature = "uds"))]
/// Add new unix domain service to the server.
pub fn bind_uds<F, U, N>(mut self, name: N, addr: U, factory: F) -> io::Result<Self>
where
F: ServiceFactory<tokio_uds::UnixStream>,
N: AsRef<str>,
U: AsRef<std::path::Path>,
{
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::os::unix::net::UnixListener;
// TODO: need to do something with existing paths
let _ = std::fs::remove_file(addr.as_ref());
let lst = UnixListener::bind(addr)?;
let token = self.token.next();
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
self.services.push(StreamNewService::create(
name.as_ref().to_string(),
token,
factory.clone(),
addr,
));
self.sockets.push((token, StdListener::Uds(lst)));
Ok(self)
}
/// Add new service to the server. /// Add new service to the server.
pub fn listen<F, N: AsRef<str>>( pub fn listen<F, N: AsRef<str>>(
mut self, mut self,
@ -189,7 +219,7 @@ impl ServerBuilder {
factory: F, factory: F,
) -> io::Result<Self> ) -> io::Result<Self>
where where
F: ServiceFactory, F: ServiceFactory<TcpStream>,
{ {
let token = self.token.next(); let token = self.token.next();
self.services.push(StreamNewService::create( self.services.push(StreamNewService::create(
@ -198,7 +228,7 @@ impl ServerBuilder {
factory, factory,
lst.local_addr()?, lst.local_addr()?,
)); ));
self.sockets.push((token, lst)); self.sockets.push((token, StdListener::Tcp(lst)));
Ok(self) Ok(self)
} }
@ -243,7 +273,7 @@ impl ServerBuilder {
// start accept thread // start accept thread
for sock in &self.sockets { for sock in &self.sockets {
info!("Starting server on {}", sock.1.local_addr().ok().unwrap()); info!("Starting server on {}", sock.1);
} }
self.accept self.accept
.start(mem::replace(&mut self.sockets, Vec::new()), workers); .start(mem::replace(&mut self.sockets, Vec::new()), workers);
@ -266,7 +296,7 @@ impl ServerBuilder {
let timeout = self.shutdown_timeout; let timeout = self.shutdown_timeout;
let avail = WorkerAvailability::new(notify); let avail = WorkerAvailability::new(notify);
let worker = WorkerClient::new(idx, tx1, tx2, avail.clone()); let worker = WorkerClient::new(idx, tx1, tx2, avail.clone());
let services: Vec<Box<InternalServiceFactory>> = let services: Vec<Box<dyn InternalServiceFactory>> =
self.services.iter().map(|v| v.clone_factory()).collect(); self.services.iter().map(|v| v.clone_factory()).collect();
Arbiter::new().send(lazy(move || { Arbiter::new().send(lazy(move || {

View File

@ -17,7 +17,7 @@ use super::Token;
pub struct ServiceConfig { pub struct ServiceConfig {
pub(crate) services: Vec<(String, net::TcpListener)>, pub(crate) services: Vec<(String, net::TcpListener)>,
pub(crate) apply: Option<Box<ServiceRuntimeConfiguration>>, pub(crate) apply: Option<Box<dyn ServiceRuntimeConfiguration>>,
pub(crate) threads: usize, pub(crate) threads: usize,
pub(crate) backlog: i32, pub(crate) backlog: i32,
} }
@ -75,13 +75,13 @@ impl ServiceConfig {
} }
pub(super) struct ConfiguredService { pub(super) struct ConfiguredService {
rt: Box<ServiceRuntimeConfiguration>, rt: Box<dyn ServiceRuntimeConfiguration>,
names: HashMap<Token, (String, net::SocketAddr)>, names: HashMap<Token, (String, net::SocketAddr)>,
services: HashMap<String, Token>, services: HashMap<String, Token>,
} }
impl ConfiguredService { impl ConfiguredService {
pub(super) fn new(rt: Box<ServiceRuntimeConfiguration>) -> Self { pub(super) fn new(rt: Box<dyn ServiceRuntimeConfiguration>) -> Self {
ConfiguredService { ConfiguredService {
rt, rt,
names: HashMap::new(), names: HashMap::new(),
@ -100,7 +100,7 @@ impl InternalServiceFactory for ConfiguredService {
&self.names[&token].0 &self.names[&token].0
} }
fn clone_factory(&self) -> Box<InternalServiceFactory> { fn clone_factory(&self) -> Box<dyn InternalServiceFactory> {
Box::new(Self { Box::new(Self {
rt: self.rt.clone(), rt: self.rt.clone(),
names: self.names.clone(), names: self.names.clone(),
@ -108,7 +108,7 @@ impl InternalServiceFactory for ConfiguredService {
}) })
} }
fn create(&self) -> Box<Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>> { fn create(&self) -> Box<dyn Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>> {
// configure services // configure services
let mut rt = ServiceRuntime::new(self.services.clone()); let mut rt = ServiceRuntime::new(self.services.clone());
self.rt.configure(&mut rt); self.rt.configure(&mut rt);
@ -156,7 +156,7 @@ impl InternalServiceFactory for ConfiguredService {
} }
pub(super) trait ServiceRuntimeConfiguration: Send { pub(super) trait ServiceRuntimeConfiguration: Send {
fn clone(&self) -> Box<ServiceRuntimeConfiguration>; fn clone(&self) -> Box<dyn ServiceRuntimeConfiguration>;
fn configure(&self, rt: &mut ServiceRuntime); fn configure(&self, rt: &mut ServiceRuntime);
} }
@ -165,7 +165,7 @@ impl<F> ServiceRuntimeConfiguration for F
where where
F: Fn(&mut ServiceRuntime) + Send + Clone + 'static, F: Fn(&mut ServiceRuntime) + Send + Clone + 'static,
{ {
fn clone(&self) -> Box<ServiceRuntimeConfiguration> { fn clone(&self) -> Box<dyn ServiceRuntimeConfiguration> {
Box::new(self.clone()) Box::new(self.clone())
} }
@ -181,7 +181,7 @@ fn not_configured(_: &mut ServiceRuntime) {
pub struct ServiceRuntime { pub struct ServiceRuntime {
names: HashMap<String, Token>, names: HashMap<String, Token>,
services: HashMap<Token, BoxedNewService>, services: HashMap<Token, BoxedNewService>,
onstart: Vec<Box<Future<Item = (), Error = ()>>>, onstart: Vec<Box<dyn Future<Item = (), Error = ()>>>,
} }
impl ServiceRuntime { impl ServiceRuntime {
@ -236,14 +236,14 @@ impl ServiceRuntime {
} }
type BoxedNewService = Box< type BoxedNewService = Box<
NewService< dyn NewService<
Request = (Option<CounterGuard>, ServerMessage), Request = (Option<CounterGuard>, ServerMessage),
Response = (), Response = (),
Error = (), Error = (),
InitError = (), InitError = (),
Config = ServerConfig, Config = ServerConfig,
Service = BoxedServerService, Service = BoxedServerService,
Future = Box<Future<Item = BoxedServerService, Error = ()>>, Future = Box<dyn Future<Item = BoxedServerService, Error = ()>>,
>, >,
>; >;
@ -265,7 +265,7 @@ where
type InitError = (); type InitError = ();
type Config = ServerConfig; type Config = ServerConfig;
type Service = BoxedServerService; type Service = BoxedServerService;
type Future = Box<Future<Item = BoxedServerService, Error = ()>>; type Future = Box<dyn Future<Item = BoxedServerService, Error = ()>>;
fn new_service(&self, cfg: &ServerConfig) -> Self::Future { fn new_service(&self, cfg: &ServerConfig) -> Self::Future {
Box::new(self.inner.new_service(cfg).map_err(|_| ()).map(|s| { Box::new(self.inner.new_service(cfg).map_err(|_| ()).map(|s| {

View File

@ -7,6 +7,7 @@ mod counter;
mod server; mod server;
mod services; mod services;
mod signals; mod signals;
mod socket;
pub mod ssl; pub mod ssl;
mod worker; mod worker;
@ -17,6 +18,9 @@ pub use self::config::{ServiceConfig, ServiceRuntime};
pub use self::server::Server; pub use self::server::Server;
pub use self::services::ServiceFactory; pub use self::services::ServiceFactory;
#[doc(hidden)]
pub use self::socket::FromStream;
#[doc(hidden)] #[doc(hidden)]
pub use self::services::ServiceFactory as StreamServiceFactory; pub use self::services::ServiceFactory as StreamServiceFactory;

View File

@ -1,4 +1,5 @@
use std::net::{self, SocketAddr}; use std::marker::PhantomData;
use std::net::SocketAddr;
use std::time::Duration; use std::time::Duration;
use actix_rt::spawn; use actix_rt::spawn;
@ -7,24 +8,23 @@ use actix_service::{NewService, Service};
use futures::future::{err, ok, FutureResult}; use futures::future::{err, ok, FutureResult};
use futures::{Future, Poll}; use futures::{Future, Poll};
use log::error; use log::error;
use tokio_reactor::Handle;
use tokio_tcp::TcpStream;
use super::Token; use super::Token;
use crate::counter::CounterGuard; use crate::counter::CounterGuard;
use crate::socket::{FromStream, StdStream};
/// Server message /// Server message
pub(crate) enum ServerMessage { pub(crate) enum ServerMessage {
/// New stream /// New stream
Connect(net::TcpStream), Connect(StdStream),
/// Gracefull shutdown /// Gracefull shutdown
Shutdown(Duration), Shutdown(Duration),
/// Force shutdown /// Force shutdown
ForceShutdown, ForceShutdown,
} }
pub trait ServiceFactory: Send + Clone + 'static { pub trait ServiceFactory<Stream: FromStream>: Send + Clone + 'static {
type NewService: NewService<Config = ServerConfig, Request = Io<TcpStream>>; type NewService: NewService<Config = ServerConfig, Request = Io<Stream>>;
fn create(&self) -> Self::NewService; fn create(&self) -> Self::NewService;
} }
@ -32,13 +32,13 @@ pub trait ServiceFactory: Send + Clone + 'static {
pub(crate) trait InternalServiceFactory: Send { pub(crate) trait InternalServiceFactory: Send {
fn name(&self, token: Token) -> &str; fn name(&self, token: Token) -> &str;
fn clone_factory(&self) -> Box<InternalServiceFactory>; fn clone_factory(&self) -> Box<dyn InternalServiceFactory>;
fn create(&self) -> Box<Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>>; fn create(&self) -> Box<dyn Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>>;
} }
pub(crate) type BoxedServerService = Box< pub(crate) type BoxedServerService = Box<
Service< dyn Service<
Request = (Option<CounterGuard>, ServerMessage), Request = (Option<CounterGuard>, ServerMessage),
Response = (), Response = (),
Error = (), Error = (),
@ -56,11 +56,12 @@ impl<T> StreamService<T> {
} }
} }
impl<T> Service for StreamService<T> impl<T, I> Service for StreamService<T>
where where
T: Service<Request = Io<TcpStream>>, T: Service<Request = Io<I>>,
T::Future: 'static, T::Future: 'static,
T::Error: 'static, T::Error: 'static,
I: FromStream,
{ {
type Request = (Option<CounterGuard>, ServerMessage); type Request = (Option<CounterGuard>, ServerMessage);
type Response = (); type Response = ();
@ -74,7 +75,7 @@ where
fn call(&mut self, (guard, req): (Option<CounterGuard>, ServerMessage)) -> Self::Future { fn call(&mut self, (guard, req): (Option<CounterGuard>, ServerMessage)) -> Self::Future {
match req { match req {
ServerMessage::Connect(stream) => { ServerMessage::Connect(stream) => {
let stream = TcpStream::from_std(stream, &Handle::default()).map_err(|e| { let stream = FromStream::from_stdstream(stream).map_err(|e| {
error!("Can not convert to an async tcp stream: {}", e); error!("Can not convert to an async tcp stream: {}", e);
}); });
@ -93,50 +94,55 @@ where
} }
} }
pub(crate) struct StreamNewService<F: ServiceFactory> { pub(crate) struct StreamNewService<F: ServiceFactory<Io>, Io: FromStream> {
name: String, name: String,
inner: F, inner: F,
token: Token, token: Token,
addr: SocketAddr, addr: SocketAddr,
_t: PhantomData<Io>,
} }
impl<F> StreamNewService<F> impl<F, Io> StreamNewService<F, Io>
where where
F: ServiceFactory, F: ServiceFactory<Io>,
Io: FromStream + Send + 'static,
{ {
pub(crate) fn create( pub(crate) fn create(
name: String, name: String,
token: Token, token: Token,
inner: F, inner: F,
addr: SocketAddr, addr: SocketAddr,
) -> Box<InternalServiceFactory> { ) -> Box<dyn InternalServiceFactory> {
Box::new(Self { Box::new(Self {
name, name,
token, token,
inner, inner,
addr, addr,
_t: PhantomData,
}) })
} }
} }
impl<F> InternalServiceFactory for StreamNewService<F> impl<F, Io> InternalServiceFactory for StreamNewService<F, Io>
where where
F: ServiceFactory, F: ServiceFactory<Io>,
Io: FromStream + Send + 'static,
{ {
fn name(&self, _: Token) -> &str { fn name(&self, _: Token) -> &str {
&self.name &self.name
} }
fn clone_factory(&self) -> Box<InternalServiceFactory> { fn clone_factory(&self) -> Box<dyn InternalServiceFactory> {
Box::new(Self { Box::new(Self {
name: self.name.clone(), name: self.name.clone(),
inner: self.inner.clone(), inner: self.inner.clone(),
token: self.token, token: self.token,
addr: self.addr, addr: self.addr,
_t: PhantomData,
}) })
} }
fn create(&self) -> Box<Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>> { fn create(&self) -> Box<dyn Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>> {
let token = self.token; let token = self.token;
let config = ServerConfig::new(self.addr); let config = ServerConfig::new(self.addr);
Box::new( Box::new(
@ -152,24 +158,25 @@ where
} }
} }
impl InternalServiceFactory for Box<InternalServiceFactory> { impl InternalServiceFactory for Box<dyn InternalServiceFactory> {
fn name(&self, token: Token) -> &str { fn name(&self, token: Token) -> &str {
self.as_ref().name(token) self.as_ref().name(token)
} }
fn clone_factory(&self) -> Box<InternalServiceFactory> { fn clone_factory(&self) -> Box<dyn InternalServiceFactory> {
self.as_ref().clone_factory() self.as_ref().clone_factory()
} }
fn create(&self) -> Box<Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>> { fn create(&self) -> Box<dyn Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>> {
self.as_ref().create() self.as_ref().create()
} }
} }
impl<F, T> ServiceFactory for F impl<F, T, I> ServiceFactory<I> for F
where where
F: Fn() -> T + Send + Clone + 'static, F: Fn() -> T + Send + Clone + 'static,
T: NewService<Config = ServerConfig, Request = Io<TcpStream>>, T: NewService<Config = ServerConfig, Request = Io<I>>,
I: FromStream,
{ {
type NewService = T; type NewService = T;

View File

@ -27,7 +27,7 @@ pub(crate) struct Signals {
streams: Vec<SigStream>, streams: Vec<SigStream>,
} }
type SigStream = Box<Stream<Item = Signal, Error = io::Error>>; type SigStream = Box<dyn Stream<Item = Signal, Error = io::Error>>;
impl Signals { impl Signals {
pub(crate) fn start(srv: Server) { pub(crate) fn start(srv: Server) {
@ -46,7 +46,7 @@ impl Signals {
{ {
use tokio_signal::unix; use tokio_signal::unix;
let mut sigs: Vec<Box<Future<Item = SigStream, Error = io::Error>>> = let mut sigs: Vec<Box<dyn Future<Item = SigStream, Error = io::Error>>> =
Vec::new(); Vec::new();
sigs.push(Box::new( sigs.push(Box::new(
tokio_signal::unix::Signal::new(tokio_signal::unix::SIGINT).map(|stream| { tokio_signal::unix::Signal::new(tokio_signal::unix::SIGINT).map(|stream| {

173
actix-server/src/socket.rs Normal file
View File

@ -0,0 +1,173 @@
use std::{fmt, io, net};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_reactor::Handle;
use tokio_tcp::TcpStream;
pub(crate) enum StdListener {
Tcp(net::TcpListener),
#[cfg(all(unix, feature = "uds"))]
Uds(std::os::unix::net::UnixListener),
}
pub(crate) enum SocketAddr {
Tcp(net::SocketAddr),
#[cfg(all(unix, feature = "uds"))]
Uds(std::os::unix::net::SocketAddr),
}
impl fmt::Display for SocketAddr {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
SocketAddr::Tcp(ref addr) => write!(f, "{}", addr),
#[cfg(all(unix, feature = "uds"))]
SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr),
}
}
}
impl fmt::Debug for SocketAddr {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
SocketAddr::Tcp(ref addr) => write!(f, "{:?}", addr),
#[cfg(all(unix, feature = "uds"))]
SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr),
}
}
}
impl fmt::Display for StdListener {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
StdListener::Tcp(ref lst) => write!(f, "{}", lst.local_addr().ok().unwrap()),
#[cfg(all(unix, feature = "uds"))]
StdListener::Uds(ref lst) => write!(f, "{:?}", lst.local_addr().ok().unwrap()),
}
}
}
impl StdListener {
pub(crate) fn local_addr(&self) -> SocketAddr {
match self {
StdListener::Tcp(lst) => SocketAddr::Tcp(lst.local_addr().unwrap()),
#[cfg(all(unix, feature = "uds"))]
StdListener::Uds(lst) => SocketAddr::Uds(lst.local_addr().unwrap()),
}
}
pub(crate) fn into_listener(self) -> SocketListener {
match self {
StdListener::Tcp(lst) => SocketListener::Tcp(
mio::net::TcpListener::from_std(lst)
.expect("Can not create mio::net::TcpListener"),
),
#[cfg(all(unix, feature = "uds"))]
StdListener::Uds(lst) => SocketListener::Uds(
mio_uds::UnixListener::from_listener(lst)
.expect("Can not create mio_uds::UnixListener"),
),
}
}
}
#[derive(Debug)]
pub enum StdStream {
Tcp(std::net::TcpStream),
#[cfg(all(unix, feature = "uds"))]
Uds(std::os::unix::net::UnixStream),
}
pub(crate) enum SocketListener {
Tcp(mio::net::TcpListener),
#[cfg(all(unix, feature = "uds"))]
Uds(mio_uds::UnixListener),
}
impl SocketListener {
pub(crate) fn accept(&self) -> io::Result<Option<(StdStream, SocketAddr)>> {
match *self {
SocketListener::Tcp(ref lst) => lst
.accept_std()
.map(|(stream, addr)| Some((StdStream::Tcp(stream), SocketAddr::Tcp(addr)))),
#[cfg(all(unix, feature = "uds"))]
SocketListener::Uds(ref lst) => lst.accept_std().map(|res| {
res.map(|(stream, addr)| (StdStream::Uds(stream), SocketAddr::Uds(addr)))
}),
}
}
}
impl mio::Evented for SocketListener {
fn register(
&self,
poll: &mio::Poll,
token: mio::Token,
interest: mio::Ready,
opts: mio::PollOpt,
) -> io::Result<()> {
match *self {
SocketListener::Tcp(ref lst) => lst.register(poll, token, interest, opts),
#[cfg(all(unix, feature = "uds"))]
SocketListener::Uds(ref lst) => lst.register(poll, token, interest, opts),
}
}
fn reregister(
&self,
poll: &mio::Poll,
token: mio::Token,
interest: mio::Ready,
opts: mio::PollOpt,
) -> io::Result<()> {
match *self {
SocketListener::Tcp(ref lst) => lst.reregister(poll, token, interest, opts),
#[cfg(all(unix, feature = "uds"))]
SocketListener::Uds(ref lst) => lst.reregister(poll, token, interest, opts),
}
}
fn deregister(&self, poll: &mio::Poll) -> io::Result<()> {
match *self {
SocketListener::Tcp(ref lst) => lst.deregister(poll),
#[cfg(all(unix, feature = "uds"))]
SocketListener::Uds(ref lst) => {
let res = lst.deregister(poll);
// cleanup file path
if let Ok(addr) = lst.local_addr() {
if let Some(path) = addr.as_pathname() {
let _ = std::fs::remove_file(path);
}
}
res
}
}
}
}
pub trait FromStream: AsyncRead + AsyncWrite + Sized {
fn from_stdstream(sock: StdStream) -> io::Result<Self>;
}
impl FromStream for TcpStream {
fn from_stdstream(sock: StdStream) -> io::Result<Self> {
match sock {
StdStream::Tcp(stream) => TcpStream::from_std(stream, &Handle::default()),
#[cfg(all(unix, feature = "uds"))]
StdStream::Uds(_) => {
panic!("Should not happen, bug in server impl");
}
}
}
}
#[cfg(all(unix, feature = "uds"))]
impl FromStream for tokio_uds::UnixStream {
fn from_stdstream(sock: StdStream) -> io::Result<Self> {
match sock {
StdStream::Tcp(_) => panic!("Should not happen, bug in server impl"),
StdStream::Uds(stream) => {
tokio_uds::UnixStream::from_std(stream, &Handle::default())
}
}
}
}

View File

@ -1,6 +1,6 @@
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::{mem, net, time}; use std::{mem, time};
use actix_rt::{spawn, Arbiter}; use actix_rt::{spawn, Arbiter};
use futures::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use futures::sync::mpsc::{UnboundedReceiver, UnboundedSender};
@ -12,6 +12,7 @@ use tokio_timer::{sleep, Delay};
use crate::accept::AcceptNotify; use crate::accept::AcceptNotify;
use crate::counter::Counter; use crate::counter::Counter;
use crate::services::{BoxedServerService, InternalServiceFactory, ServerMessage}; use crate::services::{BoxedServerService, InternalServiceFactory, ServerMessage};
use crate::socket::{SocketAddr, StdStream};
use crate::Token; use crate::Token;
pub(crate) struct WorkerCommand(Conn); pub(crate) struct WorkerCommand(Conn);
@ -25,9 +26,9 @@ pub(crate) struct StopCommand {
#[derive(Debug)] #[derive(Debug)]
pub(crate) struct Conn { pub(crate) struct Conn {
pub io: net::TcpStream, pub io: StdStream,
pub token: Token, pub token: Token,
pub peer: Option<net::SocketAddr>, pub peer: Option<SocketAddr>,
} }
static MAX_CONNS: AtomicUsize = AtomicUsize::new(25600); static MAX_CONNS: AtomicUsize = AtomicUsize::new(25600);
@ -127,7 +128,7 @@ pub(crate) struct Worker {
services: Vec<Option<(usize, BoxedServerService)>>, services: Vec<Option<(usize, BoxedServerService)>>,
availability: WorkerAvailability, availability: WorkerAvailability,
conns: Counter, conns: Counter,
factories: Vec<Box<InternalServiceFactory>>, factories: Vec<Box<dyn InternalServiceFactory>>,
state: WorkerState, state: WorkerState,
shutdown_timeout: time::Duration, shutdown_timeout: time::Duration,
} }
@ -136,7 +137,7 @@ impl Worker {
pub(crate) fn start( pub(crate) fn start(
rx: UnboundedReceiver<WorkerCommand>, rx: UnboundedReceiver<WorkerCommand>,
rx2: UnboundedReceiver<StopCommand>, rx2: UnboundedReceiver<StopCommand>,
factories: Vec<Box<InternalServiceFactory>>, factories: Vec<Box<dyn InternalServiceFactory>>,
availability: WorkerAvailability, availability: WorkerAvailability,
shutdown_timeout: time::Duration, shutdown_timeout: time::Duration,
) { ) {
@ -237,7 +238,7 @@ enum WorkerState {
Restarting( Restarting(
usize, usize,
Token, Token,
Box<Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>>, Box<dyn Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>>,
), ),
Shutdown(Delay, Delay, oneshot::Sender<bool>), Shutdown(Delay, Delay, oneshot::Sender<bool>),
} }