diff --git a/src/client/connector.rs b/src/client/connector.rs index 61dda22ed..13539b1e1 100644 --- a/src/client/connector.rs +++ b/src/client/connector.rs @@ -5,7 +5,7 @@ use std::{fmt, io, mem, time}; use actix::resolver::{Connect as ResolveConnect, Connector, ConnectorError}; use actix::{ - fut, Actor, ActorFuture, ActorResponse, Addr, AsyncContext, Context, + fut, Actor, ActorContext, ActorFuture, ActorResponse, Addr, AsyncContext, Context, ContextFutureSpawner, Handler, Message, Recipient, StreamHandler, Supervised, SystemService, WrapFuture, }; @@ -198,7 +198,7 @@ pub struct ClientConnector { acq_tx: mpsc::UnboundedSender, acq_rx: Option>, - resolver: Addr, + resolver: Option>, conn_lifetime: Duration, conn_keep_alive: Duration, limit: usize, @@ -216,6 +216,9 @@ impl Actor for ClientConnector { type Context = Context; fn started(&mut self, ctx: &mut Self::Context) { + if self.resolver.is_none() { + self.resolver = Some(Connector::from_registry()) + } self.collect_periodic(ctx); ctx.add_stream(self.acq_rx.take().unwrap()); ctx.spawn(Maintenance); @@ -242,7 +245,7 @@ impl Default for ClientConnector { subscriber: None, acq_tx: tx, acq_rx: Some(rx), - resolver: Connector::from_registry(), + resolver: None, connector: builder.build().unwrap(), conn_lifetime: Duration::from_secs(75), conn_keep_alive: Duration::from_secs(15), @@ -266,7 +269,7 @@ impl Default for ClientConnector { subscriber: None, acq_tx: tx, acq_rx: Some(rx), - resolver: Connector::from_registry(), + resolver: None, conn_lifetime: Duration::from_secs(75), conn_keep_alive: Duration::from_secs(15), limit: 100, @@ -333,7 +336,7 @@ impl ClientConnector { subscriber: None, acq_tx: tx, acq_rx: Some(rx), - resolver: Connector::from_registry(), + resolver: None, conn_lifetime: Duration::from_secs(75), conn_keep_alive: Duration::from_secs(15), limit: 100, @@ -395,7 +398,7 @@ impl ClientConnector { /// Use custom resolver actor pub fn resolver(mut self, addr: Addr) -> Self { - self.resolver = addr; + self.resolver = Some(addr); self } @@ -667,6 +670,8 @@ impl Handler for ClientConnector { { ActorResponse::async( self.resolver + .as_ref() + .unwrap() .send( ResolveConnect::host_and_port(&conn.0.host, port) .timeout(conn_timeout), @@ -764,16 +769,19 @@ impl Handler for ClientConnector { } impl StreamHandler for ClientConnector { - fn handle(&mut self, msg: AcquiredConnOperation, _: &mut Context) { + fn handle( + &mut self, msg: Result, ()>, + ctx: &mut Context, + ) { let now = Instant::now(); match msg { - AcquiredConnOperation::Close(conn) => { + Ok(Some(AcquiredConnOperation::Close(conn))) => { self.release_key(&conn.key); self.to_close.push(conn); self.stats.closed += 1; } - AcquiredConnOperation::Release(conn) => { + Ok(Some(AcquiredConnOperation::Release(conn))) => { self.release_key(&conn.key); // check connection lifetime and the return to available pool @@ -784,9 +792,10 @@ impl StreamHandler for ClientConnector { .push_back(Conn(Instant::now(), conn)); } } - AcquiredConnOperation::ReleaseKey(key) => { + Ok(Some(AcquiredConnOperation::ReleaseKey(key))) => { self.release_key(&key); } + _ => ctx.stop(), } // check keep-alive diff --git a/src/server/srv.rs b/src/server/srv.rs index 24874f153..21722c334 100644 --- a/src/server/srv.rs +++ b/src/server/srv.rs @@ -4,8 +4,8 @@ use std::time::Duration; use std::{io, net, thread}; use actix::{ - fut, msgs, signal, Actor, ActorFuture, Addr, Arbiter, AsyncContext, Context, - ContextFutureSpawner, Handler, Response, StreamHandler, System, WrapFuture, + fut, msgs, signal, Actor, ActorContext, ActorFuture, Addr, Arbiter, AsyncContext, + Context, ContextFutureSpawner, Handler, Response, StreamHandler, System, WrapFuture, }; use futures::sync::mpsc; @@ -626,10 +626,11 @@ impl Handler for HttpServer { /// Commands from accept threads impl StreamHandler for HttpServer { - fn finished(&mut self, _: &mut Context) {} - fn handle(&mut self, msg: ServerCommand, _: &mut Context) { + fn handle( + &mut self, msg: Result, ()>, ctx: &mut Context, + ) { match msg { - ServerCommand::WorkerDied(idx, socks) => { + Ok(Some(ServerCommand::WorkerDied(idx, socks))) => { let mut found = false; for i in 0..self.workers.len() { if self.workers[i].0 == idx { @@ -675,6 +676,7 @@ impl StreamHandler for HttpServer { self.workers.push((new_idx, addr)); } } + _ => ctx.stop(), } } } diff --git a/src/ws/mod.rs b/src/ws/mod.rs index 558ecb515..c68cf300c 100644 --- a/src/ws/mod.rs +++ b/src/ws/mod.rs @@ -25,12 +25,12 @@ //! //! // Handler for ws::Message messages //! impl StreamHandler for Ws { -//! fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) { +//! fn handle(&mut self, msg: Result, ws::ProtocolError>, ctx: &mut Self::Context) { //! match msg { -//! ws::Message::Ping(msg) => ctx.pong(&msg), -//! ws::Message::Text(text) => ctx.text(text), -//! ws::Message::Binary(bin) => ctx.binary(bin), -//! _ => (), +//! Ok(Some(ws::Message::Ping(msg))) => ctx.pong(&msg), +//! Ok(Some(ws::Message::Text(text))) => ctx.text(text), +//! Ok(Some(ws::Message::Binary(bin))) => ctx.binary(bin), +//! _ => ctx.stop(), //! } //! } //! } diff --git a/tests/test_ws.rs b/tests/test_ws.rs index dd65d4a58..eeeffb7aa 100644 --- a/tests/test_ws.rs +++ b/tests/test_ws.rs @@ -23,13 +23,16 @@ impl Actor for Ws { } impl StreamHandler for Ws { - fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) { + fn handle( + &mut self, msg: Result, ws::ProtocolError>, + ctx: &mut Self::Context, + ) { match msg { - ws::Message::Ping(msg) => ctx.pong(&msg), - ws::Message::Text(text) => ctx.text(text), - ws::Message::Binary(bin) => ctx.binary(bin), - ws::Message::Close(reason) => ctx.close(reason), - _ => (), + Ok(Some(ws::Message::Ping(msg))) => ctx.pong(&msg), + Ok(Some(ws::Message::Text(text))) => ctx.text(text), + Ok(Some(ws::Message::Binary(bin))) => ctx.binary(bin), + Ok(Some(ws::Message::Close(reason))) => ctx.close(reason), + _ => ctx.stop(), } } } @@ -153,13 +156,16 @@ impl Ws2 { } impl StreamHandler for Ws2 { - fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) { + fn handle( + &mut self, msg: Result, ws::ProtocolError>, + ctx: &mut Self::Context, + ) { match msg { - ws::Message::Ping(msg) => ctx.pong(&msg), - ws::Message::Text(text) => ctx.text(text), - ws::Message::Binary(bin) => ctx.binary(bin), - ws::Message::Close(reason) => ctx.close(reason), - _ => (), + Ok(Some(ws::Message::Ping(msg))) => ctx.pong(&msg), + Ok(Some(ws::Message::Text(text))) => ctx.text(text), + Ok(Some(ws::Message::Binary(bin))) => ctx.binary(bin), + Ok(Some(ws::Message::Close(reason))) => ctx.close(reason), + _ => ctx.stop(), } } }