1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-11-30 18:44:36 +01:00

remove ServerMessage type. remove one unused InternalServiceFactory impl (#225)

This commit is contained in:
fakeshadow 2020-12-13 08:46:32 +08:00 committed by GitHub
parent 4e43216b99
commit 049795662f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 22 additions and 66 deletions

View File

@ -8,10 +8,9 @@ use futures_util::future::{ok, Future, FutureExt, LocalBoxFuture};
use log::error; use log::error;
use super::builder::bind_addr; use super::builder::bind_addr;
use super::service::{ use super::service::{BoxedServerService, InternalServiceFactory, StreamService};
BoxedServerService, InternalServiceFactory, ServerMessage, StreamService,
};
use super::Token; use super::Token;
use crate::socket::StdStream;
pub struct ServiceConfig { pub struct ServiceConfig {
pub(crate) services: Vec<(String, net::TcpListener)>, pub(crate) services: Vec<(String, net::TcpListener)>,
@ -239,7 +238,7 @@ impl ServiceRuntime {
type BoxedNewService = Box< type BoxedNewService = Box<
dyn actix::ServiceFactory< dyn actix::ServiceFactory<
Request = (Option<CounterGuard>, ServerMessage), Request = (Option<CounterGuard>, StdStream),
Response = (), Response = (),
Error = (), Error = (),
InitError = (), InitError = (),
@ -261,12 +260,12 @@ where
T::Error: 'static, T::Error: 'static,
T::InitError: fmt::Debug + 'static, T::InitError: fmt::Debug + 'static,
{ {
type Request = (Option<CounterGuard>, ServerMessage); type Request = (Option<CounterGuard>, StdStream);
type Response = (); type Response = ();
type Error = (); type Error = ();
type InitError = ();
type Config = (); type Config = ();
type Service = BoxedServerService; type Service = BoxedServerService;
type InitError = ();
type Future = LocalBoxFuture<'static, Result<BoxedServerService, ()>>; type Future = LocalBoxFuture<'static, Result<BoxedServerService, ()>>;
fn new_service(&self, _: ()) -> Self::Future { fn new_service(&self, _: ()) -> Self::Future {

View File

@ -1,7 +1,6 @@
use std::marker::PhantomData; use std::marker::PhantomData;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::time::Duration;
use actix_rt::spawn; use actix_rt::spawn;
use actix_service::{self as actix, Service, ServiceFactory as ActixServiceFactory}; use actix_service::{self as actix, Service, ServiceFactory as ActixServiceFactory};
@ -13,18 +12,6 @@ use log::error;
use super::Token; use super::Token;
use crate::socket::{FromStream, StdStream}; use crate::socket::{FromStream, StdStream};
/// Server message
pub(crate) enum ServerMessage {
/// New stream
Connect(StdStream),
/// Gracefully shutdown
Shutdown(Duration),
/// Force shutdown
ForceShutdown,
}
pub trait ServiceFactory<Stream: FromStream>: Send + Clone + 'static { pub trait ServiceFactory<Stream: FromStream>: Send + Clone + 'static {
type Factory: actix::ServiceFactory<Config = (), Request = Stream>; type Factory: actix::ServiceFactory<Config = (), Request = Stream>;
@ -41,7 +28,7 @@ pub(crate) trait InternalServiceFactory: Send {
pub(crate) type BoxedServerService = Box< pub(crate) type BoxedServerService = Box<
dyn Service< dyn Service<
Request = (Option<CounterGuard>, ServerMessage), Request = (Option<CounterGuard>, StdStream),
Response = (), Response = (),
Error = (), Error = (),
Future = Ready<Result<(), ()>>, Future = Ready<Result<(), ()>>,
@ -65,7 +52,7 @@ where
T::Error: 'static, T::Error: 'static,
I: FromStream, I: FromStream,
{ {
type Request = (Option<CounterGuard>, ServerMessage); type Request = (Option<CounterGuard>, StdStream);
type Response = (); type Response = ();
type Error = (); type Error = ();
type Future = Ready<Result<(), ()>>; type Future = Ready<Result<(), ()>>;
@ -74,26 +61,21 @@ where
self.service.poll_ready(ctx).map_err(|_| ()) self.service.poll_ready(ctx).map_err(|_| ())
} }
fn call(&mut self, (guard, req): (Option<CounterGuard>, ServerMessage)) -> Self::Future { fn call(&mut self, (guard, req): (Option<CounterGuard>, StdStream)) -> Self::Future {
match req { match FromStream::from_stdstream(req) {
ServerMessage::Connect(stream) => { Ok(stream) => {
let stream = FromStream::from_stdstream(stream).map_err(|e| {
error!("Can not convert to an async tcp stream: {}", e);
});
if let Ok(stream) = stream {
let f = self.service.call(stream); let f = self.service.call(stream);
spawn(async move { spawn(async move {
let _ = f.await; let _ = f.await;
drop(guard); drop(guard);
}); });
ok(()) ok(())
} else { }
Err(e) => {
error!("Can not convert to an async tcp stream: {}", e);
err(()) err(())
} }
} }
_ => ok(()),
}
} }
} }
@ -159,20 +141,6 @@ where
} }
} }
impl InternalServiceFactory for Box<dyn InternalServiceFactory> {
fn name(&self, token: Token) -> &str {
self.as_ref().name(token)
}
fn clone_factory(&self) -> Box<dyn InternalServiceFactory> {
self.as_ref().clone_factory()
}
fn create(&self) -> LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>> {
self.as_ref().create()
}
}
impl<F, T, I> ServiceFactory<I> for F impl<F, T, I> ServiceFactory<I> for F
where where
F: Fn() -> T + Send + Clone + 'static, F: Fn() -> T + Send + Clone + 'static,

View File

@ -14,7 +14,7 @@ use futures_util::{future::Future, stream::Stream, FutureExt, TryFutureExt};
use log::{error, info, trace}; use log::{error, info, trace};
use crate::accept::AcceptNotify; use crate::accept::AcceptNotify;
use crate::service::{BoxedServerService, InternalServiceFactory, ServerMessage}; use crate::service::{BoxedServerService, InternalServiceFactory};
use crate::socket::{SocketAddr, StdStream}; use crate::socket::{SocketAddr, StdStream};
use crate::Token; use crate::Token;
@ -228,23 +228,12 @@ impl Worker {
self.services.iter_mut().for_each(|srv| { self.services.iter_mut().for_each(|srv| {
if srv.status == WorkerServiceStatus::Available { if srv.status == WorkerServiceStatus::Available {
srv.status = WorkerServiceStatus::Stopped; srv.status = WorkerServiceStatus::Stopped;
actix_rt::spawn(
srv.service
.call((None, ServerMessage::ForceShutdown))
.map(|_| ()),
);
} }
}); });
} else { } else {
let timeout = self.shutdown_timeout;
self.services.iter_mut().for_each(move |srv| { self.services.iter_mut().for_each(move |srv| {
if srv.status == WorkerServiceStatus::Available { if srv.status == WorkerServiceStatus::Available {
srv.status = WorkerServiceStatus::Stopping; srv.status = WorkerServiceStatus::Stopping;
actix_rt::spawn(
srv.service
.call((None, ServerMessage::Shutdown(timeout)))
.map(|_| ()),
);
} }
}); });
} }
@ -361,7 +350,7 @@ impl Future for Worker {
let guard = self.conns.get(); let guard = self.conns.get();
let _ = self.services[conn.token.0] let _ = self.services[conn.token.0]
.service .service
.call((Some(guard), ServerMessage::Connect(conn.io))); .call((Some(guard), conn.io));
} else { } else {
self.state = WorkerState::Available; self.state = WorkerState::Available;
self.availability.set(true); self.availability.set(true);
@ -455,7 +444,7 @@ impl Future for Worker {
let guard = self.conns.get(); let guard = self.conns.get();
let _ = self.services[msg.token.0] let _ = self.services[msg.token.0]
.service .service
.call((Some(guard), ServerMessage::Connect(msg.io))); .call((Some(guard), msg.io));
continue; continue;
} }
Ok(false) => { Ok(false) => {