1
0
mirror of https://github.com/fafhrd91/actix-web synced 2025-07-01 08:45:10 +02:00

allow to specify upgrade service

This commit is contained in:
Nikolay Kim
2019-04-08 14:51:16 -07:00
parent 0a6dd0efdf
commit 43d325a139
7 changed files with 254 additions and 83 deletions

View File

@ -2,7 +2,7 @@ use std::collections::VecDeque;
use std::time::Instant;
use std::{fmt, io};
use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder};
use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed};
use actix_service::Service;
use actix_utils::cloneable::CloneableService;
use bitflags::bitflags;
@ -39,27 +39,32 @@ bitflags! {
}
/// Dispatcher for HTTP/1.1 protocol
pub struct Dispatcher<T, S, B, X>
pub struct Dispatcher<T, S, B, X, U>
where
S: Service<Request = Request>,
S::Error: Into<Error>,
B: MessageBody,
X: Service<Request = Request, Response = Request>,
X::Error: Into<Error>,
U: Service<Request = (Request, Framed<T, Codec>), Response = ()>,
U::Error: fmt::Display,
{
inner: Option<InnerDispatcher<T, S, B, X>>,
inner: Option<InnerDispatcher<T, S, B, X, U>>,
}
struct InnerDispatcher<T, S, B, X>
struct InnerDispatcher<T, S, B, X, U>
where
S: Service<Request = Request>,
S::Error: Into<Error>,
B: MessageBody,
X: Service<Request = Request, Response = Request>,
X::Error: Into<Error>,
U: Service<Request = (Request, Framed<T, Codec>), Response = ()>,
U::Error: fmt::Display,
{
service: CloneableService<S>,
expect: CloneableService<X>,
upgrade: Option<CloneableService<U>>,
flags: Flags,
error: Option<DispatchError>,
@ -132,7 +137,7 @@ where
}
}
impl<T, S, B, X> Dispatcher<T, S, B, X>
impl<T, S, B, X, U> Dispatcher<T, S, B, X, U>
where
T: AsyncRead + AsyncWrite,
S: Service<Request = Request>,
@ -141,6 +146,8 @@ where
B: MessageBody,
X: Service<Request = Request, Response = Request>,
X::Error: Into<Error>,
U: Service<Request = (Request, Framed<T, Codec>), Response = ()>,
U::Error: fmt::Display,
{
/// Create http/1 dispatcher.
pub fn new(
@ -148,6 +155,7 @@ where
config: ServiceConfig,
service: CloneableService<S>,
expect: CloneableService<X>,
upgrade: Option<CloneableService<U>>,
) -> Self {
Dispatcher::with_timeout(
stream,
@ -157,6 +165,7 @@ where
None,
service,
expect,
upgrade,
)
}
@ -169,6 +178,7 @@ where
timeout: Option<Delay>,
service: CloneableService<S>,
expect: CloneableService<X>,
upgrade: Option<CloneableService<U>>,
) -> Self {
let keepalive = config.keep_alive_enabled();
let flags = if keepalive {
@ -198,6 +208,7 @@ where
messages: VecDeque::new(),
service,
expect,
upgrade,
flags,
ka_expire,
ka_timer,
@ -206,7 +217,7 @@ where
}
}
impl<T, S, B, X> InnerDispatcher<T, S, B, X>
impl<T, S, B, X, U> InnerDispatcher<T, S, B, X, U>
where
T: AsyncRead + AsyncWrite,
S: Service<Request = Request>,
@ -215,6 +226,8 @@ where
B: MessageBody,
X: Service<Request = Request, Response = Request>,
X::Error: Into<Error>,
U: Service<Request = (Request, Framed<T, Codec>), Response = ()>,
U::Error: fmt::Display,
{
fn can_read(&self) -> bool {
if self.flags.contains(Flags::READ_DISCONNECT) {
@ -603,7 +616,7 @@ where
}
}
impl<T, S, B, X> Future for Dispatcher<T, S, B, X>
impl<T, S, B, X, U> Future for Dispatcher<T, S, B, X, U>
where
T: AsyncRead + AsyncWrite,
S: Service<Request = Request>,
@ -612,6 +625,8 @@ where
B: MessageBody,
X: Service<Request = Request, Response = Request>,
X::Error: Into<Error>,
U: Service<Request = (Request, Framed<T, Codec>), Response = ()>,
U::Error: fmt::Display,
{
type Item = ();
type Error = DispatchError;