mirror of
https://github.com/actix/actix-extras.git
synced 2025-06-29 19:24:58 +02:00
use one thread for accept loop; refactor rust-tls support
This commit is contained in:
@ -1,22 +1,16 @@
|
||||
use std::sync::mpsc as sync_mpsc;
|
||||
use std::time::Duration;
|
||||
use std::time::{Duration, Instant};
|
||||
use std::{io, net, thread};
|
||||
|
||||
use futures::sync::mpsc;
|
||||
use futures::{sync::mpsc, Future};
|
||||
use mio;
|
||||
use slab::Slab;
|
||||
use tokio_timer::Delay;
|
||||
|
||||
#[cfg(feature = "tls")]
|
||||
use native_tls::TlsAcceptor;
|
||||
|
||||
#[cfg(feature = "alpn")]
|
||||
use openssl::ssl::{AlpnError, SslAcceptorBuilder};
|
||||
|
||||
#[cfg(feature = "rust-tls")]
|
||||
use rustls::ServerConfig;
|
||||
use actix::{msgs::Execute, Arbiter, System};
|
||||
|
||||
use super::srv::{ServerCommand, Socket};
|
||||
use super::worker::{Conn, SocketInfo};
|
||||
use super::worker::Conn;
|
||||
|
||||
pub(crate) enum Command {
|
||||
Pause,
|
||||
@ -25,169 +19,43 @@ pub(crate) enum Command {
|
||||
Worker(usize, mpsc::UnboundedSender<Conn<net::TcpStream>>),
|
||||
}
|
||||
|
||||
struct ServerSocketInfo {
|
||||
addr: net::SocketAddr,
|
||||
token: usize,
|
||||
sock: mio::net::TcpListener,
|
||||
timeout: Option<Instant>,
|
||||
}
|
||||
|
||||
struct Accept {
|
||||
poll: mio::Poll,
|
||||
rx: sync_mpsc::Receiver<Command>,
|
||||
sockets: Slab<ServerSocketInfo>,
|
||||
workers: Vec<(usize, mpsc::UnboundedSender<Conn<net::TcpStream>>)>,
|
||||
_reg: mio::Registration,
|
||||
next: usize,
|
||||
srv: mpsc::UnboundedSender<ServerCommand>,
|
||||
timer: (mio::Registration, mio::SetReadiness),
|
||||
}
|
||||
|
||||
const CMD: mio::Token = mio::Token(0);
|
||||
const TIMER: mio::Token = mio::Token(1);
|
||||
|
||||
pub(crate) fn start_accept_thread(
|
||||
token: usize, sock: Socket, srv: mpsc::UnboundedSender<ServerCommand>,
|
||||
socks: Slab<SocketInfo>,
|
||||
mut workers: Vec<(usize, mpsc::UnboundedSender<Conn<net::TcpStream>>)>,
|
||||
socks: Vec<(usize, Socket)>, srv: mpsc::UnboundedSender<ServerCommand>,
|
||||
workers: Vec<(usize, mpsc::UnboundedSender<Conn<net::TcpStream>>)>,
|
||||
) -> (mio::SetReadiness, sync_mpsc::Sender<Command>) {
|
||||
let (tx, rx) = sync_mpsc::channel();
|
||||
let (reg, readiness) = mio::Registration::new2();
|
||||
|
||||
let sys = System::current();
|
||||
|
||||
// start accept thread
|
||||
#[cfg_attr(feature = "cargo-clippy", allow(cyclomatic_complexity))]
|
||||
let _ = thread::Builder::new()
|
||||
.name(format!("Accept on {}", sock.addr))
|
||||
.name("actix-web accept loop".to_owned())
|
||||
.spawn(move || {
|
||||
const SRV: mio::Token = mio::Token(0);
|
||||
const CMD: mio::Token = mio::Token(1);
|
||||
|
||||
let addr = sock.addr;
|
||||
let mut server = Some(
|
||||
mio::net::TcpListener::from_std(sock.lst)
|
||||
.expect("Can not create mio::net::TcpListener"),
|
||||
);
|
||||
|
||||
// Create a poll instance
|
||||
let poll = match mio::Poll::new() {
|
||||
Ok(poll) => poll,
|
||||
Err(err) => panic!("Can not create mio::Poll: {}", err),
|
||||
};
|
||||
|
||||
// Start listening for incoming connections
|
||||
if let Some(ref srv) = server {
|
||||
if let Err(err) =
|
||||
poll.register(srv, SRV, mio::Ready::readable(), mio::PollOpt::edge())
|
||||
{
|
||||
panic!("Can not register io: {}", err);
|
||||
}
|
||||
}
|
||||
|
||||
// Start listening for incoming commands
|
||||
if let Err(err) =
|
||||
poll.register(®, CMD, mio::Ready::readable(), mio::PollOpt::edge())
|
||||
{
|
||||
panic!("Can not register Registration: {}", err);
|
||||
}
|
||||
|
||||
// Create storage for events
|
||||
let mut events = mio::Events::with_capacity(128);
|
||||
|
||||
// Sleep on error
|
||||
let sleep = Duration::from_millis(100);
|
||||
|
||||
let mut next = 0;
|
||||
loop {
|
||||
if let Err(err) = poll.poll(&mut events, None) {
|
||||
panic!("Poll error: {}", err);
|
||||
}
|
||||
|
||||
for event in events.iter() {
|
||||
match event.token() {
|
||||
SRV => if let Some(ref server) = server {
|
||||
loop {
|
||||
match server.accept_std() {
|
||||
Ok((io, addr)) => {
|
||||
let mut msg = Conn {
|
||||
io,
|
||||
token,
|
||||
peer: Some(addr),
|
||||
http2: false,
|
||||
};
|
||||
while !workers.is_empty() {
|
||||
match workers[next].1.unbounded_send(msg) {
|
||||
Ok(_) => (),
|
||||
Err(err) => {
|
||||
let _ = srv.unbounded_send(
|
||||
ServerCommand::WorkerDied(
|
||||
workers[next].0,
|
||||
socks.clone(),
|
||||
),
|
||||
);
|
||||
msg = err.into_inner();
|
||||
workers.swap_remove(next);
|
||||
if workers.is_empty() {
|
||||
error!("No workers");
|
||||
thread::sleep(sleep);
|
||||
break;
|
||||
} else if workers.len() <= next {
|
||||
next = 0;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
}
|
||||
next = (next + 1) % workers.len();
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(ref e)
|
||||
if e.kind() == io::ErrorKind::WouldBlock =>
|
||||
{
|
||||
break
|
||||
}
|
||||
Err(ref e) if connection_error(e) => continue,
|
||||
Err(e) => {
|
||||
error!("Error accepting connection: {}", e);
|
||||
// sleep after error
|
||||
thread::sleep(sleep);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
CMD => match rx.try_recv() {
|
||||
Ok(cmd) => match cmd {
|
||||
Command::Pause => if let Some(ref server) = server {
|
||||
if let Err(err) = poll.deregister(server) {
|
||||
error!(
|
||||
"Can not deregister server socket {}",
|
||||
err
|
||||
);
|
||||
} else {
|
||||
info!(
|
||||
"Paused accepting connections on {}",
|
||||
addr
|
||||
);
|
||||
}
|
||||
},
|
||||
Command::Resume => {
|
||||
if let Some(ref server) = server {
|
||||
if let Err(err) = poll.register(
|
||||
server,
|
||||
SRV,
|
||||
mio::Ready::readable(),
|
||||
mio::PollOpt::edge(),
|
||||
) {
|
||||
error!("Can not resume socket accept process: {}", err);
|
||||
} else {
|
||||
info!("Accepting connections on {} has been resumed",
|
||||
addr);
|
||||
}
|
||||
}
|
||||
}
|
||||
Command::Stop => {
|
||||
if let Some(server) = server.take() {
|
||||
let _ = poll.deregister(&server);
|
||||
}
|
||||
return;
|
||||
}
|
||||
Command::Worker(idx, addr) => {
|
||||
workers.push((idx, addr));
|
||||
}
|
||||
},
|
||||
Err(err) => match err {
|
||||
sync_mpsc::TryRecvError::Empty => (),
|
||||
sync_mpsc::TryRecvError::Disconnected => {
|
||||
if let Some(server) = server.take() {
|
||||
let _ = poll.deregister(&server);
|
||||
}
|
||||
return;
|
||||
}
|
||||
},
|
||||
},
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
System::set_current(sys);
|
||||
Accept::new(reg, rx, socks, workers, srv).poll();
|
||||
});
|
||||
|
||||
(readiness, tx)
|
||||
@ -205,3 +73,244 @@ fn connection_error(e: &io::Error) -> bool {
|
||||
|| e.kind() == io::ErrorKind::ConnectionAborted
|
||||
|| e.kind() == io::ErrorKind::ConnectionReset
|
||||
}
|
||||
|
||||
impl Accept {
|
||||
fn new(
|
||||
_reg: mio::Registration, rx: sync_mpsc::Receiver<Command>,
|
||||
socks: Vec<(usize, Socket)>,
|
||||
workers: Vec<(usize, mpsc::UnboundedSender<Conn<net::TcpStream>>)>,
|
||||
srv: mpsc::UnboundedSender<ServerCommand>,
|
||||
) -> Accept {
|
||||
// Create a poll instance
|
||||
let poll = match mio::Poll::new() {
|
||||
Ok(poll) => poll,
|
||||
Err(err) => panic!("Can not create mio::Poll: {}", err),
|
||||
};
|
||||
|
||||
// Start listening for incoming commands
|
||||
if let Err(err) =
|
||||
poll.register(&_reg, CMD, mio::Ready::readable(), mio::PollOpt::edge())
|
||||
{
|
||||
panic!("Can not register Registration: {}", err);
|
||||
}
|
||||
|
||||
// Start accept
|
||||
let mut sockets = Slab::new();
|
||||
for (stoken, sock) in socks {
|
||||
let server = mio::net::TcpListener::from_std(sock.lst)
|
||||
.expect("Can not create mio::net::TcpListener");
|
||||
|
||||
let entry = sockets.vacant_entry();
|
||||
let token = entry.key();
|
||||
|
||||
// Start listening for incoming connections
|
||||
if let Err(err) = poll.register(
|
||||
&server,
|
||||
mio::Token(token + 1000),
|
||||
mio::Ready::readable(),
|
||||
mio::PollOpt::edge(),
|
||||
) {
|
||||
panic!("Can not register io: {}", err);
|
||||
}
|
||||
|
||||
entry.insert(ServerSocketInfo {
|
||||
token: stoken,
|
||||
addr: sock.addr,
|
||||
sock: server,
|
||||
timeout: None,
|
||||
});
|
||||
}
|
||||
|
||||
// Timer
|
||||
let (tm, tmr) = mio::Registration::new2();
|
||||
if let Err(err) =
|
||||
poll.register(&tm, TIMER, mio::Ready::readable(), mio::PollOpt::edge())
|
||||
{
|
||||
panic!("Can not register Registration: {}", err);
|
||||
}
|
||||
|
||||
Accept {
|
||||
poll,
|
||||
rx,
|
||||
_reg,
|
||||
sockets,
|
||||
workers,
|
||||
srv,
|
||||
next: 0,
|
||||
timer: (tm, tmr),
|
||||
}
|
||||
}
|
||||
|
||||
fn poll(&mut self) {
|
||||
// Create storage for events
|
||||
let mut events = mio::Events::with_capacity(128);
|
||||
|
||||
loop {
|
||||
if let Err(err) = self.poll.poll(&mut events, None) {
|
||||
panic!("Poll error: {}", err);
|
||||
}
|
||||
|
||||
for event in events.iter() {
|
||||
let token = event.token();
|
||||
match token {
|
||||
CMD => if !self.process_cmd() {
|
||||
return;
|
||||
},
|
||||
TIMER => self.process_timer(),
|
||||
_ => self.accept(token),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn process_timer(&mut self) {
|
||||
let now = Instant::now();
|
||||
for (token, info) in self.sockets.iter_mut() {
|
||||
if let Some(inst) = info.timeout.take() {
|
||||
if now > inst {
|
||||
if let Err(err) = self.poll.register(
|
||||
&info.sock,
|
||||
mio::Token(token + 1000),
|
||||
mio::Ready::readable(),
|
||||
mio::PollOpt::edge(),
|
||||
) {
|
||||
error!("Can not register server socket {}", err);
|
||||
} else {
|
||||
info!("Resume accepting connections on {}", info.addr);
|
||||
}
|
||||
} else {
|
||||
info.timeout = Some(inst);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn process_cmd(&mut self) -> bool {
|
||||
loop {
|
||||
match self.rx.try_recv() {
|
||||
Ok(cmd) => match cmd {
|
||||
Command::Pause => {
|
||||
for (_, info) in self.sockets.iter_mut() {
|
||||
if let Err(err) = self.poll.deregister(&info.sock) {
|
||||
error!("Can not deregister server socket {}", err);
|
||||
} else {
|
||||
info!("Paused accepting connections on {}", info.addr);
|
||||
}
|
||||
}
|
||||
}
|
||||
Command::Resume => {
|
||||
for (token, info) in self.sockets.iter() {
|
||||
if let Err(err) = self.poll.register(
|
||||
&info.sock,
|
||||
mio::Token(token + 1000),
|
||||
mio::Ready::readable(),
|
||||
mio::PollOpt::edge(),
|
||||
) {
|
||||
error!("Can not resume socket accept process: {}", err);
|
||||
} else {
|
||||
info!(
|
||||
"Accepting connections on {} has been resumed",
|
||||
info.addr
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
Command::Stop => {
|
||||
for (_, info) in self.sockets.iter() {
|
||||
let _ = self.poll.deregister(&info.sock);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
Command::Worker(idx, addr) => {
|
||||
self.workers.push((idx, addr));
|
||||
}
|
||||
},
|
||||
Err(err) => match err {
|
||||
sync_mpsc::TryRecvError::Empty => break,
|
||||
sync_mpsc::TryRecvError::Disconnected => {
|
||||
for (_, info) in self.sockets.iter() {
|
||||
let _ = self.poll.deregister(&info.sock);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
true
|
||||
}
|
||||
|
||||
fn accept(&mut self, token: mio::Token) {
|
||||
let token = usize::from(token);
|
||||
if token < 1000 {
|
||||
return;
|
||||
}
|
||||
|
||||
if let Some(info) = self.sockets.get_mut(token - 1000) {
|
||||
loop {
|
||||
match info.sock.accept_std() {
|
||||
Ok((io, addr)) => {
|
||||
let mut msg = Conn {
|
||||
io,
|
||||
token: info.token,
|
||||
peer: Some(addr),
|
||||
http2: false,
|
||||
};
|
||||
while !self.workers.is_empty() {
|
||||
match self.workers[self.next].1.unbounded_send(msg) {
|
||||
Ok(_) => (),
|
||||
Err(err) => {
|
||||
let _ = self.srv.unbounded_send(
|
||||
ServerCommand::WorkerDied(
|
||||
self.workers[self.next].0,
|
||||
),
|
||||
);
|
||||
msg = err.into_inner();
|
||||
self.workers.swap_remove(self.next);
|
||||
if self.workers.is_empty() {
|
||||
error!("No workers");
|
||||
thread::sleep(Duration::from_millis(100));
|
||||
break;
|
||||
} else if self.workers.len() <= self.next {
|
||||
self.next = 0;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
}
|
||||
self.next = (self.next + 1) % self.workers.len();
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
|
||||
Err(ref e) if connection_error(e) => continue,
|
||||
Err(e) => {
|
||||
error!("Error accepting connection: {}", e);
|
||||
if let Err(err) = self.poll.deregister(&info.sock) {
|
||||
error!("Can not deregister server socket {}", err);
|
||||
}
|
||||
|
||||
// sleep after error
|
||||
info.timeout = Some(Instant::now() + Duration::from_millis(500));
|
||||
|
||||
let r = self.timer.1.clone();
|
||||
System::current().arbiter().do_send(Execute::new(
|
||||
move || -> Result<(), ()> {
|
||||
Arbiter::spawn(
|
||||
Delay::new(
|
||||
Instant::now() + Duration::from_millis(510),
|
||||
).map_err(|_| ())
|
||||
.and_then(move |_| {
|
||||
let _ =
|
||||
r.set_readiness(mio::Ready::readable());
|
||||
Ok(())
|
||||
}),
|
||||
);
|
||||
Ok(())
|
||||
},
|
||||
));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -315,10 +315,10 @@ impl IoStream for TlsStream<TcpStream> {
|
||||
#[cfg(feature = "rust-tls")]
|
||||
use rustls::{ClientSession, ServerSession};
|
||||
#[cfg(feature = "rust-tls")]
|
||||
use tokio_rustls::TlsStream;
|
||||
use tokio_rustls::TlsStream as RustlsStream;
|
||||
|
||||
#[cfg(feature = "rust-tls")]
|
||||
impl IoStream for TlsStream<TcpStream, ClientSession> {
|
||||
impl IoStream for RustlsStream<TcpStream, ClientSession> {
|
||||
#[inline]
|
||||
fn shutdown(&mut self, _how: Shutdown) -> io::Result<()> {
|
||||
let _ = <Self as AsyncWrite>::shutdown(self);
|
||||
@ -337,7 +337,7 @@ impl IoStream for TlsStream<TcpStream, ClientSession> {
|
||||
}
|
||||
|
||||
#[cfg(feature = "rust-tls")]
|
||||
impl IoStream for TlsStream<TcpStream, ServerSession> {
|
||||
impl IoStream for RustlsStream<TcpStream, ServerSession> {
|
||||
#[inline]
|
||||
fn shutdown(&mut self, _how: Shutdown) -> io::Result<()> {
|
||||
let _ = <Self as AsyncWrite>::shutdown(self);
|
||||
|
@ -46,14 +46,6 @@ fn configure_alpn(builder: &mut SslAcceptorBuilder) -> io::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(all(feature = "rust-tls", not(feature = "alpn")))]
|
||||
fn configure_alpn(builder: &mut Arc<ServerConfig>) -> io::Result<()> {
|
||||
Arc::<ServerConfig>::get_mut(builder)
|
||||
.unwrap()
|
||||
.set_protocols(&vec!["h2".to_string(), "http/1.1".to_string()]);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// An HTTP Server
|
||||
pub struct HttpServer<H>
|
||||
where
|
||||
@ -68,7 +60,11 @@ where
|
||||
#[cfg_attr(feature = "cargo-clippy", allow(type_complexity))]
|
||||
workers: Vec<(usize, Addr<Worker<H::Handler>>)>,
|
||||
sockets: Vec<Socket>,
|
||||
accept: Vec<(mio::SetReadiness, sync_mpsc::Sender<Command>)>,
|
||||
accept: Option<(
|
||||
mio::SetReadiness,
|
||||
sync_mpsc::Sender<Command>,
|
||||
Slab<SocketInfo>,
|
||||
)>,
|
||||
exit: bool,
|
||||
shutdown_timeout: u16,
|
||||
signals: Option<Addr<signal::ProcessSignals>>,
|
||||
@ -77,7 +73,7 @@ where
|
||||
}
|
||||
|
||||
pub(crate) enum ServerCommand {
|
||||
WorkerDied(usize, Slab<SocketInfo>),
|
||||
WorkerDied(usize),
|
||||
}
|
||||
|
||||
impl<H> Actor for HttpServer<H>
|
||||
@ -114,7 +110,7 @@ where
|
||||
factory: Arc::new(f),
|
||||
workers: Vec::new(),
|
||||
sockets: Vec::new(),
|
||||
accept: Vec::new(),
|
||||
accept: None,
|
||||
exit: false,
|
||||
shutdown_timeout: 30,
|
||||
signals: None,
|
||||
@ -280,22 +276,22 @@ where
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
#[cfg(all(feature = "rust-tls", not(feature = "alpn")))]
|
||||
#[cfg(feature = "rust-tls")]
|
||||
/// Use listener for accepting incoming tls connection requests
|
||||
///
|
||||
/// This method sets alpn protocols to "h2" and "http/1.1"
|
||||
pub fn listen_ssl(
|
||||
mut self, lst: net::TcpListener, mut builder: Arc<ServerConfig>,
|
||||
pub fn listen_rustls(
|
||||
mut self, lst: net::TcpListener, mut builder: ServerConfig,
|
||||
) -> io::Result<Self> {
|
||||
// alpn support
|
||||
if !self.no_http2 {
|
||||
configure_alpn(&mut builder)?;
|
||||
builder.set_protocols(&vec!["h2".to_string(), "http/1.1".to_string()]);
|
||||
}
|
||||
let addr = lst.local_addr().unwrap();
|
||||
self.sockets.push(Socket {
|
||||
addr,
|
||||
lst,
|
||||
tp: StreamHandlerType::Rustls(builder.clone()),
|
||||
tp: StreamHandlerType::Rustls(Arc::new(builder)),
|
||||
});
|
||||
Ok(self)
|
||||
}
|
||||
@ -378,20 +374,21 @@ where
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
#[cfg(all(feature = "rust-tls", not(feature = "alpn")))]
|
||||
#[cfg(feature = "rust-tls")]
|
||||
/// Start listening for incoming tls connections.
|
||||
///
|
||||
/// This method sets alpn protocols to "h2" and "http/1.1"
|
||||
pub fn bind_ssl<S: net::ToSocketAddrs>(
|
||||
mut self, addr: S, mut builder: Arc<ServerConfig>,
|
||||
pub fn bind_rustls<S: net::ToSocketAddrs>(
|
||||
mut self, addr: S, mut builder: ServerConfig,
|
||||
) -> io::Result<Self> {
|
||||
// alpn support
|
||||
if !self.no_http2 {
|
||||
configure_alpn(&mut builder)?;
|
||||
builder.set_protocols(&vec!["h2".to_string(), "http/1.1".to_string()]);
|
||||
}
|
||||
|
||||
let builder = Arc::new(builder);
|
||||
let sockets = self.bind2(addr)?;
|
||||
self.sockets.extend(sockets.into_iter().map(|mut s| {
|
||||
self.sockets.extend(sockets.into_iter().map(move |mut s| {
|
||||
s.tp = StreamHandlerType::Rustls(builder.clone());
|
||||
s
|
||||
}));
|
||||
@ -487,17 +484,12 @@ impl<H: IntoHttpHandler> HttpServer<H> {
|
||||
let settings = ServerSettings::new(Some(addrs[0].1.addr), &self.host, false);
|
||||
let workers = self.start_workers(&settings, &socks);
|
||||
|
||||
// start acceptors threads
|
||||
for (token, sock) in addrs {
|
||||
// start accept thread
|
||||
for (_, sock) in &addrs {
|
||||
info!("Starting server on http://{}", sock.addr);
|
||||
self.accept.push(start_accept_thread(
|
||||
token,
|
||||
sock,
|
||||
tx.clone(),
|
||||
socks.clone(),
|
||||
workers.clone(),
|
||||
));
|
||||
}
|
||||
let (r, cmd) = start_accept_thread(addrs, tx.clone(), workers.clone());
|
||||
self.accept = Some((r, cmd, socks));
|
||||
|
||||
// start http server actor
|
||||
let signals = self.subscribe_to_signals();
|
||||
@ -672,7 +664,7 @@ impl<H: IntoHttpHandler> StreamHandler<ServerCommand, ()> for HttpServer<H> {
|
||||
|
||||
fn handle(&mut self, msg: ServerCommand, _: &mut Context<Self>) {
|
||||
match msg {
|
||||
ServerCommand::WorkerDied(idx, socks) => {
|
||||
ServerCommand::WorkerDied(idx) => {
|
||||
let mut found = false;
|
||||
for i in 0..self.workers.len() {
|
||||
if self.workers[i].0 == idx {
|
||||
@ -700,6 +692,7 @@ impl<H: IntoHttpHandler> StreamHandler<ServerCommand, ()> for HttpServer<H> {
|
||||
let ka = self.keep_alive;
|
||||
let factory = Arc::clone(&self.factory);
|
||||
let host = self.host.clone();
|
||||
let socks = self.accept.as_ref().unwrap().2.clone();
|
||||
let addr = socks[0].addr;
|
||||
|
||||
let addr = Arbiter::start(move |ctx: &mut Context<_>| {
|
||||
@ -709,7 +702,7 @@ impl<H: IntoHttpHandler> StreamHandler<ServerCommand, ()> for HttpServer<H> {
|
||||
ctx.add_message_stream(rx);
|
||||
Worker::new(apps, socks, ka, settings)
|
||||
});
|
||||
for item in &self.accept {
|
||||
if let Some(ref item) = &self.accept {
|
||||
let _ = item.1.send(Command::Worker(new_idx, tx.clone()));
|
||||
let _ = item.0.set_readiness(mio::Ready::readable());
|
||||
}
|
||||
|
Reference in New Issue
Block a user