1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-01-19 06:04:41 +01:00

538 lines
15 KiB
Rust
Raw Normal View History

2023-04-01 05:24:00 +01:00
#![allow(clippy::let_underscore_future)]
2021-11-15 18:48:37 +00:00
use std::{
net,
sync::{
atomic::{AtomicUsize, Ordering},
mpsc, Arc,
},
thread,
time::Duration,
};
use actix_rt::{net::TcpStream, time::sleep};
2021-11-15 18:48:37 +00:00
use actix_server::{Server, TestServer};
2019-12-08 19:05:05 +06:00
use actix_service::fn_service;
fn unused_addr() -> net::SocketAddr {
2021-11-15 18:48:37 +00:00
TestServer::unused_addr()
}
#[test]
fn test_bind() {
let addr = unused_addr();
2019-03-14 12:56:59 -07:00
let (tx, rx) = mpsc::channel();
2019-03-14 12:56:59 -07:00
let h = thread::spawn(move || {
actix_rt::System::new().block_on(async {
let srv = Server::build()
.workers(1)
.disable_signals()
2021-12-26 22:32:35 +00:00
.shutdown_timeout(3600)
.bind("test", addr, move || {
fn_service(|_| async { Ok::<_, ()>(()) })
})?
.run();
2021-12-26 22:32:35 +00:00
tx.send(srv.handle()).unwrap();
srv.await
})
2019-03-09 07:27:56 -08:00
});
2021-12-26 22:32:35 +00:00
2021-11-14 19:45:15 +00:00
let srv = rx.recv().unwrap();
thread::sleep(Duration::from_millis(500));
2021-12-26 22:32:35 +00:00
net::TcpStream::connect(addr).unwrap();
2021-11-14 19:45:15 +00:00
let _ = srv.stop(true);
h.join().unwrap().unwrap();
}
#[test]
2021-12-26 22:32:35 +00:00
fn test_listen() {
2021-11-14 19:45:15 +00:00
let addr = unused_addr();
let (tx, rx) = mpsc::channel();
2021-12-26 22:32:35 +00:00
let lst = net::TcpListener::bind(addr).unwrap();
2021-11-14 19:45:15 +00:00
let h = thread::spawn(move || {
2021-12-26 22:32:35 +00:00
actix_rt::System::new().block_on(async {
2021-11-14 19:45:15 +00:00
let srv = Server::build()
.workers(1)
.disable_signals()
2021-12-26 22:32:35 +00:00
.shutdown_timeout(3600)
.listen("test", lst, move || {
2021-11-14 19:45:15 +00:00
fn_service(|_| async { Ok::<_, ()>(()) })
})?
.run();
tx.send(srv.handle()).unwrap();
srv.await
})
});
let srv = rx.recv().unwrap();
2019-03-09 07:27:56 -08:00
thread::sleep(Duration::from_millis(500));
2021-12-26 22:32:35 +00:00
net::TcpStream::connect(addr).unwrap();
let _ = srv.stop(true);
h.join().unwrap().unwrap();
2019-03-09 07:27:56 -08:00
}
#[test]
2021-12-26 22:32:35 +00:00
fn plain_tokio_runtime() {
let addr = unused_addr();
2019-03-14 12:56:59 -07:00
let (tx, rx) = mpsc::channel();
2019-03-14 12:56:59 -07:00
let h = thread::spawn(move || {
2021-12-26 22:32:35 +00:00
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
rt.block_on(async {
let srv = Server::build()
.workers(1)
2021-12-26 22:32:35 +00:00
.disable_signals()
.bind("test", addr, move || {
fn_service(|_| async { Ok::<_, ()>(()) })
})?
.run();
2021-12-26 22:32:35 +00:00
tx.send(srv.handle()).unwrap();
srv.await
})
});
2021-11-14 19:45:15 +00:00
let srv = rx.recv().unwrap();
thread::sleep(Duration::from_millis(500));
assert!(net::TcpStream::connect(addr).is_ok());
let _ = srv.stop(true);
h.join().unwrap().unwrap();
}
2019-03-11 12:01:55 -07:00
#[test]
#[cfg(unix)]
fn test_start() {
use std::io::Read;
2020-02-01 23:32:08 +09:00
use actix_codec::{BytesCodec, Framed};
use bytes::Bytes;
use futures_util::sink::SinkExt;
2020-02-01 23:32:08 +09:00
2019-03-11 12:01:55 -07:00
let addr = unused_addr();
let (tx, rx) = mpsc::channel();
let h = thread::spawn(move || {
actix_rt::System::new().block_on(async {
let srv = Server::build()
.backlog(100)
.disable_signals()
.bind("test", addr, move || {
fn_service(|io: TcpStream| async move {
let mut f = Framed::new(io, BytesCodec);
f.send(Bytes::from_static(b"test")).await.unwrap();
Ok::<_, ()>(())
})
})?
.run();
2021-11-04 20:30:43 +00:00
let _ = tx.send((srv.handle(), actix_rt::System::current()));
2019-03-11 12:01:55 -07:00
srv.await
})
2019-03-11 12:01:55 -07:00
});
2019-03-11 12:01:55 -07:00
let (srv, sys) = rx.recv().unwrap();
Migrate actix-net to std::future (#64) * Migrate actix-codec, actix-rt, and actix-threadpool to std::future * update to latest tokio alpha and futures-rs * Migrate actix-service to std::future, This is a squash of ~8 commits, since it included a lot of experimentation. To see the commits, look into the semtexzv/std-future-service-tmp branch. * update futures-rs and tokio * Migrate actix-threadpool to std::future (#59) * Migrate actix-threadpool to std::future * Cosmetic refactor - turn log::error! into log::warn! as it doesn't throw any error - add Clone and Copy impls for Cancelled making it cheap to operate with - apply rustfmt * Bump up crate version to 0.2.0 and pre-fill its changelog * Disable patching 'actix-threadpool' crate in global workspace as unnecessary * Revert patching and fix 'actix-rt' * Migrate actix-rt to std::future (#47) * remove Pin from Service::poll_ready(); simplify combinators api; make code compile * disable tests * update travis config * refactor naming * drop IntoFuture trait * Migrate actix-server to std::future (#50) Still not finished, this is more WIP, this is an aggregation of several commits, which can be found in semtexzv/std-future-server-tmp branch * update actix-server * rename Factor to ServiceFactory * start server worker in start mehtod * update actix-utils * remove IntoTransform trait * Migrate actix-server::ssl::nativetls to std futures (#61) * Refactor 'nativetls' module * Migrate 'actix-server-config' to std futures - remove "uds" feature - disable features by default * Switch NativeTlsAcceptor to use 'tokio-tls' crate * Bikeshed features names and remove unnecessary dependencies for 'actix-server-config' crate * update openssl impl * migrate actix-connect to std::future * migrate actix-ioframe to std::future * update version to alpha.1 * fix boxed service * migrate server rustls support * migratte openssl and rustls connecttors * store the thread's handle with arbiter (#62) * update ssl connect tests * restore service tests * update readme
2019-11-14 18:38:24 +06:00
let mut buf = [1u8; 4];
2019-03-11 14:33:19 -07:00
let mut conn = net::TcpStream::connect(addr).unwrap();
let _ = conn.read_exact(&mut buf);
assert_eq!(buf, b"test"[..]);
2019-03-11 12:01:55 -07:00
// pause
let _ = srv.pause();
thread::sleep(Duration::from_millis(200));
2019-03-11 14:33:19 -07:00
let mut conn = net::TcpStream::connect(addr).unwrap();
conn.set_read_timeout(Some(Duration::from_millis(100)))
2019-03-11 14:33:19 -07:00
.unwrap();
let res = conn.read_exact(&mut buf);
assert!(res.is_err());
2019-03-11 12:01:55 -07:00
// resume
let _ = srv.resume();
thread::sleep(Duration::from_millis(100));
2019-03-11 12:01:55 -07:00
assert!(net::TcpStream::connect(addr).is_ok());
assert!(net::TcpStream::connect(addr).is_ok());
assert!(net::TcpStream::connect(addr).is_ok());
2019-03-11 14:33:19 -07:00
let mut buf = [0u8; 4];
let mut conn = net::TcpStream::connect(addr).unwrap();
let _ = conn.read_exact(&mut buf);
assert_eq!(buf, b"test"[..]);
2019-03-11 12:01:55 -07:00
// stop
let _ = srv.stop(false);
2020-01-28 20:27:33 +09:00
sys.stop();
h.join().unwrap().unwrap();
thread::sleep(Duration::from_secs(1));
assert!(net::TcpStream::connect(addr).is_err());
2019-03-11 12:01:55 -07:00
}
2019-12-04 21:34:48 +06:00
#[actix_rt::test]
async fn test_max_concurrent_connections() {
// Note:
// A TCP listener would accept connects based on it's backlog setting.
//
// The limit test on the other hand is only for concurrent TCP stream limiting a work
// thread accept.
use tokio::io::AsyncWriteExt;
let addr = unused_addr();
let (tx, rx) = mpsc::channel();
let counter = Arc::new(AtomicUsize::new(0));
let counter_clone = counter.clone();
let max_conn = 3;
let h = thread::spawn(move || {
actix_rt::System::new().block_on(async {
2021-11-04 20:30:43 +00:00
let srv = Server::build()
// Set a relative higher backlog.
.backlog(12)
// max connection for a worker is 3.
2021-11-01 23:36:51 +00:00
.max_concurrent_connections(max_conn)
.workers(1)
.disable_signals()
.bind("test", addr, move || {
let counter = counter.clone();
fn_service(move |_io: TcpStream| {
let counter = counter.clone();
async move {
counter.fetch_add(1, Ordering::SeqCst);
sleep(Duration::from_secs(20)).await;
counter.fetch_sub(1, Ordering::SeqCst);
Ok::<(), ()>(())
}
})
})?
.run();
2021-11-04 20:30:43 +00:00
let _ = tx.send((srv.handle(), actix_rt::System::current()));
2021-11-04 20:30:43 +00:00
srv.await
})
});
let (srv, sys) = rx.recv().unwrap();
let mut conns = vec![];
for _ in 0..12 {
let conn = tokio::net::TcpStream::connect(addr).await.unwrap();
conns.push(conn);
}
sleep(Duration::from_secs(5)).await;
// counter would remain at 3 even with 12 successful connection.
// and 9 of them remain in backlog.
assert_eq!(max_conn, counter_clone.load(Ordering::SeqCst));
for mut conn in conns {
conn.shutdown().await.unwrap();
}
srv.stop(false).await;
sys.stop();
h.join().unwrap().unwrap();
}
2022-03-08 22:13:55 +00:00
// TODO: race-y failures detected due to integer underflow when calling Counter::total
#[actix_rt::test]
async fn test_service_restart() {
use std::task::{Context, Poll};
use actix_service::{fn_factory, Service};
use futures_core::future::LocalBoxFuture;
use tokio::io::AsyncWriteExt;
struct TestService(Arc<AtomicUsize>);
impl Service<TcpStream> for TestService {
type Response = ();
type Error = ();
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let TestService(ref counter) = self;
let c = counter.fetch_add(1, Ordering::SeqCst);
// Force the service to restart on first readiness check.
if c > 0 {
Poll::Ready(Ok(()))
} else {
Poll::Ready(Err(()))
}
}
fn call(&self, _: TcpStream) -> Self::Future {
Box::pin(async { Ok(()) })
}
}
let addr1 = unused_addr();
let addr2 = unused_addr();
let (tx, rx) = mpsc::channel();
let num = Arc::new(AtomicUsize::new(0));
let num2 = Arc::new(AtomicUsize::new(0));
let num_clone = num.clone();
let num2_clone = num2.clone();
let h = thread::spawn(move || {
let num = num.clone();
actix_rt::System::new().block_on(async {
2021-11-04 20:30:43 +00:00
let srv = Server::build()
.backlog(1)
.disable_signals()
.bind("addr1", addr1, move || {
let num = num.clone();
fn_factory(move || {
let num = num.clone();
async move { Ok::<_, ()>(TestService(num)) }
})
})?
.bind("addr2", addr2, move || {
let num2 = num2.clone();
fn_factory(move || {
let num2 = num2.clone();
async move { Ok::<_, ()>(TestService(num2)) }
})
})?
.workers(1)
.run();
2021-11-14 19:45:15 +00:00
let _ = tx.send(srv.handle());
2021-11-04 20:30:43 +00:00
srv.await
})
});
2021-11-14 19:45:15 +00:00
let srv = rx.recv().unwrap();
for _ in 0..5 {
TcpStream::connect(addr1)
.await
.unwrap()
.shutdown()
.await
.unwrap();
TcpStream::connect(addr2)
.await
.unwrap()
.shutdown()
.await
.unwrap();
}
sleep(Duration::from_secs(3)).await;
assert!(num_clone.load(Ordering::SeqCst) > 5);
assert!(num2_clone.load(Ordering::SeqCst) > 5);
2021-11-04 20:30:43 +00:00
let _ = srv.stop(false);
h.join().unwrap().unwrap();
}
2021-11-04 23:00:43 +00:00
#[ignore] // non-deterministic on CI
#[actix_rt::test]
async fn worker_restart() {
use actix_service::{Service, ServiceFactory};
use futures_core::future::LocalBoxFuture;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
struct TestServiceFactory(Arc<AtomicUsize>);
impl ServiceFactory<TcpStream> for TestServiceFactory {
type Response = ();
type Error = ();
type Config = ();
type Service = TestService;
type InitError = ();
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: Self::Config) -> Self::Future {
let counter = self.0.fetch_add(1, Ordering::Relaxed);
Box::pin(async move { Ok(TestService(counter)) })
}
}
struct TestService(usize);
impl Service<TcpStream> for TestService {
type Response = ();
type Error = ();
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
actix_service::always_ready!();
fn call(&self, stream: TcpStream) -> Self::Future {
let counter = self.0;
let mut stream = stream.into_std().unwrap();
use std::io::Write;
let str = counter.to_string();
let buf = str.as_bytes();
let mut written = 0;
while written < buf.len() {
if let Ok(n) = stream.write(&buf[written..]) {
written += n;
}
}
stream.flush().unwrap();
stream.shutdown(net::Shutdown::Write).unwrap();
// force worker 2 to restart service once.
if counter == 2 {
panic!("panic on purpose")
} else {
Box::pin(async { Ok(()) })
}
}
}
let addr = unused_addr();
let (tx, rx) = mpsc::channel();
let counter = Arc::new(AtomicUsize::new(1));
let h = thread::spawn(move || {
let counter = counter.clone();
actix_rt::System::new().block_on(async {
2021-11-04 20:30:43 +00:00
let srv = Server::build()
.disable_signals()
.bind("addr", addr, move || TestServiceFactory(counter.clone()))?
.workers(2)
.run();
2021-11-14 19:45:15 +00:00
let _ = tx.send(srv.handle());
2021-11-04 20:30:43 +00:00
srv.await
})
});
2021-11-14 19:45:15 +00:00
let srv = rx.recv().unwrap();
sleep(Duration::from_secs(3)).await;
let mut buf = [0; 8];
// worker 1 would not restart and return it's id consistently.
let mut stream = TcpStream::connect(addr).await.unwrap();
let n = stream.read(&mut buf).await.unwrap();
let id = String::from_utf8_lossy(&buf[0..n]);
assert_eq!("1", id);
stream.shutdown().await.unwrap();
// worker 2 dead after return response.
let mut stream = TcpStream::connect(addr).await.unwrap();
let n = stream.read(&mut buf).await.unwrap();
let id = String::from_utf8_lossy(&buf[0..n]);
assert_eq!("2", id);
stream.shutdown().await.unwrap();
// request to worker 1
let mut stream = TcpStream::connect(addr).await.unwrap();
let n = stream.read(&mut buf).await.unwrap();
let id = String::from_utf8_lossy(&buf[0..n]);
assert_eq!("1", id);
stream.shutdown().await.unwrap();
// TODO: Remove sleep if it can pass CI.
sleep(Duration::from_secs(3)).await;
// worker 2 restarting and work goes to worker 1.
let mut stream = TcpStream::connect(addr).await.unwrap();
let n = stream.read(&mut buf).await.unwrap();
let id = String::from_utf8_lossy(&buf[0..n]);
assert_eq!("1", id);
stream.shutdown().await.unwrap();
// TODO: Remove sleep if it can pass CI.
sleep(Duration::from_secs(3)).await;
// worker 2 restarted but worker 1 was still the next to accept connection.
let mut stream = TcpStream::connect(addr).await.unwrap();
let n = stream.read(&mut buf).await.unwrap();
let id = String::from_utf8_lossy(&buf[0..n]);
assert_eq!("1", id);
stream.shutdown().await.unwrap();
// TODO: Remove sleep if it can pass CI.
sleep(Duration::from_secs(3)).await;
// worker 2 accept connection again but it's id is 3.
let mut stream = TcpStream::connect(addr).await.unwrap();
let n = stream.read(&mut buf).await.unwrap();
let id = String::from_utf8_lossy(&buf[0..n]);
assert_eq!("3", id);
stream.shutdown().await.unwrap();
2021-11-04 20:30:43 +00:00
let _ = srv.stop(false);
h.join().unwrap().unwrap();
}
#[test]
fn no_runtime_on_init() {
use std::{thread::sleep, time::Duration};
let addr = unused_addr();
let counter = Arc::new(AtomicUsize::new(0));
let mut srv = Server::build()
.workers(2)
.disable_signals()
.bind("test", addr, {
let counter = counter.clone();
move || {
counter.fetch_add(1, Ordering::SeqCst);
fn_service(|_| async { Ok::<_, ()>(()) })
}
})
.unwrap()
.run();
fn is_send<T: Send>(_: &T) {}
is_send(&srv);
is_send(&srv.handle());
sleep(Duration::from_millis(1_000));
assert_eq!(counter.load(Ordering::SeqCst), 0);
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
rt.block_on(async move {
let _ = futures_util::poll!(&mut srv);
// available after the first poll
sleep(Duration::from_millis(500));
assert_eq!(counter.load(Ordering::SeqCst), 2);
let _ = srv.handle().stop(true);
srv.await
})
.unwrap();
}