From f4c01384ecc7fb863e6e41e5b345920431accc68 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 2 Dec 2019 17:33:11 +0600 Subject: [PATCH] update to latest actix-net --- Cargo.toml | 10 +- actix-framed/Cargo.toml | 3 +- actix-framed/src/app.rs | 5 +- actix-framed/tests/test_server.rs | 1 + actix-http/Cargo.toml | 9 +- actix-http/examples/echo.rs | 1 + actix-http/examples/echo2.rs | 2 +- actix-http/examples/hello-world.rs | 1 + actix-http/src/builder.rs | 51 ++- actix-http/src/config.rs | 24 +- actix-http/src/h1/dispatcher.rs | 17 +- actix-http/src/h1/expect.rs | 5 +- actix-http/src/h1/service.rs | 224 +++++++++----- actix-http/src/h1/upgrade.rs | 5 +- actix-http/src/h2/dispatcher.rs | 10 +- actix-http/src/h2/service.rs | 159 +++++++--- actix-http/src/lib.rs | 7 + actix-http/src/service.rs | 428 ++++++++++---------------- actix-http/src/test.rs | 15 - actix-http/tests/test_client.rs | 6 +- actix-http/tests/test_openssl.rs | 320 +++++++------------ actix-http/tests/test_server.rs | 157 ++++++---- actix-http/tests/test_ws.rs | 1 + actix-web-codegen/tests/test_macro.rs | 5 +- awc/Cargo.toml | 3 +- awc/tests/test_client.rs | 73 +++-- awc/tests/test_ssl_client.rs | 19 +- awc/tests/test_ws.rs | 1 + src/app_service.rs | 12 +- src/server.rs | 67 ++-- src/test.rs | 6 +- test-server/Cargo.toml | 3 +- tests/test_server.rs | 189 ++++++------ 33 files changed, 941 insertions(+), 898 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 689f7b147..441291d3b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -63,7 +63,7 @@ secure-cookies = ["actix-http/secure-cookies"] fail = ["actix-http/fail"] # openssl -openssl = ["open-ssl", "actix-server/openssl", "awc/openssl"] +openssl = ["open-ssl", "actix-tls/openssl", "awc/openssl"] # rustls # rustls = ["rust-tls", "actix-server/rustls", "awc/rustls"] @@ -76,10 +76,11 @@ actix-router = "0.1.5" actix-rt = "1.0.0-alpha.1" actix-web-codegen = "0.2.0-alpha.1" actix-http = "0.3.0-alpha.1" -actix-server = "0.8.0-alpha.1" -actix-server-config = "0.3.0-alpha.1" +actix-server = "0.8.0-alpha.2" actix-testing = "0.3.0-alpha.1" actix-threadpool = "0.2.0-alpha.1" +#actix-tls = "0.1.0-alpha.1" +actix-tls = { git = "https://github.com/actix/actix-net.git", optional = true } awc = { version = "0.3.0-alpha.1", optional = true } bytes = "0.4" @@ -123,7 +124,6 @@ actix-web = { path = "." } actix-http = { path = "actix-http" } actix-http-test = { path = "test-server" } actix-web-codegen = { path = "actix-web-codegen" } -# actix-web-actors = { path = "actix-web-actors" } actix-cors = { path = "actix-cors" } actix-identity = { path = "actix-identity" } actix-session = { path = "actix-session" } @@ -136,7 +136,7 @@ actix-connect = { git = "https://github.com/actix/actix-net.git" } actix-rt = { git = "https://github.com/actix/actix-net.git" } actix-macros = { git = "https://github.com/actix/actix-net.git" } actix-server = { git = "https://github.com/actix/actix-net.git" } -actix-server-config = { git = "https://github.com/actix/actix-net.git" } actix-service = { git = "https://github.com/actix/actix-net.git" } actix-testing = { git = "https://github.com/actix/actix-net.git" } +actix-tls = { git = "https://github.com/actix/actix-net.git" } actix-utils = { git = "https://github.com/actix/actix-net.git" } diff --git a/actix-framed/Cargo.toml b/actix-framed/Cargo.toml index 4783daefd..24ca6400d 100644 --- a/actix-framed/Cargo.toml +++ b/actix-framed/Cargo.toml @@ -25,7 +25,6 @@ actix-service = "1.0.0-alpha.1" actix-router = "0.1.2" actix-rt = "1.0.0-alpha.1" actix-http = "0.3.0-alpha.1" -actix-server-config = "0.3.0-alpha.1" bytes = "0.4" futures = "0.3.1" @@ -33,7 +32,7 @@ pin-project = "0.4.6" log = "0.4" [dev-dependencies] -actix-server = { version = "0.8.0-alpha.1", features=["openssl"] } +actix-server = { version = "0.8.0-alpha.1" } actix-connect = { version = "0.3.0-alpha.1", features=["openssl"] } actix-http-test = { version = "0.3.0-alpha.1", features=["openssl"] } actix-utils = "0.5.0-alpha.1" diff --git a/actix-framed/src/app.rs b/actix-framed/src/app.rs index f3e746e9f..2f8c800f8 100644 --- a/actix-framed/src/app.rs +++ b/actix-framed/src/app.rs @@ -7,7 +7,6 @@ use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_http::h1::{Codec, SendResponse}; use actix_http::{Error, Request, Response}; use actix_router::{Path, Router, Url}; -use actix_server_config::ServerConfig; use actix_service::{IntoServiceFactory, Service, ServiceFactory}; use futures::future::{ok, FutureExt, LocalBoxFuture}; @@ -97,7 +96,7 @@ where T: AsyncRead + AsyncWrite + Unpin + 'static, S: 'static, { - type Config = ServerConfig; + type Config = (); type Request = (Request, Framed); type Response = (); type Error = Error; @@ -105,7 +104,7 @@ where type Service = FramedAppService; type Future = CreateService; - fn new_service(&self, _: &ServerConfig) -> Self::Future { + fn new_service(&self, _: &()) -> Self::Future { CreateService { fut: self .services diff --git a/actix-framed/tests/test_server.rs b/actix-framed/tests/test_server.rs index 4d1028d31..c272f0f93 100644 --- a/actix-framed/tests/test_server.rs +++ b/actix-framed/tests/test_server.rs @@ -46,6 +46,7 @@ async fn test_simple() { FramedApp::new().service(FramedRoute::get("/index.html").to(ws_service)), ) .finish(|_| future::ok::<_, Error>(Response::NotFound())) + .tcp() }); assert!(srv.ws_at("/test").await.is_err()); diff --git a/actix-http/Cargo.toml b/actix-http/Cargo.toml index 9a14abefe..ff47abd1e 100644 --- a/actix-http/Cargo.toml +++ b/actix-http/Cargo.toml @@ -26,7 +26,7 @@ path = "src/lib.rs" default = [] # openssl -openssl = ["open-ssl", "actix-connect/openssl", "tokio-openssl"] +openssl = ["open-ssl", "actix-tls/openssl", "actix-connect/openssl", "tokio-openssl"] # rustls support # rustls = ["rust-tls", "webpki-roots", "actix-connect/rustls"] @@ -51,9 +51,9 @@ actix-service = "1.0.0-alpha.1" actix-codec = "0.2.0-alpha.1" actix-connect = "1.0.0-alpha.1" actix-utils = "0.5.0-alpha.1" -actix-server-config = "0.3.0-alpha.1" actix-rt = "1.0.0-alpha.1" actix-threadpool = "0.2.0-alpha.1" +actix-tls = { git = "https://github.com/actix/actix-net.git", optional = true } base64 = "0.10" bitflags = "1.0" @@ -103,10 +103,11 @@ tokio-openssl = { version = "0.4.0-alpha.6", optional = true } # webpki-roots = { version = "0.18", optional = true } [dev-dependencies] -#actix-server = { version = "0.8.0-alpha.1", features=["openssl", "rustls"] } -actix-server = { version = "0.8.0-alpha.1", features=["openssl"] } +actix-server = { version = "0.8.0-alpha.1" } actix-connect = { version = "1.0.0-alpha.1", features=["openssl"] } actix-http-test = { version = "0.3.0-alpha.1", features=["openssl"] } +#actix-tls = { version = "0.1.0-alpha.1", features=["openssl"] } +actix-tls = { git = "https://github.com/actix/actix-net.git", features=["openssl"] } env_logger = "0.6" serde_derive = "1.0" open-ssl = { version="0.10", package="openssl" } diff --git a/actix-http/examples/echo.rs b/actix-http/examples/echo.rs index ba81020ca..5b2894f89 100644 --- a/actix-http/examples/echo.rs +++ b/actix-http/examples/echo.rs @@ -34,6 +34,7 @@ fn main() -> io::Result<()> { ) } }) + .tcp() })? .run() } diff --git a/actix-http/examples/echo2.rs b/actix-http/examples/echo2.rs index 3776c7d58..07d181277 100644 --- a/actix-http/examples/echo2.rs +++ b/actix-http/examples/echo2.rs @@ -25,7 +25,7 @@ fn main() -> io::Result<()> { Server::build() .bind("echo", "127.0.0.1:8080", || { - HttpService::build().finish(handle_request) + HttpService::build().finish(handle_request).tcp() })? .run() } diff --git a/actix-http/examples/hello-world.rs b/actix-http/examples/hello-world.rs index 6e3820390..7d8576869 100644 --- a/actix-http/examples/hello-world.rs +++ b/actix-http/examples/hello-world.rs @@ -21,6 +21,7 @@ fn main() -> io::Result<()> { res.header("x-head", HeaderValue::from_static("dummy value!")); future::ok::<_, ()>(res.body("Hello world!")) }) + .tcp() })? .run() } diff --git a/actix-http/src/builder.rs b/actix-http/src/builder.rs index 7e1dae58f..271abd43f 100644 --- a/actix-http/src/builder.rs +++ b/actix-http/src/builder.rs @@ -1,9 +1,8 @@ -use std::fmt; use std::marker::PhantomData; use std::rc::Rc; +use std::{fmt, net}; use actix_codec::Framed; -use actix_server_config::ServerConfig as SrvConfig; use actix_service::{IntoServiceFactory, Service, ServiceFactory}; use crate::body::MessageBody; @@ -24,6 +23,8 @@ pub struct HttpServiceBuilder> { keep_alive: KeepAlive, client_timeout: u64, client_disconnect: u64, + secure: bool, + local_addr: Option, expect: X, upgrade: Option, on_connect: Option Box>>, @@ -32,7 +33,7 @@ pub struct HttpServiceBuilder> { impl HttpServiceBuilder> where - S: ServiceFactory, + S: ServiceFactory, S::Error: Into + 'static, S::InitError: fmt::Debug, ::Future: 'static, @@ -43,6 +44,8 @@ where keep_alive: KeepAlive::Timeout(5), client_timeout: 5000, client_disconnect: 0, + secure: false, + local_addr: None, expect: ExpectHandler, upgrade: None, on_connect: None, @@ -53,19 +56,15 @@ where impl HttpServiceBuilder where - S: ServiceFactory, + S: ServiceFactory, S::Error: Into + 'static, S::InitError: fmt::Debug, ::Future: 'static, - X: ServiceFactory, + X: ServiceFactory, X::Error: Into, X::InitError: fmt::Debug, ::Future: 'static, - U: ServiceFactory< - Config = SrvConfig, - Request = (Request, Framed), - Response = (), - >, + U: ServiceFactory), Response = ()>, U::Error: fmt::Display, U::InitError: fmt::Debug, ::Future: 'static, @@ -78,6 +77,18 @@ where self } + /// Set connection secure state + pub fn secure(mut self) -> Self { + self.secure = true; + self + } + + /// Set the local address that this service is bound to. + pub fn local_addr(mut self, addr: net::SocketAddr) -> Self { + self.local_addr = Some(addr); + self + } + /// Set server client timeout in milliseconds for first request. /// /// Defines a timeout for reading client request header. If a client does not transmit @@ -113,7 +124,7 @@ where pub fn expect(self, expect: F) -> HttpServiceBuilder where F: IntoServiceFactory, - X1: ServiceFactory, + X1: ServiceFactory, X1::Error: Into, X1::InitError: fmt::Debug, ::Future: 'static, @@ -122,6 +133,8 @@ where keep_alive: self.keep_alive, client_timeout: self.client_timeout, client_disconnect: self.client_disconnect, + secure: self.secure, + local_addr: self.local_addr, expect: expect.into_factory(), upgrade: self.upgrade, on_connect: self.on_connect, @@ -137,7 +150,7 @@ where where F: IntoServiceFactory, U1: ServiceFactory< - Config = SrvConfig, + Config = (), Request = (Request, Framed), Response = (), >, @@ -149,6 +162,8 @@ where keep_alive: self.keep_alive, client_timeout: self.client_timeout, client_disconnect: self.client_disconnect, + secure: self.secure, + local_addr: self.local_addr, expect: self.expect, upgrade: Some(upgrade.into_factory()), on_connect: self.on_connect, @@ -170,7 +185,7 @@ where } /// Finish service configuration and create *http service* for HTTP/1 protocol. - pub fn h1(self, service: F) -> H1Service + pub fn h1(self, service: F) -> H1Service where B: MessageBody, F: IntoServiceFactory, @@ -182,6 +197,8 @@ where self.keep_alive, self.client_timeout, self.client_disconnect, + self.secure, + self.local_addr, ); H1Service::with_config(cfg, service.into_factory()) .expect(self.expect) @@ -190,7 +207,7 @@ where } /// Finish service configuration and create *http service* for HTTP/2 protocol. - pub fn h2(self, service: F) -> H2Service + pub fn h2(self, service: F) -> H2Service where B: MessageBody + 'static, F: IntoServiceFactory, @@ -203,12 +220,14 @@ where self.keep_alive, self.client_timeout, self.client_disconnect, + self.secure, + self.local_addr, ); H2Service::with_config(cfg, service.into_factory()).on_connect(self.on_connect) } /// Finish service configuration and create `HttpService` instance. - pub fn finish(self, service: F) -> HttpService + pub fn finish(self, service: F) -> HttpService where B: MessageBody + 'static, F: IntoServiceFactory, @@ -221,6 +240,8 @@ where self.keep_alive, self.client_timeout, self.client_disconnect, + self.secure, + self.local_addr, ); HttpService::with_config(cfg, service.into_factory()) .expect(self.expect) diff --git a/actix-http/src/config.rs b/actix-http/src/config.rs index bab3cdc6d..6ea75e565 100644 --- a/actix-http/src/config.rs +++ b/actix-http/src/config.rs @@ -1,8 +1,8 @@ use std::cell::UnsafeCell; -use std::fmt; use std::fmt::Write; use std::rc::Rc; use std::time::{Duration, Instant}; +use std::{fmt, net}; use actix_rt::time::{delay, delay_for, Delay}; use bytes::BytesMut; @@ -47,6 +47,8 @@ struct Inner { client_timeout: u64, client_disconnect: u64, ka_enabled: bool, + secure: bool, + local_addr: Option, timer: DateService, } @@ -58,7 +60,7 @@ impl Clone for ServiceConfig { impl Default for ServiceConfig { fn default() -> Self { - Self::new(KeepAlive::Timeout(5), 0, 0) + Self::new(KeepAlive::Timeout(5), 0, 0, false, None) } } @@ -68,6 +70,8 @@ impl ServiceConfig { keep_alive: KeepAlive, client_timeout: u64, client_disconnect: u64, + secure: bool, + local_addr: Option, ) -> ServiceConfig { let (keep_alive, ka_enabled) = match keep_alive { KeepAlive::Timeout(val) => (val as u64, true), @@ -85,10 +89,24 @@ impl ServiceConfig { ka_enabled, client_timeout, client_disconnect, + secure, + local_addr, timer: DateService::new(), })) } + #[inline] + /// Returns true if connection is secure(https) + pub fn secure(&self) -> bool { + self.0.secure + } + + #[inline] + /// Returns the local address that this server is bound to. + pub fn local_addr(&self) -> Option { + self.0.local_addr + } + #[inline] /// Keep alive duration if configured. pub fn keep_alive(&self) -> Option { @@ -271,7 +289,7 @@ mod tests { #[actix_rt::test] async fn test_date() { - let settings = ServiceConfig::new(KeepAlive::Os, 0, 0); + let settings = ServiceConfig::new(KeepAlive::Os, 0, 0, false, None); let mut buf1 = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10); settings.set_date(&mut buf1); let mut buf2 = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10); diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index 154b3ed40..d5b3a8ed6 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -5,9 +5,8 @@ use std::task::{Context, Poll}; use std::time::Instant; use std::{fmt, io, net}; -use actix_codec::{AsyncRead, Decoder, Encoder, Framed, FramedParts}; +use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed, FramedParts}; use actix_rt::time::{delay, Delay}; -use actix_server_config::IoStream; use actix_service::Service; use bitflags::bitflags; use bytes::{BufMut, BytesMut}; @@ -168,7 +167,7 @@ impl PartialEq for PollResponse { impl Dispatcher where - T: IoStream, + T: AsyncRead + AsyncWrite + Unpin, S: Service, S::Error: Into, S::Response: Into>, @@ -186,6 +185,7 @@ where expect: CloneableService, upgrade: Option>, on_connect: Option>, + peer_addr: Option, ) -> Self { Dispatcher::with_timeout( stream, @@ -197,6 +197,7 @@ where expect, upgrade, on_connect, + peer_addr, ) } @@ -211,6 +212,7 @@ where expect: CloneableService, upgrade: Option>, on_connect: Option>, + peer_addr: Option, ) -> Self { let keepalive = config.keep_alive_enabled(); let flags = if keepalive { @@ -234,7 +236,6 @@ where payload: None, state: State::None, error: None, - peer_addr: io.peer_addr(), messages: VecDeque::new(), io, codec, @@ -244,6 +245,7 @@ where upgrade, on_connect, flags, + peer_addr, ka_expire, ka_timer, }), @@ -253,7 +255,7 @@ where impl InnerDispatcher where - T: IoStream, + T: AsyncRead + AsyncWrite + Unpin, S: Service, S::Error: Into, S::Response: Into>, @@ -682,7 +684,7 @@ where impl Unpin for Dispatcher where - T: IoStream, + T: AsyncRead + AsyncWrite + Unpin, S: Service, S::Error: Into, S::Response: Into>, @@ -696,7 +698,7 @@ where impl Future for Dispatcher where - T: IoStream, + T: AsyncRead + AsyncWrite + Unpin, S: Service, S::Error: Into, S::Response: Into>, @@ -907,6 +909,7 @@ mod tests { CloneableService::new(ExpectHandler), None, None, + None, ); match Pin::new(&mut h1).poll(cx) { Poll::Pending => panic!(), diff --git a/actix-http/src/h1/expect.rs b/actix-http/src/h1/expect.rs index d6b4a9f1e..576b7672e 100644 --- a/actix-http/src/h1/expect.rs +++ b/actix-http/src/h1/expect.rs @@ -1,6 +1,5 @@ use std::task::{Context, Poll}; -use actix_server_config::ServerConfig; use actix_service::{Service, ServiceFactory}; use futures::future::{ok, Ready}; @@ -10,7 +9,7 @@ use crate::request::Request; pub struct ExpectHandler; impl ServiceFactory for ExpectHandler { - type Config = ServerConfig; + type Config = (); type Request = Request; type Response = Request; type Error = Error; @@ -18,7 +17,7 @@ impl ServiceFactory for ExpectHandler { type InitError = Error; type Future = Ready>; - fn new_service(&self, _: &ServerConfig) -> Self::Future { + fn new_service(&self, _: &()) -> Self::Future { ok(ExpectHandler) } } diff --git a/actix-http/src/h1/service.rs b/actix-http/src/h1/service.rs index 197c92887..abc96e719 100644 --- a/actix-http/src/h1/service.rs +++ b/actix-http/src/h1/service.rs @@ -1,19 +1,19 @@ -use std::fmt; use std::future::Future; use std::marker::PhantomData; use std::pin::Pin; use std::rc::Rc; use std::task::{Context, Poll}; +use std::{fmt, net}; -use actix_codec::Framed; -use actix_server_config::{Io, IoStream, ServerConfig as SrvConfig}; -use actix_service::{IntoServiceFactory, Service, ServiceFactory}; +use actix_codec::{AsyncRead, AsyncWrite, Framed}; +use actix_rt::net::TcpStream; +use actix_service::{pipeline_factory, IntoServiceFactory, Service, ServiceFactory}; use futures::future::{ok, Ready}; use futures::ready; use crate::body::MessageBody; use crate::cloneable::CloneableService; -use crate::config::{KeepAlive, ServiceConfig}; +use crate::config::ServiceConfig; use crate::error::{DispatchError, Error, ParseError}; use crate::helpers::DataFactory; use crate::request::Request; @@ -24,39 +24,25 @@ use super::dispatcher::Dispatcher; use super::{ExpectHandler, Message, UpgradeHandler}; /// `ServiceFactory` implementation for HTTP1 transport -pub struct H1Service> { +pub struct H1Service> { srv: S, cfg: ServiceConfig, expect: X, upgrade: Option, on_connect: Option Box>>, - _t: PhantomData<(T, P, B)>, + _t: PhantomData<(T, B)>, } -impl H1Service +impl H1Service where - S: ServiceFactory, + S: ServiceFactory, S::Error: Into, S::InitError: fmt::Debug, S::Response: Into>, B: MessageBody, { - /// Create new `HttpService` instance with default config. - pub fn new>(service: F) -> Self { - let cfg = ServiceConfig::new(KeepAlive::Timeout(5), 5000, 0); - - H1Service { - cfg, - srv: service.into_factory(), - expect: ExpectHandler, - upgrade: None, - on_connect: None, - _t: PhantomData, - } - } - /// Create new `HttpService` instance with config. - pub fn with_config>( + pub(crate) fn with_config>( cfg: ServiceConfig, service: F, ) -> Self { @@ -71,15 +57,102 @@ where } } -impl H1Service +impl H1Service where - S: ServiceFactory, + S: ServiceFactory, + S::Error: Into, + S::InitError: fmt::Debug, + S::Response: Into>, + B: MessageBody, + X: ServiceFactory, + X::Error: Into, + X::InitError: fmt::Debug, + U: ServiceFactory< + Config = (), + Request = (Request, Framed), + Response = (), + >, + U::Error: fmt::Display, + U::InitError: fmt::Debug, +{ + /// Create simple tcp stream service + pub fn tcp( + self, + ) -> impl ServiceFactory< + Config = (), + Request = TcpStream, + Response = (), + Error = DispatchError, + InitError = (), + > { + pipeline_factory(|io: TcpStream| { + let peer_addr = io.peer_addr().ok(); + ok((io, peer_addr)) + }) + .and_then(self) + } +} + +#[cfg(feature = "openssl")] +mod openssl { + use super::*; + + use actix_tls::openssl::{Acceptor, SslStream}; + use actix_tls::{openssl::HandshakeError, SslError}; + use open_ssl::ssl::SslAcceptor; + + impl H1Service, S, B, X, U> + where + S: ServiceFactory, + S::Error: Into, + S::InitError: fmt::Debug, + S::Response: Into>, + B: MessageBody, + X: ServiceFactory, + X::Error: Into, + X::InitError: fmt::Debug, + U: ServiceFactory< + Config = (), + Request = (Request, Framed, Codec>), + Response = (), + >, + U::Error: fmt::Display, + U::InitError: fmt::Debug, + { + /// Create openssl based service + pub fn openssl( + self, + acceptor: SslAcceptor, + ) -> impl ServiceFactory< + Config = (), + Request = TcpStream, + Response = (), + Error = SslError, DispatchError>, + InitError = (), + > { + pipeline_factory( + Acceptor::new(acceptor) + .map_err(SslError::Ssl) + .map_init_err(|_| panic!()), + ) + .and_then(|io: SslStream| { + let peer_addr = io.get_ref().peer_addr().ok(); + ok((io, peer_addr)) + }) + .and_then(self.map_err(SslError::Service)) + } + } +} + +impl H1Service +where + S: ServiceFactory, S::Error: Into, S::Response: Into>, S::InitError: fmt::Debug, B: MessageBody, { - pub fn expect(self, expect: X1) -> H1Service + pub fn expect(self, expect: X1) -> H1Service where X1: ServiceFactory, X1::Error: Into, @@ -95,7 +168,7 @@ where } } - pub fn upgrade(self, upgrade: Option) -> H1Service + pub fn upgrade(self, upgrade: Option) -> H1Service where U1: ServiceFactory), Response = ()>, U1::Error: fmt::Display, @@ -121,34 +194,30 @@ where } } -impl ServiceFactory for H1Service +impl ServiceFactory for H1Service where - T: IoStream, - S: ServiceFactory, + T: AsyncRead + AsyncWrite + Unpin, + S: ServiceFactory, S::Error: Into, S::Response: Into>, S::InitError: fmt::Debug, B: MessageBody, - X: ServiceFactory, + X: ServiceFactory, X::Error: Into, X::InitError: fmt::Debug, - U: ServiceFactory< - Config = SrvConfig, - Request = (Request, Framed), - Response = (), - >, + U: ServiceFactory), Response = ()>, U::Error: fmt::Display, U::InitError: fmt::Debug, { - type Config = SrvConfig; - type Request = Io; + type Config = (); + type Request = (T, Option); type Response = (); type Error = DispatchError; type InitError = (); - type Service = H1ServiceHandler; - type Future = H1ServiceResponse; + type Service = H1ServiceHandler; + type Future = H1ServiceResponse; - fn new_service(&self, cfg: &SrvConfig) -> Self::Future { + fn new_service(&self, cfg: &()) -> Self::Future { H1ServiceResponse { fut: self.srv.new_service(cfg), fut_ex: Some(self.expect.new_service(cfg)), @@ -164,7 +233,7 @@ where #[doc(hidden)] #[pin_project::pin_project] -pub struct H1ServiceResponse +pub struct H1ServiceResponse where S: ServiceFactory, S::Error: Into, @@ -186,12 +255,12 @@ where upgrade: Option, on_connect: Option Box>>, cfg: Option, - _t: PhantomData<(T, P, B)>, + _t: PhantomData<(T, B)>, } -impl Future for H1ServiceResponse +impl Future for H1ServiceResponse where - T: IoStream, + T: AsyncRead + AsyncWrite + Unpin, S: ServiceFactory, S::Error: Into, S::Response: Into>, @@ -204,8 +273,7 @@ where U::Error: fmt::Display, U::InitError: fmt::Debug, { - type Output = - Result, ()>; + type Output = Result, ()>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { let mut this = self.as_mut().project(); @@ -247,16 +315,16 @@ where } /// `Service` implementation for HTTP1 transport -pub struct H1ServiceHandler { +pub struct H1ServiceHandler { srv: CloneableService, expect: CloneableService, upgrade: Option>, on_connect: Option Box>>, cfg: ServiceConfig, - _t: PhantomData<(T, P, B)>, + _t: PhantomData<(T, B)>, } -impl H1ServiceHandler +impl H1ServiceHandler where S: Service, S::Error: Into, @@ -273,7 +341,7 @@ where expect: X, upgrade: Option, on_connect: Option Box>>, - ) -> H1ServiceHandler { + ) -> H1ServiceHandler { H1ServiceHandler { srv: CloneableService::new(srv), expect: CloneableService::new(expect), @@ -285,9 +353,9 @@ where } } -impl Service for H1ServiceHandler +impl Service for H1ServiceHandler where - T: IoStream, + T: AsyncRead + AsyncWrite + Unpin, S: Service, S::Error: Into, S::Response: Into>, @@ -297,7 +365,7 @@ where U: Service), Response = ()>, U::Error: fmt::Display, { - type Request = Io; + type Request = (T, Option); type Response = (); type Error = DispatchError; type Future = Dispatcher; @@ -331,9 +399,7 @@ where } } - fn call(&mut self, req: Self::Request) -> Self::Future { - let io = req.into_parts().0; - + fn call(&mut self, (io, addr): Self::Request) -> Self::Future { let on_connect = if let Some(ref on_connect) = self.on_connect { Some(on_connect(&io)) } else { @@ -347,20 +413,21 @@ where self.expect.clone(), self.upgrade.clone(), on_connect, + addr, ) } } /// `ServiceFactory` implementation for `OneRequestService` service #[derive(Default)] -pub struct OneRequest { +pub struct OneRequest { config: ServiceConfig, - _t: PhantomData<(T, P)>, + _t: PhantomData, } -impl OneRequest +impl OneRequest where - T: IoStream, + T: AsyncRead + AsyncWrite + Unpin, { /// Create new `H1SimpleService` instance. pub fn new() -> Self { @@ -371,38 +438,38 @@ where } } -impl ServiceFactory for OneRequest +impl ServiceFactory for OneRequest where - T: IoStream, + T: AsyncRead + AsyncWrite + Unpin, { - type Config = SrvConfig; - type Request = Io; + type Config = (); + type Request = T; type Response = (Request, Framed); type Error = ParseError; type InitError = (); - type Service = OneRequestService; + type Service = OneRequestService; type Future = Ready>; - fn new_service(&self, _: &SrvConfig) -> Self::Future { + fn new_service(&self, _: &()) -> Self::Future { ok(OneRequestService { - config: self.config.clone(), _t: PhantomData, + config: self.config.clone(), }) } } /// `Service` implementation for HTTP1 transport. Reads one request and returns /// request and framed object. -pub struct OneRequestService { +pub struct OneRequestService { + _t: PhantomData, config: ServiceConfig, - _t: PhantomData<(T, P)>, } -impl Service for OneRequestService +impl Service for OneRequestService where - T: IoStream, + T: AsyncRead + AsyncWrite + Unpin, { - type Request = Io; + type Request = T; type Response = (Request, Framed); type Error = ParseError; type Future = OneRequestServiceResponse; @@ -413,10 +480,7 @@ where fn call(&mut self, req: Self::Request) -> Self::Future { OneRequestServiceResponse { - framed: Some(Framed::new( - req.into_parts().0, - Codec::new(self.config.clone()), - )), + framed: Some(Framed::new(req, Codec::new(self.config.clone()))), } } } @@ -424,14 +488,14 @@ where #[doc(hidden)] pub struct OneRequestServiceResponse where - T: IoStream, + T: AsyncRead + AsyncWrite + Unpin, { framed: Option>, } impl Future for OneRequestServiceResponse where - T: IoStream, + T: AsyncRead + AsyncWrite + Unpin, { type Output = Result<(Request, Framed), ParseError>; diff --git a/actix-http/src/h1/upgrade.rs b/actix-http/src/h1/upgrade.rs index ce46fbe93..e84230ac7 100644 --- a/actix-http/src/h1/upgrade.rs +++ b/actix-http/src/h1/upgrade.rs @@ -2,7 +2,6 @@ use std::marker::PhantomData; use std::task::{Context, Poll}; use actix_codec::Framed; -use actix_server_config::ServerConfig; use actix_service::{Service, ServiceFactory}; use futures::future::Ready; @@ -13,7 +12,7 @@ use crate::request::Request; pub struct UpgradeHandler(PhantomData); impl ServiceFactory for UpgradeHandler { - type Config = ServerConfig; + type Config = (); type Request = (Request, Framed); type Response = (); type Error = Error; @@ -21,7 +20,7 @@ impl ServiceFactory for UpgradeHandler { type InitError = Error; type Future = Ready>; - fn new_service(&self, _: &ServerConfig) -> Self::Future { + fn new_service(&self, _: &()) -> Self::Future { unimplemented!() } } diff --git a/actix-http/src/h2/dispatcher.rs b/actix-http/src/h2/dispatcher.rs index 188553806..e6e8967df 100644 --- a/actix-http/src/h2/dispatcher.rs +++ b/actix-http/src/h2/dispatcher.rs @@ -8,7 +8,6 @@ use std::{fmt, mem, net}; use actix_codec::{AsyncRead, AsyncWrite}; use actix_rt::time::Delay; -use actix_server_config::IoStream; use actix_service::Service; use bitflags::bitflags; use bytes::{Bytes, BytesMut}; @@ -36,7 +35,10 @@ const CHUNK_SIZE: usize = 16_384; /// Dispatcher for HTTP/2 protocol #[pin_project::pin_project] -pub struct Dispatcher, B: MessageBody> { +pub struct Dispatcher, B: MessageBody> +where + T: AsyncRead + AsyncWrite + Unpin, +{ service: CloneableService, connection: Connection, on_connect: Option>, @@ -49,7 +51,7 @@ pub struct Dispatcher, B: MessageBody impl Dispatcher where - T: IoStream, + T: AsyncRead + AsyncWrite + Unpin, S: Service, S::Error: Into, // S::Future: 'static, @@ -95,7 +97,7 @@ where impl Future for Dispatcher where - T: IoStream, + T: AsyncRead + AsyncWrite + Unpin, S: Service, S::Error: Into + 'static, S::Future: 'static, diff --git a/actix-http/src/h2/service.rs b/actix-http/src/h2/service.rs index 860a61f73..a2c8275a1 100644 --- a/actix-http/src/h2/service.rs +++ b/actix-http/src/h2/service.rs @@ -6,8 +6,11 @@ use std::task::{Context, Poll}; use std::{io, net, rc}; use actix_codec::{AsyncRead, AsyncWrite, Framed}; -use actix_server_config::{Io, IoStream, ServerConfig as SrvConfig}; -use actix_service::{IntoServiceFactory, Service, ServiceFactory}; +use actix_rt::net::TcpStream; +use actix_service::{ + factory_fn, pipeline_factory, service_fn2, IntoServiceFactory, Service, + ServiceFactory, +}; use bytes::Bytes; use futures::future::{ok, Ready}; use futures::{ready, Stream}; @@ -23,39 +26,28 @@ use crate::helpers::DataFactory; use crate::payload::Payload; use crate::request::Request; use crate::response::Response; +use crate::Protocol; use super::dispatcher::Dispatcher; /// `ServiceFactory` implementation for HTTP2 transport -pub struct H2Service { +pub struct H2Service { srv: S, cfg: ServiceConfig, on_connect: Option Box>>, - _t: PhantomData<(T, P, B)>, + _t: PhantomData<(T, B)>, } -impl H2Service +impl H2Service where - S: ServiceFactory, + S: ServiceFactory, S::Error: Into + 'static, S::Response: Into> + 'static, ::Future: 'static, B: MessageBody + 'static, { - /// Create new `HttpService` instance. - pub fn new>(service: F) -> Self { - let cfg = ServiceConfig::new(KeepAlive::Timeout(5), 5000, 0); - - H2Service { - cfg, - on_connect: None, - srv: service.into_factory(), - _t: PhantomData, - } - } - /// Create new `HttpService` instance with config. - pub fn with_config>( + pub(crate) fn with_config>( cfg: ServiceConfig, service: F, ) -> Self { @@ -77,24 +69,98 @@ where } } -impl ServiceFactory for H2Service +impl H2Service where - T: IoStream, - S: ServiceFactory, + S: ServiceFactory, S::Error: Into + 'static, S::Response: Into> + 'static, ::Future: 'static, B: MessageBody + 'static, { - type Config = SrvConfig; - type Request = Io; + /// Create simple tcp based service + pub fn tcp( + self, + ) -> impl ServiceFactory< + Config = (), + Request = TcpStream, + Response = (), + Error = DispatchError, + InitError = S::InitError, + > { + pipeline_factory(factory_fn(|| { + async { + Ok::<_, S::InitError>(service_fn2(|io: TcpStream| { + let peer_addr = io.peer_addr().ok(); + ok::<_, DispatchError>((io, peer_addr)) + })) + } + })) + .and_then(self) + } +} + +#[cfg(feature = "openssl")] +mod openssl { + use actix_service::{factory_fn, service_fn2}; + use actix_tls::openssl::{Acceptor, SslStream}; + use actix_tls::{openssl::HandshakeError, SslError}; + use open_ssl::ssl::SslAcceptor; + + use super::*; + + impl H2Service, S, B> + where + S: ServiceFactory, + S::Error: Into + 'static, + S::Response: Into> + 'static, + ::Future: 'static, + B: MessageBody + 'static, + { + /// Create ssl based service + pub fn openssl( + self, + acceptor: SslAcceptor, + ) -> impl ServiceFactory< + Config = (), + Request = TcpStream, + Response = (), + Error = SslError, DispatchError>, + InitError = S::InitError, + > { + pipeline_factory( + Acceptor::new(acceptor) + .map_err(SslError::Ssl) + .map_init_err(|_| panic!()), + ) + .and_then(factory_fn(|| { + ok::<_, S::InitError>(service_fn2(|io: SslStream| { + let peer_addr = io.get_ref().peer_addr().ok(); + ok((io, peer_addr)) + })) + })) + .and_then(self.map_err(SslError::Service)) + } + } +} + +impl ServiceFactory for H2Service +where + T: AsyncRead + AsyncWrite + Unpin, + S: ServiceFactory, + S::Error: Into + 'static, + S::Response: Into> + 'static, + ::Future: 'static, + B: MessageBody + 'static, +{ + type Config = (); + type Request = (T, Option); type Response = (); type Error = DispatchError; type InitError = S::InitError; - type Service = H2ServiceHandler; - type Future = H2ServiceResponse; + type Service = H2ServiceHandler; + type Future = H2ServiceResponse; - fn new_service(&self, cfg: &SrvConfig) -> Self::Future { + fn new_service(&self, cfg: &()) -> Self::Future { H2ServiceResponse { fut: self.srv.new_service(cfg), cfg: Some(self.cfg.clone()), @@ -106,24 +172,24 @@ where #[doc(hidden)] #[pin_project::pin_project] -pub struct H2ServiceResponse { +pub struct H2ServiceResponse { #[pin] fut: S::Future, cfg: Option, on_connect: Option Box>>, - _t: PhantomData<(T, P, B)>, + _t: PhantomData<(T, B)>, } -impl Future for H2ServiceResponse +impl Future for H2ServiceResponse where - T: IoStream, - S: ServiceFactory, + T: AsyncRead + AsyncWrite + Unpin, + S: ServiceFactory, S::Error: Into + 'static, S::Response: Into> + 'static, ::Future: 'static, B: MessageBody + 'static, { - type Output = Result, S::InitError>; + type Output = Result, S::InitError>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { let this = self.as_mut().project(); @@ -140,14 +206,14 @@ where } /// `Service` implementation for http/2 transport -pub struct H2ServiceHandler { +pub struct H2ServiceHandler { srv: CloneableService, cfg: ServiceConfig, on_connect: Option Box>>, - _t: PhantomData<(T, P, B)>, + _t: PhantomData<(T, B)>, } -impl H2ServiceHandler +impl H2ServiceHandler where S: Service, S::Error: Into + 'static, @@ -159,7 +225,7 @@ where cfg: ServiceConfig, on_connect: Option Box>>, srv: S, - ) -> H2ServiceHandler { + ) -> H2ServiceHandler { H2ServiceHandler { cfg, on_connect, @@ -169,16 +235,16 @@ where } } -impl Service for H2ServiceHandler +impl Service for H2ServiceHandler where - T: IoStream, + T: AsyncRead + AsyncWrite + Unpin, S: Service, S::Error: Into + 'static, S::Future: 'static, S::Response: Into> + 'static, B: MessageBody + 'static, { - type Request = Io; + type Request = (T, Option); type Response = (); type Error = DispatchError; type Future = H2ServiceHandlerResponse; @@ -191,9 +257,7 @@ where }) } - fn call(&mut self, req: Self::Request) -> Self::Future { - let io = req.into_parts().0; - let peer_addr = io.peer_addr(); + fn call(&mut self, (io, addr): Self::Request) -> Self::Future { let on_connect = if let Some(ref on_connect) = self.on_connect { Some(on_connect(&io)) } else { @@ -204,7 +268,7 @@ where state: State::Handshake( Some(self.srv.clone()), Some(self.cfg.clone()), - peer_addr, + addr, on_connect, server::handshake(io), ), @@ -212,8 +276,9 @@ where } } -enum State, B: MessageBody> +enum State, B: MessageBody> where + T: AsyncRead + AsyncWrite + Unpin, S::Future: 'static, { Incoming(Dispatcher), @@ -228,7 +293,7 @@ where pub struct H2ServiceHandlerResponse where - T: IoStream, + T: AsyncRead + AsyncWrite + Unpin, S: Service, S::Error: Into + 'static, S::Future: 'static, @@ -240,7 +305,7 @@ where impl Future for H2ServiceHandlerResponse where - T: IoStream, + T: AsyncRead + AsyncWrite + Unpin, S: Service, S::Error: Into + 'static, S::Future: 'static, diff --git a/actix-http/src/lib.rs b/actix-http/src/lib.rs index b57fdddce..e476623d1 100644 --- a/actix-http/src/lib.rs +++ b/actix-http/src/lib.rs @@ -64,3 +64,10 @@ pub mod http { pub use crate::header::ContentEncoding; pub use crate::message::ConnectionType; } + +/// Http protocol +#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] +pub enum Protocol { + Http1, + Http2, +} diff --git a/actix-http/src/service.rs b/actix-http/src/service.rs index 7340c15fd..8220421ab 100644 --- a/actix-http/src/service.rs +++ b/actix-http/src/service.rs @@ -1,15 +1,13 @@ use std::marker::PhantomData; use std::pin::Pin; use std::task::{Context, Poll}; -use std::{fmt, io, net, rc}; +use std::{fmt, net, rc}; use actix_codec::{AsyncRead, AsyncWrite, Framed}; -use actix_server_config::{ - Io as ServerIo, IoStream, Protocol, ServerConfig as SrvConfig, -}; -use actix_service::{IntoServiceFactory, Service, ServiceFactory}; -use bytes::{BufMut, Bytes, BytesMut}; -use futures::{ready, Future}; +use actix_rt::net::TcpStream; +use actix_service::{pipeline_factory, IntoServiceFactory, Service, ServiceFactory}; +use bytes::Bytes; +use futures::{future::ok, ready, Future}; use h2::server::{self, Handshake}; use pin_project::{pin_project, project}; @@ -21,21 +19,21 @@ use crate::error::{DispatchError, Error}; use crate::helpers::DataFactory; use crate::request::Request; use crate::response::Response; -use crate::{h1, h2::Dispatcher}; +use crate::{h1, h2::Dispatcher, Protocol}; /// `ServiceFactory` HTTP1.1/HTTP2 transport implementation -pub struct HttpService> { +pub struct HttpService> { srv: S, cfg: ServiceConfig, expect: X, upgrade: Option, on_connect: Option Box>>, - _t: PhantomData<(T, P, B)>, + _t: PhantomData<(T, B)>, } -impl HttpService +impl HttpService where - S: ServiceFactory, + S: ServiceFactory, S::Error: Into + 'static, S::InitError: fmt::Debug, S::Response: Into> + 'static, @@ -48,9 +46,9 @@ where } } -impl HttpService +impl HttpService where - S: ServiceFactory, + S: ServiceFactory, S::Error: Into + 'static, S::InitError: fmt::Debug, S::Response: Into> + 'static, @@ -59,7 +57,7 @@ where { /// Create new `HttpService` instance. pub fn new>(service: F) -> Self { - let cfg = ServiceConfig::new(KeepAlive::Timeout(5), 5000, 0); + let cfg = ServiceConfig::new(KeepAlive::Timeout(5), 5000, 0, false, None); HttpService { cfg, @@ -87,9 +85,9 @@ where } } -impl HttpService +impl HttpService where - S: ServiceFactory, + S: ServiceFactory, S::Error: Into + 'static, S::InitError: fmt::Debug, S::Response: Into> + 'static, @@ -101,9 +99,9 @@ where /// Service get called with request that contains `EXPECT` header. /// Service must return request in case of success, in that case /// request will be forwarded to main service. - pub fn expect(self, expect: X1) -> HttpService + pub fn expect(self, expect: X1) -> HttpService where - X1: ServiceFactory, + X1: ServiceFactory, X1::Error: Into, X1::InitError: fmt::Debug, ::Future: 'static, @@ -122,10 +120,10 @@ where /// /// If service is provided then normal requests handling get halted /// and this service get called with original request and framed object. - pub fn upgrade(self, upgrade: Option) -> HttpService + pub fn upgrade(self, upgrade: Option) -> HttpService where U1: ServiceFactory< - Config = SrvConfig, + Config = (), Request = (Request, Framed), Response = (), >, @@ -153,21 +151,122 @@ where } } -impl ServiceFactory for HttpService +impl HttpService where - T: IoStream, - S: ServiceFactory, + S: ServiceFactory, S::Error: Into + 'static, S::InitError: fmt::Debug, S::Response: Into> + 'static, ::Future: 'static, B: MessageBody + 'static, - X: ServiceFactory, + X: ServiceFactory, X::Error: Into, X::InitError: fmt::Debug, ::Future: 'static, U: ServiceFactory< - Config = SrvConfig, + Config = (), + Request = (Request, Framed), + Response = (), + >, + U::Error: fmt::Display, + U::InitError: fmt::Debug, + ::Future: 'static, +{ + /// Create simple tcp stream service + pub fn tcp( + self, + ) -> impl ServiceFactory< + Config = (), + Request = TcpStream, + Response = (), + Error = DispatchError, + InitError = (), + > { + pipeline_factory(|io: TcpStream| { + let peer_addr = io.peer_addr().ok(); + ok((io, Protocol::Http1, peer_addr)) + }) + .and_then(self) + } +} + +#[cfg(feature = "openssl")] +mod openssl { + use super::*; + use actix_tls::openssl::{Acceptor, SslStream}; + use actix_tls::{openssl::HandshakeError, SslError}; + use open_ssl::ssl::SslAcceptor; + + impl HttpService, S, B, X, U> + where + S: ServiceFactory, + S::Error: Into + 'static, + S::InitError: fmt::Debug, + S::Response: Into> + 'static, + ::Future: 'static, + B: MessageBody + 'static, + X: ServiceFactory, + X::Error: Into, + X::InitError: fmt::Debug, + ::Future: 'static, + U: ServiceFactory< + Config = (), + Request = (Request, Framed, h1::Codec>), + Response = (), + >, + U::Error: fmt::Display, + U::InitError: fmt::Debug, + ::Future: 'static, + { + /// Create openssl based service + pub fn openssl( + self, + acceptor: SslAcceptor, + ) -> impl ServiceFactory< + Config = (), + Request = TcpStream, + Response = (), + Error = SslError, DispatchError>, + InitError = (), + > { + pipeline_factory( + Acceptor::new(acceptor) + .map_err(SslError::Ssl) + .map_init_err(|_| panic!()), + ) + .and_then(|io: SslStream| { + let proto = if let Some(protos) = io.ssl().selected_alpn_protocol() { + if protos.windows(2).any(|window| window == b"h2") { + Protocol::Http2 + } else { + Protocol::Http1 + } + } else { + Protocol::Http1 + }; + let peer_addr = io.get_ref().peer_addr().ok(); + ok((io, proto, peer_addr)) + }) + .and_then(self.map_err(SslError::Service)) + } + } +} + +impl ServiceFactory for HttpService +where + T: AsyncRead + AsyncWrite + Unpin, + S: ServiceFactory, + S::Error: Into + 'static, + S::InitError: fmt::Debug, + S::Response: Into> + 'static, + ::Future: 'static, + B: MessageBody + 'static, + X: ServiceFactory, + X::Error: Into, + X::InitError: fmt::Debug, + ::Future: 'static, + U: ServiceFactory< + Config = (), Request = (Request, Framed), Response = (), >, @@ -175,15 +274,15 @@ where U::InitError: fmt::Debug, ::Future: 'static, { - type Config = SrvConfig; - type Request = ServerIo; + type Config = (); + type Request = (T, Protocol, Option); type Response = (); type Error = DispatchError; type InitError = (); - type Service = HttpServiceHandler; - type Future = HttpServiceResponse; + type Service = HttpServiceHandler; + type Future = HttpServiceResponse; - fn new_service(&self, cfg: &SrvConfig) -> Self::Future { + fn new_service(&self, cfg: &()) -> Self::Future { HttpServiceResponse { fut: self.srv.new_service(cfg), fut_ex: Some(self.expect.new_service(cfg)), @@ -191,7 +290,7 @@ where expect: None, upgrade: None, on_connect: self.on_connect.clone(), - cfg: Some(self.cfg.clone()), + cfg: self.cfg.clone(), _t: PhantomData, } } @@ -201,7 +300,6 @@ where #[pin_project] pub struct HttpServiceResponse< T, - P, S: ServiceFactory, B, X: ServiceFactory, @@ -216,13 +314,13 @@ pub struct HttpServiceResponse< expect: Option, upgrade: Option, on_connect: Option Box>>, - cfg: Option, - _t: PhantomData<(T, P, B)>, + cfg: ServiceConfig, + _t: PhantomData<(T, B)>, } -impl Future for HttpServiceResponse +impl Future for HttpServiceResponse where - T: IoStream, + T: AsyncRead + AsyncWrite + Unpin, S: ServiceFactory, S::Error: Into + 'static, S::InitError: fmt::Debug, @@ -239,7 +337,7 @@ where ::Future: 'static, { type Output = - Result, ()>; + Result, ()>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { let mut this = self.as_mut().project(); @@ -269,7 +367,7 @@ where Poll::Ready(result.map(|service| { let this = self.as_mut().project(); HttpServiceHandler::new( - this.cfg.take().unwrap(), + this.cfg.clone(), service, this.expect.take().unwrap(), this.upgrade.take(), @@ -280,16 +378,16 @@ where } /// `Service` implementation for http transport -pub struct HttpServiceHandler { +pub struct HttpServiceHandler { srv: CloneableService, expect: CloneableService, upgrade: Option>, cfg: ServiceConfig, on_connect: Option Box>>, - _t: PhantomData<(T, P, B, X)>, + _t: PhantomData<(T, B, X)>, } -impl HttpServiceHandler +impl HttpServiceHandler where S: Service, S::Error: Into + 'static, @@ -307,7 +405,7 @@ where expect: X, upgrade: Option, on_connect: Option Box>>, - ) -> HttpServiceHandler { + ) -> HttpServiceHandler { HttpServiceHandler { cfg, on_connect, @@ -319,9 +417,9 @@ where } } -impl Service for HttpServiceHandler +impl Service for HttpServiceHandler where - T: IoStream, + T: AsyncRead + AsyncWrite + Unpin, S: Service, S::Error: Into + 'static, S::Future: 'static, @@ -332,7 +430,7 @@ where U: Service), Response = ()>, U::Error: fmt::Display, { - type Request = ServerIo; + type Request = (T, Protocol, Option); type Response = (); type Error = DispatchError; type Future = HttpServiceHandlerResponse; @@ -366,9 +464,7 @@ where } } - fn call(&mut self, req: Self::Request) -> Self::Future { - let (io, _, proto) = req.into_parts(); - + fn call(&mut self, (io, proto, peer_addr): Self::Request) -> Self::Future { let on_connect = if let Some(ref on_connect) = self.on_connect { Some(on_connect(&io)) } else { @@ -376,23 +472,16 @@ where }; match proto { - Protocol::Http2 => { - let peer_addr = io.peer_addr(); - let io = Io { - inner: io, - unread: None, - }; - HttpServiceHandlerResponse { - state: State::Handshake(Some(( - server::handshake(io), - self.cfg.clone(), - self.srv.clone(), - peer_addr, - on_connect, - ))), - } - } - Protocol::Http10 | Protocol::Http11 => HttpServiceHandlerResponse { + Protocol::Http2 => HttpServiceHandlerResponse { + state: State::H2Handshake(Some(( + server::handshake(io), + self.cfg.clone(), + self.srv.clone(), + on_connect, + peer_addr, + ))), + }, + Protocol::Http1 => HttpServiceHandlerResponse { state: State::H1(h1::Dispatcher::new( io, self.cfg.clone(), @@ -400,19 +489,9 @@ where self.expect.clone(), self.upgrade.clone(), on_connect, + peer_addr, )), }, - _ => HttpServiceHandlerResponse { - state: State::Unknown(Some(( - io, - BytesMut::with_capacity(14), - self.cfg.clone(), - self.srv.clone(), - self.expect.clone(), - self.upgrade.clone(), - on_connect, - ))), - }, } } } @@ -423,7 +502,7 @@ where S: Service, S::Future: 'static, S::Error: Into, - T: IoStream, + T: AsyncRead + AsyncWrite + Unpin, B: MessageBody, X: Service, X::Error: Into, @@ -431,25 +510,14 @@ where U::Error: fmt::Display, { H1(#[pin] h1::Dispatcher), - H2(#[pin] Dispatcher, S, B>), - Unknown( + H2(#[pin] Dispatcher), + H2Handshake( Option<( - T, - BytesMut, + Handshake, ServiceConfig, CloneableService, - CloneableService, - Option>, Option>, - )>, - ), - Handshake( - Option<( - Handshake, Bytes>, - ServiceConfig, - CloneableService, Option, - Option>, )>, ), } @@ -457,7 +525,7 @@ where #[pin_project] pub struct HttpServiceHandlerResponse where - T: IoStream, + T: AsyncRead + AsyncWrite + Unpin, S: Service, S::Error: Into + 'static, S::Future: 'static, @@ -472,11 +540,9 @@ where state: State, } -const HTTP2_PREFACE: [u8; 14] = *b"PRI * HTTP/2.0"; - impl Future for HttpServiceHandlerResponse where - T: IoStream, + T: AsyncRead + AsyncWrite + Unpin, S: Service, S::Error: Into + 'static, S::Future: 'static, @@ -496,7 +562,7 @@ where impl State where - T: IoStream, + T: AsyncRead + AsyncWrite + Unpin, S: Service, S::Error: Into + 'static, S::Response: Into> + 'static, @@ -515,57 +581,7 @@ where match self.as_mut().project() { State::H1(disp) => disp.poll(cx), State::H2(disp) => disp.poll(cx), - State::Unknown(ref mut data) => { - if let Some(ref mut item) = data { - loop { - // Safety - we only write to the returned slice. - let b = unsafe { item.1.bytes_mut() }; - let n = ready!(Pin::new(&mut item.0).poll_read(cx, b))?; - if n == 0 { - return Poll::Ready(Ok(())); - } - // Safety - we know that 'n' bytes have - // been initialized via the contract of - // 'poll_read' - unsafe { item.1.advance_mut(n) }; - if item.1.len() >= HTTP2_PREFACE.len() { - break; - } - } - } else { - panic!() - } - let (io, buf, cfg, srv, expect, upgrade, on_connect) = - data.take().unwrap(); - if buf[..14] == HTTP2_PREFACE[..] { - let peer_addr = io.peer_addr(); - let io = Io { - inner: io, - unread: Some(buf), - }; - self.set(State::Handshake(Some(( - server::handshake(io), - cfg, - srv, - peer_addr, - on_connect, - )))); - } else { - self.set(State::H1(h1::Dispatcher::with_timeout( - io, - h1::Codec::new(cfg.clone()), - cfg, - buf, - None, - srv, - expect, - upgrade, - on_connect, - ))) - } - self.poll(cx) - } - State::Handshake(ref mut data) => { + State::H2Handshake(ref mut data) => { let conn = if let Some(ref mut item) = data { match Pin::new(&mut item.0).poll(cx) { Poll::Ready(Ok(conn)) => conn, @@ -578,7 +594,7 @@ where } else { panic!() }; - let (_, cfg, srv, peer_addr, on_connect) = data.take().unwrap(); + let (_, cfg, srv, on_connect, peer_addr) = data.take().unwrap(); self.set(State::H2(Dispatcher::new( srv, conn, on_connect, cfg, None, peer_addr, ))); @@ -587,117 +603,3 @@ where } } } - -/// Wrapper for `AsyncRead + AsyncWrite` types -#[pin_project::pin_project] -struct Io { - unread: Option, - #[pin] - inner: T, -} - -impl io::Read for Io { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - if let Some(mut bytes) = self.unread.take() { - let size = std::cmp::min(buf.len(), bytes.len()); - buf[..size].copy_from_slice(&bytes[..size]); - if bytes.len() > size { - bytes.split_to(size); - self.unread = Some(bytes); - } - Ok(size) - } else { - self.inner.read(buf) - } - } -} - -impl io::Write for Io { - fn write(&mut self, buf: &[u8]) -> io::Result { - self.inner.write(buf) - } - fn flush(&mut self) -> io::Result<()> { - self.inner.flush() - } -} - -impl AsyncRead for Io { - // unsafe fn initializer(&self) -> io::Initializer { - // self.get_mut().inner.initializer() - // } - - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { - self.inner.prepare_uninitialized_buffer(buf) - } - - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - let this = self.project(); - - if let Some(mut bytes) = this.unread.take() { - let size = std::cmp::min(buf.len(), bytes.len()); - buf[..size].copy_from_slice(&bytes[..size]); - if bytes.len() > size { - bytes.split_to(size); - *this.unread = Some(bytes); - } - Poll::Ready(Ok(size)) - } else { - this.inner.poll_read(cx, buf) - } - } - - // fn poll_read_vectored( - // self: Pin<&mut Self>, - // cx: &mut Context<'_>, - // bufs: &mut [io::IoSliceMut<'_>], - // ) -> Poll> { - // self.get_mut().inner.poll_read_vectored(cx, bufs) - // } -} - -impl actix_codec::AsyncWrite for Io { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - self.project().inner.poll_write(cx, buf) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().inner.poll_flush(cx) - } - - fn poll_shutdown( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - self.project().inner.poll_shutdown(cx) - } -} - -impl actix_server_config::IoStream for Io { - #[inline] - fn peer_addr(&self) -> Option { - self.inner.peer_addr() - } - - #[inline] - fn set_nodelay(&mut self, nodelay: bool) -> io::Result<()> { - self.inner.set_nodelay(nodelay) - } - - #[inline] - fn set_linger(&mut self, dur: Option) -> io::Result<()> { - self.inner.set_linger(dur) - } - - #[inline] - fn set_keepalive(&mut self, dur: Option) -> io::Result<()> { - self.inner.set_keepalive(dur) - } -} diff --git a/actix-http/src/test.rs b/actix-http/src/test.rs index 744f057dc..ebb7bda37 100644 --- a/actix-http/src/test.rs +++ b/actix-http/src/test.rs @@ -6,7 +6,6 @@ use std::str::FromStr; use std::task::{Context, Poll}; use actix_codec::{AsyncRead, AsyncWrite}; -use actix_server_config::IoStream; use bytes::{Bytes, BytesMut}; use http::header::{self, HeaderName, HeaderValue}; use http::{HttpTryFrom, Method, Uri, Version}; @@ -272,17 +271,3 @@ impl AsyncWrite for TestBuffer { Poll::Ready(Ok(())) } } - -impl IoStream for TestBuffer { - fn set_nodelay(&mut self, _nodelay: bool) -> io::Result<()> { - Ok(()) - } - - fn set_linger(&mut self, _dur: Option) -> io::Result<()> { - Ok(()) - } - - fn set_keepalive(&mut self, _dur: Option) -> io::Result<()> { - Ok(()) - } -} diff --git a/actix-http/tests/test_client.rs b/actix-http/tests/test_client.rs index cdcaea028..711ee7afd 100644 --- a/actix-http/tests/test_client.rs +++ b/actix-http/tests/test_client.rs @@ -30,7 +30,9 @@ const STR: &str = "Hello World Hello World Hello World Hello World Hello World \ #[actix_rt::test] async fn test_h1_v2() { let srv = TestServer::start(move || { - HttpService::build().finish(|_| future::ok::<_, ()>(Response::Ok().body(STR))) + HttpService::build() + .finish(|_| future::ok::<_, ()>(Response::Ok().body(STR))) + .tcp() }); let response = srv.get("/").send().await.unwrap(); @@ -57,6 +59,7 @@ async fn test_connection_close() { let srv = TestServer::start(move || { HttpService::build() .finish(|_| ok::<_, ()>(Response::Ok().body(STR))) + .tcp() .map(|_| ()) }); @@ -75,6 +78,7 @@ async fn test_with_query_parameter() { ok::<_, ()>(Response::BadRequest().finish()) } }) + .tcp() .map(|_| ()) }); diff --git a/actix-http/tests/test_openssl.rs b/actix-http/tests/test_openssl.rs index 0fdddaa1c..35e234af9 100644 --- a/actix-http/tests/test_openssl.rs +++ b/actix-http/tests/test_openssl.rs @@ -1,11 +1,8 @@ #![cfg(feature = "openssl")] use std::io; -use actix_codec::{AsyncRead, AsyncWrite}; use actix_http_test::TestServer; -use actix_server::ssl::OpensslAcceptor; -use actix_server_config::ServerConfig; -use actix_service::{factory_fn_cfg, pipeline_factory, service_fn2, ServiceFactory}; +use actix_service::{service_fn, ServiceFactory}; use bytes::{Bytes, BytesMut}; use futures::future::{err, ok, ready}; @@ -36,7 +33,7 @@ where Ok(body) } -fn ssl_acceptor() -> io::Result> { +fn ssl_acceptor() -> SslAcceptor { // load ssl keys let mut builder = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap(); builder @@ -47,30 +44,29 @@ fn ssl_acceptor() -> io::Result io::Result<()> { - let openssl = ssl_acceptor()?; let srv = TestServer::start(move || { - pipeline_factory( - openssl - .clone() - .map_err(|e| println!("Openssl error: {}", e)), - ) - .and_then( - HttpService::build() - .h2(|_| ok::<_, Error>(Response::Ok().finish())) - .map_err(|_| ()), - ) + HttpService::build() + .h2(|_| ok::<_, Error>(Response::Ok().finish())) + .openssl(ssl_acceptor()) + .map_err(|_| ()) }); let response = srv.sget("/").send().await.unwrap(); @@ -80,22 +76,15 @@ async fn test_h2() -> io::Result<()> { #[actix_rt::test] async fn test_h2_1() -> io::Result<()> { - let openssl = ssl_acceptor()?; let srv = TestServer::start(move || { - pipeline_factory( - openssl - .clone() - .map_err(|e| println!("Openssl error: {}", e)), - ) - .and_then( - HttpService::build() - .finish(|req: Request| { - assert!(req.peer_addr().is_some()); - assert_eq!(req.version(), Version::HTTP_2); - ok::<_, Error>(Response::Ok().finish()) - }) - .map_err(|_| ()), - ) + HttpService::build() + .finish(|req: Request| { + assert!(req.peer_addr().is_some()); + assert_eq!(req.version(), Version::HTTP_2); + ok::<_, Error>(Response::Ok().finish()) + }) + .openssl(ssl_acceptor()) + .map_err(|_| ()) }); let response = srv.sget("/").send().await.unwrap(); @@ -106,23 +95,16 @@ async fn test_h2_1() -> io::Result<()> { #[actix_rt::test] async fn test_h2_body() -> io::Result<()> { let data = "HELLOWORLD".to_owned().repeat(64 * 1024); - let openssl = ssl_acceptor()?; let mut srv = TestServer::start(move || { - pipeline_factory( - openssl - .clone() - .map_err(|e| println!("Openssl error: {}", e)), - ) - .and_then( - HttpService::build() - .h2(|mut req: Request<_>| { - async move { - let body = load_body(req.take_payload()).await?; - Ok::<_, Error>(Response::Ok().body(body)) - } - }) - .map_err(|_| ()), - ) + HttpService::build() + .h2(|mut req: Request<_>| { + async move { + let body = load_body(req.take_payload()).await?; + Ok::<_, Error>(Response::Ok().body(body)) + } + }) + .openssl(ssl_acceptor()) + .map_err(|_| ()) }); let response = srv.sget("/").send_body(data.clone()).await.unwrap(); @@ -135,30 +117,22 @@ async fn test_h2_body() -> io::Result<()> { #[actix_rt::test] async fn test_h2_content_length() { - let openssl = ssl_acceptor().unwrap(); - let srv = TestServer::start(move || { - pipeline_factory( - openssl - .clone() - .map_err(|e| println!("Openssl error: {}", e)), - ) - .and_then( - HttpService::build() - .h2(|req: Request| { - let indx: usize = req.uri().path()[1..].parse().unwrap(); - let statuses = [ - StatusCode::NO_CONTENT, - StatusCode::CONTINUE, - StatusCode::SWITCHING_PROTOCOLS, - StatusCode::PROCESSING, - StatusCode::OK, - StatusCode::NOT_FOUND, - ]; - ok::<_, ()>(Response::new(statuses[indx])) - }) - .map_err(|_| ()), - ) + HttpService::build() + .h2(|req: Request| { + let indx: usize = req.uri().path()[1..].parse().unwrap(); + let statuses = [ + StatusCode::NO_CONTENT, + StatusCode::CONTINUE, + StatusCode::SWITCHING_PROTOCOLS, + StatusCode::PROCESSING, + StatusCode::OK, + StatusCode::NOT_FOUND, + ]; + ok::<_, ()>(Response::new(statuses[indx])) + }) + .openssl(ssl_acceptor()) + .map_err(|_| ()) }); let header = HeaderName::from_static("content-length"); @@ -193,14 +167,9 @@ async fn test_h2_content_length() { async fn test_h2_headers() { let data = STR.repeat(10); let data2 = data.clone(); - let openssl = ssl_acceptor().unwrap(); let mut srv = TestServer::start(move || { let data = data.clone(); - pipeline_factory(openssl - .clone() - .map_err(|e| println!("Openssl error: {}", e))) - .and_then( HttpService::build().h2(move |_| { let mut builder = Response::Ok(); for idx in 0..90 { @@ -222,7 +191,9 @@ async fn test_h2_headers() { ); } ok::<_, ()>(builder.body(data.clone())) - }).map_err(|_| ())) + }) + .openssl(ssl_acceptor()) + .map_err(|_| ()) }); let response = srv.sget("/").send().await.unwrap(); @@ -257,18 +228,11 @@ const STR: &str = "Hello World Hello World Hello World Hello World Hello World \ #[actix_rt::test] async fn test_h2_body2() { - let openssl = ssl_acceptor().unwrap(); let mut srv = TestServer::start(move || { - pipeline_factory( - openssl - .clone() - .map_err(|e| println!("Openssl error: {}", e)), - ) - .and_then( - HttpService::build() - .h2(|_| ok::<_, ()>(Response::Ok().body(STR))) - .map_err(|_| ()), - ) + HttpService::build() + .h2(|_| ok::<_, ()>(Response::Ok().body(STR))) + .openssl(ssl_acceptor()) + .map_err(|_| ()) }); let response = srv.sget("/").send().await.unwrap(); @@ -281,18 +245,11 @@ async fn test_h2_body2() { #[actix_rt::test] async fn test_h2_head_empty() { - let openssl = ssl_acceptor().unwrap(); let mut srv = TestServer::start(move || { - pipeline_factory( - openssl - .clone() - .map_err(|e| println!("Openssl error: {}", e)), - ) - .and_then( - HttpService::build() - .finish(|_| ok::<_, ()>(Response::Ok().body(STR))) - .map_err(|_| ()), - ) + HttpService::build() + .finish(|_| ok::<_, ()>(Response::Ok().body(STR))) + .openssl(ssl_acceptor()) + .map_err(|_| ()) }); let response = srv.shead("/").send().await.unwrap(); @@ -311,22 +268,13 @@ async fn test_h2_head_empty() { #[actix_rt::test] async fn test_h2_head_binary() { - let openssl = ssl_acceptor().unwrap(); let mut srv = TestServer::start(move || { - pipeline_factory( - openssl - .clone() - .map_err(|e| println!("Openssl error: {}", e)), - ) - .and_then( - HttpService::build() - .h2(|_| { - ok::<_, ()>( - Response::Ok().content_length(STR.len() as u64).body(STR), - ) - }) - .map_err(|_| ()), - ) + HttpService::build() + .h2(|_| { + ok::<_, ()>(Response::Ok().content_length(STR.len() as u64).body(STR)) + }) + .openssl(ssl_acceptor()) + .map_err(|_| ()) }); let response = srv.shead("/").send().await.unwrap(); @@ -344,18 +292,11 @@ async fn test_h2_head_binary() { #[actix_rt::test] async fn test_h2_head_binary2() { - let openssl = ssl_acceptor().unwrap(); let srv = TestServer::start(move || { - pipeline_factory( - openssl - .clone() - .map_err(|e| println!("Openssl error: {}", e)), - ) - .and_then( - HttpService::build() - .h2(|_| ok::<_, ()>(Response::Ok().body(STR))) - .map_err(|_| ()), - ) + HttpService::build() + .h2(|_| ok::<_, ()>(Response::Ok().body(STR))) + .openssl(ssl_acceptor()) + .map_err(|_| ()) }); let response = srv.shead("/").send().await.unwrap(); @@ -369,24 +310,16 @@ async fn test_h2_head_binary2() { #[actix_rt::test] async fn test_h2_body_length() { - let openssl = ssl_acceptor().unwrap(); let mut srv = TestServer::start(move || { - pipeline_factory( - openssl - .clone() - .map_err(|e| println!("Openssl error: {}", e)), - ) - .and_then( - HttpService::build() - .h2(|_| { - let body = once(ok(Bytes::from_static(STR.as_ref()))); - ok::<_, ()>( - Response::Ok() - .body(body::SizedStream::new(STR.len() as u64, body)), - ) - }) - .map_err(|_| ()), - ) + HttpService::build() + .h2(|_| { + let body = once(ok(Bytes::from_static(STR.as_ref()))); + ok::<_, ()>( + Response::Ok().body(body::SizedStream::new(STR.len() as u64, body)), + ) + }) + .openssl(ssl_acceptor()) + .map_err(|_| ()) }); let response = srv.sget("/").send().await.unwrap(); @@ -399,25 +332,18 @@ async fn test_h2_body_length() { #[actix_rt::test] async fn test_h2_body_chunked_explicit() { - let openssl = ssl_acceptor().unwrap(); let mut srv = TestServer::start(move || { - pipeline_factory( - openssl - .clone() - .map_err(|e| println!("Openssl error: {}", e)), - ) - .and_then( - HttpService::build() - .h2(|_| { - let body = once(ok::<_, Error>(Bytes::from_static(STR.as_ref()))); - ok::<_, ()>( - Response::Ok() - .header(header::TRANSFER_ENCODING, "chunked") - .streaming(body), - ) - }) - .map_err(|_| ()), - ) + HttpService::build() + .h2(|_| { + let body = once(ok::<_, Error>(Bytes::from_static(STR.as_ref()))); + ok::<_, ()>( + Response::Ok() + .header(header::TRANSFER_ENCODING, "chunked") + .streaming(body), + ) + }) + .openssl(ssl_acceptor()) + .map_err(|_| ()) }); let response = srv.sget("/").send().await.unwrap(); @@ -433,28 +359,18 @@ async fn test_h2_body_chunked_explicit() { #[actix_rt::test] async fn test_h2_response_http_error_handling() { - let openssl = ssl_acceptor().unwrap(); - let mut srv = TestServer::start(move || { - pipeline_factory( - openssl - .clone() - .map_err(|e| println!("Openssl error: {}", e)), - ) - .and_then( - HttpService::build() - .h2(factory_fn_cfg(|_: &ServerConfig| { - ok::<_, ()>(service_fn2(|_| { - let broken_header = Bytes::from_static(b"\0\0\0"); - ok::<_, ()>( - Response::Ok() - .header(header::CONTENT_TYPE, broken_header) - .body(STR), - ) - })) - })) - .map_err(|_| ()), - ) + HttpService::build() + .h2(service_fn(|_| { + let broken_header = Bytes::from_static(b"\0\0\0"); + ok::<_, ()>( + Response::Ok() + .header(header::CONTENT_TYPE, broken_header) + .body(STR), + ) + })) + .openssl(ssl_acceptor()) + .map_err(|_| ()) }); let response = srv.sget("/").send().await.unwrap(); @@ -467,19 +383,11 @@ async fn test_h2_response_http_error_handling() { #[actix_rt::test] async fn test_h2_service_error() { - let openssl = ssl_acceptor().unwrap(); - let mut srv = TestServer::start(move || { - pipeline_factory( - openssl - .clone() - .map_err(|e| println!("Openssl error: {}", e)), - ) - .and_then( - HttpService::build() - .h2(|_| err::(ErrorBadRequest("error"))) - .map_err(|_| ()), - ) + HttpService::build() + .h2(|_| err::(ErrorBadRequest("error"))) + .openssl(ssl_acceptor()) + .map_err(|_| ()) }); let response = srv.sget("/").send().await.unwrap(); @@ -492,23 +400,15 @@ async fn test_h2_service_error() { #[actix_rt::test] async fn test_h2_on_connect() { - let openssl = ssl_acceptor().unwrap(); - let srv = TestServer::start(move || { - pipeline_factory( - openssl - .clone() - .map_err(|e| println!("Openssl error: {}", e)), - ) - .and_then( - HttpService::build() - .on_connect(|_| 10usize) - .h2(|req: Request| { - assert!(req.extensions().contains::()); - ok::<_, ()>(Response::Ok().finish()) - }) - .map_err(|_| ()), - ) + HttpService::build() + .on_connect(|_| 10usize) + .h2(|req: Request| { + assert!(req.extensions().contains::()); + ok::<_, ()>(Response::Ok().finish()) + }) + .openssl(ssl_acceptor()) + .map_err(|_| ()) }); let response = srv.sget("/").send().await.unwrap(); diff --git a/actix-http/tests/test_server.rs b/actix-http/tests/test_server.rs index a3ce3f9cb..850d30a3f 100644 --- a/actix-http/tests/test_server.rs +++ b/actix-http/tests/test_server.rs @@ -4,8 +4,7 @@ use std::{net, thread}; use actix_http_test::TestServer; use actix_rt::time::delay_for; -use actix_server_config::ServerConfig; -use actix_service::{factory_fn_cfg, pipeline, service_fn, ServiceFactory}; +use actix_service::service_fn; use bytes::Bytes; use futures::future::{self, err, ok, ready, FutureExt}; use futures::stream::{once, StreamExt}; @@ -27,6 +26,7 @@ async fn test_h1() { assert!(req.peer_addr().is_some()); future::ok::<_, ()>(Response::Ok().finish()) }) + .tcp() }); let response = srv.get("/").send().await.unwrap(); @@ -45,7 +45,7 @@ async fn test_h1_2() { assert_eq!(req.version(), http::Version::HTTP_11); future::ok::<_, ()>(Response::Ok().finish()) }) - .map(|_| ()) + .tcp() }); let response = srv.get("/").send().await.unwrap(); @@ -64,6 +64,7 @@ async fn test_expect_continue() { } })) .finish(|_| future::ok::<_, ()>(Response::Ok().finish())) + .tcp() }); let mut stream = net::TcpStream::connect(srv.addr()).unwrap(); @@ -92,7 +93,8 @@ async fn test_expect_continue_h1() { } }) })) - .h1(|_| future::ok::<_, ()>(Response::Ok().finish())) + .h1(service_fn(|_| future::ok::<_, ()>(Response::Ok().finish()))) + .tcp() }); let mut stream = net::TcpStream::connect(srv.addr()).unwrap(); @@ -114,18 +116,20 @@ async fn test_chunked_payload() { let total_size: usize = chunk_sizes.iter().sum(); let srv = TestServer::start(|| { - HttpService::build().h1(service_fn(|mut request: Request| { - request - .take_payload() - .map(|res| match res { - Ok(pl) => pl, - Err(e) => panic!(format!("Error reading payload: {}", e)), - }) - .fold(0usize, |acc, chunk| ready(acc + chunk.len())) - .map(|req_size| { - Ok::<_, Error>(Response::Ok().body(format!("size={}", req_size))) - }) - })) + HttpService::build() + .h1(service_fn(|mut request: Request| { + request + .take_payload() + .map(|res| match res { + Ok(pl) => pl, + Err(e) => panic!(format!("Error reading payload: {}", e)), + }) + .fold(0usize, |acc, chunk| ready(acc + chunk.len())) + .map(|req_size| { + Ok::<_, Error>(Response::Ok().body(format!("size={}", req_size))) + }) + })) + .tcp() }); let returned_size = { @@ -167,6 +171,7 @@ async fn test_slow_request() { HttpService::build() .client_timeout(100) .finish(|_| future::ok::<_, ()>(Response::Ok().finish())) + .tcp() }); let mut stream = net::TcpStream::connect(srv.addr()).unwrap(); @@ -179,7 +184,9 @@ async fn test_slow_request() { #[actix_rt::test] async fn test_http1_malformed_request() { let srv = TestServer::start(|| { - HttpService::build().h1(|_| future::ok::<_, ()>(Response::Ok().finish())) + HttpService::build() + .h1(|_| future::ok::<_, ()>(Response::Ok().finish())) + .tcp() }); let mut stream = net::TcpStream::connect(srv.addr()).unwrap(); @@ -192,7 +199,9 @@ async fn test_http1_malformed_request() { #[actix_rt::test] async fn test_http1_keepalive() { let srv = TestServer::start(|| { - HttpService::build().h1(|_| future::ok::<_, ()>(Response::Ok().finish())) + HttpService::build() + .h1(|_| future::ok::<_, ()>(Response::Ok().finish())) + .tcp() }); let mut stream = net::TcpStream::connect(srv.addr()).unwrap(); @@ -213,6 +222,7 @@ async fn test_http1_keepalive_timeout() { HttpService::build() .keep_alive(1) .h1(|_| future::ok::<_, ()>(Response::Ok().finish())) + .tcp() }); let mut stream = net::TcpStream::connect(srv.addr()).unwrap(); @@ -230,7 +240,9 @@ async fn test_http1_keepalive_timeout() { #[actix_rt::test] async fn test_http1_keepalive_close() { let srv = TestServer::start(|| { - HttpService::build().h1(|_| future::ok::<_, ()>(Response::Ok().finish())) + HttpService::build() + .h1(|_| future::ok::<_, ()>(Response::Ok().finish())) + .tcp() }); let mut stream = net::TcpStream::connect(srv.addr()).unwrap(); @@ -248,7 +260,9 @@ async fn test_http1_keepalive_close() { #[actix_rt::test] async fn test_http10_keepalive_default_close() { let srv = TestServer::start(|| { - HttpService::build().h1(|_| future::ok::<_, ()>(Response::Ok().finish())) + HttpService::build() + .h1(|_| future::ok::<_, ()>(Response::Ok().finish())) + .tcp() }); let mut stream = net::TcpStream::connect(srv.addr()).unwrap(); @@ -265,7 +279,9 @@ async fn test_http10_keepalive_default_close() { #[actix_rt::test] async fn test_http10_keepalive() { let srv = TestServer::start(|| { - HttpService::build().h1(|_| future::ok::<_, ()>(Response::Ok().finish())) + HttpService::build() + .h1(|_| future::ok::<_, ()>(Response::Ok().finish())) + .tcp() }); let mut stream = net::TcpStream::connect(srv.addr()).unwrap(); @@ -292,6 +308,7 @@ async fn test_http1_keepalive_disabled() { HttpService::build() .keep_alive(KeepAlive::Disabled) .h1(|_| future::ok::<_, ()>(Response::Ok().finish())) + .tcp() }); let mut stream = net::TcpStream::connect(srv.addr()).unwrap(); @@ -313,18 +330,20 @@ async fn test_content_length() { }; let srv = TestServer::start(|| { - HttpService::build().h1(|req: Request| { - let indx: usize = req.uri().path()[1..].parse().unwrap(); - let statuses = [ - StatusCode::NO_CONTENT, - StatusCode::CONTINUE, - StatusCode::SWITCHING_PROTOCOLS, - StatusCode::PROCESSING, - StatusCode::OK, - StatusCode::NOT_FOUND, - ]; - future::ok::<_, ()>(Response::new(statuses[indx])) - }) + HttpService::build() + .h1(|req: Request| { + let indx: usize = req.uri().path()[1..].parse().unwrap(); + let statuses = [ + StatusCode::NO_CONTENT, + StatusCode::CONTINUE, + StatusCode::SWITCHING_PROTOCOLS, + StatusCode::PROCESSING, + StatusCode::OK, + StatusCode::NOT_FOUND, + ]; + future::ok::<_, ()>(Response::new(statuses[indx])) + }) + .tcp() }); let header = HeaderName::from_static("content-length"); @@ -377,7 +396,7 @@ async fn test_h1_headers() { ); } future::ok::<_, ()>(builder.body(data.clone())) - }) + }).tcp() }); let response = srv.get("/").send().await.unwrap(); @@ -413,7 +432,9 @@ const STR: &str = "Hello World Hello World Hello World Hello World Hello World \ #[actix_rt::test] async fn test_h1_body() { let mut srv = TestServer::start(|| { - HttpService::build().h1(|_| ok::<_, ()>(Response::Ok().body(STR))) + HttpService::build() + .h1(|_| ok::<_, ()>(Response::Ok().body(STR))) + .tcp() }); let response = srv.get("/").send().await.unwrap(); @@ -427,7 +448,9 @@ async fn test_h1_body() { #[actix_rt::test] async fn test_h1_head_empty() { let mut srv = TestServer::start(|| { - HttpService::build().h1(|_| ok::<_, ()>(Response::Ok().body(STR))) + HttpService::build() + .h1(|_| ok::<_, ()>(Response::Ok().body(STR))) + .tcp() }); let response = srv.head("/").send().await.unwrap(); @@ -449,9 +472,11 @@ async fn test_h1_head_empty() { #[actix_rt::test] async fn test_h1_head_binary() { let mut srv = TestServer::start(|| { - HttpService::build().h1(|_| { - ok::<_, ()>(Response::Ok().content_length(STR.len() as u64).body(STR)) - }) + HttpService::build() + .h1(|_| { + ok::<_, ()>(Response::Ok().content_length(STR.len() as u64).body(STR)) + }) + .tcp() }); let response = srv.head("/").send().await.unwrap(); @@ -473,7 +498,9 @@ async fn test_h1_head_binary() { #[actix_rt::test] async fn test_h1_head_binary2() { let srv = TestServer::start(|| { - HttpService::build().h1(|_| ok::<_, ()>(Response::Ok().body(STR))) + HttpService::build() + .h1(|_| ok::<_, ()>(Response::Ok().body(STR))) + .tcp() }); let response = srv.head("/").send().await.unwrap(); @@ -491,12 +518,14 @@ async fn test_h1_head_binary2() { #[actix_rt::test] async fn test_h1_body_length() { let mut srv = TestServer::start(|| { - HttpService::build().h1(|_| { - let body = once(ok(Bytes::from_static(STR.as_ref()))); - ok::<_, ()>( - Response::Ok().body(body::SizedStream::new(STR.len() as u64, body)), - ) - }) + HttpService::build() + .h1(|_| { + let body = once(ok(Bytes::from_static(STR.as_ref()))); + ok::<_, ()>( + Response::Ok().body(body::SizedStream::new(STR.len() as u64, body)), + ) + }) + .tcp() }); let response = srv.get("/").send().await.unwrap(); @@ -510,14 +539,16 @@ async fn test_h1_body_length() { #[actix_rt::test] async fn test_h1_body_chunked_explicit() { let mut srv = TestServer::start(|| { - HttpService::build().h1(|_| { - let body = once(ok::<_, Error>(Bytes::from_static(STR.as_ref()))); - ok::<_, ()>( - Response::Ok() - .header(header::TRANSFER_ENCODING, "chunked") - .streaming(body), - ) - }) + HttpService::build() + .h1(|_| { + let body = once(ok::<_, Error>(Bytes::from_static(STR.as_ref()))); + ok::<_, ()>( + Response::Ok() + .header(header::TRANSFER_ENCODING, "chunked") + .streaming(body), + ) + }) + .tcp() }); let response = srv.get("/").send().await.unwrap(); @@ -542,10 +573,12 @@ async fn test_h1_body_chunked_explicit() { #[actix_rt::test] async fn test_h1_body_chunked_implicit() { let mut srv = TestServer::start(|| { - HttpService::build().h1(|_| { - let body = once(ok::<_, Error>(Bytes::from_static(STR.as_ref()))); - ok::<_, ()>(Response::Ok().streaming(body)) - }) + HttpService::build() + .h1(|_| { + let body = once(ok::<_, Error>(Bytes::from_static(STR.as_ref()))); + ok::<_, ()>(Response::Ok().streaming(body)) + }) + .tcp() }); let response = srv.get("/").send().await.unwrap(); @@ -568,8 +601,8 @@ async fn test_h1_body_chunked_implicit() { #[actix_rt::test] async fn test_h1_response_http_error_handling() { let mut srv = TestServer::start(|| { - HttpService::build().h1(factory_fn_cfg(|_: &ServerConfig| { - ok::<_, ()>(pipeline(|_| { + HttpService::build() + .h1(service_fn(|_| { let broken_header = Bytes::from_static(b"\0\0\0"); ok::<_, ()>( Response::Ok() @@ -577,7 +610,7 @@ async fn test_h1_response_http_error_handling() { .body(STR), ) })) - })) + .tcp() }); let response = srv.get("/").send().await.unwrap(); @@ -593,6 +626,7 @@ async fn test_h1_service_error() { let mut srv = TestServer::start(|| { HttpService::build() .h1(|_| future::err::(error::ErrorBadRequest("error"))) + .tcp() }); let response = srv.get("/").send().await.unwrap(); @@ -612,6 +646,7 @@ async fn test_h1_on_connect() { assert!(req.extensions().contains::()); future::ok::<_, ()>(Response::Ok().finish()) }) + .tcp() }); let response = srv.get("/").send().await.unwrap(); diff --git a/actix-http/tests/test_ws.rs b/actix-http/tests/test_ws.rs index aa81bc41b..5ac5fcaaf 100644 --- a/actix-http/tests/test_ws.rs +++ b/actix-http/tests/test_ws.rs @@ -40,6 +40,7 @@ async fn test_simple() { HttpService::build() .upgrade(actix_service::service_fn(ws_service)) .finish(|_| future::ok::<_, ()>(Response::NotFound())) + .tcp() }); // client service diff --git a/actix-web-codegen/tests/test_macro.rs b/actix-web-codegen/tests/test_macro.rs index b6ac6dd18..c4f2d7e89 100644 --- a/actix-web-codegen/tests/test_macro.rs +++ b/actix-web-codegen/tests/test_macro.rs @@ -78,6 +78,7 @@ async fn test_params() { .service(put_param_test) .service(delete_param_test), ) + .tcp() }); let request = srv.request(http::Method::GET, srv.url("/test/it")); @@ -107,6 +108,7 @@ async fn test_body() { .service(patch_test) .service(test), ) + .tcp() }); let request = srv.request(http::Method::GET, srv.url("/test")); let response = request.send().await.unwrap(); @@ -149,7 +151,8 @@ async fn test_body() { #[actix_rt::test] async fn test_auto_async() { - let srv = TestServer::start(|| HttpService::new(App::new().service(auto_async))); + let srv = + TestServer::start(|| HttpService::new(App::new().service(auto_async)).tcp()); let request = srv.request(http::Method::GET, srv.url("/test")); let response = request.send().await.unwrap(); diff --git a/awc/Cargo.toml b/awc/Cargo.toml index e9268aac0..883b275da 100644 --- a/awc/Cargo.toml +++ b/awc/Cargo.toml @@ -67,7 +67,8 @@ actix-web = { version = "2.0.0-alpha.1", features=["openssl"] } actix-http = { version = "0.3.0-alpha.1", features=["openssl"] } actix-http-test = { version = "0.3.0-alpha.1", features=["openssl"] } actix-utils = "0.5.0-alpha.1" -actix-server = { version = "0.8.0-alpha.1", features=["openssl"] } +actix-server = { version = "0.8.0-alpha.2" } +#actix-tls = { version = "0.1.0-alpha.1", features=["openssl"] } brotli2 = { version="0.3.2" } flate2 = { version="1.0.2" } env_logger = "0.6" diff --git a/awc/tests/test_client.rs b/awc/tests/test_client.rs index 15e9a07ac..6bd39973f 100644 --- a/awc/tests/test_client.rs +++ b/awc/tests/test_client.rs @@ -49,6 +49,7 @@ async fn test_simple() { HttpService::new(App::new().service( web::resource("/").route(web::to(|| HttpResponse::Ok().body(STR))), )) + .tcp() }); let request = srv.get("/").header("x-test", "111").send(); @@ -77,6 +78,7 @@ async fn test_json() { HttpService::new(App::new().service( web::resource("/").route(web::to(|_: web::Json| HttpResponse::Ok())), )) + .tcp() }); let request = srv @@ -93,6 +95,7 @@ async fn test_form() { HttpService::new(App::new().service(web::resource("/").route(web::to( |_: web::Form>| HttpResponse::Ok(), )))) + .tcp() }); let mut data = HashMap::new(); @@ -112,6 +115,7 @@ async fn test_timeout() { Ok::<_, Error>(HttpResponse::Ok().body(STR)) } })))) + .tcp() }); let connector = awc::Connector::new() @@ -142,6 +146,7 @@ async fn test_timeout_override() { Ok::<_, Error>(HttpResponse::Ok().body(STR)) } })))) + .tcp() }); let client = awc::Client::build() @@ -168,9 +173,13 @@ async fn test_connection_reuse() { num2.fetch_add(1, Ordering::Relaxed); ok(io) }) - .and_then(HttpService::new( - App::new().service(web::resource("/").route(web::to(|| HttpResponse::Ok()))), - )) + .and_then( + HttpService::new( + App::new() + .service(web::resource("/").route(web::to(|| HttpResponse::Ok()))), + ) + .tcp(), + ) }); let client = awc::Client::default(); @@ -200,9 +209,13 @@ async fn test_connection_force_close() { num2.fetch_add(1, Ordering::Relaxed); ok(io) }) - .and_then(HttpService::new( - App::new().service(web::resource("/").route(web::to(|| HttpResponse::Ok()))), - )) + .and_then( + HttpService::new( + App::new() + .service(web::resource("/").route(web::to(|| HttpResponse::Ok()))), + ) + .tcp(), + ) }); let client = awc::Client::default(); @@ -232,12 +245,15 @@ async fn test_connection_server_close() { num2.fetch_add(1, Ordering::Relaxed); ok(io) }) - .and_then(HttpService::new( - App::new().service( - web::resource("/") - .route(web::to(|| HttpResponse::Ok().force_close().finish())), - ), - )) + .and_then( + HttpService::new( + App::new().service( + web::resource("/") + .route(web::to(|| HttpResponse::Ok().force_close().finish())), + ), + ) + .tcp(), + ) }); let client = awc::Client::default(); @@ -267,9 +283,12 @@ async fn test_connection_wait_queue() { num2.fetch_add(1, Ordering::Relaxed); ok(io) }) - .and_then(HttpService::new(App::new().service( - web::resource("/").route(web::to(|| HttpResponse::Ok().body(STR))), - ))) + .and_then( + HttpService::new(App::new().service( + web::resource("/").route(web::to(|| HttpResponse::Ok().body(STR))), + )) + .tcp(), + ) }); let client = awc::Client::build() @@ -308,12 +327,15 @@ async fn test_connection_wait_queue_force_close() { num2.fetch_add(1, Ordering::Relaxed); ok(io) }) - .and_then(HttpService::new( - App::new().service( - web::resource("/") - .route(web::to(|| HttpResponse::Ok().force_close().body(STR))), - ), - )) + .and_then( + HttpService::new( + App::new().service( + web::resource("/") + .route(web::to(|| HttpResponse::Ok().force_close().body(STR))), + ), + ) + .tcp(), + ) }); let client = awc::Client::build() @@ -353,6 +375,7 @@ async fn test_with_query_parameter() { } }, ))) + .tcp() }); let res = awc::Client::new() @@ -373,6 +396,7 @@ async fn test_no_decompress() { res })), )) + .tcp() }); let mut res = awc::Client::new() @@ -419,6 +443,7 @@ async fn test_client_gzip_encoding() { .header("content-encoding", "gzip") .body(data) })))) + .tcp() }); // client request @@ -442,6 +467,7 @@ async fn test_client_gzip_encoding_large() { .header("content-encoding", "gzip") .body(data) })))) + .tcp() }); // client request @@ -471,6 +497,7 @@ async fn test_client_gzip_encoding_large_random() { .body(data) }, )))) + .tcp() }); // client request @@ -495,6 +522,7 @@ async fn test_client_brotli_encoding() { .body(data) }, )))) + .tcp() }); // client request @@ -707,6 +735,7 @@ async fn test_client_cookie_handling() { } }), )) + .tcp() }); let request = srv.get("/").cookie(cookie1.clone()).cookie(cookie2.clone()); @@ -768,6 +797,7 @@ async fn client_basic_auth() { } }), )) + .tcp() }); // set authorization header to Basic @@ -796,6 +826,7 @@ async fn client_bearer_auth() { } }), )) + .tcp() }); // set authorization header to Bearer diff --git a/awc/tests/test_ssl_client.rs b/awc/tests/test_ssl_client.rs index 1abb071a4..a9a7fa2fb 100644 --- a/awc/tests/test_ssl_client.rs +++ b/awc/tests/test_ssl_client.rs @@ -1,20 +1,16 @@ #![cfg(feature = "openssl")] -use open_ssl::ssl::{SslAcceptor, SslConnector, SslFiletype, SslMethod, SslVerifyMode}; - -use std::io::Result; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; -use actix_codec::{AsyncRead, AsyncWrite}; use actix_http::HttpService; use actix_http_test::TestServer; -use actix_server::ssl::OpensslAcceptor; use actix_service::{pipeline_factory, ServiceFactory}; use actix_web::http::Version; use actix_web::{web, App, HttpResponse}; use futures::future::ok; +use open_ssl::ssl::{SslAcceptor, SslConnector, SslFiletype, SslMethod, SslVerifyMode}; -fn ssl_acceptor() -> Result> { +fn ssl_acceptor() -> SslAcceptor { // load ssl keys let mut builder = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap(); builder @@ -31,13 +27,12 @@ fn ssl_acceptor() -> Result> { Err(open_ssl::ssl::AlpnError::NOACK) } }); - builder.set_alpn_protos(b"\x02h2")?; - Ok(actix_server::ssl::OpensslAcceptor::new(builder.build())) + builder.set_alpn_protos(b"\x02h2").unwrap(); + builder.build() } #[actix_rt::test] async fn test_connection_reuse_h2() { - let openssl = ssl_acceptor().unwrap(); let num = Arc::new(AtomicUsize::new(0)); let num2 = num.clone(); @@ -47,15 +42,11 @@ async fn test_connection_reuse_h2() { num2.fetch_add(1, Ordering::Relaxed); ok(io) }) - .and_then( - openssl - .clone() - .map_err(|e| println!("Openssl error: {}", e)), - ) .and_then( HttpService::build() .h2(App::new() .service(web::resource("/").route(web::to(|| HttpResponse::Ok())))) + .openssl(ssl_acceptor()) .map_err(|_| ()), ) }); diff --git a/awc/tests/test_ws.rs b/awc/tests/test_ws.rs index 2e1d3981e..d90f55531 100644 --- a/awc/tests/test_ws.rs +++ b/awc/tests/test_ws.rs @@ -46,6 +46,7 @@ async fn test_simple() { } }) .finish(|_| ok::<_, Error>(Response::NotFound())) + .tcp() }); // client service diff --git a/src/app_service.rs b/src/app_service.rs index 3fa5a6eed..6c18cd542 100644 --- a/src/app_service.rs +++ b/src/app_service.rs @@ -7,7 +7,6 @@ use std::task::{Context, Poll}; use actix_http::{Extensions, Request, Response}; use actix_router::{Path, ResourceDef, ResourceInfo, Router, Url}; -use actix_server_config::ServerConfig; use actix_service::boxed::{self, BoxService, BoxServiceFactory}; use actix_service::{service_fn, Service, ServiceFactory}; use futures::future::{ok, FutureExt, LocalBoxFuture}; @@ -59,7 +58,7 @@ where InitError = (), >, { - type Config = ServerConfig; + type Config = (); type Request = Request; type Response = ServiceResponse; type Error = T::Error; @@ -67,7 +66,7 @@ where type Service = AppInitService; type Future = AppInitResult; - fn new_service(&self, cfg: &ServerConfig) -> Self::Future { + fn new_service(&self, _: &()) -> Self::Future { // update resource default service let default = self.default.clone().unwrap_or_else(|| { Rc::new(boxed::factory(service_fn(|req: ServiceRequest| { @@ -76,13 +75,6 @@ where }); // App config - { - let mut c = self.config.borrow_mut(); - let loc_cfg = Rc::get_mut(&mut c.0).unwrap(); - loc_cfg.secure = cfg.secure(); - loc_cfg.addr = cfg.local_addr(); - } - let mut config = AppService::new( self.config.borrow().clone(), default.clone(), diff --git a/src/server.rs b/src/server.rs index a98d06275..f3ec550cf 100644 --- a/src/server.rs +++ b/src/server.rs @@ -2,11 +2,13 @@ use std::marker::PhantomData; use std::sync::Arc; use std::{fmt, io, net}; -use actix_http::{body::MessageBody, Error, HttpService, KeepAlive, Request, Response}; +use actix_http::{ + body::MessageBody, Error, HttpService, KeepAlive, Protocol, Request, Response, +}; use actix_rt::System; use actix_server::{Server, ServerBuilder}; -use actix_server_config::ServerConfig; -use actix_service::{IntoServiceFactory, Service, ServiceFactory}; +use actix_service::{pipeline_factory, IntoServiceFactory, Service, ServiceFactory}; +use futures::future::ok; use parking_lot::Mutex; use net2::TcpBuilder; @@ -52,7 +54,7 @@ pub struct HttpServer where F: Fn() -> I + Send + Clone + 'static, I: IntoServiceFactory, - S: ServiceFactory, + S: ServiceFactory, S::Error: Into, S::InitError: fmt::Debug, S::Response: Into>, @@ -71,7 +73,7 @@ impl HttpServer where F: Fn() -> I + Send + Clone + 'static, I: IntoServiceFactory, - S: ServiceFactory, + S: ServiceFactory, S::Error: Into + 'static, S::InitError: fmt::Debug, S::Response: Into> + 'static, @@ -137,8 +139,8 @@ where /// can be used to limit the global SSL CPU usage. /// /// By default max connections is set to a 256. - pub fn maxconnrate(mut self, num: usize) -> Self { - self.builder = self.builder.maxconnrate(num); + pub fn maxconnrate(self, num: usize) -> Self { + actix_tls::max_concurrent_ssl_connect(num); self } @@ -247,7 +249,9 @@ where HttpService::build() .keep_alive(c.keep_alive) .client_timeout(c.client_timeout) + .local_addr(addr) .finish(factory()) + .tcp() }, )?; Ok(self) @@ -271,10 +275,6 @@ where lst: net::TcpListener, acceptor: SslAcceptor, ) -> io::Result { - use actix_server::ssl::{OpensslAcceptor, SslError}; - use actix_service::pipeline_factory; - - let acceptor = OpensslAcceptor::new(acceptor); let factory = self.factory.clone(); let cfg = self.config.clone(); let addr = lst.local_addr().unwrap(); @@ -288,15 +288,12 @@ where lst, move || { let c = cfg.lock(); - pipeline_factory(acceptor.clone().map_err(SslError::Ssl)).and_then( - HttpService::build() - .keep_alive(c.keep_alive) - .client_timeout(c.client_timeout) - .client_disconnect(c.client_shutdown) - .finish(factory()) - .map_err(SslError::Service) - .map_init_err(|_| ()), - ) + HttpService::build() + .keep_alive(c.keep_alive) + .client_timeout(c.client_timeout) + .client_disconnect(c.client_shutdown) + .finish(factory()) + .openssl(acceptor.clone()) }, )?; Ok(self) @@ -444,6 +441,8 @@ where mut self, lst: std::os::unix::net::UnixListener, ) -> io::Result { + use actix_rt::net::UnixStream; + let cfg = self.config.clone(); let factory = self.factory.clone(); // todo duplicated: @@ -459,10 +458,12 @@ where self.builder = self.builder.listen_uds(addr, lst, move || { let c = cfg.lock(); - HttpService::build() - .keep_alive(c.keep_alive) - .client_timeout(c.client_timeout) - .finish(factory()) + pipeline_factory(|io: UnixStream| ok((io, Protocol::Http1, None))).and_then( + HttpService::build() + .keep_alive(c.keep_alive) + .client_timeout(c.client_timeout) + .finish(factory()), + ) })?; Ok(self) } @@ -475,6 +476,8 @@ where where A: AsRef, { + use actix_rt::net::UnixStream; + let cfg = self.config.clone(); let factory = self.factory.clone(); self.sockets.push(Socket { @@ -490,10 +493,13 @@ where addr, move || { let c = cfg.lock(); - HttpService::build() - .keep_alive(c.keep_alive) - .client_timeout(c.client_timeout) - .finish(factory()) + pipeline_factory(|io: UnixStream| ok((io, Protocol::Http1, None))) + .and_then( + HttpService::build() + .keep_alive(c.keep_alive) + .client_timeout(c.client_timeout) + .finish(factory()), + ) }, )?; Ok(self) @@ -504,7 +510,7 @@ impl HttpServer where F: Fn() -> I + Send + Clone + 'static, I: IntoServiceFactory, - S: ServiceFactory, + S: ServiceFactory, S::Error: Into, S::InitError: fmt::Debug, S::Response: Into>, @@ -585,8 +591,11 @@ fn openssl_acceptor(mut builder: SslAcceptorBuilder) -> io::Result builder.set_alpn_select_callback(|_, protos| { const H2: &[u8] = b"\x02h2"; + const H11: &[u8] = b"\x08http/1.1"; if protos.windows(3).any(|window| window == H2) { Ok(b"h2") + } else if protos.windows(9).any(|window| window == H11) { + Ok(b"http/1.1") } else { Err(AlpnError::NOACK) } diff --git a/src/test.rs b/src/test.rs index e19393156..9ded3f9f8 100644 --- a/src/test.rs +++ b/src/test.rs @@ -6,7 +6,6 @@ use actix_http::http::{HttpTryFrom, Method, StatusCode, Uri, Version}; use actix_http::test::TestRequest as HttpTestRequest; use actix_http::{cookie::Cookie, Extensions, Request}; use actix_router::{Path, ResourceDef, Url}; -use actix_server_config::ServerConfig; use actix_service::{IntoService, IntoServiceFactory, Service, ServiceFactory}; use bytes::{Bytes, BytesMut}; use futures::future::ok; @@ -71,16 +70,15 @@ pub async fn init_service( where R: IntoServiceFactory, S: ServiceFactory< - Config = ServerConfig, + Config = (), Request = Request, Response = ServiceResponse, Error = E, >, S::InitError: std::fmt::Debug, { - let cfg = ServerConfig::new("127.0.0.1:8080".parse().unwrap()); let srv = app.into_factory(); - srv.new_service(&cfg).await.unwrap() + srv.new_service(&()).await.unwrap() } /// Calls service and waits for response future completion. diff --git a/test-server/Cargo.toml b/test-server/Cargo.toml index e59e439fe..bb7a10040 100644 --- a/test-server/Cargo.toml +++ b/test-server/Cargo.toml @@ -27,7 +27,7 @@ path = "src/lib.rs" default = [] # openssl -openssl = ["open-ssl", "actix-server/openssl", "awc/openssl"] +openssl = ["open-ssl", "awc/openssl", ] # "actix-tls/openssl"] [dependencies] actix-service = "1.0.0-alpha.1" @@ -36,7 +36,6 @@ actix-connect = "1.0.0-alpha.1" actix-utils = "0.5.0-alpha.1" actix-rt = "1.0.0-alpha.1" actix-server = "0.8.0-alpha.1" -actix-server-config = "0.3.0-alpha.1" actix-testing = "0.3.0-alpha.1" awc = "0.3.0-alpha.1" diff --git a/tests/test_server.rs b/tests/test_server.rs index bfdf3f0ee..7cfda04ad 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -4,7 +4,7 @@ use actix_http::http::header::{ ContentEncoding, ACCEPT_ENCODING, CONTENT_ENCODING, CONTENT_LENGTH, TRANSFER_ENCODING, }; -use actix_http::{h1, Error, HttpService, Response}; +use actix_http::{Error, HttpService, Response}; use actix_http_test::TestServer; use brotli2::write::{BrotliDecoder, BrotliEncoder}; use bytes::Bytes; @@ -42,10 +42,10 @@ const STR: &str = "Hello World Hello World Hello World Hello World Hello World \ #[actix_rt::test] async fn test_body() { let srv = TestServer::start(|| { - h1::H1Service::new( - App::new() - .service(web::resource("/").route(web::to(|| Response::Ok().body(STR)))), - ) + HttpService::build() + .h1(App::new() + .service(web::resource("/").route(web::to(|| Response::Ok().body(STR))))) + .tcp() }); let mut response = srv.get("/").send().await.unwrap(); @@ -60,11 +60,11 @@ async fn test_body() { #[actix_rt::test] async fn test_body_gzip() { let srv = TestServer::start(|| { - h1::H1Service::new( - App::new() + HttpService::build() + .h1(App::new() .wrap(Compress::new(ContentEncoding::Gzip)) - .service(web::resource("/").route(web::to(|| Response::Ok().body(STR)))), - ) + .service(web::resource("/").route(web::to(|| Response::Ok().body(STR))))) + .tcp() }); let mut response = srv @@ -90,13 +90,13 @@ async fn test_body_gzip() { #[actix_rt::test] async fn test_body_gzip2() { let srv = TestServer::start(|| { - h1::H1Service::new( - App::new() + HttpService::build() + .h1(App::new() .wrap(Compress::new(ContentEncoding::Gzip)) .service(web::resource("/").route(web::to(|| { Response::Ok().body(STR).into_body::() - }))), - ) + })))) + .tcp() }); let mut response = srv @@ -122,8 +122,8 @@ async fn test_body_gzip2() { #[actix_rt::test] async fn test_body_encoding_override() { let srv = TestServer::start(|| { - h1::H1Service::new( - App::new() + HttpService::build() + .h1(App::new() .wrap(Compress::new(ContentEncoding::Gzip)) .service(web::resource("/").route(web::to(|| { Response::Ok().encoding(ContentEncoding::Deflate).body(STR) @@ -136,8 +136,8 @@ async fn test_body_encoding_override() { response.encoding(ContentEncoding::Deflate); response - }))), - ) + })))) + .tcp() }); // Builder @@ -187,14 +187,14 @@ async fn test_body_gzip_large() { let srv = TestServer::start(move || { let data = srv_data.clone(); - h1::H1Service::new( - App::new() + HttpService::build() + .h1(App::new() .wrap(Compress::new(ContentEncoding::Gzip)) .service( web::resource("/") .route(web::to(move || Response::Ok().body(data.clone()))), - ), - ) + )) + .tcp() }); let mut response = srv @@ -227,14 +227,14 @@ async fn test_body_gzip_large_random() { let srv = TestServer::start(move || { let data = srv_data.clone(); - h1::H1Service::new( - App::new() + HttpService::build() + .h1(App::new() .wrap(Compress::new(ContentEncoding::Gzip)) .service( web::resource("/") .route(web::to(move || Response::Ok().body(data.clone()))), - ), - ) + )) + .tcp() }); let mut response = srv @@ -261,15 +261,15 @@ async fn test_body_gzip_large_random() { #[actix_rt::test] async fn test_body_chunked_implicit() { let srv = TestServer::start(move || { - h1::H1Service::new( - App::new() + HttpService::build() + .h1(App::new() .wrap(Compress::new(ContentEncoding::Gzip)) .service(web::resource("/").route(web::get().to(move || { Response::Ok().streaming(once(ok::<_, Error>(Bytes::from_static( STR.as_ref(), )))) - }))), - ) + })))) + .tcp() }); let mut response = srv @@ -299,12 +299,15 @@ async fn test_body_chunked_implicit() { #[cfg(feature = "brotli")] async fn test_body_br_streaming() { let srv = TestServer::start(move || { - h1::H1Service::new(App::new().wrap(Compress::new(ContentEncoding::Br)).service( - web::resource("/").route(web::to(move || { - Response::Ok() - .streaming(once(ok::<_, Error>(Bytes::from_static(STR.as_ref())))) - })), - )) + HttpService::build() + .h1(App::new().wrap(Compress::new(ContentEncoding::Br)).service( + web::resource("/").route(web::to(move || { + Response::Ok().streaming(once(ok::<_, Error>(Bytes::from_static( + STR.as_ref(), + )))) + })), + )) + .tcp() }); let mut response = srv @@ -329,9 +332,11 @@ async fn test_body_br_streaming() { #[actix_rt::test] async fn test_head_binary() { let srv = TestServer::start(move || { - h1::H1Service::new(App::new().service(web::resource("/").route( - web::head().to(move || Response::Ok().content_length(100).body(STR)), - ))) + HttpService::build() + .h1(App::new().service(web::resource("/").route( + web::head().to(move || Response::Ok().content_length(100).body(STR)), + ))) + .tcp() }); let mut response = srv.head("/").send().await.unwrap(); @@ -350,14 +355,18 @@ async fn test_head_binary() { #[actix_rt::test] async fn test_no_chunking() { let srv = TestServer::start(move || { - h1::H1Service::new(App::new().service(web::resource("/").route(web::to( - move || { - Response::Ok() - .no_chunking() - .content_length(STR.len() as u64) - .streaming(once(ok::<_, Error>(Bytes::from_static(STR.as_ref())))) - }, - )))) + HttpService::build() + .h1( + App::new().service(web::resource("/").route(web::to(move || { + Response::Ok() + .no_chunking() + .content_length(STR.len() as u64) + .streaming(once(ok::<_, Error>(Bytes::from_static( + STR.as_ref(), + )))) + }))), + ) + .tcp() }); let mut response = srv.get("/").send().await.unwrap(); @@ -373,13 +382,13 @@ async fn test_no_chunking() { #[cfg(any(feature = "flate2-zlib", feature = "flate2-rust"))] async fn test_body_deflate() { let srv = TestServer::start(move || { - h1::H1Service::new( - App::new() + HttpService::build() + .h1(App::new() .wrap(Compress::new(ContentEncoding::Deflate)) .service( web::resource("/").route(web::to(move || Response::Ok().body(STR))), - ), - ) + )) + .tcp() }); // client request @@ -405,9 +414,11 @@ async fn test_body_deflate() { #[cfg(any(feature = "brotli"))] async fn test_body_brotli() { let srv = TestServer::start(move || { - h1::H1Service::new(App::new().wrap(Compress::new(ContentEncoding::Br)).service( - web::resource("/").route(web::to(move || Response::Ok().body(STR))), - )) + HttpService::build() + .h1(App::new().wrap(Compress::new(ContentEncoding::Br)).service( + web::resource("/").route(web::to(move || Response::Ok().body(STR))), + )) + .tcp() }); // client request @@ -434,12 +445,12 @@ async fn test_body_brotli() { #[cfg(any(feature = "flate2-zlib", feature = "flate2-rust"))] async fn test_encoding() { let srv = TestServer::start(move || { - HttpService::new( - App::new().wrap(Compress::default()).service( + HttpService::build() + .h1(App::new().wrap(Compress::default()).service( web::resource("/") .route(web::to(move |body: Bytes| Response::Ok().body(body))), - ), - ) + )) + .tcp() }); // client request @@ -463,12 +474,12 @@ async fn test_encoding() { #[cfg(any(feature = "flate2-zlib", feature = "flate2-rust"))] async fn test_gzip_encoding() { let srv = TestServer::start(move || { - HttpService::new( - App::new().service( + HttpService::build() + .h1(App::new().service( web::resource("/") .route(web::to(move |body: Bytes| Response::Ok().body(body))), - ), - ) + )) + .tcp() }); // client request @@ -493,12 +504,12 @@ async fn test_gzip_encoding() { async fn test_gzip_encoding_large() { let data = STR.repeat(10); let srv = TestServer::start(move || { - h1::H1Service::new( - App::new().service( + HttpService::build() + .h1(App::new().service( web::resource("/") .route(web::to(move |body: Bytes| Response::Ok().body(body))), - ), - ) + )) + .tcp() }); // client request @@ -527,12 +538,12 @@ async fn test_reading_gzip_encoding_large_random() { .collect::(); let srv = TestServer::start(move || { - HttpService::new( - App::new().service( + HttpService::build() + .h1(App::new().service( web::resource("/") .route(web::to(move |body: Bytes| Response::Ok().body(body))), - ), - ) + )) + .tcp() }); // client request @@ -557,12 +568,12 @@ async fn test_reading_gzip_encoding_large_random() { #[cfg(any(feature = "flate2-zlib", feature = "flate2-rust"))] async fn test_reading_deflate_encoding() { let srv = TestServer::start(move || { - h1::H1Service::new( - App::new().service( + HttpService::build() + .h1(App::new().service( web::resource("/") .route(web::to(move |body: Bytes| Response::Ok().body(body))), - ), - ) + )) + .tcp() }); let mut e = ZlibEncoder::new(Vec::new(), Compression::default()); @@ -587,12 +598,12 @@ async fn test_reading_deflate_encoding() { async fn test_reading_deflate_encoding_large() { let data = STR.repeat(10); let srv = TestServer::start(move || { - h1::H1Service::new( - App::new().service( + HttpService::build() + .h1(App::new().service( web::resource("/") .route(web::to(move |body: Bytes| Response::Ok().body(body))), - ), - ) + )) + .tcp() }); let mut e = ZlibEncoder::new(Vec::new(), Compression::default()); @@ -621,12 +632,12 @@ async fn test_reading_deflate_encoding_large_random() { .collect::(); let srv = TestServer::start(move || { - h1::H1Service::new( - App::new().service( + HttpService::build() + .h1(App::new().service( web::resource("/") .route(web::to(move |body: Bytes| Response::Ok().body(body))), - ), - ) + )) + .tcp() }); let mut e = ZlibEncoder::new(Vec::new(), Compression::default()); @@ -651,12 +662,12 @@ async fn test_reading_deflate_encoding_large_random() { #[cfg(feature = "brotli")] async fn test_brotli_encoding() { let srv = TestServer::start(move || { - h1::H1Service::new( - App::new().service( + HttpService::build() + .h1(App::new().service( web::resource("/") .route(web::to(move |body: Bytes| Response::Ok().body(body))), - ), - ) + )) + .tcp() }); let mut e = BrotliEncoder::new(Vec::new(), 5); @@ -681,12 +692,12 @@ async fn test_brotli_encoding() { async fn test_brotli_encoding_large() { let data = STR.repeat(10); let srv = TestServer::start(move || { - h1::H1Service::new( - App::new().service( + HttpService::build() + .h1(App::new().service( web::resource("/") .route(web::to(move |body: Bytes| Response::Ok().body(body))), - ), - ) + )) + .tcp() }); let mut e = BrotliEncoder::new(Vec::new(), 5);