1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-11-24 00:01:11 +01:00

fix disconnect callback

This commit is contained in:
Nikolay Kim 2019-07-03 13:02:03 +06:00
parent 922a919572
commit da302d4b7a
4 changed files with 79 additions and 11 deletions

View File

@ -18,8 +18,8 @@ name = "actix_ioframe"
path = "src/lib.rs" path = "src/lib.rs"
[dependencies] [dependencies]
actix-service = "0.4.0" actix-service = "0.4.1"
actix-codec = "0.1.1" actix-codec = "0.1.2"
bytes = "0.4" bytes = "0.4"
either = "1.5.2" either = "1.5.2"
futures = "0.1.25" futures = "0.1.25"
@ -28,3 +28,8 @@ log = "0.4"
[dev-dependencies] [dev-dependencies]
actix-rt = "0.2.2" actix-rt = "0.2.2"
actix-connect = "0.2.0"
actix-test-server = "0.2.2"
actix-server-config = "0.1.1"
tokio-tcp = "0.1"
tokio-timer = "0.2"

View File

@ -314,7 +314,10 @@ where
self.disconnect(true); self.disconnect(true);
Err(err) Err(err)
} }
FramedState::Stopping => Ok(Async::Ready(())), FramedState::Stopping => {
self.disconnect(false);
Ok(Async::Ready(()))
}
} }
} }
} }

View File

@ -3,7 +3,7 @@ use std::rc::Rc;
use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder}; use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder};
use actix_service::{IntoNewService, IntoService, NewService, Service}; use actix_service::{IntoNewService, IntoService, NewService, Service};
use futures::{Async, Future, IntoFuture, Poll}; use futures::{Async, Future, Poll};
use crate::connect::{Connect, ConnectResult}; use crate::connect::{Connect, ConnectResult};
use crate::dispatcher::FramedDispatcher; use crate::dispatcher::FramedDispatcher;
@ -139,11 +139,11 @@ where
self self
} }
pub fn finish<F, T>( pub fn finish<F, T, Cfg>(
self, self,
service: F, service: F,
) -> impl NewService< ) -> impl NewService<
Config = (), Config = Cfg,
Request = Io, Request = Io,
Response = (), Response = (),
Error = ServiceError<C::Error, Codec>, Error = ServiceError<C::Error, Codec>,
@ -167,14 +167,14 @@ where
} }
} }
pub(crate) struct FramedService<St, C, T, Io, Codec> { pub(crate) struct FramedService<St, C, T, Io, Codec, Cfg> {
connect: C, connect: C,
handler: Rc<T>, handler: Rc<T>,
disconnect: Option<Rc<Fn(&mut St, bool)>>, disconnect: Option<Rc<Fn(&mut St, bool)>>,
_t: PhantomData<(St, Io, Codec)>, _t: PhantomData<(St, Io, Codec, Cfg)>,
} }
impl<St, C, T, Io, Codec> NewService for FramedService<St, C, T, Io, Codec> impl<St, C, T, Io, Codec, Cfg> NewService for FramedService<St, C, T, Io, Codec, Cfg>
where where
St: 'static, St: 'static,
Io: AsyncRead + AsyncWrite, Io: AsyncRead + AsyncWrite,
@ -192,7 +192,7 @@ where
<Codec as Encoder>::Item: 'static, <Codec as Encoder>::Item: 'static,
<Codec as Encoder>::Error: std::fmt::Debug, <Codec as Encoder>::Error: std::fmt::Debug,
{ {
type Config = (); type Config = Cfg;
type Request = Io; type Request = Io;
type Response = (); type Response = ();
type Error = ServiceError<C::Error, Codec>; type Error = ServiceError<C::Error, Codec>;
@ -200,7 +200,7 @@ where
type Service = FramedServiceImpl<St, C::Service, T, Io, Codec>; type Service = FramedServiceImpl<St, C::Service, T, Io, Codec>;
type Future = Box<Future<Item = Self::Service, Error = Self::InitError>>; type Future = Box<Future<Item = Self::Service, Error = Self::InitError>>;
fn new_service(&self, _: &()) -> Self::Future { fn new_service(&self, _: &Cfg) -> Self::Future {
let handler = self.handler.clone(); let handler = self.handler.clone();
let disconnect = self.disconnect.clone(); let disconnect = self.disconnect.clone();

View File

@ -0,0 +1,60 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use actix_codec::BytesCodec;
use actix_server_config::Io;
use actix_service::{new_apply_fn, Service};
use actix_test_server::TestServer;
use futures::Future;
use tokio_tcp::TcpStream;
use tokio_timer::sleep;
use actix_ioframe::{Builder, Connect};
struct State;
#[test]
fn test_disconnect() -> std::io::Result<()> {
let disconnect = Arc::new(AtomicBool::new(false));
let disconnect1 = disconnect.clone();
let mut srv = TestServer::with(move || {
let disconnect1 = disconnect1.clone();
new_apply_fn(
Builder::new()
.factory(|conn: Connect<_>| Ok(conn.codec(BytesCodec).state(State)))
.disconnect(move |_, _| {
disconnect1.store(true, Ordering::Relaxed);
})
.finish(|_t| Ok(None)),
|io: Io<TcpStream>, srv| srv.call(io.into_parts().0),
)
});
let mut client = Builder::new()
.service(|conn: Connect<_>| {
let conn = conn.codec(BytesCodec).state(State);
conn.sink().close();
Ok(conn)
})
.finish(|_t| Ok(None));
let conn = srv
.block_on(
actix_connect::default_connector()
.call(actix_connect::Connect::with(String::new(), srv.addr())),
)
.unwrap();
srv.block_on(client.call(conn.into_parts().0)).unwrap();
let _ = srv.block_on(
sleep(Duration::from_millis(100))
.map(|_| ())
.map_err(|_| ()),
);
assert!(disconnect.load(Ordering::Relaxed));
Ok(())
}