mirror of
https://github.com/fafhrd91/actix-net
synced 2024-11-27 20:12:58 +01:00
538 lines
15 KiB
Rust
538 lines
15 KiB
Rust
#![allow(clippy::let_underscore_future)]
|
|
|
|
use std::{
|
|
net,
|
|
sync::{
|
|
atomic::{AtomicUsize, Ordering},
|
|
mpsc, Arc,
|
|
},
|
|
thread,
|
|
time::Duration,
|
|
};
|
|
|
|
use actix_rt::{net::TcpStream, time::sleep};
|
|
use actix_server::{Server, TestServer};
|
|
use actix_service::fn_service;
|
|
|
|
fn unused_addr() -> net::SocketAddr {
|
|
TestServer::unused_addr()
|
|
}
|
|
|
|
#[test]
|
|
fn test_bind() {
|
|
let addr = unused_addr();
|
|
let (tx, rx) = mpsc::channel();
|
|
|
|
let h = thread::spawn(move || {
|
|
actix_rt::System::new().block_on(async {
|
|
let srv = Server::build()
|
|
.workers(1)
|
|
.disable_signals()
|
|
.shutdown_timeout(3600)
|
|
.bind("test", addr, move || {
|
|
fn_service(|_| async { Ok::<_, ()>(()) })
|
|
})?
|
|
.run();
|
|
|
|
tx.send(srv.handle()).unwrap();
|
|
srv.await
|
|
})
|
|
});
|
|
|
|
let srv = rx.recv().unwrap();
|
|
|
|
thread::sleep(Duration::from_millis(500));
|
|
|
|
net::TcpStream::connect(addr).unwrap();
|
|
|
|
let _ = srv.stop(true);
|
|
h.join().unwrap().unwrap();
|
|
}
|
|
|
|
#[test]
|
|
fn test_listen() {
|
|
let addr = unused_addr();
|
|
let (tx, rx) = mpsc::channel();
|
|
let lst = net::TcpListener::bind(addr).unwrap();
|
|
|
|
let h = thread::spawn(move || {
|
|
actix_rt::System::new().block_on(async {
|
|
let srv = Server::build()
|
|
.workers(1)
|
|
.disable_signals()
|
|
.shutdown_timeout(3600)
|
|
.listen("test", lst, move || {
|
|
fn_service(|_| async { Ok::<_, ()>(()) })
|
|
})?
|
|
.run();
|
|
|
|
tx.send(srv.handle()).unwrap();
|
|
srv.await
|
|
})
|
|
});
|
|
|
|
let srv = rx.recv().unwrap();
|
|
|
|
thread::sleep(Duration::from_millis(500));
|
|
|
|
net::TcpStream::connect(addr).unwrap();
|
|
|
|
let _ = srv.stop(true);
|
|
h.join().unwrap().unwrap();
|
|
}
|
|
|
|
#[test]
|
|
fn plain_tokio_runtime() {
|
|
let addr = unused_addr();
|
|
let (tx, rx) = mpsc::channel();
|
|
|
|
let h = thread::spawn(move || {
|
|
let rt = tokio::runtime::Builder::new_current_thread()
|
|
.enable_all()
|
|
.build()
|
|
.unwrap();
|
|
|
|
rt.block_on(async {
|
|
let srv = Server::build()
|
|
.workers(1)
|
|
.disable_signals()
|
|
.bind("test", addr, move || {
|
|
fn_service(|_| async { Ok::<_, ()>(()) })
|
|
})?
|
|
.run();
|
|
|
|
tx.send(srv.handle()).unwrap();
|
|
|
|
srv.await
|
|
})
|
|
});
|
|
|
|
let srv = rx.recv().unwrap();
|
|
|
|
thread::sleep(Duration::from_millis(500));
|
|
assert!(net::TcpStream::connect(addr).is_ok());
|
|
|
|
let _ = srv.stop(true);
|
|
h.join().unwrap().unwrap();
|
|
}
|
|
|
|
#[test]
|
|
#[cfg(unix)]
|
|
fn test_start() {
|
|
use std::io::Read;
|
|
|
|
use actix_codec::{BytesCodec, Framed};
|
|
use bytes::Bytes;
|
|
use futures_util::sink::SinkExt;
|
|
|
|
let addr = unused_addr();
|
|
let (tx, rx) = mpsc::channel();
|
|
|
|
let h = thread::spawn(move || {
|
|
actix_rt::System::new().block_on(async {
|
|
let srv = Server::build()
|
|
.backlog(100)
|
|
.disable_signals()
|
|
.bind("test", addr, move || {
|
|
fn_service(|io: TcpStream| async move {
|
|
let mut f = Framed::new(io, BytesCodec);
|
|
f.send(Bytes::from_static(b"test")).await.unwrap();
|
|
Ok::<_, ()>(())
|
|
})
|
|
})?
|
|
.run();
|
|
|
|
let _ = tx.send((srv.handle(), actix_rt::System::current()));
|
|
|
|
srv.await
|
|
})
|
|
});
|
|
|
|
let (srv, sys) = rx.recv().unwrap();
|
|
|
|
let mut buf = [1u8; 4];
|
|
let mut conn = net::TcpStream::connect(addr).unwrap();
|
|
let _ = conn.read_exact(&mut buf);
|
|
assert_eq!(buf, b"test"[..]);
|
|
|
|
// pause
|
|
let _ = srv.pause();
|
|
thread::sleep(Duration::from_millis(200));
|
|
let mut conn = net::TcpStream::connect(addr).unwrap();
|
|
conn.set_read_timeout(Some(Duration::from_millis(100)))
|
|
.unwrap();
|
|
let res = conn.read_exact(&mut buf);
|
|
assert!(res.is_err());
|
|
|
|
// resume
|
|
let _ = srv.resume();
|
|
thread::sleep(Duration::from_millis(100));
|
|
assert!(net::TcpStream::connect(addr).is_ok());
|
|
assert!(net::TcpStream::connect(addr).is_ok());
|
|
assert!(net::TcpStream::connect(addr).is_ok());
|
|
|
|
let mut buf = [0u8; 4];
|
|
let mut conn = net::TcpStream::connect(addr).unwrap();
|
|
let _ = conn.read_exact(&mut buf);
|
|
assert_eq!(buf, b"test"[..]);
|
|
|
|
// stop
|
|
let _ = srv.stop(false);
|
|
sys.stop();
|
|
h.join().unwrap().unwrap();
|
|
|
|
thread::sleep(Duration::from_secs(1));
|
|
assert!(net::TcpStream::connect(addr).is_err());
|
|
}
|
|
|
|
#[actix_rt::test]
|
|
async fn test_max_concurrent_connections() {
|
|
// Note:
|
|
// A TCP listener would accept connects based on it's backlog setting.
|
|
//
|
|
// The limit test on the other hand is only for concurrent TCP stream limiting a work
|
|
// thread accept.
|
|
|
|
use tokio::io::AsyncWriteExt;
|
|
|
|
let addr = unused_addr();
|
|
let (tx, rx) = mpsc::channel();
|
|
|
|
let counter = Arc::new(AtomicUsize::new(0));
|
|
let counter_clone = counter.clone();
|
|
|
|
let max_conn = 3;
|
|
|
|
let h = thread::spawn(move || {
|
|
actix_rt::System::new().block_on(async {
|
|
let srv = Server::build()
|
|
// Set a relative higher backlog.
|
|
.backlog(12)
|
|
// max connection for a worker is 3.
|
|
.max_concurrent_connections(max_conn)
|
|
.workers(1)
|
|
.disable_signals()
|
|
.bind("test", addr, move || {
|
|
let counter = counter.clone();
|
|
fn_service(move |_io: TcpStream| {
|
|
let counter = counter.clone();
|
|
async move {
|
|
counter.fetch_add(1, Ordering::SeqCst);
|
|
sleep(Duration::from_secs(20)).await;
|
|
counter.fetch_sub(1, Ordering::SeqCst);
|
|
Ok::<(), ()>(())
|
|
}
|
|
})
|
|
})?
|
|
.run();
|
|
|
|
let _ = tx.send((srv.handle(), actix_rt::System::current()));
|
|
|
|
srv.await
|
|
})
|
|
});
|
|
|
|
let (srv, sys) = rx.recv().unwrap();
|
|
|
|
let mut conns = vec![];
|
|
|
|
for _ in 0..12 {
|
|
let conn = tokio::net::TcpStream::connect(addr).await.unwrap();
|
|
conns.push(conn);
|
|
}
|
|
|
|
sleep(Duration::from_secs(5)).await;
|
|
|
|
// counter would remain at 3 even with 12 successful connection.
|
|
// and 9 of them remain in backlog.
|
|
assert_eq!(max_conn, counter_clone.load(Ordering::SeqCst));
|
|
|
|
for mut conn in conns {
|
|
conn.shutdown().await.unwrap();
|
|
}
|
|
|
|
srv.stop(false).await;
|
|
sys.stop();
|
|
h.join().unwrap().unwrap();
|
|
}
|
|
|
|
// TODO: race-y failures detected due to integer underflow when calling Counter::total
|
|
#[actix_rt::test]
|
|
async fn test_service_restart() {
|
|
use std::task::{Context, Poll};
|
|
|
|
use actix_service::{fn_factory, Service};
|
|
use futures_core::future::LocalBoxFuture;
|
|
use tokio::io::AsyncWriteExt;
|
|
|
|
struct TestService(Arc<AtomicUsize>);
|
|
|
|
impl Service<TcpStream> for TestService {
|
|
type Response = ();
|
|
type Error = ();
|
|
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
|
|
|
|
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
|
let TestService(ref counter) = self;
|
|
let c = counter.fetch_add(1, Ordering::SeqCst);
|
|
// Force the service to restart on first readiness check.
|
|
if c > 0 {
|
|
Poll::Ready(Ok(()))
|
|
} else {
|
|
Poll::Ready(Err(()))
|
|
}
|
|
}
|
|
|
|
fn call(&self, _: TcpStream) -> Self::Future {
|
|
Box::pin(async { Ok(()) })
|
|
}
|
|
}
|
|
|
|
let addr1 = unused_addr();
|
|
let addr2 = unused_addr();
|
|
let (tx, rx) = mpsc::channel();
|
|
let num = Arc::new(AtomicUsize::new(0));
|
|
let num2 = Arc::new(AtomicUsize::new(0));
|
|
|
|
let num_clone = num.clone();
|
|
let num2_clone = num2.clone();
|
|
|
|
let h = thread::spawn(move || {
|
|
let num = num.clone();
|
|
actix_rt::System::new().block_on(async {
|
|
let srv = Server::build()
|
|
.backlog(1)
|
|
.disable_signals()
|
|
.bind("addr1", addr1, move || {
|
|
let num = num.clone();
|
|
fn_factory(move || {
|
|
let num = num.clone();
|
|
async move { Ok::<_, ()>(TestService(num)) }
|
|
})
|
|
})?
|
|
.bind("addr2", addr2, move || {
|
|
let num2 = num2.clone();
|
|
fn_factory(move || {
|
|
let num2 = num2.clone();
|
|
async move { Ok::<_, ()>(TestService(num2)) }
|
|
})
|
|
})?
|
|
.workers(1)
|
|
.run();
|
|
|
|
let _ = tx.send(srv.handle());
|
|
srv.await
|
|
})
|
|
});
|
|
|
|
let srv = rx.recv().unwrap();
|
|
|
|
for _ in 0..5 {
|
|
TcpStream::connect(addr1)
|
|
.await
|
|
.unwrap()
|
|
.shutdown()
|
|
.await
|
|
.unwrap();
|
|
TcpStream::connect(addr2)
|
|
.await
|
|
.unwrap()
|
|
.shutdown()
|
|
.await
|
|
.unwrap();
|
|
}
|
|
|
|
sleep(Duration::from_secs(3)).await;
|
|
|
|
assert!(num_clone.load(Ordering::SeqCst) > 5);
|
|
assert!(num2_clone.load(Ordering::SeqCst) > 5);
|
|
|
|
let _ = srv.stop(false);
|
|
h.join().unwrap().unwrap();
|
|
}
|
|
|
|
#[ignore] // non-deterministic on CI
|
|
#[actix_rt::test]
|
|
async fn worker_restart() {
|
|
use actix_service::{Service, ServiceFactory};
|
|
use futures_core::future::LocalBoxFuture;
|
|
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
|
|
|
struct TestServiceFactory(Arc<AtomicUsize>);
|
|
|
|
impl ServiceFactory<TcpStream> for TestServiceFactory {
|
|
type Response = ();
|
|
type Error = ();
|
|
type Config = ();
|
|
type Service = TestService;
|
|
type InitError = ();
|
|
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
|
|
|
|
fn new_service(&self, _: Self::Config) -> Self::Future {
|
|
let counter = self.0.fetch_add(1, Ordering::Relaxed);
|
|
|
|
Box::pin(async move { Ok(TestService(counter)) })
|
|
}
|
|
}
|
|
|
|
struct TestService(usize);
|
|
|
|
impl Service<TcpStream> for TestService {
|
|
type Response = ();
|
|
type Error = ();
|
|
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
|
|
|
|
actix_service::always_ready!();
|
|
|
|
fn call(&self, stream: TcpStream) -> Self::Future {
|
|
let counter = self.0;
|
|
|
|
let mut stream = stream.into_std().unwrap();
|
|
use std::io::Write;
|
|
let str = counter.to_string();
|
|
let buf = str.as_bytes();
|
|
|
|
let mut written = 0;
|
|
|
|
while written < buf.len() {
|
|
if let Ok(n) = stream.write(&buf[written..]) {
|
|
written += n;
|
|
}
|
|
}
|
|
stream.flush().unwrap();
|
|
stream.shutdown(net::Shutdown::Write).unwrap();
|
|
|
|
// force worker 2 to restart service once.
|
|
if counter == 2 {
|
|
panic!("panic on purpose")
|
|
} else {
|
|
Box::pin(async { Ok(()) })
|
|
}
|
|
}
|
|
}
|
|
|
|
let addr = unused_addr();
|
|
let (tx, rx) = mpsc::channel();
|
|
|
|
let counter = Arc::new(AtomicUsize::new(1));
|
|
let h = thread::spawn(move || {
|
|
let counter = counter.clone();
|
|
actix_rt::System::new().block_on(async {
|
|
let srv = Server::build()
|
|
.disable_signals()
|
|
.bind("addr", addr, move || TestServiceFactory(counter.clone()))?
|
|
.workers(2)
|
|
.run();
|
|
|
|
let _ = tx.send(srv.handle());
|
|
|
|
srv.await
|
|
})
|
|
});
|
|
|
|
let srv = rx.recv().unwrap();
|
|
|
|
sleep(Duration::from_secs(3)).await;
|
|
|
|
let mut buf = [0; 8];
|
|
|
|
// worker 1 would not restart and return it's id consistently.
|
|
let mut stream = TcpStream::connect(addr).await.unwrap();
|
|
let n = stream.read(&mut buf).await.unwrap();
|
|
let id = String::from_utf8_lossy(&buf[0..n]);
|
|
assert_eq!("1", id);
|
|
stream.shutdown().await.unwrap();
|
|
|
|
// worker 2 dead after return response.
|
|
let mut stream = TcpStream::connect(addr).await.unwrap();
|
|
let n = stream.read(&mut buf).await.unwrap();
|
|
let id = String::from_utf8_lossy(&buf[0..n]);
|
|
assert_eq!("2", id);
|
|
stream.shutdown().await.unwrap();
|
|
|
|
// request to worker 1
|
|
let mut stream = TcpStream::connect(addr).await.unwrap();
|
|
let n = stream.read(&mut buf).await.unwrap();
|
|
let id = String::from_utf8_lossy(&buf[0..n]);
|
|
assert_eq!("1", id);
|
|
stream.shutdown().await.unwrap();
|
|
|
|
// TODO: Remove sleep if it can pass CI.
|
|
sleep(Duration::from_secs(3)).await;
|
|
|
|
// worker 2 restarting and work goes to worker 1.
|
|
let mut stream = TcpStream::connect(addr).await.unwrap();
|
|
let n = stream.read(&mut buf).await.unwrap();
|
|
let id = String::from_utf8_lossy(&buf[0..n]);
|
|
assert_eq!("1", id);
|
|
stream.shutdown().await.unwrap();
|
|
|
|
// TODO: Remove sleep if it can pass CI.
|
|
sleep(Duration::from_secs(3)).await;
|
|
|
|
// worker 2 restarted but worker 1 was still the next to accept connection.
|
|
let mut stream = TcpStream::connect(addr).await.unwrap();
|
|
let n = stream.read(&mut buf).await.unwrap();
|
|
let id = String::from_utf8_lossy(&buf[0..n]);
|
|
assert_eq!("1", id);
|
|
stream.shutdown().await.unwrap();
|
|
|
|
// TODO: Remove sleep if it can pass CI.
|
|
sleep(Duration::from_secs(3)).await;
|
|
|
|
// worker 2 accept connection again but it's id is 3.
|
|
let mut stream = TcpStream::connect(addr).await.unwrap();
|
|
let n = stream.read(&mut buf).await.unwrap();
|
|
let id = String::from_utf8_lossy(&buf[0..n]);
|
|
assert_eq!("3", id);
|
|
stream.shutdown().await.unwrap();
|
|
|
|
let _ = srv.stop(false);
|
|
h.join().unwrap().unwrap();
|
|
}
|
|
|
|
#[test]
|
|
fn no_runtime_on_init() {
|
|
use std::{thread::sleep, time::Duration};
|
|
|
|
let addr = unused_addr();
|
|
let counter = Arc::new(AtomicUsize::new(0));
|
|
|
|
let mut srv = Server::build()
|
|
.workers(2)
|
|
.disable_signals()
|
|
.bind("test", addr, {
|
|
let counter = counter.clone();
|
|
move || {
|
|
counter.fetch_add(1, Ordering::SeqCst);
|
|
fn_service(|_| async { Ok::<_, ()>(()) })
|
|
}
|
|
})
|
|
.unwrap()
|
|
.run();
|
|
|
|
fn is_send<T: Send>(_: &T) {}
|
|
is_send(&srv);
|
|
is_send(&srv.handle());
|
|
|
|
sleep(Duration::from_millis(1_000));
|
|
assert_eq!(counter.load(Ordering::SeqCst), 0);
|
|
|
|
let rt = tokio::runtime::Builder::new_current_thread()
|
|
.enable_all()
|
|
.build()
|
|
.unwrap();
|
|
|
|
rt.block_on(async move {
|
|
let _ = futures_util::poll!(&mut srv);
|
|
|
|
// available after the first poll
|
|
sleep(Duration::from_millis(500));
|
|
assert_eq!(counter.load(Ordering::SeqCst), 2);
|
|
|
|
let _ = srv.handle().stop(true);
|
|
srv.await
|
|
})
|
|
.unwrap();
|
|
}
|