diff --git a/actix-ioframe/Cargo.toml b/actix-ioframe/Cargo.toml index 6d17fb0a..0c03c096 100644 --- a/actix-ioframe/Cargo.toml +++ b/actix-ioframe/Cargo.toml @@ -18,8 +18,8 @@ name = "actix_ioframe" path = "src/lib.rs" [dependencies] -actix-service = "0.4.0" -actix-codec = "0.1.1" +actix-service = "0.4.1" +actix-codec = "0.1.2" bytes = "0.4" either = "1.5.2" futures = "0.1.25" @@ -28,3 +28,8 @@ log = "0.4" [dev-dependencies] actix-rt = "0.2.2" +actix-connect = "0.2.0" +actix-test-server = "0.2.2" +actix-server-config = "0.1.1" +tokio-tcp = "0.1" +tokio-timer = "0.2" diff --git a/actix-ioframe/src/dispatcher.rs b/actix-ioframe/src/dispatcher.rs index fa7bfe42..a43646c3 100644 --- a/actix-ioframe/src/dispatcher.rs +++ b/actix-ioframe/src/dispatcher.rs @@ -314,7 +314,10 @@ where self.disconnect(true); Err(err) } - FramedState::Stopping => Ok(Async::Ready(())), + FramedState::Stopping => { + self.disconnect(false); + Ok(Async::Ready(())) + } } } } diff --git a/actix-ioframe/src/service.rs b/actix-ioframe/src/service.rs index 543abfc9..c853372a 100644 --- a/actix-ioframe/src/service.rs +++ b/actix-ioframe/src/service.rs @@ -3,7 +3,7 @@ use std::rc::Rc; use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder}; use actix_service::{IntoNewService, IntoService, NewService, Service}; -use futures::{Async, Future, IntoFuture, Poll}; +use futures::{Async, Future, Poll}; use crate::connect::{Connect, ConnectResult}; use crate::dispatcher::FramedDispatcher; @@ -139,11 +139,11 @@ where self } - pub fn finish( + pub fn finish( self, service: F, ) -> impl NewService< - Config = (), + Config = Cfg, Request = Io, Response = (), Error = ServiceError, @@ -167,14 +167,14 @@ where } } -pub(crate) struct FramedService { +pub(crate) struct FramedService { connect: C, handler: Rc, disconnect: Option>, - _t: PhantomData<(St, Io, Codec)>, + _t: PhantomData<(St, Io, Codec, Cfg)>, } -impl NewService for FramedService +impl NewService for FramedService where St: 'static, Io: AsyncRead + AsyncWrite, @@ -192,7 +192,7 @@ where ::Item: 'static, ::Error: std::fmt::Debug, { - type Config = (); + type Config = Cfg; type Request = Io; type Response = (); type Error = ServiceError; @@ -200,7 +200,7 @@ where type Service = FramedServiceImpl; type Future = Box>; - fn new_service(&self, _: &()) -> Self::Future { + fn new_service(&self, _: &Cfg) -> Self::Future { let handler = self.handler.clone(); let disconnect = self.disconnect.clone(); diff --git a/actix-ioframe/tests/test_server.rs b/actix-ioframe/tests/test_server.rs new file mode 100644 index 00000000..f995f400 --- /dev/null +++ b/actix-ioframe/tests/test_server.rs @@ -0,0 +1,60 @@ +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::Duration; + +use actix_codec::BytesCodec; +use actix_server_config::Io; +use actix_service::{new_apply_fn, Service}; +use actix_test_server::TestServer; +use futures::Future; +use tokio_tcp::TcpStream; +use tokio_timer::sleep; + +use actix_ioframe::{Builder, Connect}; + +struct State; + +#[test] +fn test_disconnect() -> std::io::Result<()> { + let disconnect = Arc::new(AtomicBool::new(false)); + let disconnect1 = disconnect.clone(); + + let mut srv = TestServer::with(move || { + let disconnect1 = disconnect1.clone(); + + new_apply_fn( + Builder::new() + .factory(|conn: Connect<_>| Ok(conn.codec(BytesCodec).state(State))) + .disconnect(move |_, _| { + disconnect1.store(true, Ordering::Relaxed); + }) + .finish(|_t| Ok(None)), + |io: Io, srv| srv.call(io.into_parts().0), + ) + }); + + let mut client = Builder::new() + .service(|conn: Connect<_>| { + let conn = conn.codec(BytesCodec).state(State); + conn.sink().close(); + Ok(conn) + }) + .finish(|_t| Ok(None)); + + let conn = srv + .block_on( + actix_connect::default_connector() + .call(actix_connect::Connect::with(String::new(), srv.addr())), + ) + .unwrap(); + + srv.block_on(client.call(conn.into_parts().0)).unwrap(); + let _ = srv.block_on( + sleep(Duration::from_millis(100)) + .map(|_| ()) + .map_err(|_| ()), + ); + assert!(disconnect.load(Ordering::Relaxed)); + + Ok(()) +}