diff --git a/.travis.yml b/.travis.yml index 494a6a300..e2d70678e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -32,12 +32,12 @@ script: - | if [[ "$TRAVIS_RUST_VERSION" != "stable" ]]; then cargo clean - cargo test --features="" -- --nocapture + cargo test --features="ssl" -- --nocapture fi - | if [[ "$TRAVIS_RUST_VERSION" == "stable" ]]; then RUSTFLAGS="--cfg procmacro2_semver_exempt" cargo install -f cargo-tarpaulin - cargo tarpaulin --features="" --out Xml --no-count + cargo tarpaulin --features="ssl" --out Xml --no-count bash <(curl -s https://codecov.io/bash) echo "Uploaded code coverage" fi @@ -46,7 +46,7 @@ script: after_success: - | if [[ "$TRAVIS_OS_NAME" == "linux" && "$TRAVIS_PULL_REQUEST" = "false" && "$TRAVIS_BRANCH" == "master" && "$TRAVIS_RUST_VERSION" == "beta" ]]; then - cargo doc --features "session" --no-deps && + cargo doc --features "ssl,session" --no-deps && echo "" > target/doc/index.html && git clone https://github.com/davisp/ghp-import.git && ./ghp-import/ghp_import.py -n -p -f -m "Documentation upload" -r https://"$GH_TOKEN"@github.com/"$TRAVIS_REPO_SLUG.git" target/doc && diff --git a/Cargo.toml b/Cargo.toml index d4ea4fc1e..536806316 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,6 +35,9 @@ default = ["session", "brotli", "flate2-c"] tls = ["native-tls", "tokio-tls"] # openssl +ssl = ["openssl", "tokio-openssl", "actix-net/ssl"] + +# deprecated, use "ssl" alpn = ["openssl", "tokio-openssl", "actix-net/ssl"] # rustls diff --git a/src/lib.rs b/src/lib.rs index 1dfe143ef..099b0b16c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -64,8 +64,8 @@ //! ## Package feature //! //! * `tls` - enables ssl support via `native-tls` crate -//! * `alpn` - enables ssl support via `openssl` crate, require for `http/2` -//! support +//! * `ssl` - enables ssl support via `openssl` crate, supports `http/2` +//! * `rust-tls` - enables ssl support via `rustls` crate, supports `http/2` //! * `uds` - enables support for making client requests via Unix Domain Sockets. //! Unix only. Not necessary for *serving* requests. //! * `session` - enables session support, includes `ring` crate as diff --git a/src/server/h1.rs b/src/server/h1.rs index 82ab914a5..1d2ddbe2d 100644 --- a/src/server/h1.rs +++ b/src/server/h1.rs @@ -517,15 +517,14 @@ mod tests { use httpmessage::HttpMessage; use server::h1decoder::Message; use server::settings::{ServerSettings, WorkerSettings}; - use server::{Connections, KeepAlive, Request}; + use server::{KeepAlive, Request}; - fn wrk_settings() -> Rc> { - Rc::new(WorkerSettings::::new( + fn wrk_settings() -> WorkerSettings { + WorkerSettings::::new( Vec::new(), KeepAlive::Os, ServerSettings::default(), - Connections::default(), - )) + ) } impl Message { @@ -644,9 +643,9 @@ mod tests { fn test_req_parse1() { let buf = Buffer::new("GET /test HTTP/1.1\r\n\r\n"); let readbuf = BytesMut::new(); - let settings = Rc::new(wrk_settings()); + let settings = wrk_settings(); - let mut h1 = Http1::new(Rc::clone(&settings), buf, None, readbuf, false, None); + let mut h1 = Http1::new(settings.clone(), buf, None, readbuf, false, None); h1.poll_io(); h1.poll_io(); assert_eq!(h1.tasks.len(), 1); @@ -657,9 +656,9 @@ mod tests { let buf = Buffer::new(""); let readbuf = BytesMut::from(Vec::::from(&b"GET /test HTTP/1.1\r\n\r\n"[..])); - let settings = Rc::new(wrk_settings()); + let settings = wrk_settings(); - let mut h1 = Http1::new(Rc::clone(&settings), buf, None, readbuf, true, None); + let mut h1 = Http1::new(settings.clone(), buf, None, readbuf, true, None); h1.poll_io(); assert_eq!(h1.tasks.len(), 1); } @@ -668,9 +667,9 @@ mod tests { fn test_req_parse_err() { let buf = Buffer::new("GET /test HTTP/1\r\n\r\n"); let readbuf = BytesMut::new(); - let settings = Rc::new(wrk_settings()); + let settings = wrk_settings(); - let mut h1 = Http1::new(Rc::clone(&settings), buf, None, readbuf, false, None); + let mut h1 = Http1::new(settings.clone(), buf, None, readbuf, false, None); h1.poll_io(); h1.poll_io(); assert!(h1.flags.contains(Flags::ERROR)); diff --git a/src/server/http.rs b/src/server/http.rs index b55842fa3..725cfbac0 100644 --- a/src/server/http.rs +++ b/src/server/http.rs @@ -2,19 +2,19 @@ use std::marker::PhantomData; use std::sync::Arc; use std::{io, mem, net, time}; -use actix::{Actor, Addr, Arbiter, AsyncContext, Context, Handler, System}; +use actix::{Actor, Addr, AsyncContext, Context, Handler, System}; +use actix_net::{ssl, NewService, NewServiceExt, Server, Service}; use futures::future::{ok, FutureResult}; use futures::{Async, Poll, Stream}; use net2::TcpBuilder; use num_cpus; - -use actix_net::{ssl, NewService, Server, Service}; +use tokio_tcp::TcpStream; //#[cfg(feature = "tls")] //use native_tls::TlsAcceptor; -#[cfg(feature = "alpn")] +#[cfg(any(feature = "alpn", feature = "ssl"))] use openssl::ssl::SslAcceptorBuilder; //#[cfg(feature = "rust-tls")] @@ -25,9 +25,10 @@ use super::settings::{ServerSettings, WorkerSettings}; use super::{HttpHandler, IntoHttpHandler, IoStream, KeepAlive}; struct Socket { + scheme: &'static str, lst: net::TcpListener, addr: net::SocketAddr, - handler: Box>, + handler: Box>, } /// An HTTP Server @@ -194,10 +195,7 @@ where /// and the user should be presented with an enumeration of which /// socket requires which protocol. pub fn addrs_with_scheme(&self) -> Vec<(net::SocketAddr, &str)> { - self.sockets - .iter() - .map(|s| (s.addr, s.handler.scheme())) - .collect() + self.sockets.iter().map(|s| (s.addr, s.scheme)).collect() } /// Use listener for accepting incoming connection requests @@ -209,7 +207,8 @@ where self.sockets.push(Socket { lst, addr, - handler: Box::new(SimpleHandler { + scheme: "http", + handler: Box::new(SimpleFactory { addr, factory: self.factory.clone(), }), @@ -218,22 +217,28 @@ where self } - // #[doc(hidden)] - // /// Use listener for accepting incoming connection requests - // pub fn listen_with(mut self, lst: net::TcpListener, acceptor: A) -> Self - // where - // A: AcceptorService + Send + 'static, - // { - // let token = Token(self.handlers.len()); - // let addr = lst.local_addr().unwrap(); - // self.handlers.push(Box::new(StreamHandler::new( - // lst.local_addr().unwrap(), - // acceptor, - // ))); - // self.sockets.push(Socket { lst, addr, token }); + #[doc(hidden)] + /// Use listener for accepting incoming connection requests + pub(crate) fn listen_with(mut self, lst: net::TcpListener, acceptor: F) -> Self + where + F: Fn() -> T + Send + Clone + 'static, + T: NewService + Clone + 'static, + T::Response: IoStream, + { + let addr = lst.local_addr().unwrap(); + self.sockets.push(Socket { + lst, + addr, + scheme: "https", + handler: Box::new(AcceptorFactory { + addr, + acceptor, + factory: self.factory.clone(), + }), + }); - // self - // } + self + } // #[cfg(feature = "tls")] // /// Use listener for accepting incoming tls connection requests @@ -246,24 +251,27 @@ where // self.listen_with(lst, NativeTlsAcceptor::new(acceptor)) // } - // #[cfg(feature = "alpn")] - // /// Use listener for accepting incoming tls connection requests - // /// - // /// This method sets alpn protocols to "h2" and "http/1.1" - // pub fn listen_ssl( - // self, lst: net::TcpListener, builder: SslAcceptorBuilder, - // ) -> io::Result { - // use super::{OpensslAcceptor, ServerFlags}; + #[cfg(any(feature = "alpn", feature = "ssl"))] + /// Use listener for accepting incoming tls connection requests + /// + /// This method sets alpn protocols to "h2" and "http/1.1" + pub fn listen_ssl( + self, lst: net::TcpListener, builder: SslAcceptorBuilder, + ) -> io::Result { + use super::{openssl_acceptor_with_flags, ServerFlags}; - // alpn support - // let flags = if self.no_http2 { - // ServerFlags::HTTP1 - // } else { - // ServerFlags::HTTP1 | ServerFlags::HTTP2 - // }; + let flags = if self.no_http2 { + ServerFlags::HTTP1 + } else { + ServerFlags::HTTP1 | ServerFlags::HTTP2 + }; - // Ok(self.listen_with(lst, OpensslAcceptor::with_flags(builder, flags)?)) - // } + let acceptor = openssl_acceptor_with_flags(builder, flags)?; + + Ok(self.listen_with(lst, move || { + ssl::OpensslAcceptor::new(acceptor.clone()).map_err(|_| ()) + })) + } // #[cfg(feature = "rust-tls")] // /// Use listener for accepting incoming tls connection requests @@ -400,60 +408,6 @@ where // } } -struct HttpService -where - H: HttpHandler, - F: IntoHttpHandler, - Io: IoStream, -{ - factory: Arc Vec + Send + Sync>, - addr: net::SocketAddr, - host: Option, - keep_alive: KeepAlive, - _t: PhantomData<(H, Io)>, -} - -impl NewService for HttpService -where - H: HttpHandler, - F: IntoHttpHandler, - Io: IoStream, -{ - type Request = Io; - type Response = (); - type Error = (); - type InitError = (); - type Service = HttpServiceHandler; - type Future = FutureResult; - - fn new_service(&self) -> Self::Future { - let s = ServerSettings::new(Some(self.addr), &self.host, false); - let apps: Vec<_> = (*self.factory)() - .into_iter() - .map(|h| h.into_handler()) - .collect(); - - ok(HttpServiceHandler::new(apps, self.keep_alive, s)) - } -} - -impl Clone for HttpService -where - H: HttpHandler, - F: IntoHttpHandler, - Io: IoStream, -{ - fn clone(&self) -> HttpService { - HttpService { - addr: self.addr, - factory: self.factory.clone(), - host: self.host.clone(), - keep_alive: self.keep_alive, - _t: PhantomData, - } - } -} - impl HttpServer { /// Start listening for incoming connections. /// @@ -500,8 +454,9 @@ impl HttpServer { for socket in sockets { let Socket { lst, - addr: _, handler, + addr: _, + scheme: _, } = socket; srv = handler.register(srv, lst, self.host.clone(), self.keep_alive); } @@ -597,6 +552,43 @@ impl HttpServer { // } // } +struct HttpService +where + H: HttpHandler, + F: IntoHttpHandler, + Io: IoStream, +{ + factory: Arc Vec + Send + Sync>, + addr: net::SocketAddr, + host: Option, + keep_alive: KeepAlive, + _t: PhantomData<(H, Io)>, +} + +impl NewService for HttpService +where + H: HttpHandler, + F: IntoHttpHandler, + Io: IoStream, +{ + type Request = Io; + type Response = (); + type Error = (); + type InitError = (); + type Service = HttpServiceHandler; + type Future = FutureResult; + + fn new_service(&self) -> Self::Future { + let s = ServerSettings::new(Some(self.addr), &self.host, false); + let apps: Vec<_> = (*self.factory)() + .into_iter() + .map(|h| h.into_handler()) + .collect(); + + ok(HttpServiceHandler::new(apps, self.keep_alive, s)) + } +} + struct HttpServiceHandler where H: HttpHandler, @@ -656,21 +648,17 @@ where // } } -trait IoStreamHandler: Send +trait ServiceFactory where H: IntoHttpHandler, { - fn addr(&self) -> net::SocketAddr; - - fn scheme(&self) -> &'static str; - fn register( &self, server: Server, lst: net::TcpListener, host: Option, keep_alive: KeepAlive, ) -> Server; } -struct SimpleHandler +struct SimpleFactory where H: IntoHttpHandler, { @@ -678,27 +666,19 @@ where pub factory: Arc Vec + Send + Sync>, } -impl Clone for SimpleHandler { +impl Clone for SimpleFactory { fn clone(&self) -> Self { - SimpleHandler { + SimpleFactory { addr: self.addr, factory: self.factory.clone(), } } } -impl IoStreamHandler for SimpleHandler +impl ServiceFactory for SimpleFactory where H: IntoHttpHandler + 'static, { - fn addr(&self) -> net::SocketAddr { - self.addr - } - - fn scheme(&self) -> &'static str { - "http" - } - fn register( &self, server: Server, lst: net::TcpListener, host: Option, keep_alive: KeepAlive, @@ -716,6 +696,59 @@ where } } +struct AcceptorFactory +where + F: Fn() -> T + Send + Clone + 'static, + T: NewService, + H: IntoHttpHandler, +{ + pub addr: net::SocketAddr, + pub acceptor: F, + pub factory: Arc Vec + Send + Sync>, +} + +impl Clone for AcceptorFactory +where + F: Fn() -> T + Send + Clone + 'static, + T: NewService, + H: IntoHttpHandler, +{ + fn clone(&self) -> Self { + AcceptorFactory { + addr: self.addr, + acceptor: self.acceptor.clone(), + factory: self.factory.clone(), + } + } +} + +impl ServiceFactory for AcceptorFactory +where + F: Fn() -> T + Send + Clone + 'static, + H: IntoHttpHandler + 'static, + T: NewService + Clone + 'static, + T::Response: IoStream, +{ + fn register( + &self, server: Server, lst: net::TcpListener, host: Option, + keep_alive: KeepAlive, + ) -> Server { + let addr = self.addr; + let factory = self.factory.clone(); + let acceptor = self.acceptor.clone(); + + server.listen(lst, move || { + (acceptor)().and_then(HttpService { + keep_alive, + addr, + host: host.clone(), + factory: factory.clone(), + _t: PhantomData, + }) + }) + } +} + fn create_tcp_listener( addr: net::SocketAddr, backlog: i32, ) -> io::Result { diff --git a/src/server/mod.rs b/src/server/mod.rs index 25eca3a71..111cc87a4 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -115,6 +115,8 @@ use futures::{Async, Poll}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_tcp::TcpStream; +pub use actix_net::{PauseServer, ResumeServer, StopServer}; + mod channel; mod error; pub(crate) mod h1; @@ -128,9 +130,9 @@ pub(crate) mod input; pub(crate) mod message; pub(crate) mod output; pub(crate) mod settings; -mod ssl; -use actix::Message; +mod ssl; +pub use self::ssl::*; pub use self::http::HttpServer; pub use self::message::Request; @@ -221,40 +223,6 @@ impl From> for KeepAlive { } } -/// Pause accepting incoming connections -/// -/// If socket contains some pending connection, they might be dropped. -/// All opened connection remains active. -#[derive(Message)] -pub struct PauseServer; - -/// Resume accepting incoming connections -#[derive(Message)] -pub struct ResumeServer; - -/// Stop incoming connection processing, stop all workers and exit. -/// -/// If server starts with `spawn()` method, then spawned thread get terminated. -pub struct StopServer { - /// Whether to try and shut down gracefully - pub graceful: bool, -} - -impl Message for StopServer { - type Result = Result<(), ()>; -} - -/// Socket id token -#[doc(hidden)] -#[derive(Clone, Copy)] -pub struct Token(usize); - -impl Token { - pub(crate) fn new(val: usize) -> Token { - Token(val) - } -} - /// Low level http request handler #[allow(unused_variables)] pub trait HttpHandler: 'static { diff --git a/src/server/settings.rs b/src/server/settings.rs index 439d0e755..47da515a0 100644 --- a/src/server/settings.rs +++ b/src/server/settings.rs @@ -303,6 +303,8 @@ impl SharedBytesPool { #[cfg(test)] mod tests { use super::*; + use futures::future; + use tokio::runtime::current_thread; #[test] fn test_date_len() { @@ -311,16 +313,20 @@ mod tests { #[test] fn test_date() { - let settings = WorkerSettings::<()>::new( - Vec::new(), - KeepAlive::Os, - ServerSettings::default(), - Connections::default(), - ); - let mut buf1 = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10); - settings.set_date(&mut buf1, true); - let mut buf2 = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10); - settings.set_date(&mut buf2, true); - assert_eq!(buf1, buf2); + let mut rt = current_thread::Runtime::new().unwrap(); + + let _ = rt.block_on(future::lazy(|| { + let settings = WorkerSettings::<()>::new( + Vec::new(), + KeepAlive::Os, + ServerSettings::default(), + ); + let mut buf1 = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10); + settings.set_date(&mut buf1, true); + let mut buf2 = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10); + settings.set_date(&mut buf2, true); + assert_eq!(buf1, buf2); + future::ok::<_, ()>(()) + })); } } diff --git a/src/server/ssl/mod.rs b/src/server/ssl/mod.rs index bd931fb82..7101de78a 100644 --- a/src/server/ssl/mod.rs +++ b/src/server/ssl/mod.rs @@ -1,14 +1,14 @@ -#[cfg(feature = "alpn")] +#[cfg(any(feature = "alpn", feature = "ssl"))] mod openssl; -#[cfg(feature = "alpn")] -pub use self::openssl::OpensslAcceptor; +#[cfg(any(feature = "alpn", feature = "ssl"))] +pub use self::openssl::*; -#[cfg(feature = "tls")] -mod nativetls; -#[cfg(feature = "tls")] -pub use self::nativetls::{NativeTlsAcceptor, TlsStream}; +//#[cfg(feature = "tls")] +//mod nativetls; +//#[cfg(feature = "tls")] +//pub use self::nativetls::{NativeTlsAcceptor, TlsStream}; -#[cfg(feature = "rust-tls")] -mod rustls; -#[cfg(feature = "rust-tls")] -pub use self::rustls::RustlsAcceptor; +//#[cfg(feature = "rust-tls")] +//mod rustls; +//#[cfg(feature = "rust-tls")] +//pub use self::rustls::RustlsAcceptor; diff --git a/src/server/ssl/openssl.rs b/src/server/ssl/openssl.rs index 996c510dc..343155233 100644 --- a/src/server/ssl/openssl.rs +++ b/src/server/ssl/openssl.rs @@ -1,80 +1,41 @@ use std::net::Shutdown; use std::{io, time}; -use futures::{Future, Poll}; use openssl::ssl::{AlpnError, SslAcceptor, SslAcceptorBuilder}; -use tokio_openssl::{AcceptAsync, SslAcceptorExt, SslStream}; +use tokio_openssl::SslStream; -use server::{AcceptorService, IoStream, ServerFlags}; +use server::{IoStream, ServerFlags}; -#[derive(Clone)] -/// Support `SSL` connections via openssl package -/// -/// `alpn` feature enables `OpensslAcceptor` type -pub struct OpensslAcceptor { - acceptor: SslAcceptor, +/// Configure `SslAcceptorBuilder` with enabled `HTTP/2` and `HTTP1.1` support. +pub fn openssl_acceptor(builder: SslAcceptorBuilder) -> io::Result { + openssl_acceptor_with_flags(builder, ServerFlags::HTTP1 | ServerFlags::HTTP2) } -impl OpensslAcceptor { - /// Create `OpensslAcceptor` with enabled `HTTP/2` and `HTTP1.1` support. - pub fn new(builder: SslAcceptorBuilder) -> io::Result { - OpensslAcceptor::with_flags(builder, ServerFlags::HTTP1 | ServerFlags::HTTP2) +/// Configure `SslAcceptorBuilder` with custom server flags. +pub fn openssl_acceptor_with_flags( + mut builder: SslAcceptorBuilder, flags: ServerFlags, +) -> io::Result { + let mut protos = Vec::new(); + if flags.contains(ServerFlags::HTTP1) { + protos.extend(b"\x08http/1.1"); + } + if flags.contains(ServerFlags::HTTP2) { + protos.extend(b"\x02h2"); + builder.set_alpn_select_callback(|_, protos| { + const H2: &[u8] = b"\x02h2"; + if protos.windows(3).any(|window| window == H2) { + Ok(b"h2") + } else { + Err(AlpnError::NOACK) + } + }); } - /// Create `OpensslAcceptor` with custom server flags. - pub fn with_flags( - mut builder: SslAcceptorBuilder, flags: ServerFlags, - ) -> io::Result { - let mut protos = Vec::new(); - if flags.contains(ServerFlags::HTTP1) { - protos.extend(b"\x08http/1.1"); - } - if flags.contains(ServerFlags::HTTP2) { - protos.extend(b"\x02h2"); - builder.set_alpn_select_callback(|_, protos| { - const H2: &[u8] = b"\x02h2"; - if protos.windows(3).any(|window| window == H2) { - Ok(b"h2") - } else { - Err(AlpnError::NOACK) - } - }); - } - - if !protos.is_empty() { - builder.set_alpn_protos(&protos)?; - } - - Ok(OpensslAcceptor { - acceptor: builder.build(), - }) - } -} - -pub struct AcceptorFut(AcceptAsync); - -impl Future for AcceptorFut { - type Item = SslStream; - type Error = io::Error; - - fn poll(&mut self) -> Poll { - self.0 - .poll() - .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) - } -} - -impl AcceptorService for OpensslAcceptor { - type Accepted = SslStream; - type Future = AcceptorFut; - - fn scheme(&self) -> &'static str { - "https" + if !protos.is_empty() { + builder.set_alpn_protos(&protos)?; } - fn accept(&self, io: Io) -> Self::Future { - AcceptorFut(SslAcceptorExt::accept_async(&self.acceptor, io)) - } + Ok(builder.build()) } impl IoStream for SslStream { diff --git a/tests/test_server.rs b/tests/test_server.rs index 52c47dd27..30ee13fb3 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -30,6 +30,7 @@ use modhttp::Request; use rand::distributions::Alphanumeric; use rand::Rng; use tokio::runtime::current_thread::Runtime; +use tokio_current_thread::spawn; use tokio_tcp::TcpStream; use actix_web::*; @@ -904,7 +905,7 @@ fn test_h2() { let (response, _) = client.send_request(request, false).unwrap(); // Spawn a task to run the conn... - current_thread::spawn(h2.map_err(|e| println!("GOT ERR={:?}", e))); + spawn(h2.map_err(|e| println!("GOT ERR={:?}", e))); response.and_then(|response| { assert_eq!(response.status(), http::StatusCode::OK);