diff --git a/actix-server/src/config.rs b/actix-server/src/config.rs index fda1ade9..28996b9b 100644 --- a/actix-server/src/config.rs +++ b/actix-server/src/config.rs @@ -8,10 +8,9 @@ use futures_util::future::{ok, Future, FutureExt, LocalBoxFuture}; use log::error; use super::builder::bind_addr; -use super::service::{ - BoxedServerService, InternalServiceFactory, ServerMessage, StreamService, -}; +use super::service::{BoxedServerService, InternalServiceFactory, StreamService}; use super::Token; +use crate::socket::StdStream; pub struct ServiceConfig { pub(crate) services: Vec<(String, net::TcpListener)>, @@ -239,7 +238,7 @@ impl ServiceRuntime { type BoxedNewService = Box< dyn actix::ServiceFactory< - Request = (Option, ServerMessage), + Request = (Option, StdStream), Response = (), Error = (), InitError = (), @@ -261,12 +260,12 @@ where T::Error: 'static, T::InitError: fmt::Debug + 'static, { - type Request = (Option, ServerMessage); + type Request = (Option, StdStream); type Response = (); type Error = (); - type InitError = (); type Config = (); type Service = BoxedServerService; + type InitError = (); type Future = LocalBoxFuture<'static, Result>; fn new_service(&self, _: ()) -> Self::Future { diff --git a/actix-server/src/service.rs b/actix-server/src/service.rs index 984e5228..4fc49586 100644 --- a/actix-server/src/service.rs +++ b/actix-server/src/service.rs @@ -1,7 +1,6 @@ use std::marker::PhantomData; use std::net::SocketAddr; use std::task::{Context, Poll}; -use std::time::Duration; use actix_rt::spawn; use actix_service::{self as actix, Service, ServiceFactory as ActixServiceFactory}; @@ -13,18 +12,6 @@ use log::error; use super::Token; use crate::socket::{FromStream, StdStream}; -/// Server message -pub(crate) enum ServerMessage { - /// New stream - Connect(StdStream), - - /// Gracefully shutdown - Shutdown(Duration), - - /// Force shutdown - ForceShutdown, -} - pub trait ServiceFactory: Send + Clone + 'static { type Factory: actix::ServiceFactory; @@ -41,7 +28,7 @@ pub(crate) trait InternalServiceFactory: Send { pub(crate) type BoxedServerService = Box< dyn Service< - Request = (Option, ServerMessage), + Request = (Option, StdStream), Response = (), Error = (), Future = Ready>, @@ -65,7 +52,7 @@ where T::Error: 'static, I: FromStream, { - type Request = (Option, ServerMessage); + type Request = (Option, StdStream); type Response = (); type Error = (); type Future = Ready>; @@ -74,25 +61,20 @@ where self.service.poll_ready(ctx).map_err(|_| ()) } - fn call(&mut self, (guard, req): (Option, ServerMessage)) -> Self::Future { - match req { - ServerMessage::Connect(stream) => { - let stream = FromStream::from_stdstream(stream).map_err(|e| { - error!("Can not convert to an async tcp stream: {}", e); + fn call(&mut self, (guard, req): (Option, StdStream)) -> Self::Future { + match FromStream::from_stdstream(req) { + Ok(stream) => { + let f = self.service.call(stream); + spawn(async move { + let _ = f.await; + drop(guard); }); - - if let Ok(stream) = stream { - let f = self.service.call(stream); - spawn(async move { - let _ = f.await; - drop(guard); - }); - ok(()) - } else { - err(()) - } + ok(()) + } + Err(e) => { + error!("Can not convert to an async tcp stream: {}", e); + err(()) } - _ => ok(()), } } } @@ -159,20 +141,6 @@ where } } -impl InternalServiceFactory for Box { - fn name(&self, token: Token) -> &str { - self.as_ref().name(token) - } - - fn clone_factory(&self) -> Box { - self.as_ref().clone_factory() - } - - fn create(&self) -> LocalBoxFuture<'static, Result, ()>> { - self.as_ref().create() - } -} - impl ServiceFactory for F where F: Fn() -> T + Send + Clone + 'static, diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 35331757..bfd11979 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -14,7 +14,7 @@ use futures_util::{future::Future, stream::Stream, FutureExt, TryFutureExt}; use log::{error, info, trace}; use crate::accept::AcceptNotify; -use crate::service::{BoxedServerService, InternalServiceFactory, ServerMessage}; +use crate::service::{BoxedServerService, InternalServiceFactory}; use crate::socket::{SocketAddr, StdStream}; use crate::Token; @@ -228,23 +228,12 @@ impl Worker { self.services.iter_mut().for_each(|srv| { if srv.status == WorkerServiceStatus::Available { srv.status = WorkerServiceStatus::Stopped; - actix_rt::spawn( - srv.service - .call((None, ServerMessage::ForceShutdown)) - .map(|_| ()), - ); } }); } else { - let timeout = self.shutdown_timeout; self.services.iter_mut().for_each(move |srv| { if srv.status == WorkerServiceStatus::Available { srv.status = WorkerServiceStatus::Stopping; - actix_rt::spawn( - srv.service - .call((None, ServerMessage::Shutdown(timeout))) - .map(|_| ()), - ); } }); } @@ -361,7 +350,7 @@ impl Future for Worker { let guard = self.conns.get(); let _ = self.services[conn.token.0] .service - .call((Some(guard), ServerMessage::Connect(conn.io))); + .call((Some(guard), conn.io)); } else { self.state = WorkerState::Available; self.availability.set(true); @@ -455,7 +444,7 @@ impl Future for Worker { let guard = self.conns.get(); let _ = self.services[msg.token.0] .service - .call((Some(guard), ServerMessage::Connect(msg.io))); + .call((Some(guard), msg.io)); continue; } Ok(false) => {