From 43d325a139eb16676ffea26eac3495f85fd452e4 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 8 Apr 2019 14:51:16 -0700 Subject: [PATCH] allow to specify upgrade service --- actix-http/src/builder.rs | 58 +++++++--------- actix-http/src/h1/dispatcher.rs | 29 ++++++-- actix-http/src/h1/mod.rs | 2 + actix-http/src/h1/service.rs | 89 +++++++++++++++++++----- actix-http/src/h1/upgrade.rs | 40 +++++++++++ actix-http/src/service.rs | 118 +++++++++++++++++++++++++------- actix-session/src/cookie.rs | 1 - 7 files changed, 254 insertions(+), 83 deletions(-) create mode 100644 actix-http/src/h1/upgrade.rs diff --git a/actix-http/src/builder.rs b/actix-http/src/builder.rs index 6d93c156..7b07d30e 100644 --- a/actix-http/src/builder.rs +++ b/actix-http/src/builder.rs @@ -1,13 +1,14 @@ use std::fmt; use std::marker::PhantomData; +use actix_codec::Framed; use actix_server_config::ServerConfig as SrvConfig; use actix_service::{IntoNewService, NewService, Service}; use crate::body::MessageBody; use crate::config::{KeepAlive, ServiceConfig}; use crate::error::Error; -use crate::h1::{ExpectHandler, H1Service}; +use crate::h1::{Codec, ExpectHandler, H1Service, UpgradeHandler}; use crate::h2::H2Service; use crate::request::Request; use crate::response::Response; @@ -17,15 +18,16 @@ use crate::service::HttpService; /// /// This type can be used to construct an instance of `http service` through a /// builder-like pattern. -pub struct HttpServiceBuilder { +pub struct HttpServiceBuilder> { keep_alive: KeepAlive, client_timeout: u64, client_disconnect: u64, expect: X, + upgrade: Option, _t: PhantomData<(T, S)>, } -impl HttpServiceBuilder +impl HttpServiceBuilder> where S: NewService, S::Error: Into, @@ -38,12 +40,13 @@ where client_timeout: 5000, client_disconnect: 0, expect: ExpectHandler, + upgrade: None, _t: PhantomData, } } } -impl HttpServiceBuilder +impl HttpServiceBuilder where S: NewService, S::Error: Into, @@ -51,11 +54,14 @@ where X: NewService, X::Error: Into, X::InitError: fmt::Debug, + U: NewService), Response = ()>, + U::Error: fmt::Display, + U::InitError: fmt::Debug, { /// Set server keep-alive setting. /// /// By default keep alive is set to a 5 seconds. - pub fn keep_alive>(mut self, val: U) -> Self { + pub fn keep_alive>(mut self, val: W) -> Self { self.keep_alive = val.into(); self } @@ -92,43 +98,25 @@ where /// Service get called with request that contains `EXPECT` header. /// Service must return request in case of success, in that case /// request will be forwarded to main service. - pub fn expect(self, expect: F) -> HttpServiceBuilder + pub fn expect(self, expect: F) -> HttpServiceBuilder where - F: IntoNewService, - U: NewService, - U::Error: Into, - U::InitError: fmt::Debug, + F: IntoNewService, + X1: NewService, + X1::Error: Into, + X1::InitError: fmt::Debug, { HttpServiceBuilder { keep_alive: self.keep_alive, client_timeout: self.client_timeout, client_disconnect: self.client_disconnect, expect: expect.into_new_service(), + upgrade: self.upgrade, _t: PhantomData, } } - // #[cfg(feature = "ssl")] - // /// Configure alpn protocols for SslAcceptorBuilder. - // pub fn configure_openssl( - // builder: &mut openssl::ssl::SslAcceptorBuilder, - // ) -> io::Result<()> { - // let protos: &[u8] = b"\x02h2"; - // builder.set_alpn_select_callback(|_, protos| { - // const H2: &[u8] = b"\x02h2"; - // if protos.windows(3).any(|window| window == H2) { - // Ok(b"h2") - // } else { - // Err(openssl::ssl::AlpnError::NOACK) - // } - // }); - // builder.set_alpn_protos(&protos)?; - - // Ok(()) - // } - /// Finish service configuration and create *http service* for HTTP/1 protocol. - pub fn h1(self, service: F) -> H1Service + pub fn h1(self, service: F) -> H1Service where B: MessageBody + 'static, F: IntoNewService, @@ -141,7 +129,9 @@ where self.client_timeout, self.client_disconnect, ); - H1Service::with_config(cfg, service.into_new_service()).expect(self.expect) + H1Service::with_config(cfg, service.into_new_service()) + .expect(self.expect) + .upgrade(self.upgrade) } /// Finish service configuration and create *http service* for HTTP/2 protocol. @@ -163,7 +153,7 @@ where } /// Finish service configuration and create `HttpService` instance. - pub fn finish(self, service: F) -> HttpService + pub fn finish(self, service: F) -> HttpService where B: MessageBody + 'static, F: IntoNewService, @@ -177,6 +167,8 @@ where self.client_timeout, self.client_disconnect, ); - HttpService::with_config(cfg, service.into_new_service()).expect(self.expect) + HttpService::with_config(cfg, service.into_new_service()) + .expect(self.expect) + .upgrade(self.upgrade) } } diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index a223161f..eccf2412 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -2,7 +2,7 @@ use std::collections::VecDeque; use std::time::Instant; use std::{fmt, io}; -use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder}; +use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed}; use actix_service::Service; use actix_utils::cloneable::CloneableService; use bitflags::bitflags; @@ -39,27 +39,32 @@ bitflags! { } /// Dispatcher for HTTP/1.1 protocol -pub struct Dispatcher +pub struct Dispatcher where S: Service, S::Error: Into, B: MessageBody, X: Service, X::Error: Into, + U: Service), Response = ()>, + U::Error: fmt::Display, { - inner: Option>, + inner: Option>, } -struct InnerDispatcher +struct InnerDispatcher where S: Service, S::Error: Into, B: MessageBody, X: Service, X::Error: Into, + U: Service), Response = ()>, + U::Error: fmt::Display, { service: CloneableService, expect: CloneableService, + upgrade: Option>, flags: Flags, error: Option, @@ -132,7 +137,7 @@ where } } -impl Dispatcher +impl Dispatcher where T: AsyncRead + AsyncWrite, S: Service, @@ -141,6 +146,8 @@ where B: MessageBody, X: Service, X::Error: Into, + U: Service), Response = ()>, + U::Error: fmt::Display, { /// Create http/1 dispatcher. pub fn new( @@ -148,6 +155,7 @@ where config: ServiceConfig, service: CloneableService, expect: CloneableService, + upgrade: Option>, ) -> Self { Dispatcher::with_timeout( stream, @@ -157,6 +165,7 @@ where None, service, expect, + upgrade, ) } @@ -169,6 +178,7 @@ where timeout: Option, service: CloneableService, expect: CloneableService, + upgrade: Option>, ) -> Self { let keepalive = config.keep_alive_enabled(); let flags = if keepalive { @@ -198,6 +208,7 @@ where messages: VecDeque::new(), service, expect, + upgrade, flags, ka_expire, ka_timer, @@ -206,7 +217,7 @@ where } } -impl InnerDispatcher +impl InnerDispatcher where T: AsyncRead + AsyncWrite, S: Service, @@ -215,6 +226,8 @@ where B: MessageBody, X: Service, X::Error: Into, + U: Service), Response = ()>, + U::Error: fmt::Display, { fn can_read(&self) -> bool { if self.flags.contains(Flags::READ_DISCONNECT) { @@ -603,7 +616,7 @@ where } } -impl Future for Dispatcher +impl Future for Dispatcher where T: AsyncRead + AsyncWrite, S: Service, @@ -612,6 +625,8 @@ where B: MessageBody, X: Service, X::Error: Into, + U: Service), Response = ()>, + U::Error: fmt::Display, { type Item = (); type Error = DispatchError; diff --git a/actix-http/src/h1/mod.rs b/actix-http/src/h1/mod.rs index 79d7cda8..58712278 100644 --- a/actix-http/src/h1/mod.rs +++ b/actix-http/src/h1/mod.rs @@ -9,6 +9,7 @@ mod encoder; mod expect; mod payload; mod service; +mod upgrade; pub use self::client::{ClientCodec, ClientPayloadCodec}; pub use self::codec::Codec; @@ -16,6 +17,7 @@ pub use self::dispatcher::Dispatcher; pub use self::expect::ExpectHandler; pub use self::payload::Payload; pub use self::service::{H1Service, H1ServiceHandler, OneRequest}; +pub use self::upgrade::UpgradeHandler; #[derive(Debug)] /// Codec message diff --git a/actix-http/src/h1/service.rs b/actix-http/src/h1/service.rs index c3d21b4d..f92fd0c8 100644 --- a/actix-http/src/h1/service.rs +++ b/actix-http/src/h1/service.rs @@ -16,13 +16,14 @@ use crate::response::Response; use super::codec::Codec; use super::dispatcher::Dispatcher; -use super::{ExpectHandler, Message}; +use super::{ExpectHandler, Message, UpgradeHandler}; /// `NewService` implementation for HTTP1 transport -pub struct H1Service { +pub struct H1Service> { srv: S, cfg: ServiceConfig, expect: X, + upgrade: Option, _t: PhantomData<(T, P, B)>, } @@ -42,6 +43,7 @@ where cfg, srv: service.into_new_service(), expect: ExpectHandler, + upgrade: None, _t: PhantomData, } } @@ -55,12 +57,13 @@ where cfg, srv: service.into_new_service(), expect: ExpectHandler, + upgrade: None, _t: PhantomData, } } } -impl H1Service +impl H1Service where S: NewService, S::Error: Into, @@ -68,22 +71,38 @@ where S::InitError: fmt::Debug, B: MessageBody, { - pub fn expect(self, expect: U) -> H1Service + pub fn expect(self, expect: X1) -> H1Service where - U: NewService, - U::Error: Into, - U::InitError: fmt::Debug, + X1: NewService, + X1::Error: Into, + X1::InitError: fmt::Debug, { H1Service { expect, cfg: self.cfg, srv: self.srv, + upgrade: self.upgrade, + _t: PhantomData, + } + } + + pub fn upgrade(self, upgrade: Option) -> H1Service + where + U1: NewService), Response = ()>, + U1::Error: fmt::Display, + U1::InitError: fmt::Debug, + { + H1Service { + upgrade, + cfg: self.cfg, + srv: self.srv, + expect: self.expect, _t: PhantomData, } } } -impl NewService for H1Service +impl NewService for H1Service where T: AsyncRead + AsyncWrite, S: NewService, @@ -94,19 +113,24 @@ where X: NewService, X::Error: Into, X::InitError: fmt::Debug, + U: NewService), Response = ()>, + U::Error: fmt::Display, + U::InitError: fmt::Debug, { type Request = Io; type Response = (); type Error = DispatchError; type InitError = (); - type Service = H1ServiceHandler; - type Future = H1ServiceResponse; + type Service = H1ServiceHandler; + type Future = H1ServiceResponse; fn new_service(&self, cfg: &SrvConfig) -> Self::Future { H1ServiceResponse { fut: self.srv.new_service(cfg).into_future(), fut_ex: Some(self.expect.new_service(&())), + fut_upg: self.upgrade.as_ref().map(|f| f.new_service(&())), expect: None, + upgrade: None, cfg: Some(self.cfg.clone()), _t: PhantomData, } @@ -114,7 +138,7 @@ where } #[doc(hidden)] -pub struct H1ServiceResponse +pub struct H1ServiceResponse where S: NewService, S::Error: Into, @@ -122,15 +146,20 @@ where X: NewService, X::Error: Into, X::InitError: fmt::Debug, + U: NewService), Response = ()>, + U::Error: fmt::Display, + U::InitError: fmt::Debug, { fut: S::Future, fut_ex: Option, + fut_upg: Option, expect: Option, + upgrade: Option, cfg: Option, _t: PhantomData<(T, P, B)>, } -impl Future for H1ServiceResponse +impl Future for H1ServiceResponse where T: AsyncRead + AsyncWrite, S: NewService, @@ -141,8 +170,11 @@ where X: NewService, X::Error: Into, X::InitError: fmt::Debug, + U: NewService), Response = ()>, + U::Error: fmt::Display, + U::InitError: fmt::Debug, { - type Item = H1ServiceHandler; + type Item = H1ServiceHandler; type Error = (); fn poll(&mut self) -> Poll { @@ -154,6 +186,14 @@ where self.fut_ex.take(); } + if let Some(ref mut fut) = self.fut_upg { + let upgrade = try_ready!(fut + .poll() + .map_err(|e| log::error!("Init http service error: {:?}", e))); + self.upgrade = Some(upgrade); + self.fut_ex.take(); + } + let service = try_ready!(self .fut .poll() @@ -162,19 +202,21 @@ where self.cfg.take().unwrap(), service, self.expect.take().unwrap(), + self.upgrade.take(), ))) } } /// `Service` implementation for HTTP1 transport -pub struct H1ServiceHandler { +pub struct H1ServiceHandler { srv: CloneableService, expect: CloneableService, + upgrade: Option>, cfg: ServiceConfig, _t: PhantomData<(T, P, B)>, } -impl H1ServiceHandler +impl H1ServiceHandler where S: Service, S::Error: Into, @@ -182,18 +224,26 @@ where B: MessageBody, X: Service, X::Error: Into, + U: Service), Response = ()>, + U::Error: fmt::Display, { - fn new(cfg: ServiceConfig, srv: S, expect: X) -> H1ServiceHandler { + fn new( + cfg: ServiceConfig, + srv: S, + expect: X, + upgrade: Option, + ) -> H1ServiceHandler { H1ServiceHandler { srv: CloneableService::new(srv), expect: CloneableService::new(expect), + upgrade: upgrade.map(|s| CloneableService::new(s)), cfg, _t: PhantomData, } } } -impl Service for H1ServiceHandler +impl Service for H1ServiceHandler where T: AsyncRead + AsyncWrite, S: Service, @@ -202,11 +252,13 @@ where B: MessageBody, X: Service, X::Error: Into, + U: Service), Response = ()>, + U::Error: fmt::Display, { type Request = Io; type Response = (); type Error = DispatchError; - type Future = Dispatcher; + type Future = Dispatcher; fn poll_ready(&mut self) -> Poll<(), Self::Error> { let ready = self @@ -243,6 +295,7 @@ where self.cfg.clone(), self.srv.clone(), self.expect.clone(), + self.upgrade.clone(), ) } } diff --git a/actix-http/src/h1/upgrade.rs b/actix-http/src/h1/upgrade.rs new file mode 100644 index 00000000..0d0164fe --- /dev/null +++ b/actix-http/src/h1/upgrade.rs @@ -0,0 +1,40 @@ +use std::marker::PhantomData; + +use actix_codec::Framed; +use actix_service::{NewService, Service}; +use futures::future::FutureResult; +use futures::{Async, Poll}; + +use crate::error::Error; +use crate::h1::Codec; +use crate::request::Request; + +pub struct UpgradeHandler(PhantomData); + +impl NewService for UpgradeHandler { + type Request = (Request, Framed); + type Response = (); + type Error = Error; + type Service = UpgradeHandler; + type InitError = Error; + type Future = FutureResult; + + fn new_service(&self, _: &()) -> Self::Future { + unimplemented!() + } +} + +impl Service for UpgradeHandler { + type Request = (Request, Framed); + type Response = (); + type Error = Error; + type Future = FutureResult; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + Ok(Async::Ready(())) + } + + fn call(&mut self, _: Self::Request) -> Self::Future { + unimplemented!() + } +} diff --git a/actix-http/src/service.rs b/actix-http/src/service.rs index 57ab6ec2..2af1238b 100644 --- a/actix-http/src/service.rs +++ b/actix-http/src/service.rs @@ -1,7 +1,7 @@ use std::marker::PhantomData; use std::{fmt, io}; -use actix_codec::{AsyncRead, AsyncWrite}; +use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_server_config::{Io as ServerIo, Protocol, ServerConfig as SrvConfig}; use actix_service::{IntoNewService, NewService, Service}; use actix_utils::cloneable::CloneableService; @@ -18,10 +18,11 @@ use crate::response::Response; use crate::{h1, h2::Dispatcher}; /// `NewService` HTTP1.1/HTTP2 transport implementation -pub struct HttpService { +pub struct HttpService> { srv: S, cfg: ServiceConfig, expect: X, + upgrade: Option, _t: PhantomData<(T, P, B)>, } @@ -57,6 +58,7 @@ where cfg, srv: service.into_new_service(), expect: h1::ExpectHandler, + upgrade: None, _t: PhantomData, } } @@ -70,12 +72,13 @@ where cfg, srv: service.into_new_service(), expect: h1::ExpectHandler, + upgrade: None, _t: PhantomData, } } } -impl HttpService +impl HttpService where S: NewService, S::Error: Into, @@ -88,22 +91,42 @@ where /// Service get called with request that contains `EXPECT` header. /// Service must return request in case of success, in that case /// request will be forwarded to main service. - pub fn expect(self, expect: U) -> HttpService + pub fn expect(self, expect: X1) -> HttpService where - U: NewService, - U::Error: Into, - U::InitError: fmt::Debug, + X1: NewService, + X1::Error: Into, + X1::InitError: fmt::Debug, { HttpService { expect, cfg: self.cfg, srv: self.srv, + upgrade: self.upgrade, + _t: PhantomData, + } + } + + /// Provide service for custom `Connection: UPGRADE` support. + /// + /// If service is provided then normal requests handling get halted + /// and this service get called with original request and framed object. + pub fn upgrade(self, upgrade: Option) -> HttpService + where + U1: NewService), Response = ()>, + U1::Error: fmt::Display, + U1::InitError: fmt::Debug, + { + HttpService { + upgrade, + cfg: self.cfg, + srv: self.srv, + expect: self.expect, _t: PhantomData, } } } -impl NewService for HttpService +impl NewService for HttpService where T: AsyncRead + AsyncWrite, S: NewService, @@ -115,19 +138,24 @@ where X: NewService, X::Error: Into, X::InitError: fmt::Debug, + U: NewService), Response = ()>, + U::Error: fmt::Display, + U::InitError: fmt::Debug, { type Request = ServerIo; type Response = (); type Error = DispatchError; type InitError = (); - type Service = HttpServiceHandler; - type Future = HttpServiceResponse; + type Service = HttpServiceHandler; + type Future = HttpServiceResponse; fn new_service(&self, cfg: &SrvConfig) -> Self::Future { HttpServiceResponse { fut: self.srv.new_service(cfg).into_future(), fut_ex: Some(self.expect.new_service(&())), + fut_upg: self.upgrade.as_ref().map(|f| f.new_service(&())), expect: None, + upgrade: None, cfg: Some(self.cfg.clone()), _t: PhantomData, } @@ -135,15 +163,24 @@ where } #[doc(hidden)] -pub struct HttpServiceResponse, B, X: NewService> { +pub struct HttpServiceResponse< + T, + P, + S: NewService, + B, + X: NewService, + U: NewService, +> { fut: S::Future, fut_ex: Option, + fut_upg: Option, expect: Option, + upgrade: Option, cfg: Option, _t: PhantomData<(T, P, B)>, } -impl Future for HttpServiceResponse +impl Future for HttpServiceResponse where T: AsyncRead + AsyncWrite, S: NewService, @@ -155,8 +192,11 @@ where X: NewService, X::Error: Into, X::InitError: fmt::Debug, + U: NewService), Response = ()>, + U::Error: fmt::Display, + U::InitError: fmt::Debug, { - type Item = HttpServiceHandler; + type Item = HttpServiceHandler; type Error = (); fn poll(&mut self) -> Poll { @@ -168,6 +208,14 @@ where self.fut_ex.take(); } + if let Some(ref mut fut) = self.fut_upg { + let upgrade = try_ready!(fut + .poll() + .map_err(|e| log::error!("Init http service error: {:?}", e))); + self.upgrade = Some(upgrade); + self.fut_ex.take(); + } + let service = try_ready!(self .fut .poll() @@ -176,19 +224,21 @@ where self.cfg.take().unwrap(), service, self.expect.take().unwrap(), + self.upgrade.take(), ))) } } /// `Service` implementation for http transport -pub struct HttpServiceHandler { +pub struct HttpServiceHandler { srv: CloneableService, expect: CloneableService, + upgrade: Option>, cfg: ServiceConfig, _t: PhantomData<(T, P, B, X)>, } -impl HttpServiceHandler +impl HttpServiceHandler where S: Service, S::Error: Into, @@ -197,18 +247,26 @@ where B: MessageBody + 'static, X: Service, X::Error: Into, + U: Service), Response = ()>, + U::Error: fmt::Display, { - fn new(cfg: ServiceConfig, srv: S, expect: X) -> HttpServiceHandler { + fn new( + cfg: ServiceConfig, + srv: S, + expect: X, + upgrade: Option, + ) -> HttpServiceHandler { HttpServiceHandler { cfg, srv: CloneableService::new(srv), expect: CloneableService::new(expect), + upgrade: upgrade.map(|s| CloneableService::new(s)), _t: PhantomData, } } } -impl Service for HttpServiceHandler +impl Service for HttpServiceHandler where T: AsyncRead + AsyncWrite, S: Service, @@ -218,11 +276,13 @@ where B: MessageBody + 'static, X: Service, X::Error: Into, + U: Service), Response = ()>, + U::Error: fmt::Display, { type Request = ServerIo; type Response = (); type Error = DispatchError; - type Future = HttpServiceHandlerResponse; + type Future = HttpServiceHandlerResponse; fn poll_ready(&mut self) -> Poll<(), Self::Error> { let ready = self @@ -275,6 +335,7 @@ where self.cfg.clone(), self.srv.clone(), self.expect.clone(), + self.upgrade.clone(), )), }, _ => HttpServiceHandlerResponse { @@ -284,13 +345,14 @@ where self.cfg.clone(), self.srv.clone(), self.expect.clone(), + self.upgrade.clone(), ))), }, } } } -enum State +enum State where S: Service, S::Future: 'static, @@ -299,8 +361,10 @@ where B: MessageBody, X: Service, X::Error: Into, + U: Service), Response = ()>, + U::Error: fmt::Display, { - H1(h1::Dispatcher), + H1(h1::Dispatcher), H2(Dispatcher, S, B>), Unknown( Option<( @@ -309,12 +373,13 @@ where ServiceConfig, CloneableService, CloneableService, + Option>, )>, ), Handshake(Option<(Handshake, Bytes>, ServiceConfig, CloneableService)>), } -pub struct HttpServiceHandlerResponse +pub struct HttpServiceHandlerResponse where T: AsyncRead + AsyncWrite, S: Service, @@ -324,13 +389,15 @@ where B: MessageBody + 'static, X: Service, X::Error: Into, + U: Service), Response = ()>, + U::Error: fmt::Display, { - state: State, + state: State, } const HTTP2_PREFACE: [u8; 14] = *b"PRI * HTTP/2.0"; -impl Future for HttpServiceHandlerResponse +impl Future for HttpServiceHandlerResponse where T: AsyncRead + AsyncWrite, S: Service, @@ -340,6 +407,8 @@ where B: MessageBody, X: Service, X::Error: Into, + U: Service), Response = ()>, + U::Error: fmt::Display, { type Item = (); type Error = DispatchError; @@ -366,7 +435,7 @@ where } else { panic!() } - let (io, buf, cfg, srv, expect) = data.take().unwrap(); + let (io, buf, cfg, srv, expect, upgrade) = data.take().unwrap(); if buf[..14] == HTTP2_PREFACE[..] { let io = Io { inner: io, @@ -383,6 +452,7 @@ where None, srv, expect, + upgrade, )) } self.poll() diff --git a/actix-session/src/cookie.rs b/actix-session/src/cookie.rs index f7b4ec03..b44c87e0 100644 --- a/actix-session/src/cookie.rs +++ b/actix-session/src/cookie.rs @@ -295,7 +295,6 @@ where type Future = Box>; fn poll_ready(&mut self) -> Poll<(), Self::Error> { - //self.service.poll_ready().map_err(|e| e.into()) self.service.poll_ready() }