1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-11-30 16:34:36 +01:00

add disconnect callback

This commit is contained in:
Nikolay Kim 2019-07-02 12:10:05 +06:00
parent 5445e341c3
commit 5a62175b6e
2 changed files with 63 additions and 2 deletions

View File

@ -1,6 +1,7 @@
//! Framed dispatcher service and related utilities //! Framed dispatcher service and related utilities
use std::collections::VecDeque; use std::collections::VecDeque;
use std::mem; use std::mem;
use std::rc::Rc;
use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed}; use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed};
use actix_service::{IntoService, Service}; use actix_service::{IntoService, Service};
@ -43,6 +44,7 @@ where
framed: Framed<T, U>, framed: Framed<T, U>,
rx: Option<mpsc::UnboundedReceiver<FramedMessage<<U as Encoder>::Item>>>, rx: Option<mpsc::UnboundedReceiver<FramedMessage<<U as Encoder>::Item>>>,
inner: Cell<FramedDispatcherInner<<U as Encoder>::Item, S::Error>>, inner: Cell<FramedDispatcherInner<<U as Encoder>::Item, S::Error>>,
disconnect: Option<Rc<Fn(&mut St, bool)>>,
} }
impl<St, S, T, U> FramedDispatcher<St, S, T, U> impl<St, S, T, U> FramedDispatcher<St, S, T, U>
@ -61,11 +63,13 @@ where
service: F, service: F,
rx: mpsc::UnboundedReceiver<FramedMessage<<U as Encoder>::Item>>, rx: mpsc::UnboundedReceiver<FramedMessage<<U as Encoder>::Item>>,
sink: Sink<<U as Encoder>::Item>, sink: Sink<<U as Encoder>::Item>,
disconnect: Option<Rc<Fn(&mut St, bool)>>,
) -> Self { ) -> Self {
FramedDispatcher { FramedDispatcher {
framed, framed,
state, state,
sink, sink,
disconnect,
rx: Some(rx), rx: Some(rx),
service: service.into_service(), service: service.into_service(),
dispatch_state: FramedState::Processing, dispatch_state: FramedState::Processing,
@ -124,6 +128,12 @@ where
<U as Encoder>::Item: 'static, <U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug, <U as Encoder>::Error: std::fmt::Debug,
{ {
fn disconnect(&mut self, error: bool) {
if let Some(ref disconnect) = self.disconnect {
(&*disconnect)(&mut *self.state.get_mut(), error);
}
}
fn poll_read(&mut self) -> bool { fn poll_read(&mut self) -> bool {
loop { loop {
match self.service.poll_ready() { match self.service.poll_ready() {
@ -274,6 +284,7 @@ where
if self.framed.is_write_buf_empty() if self.framed.is_write_buf_empty()
|| (self.poll_write() || self.framed.is_write_buf_empty()) || (self.poll_write() || self.framed.is_write_buf_empty())
{ {
self.disconnect(true);
Err(err) Err(err)
} else { } else {
self.dispatch_state = FramedState::Error(err); self.dispatch_state = FramedState::Error(err);
@ -296,9 +307,13 @@ where
for tx in vec.drain(..) { for tx in vec.drain(..) {
let _ = tx.send(()); let _ = tx.send(());
} }
self.disconnect(false);
Ok(Async::Ready(())) Ok(Async::Ready(()))
} }
FramedState::FramedError(err) => Err(err), FramedState::FramedError(err) => {
self.disconnect(true);
Err(err)
}
FramedState::Stopping => Ok(Async::Ready(())), FramedState::Stopping => Ok(Async::Ready(())),
} }
} }

View File

@ -3,7 +3,7 @@ use std::rc::Rc;
use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder}; use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder};
use actix_service::{IntoNewService, IntoService, NewService, Service}; use actix_service::{IntoNewService, IntoService, NewService, Service};
use futures::{Async, Future, Poll}; use futures::{Async, Future, IntoFuture, Poll};
use crate::connect::{Connect, ConnectResult}; use crate::connect::{Connect, ConnectResult};
use crate::dispatcher::FramedDispatcher; use crate::dispatcher::FramedDispatcher;
@ -33,6 +33,7 @@ impl<St, Codec> Builder<St, Codec> {
{ {
ServiceBuilder { ServiceBuilder {
connect: connect.into_service(), connect: connect.into_service(),
disconnect: None,
_t: PhantomData, _t: PhantomData,
} }
} }
@ -53,6 +54,7 @@ impl<St, Codec> Builder<St, Codec> {
{ {
NewServiceBuilder { NewServiceBuilder {
connect: connect.into_new_service(), connect: connect.into_new_service(),
disconnect: None,
_t: PhantomData, _t: PhantomData,
} }
} }
@ -60,6 +62,7 @@ impl<St, Codec> Builder<St, Codec> {
pub struct ServiceBuilder<St, C, Io, Codec> { pub struct ServiceBuilder<St, C, Io, Codec> {
connect: C, connect: C,
disconnect: Option<Rc<Fn(&mut St, bool)>>,
_t: PhantomData<(St, Io, Codec)>, _t: PhantomData<(St, Io, Codec)>,
} }
@ -73,6 +76,23 @@ where
<Codec as Encoder>::Item: 'static, <Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug, <Codec as Encoder>::Error: std::fmt::Debug,
{ {
/// Callback to execute on disconnect
///
/// Second parameter indicates error occured during disconnect.
pub fn disconnect<F, Out>(mut self, disconnect: F) -> Self
where
F: Fn(&mut St, bool) -> Out + 'static,
Out: IntoFuture,
Out::Future: 'static,
{
self.disconnect = Some(Rc::new(move |st, error| {
let fut = disconnect(st, error).into_future();
tokio_current_thread::spawn(fut.map_err(|_| ()).map(|_| ()));
}));
self
}
/// Provide stream items handler service and construct service factory.
pub fn finish<F, T>( pub fn finish<F, T>(
self, self,
service: F, service: F,
@ -90,6 +110,7 @@ where
FramedServiceImpl { FramedServiceImpl {
connect: self.connect, connect: self.connect,
handler: Rc::new(service.into_new_service()), handler: Rc::new(service.into_new_service()),
disconnect: self.disconnect.clone(),
_t: PhantomData, _t: PhantomData,
} }
} }
@ -97,6 +118,7 @@ where
pub struct NewServiceBuilder<St, C, Io, Codec> { pub struct NewServiceBuilder<St, C, Io, Codec> {
connect: C, connect: C,
disconnect: Option<Rc<Fn(&mut St, bool)>>,
_t: PhantomData<(St, Io, Codec)>, _t: PhantomData<(St, Io, Codec)>,
} }
@ -111,6 +133,22 @@ where
<Codec as Encoder>::Item: 'static, <Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug, <Codec as Encoder>::Error: std::fmt::Debug,
{ {
/// Callback to execute on disconnect
///
/// Second parameter indicates error occured during disconnect.
pub fn disconnect<F, Out>(mut self, disconnect: F) -> Self
where
F: Fn(&mut St, bool) -> Out + 'static,
Out: IntoFuture,
Out::Future: 'static,
{
self.disconnect = Some(Rc::new(move |st, error| {
let fut = disconnect(st, error).into_future();
tokio_current_thread::spawn(fut.map_err(|_| ()).map(|_| ()));
}));
self
}
pub fn finish<F, T>( pub fn finish<F, T>(
self, self,
service: F, service: F,
@ -133,6 +171,7 @@ where
FramedService { FramedService {
connect: self.connect, connect: self.connect,
handler: Rc::new(service.into_new_service()), handler: Rc::new(service.into_new_service()),
disconnect: self.disconnect,
_t: PhantomData, _t: PhantomData,
} }
} }
@ -141,6 +180,7 @@ where
pub(crate) struct FramedService<St, C, T, Io, Codec> { pub(crate) struct FramedService<St, C, T, Io, Codec> {
connect: C, connect: C,
handler: Rc<T>, handler: Rc<T>,
disconnect: Option<Rc<Fn(&mut St, bool)>>,
_t: PhantomData<(St, Io, Codec)>, _t: PhantomData<(St, Io, Codec)>,
} }
@ -172,6 +212,7 @@ where
fn new_service(&self, _: &()) -> Self::Future { fn new_service(&self, _: &()) -> Self::Future {
let handler = self.handler.clone(); let handler = self.handler.clone();
let disconnect = self.disconnect.clone();
// create connect service and then create service impl // create connect service and then create service impl
Box::new( Box::new(
@ -180,6 +221,7 @@ where
.map(move |connect| FramedServiceImpl { .map(move |connect| FramedServiceImpl {
connect, connect,
handler, handler,
disconnect,
_t: PhantomData, _t: PhantomData,
}), }),
) )
@ -189,6 +231,7 @@ where
pub struct FramedServiceImpl<St, C, T, Io, Codec> { pub struct FramedServiceImpl<St, C, T, Io, Codec> {
connect: C, connect: C,
handler: Rc<T>, handler: Rc<T>,
disconnect: Option<Rc<Fn(&mut St, bool)>>,
_t: PhantomData<(St, Io, Codec)>, _t: PhantomData<(St, Io, Codec)>,
} }
@ -224,6 +267,7 @@ where
self.connect.call(Connect::new(req)), self.connect.call(Connect::new(req)),
self.handler.clone(), self.handler.clone(),
), ),
disconnect: self.disconnect.clone(),
} }
} }
} }
@ -246,6 +290,7 @@ where
<Codec as Encoder>::Error: std::fmt::Debug, <Codec as Encoder>::Error: std::fmt::Debug,
{ {
inner: FramedServiceImplResponseInner<St, Io, Codec, C, T>, inner: FramedServiceImplResponseInner<St, Io, Codec, C, T>,
disconnect: Option<Rc<Fn(&mut St, bool)>>,
} }
enum FramedServiceImplResponseInner<St, Io, Codec, C, T> enum FramedServiceImplResponseInner<St, Io, Codec, C, T>
@ -315,6 +360,7 @@ where
handler, handler,
res.rx, res.rx,
res.sink, res.sink,
self.disconnect.clone(),
)); ));
self.poll() self.poll()
} }