1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-11-23 21:51:06 +01:00

rename Server => ServerHandler (#407)

This commit is contained in:
Rob Ede 2021-11-01 23:36:51 +00:00 committed by GitHub
parent 1c8fcaebbc
commit 581e599209
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 96 additions and 81 deletions

View File

@ -1,5 +1,6 @@
[alias] [alias]
lint = "clippy --workspace --all-features --tests --examples --bins -- -Dclippy::todo" lint = "clippy --workspace --tests --examples --bins -- -Dclippy::todo"
lint-all = "clippy --workspace --all-features --tests --examples --bins -- -Dclippy::todo"
ci-doctest = "test --workspace --all-features --doc --no-fail-fast -- --nocapture" ci-doctest = "test --workspace --all-features --doc --no-fail-fast -- --nocapture"

View File

@ -1,8 +1,12 @@
# Changes # Changes
## Unreleased - 2021-xx-xx ## Unreleased - 2021-xx-xx
* Rename `Server` to `ServerHandle`. [#407]
* Rename `ServerBuilder::{maxconn => max_concurrent_connections}`. [#407]
* Minimum supported Rust version (MSRV) is now 1.52. * Minimum supported Rust version (MSRV) is now 1.52.
[#407]: https://github.com/actix/actix-net/pull/407
## 2.0.0-beta.6 - 2021-10-11 ## 2.0.0-beta.6 - 2021-10-11
* Add experimental (semver-exempt) `io-uring` feature for enabling async file I/O on linux. [#374] * Add experimental (semver-exempt) `io-uring` feature for enabling async file I/O on linux. [#374]

View File

@ -8,7 +8,7 @@ use actix_rt::{
use log::{debug, error, info}; use log::{debug, error, info};
use mio::{Interest, Poll, Token as MioToken}; use mio::{Interest, Poll, Token as MioToken};
use crate::server::Server; use crate::server::ServerHandle;
use crate::socket::MioListener; use crate::socket::MioListener;
use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN}; use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN};
use crate::worker::{Conn, WorkerHandleAccept}; use crate::worker::{Conn, WorkerHandleAccept};
@ -30,13 +30,13 @@ struct ServerSocketInfo {
/// ///
/// It would also listen to `ServerCommand` and push interests to `WakerQueue`. /// It would also listen to `ServerCommand` and push interests to `WakerQueue`.
pub(crate) struct AcceptLoop { pub(crate) struct AcceptLoop {
srv: Option<Server>, srv: Option<ServerHandle>,
poll: Option<Poll>, poll: Option<Poll>,
waker: WakerQueue, waker: WakerQueue,
} }
impl AcceptLoop { impl AcceptLoop {
pub fn new(srv: Server) -> Self { pub fn new(srv: ServerHandle) -> Self {
let poll = Poll::new().unwrap_or_else(|e| panic!("Can not create `mio::Poll`: {}", e)); let poll = Poll::new().unwrap_or_else(|e| panic!("Can not create `mio::Poll`: {}", e));
let waker = WakerQueue::new(poll.registry()) let waker = WakerQueue::new(poll.registry())
.unwrap_or_else(|e| panic!("Can not create `mio::Waker`: {}", e)); .unwrap_or_else(|e| panic!("Can not create `mio::Waker`: {}", e));
@ -74,7 +74,7 @@ struct Accept {
poll: Poll, poll: Poll,
waker: WakerQueue, waker: WakerQueue,
handles: Vec<WorkerHandleAccept>, handles: Vec<WorkerHandleAccept>,
srv: Server, srv: ServerHandle,
next: usize, next: usize,
avail: Availability, avail: Availability,
paused: bool, paused: bool,
@ -153,7 +153,7 @@ impl Accept {
poll: Poll, poll: Poll,
waker: WakerQueue, waker: WakerQueue,
socks: Vec<(usize, MioListener)>, socks: Vec<(usize, MioListener)>,
srv: Server, srv: ServerHandle,
handles: Vec<WorkerHandleAccept>, handles: Vec<WorkerHandleAccept>,
) { ) {
// Accept runs in its own thread and would want to spawn additional futures to current // Accept runs in its own thread and would want to spawn additional futures to current
@ -176,7 +176,7 @@ impl Accept {
waker: WakerQueue, waker: WakerQueue,
socks: Vec<(usize, MioListener)>, socks: Vec<(usize, MioListener)>,
handles: Vec<WorkerHandleAccept>, handles: Vec<WorkerHandleAccept>,
srv: Server, srv: ServerHandle,
) -> (Accept, Vec<ServerSocketInfo>) { ) -> (Accept, Vec<ServerSocketInfo>) {
let sockets = socks let sockets = socks
.into_iter() .into_iter()

View File

@ -15,7 +15,7 @@ use tokio::sync::{
use crate::accept::AcceptLoop; use crate::accept::AcceptLoop;
use crate::join_all; use crate::join_all;
use crate::server::{Server, ServerCommand}; use crate::server::{ServerCommand, ServerHandle};
use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService}; use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService};
use crate::signals::{Signal, Signals}; use crate::signals::{Signal, Signals};
use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs}; use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs};
@ -35,7 +35,7 @@ pub struct ServerBuilder {
exit: bool, exit: bool,
no_signals: bool, no_signals: bool,
cmd: UnboundedReceiver<ServerCommand>, cmd: UnboundedReceiver<ServerCommand>,
server: Server, server: ServerHandle,
notify: Vec<oneshot::Sender<()>>, notify: Vec<oneshot::Sender<()>>,
worker_config: ServerWorkerConfig, worker_config: ServerWorkerConfig,
} }
@ -50,7 +50,7 @@ impl ServerBuilder {
/// Create new Server builder instance /// Create new Server builder instance
pub fn new() -> ServerBuilder { pub fn new() -> ServerBuilder {
let (tx, rx) = unbounded_channel(); let (tx, rx) = unbounded_channel();
let server = Server::new(tx); let server = ServerHandle::new(tx);
ServerBuilder { ServerBuilder {
threads: num_cpus::get(), threads: num_cpus::get(),
@ -71,8 +71,8 @@ impl ServerBuilder {
/// Set number of workers to start. /// Set number of workers to start.
/// ///
/// By default server uses number of available logical cpu as workers /// By default server uses number of available logical CPU as workers count. Workers must be
/// count. Workers must be greater than 0. /// greater than 0.
pub fn workers(mut self, num: usize) -> Self { pub fn workers(mut self, num: usize) -> Self {
assert_ne!(num, 0, "workers must be greater than 0"); assert_ne!(num, 0, "workers must be greater than 0");
self.threads = num; self.threads = num;
@ -99,10 +99,9 @@ impl ServerBuilder {
/// Set the maximum number of pending connections. /// Set the maximum number of pending connections.
/// ///
/// This refers to the number of clients that can be waiting to be served. /// This refers to the number of clients that can be waiting to be served. Exceeding this number
/// Exceeding this number results in the client getting an error when /// results in the client getting an error when attempting to connect. It should only affect
/// attempting to connect. It should only affect servers under significant /// servers under significant load.
/// load.
/// ///
/// Generally set in the 64-2048 range. Default value is 2048. /// Generally set in the 64-2048 range. Default value is 2048.
/// ///
@ -114,15 +113,21 @@ impl ServerBuilder {
/// Sets the maximum per-worker number of concurrent connections. /// Sets the maximum per-worker number of concurrent connections.
/// ///
/// All socket listeners will stop accepting connections when this limit is /// All socket listeners will stop accepting connections when this limit is reached for
/// reached for each worker. /// each worker.
/// ///
/// By default max connections is set to a 25k per worker. /// By default max connections is set to a 25k per worker.
pub fn maxconn(mut self, num: usize) -> Self { pub fn max_concurrent_connections(mut self, num: usize) -> Self {
self.worker_config.max_concurrent_connections(num); self.worker_config.max_concurrent_connections(num);
self self
} }
#[doc(hidden)]
#[deprecated(since = "2.0.0", note = "Renamed to `max_concurrent_connections`.")]
pub fn maxconn(self, num: usize) -> Self {
self.max_concurrent_connections(num)
}
/// Stop Actix system. /// Stop Actix system.
pub fn system_exit(mut self) -> Self { pub fn system_exit(mut self) -> Self {
self.exit = true; self.exit = true;
@ -191,8 +196,8 @@ impl ServerBuilder {
} }
/// Add new unix domain service to the server. /// Add new unix domain service to the server.
/// Useful when running as a systemd service and ///
/// a socket FD can be acquired using the systemd crate. /// Useful when running as a systemd service and a socket FD is acquired externally.
#[cfg(unix)] #[cfg(unix)]
pub fn listen_uds<F, N: AsRef<str>>( pub fn listen_uds<F, N: AsRef<str>>(
mut self, mut self,
@ -246,7 +251,7 @@ impl ServerBuilder {
} }
/// Starts processing incoming connections and return server controller. /// Starts processing incoming connections and return server controller.
pub fn run(mut self) -> Server { pub fn run(mut self) -> ServerHandle {
if self.sockets.is_empty() { if self.sockets.is_empty() {
panic!("Server should have at least one bound socket"); panic!("Server should have at least one bound socket");
} else { } else {

View File

@ -15,7 +15,7 @@ mod waker_queue;
mod worker; mod worker;
pub use self::builder::ServerBuilder; pub use self::builder::ServerBuilder;
pub use self::server::Server; pub use self::server::{Server, ServerHandle};
pub use self::service::ServiceFactory; pub use self::service::ServiceFactory;
pub use self::test_server::TestServer; pub use self::test_server::TestServer;

View File

@ -24,6 +24,17 @@ pub(crate) enum ServerCommand {
Notify(oneshot::Sender<()>), Notify(oneshot::Sender<()>),
} }
#[derive(Debug)]
#[non_exhaustive]
pub struct Server;
impl Server {
/// Start server building process.
pub fn build() -> ServerBuilder {
ServerBuilder::default()
}
}
/// Server handle. /// Server handle.
/// ///
/// # Shutdown Signals /// # Shutdown Signals
@ -32,19 +43,14 @@ pub(crate) enum ServerCommand {
/// ///
/// A graceful shutdown will wait for all workers to stop first. /// A graceful shutdown will wait for all workers to stop first.
#[derive(Debug)] #[derive(Debug)]
pub struct Server( pub struct ServerHandle(
UnboundedSender<ServerCommand>, UnboundedSender<ServerCommand>,
Option<oneshot::Receiver<()>>, Option<oneshot::Receiver<()>>,
); );
impl Server { impl ServerHandle {
pub(crate) fn new(tx: UnboundedSender<ServerCommand>) -> Self { pub(crate) fn new(tx: UnboundedSender<ServerCommand>) -> Self {
Server(tx, None) ServerHandle(tx, None)
}
/// Start server building process
pub fn build() -> ServerBuilder {
ServerBuilder::default()
} }
pub(crate) fn signal(&self, sig: Signal) { pub(crate) fn signal(&self, sig: Signal) {
@ -91,13 +97,13 @@ impl Server {
} }
} }
impl Clone for Server { impl Clone for ServerHandle {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self(self.0.clone(), None) Self(self.0.clone(), None)
} }
} }
impl Future for Server { impl Future for ServerHandle {
type Output = io::Result<()>; type Output = io::Result<()>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {

View File

@ -2,7 +2,7 @@ use std::future::Future;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use crate::server::Server; use crate::server::ServerHandle;
/// Types of process signals. /// Types of process signals.
#[allow(dead_code)] #[allow(dead_code)]
@ -20,7 +20,7 @@ pub(crate) enum Signal {
/// Process signal listener. /// Process signal listener.
pub(crate) struct Signals { pub(crate) struct Signals {
srv: Server, srv: ServerHandle,
#[cfg(not(unix))] #[cfg(not(unix))]
signals: futures_core::future::LocalBoxFuture<'static, std::io::Result<()>>, signals: futures_core::future::LocalBoxFuture<'static, std::io::Result<()>>,
@ -31,7 +31,7 @@ pub(crate) struct Signals {
impl Signals { impl Signals {
/// Spawns a signal listening future that is able to send commands to the `Server`. /// Spawns a signal listening future that is able to send commands to the `Server`.
pub(crate) fn start(srv: Server) { pub(crate) fn start(srv: ServerHandle) {
#[cfg(not(unix))] #[cfg(not(unix))]
{ {
actix_rt::spawn(Signals { actix_rt::spawn(Signals {

View File

@ -170,7 +170,7 @@ async fn test_max_concurrent_connections() {
// Set a relative higher backlog. // Set a relative higher backlog.
.backlog(12) .backlog(12)
// max connection for a worker is 3. // max connection for a worker is 3.
.maxconn(max_conn) .max_concurrent_connections(max_conn)
.workers(1) .workers(1)
.disable_signals() .disable_signals()
.bind("test", addr, move || { .bind("test", addr, move || {

View File

@ -9,26 +9,25 @@ use pin_project_lite::pin_project;
use super::{Service, ServiceFactory}; use super::{Service, ServiceFactory};
/// Service for the `map_err` combinator, changing the type of a service's /// Service for the `map_err` combinator, changing the type of a service's error.
/// error.
/// ///
/// This is created by the `ServiceExt::map_err` method. /// This is created by the `ServiceExt::map_err` method.
pub struct MapErr<S, Req, F, E> { pub struct MapErr<S, Req, F, E> {
service: S, service: S,
f: F, mapper: F,
_t: PhantomData<(E, Req)>, _t: PhantomData<(E, Req)>,
} }
impl<S, Req, F, E> MapErr<S, Req, F, E> { impl<S, Req, F, E> MapErr<S, Req, F, E> {
/// Create new `MapErr` combinator /// Create new `MapErr` combinator
pub(crate) fn new(service: S, f: F) -> Self pub(crate) fn new(service: S, mapper: F) -> Self
where where
S: Service<Req>, S: Service<Req>,
F: Fn(S::Error) -> E, F: Fn(S::Error) -> E,
{ {
Self { Self {
service, service,
f, mapper,
_t: PhantomData, _t: PhantomData,
} }
} }
@ -42,7 +41,7 @@ where
fn clone(&self) -> Self { fn clone(&self) -> Self {
MapErr { MapErr {
service: self.service.clone(), service: self.service.clone(),
f: self.f.clone(), mapper: self.mapper.clone(),
_t: PhantomData, _t: PhantomData,
} }
} }
@ -58,11 +57,11 @@ where
type Future = MapErrFuture<A, Req, F, E>; type Future = MapErrFuture<A, Req, F, E>;
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(ctx).map_err(&self.f) self.service.poll_ready(ctx).map_err(&self.mapper)
} }
fn call(&self, req: Req) -> Self::Future { fn call(&self, req: Req) -> Self::Future {
MapErrFuture::new(self.service.call(req), self.f.clone()) MapErrFuture::new(self.service.call(req), self.mapper.clone())
} }
} }
@ -105,23 +104,23 @@ where
/// service's error. /// service's error.
/// ///
/// This is created by the `NewServiceExt::map_err` method. /// This is created by the `NewServiceExt::map_err` method.
pub struct MapErrServiceFactory<A, Req, F, E> pub struct MapErrServiceFactory<SF, Req, F, E>
where where
A: ServiceFactory<Req>, SF: ServiceFactory<Req>,
F: Fn(A::Error) -> E + Clone, F: Fn(SF::Error) -> E + Clone,
{ {
a: A, a: SF,
f: F, f: F,
e: PhantomData<(E, Req)>, e: PhantomData<(E, Req)>,
} }
impl<A, Req, F, E> MapErrServiceFactory<A, Req, F, E> impl<SF, Req, F, E> MapErrServiceFactory<SF, Req, F, E>
where where
A: ServiceFactory<Req>, SF: ServiceFactory<Req>,
F: Fn(A::Error) -> E + Clone, F: Fn(SF::Error) -> E + Clone,
{ {
/// Create new `MapErr` new service instance /// Create new `MapErr` new service instance
pub(crate) fn new(a: A, f: F) -> Self { pub(crate) fn new(a: SF, f: F) -> Self {
Self { Self {
a, a,
f, f,
@ -130,10 +129,10 @@ where
} }
} }
impl<A, Req, F, E> Clone for MapErrServiceFactory<A, Req, F, E> impl<SF, Req, F, E> Clone for MapErrServiceFactory<SF, Req, F, E>
where where
A: ServiceFactory<Req> + Clone, SF: ServiceFactory<Req> + Clone,
F: Fn(A::Error) -> E + Clone, F: Fn(SF::Error) -> E + Clone,
{ {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self { Self {
@ -144,57 +143,57 @@ where
} }
} }
impl<A, Req, F, E> ServiceFactory<Req> for MapErrServiceFactory<A, Req, F, E> impl<SF, Req, F, E> ServiceFactory<Req> for MapErrServiceFactory<SF, Req, F, E>
where where
A: ServiceFactory<Req>, SF: ServiceFactory<Req>,
F: Fn(A::Error) -> E + Clone, F: Fn(SF::Error) -> E + Clone,
{ {
type Response = A::Response; type Response = SF::Response;
type Error = E; type Error = E;
type Config = A::Config; type Config = SF::Config;
type Service = MapErr<A::Service, Req, F, E>; type Service = MapErr<SF::Service, Req, F, E>;
type InitError = A::InitError; type InitError = SF::InitError;
type Future = MapErrServiceFuture<A, Req, F, E>; type Future = MapErrServiceFuture<SF, Req, F, E>;
fn new_service(&self, cfg: A::Config) -> Self::Future { fn new_service(&self, cfg: SF::Config) -> Self::Future {
MapErrServiceFuture::new(self.a.new_service(cfg), self.f.clone()) MapErrServiceFuture::new(self.a.new_service(cfg), self.f.clone())
} }
} }
pin_project! { pin_project! {
pub struct MapErrServiceFuture<A, Req, F, E> pub struct MapErrServiceFuture<SF, Req, F, E>
where where
A: ServiceFactory<Req>, SF: ServiceFactory<Req>,
F: Fn(A::Error) -> E, F: Fn(SF::Error) -> E,
{ {
#[pin] #[pin]
fut: A::Future, fut: SF::Future,
f: F, mapper: F,
} }
} }
impl<A, Req, F, E> MapErrServiceFuture<A, Req, F, E> impl<SF, Req, F, E> MapErrServiceFuture<SF, Req, F, E>
where where
A: ServiceFactory<Req>, SF: ServiceFactory<Req>,
F: Fn(A::Error) -> E, F: Fn(SF::Error) -> E,
{ {
fn new(fut: A::Future, f: F) -> Self { fn new(fut: SF::Future, mapper: F) -> Self {
MapErrServiceFuture { fut, f } MapErrServiceFuture { fut, mapper }
} }
} }
impl<A, Req, F, E> Future for MapErrServiceFuture<A, Req, F, E> impl<SF, Req, F, E> Future for MapErrServiceFuture<SF, Req, F, E>
where where
A: ServiceFactory<Req>, SF: ServiceFactory<Req>,
F: Fn(A::Error) -> E + Clone, F: Fn(SF::Error) -> E + Clone,
{ {
type Output = Result<MapErr<A::Service, Req, F, E>, A::InitError>; type Output = Result<MapErr<SF::Service, Req, F, E>, SF::InitError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project(); let this = self.project();
if let Poll::Ready(svc) = this.fut.poll(cx)? { if let Poll::Ready(svc) = this.fut.poll(cx)? {
Poll::Ready(Ok(MapErr::new(svc, this.f.clone()))) Poll::Ready(Ok(MapErr::new(svc, this.mapper.clone())))
} else { } else {
Poll::Pending Poll::Pending
} }

View File

@ -30,7 +30,7 @@ use std::{
}; };
use actix_rt::net::TcpStream; use actix_rt::net::TcpStream;
use actix_server::Server; use actix_server::{Server, ServerHandle};
use actix_service::ServiceFactoryExt as _; use actix_service::ServiceFactoryExt as _;
use actix_tls::accept::rustls::{Acceptor as RustlsAcceptor, TlsStream}; use actix_tls::accept::rustls::{Acceptor as RustlsAcceptor, TlsStream};
use futures_util::future::ok; use futures_util::future::ok;