From 4c0eaca5811b20b1295f16474047ba7324985650 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Mon, 25 Oct 2021 18:03:52 +0100 Subject: [PATCH] convert Server::bind to accept a normal service factory --- actix-server/CHANGES.md | 8 ++- actix-server/Cargo.toml | 2 +- actix-server/examples/startup-fail.rs | 25 ++++++++ actix-server/examples/tcp-echo.rs | 15 +++-- actix-server/src/builder.rs | 80 +++++++++++++++---------- actix-server/src/lib.rs | 5 +- actix-server/src/server.rs | 13 +++- actix-server/src/service.rs | 57 ++++++++---------- actix-server/src/test_server.rs | 11 +++- actix-server/src/worker.rs | 11 +++- actix-server/tests/test_server.rs | 25 ++++---- actix-service/CHANGES.md | 1 + actix-service/src/and_then.rs | 85 +++++++++++++++++++++++++++ actix-service/src/ext.rs | 24 +++++--- actix-service/src/pipeline.rs | 12 ++-- actix-tls/tests/test_connect.rs | 26 ++++---- actix-tls/tests/test_resolvers.rs | 5 +- 17 files changed, 277 insertions(+), 128 deletions(-) create mode 100644 actix-server/examples/startup-fail.rs diff --git a/actix-server/CHANGES.md b/actix-server/CHANGES.md index a52b6faf..c6b79d44 100644 --- a/actix-server/CHANGES.md +++ b/actix-server/CHANGES.md @@ -1,11 +1,13 @@ # Changes ## Unreleased - 2021-xx-xx -* Rename `Server` to `ServerHandle`. [#???] -* Rename `ServerBuilder::{maxconn => max_concurrent_connections}`. [#???] +* Rename `Server` to `ServerHandle`. [#403] +* Rename `ServerBuilder::{maxconn => max_concurrent_connections}`. [#403] +* Remove wrapper `service::ServiceFactory` trait. [#403] +* `Server::bind` and related methods now take a regular `ServiceFactory` (from actix-service crate). [#403] * Minimum supported Rust version (MSRV) is now 1.52. -[#???]: https://github.com/actix/actix-net/pull/??? +[#403]: https://github.com/actix/actix-net/pull/403 ## 2.0.0-beta.6 - 2021-10-11 diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index 46a0ad1d..a4c403f1 100755 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -38,4 +38,4 @@ actix-rt = "2.0.0" bytes = "1" env_logger = "0.9" futures-util = { version = "0.3.7", default-features = false, features = ["sink"] } -tokio = { version = "1.5.1", features = ["io-util"] } +tokio = { version = "1.5.1", features = ["io-util", "rt-multi-thread", "macros"] } diff --git a/actix-server/examples/startup-fail.rs b/actix-server/examples/startup-fail.rs new file mode 100644 index 00000000..9631b2d1 --- /dev/null +++ b/actix-server/examples/startup-fail.rs @@ -0,0 +1,25 @@ +use std::io; + +use actix_rt::net::TcpStream; +use actix_server::Server; +use actix_service::fn_service; +use log::info; + +#[actix_rt::main] +async fn main() -> io::Result<()> { + env_logger::Builder::from_env(env_logger::Env::new().default_filter_or("trace,mio=info")) + .init(); + + let addr = ("127.0.0.1", 8080); + info!("starting server on port: {}", &addr.0); + + Server::build() + .bind( + "startup-fail", + addr, + fn_service(move |mut _stream: TcpStream| async move { Ok::(42) }), + )? + .workers(2) + .run() + .await +} diff --git a/actix-server/examples/tcp-echo.rs b/actix-server/examples/tcp-echo.rs index 8b038da4..e5e288fc 100644 --- a/actix-server/examples/tcp-echo.rs +++ b/actix-server/examples/tcp-echo.rs @@ -21,7 +21,6 @@ use actix_rt::net::TcpStream; use actix_server::Server; use actix_service::{fn_service, ServiceFactoryExt as _}; use bytes::BytesMut; -use futures_util::future::ok; use log::{error, info}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; @@ -39,11 +38,11 @@ async fn main() -> io::Result<()> { // logical CPU cores as the worker count. For this reason, the closure passed to bind needs // to return a service *factory*; so it can be created once per worker. Server::build() - .bind("echo", addr, move || { + .bind("echo", addr, { let count = Arc::clone(&count); let num2 = Arc::clone(&count); - fn_service(move |mut stream: TcpStream| { + let svc = fn_service(move |mut stream: TcpStream| { let count = Arc::clone(&count); async move { @@ -78,11 +77,15 @@ async fn main() -> io::Result<()> { } }) .map_err(|err| error!("Service Error: {:?}", err)) - .and_then(move |(_, size)| { + .and_then_send(move |(_, size)| { let num = num2.load(Ordering::SeqCst); info!("[{}] total bytes read: {}", num, size); - ok(size) - }) + async move { Ok(size) } + }); + + let svc2 = svc.clone(); + + svc2 })? .workers(1) .run() diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index f6c5f3c1..9eddb776 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -1,4 +1,5 @@ use std::{ + fmt, future::Future, io, mem, pin::Pin, @@ -7,32 +8,25 @@ use std::{ }; use actix_rt::{self as rt, net::TcpStream, time::sleep, System}; +use actix_service::ServiceFactory; use log::{error, info}; use tokio::sync::{ mpsc::{unbounded_channel, UnboundedReceiver}, oneshot, }; -use crate::accept::AcceptLoop; -use crate::join_all; -use crate::server::{ServerCommand, ServerHandle}; -use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService}; -use crate::signals::{Signal, Signals}; -use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs}; -use crate::socket::{MioTcpListener, MioTcpSocket}; -use crate::waker_queue::{WakerInterest, WakerQueue}; -use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerHandleAccept, WorkerHandleServer}; - -#[derive(Debug)] -#[non_exhaustive] -pub struct Server; - -impl Server { - /// Start server building process. - pub fn build() -> ServerBuilder { - ServerBuilder::default() - } -} +use crate::{ + accept::AcceptLoop, + join_all, + server::{ServerCommand, ServerHandle}, + service::{InternalServiceFactory, StreamNewService}, + signals::{Signal, Signals}, + socket::{ + MioListener, MioTcpListener, MioTcpSocket, StdSocketAddr, StdTcpListener, ToSocketAddrs, + }, + waker_queue::{WakerInterest, WakerQueue}, + worker::{ServerWorker, ServerWorkerConfig, WorkerHandleAccept, WorkerHandleServer}, +}; /// Server builder pub struct ServerBuilder { @@ -169,38 +163,48 @@ impl ServerBuilder { /// Binds to all network interface addresses that resolve from the `addr` argument. /// Eg. using `localhost` might bind to both IPv4 and IPv6 addresses. Bind to multiple distinct /// interfaces at the same time by passing a list of socket addresses. - pub fn bind>(mut self, name: N, addr: U, factory: F) -> io::Result + pub fn bind( + mut self, + name: impl AsRef, + addr: U, + factory: F, + ) -> io::Result where - F: ServiceFactory, + F: ServiceFactory + Send + Clone + 'static, + InitErr: fmt::Debug + Send + 'static, U: ToSocketAddrs, { let sockets = bind_addr(addr, self.backlog)?; for lst in sockets { let token = self.next_token(); + self.services.push(StreamNewService::create( name.as_ref().to_string(), token, factory.clone(), lst.local_addr()?, )); + self.sockets .push((token, name.as_ref().to_string(), MioListener::Tcp(lst))); } + Ok(self) } /// Bind server to existing TCP listener. /// /// Useful when running as a systemd service and a socket FD can be passed to the process. - pub fn listen>( + pub fn listen( mut self, - name: N, + name: impl AsRef, lst: StdTcpListener, factory: F, ) -> io::Result where - F: ServiceFactory, + F: ServiceFactory + Send + Clone + 'static, + InitErr: fmt::Debug + Send + 'static, { lst.set_nonblocking(true)?; @@ -259,7 +263,7 @@ impl ServerBuilder { Signals::start(self.server.clone()); } - // start http server actor + // start http server let server = self.server.clone(); rt::spawn(self); server @@ -402,11 +406,19 @@ impl ServerBuilder { #[cfg(unix)] impl ServerBuilder { /// Add new unix domain service to the server. - pub fn bind_uds(self, name: N, addr: U, factory: F) -> io::Result + pub fn bind_uds( + self, + name: impl AsRef, + addr: U, + factory: F, + ) -> io::Result where - F: ServiceFactory, - N: AsRef, + F: ServiceFactory + + Send + + Clone + + 'static, U: AsRef, + InitErr: fmt::Debug + Send + 'static, { // The path must not exist when we try to bind. // Try to remove it to avoid bind error. @@ -424,14 +436,18 @@ impl ServerBuilder { /// Add new unix domain service to the server. /// /// Useful when running as a systemd service and a socket FD can be passed to the process. - pub fn listen_uds>( + pub fn listen_uds( mut self, - name: N, + name: impl AsRef, lst: crate::socket::StdUnixListener, factory: F, ) -> io::Result where - F: ServiceFactory, + F: ServiceFactory + + Send + + Clone + + 'static, + InitErr: fmt::Debug + Send + 'static, { use std::net::{IpAddr, Ipv4Addr}; diff --git a/actix-server/src/lib.rs b/actix-server/src/lib.rs index 6893d9c1..ca8fd053 100644 --- a/actix-server/src/lib.rs +++ b/actix-server/src/lib.rs @@ -14,9 +14,8 @@ mod test_server; mod waker_queue; mod worker; -pub use self::builder::{Server, ServerBuilder}; -pub use self::server::{ServerHandle}; -pub use self::service::ServiceFactory; +pub use self::builder::ServerBuilder; +pub use self::server::{Server, ServerHandle}; pub use self::test_server::TestServer; #[doc(hidden)] diff --git a/actix-server/src/server.rs b/actix-server/src/server.rs index c515164c..c04bc8be 100644 --- a/actix-server/src/server.rs +++ b/actix-server/src/server.rs @@ -6,7 +6,18 @@ use std::task::{Context, Poll}; use tokio::sync::mpsc::UnboundedSender; use tokio::sync::oneshot; -use crate::signals::Signal; +use crate::{signals::Signal, ServerBuilder}; + +#[derive(Debug)] +#[non_exhaustive] +pub struct Server; + +impl Server { + /// Start server building process. + pub fn build() -> ServerBuilder { + ServerBuilder::default() + } +} #[derive(Debug)] pub(crate) enum ServerCommand { diff --git a/actix-server/src/service.rs b/actix-server/src/service.rs index 28ffb4f1..77b31afa 100644 --- a/actix-server/src/service.rs +++ b/actix-server/src/service.rs @@ -1,20 +1,19 @@ -use std::marker::PhantomData; -use std::net::SocketAddr; -use std::task::{Context, Poll}; +use std::{ + fmt, + marker::PhantomData, + net::SocketAddr, + task::{Context, Poll}, +}; -use actix_service::{Service, ServiceFactory as BaseServiceFactory}; +use actix_service::{Service, ServiceFactory}; use actix_utils::future::{ready, Ready}; use futures_core::future::LocalBoxFuture; use log::error; -use crate::socket::{FromStream, MioStream}; -use crate::worker::WorkerCounterGuard; - -pub trait ServiceFactory: Send + Clone + 'static { - type Factory: BaseServiceFactory; - - fn create(&self) -> Self::Factory; -} +use crate::{ + socket::{FromStream, MioStream}, + worker::WorkerCounterGuard, +}; pub(crate) trait InternalServiceFactory: Send { fn name(&self, token: usize) -> &str; @@ -80,17 +79,18 @@ where } } -pub(crate) struct StreamNewService, Io: FromStream> { +pub(crate) struct StreamNewService { name: String, inner: F, token: usize, addr: SocketAddr, - _t: PhantomData, + _t: PhantomData<(Io, InitErr)>, } -impl StreamNewService +impl StreamNewService where - F: ServiceFactory, + F: ServiceFactory + Send + Clone + 'static, + InitErr: fmt::Debug + Send + 'static, Io: FromStream + Send + 'static, { pub(crate) fn create( @@ -109,9 +109,10 @@ where } } -impl InternalServiceFactory for StreamNewService +impl InternalServiceFactory for StreamNewService where - F: ServiceFactory, + F: ServiceFactory + Send + Clone + 'static, + InitErr: fmt::Debug + Send + 'static, Io: FromStream + Send + 'static, { fn name(&self, _: usize) -> &str { @@ -130,28 +131,18 @@ where fn create(&self) -> LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>> { let token = self.token; - let fut = self.inner.create().new_service(()); + let fut = self.inner.new_service(()); Box::pin(async move { match fut.await { Ok(inner) => { let service = Box::new(StreamService::new(inner)) as _; Ok((token, service)) } - Err(_) => Err(()), + Err(err) => { + error!("{:?}", err); + Err(()) + } } }) } } - -impl ServiceFactory for F -where - F: Fn() -> T + Send + Clone + 'static, - T: BaseServiceFactory, - I: FromStream, -{ - type Factory = T; - - fn create(&self) -> T { - (self)() - } -} diff --git a/actix-server/src/test_server.rs b/actix-server/src/test_server.rs index ad6ee8ee..77ddad3e 100644 --- a/actix-server/src/test_server.rs +++ b/actix-server/src/test_server.rs @@ -1,9 +1,10 @@ use std::sync::mpsc; -use std::{net, thread}; +use std::{fmt, net, thread}; use actix_rt::{net::TcpStream, System}; +use actix_service::ServiceFactory; -use crate::{Server, ServerBuilder, ServiceFactory}; +use crate::{Server, ServerBuilder}; /// A testing server. /// @@ -64,7 +65,11 @@ impl TestServer { } /// Start new test server with application factory. - pub fn with>(factory: F) -> TestServerRuntime { + pub fn with(factory: F) -> TestServerRuntime + where + F: ServiceFactory + Send + Clone + 'static, + InitErr: fmt::Debug + Send + 'static, + { let (tx, rx) = mpsc::channel(); // run server in separate thread diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index a3faf172..89d93b16 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -28,6 +28,8 @@ use crate::service::{BoxedServerService, InternalServiceFactory}; use crate::socket::MioStream; use crate::waker_queue::{WakerInterest, WakerQueue}; +const DEFAULT_SHUTDOWN_DURATION: Duration = Duration::from_secs(30); + /// Stop worker message. Returns `true` on successful graceful shutdown. /// and `false` if some connections still alive when shutdown execute. pub(crate) struct Stop { @@ -244,8 +246,9 @@ impl Default for ServerWorkerConfig { fn default() -> Self { // 512 is the default max blocking thread count of tokio runtime. let max_blocking_threads = std::cmp::max(512 / num_cpus::get(), 1); + Self { - shutdown_timeout: Duration::from_secs(30), + shutdown_timeout: DEFAULT_SHUTDOWN_DURATION, max_blocking_threads, max_concurrent_connections: 25_600, } @@ -314,6 +317,7 @@ impl ServerWorker { .await .into_iter() .collect::, _>>(); + let services = match res { Ok(res) => res .into_iter() @@ -327,8 +331,9 @@ impl ServerWorker { services }) .into_boxed_slice(), - Err(e) => { - error!("Can not start worker: {:?}", e); + + Err(err) => { + error!("Can not start worker: {:?}", err); Arbiter::current().stop(); return; } diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index 5919438b..0605dba2 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -25,9 +25,7 @@ fn test_bind() { let srv = Server::build() .workers(1) .disable_signals() - .bind("test", addr, move || { - fn_service(|_| async { Ok::<_, ()>(()) }) - })? + .bind("test", addr, fn_service(|_| async { Ok::<_, ()>(()) }))? .run(); let _ = tx.send((srv.clone(), actix_rt::System::current())); @@ -56,9 +54,7 @@ fn test_listen() { let srv = Server::build() .disable_signals() .workers(1) - .listen("test", lst, move || { - fn_service(|_| async { Ok::<_, ()>(()) }) - })? + .listen("test", lst, fn_service(|_| async { Ok::<_, ()>(()) }))? .run(); let _ = tx.send((srv.clone(), actix_rt::System::current())); @@ -94,13 +90,15 @@ fn test_start() { let srv = Server::build() .backlog(100) .disable_signals() - .bind("test", addr, move || { + .bind( + "test", + addr, fn_service(|io: TcpStream| async move { let mut f = Framed::new(io, BytesCodec); f.send(Bytes::from_static(b"test")).await.unwrap(); Ok::<_, ()>(()) - }) - })? + }), + )? .run(); let _ = tx.send((srv.clone(), actix_rt::System::current())); @@ -173,7 +171,7 @@ async fn test_max_concurrent_connections() { .max_concurrent_connections(max_conn) .workers(1) .disable_signals() - .bind("test", addr, move || { + .bind("test", addr, { let counter = counter.clone(); fn_service(move |_io: TcpStream| { let counter = counter.clone(); @@ -263,14 +261,14 @@ async fn test_service_restart() { let server = Server::build() .backlog(1) .disable_signals() - .bind("addr1", addr1, move || { + .bind("addr1", addr1, { let num = num.clone(); fn_factory(move || { let num = num.clone(); async move { Ok::<_, ()>(TestService(num)) } }) })? - .bind("addr2", addr2, move || { + .bind("addr2", addr2, { let num2 = num2.clone(); fn_factory(move || { let num2 = num2.clone(); @@ -319,6 +317,7 @@ async fn worker_restart() { use futures_core::future::LocalBoxFuture; use tokio::io::{AsyncReadExt, AsyncWriteExt}; + #[derive(Debug, Clone)] struct TestServiceFactory(Arc); impl ServiceFactory for TestServiceFactory { @@ -381,7 +380,7 @@ async fn worker_restart() { actix_rt::System::new().block_on(async { let server = Server::build() .disable_signals() - .bind("addr", addr, move || TestServiceFactory(counter.clone()))? + .bind("addr", addr, TestServiceFactory(counter.clone()))? .workers(2) .run(); diff --git a/actix-service/CHANGES.md b/actix-service/CHANGES.md index c01fd8dd..5aaffc06 100644 --- a/actix-service/CHANGES.md +++ b/actix-service/CHANGES.md @@ -1,6 +1,7 @@ # Changes ## Unreleased - 2021-xx-xx +* Add `.and_then_send()` & `AndThenSendServiceFactory` for creating `Send`able chained services. [#403] ## 2.0.1 - 2021-10-11 diff --git a/actix-service/src/and_then.rs b/actix-service/src/and_then.rs index 38980079..fdf219f0 100644 --- a/actix-service/src/and_then.rs +++ b/actix-service/src/and_then.rs @@ -261,6 +261,91 @@ where } } +/// `.and_then_send()` service factory combinator +pub struct AndThenSendServiceFactory +where + A: ServiceFactory, + A::Config: Clone, + B: ServiceFactory< + A::Response, + Config = A::Config, + Error = A::Error, + InitError = A::InitError, + >, +{ + inner_a: A, + inner_b: B, + _phantom: PhantomData, +} + +impl AndThenSendServiceFactory +where + A: ServiceFactory, + A::Config: Clone, + B: ServiceFactory< + A::Response, + Config = A::Config, + Error = A::Error, + InitError = A::InitError, + >, +{ + /// Create new `AndThenFactory` combinator + pub(crate) fn new(a: A, b: B) -> Self { + Self { + inner_a: a, + inner_b: b, + _phantom: PhantomData, + } + } +} + +impl ServiceFactory for AndThenSendServiceFactory +where + A: ServiceFactory, + A::Config: Clone, + B: ServiceFactory< + A::Response, + Config = A::Config, + Error = A::Error, + InitError = A::InitError, + >, +{ + type Response = B::Response; + type Error = A::Error; + + type Config = A::Config; + type Service = AndThenService; + type InitError = A::InitError; + type Future = AndThenServiceFactoryResponse; + + fn new_service(&self, cfg: A::Config) -> Self::Future { + AndThenServiceFactoryResponse::new( + self.inner_a.new_service(cfg.clone()), + self.inner_b.new_service(cfg), + ) + } +} + +impl Clone for AndThenSendServiceFactory +where + A: ServiceFactory, + A::Config: Clone, + B: ServiceFactory< + A::Response, + Config = A::Config, + Error = A::Error, + InitError = A::InitError, + >, +{ + fn clone(&self) -> Self { + Self { + inner_a: self.inner_a.clone(), + inner_b: self.inner_b.clone(), + _phantom: PhantomData, + } + } +} + #[cfg(test)] mod tests { use alloc::rc::Rc; diff --git a/actix-service/src/ext.rs b/actix-service/src/ext.rs index f5fe6ed1..48ddb127 100644 --- a/actix-service/src/ext.rs +++ b/actix-service/src/ext.rs @@ -1,10 +1,4 @@ -use crate::{ - and_then::{AndThenService, AndThenServiceFactory}, - map::Map, - map_err::MapErr, - transform_err::TransformMapInitErr, - IntoService, IntoServiceFactory, Service, ServiceFactory, Transform, -}; +use crate::{IntoService, IntoServiceFactory, Service, ServiceFactory, Transform, and_then::{AndThenSendServiceFactory, AndThenService, AndThenServiceFactory}, map::Map, map_err::MapErr, transform_err::TransformMapInitErr}; /// An extension trait for [`Service`]s that provides a variety of convenient adapters. pub trait ServiceExt: Service { @@ -105,6 +99,22 @@ pub trait ServiceFactoryExt: ServiceFactory { { AndThenServiceFactory::new(self, factory.into_factory()) } + + /// Call another service after call to this one has resolved successfully. + fn and_then_send(self, factory: I) -> AndThenSendServiceFactory + where + Self: Sized, + Self::Config: Clone, + I: IntoServiceFactory, + SF1: ServiceFactory< + Self::Response, + Config = Self::Config, + Error = Self::Error, + InitError = Self::InitError, + >, + { + AndThenSendServiceFactory::new(self, factory.into_factory()) + } } impl ServiceFactoryExt for SF where SF: ServiceFactory {} diff --git a/actix-service/src/pipeline.rs b/actix-service/src/pipeline.rs index 2c71a74b..b3cb72f2 100644 --- a/actix-service/src/pipeline.rs +++ b/actix-service/src/pipeline.rs @@ -175,13 +175,13 @@ where factory: I, ) -> PipelineFactory< impl ServiceFactory< - Req, - Response = SF1::Response, - Error = SF::Error, - Config = SF::Config, - InitError = SF::InitError, + Req, + Response = SF1::Response, + Error = SF::Error, + Config = SF::Config, + InitError = SF::InitError, Service = impl Service + Clone, - > + Clone, + > + Clone, Req, > where diff --git a/actix-tls/tests/test_connect.rs b/actix-tls/tests/test_connect.rs index 564151ce..9738d9e0 100755 --- a/actix-tls/tests/test_connect.rs +++ b/actix-tls/tests/test_connect.rs @@ -50,13 +50,11 @@ async fn test_rustls_string() { #[actix_rt::test] async fn test_static_str() { - let srv = TestServer::with(|| { - fn_service(|io: TcpStream| async { - let mut framed = Framed::new(io, BytesCodec); - framed.send(Bytes::from_static(b"test")).await?; - Ok::<_, io::Error>(()) - }) - }); + let srv = TestServer::with(fn_service(|io: TcpStream| async { + let mut framed = Framed::new(io, BytesCodec); + framed.send(Bytes::from_static(b"test")).await?; + Ok::<_, io::Error>(()) + })); let conn = actix_connect::default_connector(); @@ -75,13 +73,11 @@ async fn test_static_str() { #[actix_rt::test] async fn test_new_service() { - let srv = TestServer::with(|| { - fn_service(|io: TcpStream| async { - let mut framed = Framed::new(io, BytesCodec); - framed.send(Bytes::from_static(b"test")).await?; - Ok::<_, io::Error>(()) - }) - }); + let srv = TestServer::with(fn_service(|io: TcpStream| async { + let mut framed = Framed::new(io, BytesCodec); + framed.send(Bytes::from_static(b"test")).await?; + Ok::<_, io::Error>(()) + })); let factory = actix_connect::default_connector_factory(); @@ -133,7 +129,7 @@ async fn test_rustls_uri() { #[actix_rt::test] async fn test_local_addr() { - let srv = TestServer::with(|| { + let srv = TestServer::with({ fn_service(|io: TcpStream| async { let mut framed = Framed::new(io, BytesCodec); framed.send(Bytes::from_static(b"test")).await?; diff --git a/actix-tls/tests/test_resolvers.rs b/actix-tls/tests/test_resolvers.rs index 40ee21fa..37958c98 100644 --- a/actix-tls/tests/test_resolvers.rs +++ b/actix-tls/tests/test_resolvers.rs @@ -38,8 +38,9 @@ async fn custom_resolver() { async fn custom_resolver_connect() { use trust_dns_resolver::TokioAsyncResolver; - let srv = - TestServer::with(|| fn_service(|_io: TcpStream| async { Ok::<_, io::Error>(()) })); + let srv = TestServer::with(fn_service(|_io: TcpStream| async { + Ok::<_, io::Error>(()) + })); struct MyResolver { trust_dns: TokioAsyncResolver,