use std::sync::{ atomic::{AtomicUsize, Ordering}, Arc, }; use std::{fmt, io, net}; use futures::{future, Future, Poll}; use tokio_reactor::Handle; use tokio_tcp::TcpStream; use super::{Config, NewService, Service}; pub(crate) type BoxedServerService = Box< Service< Request = net::TcpStream, Response = (), Error = (), Future = Box>, >, >; pub(crate) struct ServerService { inner: T, counter: Arc, } impl Service for ServerService where T: Service, T::Future: 'static, T::Error: fmt::Display + 'static, { type Request = net::TcpStream; type Response = (); type Error = (); type Future = Box>; fn poll_ready(&mut self) -> Poll<(), Self::Error> { self.inner.poll_ready().map_err(|_| ()) } fn call(&mut self, stream: net::TcpStream) -> Self::Future { let stream = TcpStream::from_std(stream, &Handle::default()).map_err(|e| { error!("Can not convert to an async tcp stream: {}", e); }); if let Ok(stream) = stream { let counter = self.counter.clone(); let _ = counter.fetch_add(1, Ordering::Relaxed); Box::new(self.inner.call(stream).map_err(|_| ()).map(move |_| { let _ = counter.fetch_sub(1, Ordering::Relaxed); })) } else { Box::new(future::err(())) } } } pub(crate) struct ServerNewService where F: Fn() -> T + Send + Clone { inner: F, config: C, counter: Arc, } impl ServerNewService where F: Fn() -> T + Send + Clone + 'static, T: NewService + 'static, T::Service: 'static, T::Future: 'static, T::Error: fmt::Display, { pub(crate) fn create(inner: F, config: C) -> Box + Send> { Box::new(Self { inner, config, counter: Arc::new(AtomicUsize::new(0)), }) } } pub trait ServerServiceFactory { fn counter(&self) -> Arc; fn clone_factory(&self) -> Box + Send>; fn create(&self) -> Box>; } impl ServerServiceFactory for ServerNewService where F: Fn() -> T + Send + Clone + 'static, T: NewService + 'static, T::Service: 'static, T::Future: 'static, T::Error: fmt::Display, { fn counter(&self) -> Arc { self.counter.clone() } fn clone_factory(&self) -> Box + Send> { Box::new(Self { inner: self.inner.clone(), config: self.config.fork(), counter: Arc::new(AtomicUsize::new(0)), }) } fn create(&self) -> Box> { let counter = self.counter.clone(); Box::new( (self.inner)() .new_service(self.config.clone()) .map_err(|_| ()) .map(move |inner| { let service: BoxedServerService = Box::new(ServerService { inner, counter }); service }), ) } } impl ServerServiceFactory for Box> { fn counter(&self) -> Arc { self.as_ref().counter() } fn clone_factory(&self) -> Box + Send> { self.as_ref().clone_factory() } fn create(&self) -> Box> { self.as_ref().create() } }