1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-01-31 11:32:10 +01:00

convert Server::bind to accept a normal service factory

This commit is contained in:
Rob Ede 2021-10-25 18:03:52 +01:00
parent 81421c2ba9
commit 4c0eaca581
No known key found for this signature in database
GPG Key ID: 97C636207D3EF933
17 changed files with 277 additions and 128 deletions

View File

@ -1,11 +1,13 @@
# Changes # Changes
## Unreleased - 2021-xx-xx ## Unreleased - 2021-xx-xx
* Rename `Server` to `ServerHandle`. [#???] * Rename `Server` to `ServerHandle`. [#403]
* Rename `ServerBuilder::{maxconn => max_concurrent_connections}`. [#???] * Rename `ServerBuilder::{maxconn => max_concurrent_connections}`. [#403]
* Remove wrapper `service::ServiceFactory` trait. [#403]
* `Server::bind` and related methods now take a regular `ServiceFactory` (from actix-service crate). [#403]
* Minimum supported Rust version (MSRV) is now 1.52. * Minimum supported Rust version (MSRV) is now 1.52.
[#???]: https://github.com/actix/actix-net/pull/??? [#403]: https://github.com/actix/actix-net/pull/403
## 2.0.0-beta.6 - 2021-10-11 ## 2.0.0-beta.6 - 2021-10-11

View File

@ -38,4 +38,4 @@ actix-rt = "2.0.0"
bytes = "1" bytes = "1"
env_logger = "0.9" env_logger = "0.9"
futures-util = { version = "0.3.7", default-features = false, features = ["sink"] } futures-util = { version = "0.3.7", default-features = false, features = ["sink"] }
tokio = { version = "1.5.1", features = ["io-util"] } tokio = { version = "1.5.1", features = ["io-util", "rt-multi-thread", "macros"] }

View File

@ -0,0 +1,25 @@
use std::io;
use actix_rt::net::TcpStream;
use actix_server::Server;
use actix_service::fn_service;
use log::info;
#[actix_rt::main]
async fn main() -> io::Result<()> {
env_logger::Builder::from_env(env_logger::Env::new().default_filter_or("trace,mio=info"))
.init();
let addr = ("127.0.0.1", 8080);
info!("starting server on port: {}", &addr.0);
Server::build()
.bind(
"startup-fail",
addr,
fn_service(move |mut _stream: TcpStream| async move { Ok::<u32, u32>(42) }),
)?
.workers(2)
.run()
.await
}

View File

@ -21,7 +21,6 @@ use actix_rt::net::TcpStream;
use actix_server::Server; use actix_server::Server;
use actix_service::{fn_service, ServiceFactoryExt as _}; use actix_service::{fn_service, ServiceFactoryExt as _};
use bytes::BytesMut; use bytes::BytesMut;
use futures_util::future::ok;
use log::{error, info}; use log::{error, info};
use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::io::{AsyncReadExt, AsyncWriteExt};
@ -39,11 +38,11 @@ async fn main() -> io::Result<()> {
// logical CPU cores as the worker count. For this reason, the closure passed to bind needs // logical CPU cores as the worker count. For this reason, the closure passed to bind needs
// to return a service *factory*; so it can be created once per worker. // to return a service *factory*; so it can be created once per worker.
Server::build() Server::build()
.bind("echo", addr, move || { .bind("echo", addr, {
let count = Arc::clone(&count); let count = Arc::clone(&count);
let num2 = Arc::clone(&count); let num2 = Arc::clone(&count);
fn_service(move |mut stream: TcpStream| { let svc = fn_service(move |mut stream: TcpStream| {
let count = Arc::clone(&count); let count = Arc::clone(&count);
async move { async move {
@ -78,11 +77,15 @@ async fn main() -> io::Result<()> {
} }
}) })
.map_err(|err| error!("Service Error: {:?}", err)) .map_err(|err| error!("Service Error: {:?}", err))
.and_then(move |(_, size)| { .and_then_send(move |(_, size)| {
let num = num2.load(Ordering::SeqCst); let num = num2.load(Ordering::SeqCst);
info!("[{}] total bytes read: {}", num, size); info!("[{}] total bytes read: {}", num, size);
ok(size) async move { Ok(size) }
}) });
let svc2 = svc.clone();
svc2
})? })?
.workers(1) .workers(1)
.run() .run()

View File

@ -1,4 +1,5 @@
use std::{ use std::{
fmt,
future::Future, future::Future,
io, mem, io, mem,
pin::Pin, pin::Pin,
@ -7,32 +8,25 @@ use std::{
}; };
use actix_rt::{self as rt, net::TcpStream, time::sleep, System}; use actix_rt::{self as rt, net::TcpStream, time::sleep, System};
use actix_service::ServiceFactory;
use log::{error, info}; use log::{error, info};
use tokio::sync::{ use tokio::sync::{
mpsc::{unbounded_channel, UnboundedReceiver}, mpsc::{unbounded_channel, UnboundedReceiver},
oneshot, oneshot,
}; };
use crate::accept::AcceptLoop; use crate::{
use crate::join_all; accept::AcceptLoop,
use crate::server::{ServerCommand, ServerHandle}; join_all,
use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService}; server::{ServerCommand, ServerHandle},
use crate::signals::{Signal, Signals}; service::{InternalServiceFactory, StreamNewService},
use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs}; signals::{Signal, Signals},
use crate::socket::{MioTcpListener, MioTcpSocket}; socket::{
use crate::waker_queue::{WakerInterest, WakerQueue}; MioListener, MioTcpListener, MioTcpSocket, StdSocketAddr, StdTcpListener, ToSocketAddrs,
use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerHandleAccept, WorkerHandleServer}; },
waker_queue::{WakerInterest, WakerQueue},
#[derive(Debug)] worker::{ServerWorker, ServerWorkerConfig, WorkerHandleAccept, WorkerHandleServer},
#[non_exhaustive] };
pub struct Server;
impl Server {
/// Start server building process.
pub fn build() -> ServerBuilder {
ServerBuilder::default()
}
}
/// Server builder /// Server builder
pub struct ServerBuilder { pub struct ServerBuilder {
@ -169,38 +163,48 @@ impl ServerBuilder {
/// Binds to all network interface addresses that resolve from the `addr` argument. /// Binds to all network interface addresses that resolve from the `addr` argument.
/// Eg. using `localhost` might bind to both IPv4 and IPv6 addresses. Bind to multiple distinct /// Eg. using `localhost` might bind to both IPv4 and IPv6 addresses. Bind to multiple distinct
/// interfaces at the same time by passing a list of socket addresses. /// interfaces at the same time by passing a list of socket addresses.
pub fn bind<F, U, N: AsRef<str>>(mut self, name: N, addr: U, factory: F) -> io::Result<Self> pub fn bind<F, U, InitErr>(
mut self,
name: impl AsRef<str>,
addr: U,
factory: F,
) -> io::Result<Self>
where where
F: ServiceFactory<TcpStream>, F: ServiceFactory<TcpStream, Config = (), InitError = InitErr> + Send + Clone + 'static,
InitErr: fmt::Debug + Send + 'static,
U: ToSocketAddrs, U: ToSocketAddrs,
{ {
let sockets = bind_addr(addr, self.backlog)?; let sockets = bind_addr(addr, self.backlog)?;
for lst in sockets { for lst in sockets {
let token = self.next_token(); let token = self.next_token();
self.services.push(StreamNewService::create( self.services.push(StreamNewService::create(
name.as_ref().to_string(), name.as_ref().to_string(),
token, token,
factory.clone(), factory.clone(),
lst.local_addr()?, lst.local_addr()?,
)); ));
self.sockets self.sockets
.push((token, name.as_ref().to_string(), MioListener::Tcp(lst))); .push((token, name.as_ref().to_string(), MioListener::Tcp(lst)));
} }
Ok(self) Ok(self)
} }
/// Bind server to existing TCP listener. /// Bind server to existing TCP listener.
/// ///
/// Useful when running as a systemd service and a socket FD can be passed to the process. /// Useful when running as a systemd service and a socket FD can be passed to the process.
pub fn listen<F, N: AsRef<str>>( pub fn listen<F, InitErr>(
mut self, mut self,
name: N, name: impl AsRef<str>,
lst: StdTcpListener, lst: StdTcpListener,
factory: F, factory: F,
) -> io::Result<Self> ) -> io::Result<Self>
where where
F: ServiceFactory<TcpStream>, F: ServiceFactory<TcpStream, Config = (), InitError = InitErr> + Send + Clone + 'static,
InitErr: fmt::Debug + Send + 'static,
{ {
lst.set_nonblocking(true)?; lst.set_nonblocking(true)?;
@ -259,7 +263,7 @@ impl ServerBuilder {
Signals::start(self.server.clone()); Signals::start(self.server.clone());
} }
// start http server actor // start http server
let server = self.server.clone(); let server = self.server.clone();
rt::spawn(self); rt::spawn(self);
server server
@ -402,11 +406,19 @@ impl ServerBuilder {
#[cfg(unix)] #[cfg(unix)]
impl ServerBuilder { impl ServerBuilder {
/// Add new unix domain service to the server. /// Add new unix domain service to the server.
pub fn bind_uds<F, U, N>(self, name: N, addr: U, factory: F) -> io::Result<Self> pub fn bind_uds<F, U, InitErr>(
self,
name: impl AsRef<str>,
addr: U,
factory: F,
) -> io::Result<Self>
where where
F: ServiceFactory<actix_rt::net::UnixStream>, F: ServiceFactory<actix_rt::net::UnixStream, Config = (), InitError = InitErr>
N: AsRef<str>, + Send
+ Clone
+ 'static,
U: AsRef<std::path::Path>, U: AsRef<std::path::Path>,
InitErr: fmt::Debug + Send + 'static,
{ {
// The path must not exist when we try to bind. // The path must not exist when we try to bind.
// Try to remove it to avoid bind error. // Try to remove it to avoid bind error.
@ -424,14 +436,18 @@ impl ServerBuilder {
/// Add new unix domain service to the server. /// Add new unix domain service to the server.
/// ///
/// Useful when running as a systemd service and a socket FD can be passed to the process. /// Useful when running as a systemd service and a socket FD can be passed to the process.
pub fn listen_uds<F, N: AsRef<str>>( pub fn listen_uds<F, InitErr>(
mut self, mut self,
name: N, name: impl AsRef<str>,
lst: crate::socket::StdUnixListener, lst: crate::socket::StdUnixListener,
factory: F, factory: F,
) -> io::Result<Self> ) -> io::Result<Self>
where where
F: ServiceFactory<actix_rt::net::UnixStream>, F: ServiceFactory<actix_rt::net::UnixStream, Config = (), InitError = InitErr>
+ Send
+ Clone
+ 'static,
InitErr: fmt::Debug + Send + 'static,
{ {
use std::net::{IpAddr, Ipv4Addr}; use std::net::{IpAddr, Ipv4Addr};

View File

@ -14,9 +14,8 @@ mod test_server;
mod waker_queue; mod waker_queue;
mod worker; mod worker;
pub use self::builder::{Server, ServerBuilder}; pub use self::builder::ServerBuilder;
pub use self::server::{ServerHandle}; pub use self::server::{Server, ServerHandle};
pub use self::service::ServiceFactory;
pub use self::test_server::TestServer; pub use self::test_server::TestServer;
#[doc(hidden)] #[doc(hidden)]

View File

@ -6,7 +6,18 @@ use std::task::{Context, Poll};
use tokio::sync::mpsc::UnboundedSender; use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::oneshot; use tokio::sync::oneshot;
use crate::signals::Signal; use crate::{signals::Signal, ServerBuilder};
#[derive(Debug)]
#[non_exhaustive]
pub struct Server;
impl Server {
/// Start server building process.
pub fn build() -> ServerBuilder {
ServerBuilder::default()
}
}
#[derive(Debug)] #[derive(Debug)]
pub(crate) enum ServerCommand { pub(crate) enum ServerCommand {

View File

@ -1,20 +1,19 @@
use std::marker::PhantomData; use std::{
use std::net::SocketAddr; fmt,
use std::task::{Context, Poll}; marker::PhantomData,
net::SocketAddr,
task::{Context, Poll},
};
use actix_service::{Service, ServiceFactory as BaseServiceFactory}; use actix_service::{Service, ServiceFactory};
use actix_utils::future::{ready, Ready}; use actix_utils::future::{ready, Ready};
use futures_core::future::LocalBoxFuture; use futures_core::future::LocalBoxFuture;
use log::error; use log::error;
use crate::socket::{FromStream, MioStream}; use crate::{
use crate::worker::WorkerCounterGuard; socket::{FromStream, MioStream},
worker::WorkerCounterGuard,
pub trait ServiceFactory<Stream: FromStream>: Send + Clone + 'static { };
type Factory: BaseServiceFactory<Stream, Config = ()>;
fn create(&self) -> Self::Factory;
}
pub(crate) trait InternalServiceFactory: Send { pub(crate) trait InternalServiceFactory: Send {
fn name(&self, token: usize) -> &str; fn name(&self, token: usize) -> &str;
@ -80,17 +79,18 @@ where
} }
} }
pub(crate) struct StreamNewService<F: ServiceFactory<Io>, Io: FromStream> { pub(crate) struct StreamNewService<F, Io, InitErr> {
name: String, name: String,
inner: F, inner: F,
token: usize, token: usize,
addr: SocketAddr, addr: SocketAddr,
_t: PhantomData<Io>, _t: PhantomData<(Io, InitErr)>,
} }
impl<F, Io> StreamNewService<F, Io> impl<F, Io, InitErr> StreamNewService<F, Io, InitErr>
where where
F: ServiceFactory<Io>, F: ServiceFactory<Io, Config = (), InitError = InitErr> + Send + Clone + 'static,
InitErr: fmt::Debug + Send + 'static,
Io: FromStream + Send + 'static, Io: FromStream + Send + 'static,
{ {
pub(crate) fn create( pub(crate) fn create(
@ -109,9 +109,10 @@ where
} }
} }
impl<F, Io> InternalServiceFactory for StreamNewService<F, Io> impl<F, Io, InitErr> InternalServiceFactory for StreamNewService<F, Io, InitErr>
where where
F: ServiceFactory<Io>, F: ServiceFactory<Io, Config = (), InitError = InitErr> + Send + Clone + 'static,
InitErr: fmt::Debug + Send + 'static,
Io: FromStream + Send + 'static, Io: FromStream + Send + 'static,
{ {
fn name(&self, _: usize) -> &str { fn name(&self, _: usize) -> &str {
@ -130,28 +131,18 @@ where
fn create(&self) -> LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>> { fn create(&self) -> LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>> {
let token = self.token; let token = self.token;
let fut = self.inner.create().new_service(()); let fut = self.inner.new_service(());
Box::pin(async move { Box::pin(async move {
match fut.await { match fut.await {
Ok(inner) => { Ok(inner) => {
let service = Box::new(StreamService::new(inner)) as _; let service = Box::new(StreamService::new(inner)) as _;
Ok((token, service)) Ok((token, service))
} }
Err(_) => Err(()), Err(err) => {
error!("{:?}", err);
Err(())
}
} }
}) })
} }
} }
impl<F, T, I> ServiceFactory<I> for F
where
F: Fn() -> T + Send + Clone + 'static,
T: BaseServiceFactory<I, Config = ()>,
I: FromStream,
{
type Factory = T;
fn create(&self) -> T {
(self)()
}
}

View File

@ -1,9 +1,10 @@
use std::sync::mpsc; use std::sync::mpsc;
use std::{net, thread}; use std::{fmt, net, thread};
use actix_rt::{net::TcpStream, System}; use actix_rt::{net::TcpStream, System};
use actix_service::ServiceFactory;
use crate::{Server, ServerBuilder, ServiceFactory}; use crate::{Server, ServerBuilder};
/// A testing server. /// A testing server.
/// ///
@ -64,7 +65,11 @@ impl TestServer {
} }
/// Start new test server with application factory. /// Start new test server with application factory.
pub fn with<F: ServiceFactory<TcpStream>>(factory: F) -> TestServerRuntime { pub fn with<F, InitErr>(factory: F) -> TestServerRuntime
where
F: ServiceFactory<TcpStream, Config = (), InitError = InitErr> + Send + Clone + 'static,
InitErr: fmt::Debug + Send + 'static,
{
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
// run server in separate thread // run server in separate thread

View File

@ -28,6 +28,8 @@ use crate::service::{BoxedServerService, InternalServiceFactory};
use crate::socket::MioStream; use crate::socket::MioStream;
use crate::waker_queue::{WakerInterest, WakerQueue}; use crate::waker_queue::{WakerInterest, WakerQueue};
const DEFAULT_SHUTDOWN_DURATION: Duration = Duration::from_secs(30);
/// Stop worker message. Returns `true` on successful graceful shutdown. /// Stop worker message. Returns `true` on successful graceful shutdown.
/// and `false` if some connections still alive when shutdown execute. /// and `false` if some connections still alive when shutdown execute.
pub(crate) struct Stop { pub(crate) struct Stop {
@ -244,8 +246,9 @@ impl Default for ServerWorkerConfig {
fn default() -> Self { fn default() -> Self {
// 512 is the default max blocking thread count of tokio runtime. // 512 is the default max blocking thread count of tokio runtime.
let max_blocking_threads = std::cmp::max(512 / num_cpus::get(), 1); let max_blocking_threads = std::cmp::max(512 / num_cpus::get(), 1);
Self { Self {
shutdown_timeout: Duration::from_secs(30), shutdown_timeout: DEFAULT_SHUTDOWN_DURATION,
max_blocking_threads, max_blocking_threads,
max_concurrent_connections: 25_600, max_concurrent_connections: 25_600,
} }
@ -314,6 +317,7 @@ impl ServerWorker {
.await .await
.into_iter() .into_iter()
.collect::<Result<Vec<_>, _>>(); .collect::<Result<Vec<_>, _>>();
let services = match res { let services = match res {
Ok(res) => res Ok(res) => res
.into_iter() .into_iter()
@ -327,8 +331,9 @@ impl ServerWorker {
services services
}) })
.into_boxed_slice(), .into_boxed_slice(),
Err(e) => {
error!("Can not start worker: {:?}", e); Err(err) => {
error!("Can not start worker: {:?}", err);
Arbiter::current().stop(); Arbiter::current().stop();
return; return;
} }

View File

@ -25,9 +25,7 @@ fn test_bind() {
let srv = Server::build() let srv = Server::build()
.workers(1) .workers(1)
.disable_signals() .disable_signals()
.bind("test", addr, move || { .bind("test", addr, fn_service(|_| async { Ok::<_, ()>(()) }))?
fn_service(|_| async { Ok::<_, ()>(()) })
})?
.run(); .run();
let _ = tx.send((srv.clone(), actix_rt::System::current())); let _ = tx.send((srv.clone(), actix_rt::System::current()));
@ -56,9 +54,7 @@ fn test_listen() {
let srv = Server::build() let srv = Server::build()
.disable_signals() .disable_signals()
.workers(1) .workers(1)
.listen("test", lst, move || { .listen("test", lst, fn_service(|_| async { Ok::<_, ()>(()) }))?
fn_service(|_| async { Ok::<_, ()>(()) })
})?
.run(); .run();
let _ = tx.send((srv.clone(), actix_rt::System::current())); let _ = tx.send((srv.clone(), actix_rt::System::current()));
@ -94,13 +90,15 @@ fn test_start() {
let srv = Server::build() let srv = Server::build()
.backlog(100) .backlog(100)
.disable_signals() .disable_signals()
.bind("test", addr, move || { .bind(
"test",
addr,
fn_service(|io: TcpStream| async move { fn_service(|io: TcpStream| async move {
let mut f = Framed::new(io, BytesCodec); let mut f = Framed::new(io, BytesCodec);
f.send(Bytes::from_static(b"test")).await.unwrap(); f.send(Bytes::from_static(b"test")).await.unwrap();
Ok::<_, ()>(()) Ok::<_, ()>(())
}) }),
})? )?
.run(); .run();
let _ = tx.send((srv.clone(), actix_rt::System::current())); let _ = tx.send((srv.clone(), actix_rt::System::current()));
@ -173,7 +171,7 @@ async fn test_max_concurrent_connections() {
.max_concurrent_connections(max_conn) .max_concurrent_connections(max_conn)
.workers(1) .workers(1)
.disable_signals() .disable_signals()
.bind("test", addr, move || { .bind("test", addr, {
let counter = counter.clone(); let counter = counter.clone();
fn_service(move |_io: TcpStream| { fn_service(move |_io: TcpStream| {
let counter = counter.clone(); let counter = counter.clone();
@ -263,14 +261,14 @@ async fn test_service_restart() {
let server = Server::build() let server = Server::build()
.backlog(1) .backlog(1)
.disable_signals() .disable_signals()
.bind("addr1", addr1, move || { .bind("addr1", addr1, {
let num = num.clone(); let num = num.clone();
fn_factory(move || { fn_factory(move || {
let num = num.clone(); let num = num.clone();
async move { Ok::<_, ()>(TestService(num)) } async move { Ok::<_, ()>(TestService(num)) }
}) })
})? })?
.bind("addr2", addr2, move || { .bind("addr2", addr2, {
let num2 = num2.clone(); let num2 = num2.clone();
fn_factory(move || { fn_factory(move || {
let num2 = num2.clone(); let num2 = num2.clone();
@ -319,6 +317,7 @@ async fn worker_restart() {
use futures_core::future::LocalBoxFuture; use futures_core::future::LocalBoxFuture;
use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[derive(Debug, Clone)]
struct TestServiceFactory(Arc<AtomicUsize>); struct TestServiceFactory(Arc<AtomicUsize>);
impl ServiceFactory<TcpStream> for TestServiceFactory { impl ServiceFactory<TcpStream> for TestServiceFactory {
@ -381,7 +380,7 @@ async fn worker_restart() {
actix_rt::System::new().block_on(async { actix_rt::System::new().block_on(async {
let server = Server::build() let server = Server::build()
.disable_signals() .disable_signals()
.bind("addr", addr, move || TestServiceFactory(counter.clone()))? .bind("addr", addr, TestServiceFactory(counter.clone()))?
.workers(2) .workers(2)
.run(); .run();

View File

@ -1,6 +1,7 @@
# Changes # Changes
## Unreleased - 2021-xx-xx ## Unreleased - 2021-xx-xx
* Add `.and_then_send()` & `AndThenSendServiceFactory` for creating `Send`able chained services. [#403]
## 2.0.1 - 2021-10-11 ## 2.0.1 - 2021-10-11

View File

@ -261,6 +261,91 @@ where
} }
} }
/// `.and_then_send()` service factory combinator
pub struct AndThenSendServiceFactory<A, B, Req>
where
A: ServiceFactory<Req>,
A::Config: Clone,
B: ServiceFactory<
A::Response,
Config = A::Config,
Error = A::Error,
InitError = A::InitError,
>,
{
inner_a: A,
inner_b: B,
_phantom: PhantomData<Req>,
}
impl<A, B, Req> AndThenSendServiceFactory<A, B, Req>
where
A: ServiceFactory<Req>,
A::Config: Clone,
B: ServiceFactory<
A::Response,
Config = A::Config,
Error = A::Error,
InitError = A::InitError,
>,
{
/// Create new `AndThenFactory` combinator
pub(crate) fn new(a: A, b: B) -> Self {
Self {
inner_a: a,
inner_b: b,
_phantom: PhantomData,
}
}
}
impl<A, B, Req> ServiceFactory<Req> for AndThenSendServiceFactory<A, B, Req>
where
A: ServiceFactory<Req>,
A::Config: Clone,
B: ServiceFactory<
A::Response,
Config = A::Config,
Error = A::Error,
InitError = A::InitError,
>,
{
type Response = B::Response;
type Error = A::Error;
type Config = A::Config;
type Service = AndThenService<A::Service, B::Service, Req>;
type InitError = A::InitError;
type Future = AndThenServiceFactoryResponse<A, B, Req>;
fn new_service(&self, cfg: A::Config) -> Self::Future {
AndThenServiceFactoryResponse::new(
self.inner_a.new_service(cfg.clone()),
self.inner_b.new_service(cfg),
)
}
}
impl<A: Clone, B: Clone, Req> Clone for AndThenSendServiceFactory<A, B, Req>
where
A: ServiceFactory<Req>,
A::Config: Clone,
B: ServiceFactory<
A::Response,
Config = A::Config,
Error = A::Error,
InitError = A::InitError,
>,
{
fn clone(&self) -> Self {
Self {
inner_a: self.inner_a.clone(),
inner_b: self.inner_b.clone(),
_phantom: PhantomData,
}
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use alloc::rc::Rc; use alloc::rc::Rc;

View File

@ -1,10 +1,4 @@
use crate::{ use crate::{IntoService, IntoServiceFactory, Service, ServiceFactory, Transform, and_then::{AndThenSendServiceFactory, AndThenService, AndThenServiceFactory}, map::Map, map_err::MapErr, transform_err::TransformMapInitErr};
and_then::{AndThenService, AndThenServiceFactory},
map::Map,
map_err::MapErr,
transform_err::TransformMapInitErr,
IntoService, IntoServiceFactory, Service, ServiceFactory, Transform,
};
/// An extension trait for [`Service`]s that provides a variety of convenient adapters. /// An extension trait for [`Service`]s that provides a variety of convenient adapters.
pub trait ServiceExt<Req>: Service<Req> { pub trait ServiceExt<Req>: Service<Req> {
@ -105,6 +99,22 @@ pub trait ServiceFactoryExt<Req>: ServiceFactory<Req> {
{ {
AndThenServiceFactory::new(self, factory.into_factory()) AndThenServiceFactory::new(self, factory.into_factory())
} }
/// Call another service after call to this one has resolved successfully.
fn and_then_send<I, SF1>(self, factory: I) -> AndThenSendServiceFactory<Self, SF1, Req>
where
Self: Sized,
Self::Config: Clone,
I: IntoServiceFactory<SF1, Self::Response>,
SF1: ServiceFactory<
Self::Response,
Config = Self::Config,
Error = Self::Error,
InitError = Self::InitError,
>,
{
AndThenSendServiceFactory::new(self, factory.into_factory())
}
} }
impl<SF, Req> ServiceFactoryExt<Req> for SF where SF: ServiceFactory<Req> {} impl<SF, Req> ServiceFactoryExt<Req> for SF where SF: ServiceFactory<Req> {}

View File

@ -175,13 +175,13 @@ where
factory: I, factory: I,
) -> PipelineFactory< ) -> PipelineFactory<
impl ServiceFactory< impl ServiceFactory<
Req, Req,
Response = SF1::Response, Response = SF1::Response,
Error = SF::Error, Error = SF::Error,
Config = SF::Config, Config = SF::Config,
InitError = SF::InitError, InitError = SF::InitError,
Service = impl Service<Req, Response = SF1::Response, Error = SF::Error> + Clone, Service = impl Service<Req, Response = SF1::Response, Error = SF::Error> + Clone,
> + Clone, > + Clone,
Req, Req,
> >
where where

View File

@ -50,13 +50,11 @@ async fn test_rustls_string() {
#[actix_rt::test] #[actix_rt::test]
async fn test_static_str() { async fn test_static_str() {
let srv = TestServer::with(|| { let srv = TestServer::with(fn_service(|io: TcpStream| async {
fn_service(|io: TcpStream| async { let mut framed = Framed::new(io, BytesCodec);
let mut framed = Framed::new(io, BytesCodec); framed.send(Bytes::from_static(b"test")).await?;
framed.send(Bytes::from_static(b"test")).await?; Ok::<_, io::Error>(())
Ok::<_, io::Error>(()) }));
})
});
let conn = actix_connect::default_connector(); let conn = actix_connect::default_connector();
@ -75,13 +73,11 @@ async fn test_static_str() {
#[actix_rt::test] #[actix_rt::test]
async fn test_new_service() { async fn test_new_service() {
let srv = TestServer::with(|| { let srv = TestServer::with(fn_service(|io: TcpStream| async {
fn_service(|io: TcpStream| async { let mut framed = Framed::new(io, BytesCodec);
let mut framed = Framed::new(io, BytesCodec); framed.send(Bytes::from_static(b"test")).await?;
framed.send(Bytes::from_static(b"test")).await?; Ok::<_, io::Error>(())
Ok::<_, io::Error>(()) }));
})
});
let factory = actix_connect::default_connector_factory(); let factory = actix_connect::default_connector_factory();
@ -133,7 +129,7 @@ async fn test_rustls_uri() {
#[actix_rt::test] #[actix_rt::test]
async fn test_local_addr() { async fn test_local_addr() {
let srv = TestServer::with(|| { let srv = TestServer::with({
fn_service(|io: TcpStream| async { fn_service(|io: TcpStream| async {
let mut framed = Framed::new(io, BytesCodec); let mut framed = Framed::new(io, BytesCodec);
framed.send(Bytes::from_static(b"test")).await?; framed.send(Bytes::from_static(b"test")).await?;

View File

@ -38,8 +38,9 @@ async fn custom_resolver() {
async fn custom_resolver_connect() { async fn custom_resolver_connect() {
use trust_dns_resolver::TokioAsyncResolver; use trust_dns_resolver::TokioAsyncResolver;
let srv = let srv = TestServer::with(fn_service(|_io: TcpStream| async {
TestServer::with(|| fn_service(|_io: TcpStream| async { Ok::<_, io::Error>(()) })); Ok::<_, io::Error>(())
}));
struct MyResolver { struct MyResolver {
trust_dns: TokioAsyncResolver, trust_dns: TokioAsyncResolver,