1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-23 23:51:06 +01:00

Allow to use ssl and non-ssl connections with the same HttpServer #206

This commit is contained in:
Nikolay Kim 2018-04-30 19:51:55 -07:00
parent bfd46e6a71
commit d43ca96c5c
6 changed files with 174 additions and 171 deletions

View File

@ -85,6 +85,7 @@ bytes = "0.4"
byteorder = "1"
futures = "0.1"
futures-cpupool = "0.1"
slab = "0.4"
tokio-io = "0.1"
tokio-core = "0.1"

View File

@ -3,6 +3,9 @@
* `ws::Message::Close` now includes optional close reason.
`ws::CloseCode::Status` and `ws::CloseCode::Empty` have been removed.
* `HttpServer::start_ssl()` and `HttpServer::start_tls()` deprecated.
Use `HttpServer::bind_ssl()` and `HttpServer::bind_tls()` instead.
## Migration from 0.4 to 0.5

View File

@ -97,6 +97,7 @@ extern crate mime_guess;
extern crate mio;
extern crate net2;
extern crate rand;
extern crate slab;
extern crate tokio_core;
extern crate tokio_io;
extern crate url;

View File

@ -71,7 +71,7 @@ impl<S: 'static> Scope<S> {
///
/// ```rust
/// # extern crate actix_web;
/// use actix_web::{http, App, HttpRequest, HttpResponse, Path};
/// use actix_web::{App, HttpRequest};
///
/// struct AppState;
///

View File

@ -10,6 +10,7 @@ use futures::{Future, Sink, Stream};
use mio;
use net2::TcpBuilder;
use num_cpus;
use slab::Slab;
use tokio_io::{AsyncRead, AsyncWrite};
#[cfg(feature = "tls")]
@ -20,7 +21,7 @@ use openssl::ssl::{AlpnError, SslAcceptorBuilder};
use super::channel::{HttpChannel, WrapperStream};
use super::settings::{ServerSettings, WorkerSettings};
use super::worker::{Conn, StopWorker, StreamHandlerType, Worker};
use super::worker::{Conn, SocketInfo, StopWorker, StreamHandlerType, Worker};
use super::{IntoHttpHandler, IoStream, KeepAlive};
use super::{PauseServer, ResumeServer, StopServer};
@ -37,7 +38,7 @@ where
factory: Arc<Fn() -> Vec<H> + Send + Sync>,
#[cfg_attr(feature = "cargo-clippy", allow(type_complexity))]
workers: Vec<(usize, Addr<Syn, Worker<H::Handler>>)>,
sockets: Vec<(net::SocketAddr, net::TcpListener)>,
sockets: Vec<Socket>,
accept: Vec<(mio::SetReadiness, sync_mpsc::Sender<Command>)>,
exit: bool,
shutdown_timeout: u16,
@ -57,14 +58,8 @@ where
{
}
#[derive(Clone)]
struct Info {
addr: net::SocketAddr,
handler: StreamHandlerType,
}
enum ServerCommand {
WorkerDied(usize, Info),
WorkerDied(usize, Slab<SocketInfo>),
}
impl<H> Actor for HttpServer<H>
@ -74,6 +69,12 @@ where
type Context = Context<Self>;
}
struct Socket {
lst: net::TcpListener,
addr: net::SocketAddr,
tp: StreamHandlerType,
}
impl<H> HttpServer<H>
where
H: IntoHttpHandler + 'static,
@ -187,7 +188,7 @@ where
/// Get addresses of bound sockets.
pub fn addrs(&self) -> Vec<net::SocketAddr> {
self.sockets.iter().map(|s| s.0).collect()
self.sockets.iter().map(|s| s.addr).collect()
}
/// Use listener for accepting incoming connection requests
@ -195,21 +196,29 @@ where
/// HttpServer does not change any configuration for TcpListener,
/// it needs to be configured before passing it to listen() method.
pub fn listen(mut self, lst: net::TcpListener) -> Self {
self.sockets.push((lst.local_addr().unwrap(), lst));
let addr = lst.local_addr().unwrap();
self.sockets.push(Socket {
addr,
lst,
tp: StreamHandlerType::Normal,
});
self
}
/// The socket address to bind
///
/// To mind multiple addresses this method can be call multiple times.
pub fn bind<S: net::ToSocketAddrs>(mut self, addr: S) -> io::Result<Self> {
fn bind2<S: net::ToSocketAddrs>(&mut self, addr: S) -> io::Result<Vec<Socket>> {
let mut err = None;
let mut succ = false;
let mut sockets = Vec::new();
for addr in addr.to_socket_addrs()? {
match create_tcp_listener(addr, self.backlog) {
Ok(lst) => {
succ = true;
self.sockets.push((lst.local_addr().unwrap(), lst));
let addr = lst.local_addr().unwrap();
sockets.push(Socket {
lst,
addr,
tp: StreamHandlerType::Normal,
});
}
Err(e) => err = Some(e),
}
@ -225,12 +234,65 @@ where
))
}
} else {
Ok(self)
Ok(sockets)
}
}
/// The socket address to bind
///
/// To mind multiple addresses this method can be call multiple times.
pub fn bind<S: net::ToSocketAddrs>(mut self, addr: S) -> io::Result<Self> {
let sockets = self.bind2(addr)?;
self.sockets.extend(sockets);
Ok(self)
}
#[cfg(feature = "tls")]
/// The ssl socket address to bind
///
/// To mind multiple addresses this method can be call multiple times.
pub fn bind_tls<S: net::ToSocketAddrs>(
mut self, addr: S, acceptor: TlsAcceptor,
) -> io::Result<Self> {
let sockets = self.bind2(addr)?;
self.sockets.extend(sockets.into_iter().map(|mut s| {
s.tp = StreamHandlerType::Tls(acceptor.clone());
s
}));
Ok(self)
}
#[cfg(feature = "alpn")]
/// Start listening for incoming tls connections.
///
/// This method sets alpn protocols to "h2" and "http/1.1"
pub fn bind_ssl<S: net::ToSocketAddrs>(
mut self, addr: S, mut builder: SslAcceptorBuilder,
) -> io::Result<Self> {
// alpn support
if !self.no_http2 {
builder.set_alpn_protos(b"\x02h2\x08http/1.1")?;
builder.set_alpn_select_callback(|_, protos| {
const H2: &[u8] = b"\x02h2";
if protos.windows(3).any(|window| window == H2) {
Ok(b"h2")
} else {
Err(AlpnError::NOACK)
}
});
}
let acceptor = builder.build();
let sockets = self.bind2(addr)?;
self.sockets.extend(sockets.into_iter().map(|mut s| {
s.tp = StreamHandlerType::Alpn(acceptor.clone());
s
}));
Ok(self)
}
fn start_workers(
&mut self, settings: &ServerSettings, handler: &StreamHandlerType,
&mut self, settings: &ServerSettings, sockets: &Slab<SocketInfo>,
) -> Vec<(usize, mpsc::UnboundedSender<Conn<net::TcpStream>>)> {
// start workers
let mut workers = Vec::new();
@ -238,8 +300,8 @@ where
let s = settings.clone();
let (tx, rx) = mpsc::unbounded::<Conn<net::TcpStream>>();
let h = handler.clone();
let ka = self.keep_alive;
let socks = sockets.clone();
let factory = Arc::clone(&self.factory);
let addr = Arbiter::start(move |ctx: &mut Context<_>| {
let apps: Vec<_> = (*factory)()
@ -247,7 +309,7 @@ where
.map(|h| h.into_handler(s.clone()))
.collect();
ctx.add_message_stream(rx);
Worker::new(apps, h, ka)
Worker::new(apps, socks, ka)
});
workers.push((idx, tx));
self.workers.push((idx, addr));
@ -304,24 +366,32 @@ impl<H: IntoHttpHandler> HttpServer<H> {
panic!("HttpServer::bind() has to be called before start()");
} else {
let (tx, rx) = mpsc::unbounded();
let addrs: Vec<(net::SocketAddr, net::TcpListener)> =
self.sockets.drain(..).collect();
let settings = ServerSettings::new(Some(addrs[0].0), &self.host, false);
let workers = self.start_workers(&settings, &StreamHandlerType::Normal);
let info = Info {
addr: addrs[0].0,
handler: StreamHandlerType::Normal,
};
let mut socks = Slab::new();
let mut addrs: Vec<(usize, Socket)> = Vec::new();
for socket in self.sockets.drain(..) {
let entry = socks.vacant_entry();
let token = entry.key();
entry.insert(SocketInfo {
addr: socket.addr,
htype: socket.tp.clone(),
});
addrs.push((token, socket));
}
let settings = ServerSettings::new(Some(addrs[0].1.addr), &self.host, false);
let workers = self.start_workers(&settings, &socks);
// start acceptors threads
for (addr, sock) in addrs {
info!("Starting server on http://{}", addr);
for (token, sock) in addrs {
info!("Starting server on http://{}", sock.addr);
self.accept.push(start_accept_thread(
token,
sock,
addr,
self.backlog,
tx.clone(),
info.clone(),
socks.clone(),
workers.clone(),
));
}
@ -373,55 +443,30 @@ impl<H: IntoHttpHandler> HttpServer<H> {
}
}
#[doc(hidden)]
#[cfg(feature = "tls")]
#[deprecated(
since = "0.6.0", note = "please use `actix_web::HttpServer::bind_tls` instead"
)]
impl<H: IntoHttpHandler> HttpServer<H> {
/// Start listening for incoming tls connections.
pub fn start_tls(mut self, acceptor: TlsAcceptor) -> io::Result<Addr<Syn, Self>> {
if self.sockets.is_empty() {
Err(io::Error::new(
io::ErrorKind::Other,
"No socket addresses are bound",
))
} else {
let (tx, rx) = mpsc::unbounded();
let addrs: Vec<(net::SocketAddr, net::TcpListener)> =
self.sockets.drain(..).collect();
let settings = ServerSettings::new(Some(addrs[0].0), &self.host, false);
let workers =
self.start_workers(&settings, &StreamHandlerType::Tls(acceptor.clone()));
let info = Info {
addr: addrs[0].0,
handler: StreamHandlerType::Tls(acceptor),
};
// start acceptors threads
for (addr, sock) in addrs {
info!("Starting server on https://{}", addr);
self.accept.push(start_accept_thread(
sock,
addr,
self.backlog,
tx.clone(),
info.clone(),
workers.clone(),
));
for sock in &mut self.sockets {
match sock.tp {
StreamHandlerType::Normal => (),
_ => continue,
}
// start http server actor
let signals = self.subscribe_to_signals();
let addr: Addr<Syn, _> = Actor::create(|ctx| {
ctx.add_stream(rx);
self
});
signals.map(|signals| {
signals.do_send(signal::Subscribe(addr.clone().recipient()))
});
Ok(addr)
sock.tp = StreamHandlerType::Tls(acceptor.clone());
}
Ok(self.start())
}
}
#[doc(hidden)]
#[cfg(feature = "alpn")]
#[deprecated(
since = "0.6.0", note = "please use `actix_web::HttpServer::bind_ssl` instead"
)]
impl<H: IntoHttpHandler> HttpServer<H> {
/// Start listening for incoming tls connections.
///
@ -429,63 +474,28 @@ impl<H: IntoHttpHandler> HttpServer<H> {
pub fn start_ssl(
mut self, mut builder: SslAcceptorBuilder,
) -> io::Result<Addr<Syn, Self>> {
if self.sockets.is_empty() {
Err(io::Error::new(
io::ErrorKind::Other,
"No socket addresses are bound",
))
} else {
// alpn support
if !self.no_http2 {
builder.set_alpn_protos(b"\x02h2\x08http/1.1")?;
builder.set_alpn_select_callback(|_, protos| {
const H2: &[u8] = b"\x02h2";
if protos.windows(3).any(|window| window == H2) {
Ok(b"h2")
} else {
Err(AlpnError::NOACK)
}
});
}
let (tx, rx) = mpsc::unbounded();
let acceptor = builder.build();
let addrs: Vec<(net::SocketAddr, net::TcpListener)> =
self.sockets.drain(..).collect();
let settings = ServerSettings::new(Some(addrs[0].0), &self.host, false);
let workers = self.start_workers(
&settings,
&StreamHandlerType::Alpn(acceptor.clone()),
);
let info = Info {
addr: addrs[0].0,
handler: StreamHandlerType::Alpn(acceptor),
};
// start acceptors threads
for (addr, sock) in addrs {
info!("Starting server on https://{}", addr);
self.accept.push(start_accept_thread(
sock,
addr,
self.backlog,
tx.clone(),
info.clone(),
workers.clone(),
));
}
// start http server actor
let signals = self.subscribe_to_signals();
let addr: Addr<Syn, _> = Actor::create(|ctx| {
ctx.add_stream(rx);
self
// alpn support
if !self.no_http2 {
builder.set_alpn_protos(b"\x02h2\x08http/1.1")?;
builder.set_alpn_select_callback(|_, protos| {
const H2: &[u8] = b"\x02h2";
if protos.windows(3).any(|window| window == H2) {
Ok(b"h2")
} else {
Err(AlpnError::NOACK)
}
});
signals.map(|signals| {
signals.do_send(signal::Subscribe(addr.clone().recipient()))
});
Ok(addr)
}
let acceptor = builder.build();
for sock in &mut self.sockets {
match sock.tp {
StreamHandlerType::Normal => (),
_ => continue,
}
sock.tp = StreamHandlerType::Alpn(acceptor.clone());
}
Ok(self.start())
}
}
@ -499,32 +509,6 @@ impl<H: IntoHttpHandler> HttpServer<H> {
T: AsyncRead + AsyncWrite + 'static,
A: 'static,
{
let (tx, rx) = mpsc::unbounded();
if !self.sockets.is_empty() {
let addrs: Vec<(net::SocketAddr, net::TcpListener)> =
self.sockets.drain(..).collect();
let settings = ServerSettings::new(Some(addrs[0].0), &self.host, false);
let workers = self.start_workers(&settings, &StreamHandlerType::Normal);
let info = Info {
addr: addrs[0].0,
handler: StreamHandlerType::Normal,
};
// start acceptors threads
for (addr, sock) in addrs {
info!("Starting server on http://{}", addr);
self.accept.push(start_accept_thread(
sock,
addr,
self.backlog,
tx.clone(),
info.clone(),
workers.clone(),
));
}
}
// set server settings
let addr: net::SocketAddr = "127.0.0.1:8080".parse().unwrap();
let settings = ServerSettings::new(Some(addr), &self.host, secure);
@ -537,9 +521,9 @@ impl<H: IntoHttpHandler> HttpServer<H> {
// start server
let signals = self.subscribe_to_signals();
let addr: Addr<Syn, _> = HttpServer::create(move |ctx| {
ctx.add_stream(rx);
ctx.add_message_stream(stream.map_err(|_| ()).map(move |(t, _)| Conn {
io: WrapperStream::new(t),
token: 0,
peer: None,
http2: false,
}));
@ -585,7 +569,7 @@ impl<H: IntoHttpHandler> StreamHandler<ServerCommand, ()> for HttpServer<H> {
fn finished(&mut self, _: &mut Context<Self>) {}
fn handle(&mut self, msg: ServerCommand, _: &mut Context<Self>) {
match msg {
ServerCommand::WorkerDied(idx, info) => {
ServerCommand::WorkerDied(idx, socks) => {
let mut found = false;
for i in 0..self.workers.len() {
if self.workers[i].0 == idx {
@ -610,11 +594,10 @@ impl<H: IntoHttpHandler> StreamHandler<ServerCommand, ()> for HttpServer<H> {
break;
}
let h = info.handler;
let ka = self.keep_alive;
let factory = Arc::clone(&self.factory);
let settings =
ServerSettings::new(Some(info.addr), &self.host, false);
ServerSettings::new(Some(socks[0].addr), &self.host, false);
let addr = Arbiter::start(move |ctx: &mut Context<_>| {
let apps: Vec<_> = (*factory)()
@ -622,7 +605,7 @@ impl<H: IntoHttpHandler> StreamHandler<ServerCommand, ()> for HttpServer<H> {
.map(|h| h.into_handler(settings.clone()))
.collect();
ctx.add_message_stream(rx);
Worker::new(apps, h, ka)
Worker::new(apps, socks, ka)
});
for item in &self.accept {
let _ = item.1.send(Command::Worker(new_idx, tx.clone()));
@ -738,8 +721,8 @@ enum Command {
}
fn start_accept_thread(
sock: net::TcpListener, addr: net::SocketAddr, backlog: i32,
srv: mpsc::UnboundedSender<ServerCommand>, info: Info,
token: usize, sock: Socket, backlog: i32, srv: mpsc::UnboundedSender<ServerCommand>,
socks: Slab<SocketInfo>,
mut workers: Vec<(usize, mpsc::UnboundedSender<Conn<net::TcpStream>>)>,
) -> (mio::SetReadiness, sync_mpsc::Sender<Command>) {
let (tx, rx) = sync_mpsc::channel();
@ -748,13 +731,14 @@ fn start_accept_thread(
// start accept thread
#[cfg_attr(feature = "cargo-clippy", allow(cyclomatic_complexity))]
let _ = thread::Builder::new()
.name(format!("Accept on {}", addr))
.name(format!("Accept on {}", sock.addr))
.spawn(move || {
const SRV: mio::Token = mio::Token(0);
const CMD: mio::Token = mio::Token(1);
let addr = sock.addr;
let mut server = Some(
mio::net::TcpListener::from_std(sock)
mio::net::TcpListener::from_std(sock.lst)
.expect("Can not create mio::net::TcpListener"),
);
@ -800,9 +784,10 @@ fn start_accept_thread(
SRV => if let Some(ref server) = server {
loop {
match server.accept_std() {
Ok((sock, addr)) => {
Ok((io, addr)) => {
let mut msg = Conn {
io: sock,
io,
token,
peer: Some(addr),
http2: false,
};
@ -813,7 +798,7 @@ fn start_accept_thread(
let _ = srv.unbounded_send(
ServerCommand::WorkerDied(
workers[next].0,
info.clone(),
socks.clone(),
),
);
msg = err.into_inner();

View File

@ -1,6 +1,7 @@
use futures::unsync::oneshot;
use futures::Future;
use net2::TcpStreamExt;
use slab::Slab;
use std::rc::Rc;
use std::{net, time};
use tokio_core::net::TcpStream;
@ -29,10 +30,17 @@ use server::{HttpHandler, KeepAlive};
#[derive(Message)]
pub(crate) struct Conn<T> {
pub io: T,
pub token: usize,
pub peer: Option<net::SocketAddr>,
pub http2: bool,
}
#[derive(Clone)]
pub(crate) struct SocketInfo {
pub addr: net::SocketAddr,
pub htype: StreamHandlerType,
}
/// Stop worker message. Returns `true` on successful shutdown
/// and `false` if some connections still alive.
pub(crate) struct StopWorker {
@ -53,13 +61,13 @@ where
{
settings: Rc<WorkerSettings<H>>,
hnd: Handle,
handler: StreamHandlerType,
socks: Slab<SocketInfo>,
tcp_ka: Option<time::Duration>,
}
impl<H: HttpHandler + 'static> Worker<H> {
pub(crate) fn new(
h: Vec<H>, handler: StreamHandlerType, keep_alive: KeepAlive,
h: Vec<H>, socks: Slab<SocketInfo>, keep_alive: KeepAlive,
) -> Worker<H> {
let tcp_ka = if let KeepAlive::Tcp(val) = keep_alive {
Some(time::Duration::new(val as u64, 0))
@ -70,7 +78,7 @@ impl<H: HttpHandler + 'static> Worker<H> {
Worker {
settings: Rc::new(WorkerSettings::new(h, keep_alive)),
hnd: Arbiter::handle().clone(),
handler,
socks,
tcp_ka,
}
}
@ -124,8 +132,11 @@ where
if self.tcp_ka.is_some() && msg.io.set_keepalive(self.tcp_ka).is_err() {
error!("Can not set socket keep-alive option");
}
self.handler
.handle(Rc::clone(&self.settings), &self.hnd, msg);
self.socks.get_mut(msg.token).unwrap().htype.handle(
Rc::clone(&self.settings),
&self.hnd,
msg,
);
}
}
@ -177,7 +188,9 @@ impl StreamHandlerType {
}
#[cfg(feature = "tls")]
StreamHandlerType::Tls(ref acceptor) => {
let Conn { io, peer, http2 } = msg;
let Conn {
io, peer, http2, ..
} = msg;
let _ = io.set_nodelay(true);
let io = TcpStream::from_stream(io, hnd)
.expect("failed to associate TCP stream");