1
0
mirror of https://github.com/fafhrd91/actix-web synced 2024-11-24 08:22:59 +01:00

added combined http1/2 service

This commit is contained in:
Nikolay Kim 2019-03-06 22:56:34 -08:00
parent e25483a0d5
commit 3b069e0568
13 changed files with 575 additions and 169 deletions

View File

@ -39,7 +39,8 @@ fail = ["failure"]
[dependencies] [dependencies]
#actix-service = "0.3.2" #actix-service = "0.3.2"
actix-codec = "0.1.0" actix-codec = "0.1.1"
#actix-connector = "0.3.0" #actix-connector = "0.3.0"
#actix-utils = "0.3.1" #actix-utils = "0.3.1"

View File

@ -328,12 +328,11 @@ impl ResponseError for cookie::ParseError {
} }
} }
#[derive(Debug, Display)] #[derive(Debug, Display, From)]
/// A set of errors that can occur during dispatching http requests /// A set of errors that can occur during dispatching http requests
pub enum DispatchError<E: fmt::Debug> { pub enum DispatchError {
/// Service error /// Service error
#[display(fmt = "Service specific error: {:?}", _0)] Service,
Service(E),
/// An `io::Error` that occurred while trying to read or write to a network /// An `io::Error` that occurred while trying to read or write to a network
/// stream. /// stream.
@ -373,24 +372,6 @@ pub enum DispatchError<E: fmt::Debug> {
Unknown, Unknown,
} }
impl<E: fmt::Debug> From<ParseError> for DispatchError<E> {
fn from(err: ParseError) -> Self {
DispatchError::Parse(err)
}
}
impl<E: fmt::Debug> From<io::Error> for DispatchError<E> {
fn from(err: io::Error) -> Self {
DispatchError::Io(err)
}
}
impl<E: fmt::Debug> From<h2::Error> for DispatchError<E> {
fn from(err: h2::Error) -> Self {
DispatchError::H2(err)
}
}
/// A set of error that can occure during parsing content type /// A set of error that can occure during parsing content type
#[derive(PartialEq, Debug, Display)] #[derive(PartialEq, Debug, Display)]
pub enum ContentTypeError { pub enum ContentTypeError {

View File

@ -20,7 +20,7 @@ use crate::response::Response;
use super::codec::Codec; use super::codec::Codec;
use super::payload::{Payload, PayloadSender, PayloadStatus, PayloadWriter}; use super::payload::{Payload, PayloadSender, PayloadStatus, PayloadWriter};
use super::{H1ServiceResult, Message, MessageType}; use super::{Message, MessageType};
const MAX_PIPELINED_MESSAGES: usize = 16; const MAX_PIPELINED_MESSAGES: usize = 16;
@ -50,7 +50,7 @@ where
service: CloneableService<S>, service: CloneableService<S>,
flags: Flags, flags: Flags,
framed: Framed<T, Codec>, framed: Framed<T, Codec>,
error: Option<DispatchError<S::Error>>, error: Option<DispatchError>,
config: ServiceConfig, config: ServiceConfig,
state: State<S, B>, state: State<S, B>,
@ -93,12 +93,17 @@ where
{ {
/// Create http/1 dispatcher. /// Create http/1 dispatcher.
pub fn new(stream: T, config: ServiceConfig, service: CloneableService<S>) -> Self { pub fn new(stream: T, config: ServiceConfig, service: CloneableService<S>) -> Self {
Dispatcher::with_timeout(stream, config, None, service) Dispatcher::with_timeout(
Framed::new(stream, Codec::new(config.clone())),
config,
None,
service,
)
} }
/// Create http/1 dispatcher with slow request timeout. /// Create http/1 dispatcher with slow request timeout.
pub fn with_timeout( pub fn with_timeout(
stream: T, framed: Framed<T, Codec>,
config: ServiceConfig, config: ServiceConfig,
timeout: Option<Delay>, timeout: Option<Delay>,
service: CloneableService<S>, service: CloneableService<S>,
@ -109,7 +114,6 @@ where
} else { } else {
Flags::empty() Flags::empty()
}; };
let framed = Framed::new(stream, Codec::new(config.clone()));
// keep-alive timer // keep-alive timer
let (ka_expire, ka_timer) = if let Some(delay) = timeout { let (ka_expire, ka_timer) = if let Some(delay) = timeout {
@ -167,7 +171,7 @@ where
} }
/// Flush stream /// Flush stream
fn poll_flush(&mut self) -> Poll<bool, DispatchError<S::Error>> { fn poll_flush(&mut self) -> Poll<bool, DispatchError> {
if !self.framed.is_write_buf_empty() { if !self.framed.is_write_buf_empty() {
match self.framed.poll_complete() { match self.framed.poll_complete() {
Ok(Async::NotReady) => Ok(Async::NotReady), Ok(Async::NotReady) => Ok(Async::NotReady),
@ -192,7 +196,7 @@ where
&mut self, &mut self,
message: Response<()>, message: Response<()>,
body: ResponseBody<B>, body: ResponseBody<B>,
) -> Result<State<S, B>, DispatchError<S::Error>> { ) -> Result<State<S, B>, DispatchError> {
self.framed self.framed
.force_send(Message::Item((message, body.length()))) .force_send(Message::Item((message, body.length())))
.map_err(|err| { .map_err(|err| {
@ -210,7 +214,7 @@ where
} }
} }
fn poll_response(&mut self) -> Result<(), DispatchError<S::Error>> { fn poll_response(&mut self) -> Result<(), DispatchError> {
let mut retry = self.can_read(); let mut retry = self.can_read();
loop { loop {
let state = match mem::replace(&mut self.state, State::None) { let state = match mem::replace(&mut self.state, State::None) {
@ -225,7 +229,7 @@ where
None => None, None => None,
}, },
State::ServiceCall(mut fut) => { State::ServiceCall(mut fut) => {
match fut.poll().map_err(DispatchError::Service)? { match fut.poll().map_err(|_| DispatchError::Service)? {
Async::Ready(res) => { Async::Ready(res) => {
let (res, body) = res.into().replace_body(()); let (res, body) = res.into().replace_body(());
Some(self.send_response(res, body)?) Some(self.send_response(res, body)?)
@ -283,12 +287,9 @@ where
Ok(()) Ok(())
} }
fn handle_request( fn handle_request(&mut self, req: Request) -> Result<State<S, B>, DispatchError> {
&mut self,
req: Request,
) -> Result<State<S, B>, DispatchError<S::Error>> {
let mut task = self.service.call(req); let mut task = self.service.call(req);
match task.poll().map_err(DispatchError::Service)? { match task.poll().map_err(|_| DispatchError::Service)? {
Async::Ready(res) => { Async::Ready(res) => {
let (res, body) = res.into().replace_body(()); let (res, body) = res.into().replace_body(());
self.send_response(res, body) self.send_response(res, body)
@ -298,7 +299,7 @@ where
} }
/// Process one incoming requests /// Process one incoming requests
pub(self) fn poll_request(&mut self) -> Result<bool, DispatchError<S::Error>> { pub(self) fn poll_request(&mut self) -> Result<bool, DispatchError> {
// limit a mount of non processed requests // limit a mount of non processed requests
if self.messages.len() >= MAX_PIPELINED_MESSAGES { if self.messages.len() >= MAX_PIPELINED_MESSAGES {
return Ok(false); return Ok(false);
@ -400,7 +401,7 @@ where
} }
/// keep-alive timer /// keep-alive timer
fn poll_keepalive(&mut self) -> Result<(), DispatchError<S::Error>> { fn poll_keepalive(&mut self) -> Result<(), DispatchError> {
if self.ka_timer.is_none() { if self.ka_timer.is_none() {
return Ok(()); return Ok(());
} }
@ -469,8 +470,8 @@ where
S::Response: Into<Response<B>>, S::Response: Into<Response<B>>,
B: MessageBody, B: MessageBody,
{ {
type Item = H1ServiceResult<T>; type Item = ();
type Error = DispatchError<S::Error>; type Error = DispatchError;
#[inline] #[inline]
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
@ -490,7 +491,7 @@ where
} }
if inner.flags.contains(Flags::DISCONNECTED) { if inner.flags.contains(Flags::DISCONNECTED) {
return Ok(Async::Ready(H1ServiceResult::Disconnected)); return Ok(Async::Ready(()));
} }
// keep-alive and stream errors // keep-alive and stream errors
@ -523,14 +524,12 @@ where
}; };
let mut inner = self.inner.take().unwrap(); let mut inner = self.inner.take().unwrap();
if shutdown {
Ok(Async::Ready(H1ServiceResult::Shutdown( // TODO: shutdown
inner.framed.into_inner(), Ok(Async::Ready(()))
))) //Ok(Async::Ready(HttpServiceResult::Shutdown(
} else { // inner.framed.into_inner(),
let req = inner.unhandled.take().unwrap(); //)))
Ok(Async::Ready(H1ServiceResult::Unhandled(req, inner.framed)))
}
} }
} }

View File

@ -1,7 +1,4 @@
//! HTTP/1 implementation //! HTTP/1 implementation
use std::fmt;
use actix_codec::Framed;
use bytes::Bytes; use bytes::Bytes;
mod client; mod client;
@ -18,29 +15,6 @@ pub use self::dispatcher::Dispatcher;
pub use self::payload::{Payload, PayloadBuffer}; pub use self::payload::{Payload, PayloadBuffer};
pub use self::service::{H1Service, H1ServiceHandler, OneRequest}; pub use self::service::{H1Service, H1ServiceHandler, OneRequest};
use crate::request::Request;
/// H1 service response type
pub enum H1ServiceResult<T> {
Disconnected,
Shutdown(T),
Unhandled(Request, Framed<T, Codec>),
}
impl<T: fmt::Debug> fmt::Debug for H1ServiceResult<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
H1ServiceResult::Disconnected => write!(f, "H1ServiceResult::Disconnected"),
H1ServiceResult::Shutdown(ref v) => {
write!(f, "H1ServiceResult::Shutdown({:?})", v)
}
H1ServiceResult::Unhandled(ref req, _) => {
write!(f, "H1ServiceResult::Unhandled({:?})", req)
}
}
}
}
#[derive(Debug)] #[derive(Debug)]
/// Codec message /// Codec message
pub enum Message<T> { pub enum Message<T> {
@ -67,6 +41,7 @@ pub enum MessageType {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::request::Request;
impl Message<Request> { impl Message<Request> {
pub fn message(self) -> Request { pub fn message(self) -> Request {

View File

@ -17,7 +17,7 @@ use crate::response::Response;
use super::codec::Codec; use super::codec::Codec;
use super::dispatcher::Dispatcher; use super::dispatcher::Dispatcher;
use super::{H1ServiceResult, Message}; use super::Message;
/// `NewService` implementation for HTTP1 transport /// `NewService` implementation for HTTP1 transport
pub struct H1Service<T, S, B> { pub struct H1Service<T, S, B> {
@ -72,8 +72,8 @@ where
S::Service: 'static, S::Service: 'static,
B: MessageBody, B: MessageBody,
{ {
type Response = H1ServiceResult<T>; type Response = ();
type Error = DispatchError<S::Error>; type Error = DispatchError;
type InitError = S::InitError; type InitError = S::InitError;
type Service = H1ServiceHandler<T, S::Service, B>; type Service = H1ServiceHandler<T, S::Service, B>;
type Future = H1ServiceResponse<T, S, B>; type Future = H1ServiceResponse<T, S, B>;
@ -275,12 +275,15 @@ where
S::Response: Into<Response<B>>, S::Response: Into<Response<B>>,
B: MessageBody, B: MessageBody,
{ {
type Response = H1ServiceResult<T>; type Response = ();
type Error = DispatchError<S::Error>; type Error = DispatchError;
type Future = Dispatcher<T, S, B>; type Future = Dispatcher<T, S, B>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> { fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.srv.poll_ready().map_err(DispatchError::Service) self.srv.poll_ready().map_err(|e| {
log::error!("Http service readiness error: {:?}", e);
DispatchError::Service
})
} }
fn call(&mut self, req: T) -> Self::Future { fn call(&mut self, req: T) -> Self::Future {

View File

@ -26,8 +26,6 @@ use crate::payload::Payload;
use crate::request::Request; use crate::request::Request;
use crate::response::Response; use crate::response::Response;
use super::H2ServiceResult;
const CHUNK_SIZE: usize = 16_384; const CHUNK_SIZE: usize = 16_384;
bitflags! { bitflags! {
@ -40,7 +38,7 @@ bitflags! {
/// Dispatcher for HTTP/2 protocol /// Dispatcher for HTTP/2 protocol
pub struct Dispatcher< pub struct Dispatcher<
T: AsyncRead + AsyncWrite, T: AsyncRead + AsyncWrite,
S: Service<Request<Payload>> + 'static, S: Service<Request> + 'static,
B: MessageBody, B: MessageBody,
> { > {
flags: Flags, flags: Flags,
@ -55,8 +53,8 @@ pub struct Dispatcher<
impl<T, S, B> Dispatcher<T, S, B> impl<T, S, B> Dispatcher<T, S, B>
where where
T: AsyncRead + AsyncWrite, T: AsyncRead + AsyncWrite,
S: Service<Request<Payload>> + 'static, S: Service<Request> + 'static,
S::Error: Into<Error> + fmt::Debug, S::Error: fmt::Debug,
S::Response: Into<Response<B>>, S::Response: Into<Response<B>>,
B: MessageBody + 'static, B: MessageBody + 'static,
{ {
@ -97,13 +95,13 @@ where
impl<T, S, B> Future for Dispatcher<T, S, B> impl<T, S, B> Future for Dispatcher<T, S, B>
where where
T: AsyncRead + AsyncWrite, T: AsyncRead + AsyncWrite,
S: Service<Request<Payload>> + 'static, S: Service<Request> + 'static,
S::Error: Into<Error> + fmt::Debug, S::Error: fmt::Debug,
S::Response: Into<Response<B>>, S::Response: Into<Response<B>>,
B: MessageBody + 'static, B: MessageBody + 'static,
{ {
type Item = (); type Item = ();
type Error = DispatchError<()>; type Error = DispatchError;
#[inline] #[inline]
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
@ -143,21 +141,21 @@ where
} }
} }
struct ServiceResponse<S: Service<Request<Payload>>, B> { struct ServiceResponse<S: Service<Request>, B> {
state: ServiceResponseState<S, B>, state: ServiceResponseState<S, B>,
config: ServiceConfig, config: ServiceConfig,
buffer: Option<Bytes>, buffer: Option<Bytes>,
} }
enum ServiceResponseState<S: Service<Request<Payload>>, B> { enum ServiceResponseState<S: Service<Request>, B> {
ServiceCall(S::Future, Option<SendResponse<Bytes>>), ServiceCall(S::Future, Option<SendResponse<Bytes>>),
SendPayload(SendStream<Bytes>, ResponseBody<B>), SendPayload(SendStream<Bytes>, ResponseBody<B>),
} }
impl<S, B> ServiceResponse<S, B> impl<S, B> ServiceResponse<S, B>
where where
S: Service<Request<Payload>> + 'static, S: Service<Request> + 'static,
S::Error: Into<Error> + fmt::Debug, S::Error: fmt::Debug,
S::Response: Into<Response<B>>, S::Response: Into<Response<B>>,
B: MessageBody + 'static, B: MessageBody + 'static,
{ {
@ -224,8 +222,8 @@ where
impl<S, B> Future for ServiceResponse<S, B> impl<S, B> Future for ServiceResponse<S, B>
where where
S: Service<Request<Payload>> + 'static, S: Service<Request> + 'static,
S::Error: Into<Error> + fmt::Debug, S::Error: fmt::Debug,
S::Response: Into<Response<B>>, S::Response: Into<Response<B>>,
B: MessageBody + 'static, B: MessageBody + 'static,
{ {
@ -258,7 +256,7 @@ where
} }
Ok(Async::NotReady) => Ok(Async::NotReady), Ok(Async::NotReady) => Ok(Async::NotReady),
Err(e) => { Err(e) => {
let res: Response = e.into().into(); let res: Response = Response::InternalServerError().finish();
let (res, body) = res.replace_body(()); let (res, body) = res.replace_body(());
let mut send = send.take().unwrap(); let mut send = send.take().unwrap();

View File

@ -9,26 +9,10 @@ use h2::RecvStream;
mod dispatcher; mod dispatcher;
mod service; mod service;
pub use self::dispatcher::Dispatcher;
pub use self::service::H2Service; pub use self::service::H2Service;
use crate::error::PayloadError; use crate::error::PayloadError;
/// H1 service response type
pub enum H2ServiceResult<T> {
Disconnected,
Shutdown(T),
}
impl<T: fmt::Debug> fmt::Debug for H2ServiceResult<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
H2ServiceResult::Disconnected => write!(f, "H2ServiceResult::Disconnected"),
H2ServiceResult::Shutdown(ref v) => {
write!(f, "H2ServiceResult::Shutdown({:?})", v)
}
}
}
}
/// H2 receive stream /// H2 receive stream
pub struct Payload { pub struct Payload {
pl: RecvStream, pl: RecvStream,

View File

@ -20,7 +20,6 @@ use crate::request::Request;
use crate::response::Response; use crate::response::Response;
use super::dispatcher::Dispatcher; use super::dispatcher::Dispatcher;
use super::H2ServiceResult;
/// `NewService` implementation for HTTP2 transport /// `NewService` implementation for HTTP2 transport
pub struct H2Service<T, S, B> { pub struct H2Service<T, S, B> {
@ -31,14 +30,14 @@ pub struct H2Service<T, S, B> {
impl<T, S, B> H2Service<T, S, B> impl<T, S, B> H2Service<T, S, B>
where where
S: NewService<Request<Payload>>, S: NewService<Request>,
S::Service: 'static, S::Service: 'static,
S::Error: Into<Error> + Debug + 'static, S::Error: Debug + 'static,
S::Response: Into<Response<B>>, S::Response: Into<Response<B>>,
B: MessageBody + 'static, B: MessageBody + 'static,
{ {
/// Create new `HttpService` instance. /// Create new `HttpService` instance.
pub fn new<F: IntoNewService<S, Request<Payload>>>(service: F) -> Self { pub fn new<F: IntoNewService<S, Request>>(service: F) -> Self {
let cfg = ServiceConfig::new(KeepAlive::Timeout(5), 5000, 0); let cfg = ServiceConfig::new(KeepAlive::Timeout(5), 5000, 0);
H2Service { H2Service {
@ -57,14 +56,14 @@ where
impl<T, S, B> NewService<T> for H2Service<T, S, B> impl<T, S, B> NewService<T> for H2Service<T, S, B>
where where
T: AsyncRead + AsyncWrite, T: AsyncRead + AsyncWrite,
S: NewService<Request<Payload>>, S: NewService<Request>,
S::Service: 'static, S::Service: 'static,
S::Error: Into<Error> + Debug, S::Error: Debug,
S::Response: Into<Response<B>>, S::Response: Into<Response<B>>,
B: MessageBody + 'static, B: MessageBody + 'static,
{ {
type Response = (); type Response = ();
type Error = DispatchError<()>; type Error = DispatchError;
type InitError = S::InitError; type InitError = S::InitError;
type Service = H2ServiceHandler<T, S::Service, B>; type Service = H2ServiceHandler<T, S::Service, B>;
type Future = H2ServiceResponse<T, S, B>; type Future = H2ServiceResponse<T, S, B>;
@ -94,9 +93,9 @@ pub struct H2ServiceBuilder<T, S> {
impl<T, S> H2ServiceBuilder<T, S> impl<T, S> H2ServiceBuilder<T, S>
where where
S: NewService<Request<Payload>>, S: NewService<Request>,
S::Service: 'static, S::Service: 'static,
S::Error: Into<Error> + Debug + 'static, S::Error: Debug + 'static,
{ {
/// Create instance of `H2ServiceBuilder` /// Create instance of `H2ServiceBuilder`
pub fn new() -> H2ServiceBuilder<T, S> { pub fn new() -> H2ServiceBuilder<T, S> {
@ -189,30 +188,11 @@ where
self self
} }
// #[cfg(feature = "ssl")]
// /// Configure alpn protocols for SslAcceptorBuilder.
// pub fn configure_openssl(
// builder: &mut openssl::ssl::SslAcceptorBuilder,
// ) -> io::Result<()> {
// let protos: &[u8] = b"\x02h2";
// builder.set_alpn_select_callback(|_, protos| {
// const H2: &[u8] = b"\x02h2";
// if protos.windows(3).any(|window| window == H2) {
// Ok(b"h2")
// } else {
// Err(openssl::ssl::AlpnError::NOACK)
// }
// });
// builder.set_alpn_protos(&protos)?;
// Ok(())
// }
/// Finish service configuration and create `H1Service` instance. /// Finish service configuration and create `H1Service` instance.
pub fn finish<F, B>(self, service: F) -> H2Service<T, S, B> pub fn finish<F, B>(self, service: F) -> H2Service<T, S, B>
where where
B: MessageBody, B: MessageBody,
F: IntoNewService<S, Request<Payload>>, F: IntoNewService<S, Request>,
{ {
let cfg = ServiceConfig::new( let cfg = ServiceConfig::new(
self.keep_alive, self.keep_alive,
@ -228,7 +208,7 @@ where
} }
#[doc(hidden)] #[doc(hidden)]
pub struct H2ServiceResponse<T, S: NewService<Request<Payload>>, B> { pub struct H2ServiceResponse<T, S: NewService<Request>, B> {
fut: <S::Future as IntoFuture>::Future, fut: <S::Future as IntoFuture>::Future,
cfg: Option<ServiceConfig>, cfg: Option<ServiceConfig>,
_t: PhantomData<(T, B)>, _t: PhantomData<(T, B)>,
@ -237,10 +217,10 @@ pub struct H2ServiceResponse<T, S: NewService<Request<Payload>>, B> {
impl<T, S, B> Future for H2ServiceResponse<T, S, B> impl<T, S, B> Future for H2ServiceResponse<T, S, B>
where where
T: AsyncRead + AsyncWrite, T: AsyncRead + AsyncWrite,
S: NewService<Request<Payload>>, S: NewService<Request>,
S::Service: 'static, S::Service: 'static,
S::Response: Into<Response<B>>, S::Response: Into<Response<B>>,
S::Error: Into<Error> + Debug, S::Error: Debug,
B: MessageBody + 'static, B: MessageBody + 'static,
{ {
type Item = H2ServiceHandler<T, S::Service, B>; type Item = H2ServiceHandler<T, S::Service, B>;
@ -264,8 +244,8 @@ pub struct H2ServiceHandler<T, S: 'static, B> {
impl<T, S, B> H2ServiceHandler<T, S, B> impl<T, S, B> H2ServiceHandler<T, S, B>
where where
S: Service<Request<Payload>> + 'static, S: Service<Request> + 'static,
S::Error: Into<Error> + Debug, S::Error: Debug,
S::Response: Into<Response<B>>, S::Response: Into<Response<B>>,
B: MessageBody + 'static, B: MessageBody + 'static,
{ {
@ -281,19 +261,19 @@ where
impl<T, S, B> Service<T> for H2ServiceHandler<T, S, B> impl<T, S, B> Service<T> for H2ServiceHandler<T, S, B>
where where
T: AsyncRead + AsyncWrite, T: AsyncRead + AsyncWrite,
S: Service<Request<Payload>> + 'static, S: Service<Request> + 'static,
S::Error: Into<Error> + Debug, S::Error: Debug,
S::Response: Into<Response<B>>, S::Response: Into<Response<B>>,
B: MessageBody + 'static, B: MessageBody + 'static,
{ {
type Response = (); type Response = ();
type Error = DispatchError<()>; type Error = DispatchError;
type Future = H2ServiceHandlerResponse<T, S, B>; type Future = H2ServiceHandlerResponse<T, S, B>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> { fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.srv.poll_ready().map_err(|e| { self.srv.poll_ready().map_err(|e| {
error!("Service readiness error: {:?}", e); error!("Service readiness error: {:?}", e);
DispatchError::Service(()) DispatchError::Service
}) })
} }
@ -308,11 +288,7 @@ where
} }
} }
enum State< enum State<T: AsyncRead + AsyncWrite, S: Service<Request> + 'static, B: MessageBody> {
T: AsyncRead + AsyncWrite,
S: Service<Request<Payload>> + 'static,
B: MessageBody,
> {
Incoming(Dispatcher<T, S, B>), Incoming(Dispatcher<T, S, B>),
Handshake( Handshake(
Option<CloneableService<S>>, Option<CloneableService<S>>,
@ -324,8 +300,8 @@ enum State<
pub struct H2ServiceHandlerResponse<T, S, B> pub struct H2ServiceHandlerResponse<T, S, B>
where where
T: AsyncRead + AsyncWrite, T: AsyncRead + AsyncWrite,
S: Service<Request<Payload>> + 'static, S: Service<Request> + 'static,
S::Error: Into<Error> + Debug, S::Error: Debug,
S::Response: Into<Response<B>>, S::Response: Into<Response<B>>,
B: MessageBody + 'static, B: MessageBody + 'static,
{ {
@ -335,13 +311,13 @@ where
impl<T, S, B> Future for H2ServiceHandlerResponse<T, S, B> impl<T, S, B> Future for H2ServiceHandlerResponse<T, S, B>
where where
T: AsyncRead + AsyncWrite, T: AsyncRead + AsyncWrite,
S: Service<Request<Payload>> + 'static, S: Service<Request> + 'static,
S::Error: Into<Error> + Debug, S::Error: Debug,
S::Response: Into<Response<B>>, S::Response: Into<Response<B>>,
B: MessageBody, B: MessageBody,
{ {
type Item = (); type Item = ();
type Error = DispatchError<()>; type Error = DispatchError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.state { match self.state {

View File

@ -97,6 +97,7 @@ pub use self::message::{Head, Message, RequestHead, ResponseHead};
pub use self::payload::{Payload, PayloadStream}; pub use self::payload::{Payload, PayloadStream};
pub use self::request::Request; pub use self::request::Request;
pub use self::response::Response; pub use self::response::Response;
pub use self::service::HttpService;
pub use self::service::{SendError, SendResponse}; pub use self::service::{SendError, SendResponse};
pub mod dev { pub mod dev {

View File

@ -1,3 +1,5 @@
mod senderror; mod senderror;
mod service;
pub use self::senderror::{SendError, SendResponse}; pub use self::senderror::{SendError, SendResponse};
pub use self::service::HttpService;

446
src/service/service.rs Normal file
View File

@ -0,0 +1,446 @@
use std::fmt::Debug;
use std::marker::PhantomData;
use std::{fmt, io, net};
use actix_codec::{AsyncRead, AsyncWrite, Framed, FramedParts};
use actix_service::{IntoNewService, NewService, Service};
use actix_utils::cloneable::CloneableService;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use futures::{try_ready, Async, Future, IntoFuture, Poll};
use h2::server::{self, Handshake};
use log::error;
use crate::body::MessageBody;
use crate::config::{KeepAlive, ServiceConfig};
use crate::error::DispatchError;
use crate::request::Request;
use crate::response::Response;
use crate::{h1, h2::Dispatcher};
/// `NewService` HTTP1.1/HTTP2 transport implementation
pub struct HttpService<T, S, B> {
srv: S,
cfg: ServiceConfig,
_t: PhantomData<(T, B)>,
}
impl<T, S, B> HttpService<T, S, B>
where
S: NewService<Request>,
S::Service: 'static,
S::Error: Debug + 'static,
S::Response: Into<Response<B>>,
B: MessageBody + 'static,
{
/// Create new `HttpService` instance.
pub fn new<F: IntoNewService<S, Request>>(service: F) -> Self {
let cfg = ServiceConfig::new(KeepAlive::Timeout(5), 5000, 0);
HttpService {
cfg,
srv: service.into_new_service(),
_t: PhantomData,
}
}
/// Create builder for `HttpService` instance.
pub fn build() -> HttpServiceBuilder<T, S> {
HttpServiceBuilder::new()
}
}
impl<T, S, B> NewService<T> for HttpService<T, S, B>
where
T: AsyncRead + AsyncWrite + 'static,
S: NewService<Request>,
S::Service: 'static,
S::Error: Debug,
S::Response: Into<Response<B>>,
B: MessageBody + 'static,
{
type Response = ();
type Error = DispatchError;
type InitError = S::InitError;
type Service = HttpServiceHandler<T, S::Service, B>;
type Future = HttpServiceResponse<T, S, B>;
fn new_service(&self, _: &()) -> Self::Future {
HttpServiceResponse {
fut: self.srv.new_service(&()).into_future(),
cfg: Some(self.cfg.clone()),
_t: PhantomData,
}
}
}
/// A http service factory builder
///
/// This type can be used to construct an instance of `ServiceConfig` through a
/// builder-like pattern.
pub struct HttpServiceBuilder<T, S> {
keep_alive: KeepAlive,
client_timeout: u64,
client_disconnect: u64,
host: String,
addr: net::SocketAddr,
secure: bool,
_t: PhantomData<(T, S)>,
}
impl<T, S> HttpServiceBuilder<T, S>
where
S: NewService<Request>,
S::Service: 'static,
S::Error: Debug + 'static,
{
/// Create instance of `HttpServiceBuilder` type
pub fn new() -> HttpServiceBuilder<T, S> {
HttpServiceBuilder {
keep_alive: KeepAlive::Timeout(5),
client_timeout: 5000,
client_disconnect: 0,
secure: false,
host: "localhost".to_owned(),
addr: "127.0.0.1:8080".parse().unwrap(),
_t: PhantomData,
}
}
/// Enable secure flag for current server.
/// This flags also enables `client disconnect timeout`.
///
/// By default this flag is set to false.
pub fn secure(mut self) -> Self {
self.secure = true;
if self.client_disconnect == 0 {
self.client_disconnect = 3000;
}
self
}
/// Set server keep-alive setting.
///
/// By default keep alive is set to a 5 seconds.
pub fn keep_alive<U: Into<KeepAlive>>(mut self, val: U) -> Self {
self.keep_alive = val.into();
self
}
/// Set server client timeout in milliseconds for first request.
///
/// Defines a timeout for reading client request header. If a client does not transmit
/// the entire set headers within this time, the request is terminated with
/// the 408 (Request Time-out) error.
///
/// To disable timeout set value to 0.
///
/// By default client timeout is set to 5000 milliseconds.
pub fn client_timeout(mut self, val: u64) -> Self {
self.client_timeout = val;
self
}
/// Set server connection disconnect timeout in milliseconds.
///
/// Defines a timeout for disconnect connection. If a disconnect procedure does not complete
/// within this time, the request get dropped. This timeout affects secure connections.
///
/// To disable timeout set value to 0.
///
/// By default disconnect timeout is set to 3000 milliseconds.
pub fn client_disconnect(mut self, val: u64) -> Self {
self.client_disconnect = val;
self
}
/// Set server host name.
///
/// Host name is used by application router aa a hostname for url
/// generation. Check [ConnectionInfo](./dev/struct.ConnectionInfo.
/// html#method.host) documentation for more information.
///
/// By default host name is set to a "localhost" value.
pub fn server_hostname(mut self, val: &str) -> Self {
self.host = val.to_owned();
self
}
/// Set server ip address.
///
/// Host name is used by application router aa a hostname for url
/// generation. Check [ConnectionInfo](./dev/struct.ConnectionInfo.
/// html#method.host) documentation for more information.
///
/// By default server address is set to a "127.0.0.1:8080"
pub fn server_address<U: net::ToSocketAddrs>(mut self, addr: U) -> Self {
match addr.to_socket_addrs() {
Err(err) => error!("Can not convert to SocketAddr: {}", err),
Ok(mut addrs) => {
if let Some(addr) = addrs.next() {
self.addr = addr;
}
}
}
self
}
// #[cfg(feature = "ssl")]
// /// Configure alpn protocols for SslAcceptorBuilder.
// pub fn configure_openssl(
// builder: &mut openssl::ssl::SslAcceptorBuilder,
// ) -> io::Result<()> {
// let protos: &[u8] = b"\x02h2";
// builder.set_alpn_select_callback(|_, protos| {
// const H2: &[u8] = b"\x02h2";
// if protos.windows(3).any(|window| window == H2) {
// Ok(b"h2")
// } else {
// Err(openssl::ssl::AlpnError::NOACK)
// }
// });
// builder.set_alpn_protos(&protos)?;
// Ok(())
// }
/// Finish service configuration and create `HttpService` instance.
pub fn finish<F, B>(self, service: F) -> HttpService<T, S, B>
where
B: MessageBody,
F: IntoNewService<S, Request>,
{
let cfg = ServiceConfig::new(
self.keep_alive,
self.client_timeout,
self.client_disconnect,
);
HttpService {
cfg,
srv: service.into_new_service(),
_t: PhantomData,
}
}
}
#[doc(hidden)]
pub struct HttpServiceResponse<T, S: NewService<Request>, B> {
fut: <S::Future as IntoFuture>::Future,
cfg: Option<ServiceConfig>,
_t: PhantomData<(T, B)>,
}
impl<T, S, B> Future for HttpServiceResponse<T, S, B>
where
T: AsyncRead + AsyncWrite,
S: NewService<Request>,
S::Service: 'static,
S::Response: Into<Response<B>>,
S::Error: Debug,
B: MessageBody + 'static,
{
type Item = HttpServiceHandler<T, S::Service, B>;
type Error = S::InitError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let service = try_ready!(self.fut.poll());
Ok(Async::Ready(HttpServiceHandler::new(
self.cfg.take().unwrap(),
service,
)))
}
}
/// `Service` implementation for http transport
pub struct HttpServiceHandler<T, S: 'static, B> {
srv: CloneableService<S>,
cfg: ServiceConfig,
_t: PhantomData<(T, B)>,
}
impl<T, S, B> HttpServiceHandler<T, S, B>
where
S: Service<Request> + 'static,
S::Error: Debug,
S::Response: Into<Response<B>>,
B: MessageBody + 'static,
{
fn new(cfg: ServiceConfig, srv: S) -> HttpServiceHandler<T, S, B> {
HttpServiceHandler {
cfg,
srv: CloneableService::new(srv),
_t: PhantomData,
}
}
}
impl<T, S, B> Service<T> for HttpServiceHandler<T, S, B>
where
T: AsyncRead + AsyncWrite + 'static,
S: Service<Request> + 'static,
S::Error: Debug,
S::Response: Into<Response<B>>,
B: MessageBody + 'static,
{
type Response = ();
type Error = DispatchError;
type Future = HttpServiceHandlerResponse<T, S, B>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.srv.poll_ready().map_err(|e| {
error!("Service readiness error: {:?}", e);
DispatchError::Service
})
}
fn call(&mut self, req: T) -> Self::Future {
HttpServiceHandlerResponse {
state: State::Unknown(Some((
req,
BytesMut::with_capacity(14),
self.cfg.clone(),
self.srv.clone(),
))),
}
}
}
enum State<T, S: Service<Request> + 'static, B: MessageBody>
where
S::Error: fmt::Debug,
T: AsyncRead + AsyncWrite + 'static,
{
H1(h1::Dispatcher<T, S, B>),
H2(Dispatcher<Io<T>, S, B>),
Unknown(Option<(T, BytesMut, ServiceConfig, CloneableService<S>)>),
Handshake(Option<(Handshake<Io<T>, Bytes>, ServiceConfig, CloneableService<S>)>),
}
pub struct HttpServiceHandlerResponse<T, S, B>
where
T: AsyncRead + AsyncWrite + 'static,
S: Service<Request> + 'static,
S::Error: Debug,
S::Response: Into<Response<B>>,
B: MessageBody + 'static,
{
state: State<T, S, B>,
}
const HTTP2_PREFACE: [u8; 14] = *b"PRI * HTTP/2.0";
impl<T, S, B> Future for HttpServiceHandlerResponse<T, S, B>
where
T: AsyncRead + AsyncWrite,
S: Service<Request> + 'static,
S::Error: Debug,
S::Response: Into<Response<B>>,
B: MessageBody,
{
type Item = ();
type Error = DispatchError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.state {
State::H1(ref mut disp) => disp.poll(),
State::H2(ref mut disp) => disp.poll(),
State::Unknown(ref mut data) => {
if let Some(ref mut item) = data {
loop {
unsafe {
let b = item.1.bytes_mut();
let n = { try_ready!(item.0.poll_read(b)) };
item.1.advance_mut(n);
if item.1.len() >= HTTP2_PREFACE.len() {
break;
}
}
}
} else {
panic!()
}
let (io, buf, cfg, srv) = data.take().unwrap();
if buf[..14] == HTTP2_PREFACE[..] {
let io = Io {
inner: io,
unread: Some(buf),
};
self.state =
State::Handshake(Some((server::handshake(io), cfg, srv)));
} else {
let framed = Framed::from_parts(FramedParts::with_read_buf(
io,
h1::Codec::new(cfg.clone()),
buf,
));
self.state =
State::H1(h1::Dispatcher::with_timeout(framed, cfg, None, srv))
}
self.poll()
}
State::Handshake(ref mut data) => {
let conn = if let Some(ref mut item) = data {
match item.0.poll() {
Ok(Async::Ready(conn)) => conn,
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(err) => {
trace!("H2 handshake error: {}", err);
return Err(err.into());
}
}
} else {
panic!()
};
let (_, cfg, srv) = data.take().unwrap();
self.state = State::H2(Dispatcher::new(srv, conn, cfg, None));
self.poll()
}
}
}
}
/// Wrapper for `AsyncRead + AsyncWrite` types
struct Io<T> {
unread: Option<BytesMut>,
inner: T,
}
impl<T: io::Read> io::Read for Io<T> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if let Some(mut bytes) = self.unread.take() {
let size = std::cmp::min(buf.len(), bytes.len());
buf[..size].copy_from_slice(&bytes[..size]);
if bytes.len() > size {
bytes.split_to(size);
self.unread = Some(bytes);
}
Ok(size)
} else {
self.inner.read(buf)
}
}
}
impl<T: io::Write> io::Write for Io<T> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.inner.write(buf)
}
fn flush(&mut self) -> io::Result<()> {
self.inner.flush()
}
}
impl<T: AsyncRead + 'static> AsyncRead for Io<T> {
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
self.inner.prepare_uninitialized_buffer(buf)
}
}
impl<T: AsyncWrite + 'static> AsyncWrite for Io<T> {
fn shutdown(&mut self) -> Poll<(), io::Error> {
self.inner.shutdown()
}
fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
self.inner.write_buf(buf)
}
}

View File

@ -73,7 +73,7 @@ impl TestServer {
.start(); .start();
tx.send((System::current(), local_addr)).unwrap(); tx.send((System::current(), local_addr)).unwrap();
sys.run(); sys.run()
}); });
let (system, addr) = rx.recv().unwrap(); let (system, addr) = rx.recv().unwrap();

View File

@ -10,8 +10,8 @@ use futures::stream::once;
use actix_http::body::Body; use actix_http::body::Body;
use actix_http::{ use actix_http::{
body, client, h1, h2, http, Error, HttpMessage as HttpMessage2, KeepAlive, Request, body, client, h1, h2, http, Error, HttpMessage as HttpMessage2, HttpService,
Response, KeepAlive, Request, Response,
}; };
#[test] #[test]
@ -31,6 +31,26 @@ fn test_h1() {
assert!(response.status().is_success()); assert!(response.status().is_success());
} }
#[test]
fn test_h1_2() {
let mut srv = TestServer::new(|| {
HttpService::build()
.keep_alive(KeepAlive::Disabled)
.client_timeout(1000)
.client_disconnect(1000)
.server_hostname("localhost")
.finish(|req: Request| {
assert_eq!(req.version(), http::Version::HTTP_11);
future::ok::<_, ()>(Response::Ok().finish())
})
.map(|_| ())
});
let req = client::ClientRequest::get(srv.url("/")).finish().unwrap();
let response = srv.send_request(req).unwrap();
assert!(response.status().is_success());
}
#[cfg(feature = "ssl")] #[cfg(feature = "ssl")]
fn ssl_acceptor<T>() -> std::io::Result<actix_server::ssl::OpensslAcceptor<T>> { fn ssl_acceptor<T>() -> std::io::Result<actix_server::ssl::OpensslAcceptor<T>> {
use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod}; use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod};
@ -71,7 +91,30 @@ fn test_h2() -> std::io::Result<()> {
let req = client::ClientRequest::get(srv.surl("/")).finish().unwrap(); let req = client::ClientRequest::get(srv.surl("/")).finish().unwrap();
let response = srv.send_request(req).unwrap(); let response = srv.send_request(req).unwrap();
println!("RES: {:?}", response); assert!(response.status().is_success());
Ok(())
}
#[cfg(feature = "ssl")]
#[test]
fn test_h2_1() -> std::io::Result<()> {
let openssl = ssl_acceptor()?;
let mut srv = TestServer::new(move || {
openssl
.clone()
.map_err(|e| println!("Openssl error: {}", e))
.and_then(
HttpService::build()
.finish(|req: Request| {
assert_eq!(req.version(), http::Version::HTTP_2);
future::ok::<_, Error>(Response::Ok().finish())
})
.map_err(|_| ()),
)
});
let req = client::ClientRequest::get(srv.surl("/")).finish().unwrap();
let response = srv.send_request(req).unwrap();
assert!(response.status().is_success()); assert!(response.status().is_success());
Ok(()) Ok(())
} }
@ -79,9 +122,6 @@ fn test_h2() -> std::io::Result<()> {
#[cfg(feature = "ssl")] #[cfg(feature = "ssl")]
#[test] #[test]
fn test_h2_body() -> std::io::Result<()> { fn test_h2_body() -> std::io::Result<()> {
// std::env::set_var("RUST_LOG", "actix_http=trace");
// env_logger::init();
let data = "HELLOWORLD".to_owned().repeat(64 * 1024); let data = "HELLOWORLD".to_owned().repeat(64 * 1024);
let openssl = ssl_acceptor()?; let openssl = ssl_acceptor()?;
let mut srv = TestServer::new(move || { let mut srv = TestServer::new(move || {