1
0
mirror of https://github.com/fafhrd91/actix-web synced 2025-06-25 22:49:21 +02:00
Co-authored-by: Rob Ede <robjtede@icloud.com>
This commit is contained in:
fakeshadow
2021-01-04 07:47:04 +08:00
committed by GitHub
parent 1f202d40e4
commit 32de9f8840
75 changed files with 788 additions and 826 deletions

View File

@ -8,7 +8,7 @@ use std::{
};
use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed, FramedParts};
use actix_rt::time::{delay_until, Delay, Instant};
use actix_rt::time::{sleep_until, Instant, Sleep};
use actix_service::Service;
use bitflags::bitflags;
use bytes::{Buf, BytesMut};
@ -51,12 +51,12 @@ bitflags! {
/// Dispatcher for HTTP/1.1 protocol
pub struct Dispatcher<T, S, B, X, U>
where
S: Service<Request = Request>,
S: Service<Request>,
S::Error: Into<Error>,
B: MessageBody,
X: Service<Request = Request, Response = Request>,
X: Service<Request, Response = Request>,
X::Error: Into<Error>,
U: Service<Request = (Request, Framed<T, Codec>), Response = ()>,
U: Service<(Request, Framed<T, Codec>), Response = ()>,
U::Error: fmt::Display,
{
#[pin]
@ -69,12 +69,12 @@ where
#[pin_project(project = DispatcherStateProj)]
enum DispatcherState<T, S, B, X, U>
where
S: Service<Request = Request>,
S: Service<Request>,
S::Error: Into<Error>,
B: MessageBody,
X: Service<Request = Request, Response = Request>,
X: Service<Request, Response = Request>,
X::Error: Into<Error>,
U: Service<Request = (Request, Framed<T, Codec>), Response = ()>,
U: Service<(Request, Framed<T, Codec>), Response = ()>,
U::Error: fmt::Display,
{
Normal(#[pin] InnerDispatcher<T, S, B, X, U>),
@ -84,12 +84,12 @@ where
#[pin_project(project = InnerDispatcherProj)]
struct InnerDispatcher<T, S, B, X, U>
where
S: Service<Request = Request>,
S: Service<Request>,
S::Error: Into<Error>,
B: MessageBody,
X: Service<Request = Request, Response = Request>,
X: Service<Request, Response = Request>,
X::Error: Into<Error>,
U: Service<Request = (Request, Framed<T, Codec>), Response = ()>,
U: Service<(Request, Framed<T, Codec>), Response = ()>,
U::Error: fmt::Display,
{
service: CloneableService<S>,
@ -106,7 +106,8 @@ where
messages: VecDeque<DispatcherMessage>,
ka_expire: Instant,
ka_timer: Option<Delay>,
#[pin]
ka_timer: Option<Sleep>,
io: Option<T>,
read_buf: BytesMut,
@ -123,8 +124,8 @@ enum DispatcherMessage {
#[pin_project(project = StateProj)]
enum State<S, B, X>
where
S: Service<Request = Request>,
X: Service<Request = Request, Response = Request>,
S: Service<Request>,
X: Service<Request, Response = Request>,
B: MessageBody,
{
None,
@ -135,8 +136,8 @@ where
impl<S, B, X> State<S, B, X>
where
S: Service<Request = Request>,
X: Service<Request = Request, Response = Request>,
S: Service<Request>,
X: Service<Request, Response = Request>,
B: MessageBody,
{
fn is_empty(&self) -> bool {
@ -166,13 +167,13 @@ impl PartialEq for PollResponse {
impl<T, S, B, X, U> Dispatcher<T, S, B, X, U>
where
T: AsyncRead + AsyncWrite + Unpin,
S: Service<Request = Request>,
S: Service<Request>,
S::Error: Into<Error>,
S::Response: Into<Response<B>>,
B: MessageBody,
X: Service<Request = Request, Response = Request>,
X: Service<Request, Response = Request>,
X::Error: Into<Error>,
U: Service<Request = (Request, Framed<T, Codec>), Response = ()>,
U: Service<(Request, Framed<T, Codec>), Response = ()>,
U::Error: fmt::Display,
{
/// Create HTTP/1 dispatcher.
@ -205,7 +206,7 @@ where
codec: Codec,
config: ServiceConfig,
read_buf: BytesMut,
timeout: Option<Delay>,
timeout: Option<Sleep>,
service: CloneableService<S>,
expect: CloneableService<X>,
upgrade: Option<CloneableService<U>>,
@ -257,13 +258,13 @@ where
impl<T, S, B, X, U> InnerDispatcher<T, S, B, X, U>
where
T: AsyncRead + AsyncWrite + Unpin,
S: Service<Request = Request>,
S: Service<Request>,
S::Error: Into<Error>,
S::Response: Into<Response<B>>,
B: MessageBody,
X: Service<Request = Request, Response = Request>,
X: Service<Request, Response = Request>,
X::Error: Into<Error>,
U: Service<Request = (Request, Framed<T, Codec>), Response = ()>,
U: Service<(Request, Framed<T, Codec>), Response = ()>,
U::Error: fmt::Display,
{
fn can_read(&self, cx: &mut Context<'_>) -> bool {
@ -660,7 +661,7 @@ where
// shutdown timeout
if this.flags.contains(Flags::SHUTDOWN) {
if let Some(interval) = this.codec.config().client_disconnect_timer() {
*this.ka_timer = Some(delay_until(interval));
this.ka_timer.set(Some(sleep_until(interval)));
} else {
this.flags.insert(Flags::READ_DISCONNECT);
if let Some(mut payload) = this.payload.take() {
@ -673,12 +674,14 @@ where
}
}
match Pin::new(&mut this.ka_timer.as_mut().unwrap()).poll(cx) {
match this.ka_timer.as_mut().as_pin_mut().unwrap().poll(cx) {
Poll::Ready(()) => {
// if we get timeout during shutdown, drop connection
if this.flags.contains(Flags::SHUTDOWN) {
return Err(DispatchError::DisconnectTimeout);
} else if this.ka_timer.as_mut().unwrap().deadline() >= *this.ka_expire {
} else if this.ka_timer.as_mut().as_pin_mut().unwrap().deadline()
>= *this.ka_expire
{
// check for any outstanding tasks
if this.state.is_empty() && this.write_buf.is_empty() {
if this.flags.contains(Flags::STARTED) {
@ -689,9 +692,15 @@ where
if let Some(deadline) =
this.codec.config().client_disconnect_timer()
{
if let Some(mut timer) = this.ka_timer.as_mut() {
if let Some(timer) = this.ka_timer.as_mut().as_pin_mut()
{
timer.reset(deadline);
let _ = Pin::new(&mut timer).poll(cx);
let _ = this
.ka_timer
.as_mut()
.as_pin_mut()
.unwrap()
.poll(cx);
}
} else {
// no shutdown timeout, drop socket
@ -716,14 +725,15 @@ where
} else if let Some(deadline) =
this.codec.config().keep_alive_expire()
{
if let Some(mut timer) = this.ka_timer.as_mut() {
if let Some(timer) = this.ka_timer.as_mut().as_pin_mut() {
timer.reset(deadline);
let _ = Pin::new(&mut timer).poll(cx);
let _ =
this.ka_timer.as_mut().as_pin_mut().unwrap().poll(cx);
}
}
} else if let Some(mut timer) = this.ka_timer.as_mut() {
} else if let Some(timer) = this.ka_timer.as_mut().as_pin_mut() {
timer.reset(*this.ka_expire);
let _ = Pin::new(&mut timer).poll(cx);
let _ = this.ka_timer.as_mut().as_pin_mut().unwrap().poll(cx);
}
}
Poll::Pending => (),
@ -736,13 +746,13 @@ where
impl<T, S, B, X, U> Future for Dispatcher<T, S, B, X, U>
where
T: AsyncRead + AsyncWrite + Unpin,
S: Service<Request = Request>,
S: Service<Request>,
S::Error: Into<Error>,
S::Response: Into<Response<B>>,
B: MessageBody,
X: Service<Request = Request, Response = Request>,
X: Service<Request, Response = Request>,
X::Error: Into<Error>,
U: Service<Request = (Request, Framed<T, Codec>), Response = ()>,
U: Service<(Request, Framed<T, Codec>), Response = ()>,
U::Error: fmt::Display,
{
type Output = Result<(), DispatchError>;
@ -951,12 +961,12 @@ fn read<T>(
where
T: AsyncRead + Unpin,
{
Pin::new(io).poll_read_buf(cx, buf)
actix_codec::poll_read_buf(Pin::new(io), cx, buf)
}
#[cfg(test)]
mod tests {
use std::{marker::PhantomData, str};
use std::str;
use actix_service::fn_service;
use futures_util::future::{lazy, ready};
@ -985,21 +995,19 @@ mod tests {
}
}
fn ok_service() -> impl Service<Request = Request, Response = Response, Error = Error>
{
fn ok_service() -> impl Service<Request, Response = Response, Error = Error> {
fn_service(|_req: Request| ready(Ok::<_, Error>(Response::Ok().finish())))
}
fn echo_path_service(
) -> impl Service<Request = Request, Response = Response, Error = Error> {
fn echo_path_service() -> impl Service<Request, Response = Response, Error = Error> {
fn_service(|req: Request| {
let path = req.path().as_bytes();
ready(Ok::<_, Error>(Response::Ok().body(Body::from_slice(path))))
})
}
fn echo_payload_service(
) -> impl Service<Request = Request, Response = Response, Error = Error> {
fn echo_payload_service() -> impl Service<Request, Response = Response, Error = Error>
{
fn_service(|mut req: Request| {
Box::pin(async move {
use futures_util::stream::StreamExt as _;
@ -1007,7 +1015,7 @@ mod tests {
let mut pl = req.take_payload();
let mut body = BytesMut::new();
while let Some(chunk) = pl.next().await {
body.extend_from_slice(chunk.unwrap().bytes())
body.extend_from_slice(chunk.unwrap().chunk())
}
Ok::<_, Error>(Response::Ok().body(body))
@ -1020,7 +1028,7 @@ mod tests {
lazy(|cx| {
let buf = TestBuffer::new("GET /test HTTP/1\r\n\r\n");
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler<TestBuffer>>::new(
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
buf,
ServiceConfig::default(),
CloneableService::new(ok_service()),
@ -1060,7 +1068,7 @@ mod tests {
let cfg = ServiceConfig::new(KeepAlive::Disabled, 1, 1, false, None);
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler<TestBuffer>>::new(
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
buf,
cfg,
CloneableService::new(echo_path_service()),
@ -1114,7 +1122,7 @@ mod tests {
let cfg = ServiceConfig::new(KeepAlive::Disabled, 1, 1, false, None);
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler<TestBuffer>>::new(
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
buf,
cfg,
CloneableService::new(echo_path_service()),
@ -1163,7 +1171,7 @@ mod tests {
lazy(|cx| {
let mut buf = TestSeqBuffer::empty();
let cfg = ServiceConfig::new(KeepAlive::Disabled, 0, 0, false, None);
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler<_>>::new(
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
buf.clone(),
cfg,
CloneableService::new(echo_payload_service()),
@ -1234,7 +1242,7 @@ mod tests {
lazy(|cx| {
let mut buf = TestSeqBuffer::empty();
let cfg = ServiceConfig::new(KeepAlive::Disabled, 0, 0, false, None);
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler<_>>::new(
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
buf.clone(),
cfg,
CloneableService::new(echo_path_service()),
@ -1293,12 +1301,12 @@ mod tests {
lazy(|cx| {
let mut buf = TestSeqBuffer::empty();
let cfg = ServiceConfig::new(KeepAlive::Disabled, 0, 0, false, None);
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler<_>>::new(
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
buf.clone(),
cfg,
CloneableService::new(ok_service()),
CloneableService::new(ExpectHandler),
Some(CloneableService::new(UpgradeHandler(PhantomData))),
Some(CloneableService::new(UpgradeHandler)),
Extensions::new(),
None,
);

View File

@ -135,7 +135,7 @@ pub(crate) trait MessageType: Sized {
let mut has_date = false;
let mut buf = dst.bytes_mut().as_mut_ptr() as *mut u8;
let mut buf = dst.chunk_mut().as_mut_ptr() as *mut u8;
let mut remaining = dst.capacity() - dst.len();
// tracks bytes written since last buffer resize
@ -177,7 +177,7 @@ pub(crate) trait MessageType: Sized {
// re-assign buf raw pointer since it's possible that the buffer was
// reallocated and/or resized
buf = dst.bytes_mut().as_mut_ptr() as *mut u8;
buf = dst.chunk_mut().as_mut_ptr() as *mut u8;
}
// SAFETY: on each write, it is enough to ensure that the advancement of the
@ -224,7 +224,7 @@ pub(crate) trait MessageType: Sized {
// re-assign buf raw pointer since it's possible that the buffer was
// reallocated and/or resized
buf = dst.bytes_mut().as_mut_ptr() as *mut u8;
buf = dst.chunk_mut().as_mut_ptr() as *mut u8;
}
// SAFETY: on each write, it is enough to ensure that the advancement of

View File

@ -8,11 +8,10 @@ use crate::request::Request;
pub struct ExpectHandler;
impl ServiceFactory for ExpectHandler {
type Config = ();
type Request = Request;
impl ServiceFactory<Request> for ExpectHandler {
type Response = Request;
type Error = Error;
type Config = ();
type Service = ExpectHandler;
type InitError = Error;
type Future = Ready<Result<Self::Service, Self::InitError>>;
@ -22,8 +21,7 @@ impl ServiceFactory for ExpectHandler {
}
}
impl Service for ExpectHandler {
type Request = Request;
impl Service<Request> for ExpectHandler {
type Response = Request;
type Error = Error;
type Future = Ready<Result<Self::Response, Self::Error>>;

View File

@ -24,25 +24,25 @@ use super::dispatcher::Dispatcher;
use super::{ExpectHandler, UpgradeHandler};
/// `ServiceFactory` implementation for HTTP1 transport
pub struct H1Service<T, S, B, X = ExpectHandler, U = UpgradeHandler<T>> {
pub struct H1Service<T, S, B, X = ExpectHandler, U = UpgradeHandler> {
srv: S,
cfg: ServiceConfig,
expect: X,
upgrade: Option<U>,
on_connect_ext: Option<Rc<ConnectCallback<T>>>,
_t: PhantomData<(T, B)>,
_t: PhantomData<B>,
}
impl<T, S, B> H1Service<T, S, B>
where
S: ServiceFactory<Config = (), Request = Request>,
S: ServiceFactory<Request, Config = ()>,
S::Error: Into<Error>,
S::InitError: fmt::Debug,
S::Response: Into<Response<B>>,
B: MessageBody,
{
/// Create new `HttpService` instance with config.
pub(crate) fn with_config<F: IntoServiceFactory<S>>(
pub(crate) fn with_config<F: IntoServiceFactory<S, Request>>(
cfg: ServiceConfig,
service: F,
) -> Self {
@ -59,19 +59,15 @@ where
impl<S, B, X, U> H1Service<TcpStream, S, B, X, U>
where
S: ServiceFactory<Config = (), Request = Request>,
S: ServiceFactory<Request, Config = ()>,
S::Error: Into<Error>,
S::InitError: fmt::Debug,
S::Response: Into<Response<B>>,
B: MessageBody,
X: ServiceFactory<Config = (), Request = Request, Response = Request>,
X: ServiceFactory<Request, Config = (), Response = Request>,
X::Error: Into<Error>,
X::InitError: fmt::Debug,
U: ServiceFactory<
Config = (),
Request = (Request, Framed<TcpStream, Codec>),
Response = (),
>,
U: ServiceFactory<(Request, Framed<TcpStream, Codec>), Config = (), Response = ()>,
U::Error: fmt::Display + Into<Error>,
U::InitError: fmt::Debug,
{
@ -79,8 +75,8 @@ where
pub fn tcp(
self,
) -> impl ServiceFactory<
TcpStream,
Config = (),
Request = TcpStream,
Response = (),
Error = DispatchError,
InitError = (),
@ -97,22 +93,23 @@ where
mod openssl {
use super::*;
use actix_tls::openssl::{Acceptor, SslAcceptor, SslStream};
use actix_tls::{openssl::HandshakeError, TlsError};
use actix_service::ServiceFactoryExt;
use actix_tls::accept::openssl::{Acceptor, SslAcceptor, SslError, SslStream};
use actix_tls::accept::TlsError;
impl<S, B, X, U> H1Service<SslStream<TcpStream>, S, B, X, U>
where
S: ServiceFactory<Config = (), Request = Request>,
S: ServiceFactory<Request, Config = ()>,
S::Error: Into<Error>,
S::InitError: fmt::Debug,
S::Response: Into<Response<B>>,
B: MessageBody,
X: ServiceFactory<Config = (), Request = Request, Response = Request>,
X: ServiceFactory<Request, Config = (), Response = Request>,
X::Error: Into<Error>,
X::InitError: fmt::Debug,
U: ServiceFactory<
(Request, Framed<SslStream<TcpStream>, Codec>),
Config = (),
Request = (Request, Framed<SslStream<TcpStream>, Codec>),
Response = (),
>,
U::Error: fmt::Display + Into<Error>,
@ -123,10 +120,10 @@ mod openssl {
self,
acceptor: SslAcceptor,
) -> impl ServiceFactory<
TcpStream,
Config = (),
Request = TcpStream,
Response = (),
Error = TlsError<HandshakeError<TcpStream>, DispatchError>,
Error = TlsError<SslError, DispatchError>,
InitError = (),
> {
pipeline_factory(
@ -146,23 +143,24 @@ mod openssl {
#[cfg(feature = "rustls")]
mod rustls {
use super::*;
use actix_tls::rustls::{Acceptor, ServerConfig, TlsStream};
use actix_tls::TlsError;
use actix_service::ServiceFactoryExt;
use actix_tls::accept::rustls::{Acceptor, ServerConfig, TlsStream};
use actix_tls::accept::TlsError;
use std::{fmt, io};
impl<S, B, X, U> H1Service<TlsStream<TcpStream>, S, B, X, U>
where
S: ServiceFactory<Config = (), Request = Request>,
S: ServiceFactory<Request, Config = ()>,
S::Error: Into<Error>,
S::InitError: fmt::Debug,
S::Response: Into<Response<B>>,
B: MessageBody,
X: ServiceFactory<Config = (), Request = Request, Response = Request>,
X: ServiceFactory<Request, Config = (), Response = Request>,
X::Error: Into<Error>,
X::InitError: fmt::Debug,
U: ServiceFactory<
(Request, Framed<TlsStream<TcpStream>, Codec>),
Config = (),
Request = (Request, Framed<TlsStream<TcpStream>, Codec>),
Response = (),
>,
U::Error: fmt::Display + Into<Error>,
@ -173,8 +171,8 @@ mod rustls {
self,
config: ServerConfig,
) -> impl ServiceFactory<
TcpStream,
Config = (),
Request = TcpStream,
Response = (),
Error = TlsError<io::Error, DispatchError>,
InitError = (),
@ -195,7 +193,7 @@ mod rustls {
impl<T, S, B, X, U> H1Service<T, S, B, X, U>
where
S: ServiceFactory<Config = (), Request = Request>,
S: ServiceFactory<Request, Config = ()>,
S::Error: Into<Error>,
S::Response: Into<Response<B>>,
S::InitError: fmt::Debug,
@ -203,7 +201,7 @@ where
{
pub fn expect<X1>(self, expect: X1) -> H1Service<T, S, B, X1, U>
where
X1: ServiceFactory<Request = Request, Response = Request>,
X1: ServiceFactory<Request, Response = Request>,
X1::Error: Into<Error>,
X1::InitError: fmt::Debug,
{
@ -219,7 +217,7 @@ where
pub fn upgrade<U1>(self, upgrade: Option<U1>) -> H1Service<T, S, B, X, U1>
where
U1: ServiceFactory<Request = (Request, Framed<T, Codec>), Response = ()>,
U1: ServiceFactory<(Request, Framed<T, Codec>), Response = ()>,
U1::Error: fmt::Display,
U1::InitError: fmt::Debug,
{
@ -240,27 +238,27 @@ where
}
}
impl<T, S, B, X, U> ServiceFactory for H1Service<T, S, B, X, U>
impl<T, S, B, X, U> ServiceFactory<(T, Option<net::SocketAddr>)>
for H1Service<T, S, B, X, U>
where
T: AsyncRead + AsyncWrite + Unpin,
S: ServiceFactory<Config = (), Request = Request>,
S: ServiceFactory<Request, Config = ()>,
S::Error: Into<Error>,
S::Response: Into<Response<B>>,
S::InitError: fmt::Debug,
B: MessageBody,
X: ServiceFactory<Config = (), Request = Request, Response = Request>,
X: ServiceFactory<Request, Config = (), Response = Request>,
X::Error: Into<Error>,
X::InitError: fmt::Debug,
U: ServiceFactory<Config = (), Request = (Request, Framed<T, Codec>), Response = ()>,
U: ServiceFactory<(Request, Framed<T, Codec>), Config = (), Response = ()>,
U::Error: fmt::Display + Into<Error>,
U::InitError: fmt::Debug,
{
type Config = ();
type Request = (T, Option<net::SocketAddr>);
type Response = ();
type Error = DispatchError;
type InitError = ();
type Config = ();
type Service = H1ServiceHandler<T, S::Service, B, X::Service, U::Service>;
type InitError = ();
type Future = H1ServiceResponse<T, S, B, X, U>;
fn new_service(&self, _: ()) -> Self::Future {
@ -281,13 +279,13 @@ where
#[pin_project::pin_project]
pub struct H1ServiceResponse<T, S, B, X, U>
where
S: ServiceFactory<Request = Request>,
S: ServiceFactory<Request>,
S::Error: Into<Error>,
S::InitError: fmt::Debug,
X: ServiceFactory<Request = Request, Response = Request>,
X: ServiceFactory<Request, Response = Request>,
X::Error: Into<Error>,
X::InitError: fmt::Debug,
U: ServiceFactory<Request = (Request, Framed<T, Codec>), Response = ()>,
U: ServiceFactory<(Request, Framed<T, Codec>), Response = ()>,
U::Error: fmt::Display,
U::InitError: fmt::Debug,
{
@ -307,15 +305,15 @@ where
impl<T, S, B, X, U> Future for H1ServiceResponse<T, S, B, X, U>
where
T: AsyncRead + AsyncWrite + Unpin,
S: ServiceFactory<Request = Request>,
S: ServiceFactory<Request>,
S::Error: Into<Error>,
S::Response: Into<Response<B>>,
S::InitError: fmt::Debug,
B: MessageBody,
X: ServiceFactory<Request = Request, Response = Request>,
X: ServiceFactory<Request, Response = Request>,
X::Error: Into<Error>,
X::InitError: fmt::Debug,
U: ServiceFactory<Request = (Request, Framed<T, Codec>), Response = ()>,
U: ServiceFactory<(Request, Framed<T, Codec>), Response = ()>,
U::Error: fmt::Display,
U::InitError: fmt::Debug,
{
@ -362,24 +360,29 @@ where
}
/// `Service` implementation for HTTP/1 transport
pub struct H1ServiceHandler<T, S: Service, B, X: Service, U: Service> {
pub struct H1ServiceHandler<T, S, B, X, U>
where
S: Service<Request>,
X: Service<Request>,
U: Service<(Request, Framed<T, Codec>)>,
{
srv: CloneableService<S>,
expect: CloneableService<X>,
upgrade: Option<CloneableService<U>>,
on_connect_ext: Option<Rc<ConnectCallback<T>>>,
cfg: ServiceConfig,
_t: PhantomData<(T, B)>,
_t: PhantomData<B>,
}
impl<T, S, B, X, U> H1ServiceHandler<T, S, B, X, U>
where
S: Service<Request = Request>,
S: Service<Request>,
S::Error: Into<Error>,
S::Response: Into<Response<B>>,
B: MessageBody,
X: Service<Request = Request, Response = Request>,
X: Service<Request, Response = Request>,
X::Error: Into<Error>,
U: Service<Request = (Request, Framed<T, Codec>), Response = ()>,
U: Service<(Request, Framed<T, Codec>), Response = ()>,
U::Error: fmt::Display,
{
fn new(
@ -400,19 +403,19 @@ where
}
}
impl<T, S, B, X, U> Service for H1ServiceHandler<T, S, B, X, U>
impl<T, S, B, X, U> Service<(T, Option<net::SocketAddr>)>
for H1ServiceHandler<T, S, B, X, U>
where
T: AsyncRead + AsyncWrite + Unpin,
S: Service<Request = Request>,
S: Service<Request>,
S::Error: Into<Error>,
S::Response: Into<Response<B>>,
B: MessageBody,
X: Service<Request = Request, Response = Request>,
X: Service<Request, Response = Request>,
X::Error: Into<Error>,
U: Service<Request = (Request, Framed<T, Codec>), Response = ()>,
U: Service<(Request, Framed<T, Codec>), Response = ()>,
U::Error: fmt::Display + Into<Error>,
{
type Request = (T, Option<net::SocketAddr>);
type Response = ();
type Error = DispatchError;
type Future = Dispatcher<T, S, B, X, U>;
@ -459,7 +462,7 @@ where
}
}
fn call(&mut self, (io, addr): Self::Request) -> Self::Future {
fn call(&mut self, (io, addr): (T, Option<net::SocketAddr>)) -> Self::Future {
let mut connect_extensions = Extensions::new();
if let Some(ref handler) = self.on_connect_ext {
// run on_connect_ext callback, populating connect extensions

View File

@ -1,4 +1,3 @@
use std::marker::PhantomData;
use std::task::{Context, Poll};
use actix_codec::Framed;
@ -9,14 +8,13 @@ use crate::error::Error;
use crate::h1::Codec;
use crate::request::Request;
pub struct UpgradeHandler<T>(pub(crate) PhantomData<T>);
pub struct UpgradeHandler;
impl<T> ServiceFactory for UpgradeHandler<T> {
type Config = ();
type Request = (Request, Framed<T, Codec>);
impl<T> ServiceFactory<(Request, Framed<T, Codec>)> for UpgradeHandler {
type Response = ();
type Error = Error;
type Service = UpgradeHandler<T>;
type Config = ();
type Service = UpgradeHandler;
type InitError = Error;
type Future = Ready<Result<Self::Service, Self::InitError>>;
@ -25,8 +23,7 @@ impl<T> ServiceFactory for UpgradeHandler<T> {
}
}
impl<T> Service for UpgradeHandler<T> {
type Request = (Request, Framed<T, Codec>);
impl<T> Service<(Request, Framed<T, Codec>)> for UpgradeHandler {
type Response = ();
type Error = Error;
type Future = Ready<Result<Self::Response, Self::Error>>;
@ -35,7 +32,7 @@ impl<T> Service for UpgradeHandler<T> {
Poll::Ready(Ok(()))
}
fn call(&mut self, _: Self::Request) -> Self::Future {
fn call(&mut self, _: (Request, Framed<T, Codec>)) -> Self::Future {
ready(Ok(()))
}
}