diff --git a/examples/framed_hello.rs b/examples/framed_hello.rs index ff3977854..74b0f7dfd 100644 --- a/examples/framed_hello.rs +++ b/examples/framed_hello.rs @@ -2,8 +2,8 @@ use std::{env, io}; use actix_codec::Framed; use actix_http::{h1, Response, SendResponse, ServiceConfig}; -use actix_server::Server; -use actix_service::NewService; +use actix_server::{Io, Server}; +use actix_service::{fn_service, NewService}; use actix_utils::framed::IntoFramed; use actix_utils::stream::TakeItem; use futures::Future; @@ -14,7 +14,8 @@ fn main() -> io::Result<()> { Server::build() .bind("framed_hello", "127.0.0.1:8080", || { - IntoFramed::new(|| h1::Codec::new(ServiceConfig::default())) + fn_service(|io: Io<_>| Ok(io.into_parts().0)) + .and_then(IntoFramed::new(|| h1::Codec::new(ServiceConfig::default()))) .and_then(TakeItem::new().map_err(|_| ())) .and_then(|(_req, _framed): (_, Framed<_, _>)| { SendResponse::send(_framed, Response::Ok().body("Hello world!")) diff --git a/src/builder.rs b/src/builder.rs index 1df96b0e1..2f7466a90 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -95,7 +95,7 @@ where // } /// Finish service configuration and create *http service* for HTTP/1 protocol. - pub fn h1(self, service: F) -> H1Service + pub fn h1(self, service: F) -> H1Service where B: MessageBody + 'static, F: IntoNewService, @@ -110,7 +110,7 @@ where } /// Finish service configuration and create *http service* for HTTP/2 protocol. - pub fn h2(self, service: F) -> H2Service + pub fn h2(self, service: F) -> H2Service where B: MessageBody + 'static, F: IntoNewService, @@ -125,7 +125,7 @@ where } /// Finish service configuration and create `HttpService` instance. - pub fn finish(self, service: F) -> HttpService + pub fn finish(self, service: F) -> HttpService where B: MessageBody + 'static, F: IntoNewService, diff --git a/src/h1/service.rs b/src/h1/service.rs index e55ff0d99..f3301b9b2 100644 --- a/src/h1/service.rs +++ b/src/h1/service.rs @@ -2,7 +2,7 @@ use std::fmt::Debug; use std::marker::PhantomData; use actix_codec::{AsyncRead, AsyncWrite, Framed}; -use actix_server_config::ServerConfig as SrvConfig; +use actix_server_config::{Io, ServerConfig as SrvConfig}; use actix_service::{IntoNewService, NewService, Service}; use actix_utils::cloneable::CloneableService; use futures::future::{ok, FutureResult}; @@ -19,13 +19,13 @@ use super::dispatcher::Dispatcher; use super::Message; /// `NewService` implementation for HTTP1 transport -pub struct H1Service { +pub struct H1Service { srv: S, cfg: ServiceConfig, - _t: PhantomData<(T, B)>, + _t: PhantomData<(T, P, B)>, } -impl H1Service +impl H1Service where S: NewService, S::Error: Debug, @@ -57,7 +57,7 @@ where } } -impl NewService for H1Service +impl NewService for H1Service where T: AsyncRead + AsyncWrite, S: NewService, @@ -66,12 +66,12 @@ where S::Service: 'static, B: MessageBody, { - type Request = T; + type Request = Io; type Response = (); type Error = DispatchError; type InitError = S::InitError; - type Service = H1ServiceHandler; - type Future = H1ServiceResponse; + type Service = H1ServiceHandler; + type Future = H1ServiceResponse; fn new_service(&self, cfg: &SrvConfig) -> Self::Future { H1ServiceResponse { @@ -83,13 +83,13 @@ where } #[doc(hidden)] -pub struct H1ServiceResponse, B> { +pub struct H1ServiceResponse, B> { fut: ::Future, cfg: Option, - _t: PhantomData<(T, B)>, + _t: PhantomData<(T, P, B)>, } -impl Future for H1ServiceResponse +impl Future for H1ServiceResponse where T: AsyncRead + AsyncWrite, S: NewService, @@ -98,7 +98,7 @@ where S::Response: Into>, B: MessageBody, { - type Item = H1ServiceHandler; + type Item = H1ServiceHandler; type Error = S::InitError; fn poll(&mut self) -> Poll { @@ -111,20 +111,20 @@ where } /// `Service` implementation for HTTP1 transport -pub struct H1ServiceHandler { +pub struct H1ServiceHandler { srv: CloneableService, cfg: ServiceConfig, - _t: PhantomData<(T, B)>, + _t: PhantomData<(T, P, B)>, } -impl H1ServiceHandler +impl H1ServiceHandler where S: Service, S::Error: Debug, S::Response: Into>, B: MessageBody, { - fn new(cfg: ServiceConfig, srv: S) -> H1ServiceHandler { + fn new(cfg: ServiceConfig, srv: S) -> H1ServiceHandler { H1ServiceHandler { srv: CloneableService::new(srv), cfg, @@ -133,7 +133,7 @@ where } } -impl Service for H1ServiceHandler +impl Service for H1ServiceHandler where T: AsyncRead + AsyncWrite, S: Service, @@ -141,7 +141,7 @@ where S::Response: Into>, B: MessageBody, { - type Request = T; + type Request = Io; type Response = (); type Error = DispatchError; type Future = Dispatcher; @@ -153,19 +153,19 @@ where }) } - fn call(&mut self, req: T) -> Self::Future { - Dispatcher::new(req, self.cfg.clone(), self.srv.clone()) + fn call(&mut self, req: Self::Request) -> Self::Future { + Dispatcher::new(req.into_parts().0, self.cfg.clone(), self.srv.clone()) } } /// `NewService` implementation for `OneRequestService` service #[derive(Default)] -pub struct OneRequest { +pub struct OneRequest { config: ServiceConfig, - _t: PhantomData, + _t: PhantomData<(T, P)>, } -impl OneRequest +impl OneRequest where T: AsyncRead + AsyncWrite, { @@ -178,15 +178,15 @@ where } } -impl NewService for OneRequest +impl NewService for OneRequest where T: AsyncRead + AsyncWrite, { - type Request = T; + type Request = Io; type Response = (Request, Framed); type Error = ParseError; type InitError = (); - type Service = OneRequestService; + type Service = OneRequestService; type Future = FutureResult; fn new_service(&self, _: &SrvConfig) -> Self::Future { @@ -199,16 +199,16 @@ where /// `Service` implementation for HTTP1 transport. Reads one request and returns /// request and framed object. -pub struct OneRequestService { +pub struct OneRequestService { config: ServiceConfig, - _t: PhantomData, + _t: PhantomData<(T, P)>, } -impl Service for OneRequestService +impl Service for OneRequestService where T: AsyncRead + AsyncWrite, { - type Request = T; + type Request = Io; type Response = (Request, Framed); type Error = ParseError; type Future = OneRequestServiceResponse; @@ -217,9 +217,12 @@ where Ok(Async::Ready(())) } - fn call(&mut self, req: T) -> Self::Future { + fn call(&mut self, req: Self::Request) -> Self::Future { OneRequestServiceResponse { - framed: Some(Framed::new(req, Codec::new(self.config.clone()))), + framed: Some(Framed::new( + req.into_parts().0, + Codec::new(self.config.clone()), + )), } } } diff --git a/src/h2/service.rs b/src/h2/service.rs index ce7c3b5dd..6ab37919c 100644 --- a/src/h2/service.rs +++ b/src/h2/service.rs @@ -3,7 +3,7 @@ use std::marker::PhantomData; use std::{io, net}; use actix_codec::{AsyncRead, AsyncWrite, Framed}; -use actix_server_config::ServerConfig as SrvConfig; +use actix_server_config::{Io, ServerConfig as SrvConfig}; use actix_service::{IntoNewService, NewService, Service}; use actix_utils::cloneable::CloneableService; use bytes::Bytes; @@ -23,13 +23,13 @@ use crate::response::Response; use super::dispatcher::Dispatcher; /// `NewService` implementation for HTTP2 transport -pub struct H2Service { +pub struct H2Service { srv: S, cfg: ServiceConfig, - _t: PhantomData<(T, B)>, + _t: PhantomData<(T, P, B)>, } -impl H2Service +impl H2Service where S: NewService, S::Service: 'static, @@ -61,7 +61,7 @@ where } } -impl NewService for H2Service +impl NewService for H2Service where T: AsyncRead + AsyncWrite, S: NewService, @@ -70,12 +70,12 @@ where S::Response: Into>, B: MessageBody + 'static, { - type Request = T; + type Request = Io; type Response = (); type Error = DispatchError; type InitError = S::InitError; - type Service = H2ServiceHandler; - type Future = H2ServiceResponse; + type Service = H2ServiceHandler; + type Future = H2ServiceResponse; fn new_service(&self, cfg: &SrvConfig) -> Self::Future { H2ServiceResponse { @@ -87,13 +87,13 @@ where } #[doc(hidden)] -pub struct H2ServiceResponse, B> { +pub struct H2ServiceResponse, B> { fut: ::Future, cfg: Option, - _t: PhantomData<(T, B)>, + _t: PhantomData<(T, P, B)>, } -impl Future for H2ServiceResponse +impl Future for H2ServiceResponse where T: AsyncRead + AsyncWrite, S: NewService, @@ -102,7 +102,7 @@ where S::Error: Debug, B: MessageBody + 'static, { - type Item = H2ServiceHandler; + type Item = H2ServiceHandler; type Error = S::InitError; fn poll(&mut self) -> Poll { @@ -115,20 +115,20 @@ where } /// `Service` implementation for http/2 transport -pub struct H2ServiceHandler { +pub struct H2ServiceHandler { srv: CloneableService, cfg: ServiceConfig, - _t: PhantomData<(T, B)>, + _t: PhantomData<(T, P, B)>, } -impl H2ServiceHandler +impl H2ServiceHandler where S: Service + 'static, S::Error: Debug, S::Response: Into>, B: MessageBody + 'static, { - fn new(cfg: ServiceConfig, srv: S) -> H2ServiceHandler { + fn new(cfg: ServiceConfig, srv: S) -> H2ServiceHandler { H2ServiceHandler { cfg, srv: CloneableService::new(srv), @@ -137,7 +137,7 @@ where } } -impl Service for H2ServiceHandler +impl Service for H2ServiceHandler where T: AsyncRead + AsyncWrite, S: Service + 'static, @@ -145,7 +145,7 @@ where S::Response: Into>, B: MessageBody + 'static, { - type Request = T; + type Request = Io; type Response = (); type Error = DispatchError; type Future = H2ServiceHandlerResponse; @@ -157,12 +157,12 @@ where }) } - fn call(&mut self, req: T) -> Self::Future { + fn call(&mut self, req: Self::Request) -> Self::Future { H2ServiceHandlerResponse { state: State::Handshake( Some(self.srv.clone()), Some(self.cfg.clone()), - server::handshake(req), + server::handshake(req.into_parts().0), ), } } diff --git a/src/service/service.rs b/src/service/service.rs index ac28c77a5..3ddf55739 100644 --- a/src/service/service.rs +++ b/src/service/service.rs @@ -3,7 +3,7 @@ use std::marker::PhantomData; use std::{fmt, io}; use actix_codec::{AsyncRead, AsyncWrite, Framed, FramedParts}; -use actix_server_config::ServerConfig as SrvConfig; +use actix_server_config::{Io as ServerIo, Protocol, ServerConfig as SrvConfig}; use actix_service::{IntoNewService, NewService, Service}; use actix_utils::cloneable::CloneableService; use bytes::{Buf, BufMut, Bytes, BytesMut}; @@ -20,13 +20,27 @@ use crate::response::Response; use crate::{h1, h2::Dispatcher}; /// `NewService` HTTP1.1/HTTP2 transport implementation -pub struct HttpService { +pub struct HttpService { srv: S, cfg: ServiceConfig, - _t: PhantomData<(T, B)>, + _t: PhantomData<(T, P, B)>, } -impl HttpService +impl HttpService +where + S: NewService, + S::Service: 'static, + S::Error: Debug + 'static, + S::Response: Into>, + B: MessageBody + 'static, +{ + /// Create builder for `HttpService` instance. + pub fn build() -> HttpServiceBuilder { + HttpServiceBuilder::new() + } +} + +impl HttpService where S: NewService, S::Service: 'static, @@ -56,14 +70,9 @@ where _t: PhantomData, } } - - /// Create builder for `HttpService` instance. - pub fn build() -> HttpServiceBuilder { - HttpServiceBuilder::new() - } } -impl NewService for HttpService +impl NewService for HttpService where T: AsyncRead + AsyncWrite + 'static, S: NewService, @@ -72,12 +81,12 @@ where S::Response: Into>, B: MessageBody + 'static, { - type Request = T; + type Request = ServerIo; type Response = (); type Error = DispatchError; type InitError = S::InitError; - type Service = HttpServiceHandler; - type Future = HttpServiceResponse; + type Service = HttpServiceHandler; + type Future = HttpServiceResponse; fn new_service(&self, cfg: &SrvConfig) -> Self::Future { HttpServiceResponse { @@ -89,13 +98,13 @@ where } #[doc(hidden)] -pub struct HttpServiceResponse, B> { +pub struct HttpServiceResponse, B> { fut: ::Future, cfg: Option, - _t: PhantomData<(T, B)>, + _t: PhantomData<(T, P, B)>, } -impl Future for HttpServiceResponse +impl Future for HttpServiceResponse where T: AsyncRead + AsyncWrite, S: NewService, @@ -104,7 +113,7 @@ where S::Error: Debug, B: MessageBody + 'static, { - type Item = HttpServiceHandler; + type Item = HttpServiceHandler; type Error = S::InitError; fn poll(&mut self) -> Poll { @@ -117,20 +126,20 @@ where } /// `Service` implementation for http transport -pub struct HttpServiceHandler { +pub struct HttpServiceHandler { srv: CloneableService, cfg: ServiceConfig, - _t: PhantomData<(T, B)>, + _t: PhantomData<(T, P, B)>, } -impl HttpServiceHandler +impl HttpServiceHandler where S: Service + 'static, S::Error: Debug, S::Response: Into>, B: MessageBody + 'static, { - fn new(cfg: ServiceConfig, srv: S) -> HttpServiceHandler { + fn new(cfg: ServiceConfig, srv: S) -> HttpServiceHandler { HttpServiceHandler { cfg, srv: CloneableService::new(srv), @@ -139,7 +148,7 @@ where } } -impl Service for HttpServiceHandler +impl Service for HttpServiceHandler where T: AsyncRead + AsyncWrite + 'static, S: Service + 'static, @@ -147,7 +156,7 @@ where S::Response: Into>, B: MessageBody + 'static, { - type Request = T; + type Request = ServerIo; type Response = (); type Error = DispatchError; type Future = HttpServiceHandlerResponse; @@ -159,14 +168,37 @@ where }) } - fn call(&mut self, req: T) -> Self::Future { - HttpServiceHandlerResponse { - state: State::Unknown(Some(( - req, - BytesMut::with_capacity(14), - self.cfg.clone(), - self.srv.clone(), - ))), + fn call(&mut self, req: Self::Request) -> Self::Future { + let (io, params, proto) = req.into_parts(); + match proto { + Protocol::Http2 => { + let io = Io { + inner: io, + unread: None, + }; + HttpServiceHandlerResponse { + state: State::Handshake(Some(( + server::handshake(io), + self.cfg.clone(), + self.srv.clone(), + ))), + } + } + Protocol::Http10 | Protocol::Http11 => HttpServiceHandlerResponse { + state: State::H1(h1::Dispatcher::new( + io, + self.cfg.clone(), + self.srv.clone(), + )), + }, + _ => HttpServiceHandlerResponse { + state: State::Unknown(Some(( + io, + BytesMut::with_capacity(14), + self.cfg.clone(), + self.srv.clone(), + ))), + }, } } } diff --git a/tests/test_server.rs b/tests/test_server.rs index 7a28bca8a..3771d35c6 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -49,7 +49,7 @@ fn test_h1_2() { } #[cfg(feature = "ssl")] -fn ssl_acceptor() -> std::io::Result> { +fn ssl_acceptor() -> std::io::Result> { use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod}; // load ssl keys let mut builder = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap(); diff --git a/tests/test_ws.rs b/tests/test_ws.rs index 4111ca3db..634b9acdd 100644 --- a/tests/test_ws.rs +++ b/tests/test_ws.rs @@ -2,7 +2,8 @@ use std::io; use actix_codec::Framed; use actix_http_test::TestServer; -use actix_service::NewService; +use actix_server::Io; +use actix_service::{fn_service, NewService}; use actix_utils::framed::IntoFramed; use actix_utils::stream::TakeItem; use bytes::{Bytes, BytesMut}; @@ -35,7 +36,8 @@ fn ws_service(req: ws::Frame) -> impl Future| Ok(io.into_parts().0)) + .and_then(IntoFramed::new(|| h1::Codec::new(ServiceConfig::default()))) .and_then(TakeItem::new().map_err(|_| ())) .and_then(|(req, framed): (_, Framed<_, _>)| { // validate request