1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-08-12 16:47:05 +02:00

Compare commits

..

8 Commits

Author SHA1 Message Date
Nikolay Kim
41e49e8093 update changes 2019-09-25 09:32:33 +06:00
Nikolay Kim
715a770d7a deprecate test server 2019-09-25 09:31:52 +06:00
Nikolay Kim
5469d8c910 prep actix-testing release 2019-09-25 09:26:12 +06:00
Nikolay Kim
8be5f773f4 add actix-testing crate 2019-09-17 16:04:20 +06:00
karlri
b686b4c34e Feature uds: Add listen_uds to ServerBuilder (#43)
Allows directly passing an Unix Listener instead of a path. Useful
for example when running as a daemon under systemd with the systemd
crate.
2019-09-16 11:07:46 +06:00
Nikolay Kim
34a7b7f05a add TcpStreamService 2019-09-05 16:34:48 -07:00
Nikolay Kim
b1d9b06a87 Use arbiters storage for default async resolver 2019-09-02 15:15:55 -07:00
Nikolay Kim
94e673b50b Add arbiter specific storage 2019-09-02 15:03:03 -07:00
20 changed files with 479 additions and 48 deletions

View File

@@ -6,6 +6,7 @@
* Use new `Service<Request>` trait
* Add UDS listening support to `ServerBuilder`
## [0.2.4] - 2018-11-21

View File

@@ -17,14 +17,15 @@ edition = "2018"
members = [
"actix-codec",
"actix-connect",
"actix-ioframe",
"actix-rt",
"actix-service",
"actix-server",
"actix-server-config",
"actix-testing",
"actix-test-server",
"actix-threadpool",
"actix-tower",
"actix-ioframe",
"actix-utils",
"router",
]

View File

@@ -1,4 +1,4 @@
# Actix net [![Build Status](https://travis-ci.org/actix/actix-net.svg?branch=master)](https://travis-ci.org/actix/actix-net) [![codecov](https://codecov.io/gh/actix/actix-net/branch/master/graph/badge.svg)](https://codecov.io/gh/actix/actix-net) [![crates.io](https://meritbadge.herokuapp.com/actix-net)](https://crates.io/crates/actix-net) [![Join the chat at https://gitter.im/actix/actix](https://badges.gitter.im/actix/actix.svg)](https://gitter.im/actix/actix?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
# Actix net [![Build Status](https://travis-ci.org/actix/actix-net.svg?branch=master)](https://travis-ci.org/actix/actix-net) [![codecov](https://codecov.io/gh/actix/actix-net/branch/master/graph/badge.svg)](https://codecov.io/gh/actix/actix-net) [![Join the chat at https://gitter.im/actix/actix](https://badges.gitter.im/actix/actix.svg)](https://gitter.im/actix/actix?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
Actix net - framework for composable network services

View File

@@ -1,5 +1,13 @@
# Changes
## [0.2.5] - 2019-09-05
* Add `TcpConnectService`
## [0.2.4] - 2019-09-02
* Use arbiter's storage for default async resolver
## [0.2.3] - 2019-08-05
* Add `ConnectService` and `OpensslConnectService`

View File

@@ -1,6 +1,6 @@
[package]
name = "actix-connect"
version = "0.2.3"
version = "0.2.5"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix Connector - tcp connector service"
keywords = ["network", "framework", "async", "futures"]
@@ -36,6 +36,7 @@ uri = ["http"]
actix-service = "0.4.0"
actix-codec = "0.1.2"
actix-utils = "0.4.0"
actix-rt = "0.2.5"
derive_more = "0.15"
either = "1.5.2"
futures = "0.1.25"

View File

@@ -10,8 +10,6 @@
#[macro_use]
extern crate log;
use std::cell::RefCell;
mod connect;
mod connector;
mod error;
@@ -30,8 +28,9 @@ pub use self::connect::{Address, Connect, Connection};
pub use self::connector::{TcpConnector, TcpConnectorFactory};
pub use self::error::ConnectError;
pub use self::resolver::{Resolver, ResolverFactory};
pub use self::service::{ConnectService, ConnectServiceFactory};
pub use self::service::{ConnectService, ConnectServiceFactory, TcpConnectService};
use actix_rt::Arbiter;
use actix_service::{NewService, Service, ServiceExt};
use tokio_tcp::TcpStream;
@@ -41,16 +40,12 @@ pub fn start_resolver(cfg: ResolverConfig, opts: ResolverOpts) -> AsyncResolver
resolver
}
thread_local! {
static DEFAULT_RESOLVER: RefCell<Option<AsyncResolver>> = RefCell::new(None);
}
struct DefaultResolver(AsyncResolver);
pub(crate) fn get_default_resolver() -> AsyncResolver {
DEFAULT_RESOLVER.with(|cell| {
if let Some(ref resolver) = *cell.borrow() {
return resolver.clone();
}
if Arbiter::contains_item::<DefaultResolver>() {
return Arbiter::get_item(|item: &DefaultResolver| item.0.clone());
} else {
let (cfg, opts) = match read_system_conf() {
Ok((cfg, opts)) => (cfg, opts),
Err(e) => {
@@ -62,9 +57,9 @@ pub(crate) fn get_default_resolver() -> AsyncResolver {
let (resolver, bg) = AsyncResolver::new(cfg, opts);
tokio_current_thread::spawn(bg);
*cell.borrow_mut() = Some(resolver.clone());
Arbiter::set_item(DefaultResolver(resolver.clone()));
resolver
})
}
}
pub fn start_default_resolver() -> AsyncResolver {

View File

@@ -38,6 +38,14 @@ impl<T> ConnectServiceFactory<T> {
resolver: self.resolver.service(),
}
}
/// Construct new tcp stream service
pub fn tcp_service(&self) -> TcpConnectService<T> {
TcpConnectService {
tcp: self.tcp.service(),
resolver: self.resolver.service(),
}
}
}
impl<T> Default for ConnectServiceFactory<T> {
@@ -121,3 +129,55 @@ impl<T: Address> Future for ConnectServiceResponse<T> {
Ok(Async::NotReady)
}
}
#[derive(Clone)]
pub struct TcpConnectService<T> {
tcp: TcpConnector<T>,
resolver: Resolver<T>,
}
impl<T: Address> Service for TcpConnectService<T> {
type Request = Connect<T>;
type Response = TcpStream;
type Error = ConnectError;
type Future = TcpConnectServiceResponse<T>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
}
fn call(&mut self, req: Connect<T>) -> Self::Future {
TcpConnectServiceResponse {
fut1: Some(self.resolver.call(req)),
fut2: None,
tcp: self.tcp.clone(),
}
}
}
pub struct TcpConnectServiceResponse<T: Address> {
fut1: Option<<Resolver<T> as Service>::Future>,
fut2: Option<<TcpConnector<T> as Service>::Future>,
tcp: TcpConnector<T>,
}
impl<T: Address> Future for TcpConnectServiceResponse<T> {
type Item = TcpStream;
type Error = ConnectError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Some(ref mut fut) = self.fut1 {
let res = try_ready!(fut.poll());
let _ = self.fut1.take();
self.fut2 = Some(self.tcp.call(res));
}
if let Some(ref mut fut) = self.fut2 {
if let Async::Ready(conn) = fut.poll()? {
return Ok(Async::Ready(conn.into_parts().0));
}
}
Ok(Async::NotReady)
}
}

View File

@@ -1,5 +1,12 @@
# Changes
## [0.2.5] - 2019-09-02
### Added
* Add arbiter specific storage
## [0.2.4] - 2019-07-17
### Changed

View File

@@ -1,6 +1,6 @@
[package]
name = "actix-rt"
version = "0.2.4"
version = "0.2.5"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix runtime"
keywords = ["network", "framework", "async", "futures"]

View File

@@ -1,3 +1,4 @@
use std::any::{Any, TypeId};
use std::cell::{Cell, RefCell};
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
@@ -17,6 +18,7 @@ thread_local!(
static ADDR: RefCell<Option<Arbiter>> = RefCell::new(None);
static RUNNING: Cell<bool> = Cell::new(false);
static Q: RefCell<Vec<Box<dyn Future<Item = (), Error = ()>>>> = RefCell::new(Vec::new());
static STORAGE: RefCell<HashMap<TypeId, Box<dyn Any>>> = RefCell::new(HashMap::new());
);
pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0);
@@ -56,6 +58,7 @@ impl Arbiter {
let arb = Arbiter(tx);
ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
RUNNING.with(|cell| cell.set(false));
STORAGE.with(|cell| cell.borrow_mut().clear());
Arbiter::spawn(ArbiterController { stop: None, rx });
arb
@@ -90,6 +93,7 @@ impl Arbiter {
let (stop, stop_rx) = channel();
RUNNING.with(|cell| cell.set(true));
STORAGE.with(|cell| cell.borrow_mut().clear());
System::set_current(sys);
@@ -202,6 +206,50 @@ impl Arbiter {
})));
rx
}
/// Set item to arbiter storage
pub fn set_item<T: 'static>(item: T) {
STORAGE.with(move |cell| cell.borrow_mut().insert(TypeId::of::<T>(), Box::new(item)));
}
/// Check if arbiter storage contains item
pub fn contains_item<T: 'static>() -> bool {
STORAGE.with(move |cell| cell.borrow().get(&TypeId::of::<T>()).is_some())
}
/// Get a reference to a type previously inserted on this arbiter's storage.
///
/// Panics is item is not inserted
pub fn get_item<T: 'static, F, R>(mut f: F) -> R
where
F: FnMut(&T) -> R,
{
STORAGE.with(move |cell| {
let st = cell.borrow();
let item = st
.get(&TypeId::of::<T>())
.and_then(|boxed| (&**boxed as &(dyn Any + 'static)).downcast_ref())
.unwrap();
f(item)
})
}
/// Get a mutable reference to a type previously inserted on this arbiter's storage.
///
/// Panics is item is not inserted
pub fn get_mut_item<T: 'static, F, R>(mut f: F) -> R
where
F: FnMut(&mut T) -> R,
{
STORAGE.with(move |cell| {
let mut st = cell.borrow_mut();
let item = st
.get_mut(&TypeId::of::<T>())
.and_then(|boxed| (&mut **boxed as &mut (dyn Any + 'static)).downcast_mut())
.unwrap();
f(item)
})
}
}
struct ArbiterController {

View File

@@ -185,20 +185,41 @@ impl ServerBuilder {
#[cfg(all(unix, feature = "uds"))]
/// Add new unix domain service to the server.
pub fn bind_uds<F, U, N>(mut self, name: N, addr: U, factory: F) -> io::Result<Self>
pub fn bind_uds<F, U, N>(self, name: N, addr: U, factory: F) -> io::Result<Self>
where
F: ServiceFactory<tokio_uds::UnixStream>,
N: AsRef<str>,
U: AsRef<std::path::Path>,
{
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::os::unix::net::UnixListener;
// TODO: need to do something with existing paths
let _ = std::fs::remove_file(addr.as_ref());
// The path must not exist when we try to bind.
// Try to remove it to avoid bind error.
if let Err(e) = std::fs::remove_file(addr.as_ref()) {
// NotFound is expected and not an issue. Anything else is.
if e.kind() != std::io::ErrorKind::NotFound {
return Err(e);
}
}
let lst = UnixListener::bind(addr)?;
self.listen_uds(name, lst, factory)
}
#[cfg(all(unix, feature = "uds"))]
/// Add new unix domain service to the server.
/// Useful when running as a systemd service and
/// a socket FD can be acquired using the systemd crate.
pub fn listen_uds<F, N: AsRef<str>>(
mut self,
name: N,
lst: std::os::unix::net::UnixListener,
factory: F,
) -> io::Result<Self>
where
F: ServiceFactory<tokio_uds::UnixStream>,
{
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
let token = self.token.next();
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
self.services.push(StreamNewService::create(

View File

@@ -13,25 +13,10 @@ exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"]
edition = "2018"
workspace = ".."
[package.metadata.docs.rs]
features = ["ssl", "tls", "rust-tls"]
[lib]
name = "actix_test_server"
path = "src/lib.rs"
[features]
default = []
# tls
tls = ["native-tls", "actix-server/tls"]
# openssl
ssl = ["openssl", "actix-server/ssl"]
# rustls
rust-tls = ["rustls", "tokio-rustls", "webpki", "webpki-roots"]
[dependencies]
actix-rt = "0.2.1"
actix-server = "0.5.0"
@@ -43,17 +28,5 @@ futures = "0.1"
tokio-tcp = "0.1"
tokio-reactor = "0.1"
# native-tls
native-tls = { version="0.2", optional = true }
# openssl
openssl = { version="0.10", optional = true }
#rustls
rustls = { version = "^0.15", optional = true }
tokio-rustls = { version = "^0.9", optional = true }
webpki = { version = "0.19", optional = true }
webpki-roots = { version = "0.16", optional = true }
[dev-dependencies]
actix-service = "0.4.0"

View File

@@ -0,0 +1,3 @@
# Actix test server (Deprecated)
Use [actix-testing](https://docs.rs/actix-testing/) instead

5
actix-testing/CHANGES.md Normal file
View File

@@ -0,0 +1,5 @@
# Changes
## [0.1.0] - 2019-09-25
* Initial impl

29
actix-testing/Cargo.toml Normal file
View File

@@ -0,0 +1,29 @@
[package]
name = "actix-testing"
version = "0.1.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix testing utils"
keywords = ["network", "framework", "async", "futures"]
homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/actix-testing/"
categories = ["network-programming", "asynchronous"]
license = "MIT/Apache-2.0"
edition = "2018"
workspace = ".."
[lib]
name = "actix_testing"
path = "src/lib.rs"
[dependencies]
actix-rt = "0.2.1"
actix-server = "0.6.0"
actix-server-config = "0.1.0"
actix-service = "0.4.0"
log = "0.4"
net2 = "0.2"
futures = "0.1"
tokio-tcp = "0.1"
tokio-reactor = "0.1"

View File

@@ -0,0 +1 @@
../LICENSE-APACHE

1
actix-testing/LICENSE-MIT Symbolic link
View File

@@ -0,0 +1 @@
../LICENSE-MIT

9
actix-testing/README.md Normal file
View File

@@ -0,0 +1,9 @@
# Actix test utilities [![crates.io](https://meritbadge.herokuapp.com/actix-testing)](https://crates.io/crates/actix-testint) [![Join the chat at https://gitter.im/actix/actix](https://badges.gitter.im/actix/actix.svg)](https://gitter.im/actix/actix?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
## Documentation & community resources
* [User Guide](https://actix.rs/docs/)
* [API Documentation](https://docs.rs/actix-testing/)
* [Chat on gitter](https://gitter.im/actix/actix)
* Cargo package: [actix-http-test](https://crates.io/crates/actix-testing)
* Minimum supported Rust version: 1.37 or later

152
actix-testing/src/lib.rs Normal file
View File

@@ -0,0 +1,152 @@
//! Various helpers for Actix applications to use during testing.
use std::sync::mpsc;
use std::{net, thread};
use actix_rt::System;
use actix_server::{Server, ServerBuilder, StreamServiceFactory};
pub use actix_server_config::{Io, ServerConfig};
use net2::TcpBuilder;
use tokio_reactor::Handle;
use tokio_tcp::TcpStream;
mod rt;
pub use self::rt::*;
/// The `TestServer` type.
///
/// `TestServer` is very simple test server that simplify process of writing
/// integration tests for actix-net applications.
///
/// # Examples
///
/// ```rust
/// use actix_service::{service_fn, IntoNewService};
/// use actix_testing::TestServer;
///
/// fn main() {
/// let srv = TestServer::with(|| service_fn(
/// |sock| {
/// println!("New connection: {:?}", sock);
/// Ok::<_, ()>(())
/// }
/// ));
///
/// println!("SOCKET: {:?}", srv.connect());
/// }
/// ```
pub struct TestServer;
/// Test server runstime
pub struct TestServerRuntime {
addr: net::SocketAddr,
host: String,
port: u16,
system: System,
}
impl TestServer {
/// Start new server with server builder
pub fn new<F>(mut factory: F) -> TestServerRuntime
where
F: FnMut(ServerBuilder) -> ServerBuilder + Send + 'static,
{
let (tx, rx) = mpsc::channel();
// run server in separate thread
thread::spawn(move || {
let sys = System::new("actix-test-server");
factory(Server::build())
.workers(1)
.disable_signals()
.start();
tx.send(System::current()).unwrap();
sys.run()
});
let system = rx.recv().unwrap();
TestServerRuntime {
system,
addr: "127.0.0.1:0".parse().unwrap(),
host: "127.0.0.1".to_string(),
port: 0,
}
}
/// Start new test server with application factory
pub fn with<F: StreamServiceFactory<TcpStream>>(factory: F) -> TestServerRuntime {
let (tx, rx) = mpsc::channel();
// run server in separate thread
thread::spawn(move || {
let sys = System::new("actix-test-server");
let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap();
let local_addr = tcp.local_addr().unwrap();
Server::build()
.listen("test", tcp, factory)?
.workers(1)
.disable_signals()
.start();
tx.send((System::current(), local_addr)).unwrap();
sys.run()
});
let (system, addr) = rx.recv().unwrap();
let host = format!("{}", addr.ip());
let port = addr.port();
TestServerRuntime {
system,
addr,
host,
port,
}
}
/// Get firat available unused local address
pub fn unused_addr() -> net::SocketAddr {
let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap();
let socket = TcpBuilder::new_v4().unwrap();
socket.bind(&addr).unwrap();
socket.reuse_address(true).unwrap();
let tcp = socket.to_tcp_listener().unwrap();
tcp.local_addr().unwrap()
}
}
impl TestServerRuntime {
/// Test server host
pub fn host(&self) -> &str {
&self.host
}
/// Test server port
pub fn port(&self) -> u16 {
self.port
}
/// Get test server address
pub fn addr(&self) -> net::SocketAddr {
self.addr
}
/// Stop http server
fn stop(&mut self) {
self.system.stop();
}
/// Connect to server, return tokio TcpStream
pub fn connect(&self) -> std::io::Result<TcpStream> {
TcpStream::from_std(net::TcpStream::connect(self.addr)?, &Handle::default())
}
}
impl Drop for TestServerRuntime {
fn drop(&mut self) {
self.stop()
}
}

116
actix-testing/src/rt.rs Normal file
View File

@@ -0,0 +1,116 @@
//! Various helpers for Actix applications to use during testing.
use std::cell::RefCell;
use actix_rt::{System, SystemRunner};
use actix_service::Service;
use futures::future::{lazy, Future, IntoFuture};
thread_local! {
static RT: RefCell<Inner> = {
RefCell::new(Inner(Some(System::builder().build())))
};
}
struct Inner(Option<SystemRunner>);
impl Inner {
fn get_mut(&mut self) -> &mut SystemRunner {
self.0.as_mut().unwrap()
}
}
impl Drop for Inner {
fn drop(&mut self) {
std::mem::forget(self.0.take().unwrap())
}
}
/// Runs the provided future, blocking the current thread until the future
/// completes.
///
/// This function can be used to synchronously block the current thread
/// until the provided `future` has resolved either successfully or with an
/// error. The result of the future is then returned from this function
/// call.
///
/// Note that this function is intended to be used only for testing purpose.
/// This function panics on nested call.
pub fn block_on<F>(f: F) -> Result<F::Item, F::Error>
where
F: IntoFuture,
{
RT.with(move |rt| rt.borrow_mut().get_mut().block_on(f.into_future()))
}
/// Runs the provided function, blocking the current thread until the result
/// future completes.
///
/// This function can be used to synchronously block the current thread
/// until the provided `future` has resolved either successfully or with an
/// error. The result of the future is then returned from this function
/// call.
///
/// Note that this function is intended to be used only for testing purpose.
/// This function panics on nested call.
pub fn block_fn<F, R>(f: F) -> Result<R::Item, R::Error>
where
F: FnOnce() -> R,
R: IntoFuture,
{
RT.with(move |rt| rt.borrow_mut().get_mut().block_on(lazy(f)))
}
/// Spawn future to the current test runtime.
pub fn spawn<F>(fut: F)
where
F: Future<Item = (), Error = ()> + 'static,
{
run_on(move || {
actix_rt::spawn(fut);
});
}
/// Runs the provided function, with runtime enabled.
///
/// Note that this function is intended to be used only for testing purpose.
/// This function panics on nested call.
pub fn run_on<F, R>(f: F) -> R
where
F: FnOnce() -> R,
{
RT.with(move |rt| {
rt.borrow_mut()
.get_mut()
.block_on(lazy(|| Ok::<_, ()>(f())))
})
.unwrap()
}
/// Calls service and waits for response future completion.
///
/// ```rust,ignore
/// use actix_web::{test, App, HttpResponse, http::StatusCode};
/// use actix_service::Service;
///
/// #[test]
/// fn test_response() {
/// let mut app = test::init_service(
/// App::new()
/// .service(web::resource("/test").to(|| HttpResponse::Ok()))
/// );
///
/// // Create request object
/// let req = test::TestRequest::with_uri("/test").to_request();
///
/// // Call application
/// let resp = test::call_service(&mut app, req);
/// assert_eq!(resp.status(), StatusCode::OK);
/// }
/// ```
pub fn call_service<S, R>(app: &mut S, req: R) -> S::Response
where
S: Service<Request = R>,
S::Error: std::fmt::Debug,
{
block_on(run_on(move || app.call(req))).unwrap()
}