1
0
mirror of https://github.com/fafhrd91/actix-web synced 2024-11-30 18:44:35 +01:00

Merge branch 'master' of https://github.com/actix/actix-web into feat/432

This commit is contained in:
jrconlin 2018-08-01 09:59:36 -07:00
commit 115f59dd14
No known key found for this signature in database
GPG Key ID: 91B7F708D9FC4D84
9 changed files with 410 additions and 247 deletions

View File

@ -32,12 +32,12 @@ script:
- | - |
if [[ "$TRAVIS_RUST_VERSION" != "stable" ]]; then if [[ "$TRAVIS_RUST_VERSION" != "stable" ]]; then
cargo clean cargo clean
cargo test --features="alpn,tls" -- --nocapture cargo test --features="alpn,tls,rust-tls" -- --nocapture
fi fi
- | - |
if [[ "$TRAVIS_RUST_VERSION" == "stable" ]]; then if [[ "$TRAVIS_RUST_VERSION" == "stable" ]]; then
RUSTFLAGS="--cfg procmacro2_semver_exempt" cargo install -f cargo-tarpaulin RUSTFLAGS="--cfg procmacro2_semver_exempt" cargo install -f cargo-tarpaulin
cargo tarpaulin --features="alpn,tls" --out Xml --no-count cargo tarpaulin --features="alpn,tls,rust-tls" --out Xml --no-count
bash <(curl -s https://codecov.io/bash) bash <(curl -s https://codecov.io/bash)
echo "Uploaded code coverage" echo "Uploaded code coverage"
fi fi
@ -46,7 +46,7 @@ script:
after_success: after_success:
- | - |
if [[ "$TRAVIS_OS_NAME" == "linux" && "$TRAVIS_PULL_REQUEST" = "false" && "$TRAVIS_BRANCH" == "master" && "$TRAVIS_RUST_VERSION" == "beta" ]]; then if [[ "$TRAVIS_OS_NAME" == "linux" && "$TRAVIS_PULL_REQUEST" = "false" && "$TRAVIS_BRANCH" == "master" && "$TRAVIS_RUST_VERSION" == "beta" ]]; then
cargo doc --features "alpn, tls, session" --no-deps && cargo doc --features "alpn, tls, rust-tls, session" --no-deps &&
echo "<meta http-equiv=refresh content=0;url=os_balloon/index.html>" > target/doc/index.html && echo "<meta http-equiv=refresh content=0;url=os_balloon/index.html>" > target/doc/index.html &&
git clone https://github.com/davisp/ghp-import.git && git clone https://github.com/davisp/ghp-import.git &&
./ghp-import/ghp_import.py -n -p -f -m "Documentation upload" -r https://"$GH_TOKEN"@github.com/"$TRAVIS_REPO_SLUG.git" target/doc && ./ghp-import/ghp_import.py -n -p -f -m "Documentation upload" -r https://"$GH_TOKEN"@github.com/"$TRAVIS_REPO_SLUG.git" target/doc &&

View File

@ -83,7 +83,7 @@ cookie = { version="0.11", features=["percent-encode"] }
brotli2 = { version="^0.3.2", optional = true } brotli2 = { version="^0.3.2", optional = true }
flate2 = { version="^1.0.2", optional = true, default-features = false } flate2 = { version="^1.0.2", optional = true, default-features = false }
failure = "=0.1.1" failure = "^0.1.2"
# io # io
mio = "^0.6.13" mio = "^0.6.13"

View File

@ -52,7 +52,8 @@ pub struct Error {
impl Error { impl Error {
/// Deprecated way to reference the underlying response error. /// Deprecated way to reference the underlying response error.
#[deprecated( #[deprecated(
since = "0.6.0", note = "please use `Error::as_response_error()` instead" since = "0.6.0",
note = "please use `Error::as_response_error()` instead"
)] )]
pub fn cause(&self) -> &ResponseError { pub fn cause(&self) -> &ResponseError {
self.cause.as_ref() self.cause.as_ref()
@ -97,21 +98,9 @@ impl Error {
// //
// So we first downcast into that compat, to then further downcast through // So we first downcast into that compat, to then further downcast through
// the failure's Error downcasting system into the original failure. // the failure's Error downcasting system into the original failure.
//
// This currently requires a transmute. This could be avoided if failure
// provides a deref: https://github.com/rust-lang-nursery/failure/pull/213
let compat: Option<&failure::Compat<failure::Error>> = let compat: Option<&failure::Compat<failure::Error>> =
Fail::downcast_ref(self.cause.as_fail()); Fail::downcast_ref(self.cause.as_fail());
if let Some(compat) = compat { compat.and_then(|e| e.get_ref().downcast_ref())
pub struct CompatWrappedError {
error: failure::Error,
}
let compat: &CompatWrappedError =
unsafe { &*(compat as *const _ as *const CompatWrappedError) };
compat.error.downcast_ref()
} else {
None
}
} }
} }

View File

@ -1,22 +1,16 @@
use std::sync::mpsc as sync_mpsc; use std::sync::mpsc as sync_mpsc;
use std::time::Duration; use std::time::{Duration, Instant};
use std::{io, net, thread}; use std::{io, net, thread};
use futures::sync::mpsc; use futures::{sync::mpsc, Future};
use mio; use mio;
use slab::Slab; use slab::Slab;
use tokio_timer::Delay;
#[cfg(feature = "tls")] use actix::{msgs::Execute, Arbiter, System};
use native_tls::TlsAcceptor;
#[cfg(feature = "alpn")]
use openssl::ssl::{AlpnError, SslAcceptorBuilder};
#[cfg(feature = "rust-tls")]
use rustls::ServerConfig;
use super::srv::{ServerCommand, Socket}; use super::srv::{ServerCommand, Socket};
use super::worker::{Conn, SocketInfo}; use super::worker::Conn;
pub(crate) enum Command { pub(crate) enum Command {
Pause, Pause,
@ -25,169 +19,43 @@ pub(crate) enum Command {
Worker(usize, mpsc::UnboundedSender<Conn<net::TcpStream>>), Worker(usize, mpsc::UnboundedSender<Conn<net::TcpStream>>),
} }
struct ServerSocketInfo {
addr: net::SocketAddr,
token: usize,
sock: mio::net::TcpListener,
timeout: Option<Instant>,
}
struct Accept {
poll: mio::Poll,
rx: sync_mpsc::Receiver<Command>,
sockets: Slab<ServerSocketInfo>,
workers: Vec<(usize, mpsc::UnboundedSender<Conn<net::TcpStream>>)>,
_reg: mio::Registration,
next: usize,
srv: mpsc::UnboundedSender<ServerCommand>,
timer: (mio::Registration, mio::SetReadiness),
}
const CMD: mio::Token = mio::Token(0);
const TIMER: mio::Token = mio::Token(1);
pub(crate) fn start_accept_thread( pub(crate) fn start_accept_thread(
token: usize, sock: Socket, srv: mpsc::UnboundedSender<ServerCommand>, socks: Vec<(usize, Socket)>, srv: mpsc::UnboundedSender<ServerCommand>,
socks: Slab<SocketInfo>, workers: Vec<(usize, mpsc::UnboundedSender<Conn<net::TcpStream>>)>,
mut workers: Vec<(usize, mpsc::UnboundedSender<Conn<net::TcpStream>>)>,
) -> (mio::SetReadiness, sync_mpsc::Sender<Command>) { ) -> (mio::SetReadiness, sync_mpsc::Sender<Command>) {
let (tx, rx) = sync_mpsc::channel(); let (tx, rx) = sync_mpsc::channel();
let (reg, readiness) = mio::Registration::new2(); let (reg, readiness) = mio::Registration::new2();
let sys = System::current();
// start accept thread // start accept thread
#[cfg_attr(feature = "cargo-clippy", allow(cyclomatic_complexity))] #[cfg_attr(feature = "cargo-clippy", allow(cyclomatic_complexity))]
let _ = thread::Builder::new() let _ = thread::Builder::new()
.name(format!("Accept on {}", sock.addr)) .name("actix-web accept loop".to_owned())
.spawn(move || { .spawn(move || {
const SRV: mio::Token = mio::Token(0); System::set_current(sys);
const CMD: mio::Token = mio::Token(1); Accept::new(reg, rx, socks, workers, srv).poll();
let addr = sock.addr;
let mut server = Some(
mio::net::TcpListener::from_std(sock.lst)
.expect("Can not create mio::net::TcpListener"),
);
// Create a poll instance
let poll = match mio::Poll::new() {
Ok(poll) => poll,
Err(err) => panic!("Can not create mio::Poll: {}", err),
};
// Start listening for incoming connections
if let Some(ref srv) = server {
if let Err(err) =
poll.register(srv, SRV, mio::Ready::readable(), mio::PollOpt::edge())
{
panic!("Can not register io: {}", err);
}
}
// Start listening for incoming commands
if let Err(err) =
poll.register(&reg, CMD, mio::Ready::readable(), mio::PollOpt::edge())
{
panic!("Can not register Registration: {}", err);
}
// Create storage for events
let mut events = mio::Events::with_capacity(128);
// Sleep on error
let sleep = Duration::from_millis(100);
let mut next = 0;
loop {
if let Err(err) = poll.poll(&mut events, None) {
panic!("Poll error: {}", err);
}
for event in events.iter() {
match event.token() {
SRV => if let Some(ref server) = server {
loop {
match server.accept_std() {
Ok((io, addr)) => {
let mut msg = Conn {
io,
token,
peer: Some(addr),
http2: false,
};
while !workers.is_empty() {
match workers[next].1.unbounded_send(msg) {
Ok(_) => (),
Err(err) => {
let _ = srv.unbounded_send(
ServerCommand::WorkerDied(
workers[next].0,
socks.clone(),
),
);
msg = err.into_inner();
workers.swap_remove(next);
if workers.is_empty() {
error!("No workers");
thread::sleep(sleep);
break;
} else if workers.len() <= next {
next = 0;
}
continue;
}
}
next = (next + 1) % workers.len();
break;
}
}
Err(ref e)
if e.kind() == io::ErrorKind::WouldBlock =>
{
break
}
Err(ref e) if connection_error(e) => continue,
Err(e) => {
error!("Error accepting connection: {}", e);
// sleep after error
thread::sleep(sleep);
break;
}
}
}
},
CMD => match rx.try_recv() {
Ok(cmd) => match cmd {
Command::Pause => if let Some(ref server) = server {
if let Err(err) = poll.deregister(server) {
error!(
"Can not deregister server socket {}",
err
);
} else {
info!(
"Paused accepting connections on {}",
addr
);
}
},
Command::Resume => {
if let Some(ref server) = server {
if let Err(err) = poll.register(
server,
SRV,
mio::Ready::readable(),
mio::PollOpt::edge(),
) {
error!("Can not resume socket accept process: {}", err);
} else {
info!("Accepting connections on {} has been resumed",
addr);
}
}
}
Command::Stop => {
if let Some(server) = server.take() {
let _ = poll.deregister(&server);
}
return;
}
Command::Worker(idx, addr) => {
workers.push((idx, addr));
}
},
Err(err) => match err {
sync_mpsc::TryRecvError::Empty => (),
sync_mpsc::TryRecvError::Disconnected => {
if let Some(server) = server.take() {
let _ = poll.deregister(&server);
}
return;
}
},
},
_ => unreachable!(),
}
}
}
}); });
(readiness, tx) (readiness, tx)
@ -205,3 +73,244 @@ fn connection_error(e: &io::Error) -> bool {
|| e.kind() == io::ErrorKind::ConnectionAborted || e.kind() == io::ErrorKind::ConnectionAborted
|| e.kind() == io::ErrorKind::ConnectionReset || e.kind() == io::ErrorKind::ConnectionReset
} }
impl Accept {
fn new(
_reg: mio::Registration, rx: sync_mpsc::Receiver<Command>,
socks: Vec<(usize, Socket)>,
workers: Vec<(usize, mpsc::UnboundedSender<Conn<net::TcpStream>>)>,
srv: mpsc::UnboundedSender<ServerCommand>,
) -> Accept {
// Create a poll instance
let poll = match mio::Poll::new() {
Ok(poll) => poll,
Err(err) => panic!("Can not create mio::Poll: {}", err),
};
// Start listening for incoming commands
if let Err(err) =
poll.register(&_reg, CMD, mio::Ready::readable(), mio::PollOpt::edge())
{
panic!("Can not register Registration: {}", err);
}
// Start accept
let mut sockets = Slab::new();
for (stoken, sock) in socks {
let server = mio::net::TcpListener::from_std(sock.lst)
.expect("Can not create mio::net::TcpListener");
let entry = sockets.vacant_entry();
let token = entry.key();
// Start listening for incoming connections
if let Err(err) = poll.register(
&server,
mio::Token(token + 1000),
mio::Ready::readable(),
mio::PollOpt::edge(),
) {
panic!("Can not register io: {}", err);
}
entry.insert(ServerSocketInfo {
token: stoken,
addr: sock.addr,
sock: server,
timeout: None,
});
}
// Timer
let (tm, tmr) = mio::Registration::new2();
if let Err(err) =
poll.register(&tm, TIMER, mio::Ready::readable(), mio::PollOpt::edge())
{
panic!("Can not register Registration: {}", err);
}
Accept {
poll,
rx,
_reg,
sockets,
workers,
srv,
next: 0,
timer: (tm, tmr),
}
}
fn poll(&mut self) {
// Create storage for events
let mut events = mio::Events::with_capacity(128);
loop {
if let Err(err) = self.poll.poll(&mut events, None) {
panic!("Poll error: {}", err);
}
for event in events.iter() {
let token = event.token();
match token {
CMD => if !self.process_cmd() {
return;
},
TIMER => self.process_timer(),
_ => self.accept(token),
}
}
}
}
fn process_timer(&mut self) {
let now = Instant::now();
for (token, info) in self.sockets.iter_mut() {
if let Some(inst) = info.timeout.take() {
if now > inst {
if let Err(err) = self.poll.register(
&info.sock,
mio::Token(token + 1000),
mio::Ready::readable(),
mio::PollOpt::edge(),
) {
error!("Can not register server socket {}", err);
} else {
info!("Resume accepting connections on {}", info.addr);
}
} else {
info.timeout = Some(inst);
}
}
}
}
fn process_cmd(&mut self) -> bool {
loop {
match self.rx.try_recv() {
Ok(cmd) => match cmd {
Command::Pause => {
for (_, info) in self.sockets.iter_mut() {
if let Err(err) = self.poll.deregister(&info.sock) {
error!("Can not deregister server socket {}", err);
} else {
info!("Paused accepting connections on {}", info.addr);
}
}
}
Command::Resume => {
for (token, info) in self.sockets.iter() {
if let Err(err) = self.poll.register(
&info.sock,
mio::Token(token + 1000),
mio::Ready::readable(),
mio::PollOpt::edge(),
) {
error!("Can not resume socket accept process: {}", err);
} else {
info!(
"Accepting connections on {} has been resumed",
info.addr
);
}
}
}
Command::Stop => {
for (_, info) in self.sockets.iter() {
let _ = self.poll.deregister(&info.sock);
}
return false;
}
Command::Worker(idx, addr) => {
self.workers.push((idx, addr));
}
},
Err(err) => match err {
sync_mpsc::TryRecvError::Empty => break,
sync_mpsc::TryRecvError::Disconnected => {
for (_, info) in self.sockets.iter() {
let _ = self.poll.deregister(&info.sock);
}
return false;
}
},
}
}
true
}
fn accept(&mut self, token: mio::Token) {
let token = usize::from(token);
if token < 1000 {
return;
}
if let Some(info) = self.sockets.get_mut(token - 1000) {
loop {
match info.sock.accept_std() {
Ok((io, addr)) => {
let mut msg = Conn {
io,
token: info.token,
peer: Some(addr),
http2: false,
};
while !self.workers.is_empty() {
match self.workers[self.next].1.unbounded_send(msg) {
Ok(_) => (),
Err(err) => {
let _ = self.srv.unbounded_send(
ServerCommand::WorkerDied(
self.workers[self.next].0,
),
);
msg = err.into_inner();
self.workers.swap_remove(self.next);
if self.workers.is_empty() {
error!("No workers");
thread::sleep(Duration::from_millis(100));
break;
} else if self.workers.len() <= self.next {
self.next = 0;
}
continue;
}
}
self.next = (self.next + 1) % self.workers.len();
break;
}
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
Err(ref e) if connection_error(e) => continue,
Err(e) => {
error!("Error accepting connection: {}", e);
if let Err(err) = self.poll.deregister(&info.sock) {
error!("Can not deregister server socket {}", err);
}
// sleep after error
info.timeout = Some(Instant::now() + Duration::from_millis(500));
let r = self.timer.1.clone();
System::current().arbiter().do_send(Execute::new(
move || -> Result<(), ()> {
Arbiter::spawn(
Delay::new(
Instant::now() + Duration::from_millis(510),
).map_err(|_| ())
.and_then(move |_| {
let _ =
r.set_readiness(mio::Ready::readable());
Ok(())
}),
);
Ok(())
},
));
break;
}
}
}
}
}
}

View File

@ -315,10 +315,10 @@ impl IoStream for TlsStream<TcpStream> {
#[cfg(feature = "rust-tls")] #[cfg(feature = "rust-tls")]
use rustls::{ClientSession, ServerSession}; use rustls::{ClientSession, ServerSession};
#[cfg(feature = "rust-tls")] #[cfg(feature = "rust-tls")]
use tokio_rustls::TlsStream; use tokio_rustls::TlsStream as RustlsStream;
#[cfg(feature = "rust-tls")] #[cfg(feature = "rust-tls")]
impl IoStream for TlsStream<TcpStream, ClientSession> { impl IoStream for RustlsStream<TcpStream, ClientSession> {
#[inline] #[inline]
fn shutdown(&mut self, _how: Shutdown) -> io::Result<()> { fn shutdown(&mut self, _how: Shutdown) -> io::Result<()> {
let _ = <Self as AsyncWrite>::shutdown(self); let _ = <Self as AsyncWrite>::shutdown(self);
@ -337,7 +337,7 @@ impl IoStream for TlsStream<TcpStream, ClientSession> {
} }
#[cfg(feature = "rust-tls")] #[cfg(feature = "rust-tls")]
impl IoStream for TlsStream<TcpStream, ServerSession> { impl IoStream for RustlsStream<TcpStream, ServerSession> {
#[inline] #[inline]
fn shutdown(&mut self, _how: Shutdown) -> io::Result<()> { fn shutdown(&mut self, _how: Shutdown) -> io::Result<()> {
let _ = <Self as AsyncWrite>::shutdown(self); let _ = <Self as AsyncWrite>::shutdown(self);

View File

@ -46,14 +46,6 @@ fn configure_alpn(builder: &mut SslAcceptorBuilder) -> io::Result<()> {
Ok(()) Ok(())
} }
#[cfg(all(feature = "rust-tls", not(feature = "alpn")))]
fn configure_alpn(builder: &mut Arc<ServerConfig>) -> io::Result<()> {
Arc::<ServerConfig>::get_mut(builder)
.unwrap()
.set_protocols(&vec!["h2".to_string(), "http/1.1".to_string()]);
Ok(())
}
/// An HTTP Server /// An HTTP Server
pub struct HttpServer<H> pub struct HttpServer<H>
where where
@ -68,7 +60,11 @@ where
#[cfg_attr(feature = "cargo-clippy", allow(type_complexity))] #[cfg_attr(feature = "cargo-clippy", allow(type_complexity))]
workers: Vec<(usize, Addr<Worker<H::Handler>>)>, workers: Vec<(usize, Addr<Worker<H::Handler>>)>,
sockets: Vec<Socket>, sockets: Vec<Socket>,
accept: Vec<(mio::SetReadiness, sync_mpsc::Sender<Command>)>, accept: Option<(
mio::SetReadiness,
sync_mpsc::Sender<Command>,
Slab<SocketInfo>,
)>,
exit: bool, exit: bool,
shutdown_timeout: u16, shutdown_timeout: u16,
signals: Option<Addr<signal::ProcessSignals>>, signals: Option<Addr<signal::ProcessSignals>>,
@ -77,7 +73,7 @@ where
} }
pub(crate) enum ServerCommand { pub(crate) enum ServerCommand {
WorkerDied(usize, Slab<SocketInfo>), WorkerDied(usize),
} }
impl<H> Actor for HttpServer<H> impl<H> Actor for HttpServer<H>
@ -114,7 +110,7 @@ where
factory: Arc::new(f), factory: Arc::new(f),
workers: Vec::new(), workers: Vec::new(),
sockets: Vec::new(), sockets: Vec::new(),
accept: Vec::new(), accept: None,
exit: false, exit: false,
shutdown_timeout: 30, shutdown_timeout: 30,
signals: None, signals: None,
@ -280,22 +276,22 @@ where
Ok(self) Ok(self)
} }
#[cfg(all(feature = "rust-tls", not(feature = "alpn")))] #[cfg(feature = "rust-tls")]
/// Use listener for accepting incoming tls connection requests /// Use listener for accepting incoming tls connection requests
/// ///
/// This method sets alpn protocols to "h2" and "http/1.1" /// This method sets alpn protocols to "h2" and "http/1.1"
pub fn listen_ssl( pub fn listen_rustls(
mut self, lst: net::TcpListener, mut builder: Arc<ServerConfig>, mut self, lst: net::TcpListener, mut builder: ServerConfig,
) -> io::Result<Self> { ) -> io::Result<Self> {
// alpn support // alpn support
if !self.no_http2 { if !self.no_http2 {
configure_alpn(&mut builder)?; builder.set_protocols(&vec!["h2".to_string(), "http/1.1".to_string()]);
} }
let addr = lst.local_addr().unwrap(); let addr = lst.local_addr().unwrap();
self.sockets.push(Socket { self.sockets.push(Socket {
addr, addr,
lst, lst,
tp: StreamHandlerType::Rustls(builder.clone()), tp: StreamHandlerType::Rustls(Arc::new(builder)),
}); });
Ok(self) Ok(self)
} }
@ -378,20 +374,21 @@ where
Ok(self) Ok(self)
} }
#[cfg(all(feature = "rust-tls", not(feature = "alpn")))] #[cfg(feature = "rust-tls")]
/// Start listening for incoming tls connections. /// Start listening for incoming tls connections.
/// ///
/// This method sets alpn protocols to "h2" and "http/1.1" /// This method sets alpn protocols to "h2" and "http/1.1"
pub fn bind_ssl<S: net::ToSocketAddrs>( pub fn bind_rustls<S: net::ToSocketAddrs>(
mut self, addr: S, mut builder: Arc<ServerConfig>, mut self, addr: S, mut builder: ServerConfig,
) -> io::Result<Self> { ) -> io::Result<Self> {
// alpn support // alpn support
if !self.no_http2 { if !self.no_http2 {
configure_alpn(&mut builder)?; builder.set_protocols(&vec!["h2".to_string(), "http/1.1".to_string()]);
} }
let builder = Arc::new(builder);
let sockets = self.bind2(addr)?; let sockets = self.bind2(addr)?;
self.sockets.extend(sockets.into_iter().map(|mut s| { self.sockets.extend(sockets.into_iter().map(move |mut s| {
s.tp = StreamHandlerType::Rustls(builder.clone()); s.tp = StreamHandlerType::Rustls(builder.clone());
s s
})); }));
@ -487,17 +484,12 @@ impl<H: IntoHttpHandler> HttpServer<H> {
let settings = ServerSettings::new(Some(addrs[0].1.addr), &self.host, false); let settings = ServerSettings::new(Some(addrs[0].1.addr), &self.host, false);
let workers = self.start_workers(&settings, &socks); let workers = self.start_workers(&settings, &socks);
// start acceptors threads // start accept thread
for (token, sock) in addrs { for (_, sock) in &addrs {
info!("Starting server on http://{}", sock.addr); info!("Starting server on http://{}", sock.addr);
self.accept.push(start_accept_thread(
token,
sock,
tx.clone(),
socks.clone(),
workers.clone(),
));
} }
let (r, cmd) = start_accept_thread(addrs, tx.clone(), workers.clone());
self.accept = Some((r, cmd, socks));
// start http server actor // start http server actor
let signals = self.subscribe_to_signals(); let signals = self.subscribe_to_signals();
@ -672,7 +664,7 @@ impl<H: IntoHttpHandler> StreamHandler<ServerCommand, ()> for HttpServer<H> {
fn handle(&mut self, msg: ServerCommand, _: &mut Context<Self>) { fn handle(&mut self, msg: ServerCommand, _: &mut Context<Self>) {
match msg { match msg {
ServerCommand::WorkerDied(idx, socks) => { ServerCommand::WorkerDied(idx) => {
let mut found = false; let mut found = false;
for i in 0..self.workers.len() { for i in 0..self.workers.len() {
if self.workers[i].0 == idx { if self.workers[i].0 == idx {
@ -700,6 +692,7 @@ impl<H: IntoHttpHandler> StreamHandler<ServerCommand, ()> for HttpServer<H> {
let ka = self.keep_alive; let ka = self.keep_alive;
let factory = Arc::clone(&self.factory); let factory = Arc::clone(&self.factory);
let host = self.host.clone(); let host = self.host.clone();
let socks = self.accept.as_ref().unwrap().2.clone();
let addr = socks[0].addr; let addr = socks[0].addr;
let addr = Arbiter::start(move |ctx: &mut Context<_>| { let addr = Arbiter::start(move |ctx: &mut Context<_>| {
@ -709,7 +702,7 @@ impl<H: IntoHttpHandler> StreamHandler<ServerCommand, ()> for HttpServer<H> {
ctx.add_message_stream(rx); ctx.add_message_stream(rx);
Worker::new(apps, socks, ka, settings) Worker::new(apps, socks, ka, settings)
}); });
for item in &self.accept { if let Some(ref item) = &self.accept {
let _ = item.1.send(Command::Worker(new_idx, tx.clone())); let _ = item.1.send(Command::Worker(new_idx, tx.clone()));
let _ = item.0.set_readiness(mio::Ready::readable()); let _ = item.0.set_readiness(mio::Ready::readable());
} }

View File

@ -15,10 +15,10 @@ use tokio::runtime::current_thread::Runtime;
#[cfg(feature = "alpn")] #[cfg(feature = "alpn")]
use openssl::ssl::SslAcceptorBuilder; use openssl::ssl::SslAcceptorBuilder;
#[cfg(feature = "rust-tls")] #[cfg(all(feature = "rust-tls"))]
use rustls::ServerConfig; use rustls::ServerConfig;
#[cfg(feature = "rust-tls")] //#[cfg(all(feature = "rust-tls"))]
use std::sync::Arc; //use std::sync::Arc;
use application::{App, HttpApplication}; use application::{App, HttpApplication};
use body::Binary; use body::Binary;
@ -144,7 +144,7 @@ impl TestServer {
builder.set_verify(SslVerifyMode::NONE); builder.set_verify(SslVerifyMode::NONE);
ClientConnector::with_connector(builder.build()).start() ClientConnector::with_connector(builder.build()).start()
} }
#[cfg(feature = "rust-tls")] #[cfg(all(feature = "rust-tls", not(feature = "alpn")))]
{ {
use rustls::ClientConfig; use rustls::ClientConfig;
use std::fs::File; use std::fs::File;
@ -264,7 +264,7 @@ pub struct TestServerBuilder<S> {
#[cfg(feature = "alpn")] #[cfg(feature = "alpn")]
ssl: Option<SslAcceptorBuilder>, ssl: Option<SslAcceptorBuilder>,
#[cfg(feature = "rust-tls")] #[cfg(feature = "rust-tls")]
ssl: Option<Arc<ServerConfig>>, rust_ssl: Option<ServerConfig>,
} }
impl<S: 'static> TestServerBuilder<S> { impl<S: 'static> TestServerBuilder<S> {
@ -275,8 +275,10 @@ impl<S: 'static> TestServerBuilder<S> {
{ {
TestServerBuilder { TestServerBuilder {
state: Box::new(state), state: Box::new(state),
#[cfg(any(feature = "alpn", feature = "rust-tls"))] #[cfg(feature = "alpn")]
ssl: None, ssl: None,
#[cfg(feature = "rust-tls")]
rust_ssl: None,
} }
} }
@ -288,9 +290,9 @@ impl<S: 'static> TestServerBuilder<S> {
} }
#[cfg(feature = "rust-tls")] #[cfg(feature = "rust-tls")]
/// Create ssl server /// Create rust tls server
pub fn ssl(mut self, ssl: Arc<ServerConfig>) -> Self { pub fn rustls(mut self, ssl: ServerConfig) -> Self {
self.ssl = Some(ssl); self.rust_ssl = Some(ssl);
self self
} }
@ -302,41 +304,56 @@ impl<S: 'static> TestServerBuilder<S> {
{ {
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
#[cfg(any(feature = "alpn", feature = "rust-tls"))] let mut has_ssl = false;
let ssl = self.ssl.is_some();
#[cfg(not(any(feature = "alpn", feature = "rust-tls")))] #[cfg(feature = "alpn")]
let ssl = false; {
has_ssl = has_ssl || self.ssl.is_some();
}
#[cfg(feature = "rust-tls")]
{
has_ssl = has_ssl || self.rust_ssl.is_some();
}
// run server in separate thread // run server in separate thread
thread::spawn(move || { thread::spawn(move || {
let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap(); let addr = TestServer::unused_addr();
let local_addr = tcp.local_addr().unwrap();
let sys = System::new("actix-test-server"); let sys = System::new("actix-test-server");
let state = self.state; let state = self.state;
let srv = HttpServer::new(move || { let mut srv = HttpServer::new(move || {
let mut app = TestApp::new(state()); let mut app = TestApp::new(state());
config(&mut app); config(&mut app);
vec![app] vec![app]
}).workers(1) }).workers(1)
.disable_signals(); .disable_signals();
tx.send((System::current(), local_addr, TestServer::get_conn())) tx.send((System::current(), addr, TestServer::get_conn()))
.unwrap(); .unwrap();
#[cfg(any(feature = "alpn", feature = "rust-tls"))] #[cfg(feature = "alpn")]
{ {
let ssl = self.ssl.take(); let ssl = self.ssl.take();
if let Some(ssl) = ssl { if let Some(ssl) = ssl {
srv.listen_ssl(tcp, ssl).unwrap().start(); let tcp = net::TcpListener::bind(addr).unwrap();
} else { srv = srv.listen_ssl(tcp, ssl).unwrap();
srv.listen(tcp).start();
} }
} }
#[cfg(not(any(feature = "alpn", feature = "rust-tls")))] #[cfg(feature = "rust-tls")]
{ {
srv.listen(tcp).start(); let ssl = self.rust_ssl.take();
if let Some(ssl) = ssl {
let tcp = net::TcpListener::bind(addr).unwrap();
srv = srv.listen_rustls(tcp, ssl).unwrap();
}
} }
if !has_ssl {
let tcp = net::TcpListener::bind(addr).unwrap();
srv = srv.listen(tcp);
}
srv.start();
sys.run(); sys.run();
}); });
@ -344,8 +361,8 @@ impl<S: 'static> TestServerBuilder<S> {
System::set_current(system); System::set_current(system);
TestServer { TestServer {
addr, addr,
ssl,
conn, conn,
ssl: has_ssl,
rt: Runtime::new().unwrap(), rt: Runtime::new().unwrap(),
} }
} }

View File

@ -153,6 +153,62 @@ fn test_shutdown() {
let _ = sys.stop(); let _ = sys.stop();
} }
#[test]
#[cfg(unix)]
fn test_panic() {
let _ = test::TestServer::unused_addr();
let (tx, rx) = mpsc::channel();
thread::spawn(|| {
System::run(move || {
let srv = server::new(|| {
App::new()
.resource("/panic", |r| {
r.method(http::Method::GET).f(|_| -> &'static str {
panic!("error");
});
})
.resource("/", |r| {
r.method(http::Method::GET).f(|_| HttpResponse::Ok())
})
}).workers(1);
let srv = srv.bind("127.0.0.1:0").unwrap();
let addr = srv.addrs()[0];
srv.start();
let _ = tx.send((addr, System::current()));
});
});
let (addr, sys) = rx.recv().unwrap();
System::set_current(sys.clone());
let mut rt = Runtime::new().unwrap();
{
let req = client::ClientRequest::get(format!("http://{}/panic", addr).as_str())
.finish()
.unwrap();
let response = rt.block_on(req.send());
assert!(response.is_err());
}
{
let req = client::ClientRequest::get(format!("http://{}/", addr).as_str())
.finish()
.unwrap();
let response = rt.block_on(req.send());
assert!(response.is_err());
}
{
let req = client::ClientRequest::get(format!("http://{}/", addr).as_str())
.finish()
.unwrap();
let response = rt.block_on(req.send()).unwrap();
assert!(response.status().is_success());
}
let _ = sys.stop();
}
#[test] #[test]
fn test_simple() { fn test_simple() {
let mut srv = test::TestServer::new(|app| app.handler(|_| HttpResponse::Ok())); let mut srv = test::TestServer::new(|app| app.handler(|_| HttpResponse::Ok()));

View File

@ -277,13 +277,12 @@ fn test_ws_server_ssl() {
#[test] #[test]
#[cfg(feature = "rust-tls")] #[cfg(feature = "rust-tls")]
fn test_ws_server_ssl() { fn test_ws_server_rust_tls() {
extern crate rustls; extern crate rustls;
use rustls::{ServerConfig, NoClientAuth};
use rustls::internal::pemfile::{certs, rsa_private_keys}; use rustls::internal::pemfile::{certs, rsa_private_keys};
use std::io::BufReader; use rustls::{NoClientAuth, ServerConfig};
use std::sync::Arc;
use std::fs::File; use std::fs::File;
use std::io::BufReader;
// load ssl keys // load ssl keys
let mut config = ServerConfig::new(NoClientAuth::new()); let mut config = ServerConfig::new(NoClientAuth::new());
@ -293,7 +292,7 @@ fn test_ws_server_ssl() {
let mut keys = rsa_private_keys(key_file).unwrap(); let mut keys = rsa_private_keys(key_file).unwrap();
config.set_single_cert(cert_chain, keys.remove(0)).unwrap(); config.set_single_cert(cert_chain, keys.remove(0)).unwrap();
let mut srv = test::TestServer::build().ssl(Arc::new(config)).start(|app| { let mut srv = test::TestServer::build().rustls(config).start(|app| {
app.handler(|req| { app.handler(|req| {
ws::start( ws::start(
req, req,