1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-08-14 08:50:31 +02:00

Compare commits

..

5 Commits

Author SHA1 Message Date
Rob Ede
ae9afd4de7 prepare actix-server release 2.0.0-rc.2 2021-12-27 18:33:57 +00:00
Rob Ede
01d2f18f68 simplify test server (#431) 2021-12-27 18:27:54 +00:00
Rob Ede
e92b5aaf31 expose with_tokio_rt (#430) 2021-12-27 16:00:26 +00:00
Rob Ede
459a6d1b02 update readme 2021-12-27 00:57:16 +00:00
Rob Ede
9935883905 add file reader example 2021-12-26 22:32:35 +00:00
21 changed files with 293 additions and 129 deletions

View File

@@ -3,17 +3,10 @@
> A collection of lower-level libraries for composable network services. > A collection of lower-level libraries for composable network services.
![Apache 2.0 or MIT licensed](https://img.shields.io/crates/l/actix-server) ![Apache 2.0 or MIT licensed](https://img.shields.io/crates/l/actix-server)
[![CI](https://github.com/actix/actix-net/actions/workflows/ci.yml/badge.svg?event=push)](https://github.com/actix/actix-net/actions/workflows/ci.yml)
[![codecov](https://codecov.io/gh/actix/actix-net/branch/master/graph/badge.svg)](https://codecov.io/gh/actix/actix-net) [![codecov](https://codecov.io/gh/actix/actix-net/branch/master/graph/badge.svg)](https://codecov.io/gh/actix/actix-net)
[![Chat on Discord](https://img.shields.io/discord/771444961383153695?label=chat&logo=discord)](https://discord.gg/NWpN5mmg3x) [![Chat on Discord](https://img.shields.io/discord/771444961383153695?label=chat&logo=discord)](https://discord.gg/NWpN5mmg3x)
## Build statuses
| Platform | Build Status |
| ---------------- | ------------ |
| Linux | [![build status](https://github.com/actix/actix-net/workflows/CI%20%28Linux%29/badge.svg?branch=master&event=push)](https://github.com/actix/actix-net/actions?query=workflow%3A"CI+(Linux)") |
| macOS | [![build status](https://github.com/actix/actix-net/workflows/CI%20%28macOS%29/badge.svg?branch=master&event=push)](https://github.com/actix/actix-net/actions?query=workflow%3A"CI+(macOS)") |
| Windows | [![build status](https://github.com/actix/actix-net/workflows/CI%20%28Windows%29/badge.svg?branch=master&event=push)](https://github.com/actix/actix-net/actions?query=workflow%3A"CI+(Windows)") |
| Windows (MinGW) | [![build status](https://github.com/actix/actix-net/workflows/CI%20%28Windows-mingw%29/badge.svg?branch=master&event=push)](https://github.com/actix/actix-net/actions?query=workflow%3A"CI+(Windows-mingw)") |
## Example ## Example
See `actix-server/examples` and `actix-tls/examples` for some basic examples. See `actix-server/examples` and `actix-tls/examples` for some basic examples.

View File

@@ -156,7 +156,7 @@ impl<T, U> Framed<T, U> {
} }
impl<T, U> Framed<T, U> { impl<T, U> Framed<T, U> {
/// Serialize item and Write to the inner buffer /// Serialize item and write to the inner buffer
pub fn write<I>(mut self: Pin<&mut Self>, item: I) -> Result<(), <U as Encoder<I>>::Error> pub fn write<I>(mut self: Pin<&mut Self>, item: I) -> Result<(), <U as Encoder<I>>::Error>
where where
T: AsyncWrite, T: AsyncWrite,

View File

@@ -1,6 +1,9 @@
# Changes # Changes
## Unreleased - 2021-xx-xx ## Unreleased - 2021-xx-xx
- Expose `System::with_tokio_rt` and `Arbiter::with_tokio_rt`. [#430]
[#430]: https://github.com/actix/actix-net/pull/430
## 2.5.0 - 2021-11-22 ## 2.5.0 - 2021-11-22

View File

@@ -108,7 +108,6 @@ impl Arbiter {
/// ///
/// [tokio-runtime]: tokio::runtime::Runtime /// [tokio-runtime]: tokio::runtime::Runtime
#[cfg(not(all(target_os = "linux", feature = "io-uring")))] #[cfg(not(all(target_os = "linux", feature = "io-uring")))]
#[doc(hidden)]
pub fn with_tokio_rt<F>(runtime_factory: F) -> Arbiter pub fn with_tokio_rt<F>(runtime_factory: F) -> Arbiter
where where
F: Fn() -> tokio::runtime::Runtime + Send + 'static, F: Fn() -> tokio::runtime::Runtime + Send + 'static,

View File

@@ -46,7 +46,6 @@ impl System {
/// Create a new System using the [Tokio Runtime](tokio-runtime) returned from a closure. /// Create a new System using the [Tokio Runtime](tokio-runtime) returned from a closure.
/// ///
/// [tokio-runtime]: tokio::runtime::Runtime /// [tokio-runtime]: tokio::runtime::Runtime
#[doc(hidden)]
pub fn with_tokio_rt<F>(runtime_factory: F) -> SystemRunner pub fn with_tokio_rt<F>(runtime_factory: F) -> SystemRunner
where where
F: Fn() -> tokio::runtime::Runtime, F: Fn() -> tokio::runtime::Runtime,

View File

@@ -3,6 +3,12 @@
## Unreleased - 2021-xx-xx ## Unreleased - 2021-xx-xx
## 2.0.0-rc.2 - 2021-12-27
- Simplify `TestServer`. [#431]
[#431]: https://github.com/actix/actix-net/pull/431
## 2.0.0-rc.1 - 2021-12-05 ## 2.0.0-rc.1 - 2021-12-05
- Hide implementation details of `Server`. [#424] - Hide implementation details of `Server`. [#424]
- `Server` now runs only after awaiting it. [#425] - `Server` now runs only after awaiting it. [#425]

View File

@@ -1,14 +1,17 @@
[package] [package]
name = "actix-server" name = "actix-server"
version = "2.0.0-rc.1" version = "2.0.0-rc.2"
authors = [ authors = [
"Nikolay Kim <fafhrd91@gmail.com>", "Nikolay Kim <fafhrd91@gmail.com>",
"fakeshadow <24548779@qq.com>", "fakeshadow <24548779@qq.com>",
"Rob Ede <robjtede@icloud.com>",
"Ali MJ Al-Nasrawy <alimjalnasrawy@gmail.com>",
] ]
description = "General purpose TCP server built for the Actix ecosystem" description = "General purpose TCP server built for the Actix ecosystem"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "tcp", "server", "framework", "async"]
repository = "https://github.com/actix/actix-net.git"
categories = ["network-programming", "asynchronous"] categories = ["network-programming", "asynchronous"]
homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git"
license = "MIT OR Apache-2.0" license = "MIT OR Apache-2.0"
edition = "2018" edition = "2018"
@@ -43,4 +46,4 @@ actix-rt = "2.4.0"
bytes = "1" bytes = "1"
env_logger = "0.9" env_logger = "0.9"
futures-util = { version = "0.3.7", default-features = false, features = ["sink", "async-await-macro"] } futures-util = { version = "0.3.7", default-features = false, features = ["sink", "async-await-macro"] }
tokio = { version = "1.5.1", features = ["io-util", "rt-multi-thread", "macros"] } tokio = { version = "1.5.1", features = ["io-util", "rt-multi-thread", "macros", "fs"] }

View File

@@ -0,0 +1,93 @@
//! Simple file-reader TCP server with framed stream.
//!
//! Using the following command:
//!
//! ```sh
//! nc 127.0.0.1 8080
//! ```
//!
//! Follow the prompt and enter a file path, relative or absolute.
use std::io;
use actix_codec::{Framed, LinesCodec};
use actix_rt::net::TcpStream;
use actix_server::Server;
use actix_service::{fn_service, ServiceFactoryExt as _};
use futures_util::{SinkExt as _, StreamExt as _};
use tokio::{fs::File, io::AsyncReadExt as _};
async fn run() -> io::Result<()> {
env_logger::init_from_env(env_logger::Env::default().default_filter_or("info"));
let addr = ("127.0.0.1", 8080);
log::info!("starting server on port: {}", &addr.0);
// Bind socket address and start worker(s). By default, the server uses the number of physical
// CPU cores as the worker count. For this reason, the closure passed to bind needs to return
// a service *factory*; so it can be created once per worker.
Server::build()
.bind("file-reader", addr, move || {
fn_service(move |stream: TcpStream| async move {
// set up codec to use with I/O resource
let mut framed = Framed::new(stream, LinesCodec::default());
loop {
// prompt for file name
framed.send("Type file name to return:").await?;
// wait for next line
match framed.next().await {
Some(Ok(line)) => {
match File::open(line).await {
Ok(mut file) => {
// read file into String buffer
let mut buf = String::new();
file.read_to_string(&mut buf).await?;
// send String into framed object
framed.send(buf).await?;
// break out of loop and
break;
}
Err(err) => {
log::error!("{}", err);
framed
.send("File not found or not readable. Try again.")
.await?;
continue;
}
};
}
// not being able to read a line from the stream is unrecoverable
Some(Err(err)) => return Err(err),
// This EOF won't be hit.
None => continue,
}
}
// close connection after file has been copied to TCP stream
Ok(())
})
.map_err(|err| log::error!("Service Error: {:?}", err))
})?
.workers(2)
.run()
.await
}
#[tokio::main]
async fn main() -> io::Result<()> {
run().await?;
Ok(())
}
// alternatively:
// #[actix_rt::main]
// async fn main() -> io::Result<()> {
// run().await?;
// Ok(())
// }

View File

@@ -26,7 +26,7 @@ use log::{error, info};
use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _}; use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};
async fn run() -> io::Result<()> { async fn run() -> io::Result<()> {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); env_logger::init_from_env(env_logger::Env::default().default_filter_or("info"));
let count = Arc::new(AtomicUsize::new(0)); let count = Arc::new(AtomicUsize::new(0));

View File

@@ -6,7 +6,7 @@ use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use crate::{ use crate::{
server::ServerCommand, server::ServerCommand,
service::{InternalServiceFactory, ServiceFactory, StreamNewService}, service::{InternalServiceFactory, ServerServiceFactory, StreamNewService},
socket::{ socket::{
create_mio_tcp_listener, MioListener, MioTcpListener, StdTcpListener, ToSocketAddrs, create_mio_tcp_listener, MioListener, MioTcpListener, StdTcpListener, ToSocketAddrs,
}, },
@@ -140,10 +140,11 @@ impl ServerBuilder {
} }
/// Add new service to the server. /// Add new service to the server.
pub fn bind<F, U, N: AsRef<str>>(mut self, name: N, addr: U, factory: F) -> io::Result<Self> pub fn bind<F, U, N>(mut self, name: N, addr: U, factory: F) -> io::Result<Self>
where where
F: ServiceFactory<TcpStream>, F: ServerServiceFactory<TcpStream>,
U: ToSocketAddrs, U: ToSocketAddrs,
N: AsRef<str>,
{ {
let sockets = bind_addr(addr, self.backlog)?; let sockets = bind_addr(addr, self.backlog)?;
@@ -172,7 +173,7 @@ impl ServerBuilder {
factory: F, factory: F,
) -> io::Result<Self> ) -> io::Result<Self>
where where
F: ServiceFactory<TcpStream>, F: ServerServiceFactory<TcpStream>,
{ {
lst.set_nonblocking(true)?; lst.set_nonblocking(true)?;
let addr = lst.local_addr()?; let addr = lst.local_addr()?;
@@ -213,7 +214,7 @@ impl ServerBuilder {
/// Add new unix domain service to the server. /// Add new unix domain service to the server.
pub fn bind_uds<F, U, N>(self, name: N, addr: U, factory: F) -> io::Result<Self> pub fn bind_uds<F, U, N>(self, name: N, addr: U, factory: F) -> io::Result<Self>
where where
F: ServiceFactory<actix_rt::net::UnixStream>, F: ServerServiceFactory<actix_rt::net::UnixStream>,
N: AsRef<str>, N: AsRef<str>,
U: AsRef<std::path::Path>, U: AsRef<std::path::Path>,
{ {
@@ -240,7 +241,7 @@ impl ServerBuilder {
factory: F, factory: F,
) -> io::Result<Self> ) -> io::Result<Self>
where where
F: ServiceFactory<actix_rt::net::UnixStream>, F: ServerServiceFactory<actix_rt::net::UnixStream>,
{ {
use std::net::{IpAddr, Ipv4Addr}; use std::net::{IpAddr, Ipv4Addr};
lst.set_nonblocking(true)?; lst.set_nonblocking(true)?;

View File

@@ -21,7 +21,7 @@ mod worker;
pub use self::builder::ServerBuilder; pub use self::builder::ServerBuilder;
pub use self::handle::ServerHandle; pub use self::handle::ServerHandle;
pub use self::server::Server; pub use self::server::Server;
pub use self::service::ServiceFactory; pub use self::service::ServerServiceFactory;
pub use self::test_server::TestServer; pub use self::test_server::TestServer;
#[doc(hidden)] #[doc(hidden)]

View File

@@ -1,16 +1,21 @@
use std::marker::PhantomData; use std::{
use std::net::SocketAddr; marker::PhantomData,
use std::task::{Context, Poll}; net::SocketAddr,
task::{Context, Poll},
};
use actix_service::{Service, ServiceFactory as BaseServiceFactory}; use actix_service::{Service, ServiceFactory as BaseServiceFactory};
use actix_utils::future::{ready, Ready}; use actix_utils::future::{ready, Ready};
use futures_core::future::LocalBoxFuture; use futures_core::future::LocalBoxFuture;
use log::error; use log::error;
use crate::socket::{FromStream, MioStream}; use crate::{
use crate::worker::WorkerCounterGuard; socket::{FromStream, MioStream},
worker::WorkerCounterGuard,
};
pub trait ServiceFactory<Stream: FromStream>: Send + Clone + 'static { #[doc(hidden)]
pub trait ServerServiceFactory<Stream: FromStream>: Send + Clone + 'static {
type Factory: BaseServiceFactory<Stream, Config = ()>; type Factory: BaseServiceFactory<Stream, Config = ()>;
fn create(&self) -> Self::Factory; fn create(&self) -> Self::Factory;
@@ -80,7 +85,7 @@ where
} }
} }
pub(crate) struct StreamNewService<F: ServiceFactory<Io>, Io: FromStream> { pub(crate) struct StreamNewService<F: ServerServiceFactory<Io>, Io: FromStream> {
name: String, name: String,
inner: F, inner: F,
token: usize, token: usize,
@@ -90,7 +95,7 @@ pub(crate) struct StreamNewService<F: ServiceFactory<Io>, Io: FromStream> {
impl<F, Io> StreamNewService<F, Io> impl<F, Io> StreamNewService<F, Io>
where where
F: ServiceFactory<Io>, F: ServerServiceFactory<Io>,
Io: FromStream + Send + 'static, Io: FromStream + Send + 'static,
{ {
pub(crate) fn create( pub(crate) fn create(
@@ -111,7 +116,7 @@ where
impl<F, Io> InternalServiceFactory for StreamNewService<F, Io> impl<F, Io> InternalServiceFactory for StreamNewService<F, Io>
where where
F: ServiceFactory<Io>, F: ServerServiceFactory<Io>,
Io: FromStream + Send + 'static, Io: FromStream + Send + 'static,
{ {
fn name(&self, _: usize) -> &str { fn name(&self, _: usize) -> &str {
@@ -143,7 +148,7 @@ where
} }
} }
impl<F, T, I> ServiceFactory<I> for F impl<F, T, I> ServerServiceFactory<I> for F
where where
F: Fn() -> T + Send + Clone + 'static, F: Fn() -> T + Send + Clone + 'static,
T: BaseServiceFactory<I, Config = ()>, T: BaseServiceFactory<I, Config = ()>,

View File

@@ -1,19 +1,18 @@
pub(crate) use std::net::{ pub(crate) use std::net::{
SocketAddr as StdSocketAddr, TcpListener as StdTcpListener, ToSocketAddrs, SocketAddr as StdSocketAddr, TcpListener as StdTcpListener, ToSocketAddrs,
}; };
use std::{fmt, io};
use actix_rt::net::TcpStream;
pub(crate) use mio::net::TcpListener as MioTcpListener; pub(crate) use mio::net::TcpListener as MioTcpListener;
use mio::{event::Source, Interest, Registry, Token};
#[cfg(unix)] #[cfg(unix)]
pub(crate) use { pub(crate) use {
mio::net::UnixListener as MioUnixListener, mio::net::UnixListener as MioUnixListener,
std::os::unix::net::UnixListener as StdUnixListener, std::os::unix::net::UnixListener as StdUnixListener,
}; };
use std::{fmt, io};
use actix_rt::net::TcpStream;
use mio::{event::Source, Interest, Registry, Token};
pub(crate) enum MioListener { pub(crate) enum MioListener {
Tcp(MioTcpListener), Tcp(MioTcpListener),
#[cfg(unix)] #[cfg(unix)]

View File

@@ -2,7 +2,7 @@ use std::{io, net, sync::mpsc, thread};
use actix_rt::{net::TcpStream, System}; use actix_rt::{net::TcpStream, System};
use crate::{Server, ServerBuilder, ServerHandle, ServiceFactory}; use crate::{Server, ServerBuilder, ServerHandle, ServerServiceFactory};
/// A testing server. /// A testing server.
/// ///
@@ -16,7 +16,7 @@ use crate::{Server, ServerBuilder, ServerHandle, ServiceFactory};
/// ///
/// #[actix_rt::main] /// #[actix_rt::main]
/// async fn main() { /// async fn main() {
/// let srv = TestServer::with(|| fn_service( /// let srv = TestServer::start(|| fn_service(
/// |sock| async move { /// |sock| async move {
/// println!("New connection: {:?}", sock); /// println!("New connection: {:?}", sock);
/// Ok::<_, ()>(()) /// Ok::<_, ()>(())
@@ -28,8 +28,8 @@ use crate::{Server, ServerBuilder, ServerHandle, ServiceFactory};
/// ``` /// ```
pub struct TestServer; pub struct TestServer;
/// Test server runtime /// Test server handle.
pub struct TestServerRuntime { pub struct TestServerHandle {
addr: net::SocketAddr, addr: net::SocketAddr,
host: String, host: String,
port: u16, port: u16,
@@ -38,46 +38,26 @@ pub struct TestServerRuntime {
} }
impl TestServer { impl TestServer {
/// Start new server with server builder. /// Start new `TestServer` using application factory and default server config.
pub fn start<F>(mut factory: F) -> TestServerRuntime pub fn start(factory: impl ServerServiceFactory<TcpStream>) -> TestServerHandle {
where Self::start_with_builder(Server::build(), factory)
F: FnMut(ServerBuilder) -> ServerBuilder + Send + 'static,
{
let (tx, rx) = mpsc::channel();
// run server in separate thread
let thread_handle = thread::spawn(move || {
System::new().block_on(async {
let server = factory(Server::build()).workers(1).disable_signals().run();
tx.send(server.handle()).unwrap();
server.await
})
});
let server_handle = rx.recv().unwrap();
TestServerRuntime {
addr: "127.0.0.1:0".parse().unwrap(),
host: "127.0.0.1".to_string(),
port: 0,
server_handle,
thread_handle: Some(thread_handle),
}
} }
/// Start new test server with application factory. /// Start new `TestServer` using application factory and server builder.
pub fn with<F: ServiceFactory<TcpStream>>(factory: F) -> TestServerRuntime { pub fn start_with_builder(
server_builder: ServerBuilder,
factory: impl ServerServiceFactory<TcpStream>,
) -> TestServerHandle {
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
// run server in separate thread // run server in separate thread
let thread_handle = thread::spawn(move || { let thread_handle = thread::spawn(move || {
let sys = System::new(); let lst = net::TcpListener::bind("127.0.0.1:0").unwrap();
let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap(); let local_addr = lst.local_addr().unwrap();
let local_addr = tcp.local_addr().unwrap();
sys.block_on(async { System::new().block_on(async {
let server = Server::build() let server = server_builder
.listen("test", tcp, factory) .listen("test", lst, factory)
.unwrap() .unwrap()
.workers(1) .workers(1)
.disable_signals() .disable_signals()
@@ -93,7 +73,7 @@ impl TestServer {
let host = format!("{}", addr.ip()); let host = format!("{}", addr.ip());
let port = addr.port(); let port = addr.port();
TestServerRuntime { TestServerHandle {
addr, addr,
host, host,
port, port,
@@ -107,17 +87,19 @@ impl TestServer {
use socket2::{Domain, Protocol, Socket, Type}; use socket2::{Domain, Protocol, Socket, Type};
let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap(); let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap();
let socket = let domain = Domain::for_address(addr);
Socket::new(Domain::for_address(addr), Type::STREAM, Some(Protocol::TCP)).unwrap(); let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP)).unwrap();
socket.set_reuse_address(true).unwrap(); socket.set_reuse_address(true).unwrap();
socket.set_nonblocking(true).unwrap(); socket.set_nonblocking(true).unwrap();
socket.bind(&addr.into()).unwrap(); socket.bind(&addr.into()).unwrap();
socket.listen(1024).unwrap(); socket.listen(1024).unwrap();
net::TcpListener::from(socket).local_addr().unwrap() net::TcpListener::from(socket).local_addr().unwrap()
} }
} }
impl TestServerRuntime { impl TestServerHandle {
/// Test server host. /// Test server host.
pub fn host(&self) -> &str { pub fn host(&self) -> &str {
&self.host &self.host
@@ -140,12 +122,12 @@ impl TestServerRuntime {
} }
/// Connect to server, returning a Tokio `TcpStream`. /// Connect to server, returning a Tokio `TcpStream`.
pub fn connect(&self) -> std::io::Result<TcpStream> { pub fn connect(&self) -> io::Result<TcpStream> {
TcpStream::from_std(net::TcpStream::connect(self.addr)?) TcpStream::from_std(net::TcpStream::connect(self.addr)?)
} }
} }
impl Drop for TestServerRuntime { impl Drop for TestServerHandle {
fn drop(&mut self) { fn drop(&mut self) {
self.stop() self.stop()
} }
@@ -158,8 +140,14 @@ mod tests {
use super::*; use super::*;
#[tokio::test] #[tokio::test]
async fn plain_tokio_runtime() { async fn connect_in_tokio_runtime() {
let srv = TestServer::with(|| fn_service(|_sock| async move { Ok::<_, ()>(()) })); let srv = TestServer::start(|| fn_service(|_sock| async move { Ok::<_, ()>(()) }));
assert!(srv.connect().is_ok());
}
#[actix_rt::test]
async fn connect_in_actix_runtime() {
let srv = TestServer::start(|| fn_service(|_sock| async move { Ok::<_, ()>(()) }));
assert!(srv.connect().is_ok()); assert!(srv.connect().is_ok());
} }
} }

View File

@@ -26,20 +26,54 @@ fn test_bind() {
let srv = Server::build() let srv = Server::build()
.workers(1) .workers(1)
.disable_signals() .disable_signals()
.shutdown_timeout(3600)
.bind("test", addr, move || { .bind("test", addr, move || {
fn_service(|_| async { Ok::<_, ()>(()) }) fn_service(|_| async { Ok::<_, ()>(()) })
})? })?
.run(); .run();
let _ = tx.send(srv.handle()); tx.send(srv.handle()).unwrap();
srv.await srv.await
}) })
}); });
let srv = rx.recv().unwrap(); let srv = rx.recv().unwrap();
thread::sleep(Duration::from_millis(500)); thread::sleep(Duration::from_millis(500));
assert!(net::TcpStream::connect(addr).is_ok());
net::TcpStream::connect(addr).unwrap();
let _ = srv.stop(true);
h.join().unwrap().unwrap();
}
#[test]
fn test_listen() {
let addr = unused_addr();
let (tx, rx) = mpsc::channel();
let lst = net::TcpListener::bind(addr).unwrap();
let h = thread::spawn(move || {
actix_rt::System::new().block_on(async {
let srv = Server::build()
.workers(1)
.disable_signals()
.shutdown_timeout(3600)
.listen("test", lst, move || {
fn_service(|_| async { Ok::<_, ()>(()) })
})?
.run();
tx.send(srv.handle()).unwrap();
srv.await
})
});
let srv = rx.recv().unwrap();
thread::sleep(Duration::from_millis(500));
net::TcpStream::connect(addr).unwrap();
let _ = srv.stop(true); let _ = srv.stop(true);
h.join().unwrap().unwrap(); h.join().unwrap().unwrap();
@@ -80,38 +114,6 @@ fn plain_tokio_runtime() {
h.join().unwrap().unwrap(); h.join().unwrap().unwrap();
} }
#[test]
fn test_listen() {
let addr = unused_addr();
let lst = net::TcpListener::bind(addr).unwrap();
let (tx, rx) = mpsc::channel();
let h = thread::spawn(move || {
actix_rt::System::new().block_on(async {
let srv = Server::build()
.disable_signals()
.workers(1)
.listen("test", lst, move || {
fn_service(|_| async { Ok::<_, ()>(()) })
})?
.run();
let _ = tx.send(srv.handle());
srv.await
})
});
let srv = rx.recv().unwrap();
thread::sleep(Duration::from_millis(500));
assert!(net::TcpStream::connect(addr).is_ok());
let _ = srv.stop(true);
h.join().unwrap().unwrap();
}
#[test] #[test]
#[cfg(unix)] #[cfg(unix)]
fn test_start() { fn test_start() {

View File

@@ -0,0 +1,73 @@
use std::net;
use actix_rt::net::TcpStream;
use actix_server::{Server, TestServer};
use actix_service::fn_service;
use bytes::BytesMut;
use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};
macro_rules! await_timeout_ms {
($fut:expr, $limit:expr) => {
::actix_rt::time::timeout(::std::time::Duration::from_millis($limit), $fut)
.await
.unwrap()
.unwrap();
};
}
#[tokio::test]
async fn testing_server_echo() {
let srv = TestServer::start(|| {
fn_service(move |mut stream: TcpStream| async move {
let mut size = 0;
let mut buf = BytesMut::new();
match stream.read_buf(&mut buf).await {
Ok(0) => return Err(()),
Ok(bytes_read) => {
stream.write_all(&buf[size..]).await.unwrap();
size += bytes_read;
}
Err(_) => return Err(()),
}
Ok((buf.freeze(), size))
})
});
let mut conn = srv.connect().unwrap();
await_timeout_ms!(conn.write_all(b"test"), 200);
let mut buf = Vec::new();
await_timeout_ms!(conn.read_to_end(&mut buf), 200);
assert_eq!(&buf, b"test".as_ref());
}
#[tokio::test]
async fn new_with_builder() {
let alt_addr = TestServer::unused_addr();
let srv = TestServer::start_with_builder(
Server::build()
.bind("alt", alt_addr, || {
fn_service(|_| async { Ok::<_, ()>(()) })
})
.unwrap(),
|| {
fn_service(|mut sock: TcpStream| async move {
let mut buf = [0u8; 16];
sock.read_exact(&mut buf).await
})
},
);
// connect to test server
srv.connect().unwrap();
// connect to alt service defined in custom ServerBuilder
TcpStream::from_std(net::TcpStream::connect(alt_addr).unwrap()).unwrap();
}

View File

@@ -69,7 +69,7 @@ tokio-native-tls = { version = "0.3", optional = true }
[dev-dependencies] [dev-dependencies]
actix-rt = "2.2.0" actix-rt = "2.2.0"
actix-server = "2.0.0-rc.1" actix-server = "2.0.0-rc.2"
bytes = "1" bytes = "1"
env_logger = "0.9" env_logger = "0.9"
futures-util = { version = "0.3.7", default-features = false, features = ["sink"] } futures-util = { version = "0.3.7", default-features = false, features = ["sink"] }

View File

@@ -94,7 +94,7 @@ fn rustls_connector(_cert: String, _key: String) -> ClientConfig {
async fn accepts_connections() { async fn accepts_connections() {
let (cert, key) = new_cert_and_key(); let (cert, key) = new_cert_and_key();
let srv = TestServer::with({ let srv = TestServer::start({
let cert = cert.clone(); let cert = cert.clone();
let key = key.clone(); let key = key.clone();

View File

@@ -74,7 +74,7 @@ fn openssl_connector(cert: String, key: String) -> SslConnector {
async fn accepts_connections() { async fn accepts_connections() {
let (cert, key) = new_cert_and_key(); let (cert, key) = new_cert_and_key();
let srv = TestServer::with({ let srv = TestServer::start({
let cert = cert.clone(); let cert = cert.clone();
let key = key.clone(); let key = key.clone();

View File

@@ -17,7 +17,7 @@ use actix_tls::connect::{ConnectError, ConnectInfo, Connection, Connector, Host}
#[cfg(feature = "openssl")] #[cfg(feature = "openssl")]
#[actix_rt::test] #[actix_rt::test]
async fn test_string() { async fn test_string() {
let srv = TestServer::with(|| { let srv = TestServer::start(|| {
fn_service(|io: TcpStream| async { fn_service(|io: TcpStream| async {
let mut framed = Framed::new(io, BytesCodec); let mut framed = Framed::new(io, BytesCodec);
framed.send(Bytes::from_static(b"test")).await?; framed.send(Bytes::from_static(b"test")).await?;
@@ -34,7 +34,7 @@ async fn test_string() {
#[cfg(feature = "rustls")] #[cfg(feature = "rustls")]
#[actix_rt::test] #[actix_rt::test]
async fn test_rustls_string() { async fn test_rustls_string() {
let srv = TestServer::with(|| { let srv = TestServer::start(|| {
fn_service(|io: TcpStream| async { fn_service(|io: TcpStream| async {
let mut framed = Framed::new(io, BytesCodec); let mut framed = Framed::new(io, BytesCodec);
framed.send(Bytes::from_static(b"test")).await?; framed.send(Bytes::from_static(b"test")).await?;
@@ -50,7 +50,7 @@ async fn test_rustls_string() {
#[actix_rt::test] #[actix_rt::test]
async fn test_static_str() { async fn test_static_str() {
let srv = TestServer::with(|| { let srv = TestServer::start(|| {
fn_service(|io: TcpStream| async { fn_service(|io: TcpStream| async {
let mut framed = Framed::new(io, BytesCodec); let mut framed = Framed::new(io, BytesCodec);
framed.send(Bytes::from_static(b"test")).await?; framed.send(Bytes::from_static(b"test")).await?;
@@ -81,7 +81,7 @@ async fn service_factory() {
Connector::default() Connector::default()
} }
let srv = TestServer::with(|| { let srv = TestServer::start(|| {
fn_service(|io: TcpStream| async { fn_service(|io: TcpStream| async {
let mut framed = Framed::new(io, BytesCodec); let mut framed = Framed::new(io, BytesCodec);
framed.send(Bytes::from_static(b"test")).await?; framed.send(Bytes::from_static(b"test")).await?;
@@ -101,7 +101,7 @@ async fn service_factory() {
async fn test_openssl_uri() { async fn test_openssl_uri() {
use std::convert::TryFrom; use std::convert::TryFrom;
let srv = TestServer::with(|| { let srv = TestServer::start(|| {
fn_service(|io: TcpStream| async { fn_service(|io: TcpStream| async {
let mut framed = Framed::new(io, BytesCodec); let mut framed = Framed::new(io, BytesCodec);
framed.send(Bytes::from_static(b"test")).await?; framed.send(Bytes::from_static(b"test")).await?;
@@ -120,7 +120,7 @@ async fn test_openssl_uri() {
async fn test_rustls_uri() { async fn test_rustls_uri() {
use std::convert::TryFrom; use std::convert::TryFrom;
let srv = TestServer::with(|| { let srv = TestServer::start(|| {
fn_service(|io: TcpStream| async { fn_service(|io: TcpStream| async {
let mut framed = Framed::new(io, BytesCodec); let mut framed = Framed::new(io, BytesCodec);
framed.send(Bytes::from_static(b"test")).await?; framed.send(Bytes::from_static(b"test")).await?;
@@ -136,7 +136,7 @@ async fn test_rustls_uri() {
#[actix_rt::test] #[actix_rt::test]
async fn test_local_addr() { async fn test_local_addr() {
let srv = TestServer::with(|| { let srv = TestServer::start(|| {
fn_service(|io: TcpStream| async { fn_service(|io: TcpStream| async {
let mut framed = Framed::new(io, BytesCodec); let mut framed = Framed::new(io, BytesCodec);
framed.send(Bytes::from_static(b"test")).await?; framed.send(Bytes::from_static(b"test")).await?;

View File

@@ -53,7 +53,7 @@ async fn custom_resolver_connect() {
use trust_dns_resolver::TokioAsyncResolver; use trust_dns_resolver::TokioAsyncResolver;
let srv = let srv =
TestServer::with(|| fn_service(|_io: TcpStream| async { Ok::<_, io::Error>(()) })); TestServer::start(|| fn_service(|_io: TcpStream| async { Ok::<_, io::Error>(()) }));
struct MyResolver { struct MyResolver {
trust_dns: TokioAsyncResolver, trust_dns: TokioAsyncResolver,