From 58230b15b9cd67a98e65a074652bd384e24757f6 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 31 Jul 2018 19:51:26 -0700 Subject: [PATCH] use one thread for accept loop; refactor rust-tls support --- .travis.yml | 6 +- src/server/accept.rs | 439 +++++++++++++++++++++++++++---------------- src/server/mod.rs | 6 +- src/server/srv.rs | 57 +++--- src/test.rs | 65 ++++--- tests/test_server.rs | 56 ++++++ tests/test_ws.rs | 9 +- 7 files changed, 406 insertions(+), 232 deletions(-) diff --git a/.travis.yml b/.travis.yml index 54a86aa7..f03c9523 100644 --- a/.travis.yml +++ b/.travis.yml @@ -32,12 +32,12 @@ script: - | if [[ "$TRAVIS_RUST_VERSION" != "stable" ]]; then cargo clean - cargo test --features="alpn,tls" -- --nocapture + cargo test --features="alpn,tls,rust-tls" -- --nocapture fi - | if [[ "$TRAVIS_RUST_VERSION" == "stable" ]]; then RUSTFLAGS="--cfg procmacro2_semver_exempt" cargo install -f cargo-tarpaulin - cargo tarpaulin --features="alpn,tls" --out Xml --no-count + cargo tarpaulin --features="alpn,tls,rust-tls" --out Xml --no-count bash <(curl -s https://codecov.io/bash) echo "Uploaded code coverage" fi @@ -46,7 +46,7 @@ script: after_success: - | if [[ "$TRAVIS_OS_NAME" == "linux" && "$TRAVIS_PULL_REQUEST" = "false" && "$TRAVIS_BRANCH" == "master" && "$TRAVIS_RUST_VERSION" == "beta" ]]; then - cargo doc --features "alpn, tls, session" --no-deps && + cargo doc --features "alpn, tls, rust-tls, session" --no-deps && echo "" > target/doc/index.html && git clone https://github.com/davisp/ghp-import.git && ./ghp-import/ghp_import.py -n -p -f -m "Documentation upload" -r https://"$GH_TOKEN"@github.com/"$TRAVIS_REPO_SLUG.git" target/doc && diff --git a/src/server/accept.rs b/src/server/accept.rs index a91ca814..75280560 100644 --- a/src/server/accept.rs +++ b/src/server/accept.rs @@ -1,22 +1,16 @@ use std::sync::mpsc as sync_mpsc; -use std::time::Duration; +use std::time::{Duration, Instant}; use std::{io, net, thread}; -use futures::sync::mpsc; +use futures::{sync::mpsc, Future}; use mio; use slab::Slab; +use tokio_timer::Delay; -#[cfg(feature = "tls")] -use native_tls::TlsAcceptor; - -#[cfg(feature = "alpn")] -use openssl::ssl::{AlpnError, SslAcceptorBuilder}; - -#[cfg(feature = "rust-tls")] -use rustls::ServerConfig; +use actix::{msgs::Execute, Arbiter, System}; use super::srv::{ServerCommand, Socket}; -use super::worker::{Conn, SocketInfo}; +use super::worker::Conn; pub(crate) enum Command { Pause, @@ -25,169 +19,43 @@ pub(crate) enum Command { Worker(usize, mpsc::UnboundedSender>), } +struct ServerSocketInfo { + addr: net::SocketAddr, + token: usize, + sock: mio::net::TcpListener, + timeout: Option, +} + +struct Accept { + poll: mio::Poll, + rx: sync_mpsc::Receiver, + sockets: Slab, + workers: Vec<(usize, mpsc::UnboundedSender>)>, + _reg: mio::Registration, + next: usize, + srv: mpsc::UnboundedSender, + timer: (mio::Registration, mio::SetReadiness), +} + +const CMD: mio::Token = mio::Token(0); +const TIMER: mio::Token = mio::Token(1); + pub(crate) fn start_accept_thread( - token: usize, sock: Socket, srv: mpsc::UnboundedSender, - socks: Slab, - mut workers: Vec<(usize, mpsc::UnboundedSender>)>, + socks: Vec<(usize, Socket)>, srv: mpsc::UnboundedSender, + workers: Vec<(usize, mpsc::UnboundedSender>)>, ) -> (mio::SetReadiness, sync_mpsc::Sender) { let (tx, rx) = sync_mpsc::channel(); let (reg, readiness) = mio::Registration::new2(); + let sys = System::current(); + // start accept thread #[cfg_attr(feature = "cargo-clippy", allow(cyclomatic_complexity))] let _ = thread::Builder::new() - .name(format!("Accept on {}", sock.addr)) + .name("actix-web accept loop".to_owned()) .spawn(move || { - const SRV: mio::Token = mio::Token(0); - const CMD: mio::Token = mio::Token(1); - - let addr = sock.addr; - let mut server = Some( - mio::net::TcpListener::from_std(sock.lst) - .expect("Can not create mio::net::TcpListener"), - ); - - // Create a poll instance - let poll = match mio::Poll::new() { - Ok(poll) => poll, - Err(err) => panic!("Can not create mio::Poll: {}", err), - }; - - // Start listening for incoming connections - if let Some(ref srv) = server { - if let Err(err) = - poll.register(srv, SRV, mio::Ready::readable(), mio::PollOpt::edge()) - { - panic!("Can not register io: {}", err); - } - } - - // Start listening for incoming commands - if let Err(err) = - poll.register(®, CMD, mio::Ready::readable(), mio::PollOpt::edge()) - { - panic!("Can not register Registration: {}", err); - } - - // Create storage for events - let mut events = mio::Events::with_capacity(128); - - // Sleep on error - let sleep = Duration::from_millis(100); - - let mut next = 0; - loop { - if let Err(err) = poll.poll(&mut events, None) { - panic!("Poll error: {}", err); - } - - for event in events.iter() { - match event.token() { - SRV => if let Some(ref server) = server { - loop { - match server.accept_std() { - Ok((io, addr)) => { - let mut msg = Conn { - io, - token, - peer: Some(addr), - http2: false, - }; - while !workers.is_empty() { - match workers[next].1.unbounded_send(msg) { - Ok(_) => (), - Err(err) => { - let _ = srv.unbounded_send( - ServerCommand::WorkerDied( - workers[next].0, - socks.clone(), - ), - ); - msg = err.into_inner(); - workers.swap_remove(next); - if workers.is_empty() { - error!("No workers"); - thread::sleep(sleep); - break; - } else if workers.len() <= next { - next = 0; - } - continue; - } - } - next = (next + 1) % workers.len(); - break; - } - } - Err(ref e) - if e.kind() == io::ErrorKind::WouldBlock => - { - break - } - Err(ref e) if connection_error(e) => continue, - Err(e) => { - error!("Error accepting connection: {}", e); - // sleep after error - thread::sleep(sleep); - break; - } - } - } - }, - CMD => match rx.try_recv() { - Ok(cmd) => match cmd { - Command::Pause => if let Some(ref server) = server { - if let Err(err) = poll.deregister(server) { - error!( - "Can not deregister server socket {}", - err - ); - } else { - info!( - "Paused accepting connections on {}", - addr - ); - } - }, - Command::Resume => { - if let Some(ref server) = server { - if let Err(err) = poll.register( - server, - SRV, - mio::Ready::readable(), - mio::PollOpt::edge(), - ) { - error!("Can not resume socket accept process: {}", err); - } else { - info!("Accepting connections on {} has been resumed", - addr); - } - } - } - Command::Stop => { - if let Some(server) = server.take() { - let _ = poll.deregister(&server); - } - return; - } - Command::Worker(idx, addr) => { - workers.push((idx, addr)); - } - }, - Err(err) => match err { - sync_mpsc::TryRecvError::Empty => (), - sync_mpsc::TryRecvError::Disconnected => { - if let Some(server) = server.take() { - let _ = poll.deregister(&server); - } - return; - } - }, - }, - _ => unreachable!(), - } - } - } + System::set_current(sys); + Accept::new(reg, rx, socks, workers, srv).poll(); }); (readiness, tx) @@ -205,3 +73,244 @@ fn connection_error(e: &io::Error) -> bool { || e.kind() == io::ErrorKind::ConnectionAborted || e.kind() == io::ErrorKind::ConnectionReset } + +impl Accept { + fn new( + _reg: mio::Registration, rx: sync_mpsc::Receiver, + socks: Vec<(usize, Socket)>, + workers: Vec<(usize, mpsc::UnboundedSender>)>, + srv: mpsc::UnboundedSender, + ) -> Accept { + // Create a poll instance + let poll = match mio::Poll::new() { + Ok(poll) => poll, + Err(err) => panic!("Can not create mio::Poll: {}", err), + }; + + // Start listening for incoming commands + if let Err(err) = + poll.register(&_reg, CMD, mio::Ready::readable(), mio::PollOpt::edge()) + { + panic!("Can not register Registration: {}", err); + } + + // Start accept + let mut sockets = Slab::new(); + for (stoken, sock) in socks { + let server = mio::net::TcpListener::from_std(sock.lst) + .expect("Can not create mio::net::TcpListener"); + + let entry = sockets.vacant_entry(); + let token = entry.key(); + + // Start listening for incoming connections + if let Err(err) = poll.register( + &server, + mio::Token(token + 1000), + mio::Ready::readable(), + mio::PollOpt::edge(), + ) { + panic!("Can not register io: {}", err); + } + + entry.insert(ServerSocketInfo { + token: stoken, + addr: sock.addr, + sock: server, + timeout: None, + }); + } + + // Timer + let (tm, tmr) = mio::Registration::new2(); + if let Err(err) = + poll.register(&tm, TIMER, mio::Ready::readable(), mio::PollOpt::edge()) + { + panic!("Can not register Registration: {}", err); + } + + Accept { + poll, + rx, + _reg, + sockets, + workers, + srv, + next: 0, + timer: (tm, tmr), + } + } + + fn poll(&mut self) { + // Create storage for events + let mut events = mio::Events::with_capacity(128); + + loop { + if let Err(err) = self.poll.poll(&mut events, None) { + panic!("Poll error: {}", err); + } + + for event in events.iter() { + let token = event.token(); + match token { + CMD => if !self.process_cmd() { + return; + }, + TIMER => self.process_timer(), + _ => self.accept(token), + } + } + } + } + + fn process_timer(&mut self) { + let now = Instant::now(); + for (token, info) in self.sockets.iter_mut() { + if let Some(inst) = info.timeout.take() { + if now > inst { + if let Err(err) = self.poll.register( + &info.sock, + mio::Token(token + 1000), + mio::Ready::readable(), + mio::PollOpt::edge(), + ) { + error!("Can not register server socket {}", err); + } else { + info!("Resume accepting connections on {}", info.addr); + } + } else { + info.timeout = Some(inst); + } + } + } + } + + fn process_cmd(&mut self) -> bool { + loop { + match self.rx.try_recv() { + Ok(cmd) => match cmd { + Command::Pause => { + for (_, info) in self.sockets.iter_mut() { + if let Err(err) = self.poll.deregister(&info.sock) { + error!("Can not deregister server socket {}", err); + } else { + info!("Paused accepting connections on {}", info.addr); + } + } + } + Command::Resume => { + for (token, info) in self.sockets.iter() { + if let Err(err) = self.poll.register( + &info.sock, + mio::Token(token + 1000), + mio::Ready::readable(), + mio::PollOpt::edge(), + ) { + error!("Can not resume socket accept process: {}", err); + } else { + info!( + "Accepting connections on {} has been resumed", + info.addr + ); + } + } + } + Command::Stop => { + for (_, info) in self.sockets.iter() { + let _ = self.poll.deregister(&info.sock); + } + return false; + } + Command::Worker(idx, addr) => { + self.workers.push((idx, addr)); + } + }, + Err(err) => match err { + sync_mpsc::TryRecvError::Empty => break, + sync_mpsc::TryRecvError::Disconnected => { + for (_, info) in self.sockets.iter() { + let _ = self.poll.deregister(&info.sock); + } + return false; + } + }, + } + } + true + } + + fn accept(&mut self, token: mio::Token) { + let token = usize::from(token); + if token < 1000 { + return; + } + + if let Some(info) = self.sockets.get_mut(token - 1000) { + loop { + match info.sock.accept_std() { + Ok((io, addr)) => { + let mut msg = Conn { + io, + token: info.token, + peer: Some(addr), + http2: false, + }; + while !self.workers.is_empty() { + match self.workers[self.next].1.unbounded_send(msg) { + Ok(_) => (), + Err(err) => { + let _ = self.srv.unbounded_send( + ServerCommand::WorkerDied( + self.workers[self.next].0, + ), + ); + msg = err.into_inner(); + self.workers.swap_remove(self.next); + if self.workers.is_empty() { + error!("No workers"); + thread::sleep(Duration::from_millis(100)); + break; + } else if self.workers.len() <= self.next { + self.next = 0; + } + continue; + } + } + self.next = (self.next + 1) % self.workers.len(); + break; + } + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break, + Err(ref e) if connection_error(e) => continue, + Err(e) => { + error!("Error accepting connection: {}", e); + if let Err(err) = self.poll.deregister(&info.sock) { + error!("Can not deregister server socket {}", err); + } + + // sleep after error + info.timeout = Some(Instant::now() + Duration::from_millis(500)); + + let r = self.timer.1.clone(); + System::current().arbiter().do_send(Execute::new( + move || -> Result<(), ()> { + Arbiter::spawn( + Delay::new( + Instant::now() + Duration::from_millis(510), + ).map_err(|_| ()) + .and_then(move |_| { + let _ = + r.set_readiness(mio::Ready::readable()); + Ok(()) + }), + ); + Ok(()) + }, + )); + break; + } + } + } + } + } +} diff --git a/src/server/mod.rs b/src/server/mod.rs index a4f5e87d..429e293f 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -315,10 +315,10 @@ impl IoStream for TlsStream { #[cfg(feature = "rust-tls")] use rustls::{ClientSession, ServerSession}; #[cfg(feature = "rust-tls")] -use tokio_rustls::TlsStream; +use tokio_rustls::TlsStream as RustlsStream; #[cfg(feature = "rust-tls")] -impl IoStream for TlsStream { +impl IoStream for RustlsStream { #[inline] fn shutdown(&mut self, _how: Shutdown) -> io::Result<()> { let _ = ::shutdown(self); @@ -337,7 +337,7 @@ impl IoStream for TlsStream { } #[cfg(feature = "rust-tls")] -impl IoStream for TlsStream { +impl IoStream for RustlsStream { #[inline] fn shutdown(&mut self, _how: Shutdown) -> io::Result<()> { let _ = ::shutdown(self); diff --git a/src/server/srv.rs b/src/server/srv.rs index a054d5a7..e776f742 100644 --- a/src/server/srv.rs +++ b/src/server/srv.rs @@ -46,14 +46,6 @@ fn configure_alpn(builder: &mut SslAcceptorBuilder) -> io::Result<()> { Ok(()) } -#[cfg(all(feature = "rust-tls", not(feature = "alpn")))] -fn configure_alpn(builder: &mut Arc) -> io::Result<()> { - Arc::::get_mut(builder) - .unwrap() - .set_protocols(&vec!["h2".to_string(), "http/1.1".to_string()]); - Ok(()) -} - /// An HTTP Server pub struct HttpServer where @@ -68,7 +60,11 @@ where #[cfg_attr(feature = "cargo-clippy", allow(type_complexity))] workers: Vec<(usize, Addr>)>, sockets: Vec, - accept: Vec<(mio::SetReadiness, sync_mpsc::Sender)>, + accept: Option<( + mio::SetReadiness, + sync_mpsc::Sender, + Slab, + )>, exit: bool, shutdown_timeout: u16, signals: Option>, @@ -77,7 +73,7 @@ where } pub(crate) enum ServerCommand { - WorkerDied(usize, Slab), + WorkerDied(usize), } impl Actor for HttpServer @@ -114,7 +110,7 @@ where factory: Arc::new(f), workers: Vec::new(), sockets: Vec::new(), - accept: Vec::new(), + accept: None, exit: false, shutdown_timeout: 30, signals: None, @@ -280,22 +276,22 @@ where Ok(self) } - #[cfg(all(feature = "rust-tls", not(feature = "alpn")))] + #[cfg(feature = "rust-tls")] /// Use listener for accepting incoming tls connection requests /// /// This method sets alpn protocols to "h2" and "http/1.1" - pub fn listen_ssl( - mut self, lst: net::TcpListener, mut builder: Arc, + pub fn listen_rustls( + mut self, lst: net::TcpListener, mut builder: ServerConfig, ) -> io::Result { // alpn support if !self.no_http2 { - configure_alpn(&mut builder)?; + builder.set_protocols(&vec!["h2".to_string(), "http/1.1".to_string()]); } let addr = lst.local_addr().unwrap(); self.sockets.push(Socket { addr, lst, - tp: StreamHandlerType::Rustls(builder.clone()), + tp: StreamHandlerType::Rustls(Arc::new(builder)), }); Ok(self) } @@ -378,20 +374,21 @@ where Ok(self) } - #[cfg(all(feature = "rust-tls", not(feature = "alpn")))] + #[cfg(feature = "rust-tls")] /// Start listening for incoming tls connections. /// /// This method sets alpn protocols to "h2" and "http/1.1" - pub fn bind_ssl( - mut self, addr: S, mut builder: Arc, + pub fn bind_rustls( + mut self, addr: S, mut builder: ServerConfig, ) -> io::Result { // alpn support if !self.no_http2 { - configure_alpn(&mut builder)?; + builder.set_protocols(&vec!["h2".to_string(), "http/1.1".to_string()]); } + let builder = Arc::new(builder); let sockets = self.bind2(addr)?; - self.sockets.extend(sockets.into_iter().map(|mut s| { + self.sockets.extend(sockets.into_iter().map(move |mut s| { s.tp = StreamHandlerType::Rustls(builder.clone()); s })); @@ -487,17 +484,12 @@ impl HttpServer { let settings = ServerSettings::new(Some(addrs[0].1.addr), &self.host, false); let workers = self.start_workers(&settings, &socks); - // start acceptors threads - for (token, sock) in addrs { + // start accept thread + for (_, sock) in &addrs { info!("Starting server on http://{}", sock.addr); - self.accept.push(start_accept_thread( - token, - sock, - tx.clone(), - socks.clone(), - workers.clone(), - )); } + let (r, cmd) = start_accept_thread(addrs, tx.clone(), workers.clone()); + self.accept = Some((r, cmd, socks)); // start http server actor let signals = self.subscribe_to_signals(); @@ -672,7 +664,7 @@ impl StreamHandler for HttpServer { fn handle(&mut self, msg: ServerCommand, _: &mut Context) { match msg { - ServerCommand::WorkerDied(idx, socks) => { + ServerCommand::WorkerDied(idx) => { let mut found = false; for i in 0..self.workers.len() { if self.workers[i].0 == idx { @@ -700,6 +692,7 @@ impl StreamHandler for HttpServer { let ka = self.keep_alive; let factory = Arc::clone(&self.factory); let host = self.host.clone(); + let socks = self.accept.as_ref().unwrap().2.clone(); let addr = socks[0].addr; let addr = Arbiter::start(move |ctx: &mut Context<_>| { @@ -709,7 +702,7 @@ impl StreamHandler for HttpServer { ctx.add_message_stream(rx); Worker::new(apps, socks, ka, settings) }); - for item in &self.accept { + if let Some(ref item) = &self.accept { let _ = item.1.send(Command::Worker(new_idx, tx.clone())); let _ = item.0.set_readiness(mio::Ready::readable()); } diff --git a/src/test.rs b/src/test.rs index f94732dd..5c520a75 100644 --- a/src/test.rs +++ b/src/test.rs @@ -15,10 +15,10 @@ use tokio::runtime::current_thread::Runtime; #[cfg(feature = "alpn")] use openssl::ssl::SslAcceptorBuilder; -#[cfg(feature = "rust-tls")] +#[cfg(all(feature = "rust-tls"))] use rustls::ServerConfig; -#[cfg(feature = "rust-tls")] -use std::sync::Arc; +//#[cfg(all(feature = "rust-tls"))] +//use std::sync::Arc; use application::{App, HttpApplication}; use body::Binary; @@ -144,7 +144,7 @@ impl TestServer { builder.set_verify(SslVerifyMode::NONE); ClientConnector::with_connector(builder.build()).start() } - #[cfg(feature = "rust-tls")] + #[cfg(all(feature = "rust-tls", not(feature = "alpn")))] { use rustls::ClientConfig; use std::fs::File; @@ -256,7 +256,7 @@ pub struct TestServerBuilder { #[cfg(feature = "alpn")] ssl: Option, #[cfg(feature = "rust-tls")] - ssl: Option>, + rust_ssl: Option, } impl TestServerBuilder { @@ -267,8 +267,10 @@ impl TestServerBuilder { { TestServerBuilder { state: Box::new(state), - #[cfg(any(feature = "alpn", feature = "rust-tls"))] + #[cfg(feature = "alpn")] ssl: None, + #[cfg(feature = "rust-tls")] + rust_ssl: None, } } @@ -280,9 +282,9 @@ impl TestServerBuilder { } #[cfg(feature = "rust-tls")] - /// Create ssl server - pub fn ssl(mut self, ssl: Arc) -> Self { - self.ssl = Some(ssl); + /// Create rust tls server + pub fn rustls(mut self, ssl: ServerConfig) -> Self { + self.rust_ssl = Some(ssl); self } @@ -294,41 +296,56 @@ impl TestServerBuilder { { let (tx, rx) = mpsc::channel(); - #[cfg(any(feature = "alpn", feature = "rust-tls"))] - let ssl = self.ssl.is_some(); - #[cfg(not(any(feature = "alpn", feature = "rust-tls")))] - let ssl = false; + let mut has_ssl = false; + + #[cfg(feature = "alpn")] + { + has_ssl = has_ssl || self.ssl.is_some(); + } + + #[cfg(feature = "rust-tls")] + { + has_ssl = has_ssl || self.rust_ssl.is_some(); + } // run server in separate thread thread::spawn(move || { - let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap(); - let local_addr = tcp.local_addr().unwrap(); + let addr = TestServer::unused_addr(); let sys = System::new("actix-test-server"); let state = self.state; - let srv = HttpServer::new(move || { + let mut srv = HttpServer::new(move || { let mut app = TestApp::new(state()); config(&mut app); vec![app] }).workers(1) .disable_signals(); - tx.send((System::current(), local_addr, TestServer::get_conn())) + tx.send((System::current(), addr, TestServer::get_conn())) .unwrap(); - #[cfg(any(feature = "alpn", feature = "rust-tls"))] + #[cfg(feature = "alpn")] { let ssl = self.ssl.take(); if let Some(ssl) = ssl { - srv.listen_ssl(tcp, ssl).unwrap().start(); - } else { - srv.listen(tcp).start(); + let tcp = net::TcpListener::bind(addr).unwrap(); + srv = srv.listen_ssl(tcp, ssl).unwrap(); } } - #[cfg(not(any(feature = "alpn", feature = "rust-tls")))] + #[cfg(feature = "rust-tls")] { - srv.listen(tcp).start(); + let ssl = self.rust_ssl.take(); + if let Some(ssl) = ssl { + let tcp = net::TcpListener::bind(addr).unwrap(); + srv = srv.listen_rustls(tcp, ssl).unwrap(); + } } + if !has_ssl { + let tcp = net::TcpListener::bind(addr).unwrap(); + srv = srv.listen(tcp); + } + srv.start(); + sys.run(); }); @@ -336,8 +353,8 @@ impl TestServerBuilder { System::set_current(system); TestServer { addr, - ssl, conn, + ssl: has_ssl, rt: Runtime::new().unwrap(), } } diff --git a/tests/test_server.rs b/tests/test_server.rs index 82a318e5..3a825928 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -153,6 +153,62 @@ fn test_shutdown() { let _ = sys.stop(); } +#[test] +#[cfg(unix)] +fn test_panic() { + let _ = test::TestServer::unused_addr(); + let (tx, rx) = mpsc::channel(); + + thread::spawn(|| { + System::run(move || { + let srv = server::new(|| { + App::new() + .resource("/panic", |r| { + r.method(http::Method::GET).f(|_| -> &'static str { + panic!("error"); + }); + }) + .resource("/", |r| { + r.method(http::Method::GET).f(|_| HttpResponse::Ok()) + }) + }).workers(1); + + let srv = srv.bind("127.0.0.1:0").unwrap(); + let addr = srv.addrs()[0]; + srv.start(); + let _ = tx.send((addr, System::current())); + }); + }); + let (addr, sys) = rx.recv().unwrap(); + System::set_current(sys.clone()); + + let mut rt = Runtime::new().unwrap(); + { + let req = client::ClientRequest::get(format!("http://{}/panic", addr).as_str()) + .finish() + .unwrap(); + let response = rt.block_on(req.send()); + assert!(response.is_err()); + } + + { + let req = client::ClientRequest::get(format!("http://{}/", addr).as_str()) + .finish() + .unwrap(); + let response = rt.block_on(req.send()); + assert!(response.is_err()); + } + { + let req = client::ClientRequest::get(format!("http://{}/", addr).as_str()) + .finish() + .unwrap(); + let response = rt.block_on(req.send()).unwrap(); + assert!(response.status().is_success()); + } + + let _ = sys.stop(); +} + #[test] fn test_simple() { let mut srv = test::TestServer::new(|app| app.handler(|_| HttpResponse::Ok())); diff --git a/tests/test_ws.rs b/tests/test_ws.rs index 1ed80bf7..94f38978 100644 --- a/tests/test_ws.rs +++ b/tests/test_ws.rs @@ -277,13 +277,12 @@ fn test_ws_server_ssl() { #[test] #[cfg(feature = "rust-tls")] -fn test_ws_server_ssl() { +fn test_ws_server_rust_tls() { extern crate rustls; - use rustls::{ServerConfig, NoClientAuth}; use rustls::internal::pemfile::{certs, rsa_private_keys}; - use std::io::BufReader; - use std::sync::Arc; + use rustls::{NoClientAuth, ServerConfig}; use std::fs::File; + use std::io::BufReader; // load ssl keys let mut config = ServerConfig::new(NoClientAuth::new()); @@ -293,7 +292,7 @@ fn test_ws_server_ssl() { let mut keys = rsa_private_keys(key_file).unwrap(); config.set_single_cert(cert_chain, keys.remove(0)).unwrap(); - let mut srv = test::TestServer::build().ssl(Arc::new(config)).start(|app| { + let mut srv = test::TestServer::build().rustls(config).start(|app| { app.handler(|req| { ws::start( req,