1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-11-27 20:12:58 +01:00

fix tls examples

This commit is contained in:
Rob Ede 2021-10-25 18:42:23 +01:00
parent 9b9869f1dd
commit 448626d543
No known key found for this signature in database
GPG Key ID: 97C636207D3EF933
5 changed files with 25 additions and 24 deletions

View File

@ -19,7 +19,7 @@ use crate::{
accept::AcceptLoop,
join_all,
server::{ServerCommand, ServerHandle},
service::{InternalServiceFactory, StreamNewService},
service::{ServerServiceFactory, StreamNewService},
signals::{Signal, Signals},
socket::{
MioListener, MioTcpListener, MioTcpSocket, StdSocketAddr, StdTcpListener, ToSocketAddrs,
@ -34,7 +34,7 @@ pub struct ServerBuilder {
token: usize,
backlog: u32,
handles: Vec<(usize, WorkerHandleServer)>,
services: Vec<Box<dyn InternalServiceFactory>>,
services: Vec<Box<dyn ServerServiceFactory>>,
sockets: Vec<(usize, String, MioListener)>,
accept: AcceptLoop,
exit: bool,

View File

@ -15,11 +15,12 @@ use crate::{
worker::WorkerCounterGuard,
};
pub(crate) trait InternalServiceFactory: Send {
pub(crate) trait ServerServiceFactory: Send {
fn name(&self, token: usize) -> &str;
fn clone_factory(&self) -> Box<dyn InternalServiceFactory>;
fn clone_factory(&self) -> Box<dyn ServerServiceFactory>;
/// Initialize Mio stream handler service and return it with its service token.
fn create(&self) -> LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>>;
}
@ -55,7 +56,7 @@ where
{
type Response = ();
type Error = ();
type Future = Ready<Result<(), ()>>;
type Future = Ready<Result<Self::Response, Self::Error>>;
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(ctx).map_err(|_| ())
@ -71,8 +72,8 @@ where
});
Ok(())
}
Err(e) => {
error!("Can not convert to an async tcp stream: {}", e);
Err(err) => {
error!("Can not convert Mio stream to an async TCP stream: {}", err);
Err(())
}
})
@ -98,7 +99,7 @@ where
token: usize,
inner: F,
addr: SocketAddr,
) -> Box<dyn InternalServiceFactory> {
) -> Box<dyn ServerServiceFactory> {
Box::new(Self {
name,
token,
@ -109,7 +110,7 @@ where
}
}
impl<F, Io, InitErr> InternalServiceFactory for StreamNewService<F, Io, InitErr>
impl<F, Io, InitErr> ServerServiceFactory for StreamNewService<F, Io, InitErr>
where
F: ServiceFactory<Io, Config = (), InitError = InitErr> + Send + Clone + 'static,
InitErr: fmt::Debug + Send + 'static,
@ -119,7 +120,7 @@ where
&self.name
}
fn clone_factory(&self) -> Box<dyn InternalServiceFactory> {
fn clone_factory(&self) -> Box<dyn ServerServiceFactory> {
Box::new(Self {
name: self.name.clone(),
inner: self.inner.clone(),
@ -134,8 +135,8 @@ where
let fut = self.inner.new_service(());
Box::pin(async move {
match fut.await {
Ok(inner) => {
let service = Box::new(StreamService::new(inner)) as _;
Ok(svc) => {
let service = Box::new(StreamService::new(svc)) as _;
Ok((token, service))
}
Err(err) => {

View File

@ -24,7 +24,7 @@ use tokio::sync::{
};
use crate::join_all;
use crate::service::{BoxedServerService, InternalServiceFactory};
use crate::service::{BoxedServerService, ServerServiceFactory};
use crate::socket::MioStream;
use crate::waker_queue::{WakerInterest, WakerQueue};
@ -198,7 +198,7 @@ impl WorkerHandleServer {
/// Service worker.
///
/// Worker accepts Socket objects via unbounded channel and starts stream processing.
/// Worker accepts socket objects via unbounded channel and starts stream processing.
pub(crate) struct ServerWorker {
// UnboundedReceiver<Conn> should always be the first field.
// It must be dropped as soon as ServerWorker dropping.
@ -206,7 +206,7 @@ pub(crate) struct ServerWorker {
rx2: UnboundedReceiver<Stop>,
counter: WorkerCounter,
services: Box<[WorkerService]>,
factories: Box<[Box<dyn InternalServiceFactory>]>,
factories: Box<[Box<dyn ServerServiceFactory>]>,
state: WorkerState,
shutdown_timeout: Duration,
}
@ -272,7 +272,7 @@ impl ServerWorkerConfig {
impl ServerWorker {
pub(crate) fn start(
idx: usize,
factories: Vec<Box<dyn InternalServiceFactory>>,
factories: Vec<Box<dyn ServerServiceFactory>>,
waker_queue: WakerQueue,
config: ServerWorkerConfig,
) -> (WorkerHandleAccept, WorkerHandleServer) {

View File

@ -30,7 +30,7 @@ use std::{
};
use actix_rt::net::TcpStream;
use actix_server::ServerHandle;
use actix_server::Server;
use actix_service::ServiceFactoryExt as _;
use actix_tls::accept::rustls::{Acceptor as RustlsAcceptor, TlsStream};
use futures_util::future::ok;
@ -67,15 +67,15 @@ async fn main() -> io::Result<()> {
let addr = ("127.0.0.1", 8443);
info!("starting server on port: {}", &addr.0);
ServerHandle::build()
.bind("tls-example", addr, move || {
Server::build()
.bind("tls-example", addr, {
let count = Arc::clone(&count);
// Set up TLS service factory
tls_acceptor
.clone()
.map_err(|err| println!("Rustls error: {:?}", err))
.and_then(move |stream: TlsStream<TcpStream>| {
.and_then_send(move |stream: TlsStream<TcpStream>| {
let num = count.fetch_add(1, Ordering::Relaxed);
info!("[{}] Got TLS connection: {:?}", num, &*stream);
ok(())

View File

@ -17,7 +17,7 @@ use actix_tls::connect::{self as actix_connect, Connect};
#[cfg(feature = "openssl")]
#[actix_rt::test]
async fn test_string() {
let srv = TestServer::with(|| {
let srv = TestServer::with({
fn_service(|io: TcpStream| async {
let mut framed = Framed::new(io, BytesCodec);
framed.send(Bytes::from_static(b"test")).await?;
@ -34,7 +34,7 @@ async fn test_string() {
#[cfg(feature = "rustls")]
#[actix_rt::test]
async fn test_rustls_string() {
let srv = TestServer::with(|| {
let srv = TestServer::with({
fn_service(|io: TcpStream| async {
let mut framed = Framed::new(io, BytesCodec);
framed.send(Bytes::from_static(b"test")).await?;
@ -94,7 +94,7 @@ async fn test_new_service() {
async fn test_openssl_uri() {
use std::convert::TryFrom;
let srv = TestServer::with(|| {
let srv = TestServer::with({
fn_service(|io: TcpStream| async {
let mut framed = Framed::new(io, BytesCodec);
framed.send(Bytes::from_static(b"test")).await?;
@ -113,7 +113,7 @@ async fn test_openssl_uri() {
async fn test_rustls_uri() {
use std::convert::TryFrom;
let srv = TestServer::with(|| {
let srv = TestServer::with({
fn_service(|io: TcpStream| async {
let mut framed = Framed::new(io, BytesCodec);
framed.send(Bytes::from_static(b"test")).await?;