1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-28 01:32:57 +01:00

new StreamHandler impl

This commit is contained in:
Nikolay Kim 2018-06-09 07:53:46 -07:00
parent 9151d61eda
commit 818d0bc187
4 changed files with 49 additions and 32 deletions

View File

@ -5,7 +5,7 @@ use std::{fmt, io, mem, time};
use actix::resolver::{Connect as ResolveConnect, Connector, ConnectorError}; use actix::resolver::{Connect as ResolveConnect, Connector, ConnectorError};
use actix::{ use actix::{
fut, Actor, ActorFuture, ActorResponse, Addr, AsyncContext, Context, fut, Actor, ActorContext, ActorFuture, ActorResponse, Addr, AsyncContext, Context,
ContextFutureSpawner, Handler, Message, Recipient, StreamHandler, Supervised, ContextFutureSpawner, Handler, Message, Recipient, StreamHandler, Supervised,
SystemService, WrapFuture, SystemService, WrapFuture,
}; };
@ -198,7 +198,7 @@ pub struct ClientConnector {
acq_tx: mpsc::UnboundedSender<AcquiredConnOperation>, acq_tx: mpsc::UnboundedSender<AcquiredConnOperation>,
acq_rx: Option<mpsc::UnboundedReceiver<AcquiredConnOperation>>, acq_rx: Option<mpsc::UnboundedReceiver<AcquiredConnOperation>>,
resolver: Addr<Connector>, resolver: Option<Addr<Connector>>,
conn_lifetime: Duration, conn_lifetime: Duration,
conn_keep_alive: Duration, conn_keep_alive: Duration,
limit: usize, limit: usize,
@ -216,6 +216,9 @@ impl Actor for ClientConnector {
type Context = Context<ClientConnector>; type Context = Context<ClientConnector>;
fn started(&mut self, ctx: &mut Self::Context) { fn started(&mut self, ctx: &mut Self::Context) {
if self.resolver.is_none() {
self.resolver = Some(Connector::from_registry())
}
self.collect_periodic(ctx); self.collect_periodic(ctx);
ctx.add_stream(self.acq_rx.take().unwrap()); ctx.add_stream(self.acq_rx.take().unwrap());
ctx.spawn(Maintenance); ctx.spawn(Maintenance);
@ -242,7 +245,7 @@ impl Default for ClientConnector {
subscriber: None, subscriber: None,
acq_tx: tx, acq_tx: tx,
acq_rx: Some(rx), acq_rx: Some(rx),
resolver: Connector::from_registry(), resolver: None,
connector: builder.build().unwrap(), connector: builder.build().unwrap(),
conn_lifetime: Duration::from_secs(75), conn_lifetime: Duration::from_secs(75),
conn_keep_alive: Duration::from_secs(15), conn_keep_alive: Duration::from_secs(15),
@ -266,7 +269,7 @@ impl Default for ClientConnector {
subscriber: None, subscriber: None,
acq_tx: tx, acq_tx: tx,
acq_rx: Some(rx), acq_rx: Some(rx),
resolver: Connector::from_registry(), resolver: None,
conn_lifetime: Duration::from_secs(75), conn_lifetime: Duration::from_secs(75),
conn_keep_alive: Duration::from_secs(15), conn_keep_alive: Duration::from_secs(15),
limit: 100, limit: 100,
@ -333,7 +336,7 @@ impl ClientConnector {
subscriber: None, subscriber: None,
acq_tx: tx, acq_tx: tx,
acq_rx: Some(rx), acq_rx: Some(rx),
resolver: Connector::from_registry(), resolver: None,
conn_lifetime: Duration::from_secs(75), conn_lifetime: Duration::from_secs(75),
conn_keep_alive: Duration::from_secs(15), conn_keep_alive: Duration::from_secs(15),
limit: 100, limit: 100,
@ -395,7 +398,7 @@ impl ClientConnector {
/// Use custom resolver actor /// Use custom resolver actor
pub fn resolver(mut self, addr: Addr<Connector>) -> Self { pub fn resolver(mut self, addr: Addr<Connector>) -> Self {
self.resolver = addr; self.resolver = Some(addr);
self self
} }
@ -667,6 +670,8 @@ impl Handler<Connect> for ClientConnector {
{ {
ActorResponse::async( ActorResponse::async(
self.resolver self.resolver
.as_ref()
.unwrap()
.send( .send(
ResolveConnect::host_and_port(&conn.0.host, port) ResolveConnect::host_and_port(&conn.0.host, port)
.timeout(conn_timeout), .timeout(conn_timeout),
@ -764,16 +769,19 @@ impl Handler<Connect> for ClientConnector {
} }
impl StreamHandler<AcquiredConnOperation, ()> for ClientConnector { impl StreamHandler<AcquiredConnOperation, ()> for ClientConnector {
fn handle(&mut self, msg: AcquiredConnOperation, _: &mut Context<Self>) { fn handle(
&mut self, msg: Result<Option<AcquiredConnOperation>, ()>,
ctx: &mut Context<Self>,
) {
let now = Instant::now(); let now = Instant::now();
match msg { match msg {
AcquiredConnOperation::Close(conn) => { Ok(Some(AcquiredConnOperation::Close(conn))) => {
self.release_key(&conn.key); self.release_key(&conn.key);
self.to_close.push(conn); self.to_close.push(conn);
self.stats.closed += 1; self.stats.closed += 1;
} }
AcquiredConnOperation::Release(conn) => { Ok(Some(AcquiredConnOperation::Release(conn))) => {
self.release_key(&conn.key); self.release_key(&conn.key);
// check connection lifetime and the return to available pool // check connection lifetime and the return to available pool
@ -784,9 +792,10 @@ impl StreamHandler<AcquiredConnOperation, ()> for ClientConnector {
.push_back(Conn(Instant::now(), conn)); .push_back(Conn(Instant::now(), conn));
} }
} }
AcquiredConnOperation::ReleaseKey(key) => { Ok(Some(AcquiredConnOperation::ReleaseKey(key))) => {
self.release_key(&key); self.release_key(&key);
} }
_ => ctx.stop(),
} }
// check keep-alive // check keep-alive

View File

@ -4,8 +4,8 @@ use std::time::Duration;
use std::{io, net, thread}; use std::{io, net, thread};
use actix::{ use actix::{
fut, msgs, signal, Actor, ActorFuture, Addr, Arbiter, AsyncContext, Context, fut, msgs, signal, Actor, ActorContext, ActorFuture, Addr, Arbiter, AsyncContext,
ContextFutureSpawner, Handler, Response, StreamHandler, System, WrapFuture, Context, ContextFutureSpawner, Handler, Response, StreamHandler, System, WrapFuture,
}; };
use futures::sync::mpsc; use futures::sync::mpsc;
@ -626,10 +626,11 @@ impl<H: IntoHttpHandler> Handler<signal::Signal> for HttpServer<H> {
/// Commands from accept threads /// Commands from accept threads
impl<H: IntoHttpHandler> StreamHandler<ServerCommand, ()> for HttpServer<H> { impl<H: IntoHttpHandler> StreamHandler<ServerCommand, ()> for HttpServer<H> {
fn finished(&mut self, _: &mut Context<Self>) {} fn handle(
fn handle(&mut self, msg: ServerCommand, _: &mut Context<Self>) { &mut self, msg: Result<Option<ServerCommand>, ()>, ctx: &mut Context<Self>,
) {
match msg { match msg {
ServerCommand::WorkerDied(idx, socks) => { Ok(Some(ServerCommand::WorkerDied(idx, socks))) => {
let mut found = false; let mut found = false;
for i in 0..self.workers.len() { for i in 0..self.workers.len() {
if self.workers[i].0 == idx { if self.workers[i].0 == idx {
@ -675,6 +676,7 @@ impl<H: IntoHttpHandler> StreamHandler<ServerCommand, ()> for HttpServer<H> {
self.workers.push((new_idx, addr)); self.workers.push((new_idx, addr));
} }
} }
_ => ctx.stop(),
} }
} }
} }

View File

@ -25,12 +25,12 @@
//! //!
//! // Handler for ws::Message messages //! // Handler for ws::Message messages
//! impl StreamHandler<ws::Message, ws::ProtocolError> for Ws { //! impl StreamHandler<ws::Message, ws::ProtocolError> for Ws {
//! fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) { //! fn handle(&mut self, msg: Result<Option<ws::Message>, ws::ProtocolError>, ctx: &mut Self::Context) {
//! match msg { //! match msg {
//! ws::Message::Ping(msg) => ctx.pong(&msg), //! Ok(Some(ws::Message::Ping(msg))) => ctx.pong(&msg),
//! ws::Message::Text(text) => ctx.text(text), //! Ok(Some(ws::Message::Text(text))) => ctx.text(text),
//! ws::Message::Binary(bin) => ctx.binary(bin), //! Ok(Some(ws::Message::Binary(bin))) => ctx.binary(bin),
//! _ => (), //! _ => ctx.stop(),
//! } //! }
//! } //! }
//! } //! }

View File

@ -23,13 +23,16 @@ impl Actor for Ws {
} }
impl StreamHandler<ws::Message, ws::ProtocolError> for Ws { impl StreamHandler<ws::Message, ws::ProtocolError> for Ws {
fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) { fn handle(
&mut self, msg: Result<Option<ws::Message>, ws::ProtocolError>,
ctx: &mut Self::Context,
) {
match msg { match msg {
ws::Message::Ping(msg) => ctx.pong(&msg), Ok(Some(ws::Message::Ping(msg))) => ctx.pong(&msg),
ws::Message::Text(text) => ctx.text(text), Ok(Some(ws::Message::Text(text))) => ctx.text(text),
ws::Message::Binary(bin) => ctx.binary(bin), Ok(Some(ws::Message::Binary(bin))) => ctx.binary(bin),
ws::Message::Close(reason) => ctx.close(reason), Ok(Some(ws::Message::Close(reason))) => ctx.close(reason),
_ => (), _ => ctx.stop(),
} }
} }
} }
@ -153,13 +156,16 @@ impl Ws2 {
} }
impl StreamHandler<ws::Message, ws::ProtocolError> for Ws2 { impl StreamHandler<ws::Message, ws::ProtocolError> for Ws2 {
fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) { fn handle(
&mut self, msg: Result<Option<ws::Message>, ws::ProtocolError>,
ctx: &mut Self::Context,
) {
match msg { match msg {
ws::Message::Ping(msg) => ctx.pong(&msg), Ok(Some(ws::Message::Ping(msg))) => ctx.pong(&msg),
ws::Message::Text(text) => ctx.text(text), Ok(Some(ws::Message::Text(text))) => ctx.text(text),
ws::Message::Binary(bin) => ctx.binary(bin), Ok(Some(ws::Message::Binary(bin))) => ctx.binary(bin),
ws::Message::Close(reason) => ctx.close(reason), Ok(Some(ws::Message::Close(reason))) => ctx.close(reason),
_ => (), _ => ctx.stop(),
} }
} }
} }