1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-06-28 08:40:37 +02:00

refactor service and state manahement

This commit is contained in:
Nikolay Kim
2019-12-29 13:42:42 +06:00
parent 1918c8d4f8
commit 5779da0f49
9 changed files with 345 additions and 527 deletions

View File

@ -1,43 +1,49 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use std::cell::Cell;
use std::rc::Rc;
use actix_codec::BytesCodec;
use actix_rt::time::delay_for;
use actix_service::{fn_service, Service};
use actix_service::{fn_factory_with_config, fn_service, IntoService, Service};
use actix_testing::TestServer;
use actix_utils::mpsc;
use bytes::{Bytes, BytesMut};
use futures::future::ok;
use actix_ioframe::{Builder, Connect};
use actix_ioframe::{Builder, Connect, FactoryBuilder};
#[derive(Clone)]
struct State;
struct State(Option<mpsc::Sender<Bytes>>);
#[actix_rt::test]
async fn test_disconnect() -> std::io::Result<()> {
let disconnect = Arc::new(AtomicBool::new(false));
let disconnect1 = disconnect.clone();
async fn test_basic() {
let client_item = Rc::new(Cell::new(false));
let srv = TestServer::with(move || {
let disconnect1 = disconnect1.clone();
Builder::new()
.factory(fn_service(|conn: Connect<_, _>| {
ok(conn.codec(BytesCodec).state(State))
}))
.disconnect(move |_, _| {
disconnect1.store(true, Ordering::Relaxed);
})
.finish(fn_service(|_t| ok(None)))
FactoryBuilder::new(fn_service(|conn: Connect<_, _>| {
ok(conn.codec(BytesCodec).state(State(None)))
}))
// echo
.build(fn_service(|t: BytesMut| ok(Some(t.freeze()))))
});
let mut client = Builder::new()
.service(|conn: Connect<_, _>| {
let conn = conn.codec(BytesCodec).state(State);
conn.sink().close();
ok(conn)
let item = client_item.clone();
let mut client = Builder::new(fn_service(move |conn: Connect<_, _>| {
async move {
let (tx, rx) = mpsc::channel();
let _ = tx.send(Bytes::from_static(b"Hello"));
Ok(conn.codec(BytesCodec).out(rx).state(State(Some(tx))))
}
}))
.build(fn_factory_with_config(move |mut cfg: State| {
let item = item.clone();
ok((move |t: BytesMut| {
assert_eq!(t.freeze(), Bytes::from_static(b"Hello"));
item.set(true);
// drop Sender, which will close connection
cfg.0.take();
ok::<_, ()>(None)
})
.finish(fn_service(|_t| ok(None)));
.into_service())
}));
let conn = actix_connect::default_connector()
.call(actix_connect::Connect::with(String::new(), srv.addr()))
@ -45,8 +51,5 @@ async fn test_disconnect() -> std::io::Result<()> {
.unwrap();
client.call(conn.into_parts().0).await.unwrap();
let _ = delay_for(Duration::from_millis(100)).await;
assert!(disconnect.load(Ordering::Relaxed));
Ok(())
assert!(client_item.get());
}