1
0
mirror of https://github.com/fafhrd91/actix-web synced 2025-08-22 05:35:08 +02:00

fix context poll

This commit is contained in:
Nikolay Kim
2018-01-07 17:13:49 -08:00
parent 896981cdf8
commit f802fe09e6
2 changed files with 91 additions and 53 deletions

View File

@@ -113,7 +113,13 @@ unsafe impl<T, A, H, U> Sync for HttpServer<T, A, H, U> where H: HttpHandler + '
unsafe impl<T, A, H, U> Send for HttpServer<T, A, H, U> where H: HttpHandler + 'static {}
impl<T: 'static, A: 'static, H: HttpHandler + 'static, U: 'static> Actor for HttpServer<T, A, H, U> {
impl<T, A, H, U, V> Actor for HttpServer<T, A, H, U>
where A: 'static,
T: IoStream,
H: HttpHandler,
U: IntoIterator<Item=V> + 'static,
V: IntoHttpHandler<Handler=H>,
{
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
@@ -121,13 +127,6 @@ impl<T: 'static, A: 'static, H: HttpHandler + 'static, U: 'static> Actor for Htt
}
}
impl<T: 'static, A: 'static, H: HttpHandler + 'static, U: 'static> HttpServer<T, A, H, U> {
fn update_time(&self, ctx: &mut Context<Self>) {
helpers::update_date();
ctx.run_later(Duration::new(1, 0), |slf, ctx| slf.update_time(ctx));
}
}
impl<T, A, H, U, V> HttpServer<T, A, H, U>
where A: 'static,
T: IoStream,
@@ -157,6 +156,11 @@ impl<T, A, H, U, V> HttpServer<T, A, H, U>
}
}
fn update_time(&self, ctx: &mut Context<Self>) {
helpers::update_date();
ctx.run_later(Duration::new(1, 0), |slf, ctx| slf.update_time(ctx));
}
/// Set number of workers to start.
///
/// By default http server uses number of available logical cpu as threads count.
@@ -294,14 +298,15 @@ impl<T, A, H, U, V> HttpServer<T, A, H, U>
}
// subscribe to os signals
fn subscribe_to_signals(&self, addr: &SyncAddress<HttpServer<T, A, H, U>>) {
if self.no_signals {
let msg = signal::Subscribe(addr.subscriber());
fn subscribe_to_signals(&self) -> Option<SyncAddress<signal::ProcessSignals>> {
if !self.no_signals {
if let Some(ref signals) = self.signals {
signals.send(msg);
Some(signals.clone())
} else {
Arbiter::system_registry().get::<signal::ProcessSignals>().send(msg);
Some(Arbiter::system_registry().get::<signal::ProcessSignals>())
}
} else {
None
}
}
}
@@ -355,10 +360,10 @@ impl<H: HttpHandler, U, V> HttpServer<TcpStream, net::SocketAddr, H, U>
}
// start http server actor
HttpServer::create(|ctx| {
self.subscribe_to_signals(&ctx.address());
self
})
let signals = self.subscribe_to_signals();
let addr: SyncAddress<_> = Actor::start(self);
signals.map(|signals| signals.send(signal::Subscribe(addr.subscriber())));
addr
}
}
@@ -427,10 +432,10 @@ impl<H: HttpHandler, U, V> HttpServer<TlsStream<TcpStream>, net::SocketAddr, H,
}
// start http server actor
Ok(HttpServer::create(|ctx| {
self.subscribe_to_signals(&ctx.address());
self
}))
let signals = self.subscribe_to_signals();
let addr: SyncAddress<_> = Actor::start(self);
signals.map(|signals| signals.send(signal::Subscribe(addr.subscriber())));
Ok(addr)
}
}
}
@@ -470,10 +475,10 @@ impl<H: HttpHandler, U, V> HttpServer<SslStream<TcpStream>, net::SocketAddr, H,
}
// start http server actor
Ok(HttpServer::create(|ctx| {
self.subscribe_to_signals(&ctx.address());
self
}))
let signals = self.subscribe_to_signals();
let addr: SyncAddress<_> = Actor::start(self);
signals.map(|signals| signals.send(signal::Subscribe(addr.subscriber())));
Ok(addr)
}
}
}
@@ -514,22 +519,25 @@ impl<T, A, H, U, V> HttpServer<WrapperStream<T>, A, H, U>
self.h = Some(Rc::new(WorkerSettings::new(apps, self.keep_alive)));
// start server
HttpServer::create(move |ctx| {
let signals = self.subscribe_to_signals();
let addr: SyncAddress<_> = HttpServer::create(move |ctx| {
ctx.add_stream(stream.map(
move |(t, _)| Conn{io: WrapperStream::new(t), peer: None, http2: false}));
self.subscribe_to_signals(&ctx.address());
self
})
});
signals.map(|signals| signals.send(signal::Subscribe(addr.subscriber())));
addr
}
}
/// Signals support
/// Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and send `SystemExit(0)`
/// message to `System` actor.
impl<T, A, H, U> Handler<signal::Signal> for HttpServer<T, A, H, U>
impl<T, A, H, U, V> Handler<signal::Signal> for HttpServer<T, A, H, U>
where T: IoStream,
H: HttpHandler + 'static,
U: 'static,
U: IntoIterator<Item=V> + 'static,
V: IntoHttpHandler<Handler=H>,
A: 'static,
{
type Result = ();
@@ -556,10 +564,11 @@ impl<T, A, H, U> Handler<signal::Signal> for HttpServer<T, A, H, U>
}
}
impl<T, A, H, U> Handler<io::Result<Conn<T>>> for HttpServer<T, A, H, U>
impl<T, A, H, U, V> Handler<io::Result<Conn<T>>> for HttpServer<T, A, H, U>
where T: IoStream,
H: HttpHandler + 'static,
U: 'static,
U: IntoIterator<Item=V> + 'static,
V: IntoHttpHandler<Handler=H>,
A: 'static,
{
type Result = ();
@@ -595,10 +604,11 @@ pub struct StopServer {
pub graceful: bool
}
impl<T, A, H, U> Handler<PauseServer> for HttpServer<T, A, H, U>
impl<T, A, H, U, V> Handler<PauseServer> for HttpServer<T, A, H, U>
where T: IoStream,
H: HttpHandler + 'static,
U: 'static,
U: IntoIterator<Item=V> + 'static,
V: IntoHttpHandler<Handler=H>,
A: 'static,
{
type Result = ();
@@ -612,10 +622,11 @@ impl<T, A, H, U> Handler<PauseServer> for HttpServer<T, A, H, U>
}
}
impl<T, A, H, U> Handler<ResumeServer> for HttpServer<T, A, H, U>
impl<T, A, H, U, V> Handler<ResumeServer> for HttpServer<T, A, H, U>
where T: IoStream,
H: HttpHandler + 'static,
U: 'static,
U: IntoIterator<Item=V> + 'static,
V: IntoHttpHandler<Handler=H>,
A: 'static,
{
type Result = ();
@@ -628,10 +639,11 @@ impl<T, A, H, U> Handler<ResumeServer> for HttpServer<T, A, H, U>
}
}
impl<T, A, H, U> Handler<StopServer> for HttpServer<T, A, H, U>
impl<T, A, H, U, V> Handler<StopServer> for HttpServer<T, A, H, U>
where T: IoStream,
H: HttpHandler + 'static,
U: 'static,
U: IntoIterator<Item=V> + 'static,
V: IntoHttpHandler<Handler=H>,
A: 'static,
{
type Result = actix::Response<Self, StopServer>;