mirror of
https://github.com/fafhrd91/actix-net
synced 2024-11-23 22:51:07 +01:00
remove and_then_send
This commit is contained in:
parent
3c6f586b89
commit
5097b12b7c
@ -19,7 +19,7 @@ use std::{
|
|||||||
|
|
||||||
use actix_rt::net::TcpStream;
|
use actix_rt::net::TcpStream;
|
||||||
use actix_server::Server;
|
use actix_server::Server;
|
||||||
use actix_service::{fn_service, ServiceFactoryExt as _};
|
use actix_service::{fn_factory, fn_service, ServiceExt as _};
|
||||||
use bytes::BytesMut;
|
use bytes::BytesMut;
|
||||||
use log::{error, info};
|
use log::{error, info};
|
||||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||||
@ -39,53 +39,62 @@ async fn main() -> io::Result<()> {
|
|||||||
// to return a service *factory*; so it can be created once per worker.
|
// to return a service *factory*; so it can be created once per worker.
|
||||||
Server::build()
|
Server::build()
|
||||||
.bind("echo", addr, {
|
.bind("echo", addr, {
|
||||||
let count = Arc::clone(&count);
|
fn_factory::<_, (), _, _, _, _>(move || {
|
||||||
let num2 = Arc::clone(&count);
|
|
||||||
|
|
||||||
let svc = fn_service(move |mut stream: TcpStream| {
|
|
||||||
let count = Arc::clone(&count);
|
let count = Arc::clone(&count);
|
||||||
|
|
||||||
async move {
|
async move {
|
||||||
let num = count.fetch_add(1, Ordering::SeqCst);
|
let count = Arc::clone(&count);
|
||||||
let num = num + 1;
|
let count2 = Arc::clone(&count);
|
||||||
|
|
||||||
let mut size = 0;
|
let svc = fn_service(move |mut stream: TcpStream| {
|
||||||
let mut buf = BytesMut::new();
|
let count = Arc::clone(&count);
|
||||||
|
|
||||||
loop {
|
let num = count.fetch_add(1, Ordering::SeqCst) + 1;
|
||||||
match stream.read_buf(&mut buf).await {
|
|
||||||
// end of stream; bail from loop
|
|
||||||
Ok(0) => break,
|
|
||||||
|
|
||||||
// more bytes to process
|
info!(
|
||||||
Ok(bytes_read) => {
|
"[{}] accepting connection from: {}",
|
||||||
info!("[{}] read {} bytes", num, bytes_read);
|
num,
|
||||||
stream.write_all(&buf[size..]).await.unwrap();
|
stream.peer_addr().unwrap()
|
||||||
size += bytes_read;
|
);
|
||||||
|
|
||||||
|
async move {
|
||||||
|
let mut size = 0;
|
||||||
|
let mut buf = BytesMut::new();
|
||||||
|
|
||||||
|
loop {
|
||||||
|
match stream.read_buf(&mut buf).await {
|
||||||
|
// end of stream; bail from loop
|
||||||
|
Ok(0) => break,
|
||||||
|
|
||||||
|
// more bytes to process
|
||||||
|
Ok(bytes_read) => {
|
||||||
|
info!("[{}] read {} bytes", num, bytes_read);
|
||||||
|
stream.write_all(&buf[size..]).await.unwrap();
|
||||||
|
size += bytes_read;
|
||||||
|
}
|
||||||
|
|
||||||
|
// stream error; bail from loop with error
|
||||||
|
Err(err) => {
|
||||||
|
error!("Stream Error: {:?}", err);
|
||||||
|
return Err(());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// stream error; bail from loop with error
|
// send data down service pipeline
|
||||||
Err(err) => {
|
Ok((buf.freeze(), size))
|
||||||
error!("Stream Error: {:?}", err);
|
|
||||||
return Err(());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
})
|
||||||
|
.map_err(|err| error!("Service Error: {:?}", err))
|
||||||
|
.and_then(move |(_, size)| {
|
||||||
|
let num = count2.load(Ordering::SeqCst);
|
||||||
|
info!("[{}] total bytes read: {}", num, size);
|
||||||
|
async move { Ok(size) }
|
||||||
|
});
|
||||||
|
|
||||||
// send data down service pipeline
|
Ok::<_, ()>(svc.clone())
|
||||||
Ok((buf.freeze(), size))
|
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.map_err(|err| error!("Service Error: {:?}", err))
|
|
||||||
.and_then_send(move |(_, size)| {
|
|
||||||
let num = num2.load(Ordering::SeqCst);
|
|
||||||
info!("[{}] total bytes read: {}", num, size);
|
|
||||||
async move { Ok(size) }
|
|
||||||
});
|
|
||||||
|
|
||||||
let svc2 = svc.clone();
|
|
||||||
|
|
||||||
svc2
|
|
||||||
})?
|
})?
|
||||||
.workers(2)
|
.workers(2)
|
||||||
.run()
|
.run()
|
||||||
|
@ -1,7 +1,9 @@
|
|||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
## Unreleased - 2021-xx-xx
|
## Unreleased - 2021-xx-xx
|
||||||
* Add `.and_then_send()` & `AndThenSendServiceFactory` for creating `Send`able chained services. [#403]
|
* `fn_factory[_with_config]` types now impl `Send` even when config, service, request types do not. [#403]
|
||||||
|
|
||||||
|
[#403]: https://github.com/actix/actix-net/pull/403
|
||||||
|
|
||||||
|
|
||||||
## 2.0.1 - 2021-10-11
|
## 2.0.1 - 2021-10-11
|
||||||
|
@ -261,91 +261,6 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// `.and_then_send()` service factory combinator
|
|
||||||
pub struct AndThenSendServiceFactory<A, B, Req>
|
|
||||||
where
|
|
||||||
A: ServiceFactory<Req>,
|
|
||||||
A::Config: Clone,
|
|
||||||
B: ServiceFactory<
|
|
||||||
A::Response,
|
|
||||||
Config = A::Config,
|
|
||||||
Error = A::Error,
|
|
||||||
InitError = A::InitError,
|
|
||||||
>,
|
|
||||||
{
|
|
||||||
inner_a: A,
|
|
||||||
inner_b: B,
|
|
||||||
_phantom: PhantomData<Req>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<A, B, Req> AndThenSendServiceFactory<A, B, Req>
|
|
||||||
where
|
|
||||||
A: ServiceFactory<Req>,
|
|
||||||
A::Config: Clone,
|
|
||||||
B: ServiceFactory<
|
|
||||||
A::Response,
|
|
||||||
Config = A::Config,
|
|
||||||
Error = A::Error,
|
|
||||||
InitError = A::InitError,
|
|
||||||
>,
|
|
||||||
{
|
|
||||||
/// Create new `AndThenFactory` combinator
|
|
||||||
pub(crate) fn new(a: A, b: B) -> Self {
|
|
||||||
Self {
|
|
||||||
inner_a: a,
|
|
||||||
inner_b: b,
|
|
||||||
_phantom: PhantomData,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<A, B, Req> ServiceFactory<Req> for AndThenSendServiceFactory<A, B, Req>
|
|
||||||
where
|
|
||||||
A: ServiceFactory<Req>,
|
|
||||||
A::Config: Clone,
|
|
||||||
B: ServiceFactory<
|
|
||||||
A::Response,
|
|
||||||
Config = A::Config,
|
|
||||||
Error = A::Error,
|
|
||||||
InitError = A::InitError,
|
|
||||||
>,
|
|
||||||
{
|
|
||||||
type Response = B::Response;
|
|
||||||
type Error = A::Error;
|
|
||||||
|
|
||||||
type Config = A::Config;
|
|
||||||
type Service = AndThenService<A::Service, B::Service, Req>;
|
|
||||||
type InitError = A::InitError;
|
|
||||||
type Future = AndThenServiceFactoryResponse<A, B, Req>;
|
|
||||||
|
|
||||||
fn new_service(&self, cfg: A::Config) -> Self::Future {
|
|
||||||
AndThenServiceFactoryResponse::new(
|
|
||||||
self.inner_a.new_service(cfg.clone()),
|
|
||||||
self.inner_b.new_service(cfg),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<A, B, Req> Clone for AndThenSendServiceFactory<A, B, Req>
|
|
||||||
where
|
|
||||||
A: ServiceFactory<Req> + Clone,
|
|
||||||
A::Config: Clone,
|
|
||||||
B: ServiceFactory<
|
|
||||||
A::Response,
|
|
||||||
Config = A::Config,
|
|
||||||
Error = A::Error,
|
|
||||||
InitError = A::InitError,
|
|
||||||
> + Clone,
|
|
||||||
{
|
|
||||||
fn clone(&self) -> Self {
|
|
||||||
Self {
|
|
||||||
inner_a: self.inner_a.clone(),
|
|
||||||
inner_b: self.inner_b.clone(),
|
|
||||||
_phantom: PhantomData,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use alloc::rc::Rc;
|
use alloc::rc::Rc;
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
use crate::{
|
use crate::{
|
||||||
and_then::{AndThenSendServiceFactory, AndThenService, AndThenServiceFactory},
|
and_then::{AndThenService, AndThenServiceFactory},
|
||||||
map::Map,
|
map::Map,
|
||||||
map_err::MapErr,
|
map_err::MapErr,
|
||||||
transform_err::TransformMapInitErr,
|
transform_err::TransformMapInitErr,
|
||||||
@ -105,22 +105,6 @@ pub trait ServiceFactoryExt<Req>: ServiceFactory<Req> {
|
|||||||
{
|
{
|
||||||
AndThenServiceFactory::new(self, factory.into_factory())
|
AndThenServiceFactory::new(self, factory.into_factory())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Call another service after call to this one has resolved successfully.
|
|
||||||
fn and_then_send<I, SF1>(self, factory: I) -> AndThenSendServiceFactory<Self, SF1, Req>
|
|
||||||
where
|
|
||||||
Self: Sized,
|
|
||||||
Self::Config: Clone,
|
|
||||||
I: IntoServiceFactory<SF1, Self::Response>,
|
|
||||||
SF1: ServiceFactory<
|
|
||||||
Self::Response,
|
|
||||||
Config = Self::Config,
|
|
||||||
Error = Self::Error,
|
|
||||||
InitError = Self::InitError,
|
|
||||||
>,
|
|
||||||
{
|
|
||||||
AndThenSendServiceFactory::new(self, factory.into_factory())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<SF, Req> ServiceFactoryExt<Req> for SF where SF: ServiceFactory<Req> {}
|
impl<SF, Req> ServiceFactoryExt<Req> for SF where SF: ServiceFactory<Req> {}
|
||||||
|
@ -3,6 +3,7 @@ use core::{future::Future, marker::PhantomData};
|
|||||||
use crate::{ok, IntoService, IntoServiceFactory, Ready, Service, ServiceFactory};
|
use crate::{ok, IntoService, IntoServiceFactory, Ready, Service, ServiceFactory};
|
||||||
|
|
||||||
/// Create `ServiceFactory` for function that can act as a `Service`
|
/// Create `ServiceFactory` for function that can act as a `Service`
|
||||||
|
// TODO: remove unnecessary Cfg type param
|
||||||
pub fn fn_service<F, Fut, Req, Res, Err, Cfg>(
|
pub fn fn_service<F, Fut, Req, Res, Err, Cfg>(
|
||||||
f: F,
|
f: F,
|
||||||
) -> FnServiceFactory<F, Fut, Req, Res, Err, Cfg>
|
) -> FnServiceFactory<F, Fut, Req, Res, Err, Cfg>
|
||||||
@ -48,6 +49,7 @@ where
|
|||||||
/// Ok(())
|
/// Ok(())
|
||||||
/// }
|
/// }
|
||||||
/// ```
|
/// ```
|
||||||
|
// TODO: remove unnecessary Cfg type param
|
||||||
pub fn fn_factory<F, Cfg, Srv, Req, Fut, Err>(
|
pub fn fn_factory<F, Cfg, Srv, Req, Fut, Err>(
|
||||||
f: F,
|
f: F,
|
||||||
) -> FnServiceNoConfig<F, Cfg, Srv, Req, Fut, Err>
|
) -> FnServiceNoConfig<F, Cfg, Srv, Req, Fut, Err>
|
||||||
@ -160,7 +162,7 @@ where
|
|||||||
Fut: Future<Output = Result<Res, Err>>,
|
Fut: Future<Output = Result<Res, Err>>,
|
||||||
{
|
{
|
||||||
f: F,
|
f: F,
|
||||||
_t: PhantomData<(Req, Cfg)>,
|
_t: PhantomData<fn(Cfg, Req)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<F, Fut, Req, Res, Err, Cfg> FnServiceFactory<F, Fut, Req, Res, Err, Cfg>
|
impl<F, Fut, Req, Res, Err, Cfg> FnServiceFactory<F, Fut, Req, Res, Err, Cfg>
|
||||||
@ -237,7 +239,7 @@ where
|
|||||||
Srv: Service<Req>,
|
Srv: Service<Req>,
|
||||||
{
|
{
|
||||||
f: F,
|
f: F,
|
||||||
_t: PhantomData<(Fut, Cfg, Req, Srv, Err)>,
|
_t: PhantomData<fn(Cfg, Req)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<F, Fut, Cfg, Srv, Req, Err> FnServiceConfig<F, Fut, Cfg, Srv, Req, Err>
|
impl<F, Fut, Cfg, Srv, Req, Err> FnServiceConfig<F, Fut, Cfg, Srv, Req, Err>
|
||||||
@ -293,7 +295,7 @@ where
|
|||||||
Fut: Future<Output = Result<Srv, Err>>,
|
Fut: Future<Output = Result<Srv, Err>>,
|
||||||
{
|
{
|
||||||
f: F,
|
f: F,
|
||||||
_t: PhantomData<(Cfg, Req)>,
|
_t: PhantomData<fn(Cfg, Req)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<F, Cfg, Srv, Req, Fut, Err> FnServiceNoConfig<F, Cfg, Srv, Req, Fut, Err>
|
impl<F, Cfg, Srv, Req, Fut, Err> FnServiceNoConfig<F, Cfg, Srv, Req, Fut, Err>
|
||||||
@ -353,10 +355,11 @@ where
|
|||||||
mod tests {
|
mod tests {
|
||||||
use core::task::Poll;
|
use core::task::Poll;
|
||||||
|
|
||||||
|
use alloc::rc::Rc;
|
||||||
use futures_util::future::lazy;
|
use futures_util::future::lazy;
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::{ok, Service, ServiceFactory};
|
use crate::{boxed, ok, Service, ServiceExt, ServiceFactory, ServiceFactoryExt};
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_fn_service() {
|
async fn test_fn_service() {
|
||||||
@ -391,4 +394,142 @@ mod tests {
|
|||||||
assert!(res.is_ok());
|
assert!(res.is_ok());
|
||||||
assert_eq!(res.unwrap(), ("srv", 1));
|
assert_eq!(res.unwrap(), ("srv", 1));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// these three properties of a service factory are usually important
|
||||||
|
fn is_static<T: 'static>(_t: &T) {}
|
||||||
|
fn impls_clone<T: Clone>(_t: &T) {}
|
||||||
|
fn impls_send<T: Send>(_t: &T) {}
|
||||||
|
|
||||||
|
#[actix_rt::test]
|
||||||
|
async fn test_fn_factory_impl_send() {
|
||||||
|
let svc_fac = fn_factory_with_config(|cfg: usize| {
|
||||||
|
ok::<_, ()>(fn_service(move |()| ok::<_, ()>(("srv", cfg))))
|
||||||
|
});
|
||||||
|
is_static(&svc_fac);
|
||||||
|
impls_clone(&svc_fac);
|
||||||
|
impls_send(&svc_fac);
|
||||||
|
|
||||||
|
// Cfg type is explicitly !Send
|
||||||
|
let svc_fac = fn_factory_with_config(|cfg: Rc<usize>| {
|
||||||
|
let cfg = Rc::clone(&cfg);
|
||||||
|
ok::<_, ()>(fn_service(move |_: ()| ok::<_, ()>(("srv", *cfg))))
|
||||||
|
});
|
||||||
|
is_static(&svc_fac);
|
||||||
|
impls_clone(&svc_fac);
|
||||||
|
impls_send(&svc_fac);
|
||||||
|
|
||||||
|
let svc_fac = fn_factory::<_, (), _, _, _, _>(|| {
|
||||||
|
ok::<_, ()>(fn_service(move |()| ok::<_, ()>("srv")))
|
||||||
|
});
|
||||||
|
is_static(&svc_fac);
|
||||||
|
impls_clone(&svc_fac);
|
||||||
|
impls_send(&svc_fac);
|
||||||
|
|
||||||
|
// Req type is explicitly !Send
|
||||||
|
let svc_fac = fn_factory::<_, (), _, _, _, _>(|| {
|
||||||
|
ok::<_, ()>(fn_service(move |_: Rc<()>| ok::<_, ()>("srv")))
|
||||||
|
});
|
||||||
|
is_static(&svc_fac);
|
||||||
|
impls_clone(&svc_fac);
|
||||||
|
impls_send(&svc_fac);
|
||||||
|
|
||||||
|
// Service type is explicitly !Send
|
||||||
|
let svc_fac = fn_factory::<_, (), _, _, _, _>(|| {
|
||||||
|
ok::<_, ()>(boxed::rc_service(fn_service(move |_: ()| {
|
||||||
|
ok::<_, ()>("srv")
|
||||||
|
})))
|
||||||
|
});
|
||||||
|
is_static(&svc_fac);
|
||||||
|
impls_clone(&svc_fac);
|
||||||
|
impls_send(&svc_fac);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[actix_rt::test]
|
||||||
|
async fn test_service_combinators_impls() {
|
||||||
|
#[derive(Clone)]
|
||||||
|
struct Ident;
|
||||||
|
|
||||||
|
impl<T: 'static> Service<T> for Ident {
|
||||||
|
type Response = T;
|
||||||
|
type Error = ();
|
||||||
|
type Future = Ready<Result<Self::Response, Self::Error>>;
|
||||||
|
|
||||||
|
crate::always_ready!();
|
||||||
|
|
||||||
|
fn call(&self, req: T) -> Self::Future {
|
||||||
|
ok(req)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let svc = Ident;
|
||||||
|
is_static(&svc);
|
||||||
|
impls_clone(&svc);
|
||||||
|
impls_send(&svc);
|
||||||
|
|
||||||
|
let svc = ServiceExt::map(Ident, core::convert::identity);
|
||||||
|
impls_send(&svc);
|
||||||
|
svc.call(()).await.unwrap();
|
||||||
|
|
||||||
|
let svc = ServiceExt::map_err(Ident, core::convert::identity);
|
||||||
|
impls_send(&svc);
|
||||||
|
svc.call(()).await.unwrap();
|
||||||
|
|
||||||
|
let svc = ServiceExt::and_then(Ident, Ident);
|
||||||
|
// impls_send(&svc); // fails to compile :(
|
||||||
|
svc.call(()).await.unwrap();
|
||||||
|
|
||||||
|
// let svc = ServiceExt::and_then_send(Ident, Ident);
|
||||||
|
// impls_send(&svc);
|
||||||
|
// svc.call(()).await.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[actix_rt::test]
|
||||||
|
async fn test_factory_combinators_impls() {
|
||||||
|
#[derive(Clone)]
|
||||||
|
struct Ident;
|
||||||
|
|
||||||
|
impl<T: 'static> ServiceFactory<T> for Ident {
|
||||||
|
type Response = T;
|
||||||
|
type Error = ();
|
||||||
|
type Config = ();
|
||||||
|
// explicitly !Send result service
|
||||||
|
type Service = boxed::RcService<T, Self::Response, Self::Error>;
|
||||||
|
type InitError = ();
|
||||||
|
type Future = Ready<Result<Self::Service, Self::Error>>;
|
||||||
|
|
||||||
|
fn new_service(&self, _cfg: Self::Config) -> Self::Future {
|
||||||
|
ok(boxed::rc_service(fn_service(ok)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let svc_fac = Ident;
|
||||||
|
is_static(&svc_fac);
|
||||||
|
impls_clone(&svc_fac);
|
||||||
|
impls_send(&svc_fac);
|
||||||
|
|
||||||
|
let svc_fac = ServiceFactoryExt::map(Ident, core::convert::identity);
|
||||||
|
impls_send(&svc_fac);
|
||||||
|
let svc = svc_fac.new_service(()).await.unwrap();
|
||||||
|
svc.call(()).await.unwrap();
|
||||||
|
|
||||||
|
let svc_fac = ServiceFactoryExt::map_err(Ident, core::convert::identity);
|
||||||
|
impls_send(&svc_fac);
|
||||||
|
let svc = svc_fac.new_service(()).await.unwrap();
|
||||||
|
svc.call(()).await.unwrap();
|
||||||
|
|
||||||
|
let svc_fac = ServiceFactoryExt::map_init_err(Ident, core::convert::identity);
|
||||||
|
impls_send(&svc_fac);
|
||||||
|
let svc = svc_fac.new_service(()).await.unwrap();
|
||||||
|
svc.call(()).await.unwrap();
|
||||||
|
|
||||||
|
let svc_fac = ServiceFactoryExt::and_then(Ident, Ident);
|
||||||
|
// impls_send(&svc_fac); // fails to compile :(
|
||||||
|
let svc = svc_fac.new_service(()).await.unwrap();
|
||||||
|
svc.call(()).await.unwrap();
|
||||||
|
|
||||||
|
// let svc_fac = ServiceFactoryExt::and_then_send(Ident, Ident);
|
||||||
|
// impls_send(&svc_fac);
|
||||||
|
// let svc = svc_fac.new_service(()).await.unwrap();
|
||||||
|
// svc.call(()).await.unwrap();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -31,21 +31,24 @@ use std::{
|
|||||||
|
|
||||||
use actix_rt::net::TcpStream;
|
use actix_rt::net::TcpStream;
|
||||||
use actix_server::Server;
|
use actix_server::Server;
|
||||||
use actix_service::ServiceFactoryExt as _;
|
use actix_service::{fn_factory, fn_service, ServiceExt as _, ServiceFactory};
|
||||||
use actix_tls::accept::rustls::{Acceptor as RustlsAcceptor, TlsStream};
|
use actix_tls::accept::rustls::{Acceptor as RustlsAcceptor, TlsStream};
|
||||||
use futures_util::future::ok;
|
use futures_util::future::ok;
|
||||||
use log::info;
|
use log::info;
|
||||||
use rustls::{server::ServerConfig, Certificate, PrivateKey};
|
use rustls::{server::ServerConfig, Certificate, PrivateKey};
|
||||||
use rustls_pemfile::{certs, rsa_private_keys};
|
use rustls_pemfile::{certs, rsa_private_keys};
|
||||||
|
|
||||||
|
const CERT_PATH: &str = concat![env!("CARGO_MANIFEST_DIR"), "/examples/cert.pem"];
|
||||||
|
const KEY_PATH: &str = concat![env!("CARGO_MANIFEST_DIR"), "/examples/key.pem"];
|
||||||
|
|
||||||
#[actix_rt::main]
|
#[actix_rt::main]
|
||||||
async fn main() -> io::Result<()> {
|
async fn main() -> io::Result<()> {
|
||||||
env::set_var("RUST_LOG", "info");
|
env::set_var("RUST_LOG", "info");
|
||||||
env_logger::init();
|
env_logger::init();
|
||||||
|
|
||||||
// Load TLS key and cert files
|
// Load TLS key and cert files
|
||||||
let cert_file = &mut BufReader::new(File::open("./examples/cert.pem").unwrap());
|
let cert_file = &mut BufReader::new(File::open(CERT_PATH).unwrap());
|
||||||
let key_file = &mut BufReader::new(File::open("./examples/key.pem").unwrap());
|
let key_file = &mut BufReader::new(File::open(KEY_PATH).unwrap());
|
||||||
|
|
||||||
let cert_chain = certs(cert_file)
|
let cert_chain = certs(cert_file)
|
||||||
.unwrap()
|
.unwrap()
|
||||||
@ -72,14 +75,30 @@ async fn main() -> io::Result<()> {
|
|||||||
let count = Arc::clone(&count);
|
let count = Arc::clone(&count);
|
||||||
|
|
||||||
// Set up TLS service factory
|
// Set up TLS service factory
|
||||||
tls_acceptor
|
// note: moving rustls acceptor into fn_factory scope
|
||||||
.clone()
|
fn_factory(move || {
|
||||||
.map_err(|err| println!("Rustls error: {:?}", err))
|
// manually call new_service so that and_then can be used from ServiceExt
|
||||||
.and_then_send(move |stream: TlsStream<TcpStream>| {
|
// type annotation for inner stream type is required
|
||||||
let num = count.fetch_add(1, Ordering::Relaxed);
|
let svc = <RustlsAcceptor as ServiceFactory<TcpStream>>::new_service(
|
||||||
info!("[{}] Got TLS connection: {:?}", num, &*stream);
|
&tls_acceptor,
|
||||||
ok(())
|
(),
|
||||||
})
|
);
|
||||||
|
|
||||||
|
let count = Arc::clone(&count);
|
||||||
|
|
||||||
|
async move {
|
||||||
|
let svc = svc
|
||||||
|
.await?
|
||||||
|
.map_err(|err| println!("Rustls error: {:?}", err))
|
||||||
|
.and_then(fn_service(move |stream: TlsStream<TcpStream>| {
|
||||||
|
let num = count.fetch_add(1, Ordering::Relaxed) + 1;
|
||||||
|
info!("[{}] Got TLS connection: {:?}", num, &*stream);
|
||||||
|
ok(())
|
||||||
|
}));
|
||||||
|
|
||||||
|
Ok::<_, ()>(svc)
|
||||||
|
}
|
||||||
|
})
|
||||||
})?
|
})?
|
||||||
.workers(1)
|
.workers(1)
|
||||||
.run()
|
.run()
|
||||||
|
Loading…
Reference in New Issue
Block a user