1
0
mirror of https://github.com/fafhrd91/actix-web synced 2025-06-26 06:57:43 +02:00

update to latest actix-net

This commit is contained in:
Nikolay Kim
2019-12-02 17:33:11 +06:00
parent 33574403b5
commit f4c01384ec
33 changed files with 941 additions and 898 deletions

View File

@ -8,7 +8,6 @@ use std::{fmt, mem, net};
use actix_codec::{AsyncRead, AsyncWrite};
use actix_rt::time::Delay;
use actix_server_config::IoStream;
use actix_service::Service;
use bitflags::bitflags;
use bytes::{Bytes, BytesMut};
@ -36,7 +35,10 @@ const CHUNK_SIZE: usize = 16_384;
/// Dispatcher for HTTP/2 protocol
#[pin_project::pin_project]
pub struct Dispatcher<T: IoStream, S: Service<Request = Request>, B: MessageBody> {
pub struct Dispatcher<T, S: Service<Request = Request>, B: MessageBody>
where
T: AsyncRead + AsyncWrite + Unpin,
{
service: CloneableService<S>,
connection: Connection<T, Bytes>,
on_connect: Option<Box<dyn DataFactory>>,
@ -49,7 +51,7 @@ pub struct Dispatcher<T: IoStream, S: Service<Request = Request>, B: MessageBody
impl<T, S, B> Dispatcher<T, S, B>
where
T: IoStream,
T: AsyncRead + AsyncWrite + Unpin,
S: Service<Request = Request>,
S::Error: Into<Error>,
// S::Future: 'static,
@ -95,7 +97,7 @@ where
impl<T, S, B> Future for Dispatcher<T, S, B>
where
T: IoStream,
T: AsyncRead + AsyncWrite + Unpin,
S: Service<Request = Request>,
S::Error: Into<Error> + 'static,
S::Future: 'static,

View File

@ -6,8 +6,11 @@ use std::task::{Context, Poll};
use std::{io, net, rc};
use actix_codec::{AsyncRead, AsyncWrite, Framed};
use actix_server_config::{Io, IoStream, ServerConfig as SrvConfig};
use actix_service::{IntoServiceFactory, Service, ServiceFactory};
use actix_rt::net::TcpStream;
use actix_service::{
factory_fn, pipeline_factory, service_fn2, IntoServiceFactory, Service,
ServiceFactory,
};
use bytes::Bytes;
use futures::future::{ok, Ready};
use futures::{ready, Stream};
@ -23,39 +26,28 @@ use crate::helpers::DataFactory;
use crate::payload::Payload;
use crate::request::Request;
use crate::response::Response;
use crate::Protocol;
use super::dispatcher::Dispatcher;
/// `ServiceFactory` implementation for HTTP2 transport
pub struct H2Service<T, P, S, B> {
pub struct H2Service<T, S, B> {
srv: S,
cfg: ServiceConfig,
on_connect: Option<rc::Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>,
_t: PhantomData<(T, P, B)>,
_t: PhantomData<(T, B)>,
}
impl<T, P, S, B> H2Service<T, P, S, B>
impl<T, S, B> H2Service<T, S, B>
where
S: ServiceFactory<Config = SrvConfig, Request = Request>,
S: ServiceFactory<Config = (), Request = Request>,
S::Error: Into<Error> + 'static,
S::Response: Into<Response<B>> + 'static,
<S::Service as Service>::Future: 'static,
B: MessageBody + 'static,
{
/// Create new `HttpService` instance.
pub fn new<F: IntoServiceFactory<S>>(service: F) -> Self {
let cfg = ServiceConfig::new(KeepAlive::Timeout(5), 5000, 0);
H2Service {
cfg,
on_connect: None,
srv: service.into_factory(),
_t: PhantomData,
}
}
/// Create new `HttpService` instance with config.
pub fn with_config<F: IntoServiceFactory<S>>(
pub(crate) fn with_config<F: IntoServiceFactory<S>>(
cfg: ServiceConfig,
service: F,
) -> Self {
@ -77,24 +69,98 @@ where
}
}
impl<T, P, S, B> ServiceFactory for H2Service<T, P, S, B>
impl<S, B> H2Service<TcpStream, S, B>
where
T: IoStream,
S: ServiceFactory<Config = SrvConfig, Request = Request>,
S: ServiceFactory<Config = (), Request = Request>,
S::Error: Into<Error> + 'static,
S::Response: Into<Response<B>> + 'static,
<S::Service as Service>::Future: 'static,
B: MessageBody + 'static,
{
type Config = SrvConfig;
type Request = Io<T, P>;
/// Create simple tcp based service
pub fn tcp(
self,
) -> impl ServiceFactory<
Config = (),
Request = TcpStream,
Response = (),
Error = DispatchError,
InitError = S::InitError,
> {
pipeline_factory(factory_fn(|| {
async {
Ok::<_, S::InitError>(service_fn2(|io: TcpStream| {
let peer_addr = io.peer_addr().ok();
ok::<_, DispatchError>((io, peer_addr))
}))
}
}))
.and_then(self)
}
}
#[cfg(feature = "openssl")]
mod openssl {
use actix_service::{factory_fn, service_fn2};
use actix_tls::openssl::{Acceptor, SslStream};
use actix_tls::{openssl::HandshakeError, SslError};
use open_ssl::ssl::SslAcceptor;
use super::*;
impl<S, B> H2Service<SslStream<TcpStream>, S, B>
where
S: ServiceFactory<Config = (), Request = Request>,
S::Error: Into<Error> + 'static,
S::Response: Into<Response<B>> + 'static,
<S::Service as Service>::Future: 'static,
B: MessageBody + 'static,
{
/// Create ssl based service
pub fn openssl(
self,
acceptor: SslAcceptor,
) -> impl ServiceFactory<
Config = (),
Request = TcpStream,
Response = (),
Error = SslError<HandshakeError<TcpStream>, DispatchError>,
InitError = S::InitError,
> {
pipeline_factory(
Acceptor::new(acceptor)
.map_err(SslError::Ssl)
.map_init_err(|_| panic!()),
)
.and_then(factory_fn(|| {
ok::<_, S::InitError>(service_fn2(|io: SslStream<TcpStream>| {
let peer_addr = io.get_ref().peer_addr().ok();
ok((io, peer_addr))
}))
}))
.and_then(self.map_err(SslError::Service))
}
}
}
impl<T, S, B> ServiceFactory for H2Service<T, S, B>
where
T: AsyncRead + AsyncWrite + Unpin,
S: ServiceFactory<Config = (), Request = Request>,
S::Error: Into<Error> + 'static,
S::Response: Into<Response<B>> + 'static,
<S::Service as Service>::Future: 'static,
B: MessageBody + 'static,
{
type Config = ();
type Request = (T, Option<net::SocketAddr>);
type Response = ();
type Error = DispatchError;
type InitError = S::InitError;
type Service = H2ServiceHandler<T, P, S::Service, B>;
type Future = H2ServiceResponse<T, P, S, B>;
type Service = H2ServiceHandler<T, S::Service, B>;
type Future = H2ServiceResponse<T, S, B>;
fn new_service(&self, cfg: &SrvConfig) -> Self::Future {
fn new_service(&self, cfg: &()) -> Self::Future {
H2ServiceResponse {
fut: self.srv.new_service(cfg),
cfg: Some(self.cfg.clone()),
@ -106,24 +172,24 @@ where
#[doc(hidden)]
#[pin_project::pin_project]
pub struct H2ServiceResponse<T, P, S: ServiceFactory, B> {
pub struct H2ServiceResponse<T, S: ServiceFactory, B> {
#[pin]
fut: S::Future,
cfg: Option<ServiceConfig>,
on_connect: Option<rc::Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>,
_t: PhantomData<(T, P, B)>,
_t: PhantomData<(T, B)>,
}
impl<T, P, S, B> Future for H2ServiceResponse<T, P, S, B>
impl<T, S, B> Future for H2ServiceResponse<T, S, B>
where
T: IoStream,
S: ServiceFactory<Config = SrvConfig, Request = Request>,
T: AsyncRead + AsyncWrite + Unpin,
S: ServiceFactory<Config = (), Request = Request>,
S::Error: Into<Error> + 'static,
S::Response: Into<Response<B>> + 'static,
<S::Service as Service>::Future: 'static,
B: MessageBody + 'static,
{
type Output = Result<H2ServiceHandler<T, P, S::Service, B>, S::InitError>;
type Output = Result<H2ServiceHandler<T, S::Service, B>, S::InitError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = self.as_mut().project();
@ -140,14 +206,14 @@ where
}
/// `Service` implementation for http/2 transport
pub struct H2ServiceHandler<T, P, S, B> {
pub struct H2ServiceHandler<T, S, B> {
srv: CloneableService<S>,
cfg: ServiceConfig,
on_connect: Option<rc::Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>,
_t: PhantomData<(T, P, B)>,
_t: PhantomData<(T, B)>,
}
impl<T, P, S, B> H2ServiceHandler<T, P, S, B>
impl<T, S, B> H2ServiceHandler<T, S, B>
where
S: Service<Request = Request>,
S::Error: Into<Error> + 'static,
@ -159,7 +225,7 @@ where
cfg: ServiceConfig,
on_connect: Option<rc::Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>,
srv: S,
) -> H2ServiceHandler<T, P, S, B> {
) -> H2ServiceHandler<T, S, B> {
H2ServiceHandler {
cfg,
on_connect,
@ -169,16 +235,16 @@ where
}
}
impl<T, P, S, B> Service for H2ServiceHandler<T, P, S, B>
impl<T, S, B> Service for H2ServiceHandler<T, S, B>
where
T: IoStream,
T: AsyncRead + AsyncWrite + Unpin,
S: Service<Request = Request>,
S::Error: Into<Error> + 'static,
S::Future: 'static,
S::Response: Into<Response<B>> + 'static,
B: MessageBody + 'static,
{
type Request = Io<T, P>;
type Request = (T, Option<net::SocketAddr>);
type Response = ();
type Error = DispatchError;
type Future = H2ServiceHandlerResponse<T, S, B>;
@ -191,9 +257,7 @@ where
})
}
fn call(&mut self, req: Self::Request) -> Self::Future {
let io = req.into_parts().0;
let peer_addr = io.peer_addr();
fn call(&mut self, (io, addr): Self::Request) -> Self::Future {
let on_connect = if let Some(ref on_connect) = self.on_connect {
Some(on_connect(&io))
} else {
@ -204,7 +268,7 @@ where
state: State::Handshake(
Some(self.srv.clone()),
Some(self.cfg.clone()),
peer_addr,
addr,
on_connect,
server::handshake(io),
),
@ -212,8 +276,9 @@ where
}
}
enum State<T: IoStream, S: Service<Request = Request>, B: MessageBody>
enum State<T, S: Service<Request = Request>, B: MessageBody>
where
T: AsyncRead + AsyncWrite + Unpin,
S::Future: 'static,
{
Incoming(Dispatcher<T, S, B>),
@ -228,7 +293,7 @@ where
pub struct H2ServiceHandlerResponse<T, S, B>
where
T: IoStream,
T: AsyncRead + AsyncWrite + Unpin,
S: Service<Request = Request>,
S::Error: Into<Error> + 'static,
S::Future: 'static,
@ -240,7 +305,7 @@ where
impl<T, S, B> Future for H2ServiceHandlerResponse<T, S, B>
where
T: IoStream,
T: AsyncRead + AsyncWrite + Unpin,
S: Service<Request = Request>,
S::Error: Into<Error> + 'static,
S::Future: 'static,