mirror of
https://github.com/actix/actix-extras.git
synced 2024-11-24 07:53:00 +01:00
Merge branch 'master' of https://github.com/actix/actix-web into feat/432
This commit is contained in:
commit
246eafb8d2
@ -32,12 +32,12 @@ script:
|
|||||||
- |
|
- |
|
||||||
if [[ "$TRAVIS_RUST_VERSION" != "stable" ]]; then
|
if [[ "$TRAVIS_RUST_VERSION" != "stable" ]]; then
|
||||||
cargo clean
|
cargo clean
|
||||||
cargo test --features="alpn,tls" -- --nocapture
|
cargo test --features="alpn,tls,rust-tls" -- --nocapture
|
||||||
fi
|
fi
|
||||||
- |
|
- |
|
||||||
if [[ "$TRAVIS_RUST_VERSION" == "stable" ]]; then
|
if [[ "$TRAVIS_RUST_VERSION" == "stable" ]]; then
|
||||||
RUSTFLAGS="--cfg procmacro2_semver_exempt" cargo install -f cargo-tarpaulin
|
RUSTFLAGS="--cfg procmacro2_semver_exempt" cargo install -f cargo-tarpaulin
|
||||||
cargo tarpaulin --features="alpn,tls" --out Xml --no-count
|
cargo tarpaulin --features="alpn,tls,rust-tls" --out Xml --no-count
|
||||||
bash <(curl -s https://codecov.io/bash)
|
bash <(curl -s https://codecov.io/bash)
|
||||||
echo "Uploaded code coverage"
|
echo "Uploaded code coverage"
|
||||||
fi
|
fi
|
||||||
@ -46,7 +46,7 @@ script:
|
|||||||
after_success:
|
after_success:
|
||||||
- |
|
- |
|
||||||
if [[ "$TRAVIS_OS_NAME" == "linux" && "$TRAVIS_PULL_REQUEST" = "false" && "$TRAVIS_BRANCH" == "master" && "$TRAVIS_RUST_VERSION" == "beta" ]]; then
|
if [[ "$TRAVIS_OS_NAME" == "linux" && "$TRAVIS_PULL_REQUEST" = "false" && "$TRAVIS_BRANCH" == "master" && "$TRAVIS_RUST_VERSION" == "beta" ]]; then
|
||||||
cargo doc --features "alpn, tls, session" --no-deps &&
|
cargo doc --features "alpn, tls, rust-tls, session" --no-deps &&
|
||||||
echo "<meta http-equiv=refresh content=0;url=os_balloon/index.html>" > target/doc/index.html &&
|
echo "<meta http-equiv=refresh content=0;url=os_balloon/index.html>" > target/doc/index.html &&
|
||||||
git clone https://github.com/davisp/ghp-import.git &&
|
git clone https://github.com/davisp/ghp-import.git &&
|
||||||
./ghp-import/ghp_import.py -n -p -f -m "Documentation upload" -r https://"$GH_TOKEN"@github.com/"$TRAVIS_REPO_SLUG.git" target/doc &&
|
./ghp-import/ghp_import.py -n -p -f -m "Documentation upload" -r https://"$GH_TOKEN"@github.com/"$TRAVIS_REPO_SLUG.git" target/doc &&
|
||||||
|
@ -1,22 +1,16 @@
|
|||||||
use std::sync::mpsc as sync_mpsc;
|
use std::sync::mpsc as sync_mpsc;
|
||||||
use std::time::Duration;
|
use std::time::{Duration, Instant};
|
||||||
use std::{io, net, thread};
|
use std::{io, net, thread};
|
||||||
|
|
||||||
use futures::sync::mpsc;
|
use futures::{sync::mpsc, Future};
|
||||||
use mio;
|
use mio;
|
||||||
use slab::Slab;
|
use slab::Slab;
|
||||||
|
use tokio_timer::Delay;
|
||||||
|
|
||||||
#[cfg(feature = "tls")]
|
use actix::{msgs::Execute, Arbiter, System};
|
||||||
use native_tls::TlsAcceptor;
|
|
||||||
|
|
||||||
#[cfg(feature = "alpn")]
|
|
||||||
use openssl::ssl::{AlpnError, SslAcceptorBuilder};
|
|
||||||
|
|
||||||
#[cfg(feature = "rust-tls")]
|
|
||||||
use rustls::ServerConfig;
|
|
||||||
|
|
||||||
use super::srv::{ServerCommand, Socket};
|
use super::srv::{ServerCommand, Socket};
|
||||||
use super::worker::{Conn, SocketInfo};
|
use super::worker::Conn;
|
||||||
|
|
||||||
pub(crate) enum Command {
|
pub(crate) enum Command {
|
||||||
Pause,
|
Pause,
|
||||||
@ -25,169 +19,43 @@ pub(crate) enum Command {
|
|||||||
Worker(usize, mpsc::UnboundedSender<Conn<net::TcpStream>>),
|
Worker(usize, mpsc::UnboundedSender<Conn<net::TcpStream>>),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct ServerSocketInfo {
|
||||||
|
addr: net::SocketAddr,
|
||||||
|
token: usize,
|
||||||
|
sock: mio::net::TcpListener,
|
||||||
|
timeout: Option<Instant>,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct Accept {
|
||||||
|
poll: mio::Poll,
|
||||||
|
rx: sync_mpsc::Receiver<Command>,
|
||||||
|
sockets: Slab<ServerSocketInfo>,
|
||||||
|
workers: Vec<(usize, mpsc::UnboundedSender<Conn<net::TcpStream>>)>,
|
||||||
|
_reg: mio::Registration,
|
||||||
|
next: usize,
|
||||||
|
srv: mpsc::UnboundedSender<ServerCommand>,
|
||||||
|
timer: (mio::Registration, mio::SetReadiness),
|
||||||
|
}
|
||||||
|
|
||||||
|
const CMD: mio::Token = mio::Token(0);
|
||||||
|
const TIMER: mio::Token = mio::Token(1);
|
||||||
|
|
||||||
pub(crate) fn start_accept_thread(
|
pub(crate) fn start_accept_thread(
|
||||||
token: usize, sock: Socket, srv: mpsc::UnboundedSender<ServerCommand>,
|
socks: Vec<(usize, Socket)>, srv: mpsc::UnboundedSender<ServerCommand>,
|
||||||
socks: Slab<SocketInfo>,
|
workers: Vec<(usize, mpsc::UnboundedSender<Conn<net::TcpStream>>)>,
|
||||||
mut workers: Vec<(usize, mpsc::UnboundedSender<Conn<net::TcpStream>>)>,
|
|
||||||
) -> (mio::SetReadiness, sync_mpsc::Sender<Command>) {
|
) -> (mio::SetReadiness, sync_mpsc::Sender<Command>) {
|
||||||
let (tx, rx) = sync_mpsc::channel();
|
let (tx, rx) = sync_mpsc::channel();
|
||||||
let (reg, readiness) = mio::Registration::new2();
|
let (reg, readiness) = mio::Registration::new2();
|
||||||
|
|
||||||
|
let sys = System::current();
|
||||||
|
|
||||||
// start accept thread
|
// start accept thread
|
||||||
#[cfg_attr(feature = "cargo-clippy", allow(cyclomatic_complexity))]
|
#[cfg_attr(feature = "cargo-clippy", allow(cyclomatic_complexity))]
|
||||||
let _ = thread::Builder::new()
|
let _ = thread::Builder::new()
|
||||||
.name(format!("Accept on {}", sock.addr))
|
.name("actix-web accept loop".to_owned())
|
||||||
.spawn(move || {
|
.spawn(move || {
|
||||||
const SRV: mio::Token = mio::Token(0);
|
System::set_current(sys);
|
||||||
const CMD: mio::Token = mio::Token(1);
|
Accept::new(reg, rx, socks, workers, srv).poll();
|
||||||
|
|
||||||
let addr = sock.addr;
|
|
||||||
let mut server = Some(
|
|
||||||
mio::net::TcpListener::from_std(sock.lst)
|
|
||||||
.expect("Can not create mio::net::TcpListener"),
|
|
||||||
);
|
|
||||||
|
|
||||||
// Create a poll instance
|
|
||||||
let poll = match mio::Poll::new() {
|
|
||||||
Ok(poll) => poll,
|
|
||||||
Err(err) => panic!("Can not create mio::Poll: {}", err),
|
|
||||||
};
|
|
||||||
|
|
||||||
// Start listening for incoming connections
|
|
||||||
if let Some(ref srv) = server {
|
|
||||||
if let Err(err) =
|
|
||||||
poll.register(srv, SRV, mio::Ready::readable(), mio::PollOpt::edge())
|
|
||||||
{
|
|
||||||
panic!("Can not register io: {}", err);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start listening for incoming commands
|
|
||||||
if let Err(err) =
|
|
||||||
poll.register(®, CMD, mio::Ready::readable(), mio::PollOpt::edge())
|
|
||||||
{
|
|
||||||
panic!("Can not register Registration: {}", err);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create storage for events
|
|
||||||
let mut events = mio::Events::with_capacity(128);
|
|
||||||
|
|
||||||
// Sleep on error
|
|
||||||
let sleep = Duration::from_millis(100);
|
|
||||||
|
|
||||||
let mut next = 0;
|
|
||||||
loop {
|
|
||||||
if let Err(err) = poll.poll(&mut events, None) {
|
|
||||||
panic!("Poll error: {}", err);
|
|
||||||
}
|
|
||||||
|
|
||||||
for event in events.iter() {
|
|
||||||
match event.token() {
|
|
||||||
SRV => if let Some(ref server) = server {
|
|
||||||
loop {
|
|
||||||
match server.accept_std() {
|
|
||||||
Ok((io, addr)) => {
|
|
||||||
let mut msg = Conn {
|
|
||||||
io,
|
|
||||||
token,
|
|
||||||
peer: Some(addr),
|
|
||||||
http2: false,
|
|
||||||
};
|
|
||||||
while !workers.is_empty() {
|
|
||||||
match workers[next].1.unbounded_send(msg) {
|
|
||||||
Ok(_) => (),
|
|
||||||
Err(err) => {
|
|
||||||
let _ = srv.unbounded_send(
|
|
||||||
ServerCommand::WorkerDied(
|
|
||||||
workers[next].0,
|
|
||||||
socks.clone(),
|
|
||||||
),
|
|
||||||
);
|
|
||||||
msg = err.into_inner();
|
|
||||||
workers.swap_remove(next);
|
|
||||||
if workers.is_empty() {
|
|
||||||
error!("No workers");
|
|
||||||
thread::sleep(sleep);
|
|
||||||
break;
|
|
||||||
} else if workers.len() <= next {
|
|
||||||
next = 0;
|
|
||||||
}
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
next = (next + 1) % workers.len();
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(ref e)
|
|
||||||
if e.kind() == io::ErrorKind::WouldBlock =>
|
|
||||||
{
|
|
||||||
break
|
|
||||||
}
|
|
||||||
Err(ref e) if connection_error(e) => continue,
|
|
||||||
Err(e) => {
|
|
||||||
error!("Error accepting connection: {}", e);
|
|
||||||
// sleep after error
|
|
||||||
thread::sleep(sleep);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
CMD => match rx.try_recv() {
|
|
||||||
Ok(cmd) => match cmd {
|
|
||||||
Command::Pause => if let Some(ref server) = server {
|
|
||||||
if let Err(err) = poll.deregister(server) {
|
|
||||||
error!(
|
|
||||||
"Can not deregister server socket {}",
|
|
||||||
err
|
|
||||||
);
|
|
||||||
} else {
|
|
||||||
info!(
|
|
||||||
"Paused accepting connections on {}",
|
|
||||||
addr
|
|
||||||
);
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Command::Resume => {
|
|
||||||
if let Some(ref server) = server {
|
|
||||||
if let Err(err) = poll.register(
|
|
||||||
server,
|
|
||||||
SRV,
|
|
||||||
mio::Ready::readable(),
|
|
||||||
mio::PollOpt::edge(),
|
|
||||||
) {
|
|
||||||
error!("Can not resume socket accept process: {}", err);
|
|
||||||
} else {
|
|
||||||
info!("Accepting connections on {} has been resumed",
|
|
||||||
addr);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Command::Stop => {
|
|
||||||
if let Some(server) = server.take() {
|
|
||||||
let _ = poll.deregister(&server);
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
Command::Worker(idx, addr) => {
|
|
||||||
workers.push((idx, addr));
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Err(err) => match err {
|
|
||||||
sync_mpsc::TryRecvError::Empty => (),
|
|
||||||
sync_mpsc::TryRecvError::Disconnected => {
|
|
||||||
if let Some(server) = server.take() {
|
|
||||||
let _ = poll.deregister(&server);
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
},
|
|
||||||
},
|
|
||||||
_ => unreachable!(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
|
|
||||||
(readiness, tx)
|
(readiness, tx)
|
||||||
@ -205,3 +73,244 @@ fn connection_error(e: &io::Error) -> bool {
|
|||||||
|| e.kind() == io::ErrorKind::ConnectionAborted
|
|| e.kind() == io::ErrorKind::ConnectionAborted
|
||||||
|| e.kind() == io::ErrorKind::ConnectionReset
|
|| e.kind() == io::ErrorKind::ConnectionReset
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Accept {
|
||||||
|
fn new(
|
||||||
|
_reg: mio::Registration, rx: sync_mpsc::Receiver<Command>,
|
||||||
|
socks: Vec<(usize, Socket)>,
|
||||||
|
workers: Vec<(usize, mpsc::UnboundedSender<Conn<net::TcpStream>>)>,
|
||||||
|
srv: mpsc::UnboundedSender<ServerCommand>,
|
||||||
|
) -> Accept {
|
||||||
|
// Create a poll instance
|
||||||
|
let poll = match mio::Poll::new() {
|
||||||
|
Ok(poll) => poll,
|
||||||
|
Err(err) => panic!("Can not create mio::Poll: {}", err),
|
||||||
|
};
|
||||||
|
|
||||||
|
// Start listening for incoming commands
|
||||||
|
if let Err(err) =
|
||||||
|
poll.register(&_reg, CMD, mio::Ready::readable(), mio::PollOpt::edge())
|
||||||
|
{
|
||||||
|
panic!("Can not register Registration: {}", err);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start accept
|
||||||
|
let mut sockets = Slab::new();
|
||||||
|
for (stoken, sock) in socks {
|
||||||
|
let server = mio::net::TcpListener::from_std(sock.lst)
|
||||||
|
.expect("Can not create mio::net::TcpListener");
|
||||||
|
|
||||||
|
let entry = sockets.vacant_entry();
|
||||||
|
let token = entry.key();
|
||||||
|
|
||||||
|
// Start listening for incoming connections
|
||||||
|
if let Err(err) = poll.register(
|
||||||
|
&server,
|
||||||
|
mio::Token(token + 1000),
|
||||||
|
mio::Ready::readable(),
|
||||||
|
mio::PollOpt::edge(),
|
||||||
|
) {
|
||||||
|
panic!("Can not register io: {}", err);
|
||||||
|
}
|
||||||
|
|
||||||
|
entry.insert(ServerSocketInfo {
|
||||||
|
token: stoken,
|
||||||
|
addr: sock.addr,
|
||||||
|
sock: server,
|
||||||
|
timeout: None,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Timer
|
||||||
|
let (tm, tmr) = mio::Registration::new2();
|
||||||
|
if let Err(err) =
|
||||||
|
poll.register(&tm, TIMER, mio::Ready::readable(), mio::PollOpt::edge())
|
||||||
|
{
|
||||||
|
panic!("Can not register Registration: {}", err);
|
||||||
|
}
|
||||||
|
|
||||||
|
Accept {
|
||||||
|
poll,
|
||||||
|
rx,
|
||||||
|
_reg,
|
||||||
|
sockets,
|
||||||
|
workers,
|
||||||
|
srv,
|
||||||
|
next: 0,
|
||||||
|
timer: (tm, tmr),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll(&mut self) {
|
||||||
|
// Create storage for events
|
||||||
|
let mut events = mio::Events::with_capacity(128);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
if let Err(err) = self.poll.poll(&mut events, None) {
|
||||||
|
panic!("Poll error: {}", err);
|
||||||
|
}
|
||||||
|
|
||||||
|
for event in events.iter() {
|
||||||
|
let token = event.token();
|
||||||
|
match token {
|
||||||
|
CMD => if !self.process_cmd() {
|
||||||
|
return;
|
||||||
|
},
|
||||||
|
TIMER => self.process_timer(),
|
||||||
|
_ => self.accept(token),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn process_timer(&mut self) {
|
||||||
|
let now = Instant::now();
|
||||||
|
for (token, info) in self.sockets.iter_mut() {
|
||||||
|
if let Some(inst) = info.timeout.take() {
|
||||||
|
if now > inst {
|
||||||
|
if let Err(err) = self.poll.register(
|
||||||
|
&info.sock,
|
||||||
|
mio::Token(token + 1000),
|
||||||
|
mio::Ready::readable(),
|
||||||
|
mio::PollOpt::edge(),
|
||||||
|
) {
|
||||||
|
error!("Can not register server socket {}", err);
|
||||||
|
} else {
|
||||||
|
info!("Resume accepting connections on {}", info.addr);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
info.timeout = Some(inst);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn process_cmd(&mut self) -> bool {
|
||||||
|
loop {
|
||||||
|
match self.rx.try_recv() {
|
||||||
|
Ok(cmd) => match cmd {
|
||||||
|
Command::Pause => {
|
||||||
|
for (_, info) in self.sockets.iter_mut() {
|
||||||
|
if let Err(err) = self.poll.deregister(&info.sock) {
|
||||||
|
error!("Can not deregister server socket {}", err);
|
||||||
|
} else {
|
||||||
|
info!("Paused accepting connections on {}", info.addr);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Command::Resume => {
|
||||||
|
for (token, info) in self.sockets.iter() {
|
||||||
|
if let Err(err) = self.poll.register(
|
||||||
|
&info.sock,
|
||||||
|
mio::Token(token + 1000),
|
||||||
|
mio::Ready::readable(),
|
||||||
|
mio::PollOpt::edge(),
|
||||||
|
) {
|
||||||
|
error!("Can not resume socket accept process: {}", err);
|
||||||
|
} else {
|
||||||
|
info!(
|
||||||
|
"Accepting connections on {} has been resumed",
|
||||||
|
info.addr
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Command::Stop => {
|
||||||
|
for (_, info) in self.sockets.iter() {
|
||||||
|
let _ = self.poll.deregister(&info.sock);
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
Command::Worker(idx, addr) => {
|
||||||
|
self.workers.push((idx, addr));
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(err) => match err {
|
||||||
|
sync_mpsc::TryRecvError::Empty => break,
|
||||||
|
sync_mpsc::TryRecvError::Disconnected => {
|
||||||
|
for (_, info) in self.sockets.iter() {
|
||||||
|
let _ = self.poll.deregister(&info.sock);
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
|
fn accept(&mut self, token: mio::Token) {
|
||||||
|
let token = usize::from(token);
|
||||||
|
if token < 1000 {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(info) = self.sockets.get_mut(token - 1000) {
|
||||||
|
loop {
|
||||||
|
match info.sock.accept_std() {
|
||||||
|
Ok((io, addr)) => {
|
||||||
|
let mut msg = Conn {
|
||||||
|
io,
|
||||||
|
token: info.token,
|
||||||
|
peer: Some(addr),
|
||||||
|
http2: false,
|
||||||
|
};
|
||||||
|
while !self.workers.is_empty() {
|
||||||
|
match self.workers[self.next].1.unbounded_send(msg) {
|
||||||
|
Ok(_) => (),
|
||||||
|
Err(err) => {
|
||||||
|
let _ = self.srv.unbounded_send(
|
||||||
|
ServerCommand::WorkerDied(
|
||||||
|
self.workers[self.next].0,
|
||||||
|
),
|
||||||
|
);
|
||||||
|
msg = err.into_inner();
|
||||||
|
self.workers.swap_remove(self.next);
|
||||||
|
if self.workers.is_empty() {
|
||||||
|
error!("No workers");
|
||||||
|
thread::sleep(Duration::from_millis(100));
|
||||||
|
break;
|
||||||
|
} else if self.workers.len() <= self.next {
|
||||||
|
self.next = 0;
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
self.next = (self.next + 1) % self.workers.len();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
|
||||||
|
Err(ref e) if connection_error(e) => continue,
|
||||||
|
Err(e) => {
|
||||||
|
error!("Error accepting connection: {}", e);
|
||||||
|
if let Err(err) = self.poll.deregister(&info.sock) {
|
||||||
|
error!("Can not deregister server socket {}", err);
|
||||||
|
}
|
||||||
|
|
||||||
|
// sleep after error
|
||||||
|
info.timeout = Some(Instant::now() + Duration::from_millis(500));
|
||||||
|
|
||||||
|
let r = self.timer.1.clone();
|
||||||
|
System::current().arbiter().do_send(Execute::new(
|
||||||
|
move || -> Result<(), ()> {
|
||||||
|
Arbiter::spawn(
|
||||||
|
Delay::new(
|
||||||
|
Instant::now() + Duration::from_millis(510),
|
||||||
|
).map_err(|_| ())
|
||||||
|
.and_then(move |_| {
|
||||||
|
let _ =
|
||||||
|
r.set_readiness(mio::Ready::readable());
|
||||||
|
Ok(())
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
Ok(())
|
||||||
|
},
|
||||||
|
));
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -315,10 +315,10 @@ impl IoStream for TlsStream<TcpStream> {
|
|||||||
#[cfg(feature = "rust-tls")]
|
#[cfg(feature = "rust-tls")]
|
||||||
use rustls::{ClientSession, ServerSession};
|
use rustls::{ClientSession, ServerSession};
|
||||||
#[cfg(feature = "rust-tls")]
|
#[cfg(feature = "rust-tls")]
|
||||||
use tokio_rustls::TlsStream;
|
use tokio_rustls::TlsStream as RustlsStream;
|
||||||
|
|
||||||
#[cfg(feature = "rust-tls")]
|
#[cfg(feature = "rust-tls")]
|
||||||
impl IoStream for TlsStream<TcpStream, ClientSession> {
|
impl IoStream for RustlsStream<TcpStream, ClientSession> {
|
||||||
#[inline]
|
#[inline]
|
||||||
fn shutdown(&mut self, _how: Shutdown) -> io::Result<()> {
|
fn shutdown(&mut self, _how: Shutdown) -> io::Result<()> {
|
||||||
let _ = <Self as AsyncWrite>::shutdown(self);
|
let _ = <Self as AsyncWrite>::shutdown(self);
|
||||||
@ -337,7 +337,7 @@ impl IoStream for TlsStream<TcpStream, ClientSession> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(feature = "rust-tls")]
|
#[cfg(feature = "rust-tls")]
|
||||||
impl IoStream for TlsStream<TcpStream, ServerSession> {
|
impl IoStream for RustlsStream<TcpStream, ServerSession> {
|
||||||
#[inline]
|
#[inline]
|
||||||
fn shutdown(&mut self, _how: Shutdown) -> io::Result<()> {
|
fn shutdown(&mut self, _how: Shutdown) -> io::Result<()> {
|
||||||
let _ = <Self as AsyncWrite>::shutdown(self);
|
let _ = <Self as AsyncWrite>::shutdown(self);
|
||||||
|
@ -46,14 +46,6 @@ fn configure_alpn(builder: &mut SslAcceptorBuilder) -> io::Result<()> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(all(feature = "rust-tls", not(feature = "alpn")))]
|
|
||||||
fn configure_alpn(builder: &mut Arc<ServerConfig>) -> io::Result<()> {
|
|
||||||
Arc::<ServerConfig>::get_mut(builder)
|
|
||||||
.unwrap()
|
|
||||||
.set_protocols(&vec!["h2".to_string(), "http/1.1".to_string()]);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// An HTTP Server
|
/// An HTTP Server
|
||||||
pub struct HttpServer<H>
|
pub struct HttpServer<H>
|
||||||
where
|
where
|
||||||
@ -68,7 +60,11 @@ where
|
|||||||
#[cfg_attr(feature = "cargo-clippy", allow(type_complexity))]
|
#[cfg_attr(feature = "cargo-clippy", allow(type_complexity))]
|
||||||
workers: Vec<(usize, Addr<Worker<H::Handler>>)>,
|
workers: Vec<(usize, Addr<Worker<H::Handler>>)>,
|
||||||
sockets: Vec<Socket>,
|
sockets: Vec<Socket>,
|
||||||
accept: Vec<(mio::SetReadiness, sync_mpsc::Sender<Command>)>,
|
accept: Option<(
|
||||||
|
mio::SetReadiness,
|
||||||
|
sync_mpsc::Sender<Command>,
|
||||||
|
Slab<SocketInfo>,
|
||||||
|
)>,
|
||||||
exit: bool,
|
exit: bool,
|
||||||
shutdown_timeout: u16,
|
shutdown_timeout: u16,
|
||||||
signals: Option<Addr<signal::ProcessSignals>>,
|
signals: Option<Addr<signal::ProcessSignals>>,
|
||||||
@ -77,7 +73,7 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) enum ServerCommand {
|
pub(crate) enum ServerCommand {
|
||||||
WorkerDied(usize, Slab<SocketInfo>),
|
WorkerDied(usize),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<H> Actor for HttpServer<H>
|
impl<H> Actor for HttpServer<H>
|
||||||
@ -114,7 +110,7 @@ where
|
|||||||
factory: Arc::new(f),
|
factory: Arc::new(f),
|
||||||
workers: Vec::new(),
|
workers: Vec::new(),
|
||||||
sockets: Vec::new(),
|
sockets: Vec::new(),
|
||||||
accept: Vec::new(),
|
accept: None,
|
||||||
exit: false,
|
exit: false,
|
||||||
shutdown_timeout: 30,
|
shutdown_timeout: 30,
|
||||||
signals: None,
|
signals: None,
|
||||||
@ -280,22 +276,22 @@ where
|
|||||||
Ok(self)
|
Ok(self)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(all(feature = "rust-tls", not(feature = "alpn")))]
|
#[cfg(feature = "rust-tls")]
|
||||||
/// Use listener for accepting incoming tls connection requests
|
/// Use listener for accepting incoming tls connection requests
|
||||||
///
|
///
|
||||||
/// This method sets alpn protocols to "h2" and "http/1.1"
|
/// This method sets alpn protocols to "h2" and "http/1.1"
|
||||||
pub fn listen_ssl(
|
pub fn listen_rustls(
|
||||||
mut self, lst: net::TcpListener, mut builder: Arc<ServerConfig>,
|
mut self, lst: net::TcpListener, mut builder: ServerConfig,
|
||||||
) -> io::Result<Self> {
|
) -> io::Result<Self> {
|
||||||
// alpn support
|
// alpn support
|
||||||
if !self.no_http2 {
|
if !self.no_http2 {
|
||||||
configure_alpn(&mut builder)?;
|
builder.set_protocols(&vec!["h2".to_string(), "http/1.1".to_string()]);
|
||||||
}
|
}
|
||||||
let addr = lst.local_addr().unwrap();
|
let addr = lst.local_addr().unwrap();
|
||||||
self.sockets.push(Socket {
|
self.sockets.push(Socket {
|
||||||
addr,
|
addr,
|
||||||
lst,
|
lst,
|
||||||
tp: StreamHandlerType::Rustls(builder.clone()),
|
tp: StreamHandlerType::Rustls(Arc::new(builder)),
|
||||||
});
|
});
|
||||||
Ok(self)
|
Ok(self)
|
||||||
}
|
}
|
||||||
@ -378,20 +374,21 @@ where
|
|||||||
Ok(self)
|
Ok(self)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(all(feature = "rust-tls", not(feature = "alpn")))]
|
#[cfg(feature = "rust-tls")]
|
||||||
/// Start listening for incoming tls connections.
|
/// Start listening for incoming tls connections.
|
||||||
///
|
///
|
||||||
/// This method sets alpn protocols to "h2" and "http/1.1"
|
/// This method sets alpn protocols to "h2" and "http/1.1"
|
||||||
pub fn bind_ssl<S: net::ToSocketAddrs>(
|
pub fn bind_rustls<S: net::ToSocketAddrs>(
|
||||||
mut self, addr: S, mut builder: Arc<ServerConfig>,
|
mut self, addr: S, mut builder: ServerConfig,
|
||||||
) -> io::Result<Self> {
|
) -> io::Result<Self> {
|
||||||
// alpn support
|
// alpn support
|
||||||
if !self.no_http2 {
|
if !self.no_http2 {
|
||||||
configure_alpn(&mut builder)?;
|
builder.set_protocols(&vec!["h2".to_string(), "http/1.1".to_string()]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let builder = Arc::new(builder);
|
||||||
let sockets = self.bind2(addr)?;
|
let sockets = self.bind2(addr)?;
|
||||||
self.sockets.extend(sockets.into_iter().map(|mut s| {
|
self.sockets.extend(sockets.into_iter().map(move |mut s| {
|
||||||
s.tp = StreamHandlerType::Rustls(builder.clone());
|
s.tp = StreamHandlerType::Rustls(builder.clone());
|
||||||
s
|
s
|
||||||
}));
|
}));
|
||||||
@ -487,17 +484,12 @@ impl<H: IntoHttpHandler> HttpServer<H> {
|
|||||||
let settings = ServerSettings::new(Some(addrs[0].1.addr), &self.host, false);
|
let settings = ServerSettings::new(Some(addrs[0].1.addr), &self.host, false);
|
||||||
let workers = self.start_workers(&settings, &socks);
|
let workers = self.start_workers(&settings, &socks);
|
||||||
|
|
||||||
// start acceptors threads
|
// start accept thread
|
||||||
for (token, sock) in addrs {
|
for (_, sock) in &addrs {
|
||||||
info!("Starting server on http://{}", sock.addr);
|
info!("Starting server on http://{}", sock.addr);
|
||||||
self.accept.push(start_accept_thread(
|
|
||||||
token,
|
|
||||||
sock,
|
|
||||||
tx.clone(),
|
|
||||||
socks.clone(),
|
|
||||||
workers.clone(),
|
|
||||||
));
|
|
||||||
}
|
}
|
||||||
|
let (r, cmd) = start_accept_thread(addrs, tx.clone(), workers.clone());
|
||||||
|
self.accept = Some((r, cmd, socks));
|
||||||
|
|
||||||
// start http server actor
|
// start http server actor
|
||||||
let signals = self.subscribe_to_signals();
|
let signals = self.subscribe_to_signals();
|
||||||
@ -672,7 +664,7 @@ impl<H: IntoHttpHandler> StreamHandler<ServerCommand, ()> for HttpServer<H> {
|
|||||||
|
|
||||||
fn handle(&mut self, msg: ServerCommand, _: &mut Context<Self>) {
|
fn handle(&mut self, msg: ServerCommand, _: &mut Context<Self>) {
|
||||||
match msg {
|
match msg {
|
||||||
ServerCommand::WorkerDied(idx, socks) => {
|
ServerCommand::WorkerDied(idx) => {
|
||||||
let mut found = false;
|
let mut found = false;
|
||||||
for i in 0..self.workers.len() {
|
for i in 0..self.workers.len() {
|
||||||
if self.workers[i].0 == idx {
|
if self.workers[i].0 == idx {
|
||||||
@ -700,6 +692,7 @@ impl<H: IntoHttpHandler> StreamHandler<ServerCommand, ()> for HttpServer<H> {
|
|||||||
let ka = self.keep_alive;
|
let ka = self.keep_alive;
|
||||||
let factory = Arc::clone(&self.factory);
|
let factory = Arc::clone(&self.factory);
|
||||||
let host = self.host.clone();
|
let host = self.host.clone();
|
||||||
|
let socks = self.accept.as_ref().unwrap().2.clone();
|
||||||
let addr = socks[0].addr;
|
let addr = socks[0].addr;
|
||||||
|
|
||||||
let addr = Arbiter::start(move |ctx: &mut Context<_>| {
|
let addr = Arbiter::start(move |ctx: &mut Context<_>| {
|
||||||
@ -709,7 +702,7 @@ impl<H: IntoHttpHandler> StreamHandler<ServerCommand, ()> for HttpServer<H> {
|
|||||||
ctx.add_message_stream(rx);
|
ctx.add_message_stream(rx);
|
||||||
Worker::new(apps, socks, ka, settings)
|
Worker::new(apps, socks, ka, settings)
|
||||||
});
|
});
|
||||||
for item in &self.accept {
|
if let Some(ref item) = &self.accept {
|
||||||
let _ = item.1.send(Command::Worker(new_idx, tx.clone()));
|
let _ = item.1.send(Command::Worker(new_idx, tx.clone()));
|
||||||
let _ = item.0.set_readiness(mio::Ready::readable());
|
let _ = item.0.set_readiness(mio::Ready::readable());
|
||||||
}
|
}
|
||||||
|
65
src/test.rs
65
src/test.rs
@ -15,10 +15,10 @@ use tokio::runtime::current_thread::Runtime;
|
|||||||
|
|
||||||
#[cfg(feature = "alpn")]
|
#[cfg(feature = "alpn")]
|
||||||
use openssl::ssl::SslAcceptorBuilder;
|
use openssl::ssl::SslAcceptorBuilder;
|
||||||
#[cfg(feature = "rust-tls")]
|
#[cfg(all(feature = "rust-tls"))]
|
||||||
use rustls::ServerConfig;
|
use rustls::ServerConfig;
|
||||||
#[cfg(feature = "rust-tls")]
|
//#[cfg(all(feature = "rust-tls"))]
|
||||||
use std::sync::Arc;
|
//use std::sync::Arc;
|
||||||
|
|
||||||
use application::{App, HttpApplication};
|
use application::{App, HttpApplication};
|
||||||
use body::Binary;
|
use body::Binary;
|
||||||
@ -144,7 +144,7 @@ impl TestServer {
|
|||||||
builder.set_verify(SslVerifyMode::NONE);
|
builder.set_verify(SslVerifyMode::NONE);
|
||||||
ClientConnector::with_connector(builder.build()).start()
|
ClientConnector::with_connector(builder.build()).start()
|
||||||
}
|
}
|
||||||
#[cfg(feature = "rust-tls")]
|
#[cfg(all(feature = "rust-tls", not(feature = "alpn")))]
|
||||||
{
|
{
|
||||||
use rustls::ClientConfig;
|
use rustls::ClientConfig;
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
@ -264,7 +264,7 @@ pub struct TestServerBuilder<S> {
|
|||||||
#[cfg(feature = "alpn")]
|
#[cfg(feature = "alpn")]
|
||||||
ssl: Option<SslAcceptorBuilder>,
|
ssl: Option<SslAcceptorBuilder>,
|
||||||
#[cfg(feature = "rust-tls")]
|
#[cfg(feature = "rust-tls")]
|
||||||
ssl: Option<Arc<ServerConfig>>,
|
rust_ssl: Option<ServerConfig>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S: 'static> TestServerBuilder<S> {
|
impl<S: 'static> TestServerBuilder<S> {
|
||||||
@ -275,8 +275,10 @@ impl<S: 'static> TestServerBuilder<S> {
|
|||||||
{
|
{
|
||||||
TestServerBuilder {
|
TestServerBuilder {
|
||||||
state: Box::new(state),
|
state: Box::new(state),
|
||||||
#[cfg(any(feature = "alpn", feature = "rust-tls"))]
|
#[cfg(feature = "alpn")]
|
||||||
ssl: None,
|
ssl: None,
|
||||||
|
#[cfg(feature = "rust-tls")]
|
||||||
|
rust_ssl: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -288,9 +290,9 @@ impl<S: 'static> TestServerBuilder<S> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(feature = "rust-tls")]
|
#[cfg(feature = "rust-tls")]
|
||||||
/// Create ssl server
|
/// Create rust tls server
|
||||||
pub fn ssl(mut self, ssl: Arc<ServerConfig>) -> Self {
|
pub fn rustls(mut self, ssl: ServerConfig) -> Self {
|
||||||
self.ssl = Some(ssl);
|
self.rust_ssl = Some(ssl);
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -302,41 +304,56 @@ impl<S: 'static> TestServerBuilder<S> {
|
|||||||
{
|
{
|
||||||
let (tx, rx) = mpsc::channel();
|
let (tx, rx) = mpsc::channel();
|
||||||
|
|
||||||
#[cfg(any(feature = "alpn", feature = "rust-tls"))]
|
let mut has_ssl = false;
|
||||||
let ssl = self.ssl.is_some();
|
|
||||||
#[cfg(not(any(feature = "alpn", feature = "rust-tls")))]
|
#[cfg(feature = "alpn")]
|
||||||
let ssl = false;
|
{
|
||||||
|
has_ssl = has_ssl || self.ssl.is_some();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "rust-tls")]
|
||||||
|
{
|
||||||
|
has_ssl = has_ssl || self.rust_ssl.is_some();
|
||||||
|
}
|
||||||
|
|
||||||
// run server in separate thread
|
// run server in separate thread
|
||||||
thread::spawn(move || {
|
thread::spawn(move || {
|
||||||
let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap();
|
let addr = TestServer::unused_addr();
|
||||||
let local_addr = tcp.local_addr().unwrap();
|
|
||||||
|
|
||||||
let sys = System::new("actix-test-server");
|
let sys = System::new("actix-test-server");
|
||||||
let state = self.state;
|
let state = self.state;
|
||||||
let srv = HttpServer::new(move || {
|
let mut srv = HttpServer::new(move || {
|
||||||
let mut app = TestApp::new(state());
|
let mut app = TestApp::new(state());
|
||||||
config(&mut app);
|
config(&mut app);
|
||||||
vec![app]
|
vec![app]
|
||||||
}).workers(1)
|
}).workers(1)
|
||||||
.disable_signals();
|
.disable_signals();
|
||||||
|
|
||||||
tx.send((System::current(), local_addr, TestServer::get_conn()))
|
tx.send((System::current(), addr, TestServer::get_conn()))
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
#[cfg(any(feature = "alpn", feature = "rust-tls"))]
|
#[cfg(feature = "alpn")]
|
||||||
{
|
{
|
||||||
let ssl = self.ssl.take();
|
let ssl = self.ssl.take();
|
||||||
if let Some(ssl) = ssl {
|
if let Some(ssl) = ssl {
|
||||||
srv.listen_ssl(tcp, ssl).unwrap().start();
|
let tcp = net::TcpListener::bind(addr).unwrap();
|
||||||
} else {
|
srv = srv.listen_ssl(tcp, ssl).unwrap();
|
||||||
srv.listen(tcp).start();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#[cfg(not(any(feature = "alpn", feature = "rust-tls")))]
|
#[cfg(feature = "rust-tls")]
|
||||||
{
|
{
|
||||||
srv.listen(tcp).start();
|
let ssl = self.rust_ssl.take();
|
||||||
|
if let Some(ssl) = ssl {
|
||||||
|
let tcp = net::TcpListener::bind(addr).unwrap();
|
||||||
|
srv = srv.listen_rustls(tcp, ssl).unwrap();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
if !has_ssl {
|
||||||
|
let tcp = net::TcpListener::bind(addr).unwrap();
|
||||||
|
srv = srv.listen(tcp);
|
||||||
|
}
|
||||||
|
srv.start();
|
||||||
|
|
||||||
sys.run();
|
sys.run();
|
||||||
});
|
});
|
||||||
|
|
||||||
@ -344,8 +361,8 @@ impl<S: 'static> TestServerBuilder<S> {
|
|||||||
System::set_current(system);
|
System::set_current(system);
|
||||||
TestServer {
|
TestServer {
|
||||||
addr,
|
addr,
|
||||||
ssl,
|
|
||||||
conn,
|
conn,
|
||||||
|
ssl: has_ssl,
|
||||||
rt: Runtime::new().unwrap(),
|
rt: Runtime::new().unwrap(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -153,6 +153,62 @@ fn test_shutdown() {
|
|||||||
let _ = sys.stop();
|
let _ = sys.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
#[cfg(unix)]
|
||||||
|
fn test_panic() {
|
||||||
|
let _ = test::TestServer::unused_addr();
|
||||||
|
let (tx, rx) = mpsc::channel();
|
||||||
|
|
||||||
|
thread::spawn(|| {
|
||||||
|
System::run(move || {
|
||||||
|
let srv = server::new(|| {
|
||||||
|
App::new()
|
||||||
|
.resource("/panic", |r| {
|
||||||
|
r.method(http::Method::GET).f(|_| -> &'static str {
|
||||||
|
panic!("error");
|
||||||
|
});
|
||||||
|
})
|
||||||
|
.resource("/", |r| {
|
||||||
|
r.method(http::Method::GET).f(|_| HttpResponse::Ok())
|
||||||
|
})
|
||||||
|
}).workers(1);
|
||||||
|
|
||||||
|
let srv = srv.bind("127.0.0.1:0").unwrap();
|
||||||
|
let addr = srv.addrs()[0];
|
||||||
|
srv.start();
|
||||||
|
let _ = tx.send((addr, System::current()));
|
||||||
|
});
|
||||||
|
});
|
||||||
|
let (addr, sys) = rx.recv().unwrap();
|
||||||
|
System::set_current(sys.clone());
|
||||||
|
|
||||||
|
let mut rt = Runtime::new().unwrap();
|
||||||
|
{
|
||||||
|
let req = client::ClientRequest::get(format!("http://{}/panic", addr).as_str())
|
||||||
|
.finish()
|
||||||
|
.unwrap();
|
||||||
|
let response = rt.block_on(req.send());
|
||||||
|
assert!(response.is_err());
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
let req = client::ClientRequest::get(format!("http://{}/", addr).as_str())
|
||||||
|
.finish()
|
||||||
|
.unwrap();
|
||||||
|
let response = rt.block_on(req.send());
|
||||||
|
assert!(response.is_err());
|
||||||
|
}
|
||||||
|
{
|
||||||
|
let req = client::ClientRequest::get(format!("http://{}/", addr).as_str())
|
||||||
|
.finish()
|
||||||
|
.unwrap();
|
||||||
|
let response = rt.block_on(req.send()).unwrap();
|
||||||
|
assert!(response.status().is_success());
|
||||||
|
}
|
||||||
|
|
||||||
|
let _ = sys.stop();
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_simple() {
|
fn test_simple() {
|
||||||
let mut srv = test::TestServer::new(|app| app.handler(|_| HttpResponse::Ok()));
|
let mut srv = test::TestServer::new(|app| app.handler(|_| HttpResponse::Ok()));
|
||||||
|
@ -317,13 +317,12 @@ fn test_ws_server_ssl() {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
#[cfg(feature = "rust-tls")]
|
#[cfg(feature = "rust-tls")]
|
||||||
fn test_ws_server_ssl() {
|
fn test_ws_server_rust_tls() {
|
||||||
extern crate rustls;
|
extern crate rustls;
|
||||||
use rustls::{ServerConfig, NoClientAuth};
|
|
||||||
use rustls::internal::pemfile::{certs, rsa_private_keys};
|
use rustls::internal::pemfile::{certs, rsa_private_keys};
|
||||||
use std::io::BufReader;
|
use rustls::{NoClientAuth, ServerConfig};
|
||||||
use std::sync::Arc;
|
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
|
use std::io::BufReader;
|
||||||
|
|
||||||
// load ssl keys
|
// load ssl keys
|
||||||
let mut config = ServerConfig::new(NoClientAuth::new());
|
let mut config = ServerConfig::new(NoClientAuth::new());
|
||||||
@ -333,7 +332,7 @@ fn test_ws_server_ssl() {
|
|||||||
let mut keys = rsa_private_keys(key_file).unwrap();
|
let mut keys = rsa_private_keys(key_file).unwrap();
|
||||||
config.set_single_cert(cert_chain, keys.remove(0)).unwrap();
|
config.set_single_cert(cert_chain, keys.remove(0)).unwrap();
|
||||||
|
|
||||||
let mut srv = test::TestServer::build().ssl(Arc::new(config)).start(|app| {
|
let mut srv = test::TestServer::build().rustls(config).start(|app| {
|
||||||
app.handler(|req| {
|
app.handler(|req| {
|
||||||
ws::start(
|
ws::start(
|
||||||
req,
|
req,
|
||||||
|
Loading…
Reference in New Issue
Block a user