From 311bb14d97e7530031149c561a907404b3753182 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Thu, 18 Jul 2019 17:05:40 +0600 Subject: [PATCH] add unix domain sockets support #3 --- actix-server/CHANGES.md | 7 ++ actix-server/Cargo.toml | 21 +++-- actix-server/src/accept.rs | 30 +++--- actix-server/src/builder.rs | 48 ++++++++-- actix-server/src/config.rs | 22 ++--- actix-server/src/lib.rs | 4 + actix-server/src/services.rs | 57 +++++++----- actix-server/src/signals.rs | 4 +- actix-server/src/socket.rs | 173 +++++++++++++++++++++++++++++++++++ actix-server/src/worker.rs | 13 +-- 10 files changed, 304 insertions(+), 75 deletions(-) create mode 100644 actix-server/src/socket.rs diff --git a/actix-server/CHANGES.md b/actix-server/CHANGES.md index f4e89442..9094f5ff 100644 --- a/actix-server/CHANGES.md +++ b/actix-server/CHANGES.md @@ -1,5 +1,12 @@ # Changes +## [0.6.0] - 2019-07-18 + +### Added + +* Support Unix domain sockets #3 + + ## [0.5.1] - 2019-05-18 ### Changed diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index dee5b0c0..c7c40820 100644 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-server" -version = "0.5.1" +version = "0.6.0" authors = ["Nikolay Kim "] description = "Actix server - General purpose tcp server" keywords = ["network", "framework", "async", "futures"] @@ -14,7 +14,7 @@ edition = "2018" workspace = ".." [package.metadata.docs.rs] -features = ["ssl", "tls", "rust-tls"] +features = ["ssl", "tls", "rust-tls", "uds"] [lib] name = "actix_server" @@ -32,15 +32,18 @@ ssl = ["openssl", "tokio-openssl", "actix-server-config/ssl"] # rustls rust-tls = ["rustls", "tokio-rustls", "webpki", "webpki-roots", "actix-server-config/rust-tls"] +# uds +uds = ["mio-uds", "tokio-uds", "actix-server-config/uds"] + [dependencies] -actix-rt = "0.2.1" -actix-service = "0.4.0" -actix-server-config = "0.1.1" +actix-rt = "0.2.2" +actix-service = "0.4.1" +actix-server-config = "0.1.2" log = "0.4" num_cpus = "1.0" -mio = "0.6.13" +mio = "0.6.19" net2 = "0.2" futures = "0.1" slab = "0.4" @@ -50,6 +53,10 @@ tokio-timer = "0.2.8" tokio-reactor = "0.1" tokio-signal = "0.2" +# unix domain sockets +mio-uds = { version="0.6.7", optional = true } +tokio-uds = { version="0.2.5", optional = true } + # native-tls native-tls = { version="0.2", optional = true } @@ -57,7 +64,7 @@ native-tls = { version="0.2", optional = true } openssl = { version="0.10", optional = true } tokio-openssl = { version="0.3", optional = true } -#rustls +# rustls rustls = { version = "0.15.2", optional = true } tokio-rustls = { version = "0.9.1", optional = true } webpki = { version = "0.19", optional = true } diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index e02fb05f..ce7c9325 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -1,17 +1,17 @@ use std::sync::mpsc as sync_mpsc; use std::time::{Duration, Instant}; -use std::{io, net, thread}; +use std::{io, thread}; use actix_rt::System; use futures::future::{lazy, Future}; use log::{error, info}; -use mio; use slab::Slab; use tokio_timer::Delay; -use super::server::Server; -use super::worker::{Conn, WorkerClient}; -use super::Token; +use crate::server::Server; +use crate::socket::{SocketAddr, SocketListener, StdListener}; +use crate::worker::{Conn, WorkerClient}; +use crate::Token; pub(crate) enum Command { Pause, @@ -21,9 +21,9 @@ pub(crate) enum Command { } struct ServerSocketInfo { - addr: net::SocketAddr, + addr: SocketAddr, token: Token, - sock: mio::net::TcpListener, + sock: SocketListener, timeout: Option, } @@ -84,7 +84,7 @@ impl AcceptLoop { pub(crate) fn start( &mut self, - socks: Vec<(Token, net::TcpListener)>, + socks: Vec<(Token, StdListener)>, workers: Vec, ) { let srv = self.srv.take().expect("Can not re-use AcceptInfo"); @@ -135,7 +135,7 @@ impl Accept { rx: sync_mpsc::Receiver, cmd_reg: mio::Registration, notify_reg: mio::Registration, - socks: Vec<(Token, net::TcpListener)>, + socks: Vec<(Token, StdListener)>, srv: Server, workers: Vec, ) { @@ -174,7 +174,7 @@ impl Accept { fn new( rx: sync_mpsc::Receiver, - socks: Vec<(Token, net::TcpListener)>, + socks: Vec<(Token, StdListener)>, workers: Vec, srv: Server, ) -> Accept { @@ -187,10 +187,9 @@ impl Accept { // Start accept let mut sockets = Slab::new(); for (hnd_token, lst) in socks.into_iter() { - let addr = lst.local_addr().unwrap(); - let server = mio::net::TcpListener::from_std(lst) - .expect("Can not create mio::net::TcpListener"); + let addr = lst.local_addr(); + let server = lst.into_listener(); let entry = sockets.vacant_entry(); let token = entry.key(); @@ -422,12 +421,13 @@ impl Accept { fn accept(&mut self, token: usize) { loop { let msg = if let Some(info) = self.sockets.get_mut(token) { - match info.sock.accept_std() { - Ok((io, addr)) => Conn { + match info.sock.accept() { + Ok(Some((io, addr))) => Conn { io, token: info.token, peer: Some(addr), }, + Ok(None) => return, Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return, Err(ref e) if connection_error(e) => continue, Err(e) => { diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index be7e8107..d9f809fc 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -9,6 +9,7 @@ use futures::{Async, Future, Poll, Stream}; use log::{error, info}; use net2::TcpBuilder; use num_cpus; +use tokio_tcp::TcpStream; use tokio_timer::sleep; use crate::accept::{AcceptLoop, AcceptNotify, Command}; @@ -16,6 +17,7 @@ use crate::config::{ConfiguredService, ServiceConfig}; use crate::server::{Server, ServerCommand}; use crate::services::{InternalServiceFactory, ServiceFactory, StreamNewService}; use crate::signals::{Signal, Signals}; +use crate::socket::StdListener; use crate::worker::{self, Worker, WorkerAvailability, WorkerClient}; use crate::{ssl, Token}; @@ -25,8 +27,8 @@ pub struct ServerBuilder { token: Token, backlog: i32, workers: Vec<(usize, WorkerClient)>, - services: Vec>, - sockets: Vec<(Token, net::TcpListener)>, + services: Vec>, + sockets: Vec<(Token, StdListener)>, accept: AcceptLoop, exit: bool, shutdown_timeout: Duration, @@ -151,7 +153,7 @@ impl ServerBuilder { for (name, lst) in cfg.services { let token = self.token.next(); srv.stream(token, name, lst.local_addr()?); - self.sockets.push((token, lst)); + self.sockets.push((token, StdListener::Tcp(lst))); } self.services.push(Box::new(srv)); } @@ -163,7 +165,7 @@ impl ServerBuilder { /// Add new service to the server. pub fn bind>(mut self, name: N, addr: U, factory: F) -> io::Result where - F: ServiceFactory, + F: ServiceFactory, U: net::ToSocketAddrs, { let sockets = bind_addr(addr, self.backlog)?; @@ -176,11 +178,39 @@ impl ServerBuilder { factory.clone(), lst.local_addr()?, )); - self.sockets.push((token, lst)); + self.sockets.push((token, StdListener::Tcp(lst))); } Ok(self) } + #[cfg(all(unix, feature = "uds"))] + /// Add new unix domain service to the server. + pub fn bind_uds(mut self, name: N, addr: U, factory: F) -> io::Result + where + F: ServiceFactory, + N: AsRef, + U: AsRef, + { + use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + use std::os::unix::net::UnixListener; + + // TODO: need to do something with existing paths + let _ = std::fs::remove_file(addr.as_ref()); + + let lst = UnixListener::bind(addr)?; + + let token = self.token.next(); + let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); + self.services.push(StreamNewService::create( + name.as_ref().to_string(), + token, + factory.clone(), + addr, + )); + self.sockets.push((token, StdListener::Uds(lst))); + Ok(self) + } + /// Add new service to the server. pub fn listen>( mut self, @@ -189,7 +219,7 @@ impl ServerBuilder { factory: F, ) -> io::Result where - F: ServiceFactory, + F: ServiceFactory, { let token = self.token.next(); self.services.push(StreamNewService::create( @@ -198,7 +228,7 @@ impl ServerBuilder { factory, lst.local_addr()?, )); - self.sockets.push((token, lst)); + self.sockets.push((token, StdListener::Tcp(lst))); Ok(self) } @@ -243,7 +273,7 @@ impl ServerBuilder { // start accept thread for sock in &self.sockets { - info!("Starting server on {}", sock.1.local_addr().ok().unwrap()); + info!("Starting server on {}", sock.1); } self.accept .start(mem::replace(&mut self.sockets, Vec::new()), workers); @@ -266,7 +296,7 @@ impl ServerBuilder { let timeout = self.shutdown_timeout; let avail = WorkerAvailability::new(notify); let worker = WorkerClient::new(idx, tx1, tx2, avail.clone()); - let services: Vec> = + let services: Vec> = self.services.iter().map(|v| v.clone_factory()).collect(); Arbiter::new().send(lazy(move || { diff --git a/actix-server/src/config.rs b/actix-server/src/config.rs index 1bb2d2ac..e224d6a4 100644 --- a/actix-server/src/config.rs +++ b/actix-server/src/config.rs @@ -17,7 +17,7 @@ use super::Token; pub struct ServiceConfig { pub(crate) services: Vec<(String, net::TcpListener)>, - pub(crate) apply: Option>, + pub(crate) apply: Option>, pub(crate) threads: usize, pub(crate) backlog: i32, } @@ -75,13 +75,13 @@ impl ServiceConfig { } pub(super) struct ConfiguredService { - rt: Box, + rt: Box, names: HashMap, services: HashMap, } impl ConfiguredService { - pub(super) fn new(rt: Box) -> Self { + pub(super) fn new(rt: Box) -> Self { ConfiguredService { rt, names: HashMap::new(), @@ -100,7 +100,7 @@ impl InternalServiceFactory for ConfiguredService { &self.names[&token].0 } - fn clone_factory(&self) -> Box { + fn clone_factory(&self) -> Box { Box::new(Self { rt: self.rt.clone(), names: self.names.clone(), @@ -108,7 +108,7 @@ impl InternalServiceFactory for ConfiguredService { }) } - fn create(&self) -> Box, Error = ()>> { + fn create(&self) -> Box, Error = ()>> { // configure services let mut rt = ServiceRuntime::new(self.services.clone()); self.rt.configure(&mut rt); @@ -156,7 +156,7 @@ impl InternalServiceFactory for ConfiguredService { } pub(super) trait ServiceRuntimeConfiguration: Send { - fn clone(&self) -> Box; + fn clone(&self) -> Box; fn configure(&self, rt: &mut ServiceRuntime); } @@ -165,7 +165,7 @@ impl ServiceRuntimeConfiguration for F where F: Fn(&mut ServiceRuntime) + Send + Clone + 'static, { - fn clone(&self) -> Box { + fn clone(&self) -> Box { Box::new(self.clone()) } @@ -181,7 +181,7 @@ fn not_configured(_: &mut ServiceRuntime) { pub struct ServiceRuntime { names: HashMap, services: HashMap, - onstart: Vec>>, + onstart: Vec>>, } impl ServiceRuntime { @@ -236,14 +236,14 @@ impl ServiceRuntime { } type BoxedNewService = Box< - NewService< + dyn NewService< Request = (Option, ServerMessage), Response = (), Error = (), InitError = (), Config = ServerConfig, Service = BoxedServerService, - Future = Box>, + Future = Box>, >, >; @@ -265,7 +265,7 @@ where type InitError = (); type Config = ServerConfig; type Service = BoxedServerService; - type Future = Box>; + type Future = Box>; fn new_service(&self, cfg: &ServerConfig) -> Self::Future { Box::new(self.inner.new_service(cfg).map_err(|_| ()).map(|s| { diff --git a/actix-server/src/lib.rs b/actix-server/src/lib.rs index 3948fbf9..3e8cc8fc 100644 --- a/actix-server/src/lib.rs +++ b/actix-server/src/lib.rs @@ -7,6 +7,7 @@ mod counter; mod server; mod services; mod signals; +mod socket; pub mod ssl; mod worker; @@ -17,6 +18,9 @@ pub use self::config::{ServiceConfig, ServiceRuntime}; pub use self::server::Server; pub use self::services::ServiceFactory; +#[doc(hidden)] +pub use self::socket::FromStream; + #[doc(hidden)] pub use self::services::ServiceFactory as StreamServiceFactory; diff --git a/actix-server/src/services.rs b/actix-server/src/services.rs index afdc06ee..6dd90838 100644 --- a/actix-server/src/services.rs +++ b/actix-server/src/services.rs @@ -1,4 +1,5 @@ -use std::net::{self, SocketAddr}; +use std::marker::PhantomData; +use std::net::SocketAddr; use std::time::Duration; use actix_rt::spawn; @@ -7,24 +8,23 @@ use actix_service::{NewService, Service}; use futures::future::{err, ok, FutureResult}; use futures::{Future, Poll}; use log::error; -use tokio_reactor::Handle; -use tokio_tcp::TcpStream; use super::Token; use crate::counter::CounterGuard; +use crate::socket::{FromStream, StdStream}; /// Server message pub(crate) enum ServerMessage { /// New stream - Connect(net::TcpStream), + Connect(StdStream), /// Gracefull shutdown Shutdown(Duration), /// Force shutdown ForceShutdown, } -pub trait ServiceFactory: Send + Clone + 'static { - type NewService: NewService>; +pub trait ServiceFactory: Send + Clone + 'static { + type NewService: NewService>; fn create(&self) -> Self::NewService; } @@ -32,13 +32,13 @@ pub trait ServiceFactory: Send + Clone + 'static { pub(crate) trait InternalServiceFactory: Send { fn name(&self, token: Token) -> &str; - fn clone_factory(&self) -> Box; + fn clone_factory(&self) -> Box; - fn create(&self) -> Box, Error = ()>>; + fn create(&self) -> Box, Error = ()>>; } pub(crate) type BoxedServerService = Box< - Service< + dyn Service< Request = (Option, ServerMessage), Response = (), Error = (), @@ -56,11 +56,12 @@ impl StreamService { } } -impl Service for StreamService +impl Service for StreamService where - T: Service>, + T: Service>, T::Future: 'static, T::Error: 'static, + I: FromStream, { type Request = (Option, ServerMessage); type Response = (); @@ -74,7 +75,7 @@ where fn call(&mut self, (guard, req): (Option, ServerMessage)) -> Self::Future { match req { ServerMessage::Connect(stream) => { - let stream = TcpStream::from_std(stream, &Handle::default()).map_err(|e| { + let stream = FromStream::from_stdstream(stream).map_err(|e| { error!("Can not convert to an async tcp stream: {}", e); }); @@ -93,50 +94,55 @@ where } } -pub(crate) struct StreamNewService { +pub(crate) struct StreamNewService, Io: FromStream> { name: String, inner: F, token: Token, addr: SocketAddr, + _t: PhantomData, } -impl StreamNewService +impl StreamNewService where - F: ServiceFactory, + F: ServiceFactory, + Io: FromStream + Send + 'static, { pub(crate) fn create( name: String, token: Token, inner: F, addr: SocketAddr, - ) -> Box { + ) -> Box { Box::new(Self { name, token, inner, addr, + _t: PhantomData, }) } } -impl InternalServiceFactory for StreamNewService +impl InternalServiceFactory for StreamNewService where - F: ServiceFactory, + F: ServiceFactory, + Io: FromStream + Send + 'static, { fn name(&self, _: Token) -> &str { &self.name } - fn clone_factory(&self) -> Box { + fn clone_factory(&self) -> Box { Box::new(Self { name: self.name.clone(), inner: self.inner.clone(), token: self.token, addr: self.addr, + _t: PhantomData, }) } - fn create(&self) -> Box, Error = ()>> { + fn create(&self) -> Box, Error = ()>> { let token = self.token; let config = ServerConfig::new(self.addr); Box::new( @@ -152,24 +158,25 @@ where } } -impl InternalServiceFactory for Box { +impl InternalServiceFactory for Box { fn name(&self, token: Token) -> &str { self.as_ref().name(token) } - fn clone_factory(&self) -> Box { + fn clone_factory(&self) -> Box { self.as_ref().clone_factory() } - fn create(&self) -> Box, Error = ()>> { + fn create(&self) -> Box, Error = ()>> { self.as_ref().create() } } -impl ServiceFactory for F +impl ServiceFactory for F where F: Fn() -> T + Send + Clone + 'static, - T: NewService>, + T: NewService>, + I: FromStream, { type NewService = T; diff --git a/actix-server/src/signals.rs b/actix-server/src/signals.rs index c3c6dd26..4d9e085f 100644 --- a/actix-server/src/signals.rs +++ b/actix-server/src/signals.rs @@ -27,7 +27,7 @@ pub(crate) struct Signals { streams: Vec, } -type SigStream = Box>; +type SigStream = Box>; impl Signals { pub(crate) fn start(srv: Server) { @@ -46,7 +46,7 @@ impl Signals { { use tokio_signal::unix; - let mut sigs: Vec>> = + let mut sigs: Vec>> = Vec::new(); sigs.push(Box::new( tokio_signal::unix::Signal::new(tokio_signal::unix::SIGINT).map(|stream| { diff --git a/actix-server/src/socket.rs b/actix-server/src/socket.rs new file mode 100644 index 00000000..4cbefb80 --- /dev/null +++ b/actix-server/src/socket.rs @@ -0,0 +1,173 @@ +use std::{fmt, io, net}; + +use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_reactor::Handle; +use tokio_tcp::TcpStream; + +pub(crate) enum StdListener { + Tcp(net::TcpListener), + #[cfg(all(unix, feature = "uds"))] + Uds(std::os::unix::net::UnixListener), +} + +pub(crate) enum SocketAddr { + Tcp(net::SocketAddr), + #[cfg(all(unix, feature = "uds"))] + Uds(std::os::unix::net::SocketAddr), +} + +impl fmt::Display for SocketAddr { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + SocketAddr::Tcp(ref addr) => write!(f, "{}", addr), + #[cfg(all(unix, feature = "uds"))] + SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr), + } + } +} + +impl fmt::Debug for SocketAddr { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + SocketAddr::Tcp(ref addr) => write!(f, "{:?}", addr), + #[cfg(all(unix, feature = "uds"))] + SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr), + } + } +} + +impl fmt::Display for StdListener { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + StdListener::Tcp(ref lst) => write!(f, "{}", lst.local_addr().ok().unwrap()), + #[cfg(all(unix, feature = "uds"))] + StdListener::Uds(ref lst) => write!(f, "{:?}", lst.local_addr().ok().unwrap()), + } + } +} + +impl StdListener { + pub(crate) fn local_addr(&self) -> SocketAddr { + match self { + StdListener::Tcp(lst) => SocketAddr::Tcp(lst.local_addr().unwrap()), + #[cfg(all(unix, feature = "uds"))] + StdListener::Uds(lst) => SocketAddr::Uds(lst.local_addr().unwrap()), + } + } + + pub(crate) fn into_listener(self) -> SocketListener { + match self { + StdListener::Tcp(lst) => SocketListener::Tcp( + mio::net::TcpListener::from_std(lst) + .expect("Can not create mio::net::TcpListener"), + ), + #[cfg(all(unix, feature = "uds"))] + StdListener::Uds(lst) => SocketListener::Uds( + mio_uds::UnixListener::from_listener(lst) + .expect("Can not create mio_uds::UnixListener"), + ), + } + } +} + +#[derive(Debug)] +pub enum StdStream { + Tcp(std::net::TcpStream), + #[cfg(all(unix, feature = "uds"))] + Uds(std::os::unix::net::UnixStream), +} + +pub(crate) enum SocketListener { + Tcp(mio::net::TcpListener), + #[cfg(all(unix, feature = "uds"))] + Uds(mio_uds::UnixListener), +} + +impl SocketListener { + pub(crate) fn accept(&self) -> io::Result> { + match *self { + SocketListener::Tcp(ref lst) => lst + .accept_std() + .map(|(stream, addr)| Some((StdStream::Tcp(stream), SocketAddr::Tcp(addr)))), + #[cfg(all(unix, feature = "uds"))] + SocketListener::Uds(ref lst) => lst.accept_std().map(|res| { + res.map(|(stream, addr)| (StdStream::Uds(stream), SocketAddr::Uds(addr))) + }), + } + } +} + +impl mio::Evented for SocketListener { + fn register( + &self, + poll: &mio::Poll, + token: mio::Token, + interest: mio::Ready, + opts: mio::PollOpt, + ) -> io::Result<()> { + match *self { + SocketListener::Tcp(ref lst) => lst.register(poll, token, interest, opts), + #[cfg(all(unix, feature = "uds"))] + SocketListener::Uds(ref lst) => lst.register(poll, token, interest, opts), + } + } + + fn reregister( + &self, + poll: &mio::Poll, + token: mio::Token, + interest: mio::Ready, + opts: mio::PollOpt, + ) -> io::Result<()> { + match *self { + SocketListener::Tcp(ref lst) => lst.reregister(poll, token, interest, opts), + #[cfg(all(unix, feature = "uds"))] + SocketListener::Uds(ref lst) => lst.reregister(poll, token, interest, opts), + } + } + fn deregister(&self, poll: &mio::Poll) -> io::Result<()> { + match *self { + SocketListener::Tcp(ref lst) => lst.deregister(poll), + #[cfg(all(unix, feature = "uds"))] + SocketListener::Uds(ref lst) => { + let res = lst.deregister(poll); + + // cleanup file path + if let Ok(addr) = lst.local_addr() { + if let Some(path) = addr.as_pathname() { + let _ = std::fs::remove_file(path); + } + } + res + } + } + } +} + +pub trait FromStream: AsyncRead + AsyncWrite + Sized { + fn from_stdstream(sock: StdStream) -> io::Result; +} + +impl FromStream for TcpStream { + fn from_stdstream(sock: StdStream) -> io::Result { + match sock { + StdStream::Tcp(stream) => TcpStream::from_std(stream, &Handle::default()), + #[cfg(all(unix, feature = "uds"))] + StdStream::Uds(_) => { + panic!("Should not happen, bug in server impl"); + } + } + } +} + +#[cfg(all(unix, feature = "uds"))] +impl FromStream for tokio_uds::UnixStream { + fn from_stdstream(sock: StdStream) -> io::Result { + match sock { + StdStream::Tcp(_) => panic!("Should not happen, bug in server impl"), + StdStream::Uds(stream) => { + tokio_uds::UnixStream::from_std(stream, &Handle::default()) + } + } + } +} diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index f72294e2..e03688a8 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -1,6 +1,6 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; -use std::{mem, net, time}; +use std::{mem, time}; use actix_rt::{spawn, Arbiter}; use futures::sync::mpsc::{UnboundedReceiver, UnboundedSender}; @@ -12,6 +12,7 @@ use tokio_timer::{sleep, Delay}; use crate::accept::AcceptNotify; use crate::counter::Counter; use crate::services::{BoxedServerService, InternalServiceFactory, ServerMessage}; +use crate::socket::{SocketAddr, StdStream}; use crate::Token; pub(crate) struct WorkerCommand(Conn); @@ -25,9 +26,9 @@ pub(crate) struct StopCommand { #[derive(Debug)] pub(crate) struct Conn { - pub io: net::TcpStream, + pub io: StdStream, pub token: Token, - pub peer: Option, + pub peer: Option, } static MAX_CONNS: AtomicUsize = AtomicUsize::new(25600); @@ -127,7 +128,7 @@ pub(crate) struct Worker { services: Vec>, availability: WorkerAvailability, conns: Counter, - factories: Vec>, + factories: Vec>, state: WorkerState, shutdown_timeout: time::Duration, } @@ -136,7 +137,7 @@ impl Worker { pub(crate) fn start( rx: UnboundedReceiver, rx2: UnboundedReceiver, - factories: Vec>, + factories: Vec>, availability: WorkerAvailability, shutdown_timeout: time::Duration, ) { @@ -237,7 +238,7 @@ enum WorkerState { Restarting( usize, Token, - Box, Error = ()>>, + Box, Error = ()>>, ), Shutdown(Delay, Delay, oneshot::Sender), }