1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-09-02 20:06:37 +02:00

Compare commits

...

9 Commits

Author SHA1 Message Date
Rob Ede
6e2db3dc25 run io-uring tests on circle ci 2021-11-15 00:16:15 +00:00
Rob Ede
ed987eef06 prepare actix-server release 2.0.0-beta.8 2021-11-07 15:46:59 +00:00
fakeshadow
3658929010 fix io-uring feature for actix-server (#414)
Co-authored-by: Rob Ede <robjtede@icloud.com>
2021-11-07 15:43:59 +00:00
fakeshadow
3f49d8ab54 remove usage of mio::net::TcpSocket (#413) 2021-11-07 14:18:23 +00:00
fakeshadow
161d1ee94b fix accept timeout and worker graceful shutdown (#412) 2021-11-07 13:00:19 +00:00
Rob Ede
81ba7cafaa fix server non-unix signal impl send (#410) 2021-11-05 02:16:13 +00:00
Rob Ede
f8f51a2240 prepare actix-server release 2.0.0-beta.7 2021-11-05 01:14:28 +00:00
Rob Ede
a2e765ea6e prepare actix-codec release 0.4.1 2021-11-05 01:05:51 +00:00
Rob Ede
03dae6a4a4 prepare actix-rt release 2.4.0 2021-11-05 00:51:34 +00:00
17 changed files with 119 additions and 65 deletions

22
.circleci/config.yml Normal file
View File

@@ -0,0 +1,22 @@
version: 2.1
jobs:
test-io-uring-features:
machine:
image: ubuntu-2004:202107-02
resource_class: large
environment:
CI: '1'
CARGO_INCREMENTAL: '0'
RUST_BACKTRACE: '1'
steps:
- checkout
- run: uname -a
- run: apt-get update; apt-get install -y liburing-dev
- run: cargo test -p=actix-rt --features=io-uring
- run: cargo test -p=actix-server --features=io-uring
workflows:
test:
jobs:
- test-io-uring-features

View File

@@ -196,13 +196,6 @@ jobs:
- name: Cache Dependencies
uses: Swatinem/rust-cache@v1.3.0
- name: Install cargo-hack
uses: actions-rs/cargo@v1
with:
command: install
args: cargo-hack
- name: doc tests
uses: actions-rs/cargo@v1
timeout-minutes: 40
with: { command: ci-doctest }
- name: doc tests io-uring
run: |
sudo bash -c "ulimit -Sl 512 && ulimit -Hl 512 && PATH=$PATH:/usr/share/rust/.cargo/bin && RUSTUP_TOOLCHAIN=nightly cargo ci-doctest"

View File

@@ -1,6 +1,9 @@
# Changes
## Unreleased - 2021-xx-xx
## 0.4.1 - 2021-11-05
* Added `LinesCodec.` [#338]
* `Framed::poll_ready` flushes when the buffer is full. [#409]

View File

@@ -1,6 +1,6 @@
[package]
name = "actix-codec"
version = "0.4.0"
version = "0.4.1"
authors = [
"Nikolay Kim <fafhrd91@gmail.com>",
"Rob Ede <robjtede@icloud.com>",

View File

@@ -1,6 +1,9 @@
# Changes
## Unreleased - 2021-xx-xx
## 2.4.0 - 2021-11-05
* Add `Arbiter::try_current` for situations where thread may or may not have Arbiter context. [#408]
* Start io-uring with `System::new` when feature is enabled. [#395]

View File

@@ -1,6 +1,6 @@
[package]
name = "actix-rt"
version = "2.3.0"
version = "2.4.0"
authors = [
"Nikolay Kim <fafhrd91@gmail.com>",
"Rob Ede <robjtede@icloud.com>",

View File

@@ -3,11 +3,11 @@
> Tokio-based single-threaded async runtime for the Actix ecosystem.
[![crates.io](https://img.shields.io/crates/v/actix-rt?label=latest)](https://crates.io/crates/actix-rt)
[![Documentation](https://docs.rs/actix-rt/badge.svg?version=2.3.0)](https://docs.rs/actix-rt/2.3.0)
[![Documentation](https://docs.rs/actix-rt/badge.svg?version=2.4.0)](https://docs.rs/actix-rt/2.4.0)
[![Version](https://img.shields.io/badge/rustc-1.46+-ab6000.svg)](https://blog.rust-lang.org/2020/03/12/Rust-1.46.html)
![MIT or Apache 2.0 licensed](https://img.shields.io/crates/l/actix-rt.svg)
<br />
[![dependency status](https://deps.rs/crate/actix-rt/2.3.0/status.svg)](https://deps.rs/crate/actix-rt/2.3.0)
[![dependency status](https://deps.rs/crate/actix-rt/2.4.0/status.svg)](https://deps.rs/crate/actix-rt/2.4.0)
![Download](https://img.shields.io/crates/d/actix-rt.svg)
[![Chat on Discord](https://img.shields.io/discord/771444961383153695?label=chat&logo=discord)](https://discord.gg/WghFtEH6Hb)

View File

@@ -175,9 +175,9 @@ impl System {
}
}
#[cfg(not(feature = "io-uring"))]
/// Runner that keeps a [System]'s event loop alive until stop message is received.
#[must_use = "A SystemRunner does nothing unless `run` is called."]
#[cfg(not(feature = "io-uring"))]
#[must_use = "A SystemRunner does nothing unless `run` or `block_on` is called."]
#[derive(Debug)]
pub struct SystemRunner {
rt: crate::runtime::Runtime,
@@ -216,9 +216,9 @@ impl SystemRunner {
}
}
#[cfg(feature = "io-uring")]
/// Runner that keeps a [System]'s event loop alive until stop message is received.
#[must_use = "A SystemRunner does nothing unless `run` is called."]
#[cfg(feature = "io-uring")]
#[must_use = "A SystemRunner does nothing unless `run` or `block_on` is called."]
#[derive(Debug)]
pub struct SystemRunner;

View File

@@ -1,6 +1,15 @@
# Changes
## Unreleased - 2021-xx-xx
## 2.0.0-beta.8 - 2021-11-05
* Fix non-unix signal handler. [#410]
[#410]: https://github.com/actix/actix-net/pull/410
## 2.0.0-beta.7 - 2021-11-05
* Server can be started in regular Tokio runtime. [#408]
* Expose new `Server` type whose `Future` impl resolves when server stops. [#408]
* Rename `Server` to `ServerHandle`. [#407]

View File

@@ -1,6 +1,6 @@
[package]
name = "actix-server"
version = "2.0.0-beta.6"
version = "2.0.0-beta.8"
authors = [
"Nikolay Kim <fafhrd91@gmail.com>",
"fakeshadow <24548779@qq.com>",
@@ -18,10 +18,10 @@ path = "src/lib.rs"
[features]
default = []
io-uring = ["actix-rt/io-uring"]
io-uring = ["tokio-uring"]
[dependencies]
actix-rt = { version = "2.0.0", default-features = false }
actix-rt = { version = "2.4.0", default-features = false }
actix-service = "2.0.0"
actix-utils = "3.0.0"
@@ -29,8 +29,12 @@ futures-core = { version = "0.3.7", default-features = false, features = ["alloc
log = "0.4"
mio = { version = "0.7.6", features = ["os-poll", "net"] }
num_cpus = "1.13"
socket2 = "0.4.2"
tokio = { version = "1.5.1", features = ["sync"] }
# runtime for io-uring feature
tokio-uring = { version = "0.1", optional = true }
[dev-dependencies]
actix-codec = "0.4.0"
actix-rt = "2.0.0"

View File

@@ -127,7 +127,7 @@ impl Accept {
let mut events = mio::Events::with_capacity(256);
loop {
if let Err(e) = self.poll.poll(&mut events, None) {
if let Err(e) = self.poll.poll(&mut events, self.timeout) {
match e.kind() {
io::ErrorKind::Interrupted => {}
_ => panic!("Poll error: {}", e),

View File

@@ -8,7 +8,8 @@ use crate::{
server::ServerCommand,
service::{InternalServiceFactory, ServiceFactory, StreamNewService},
socket::{
MioListener, MioTcpListener, MioTcpSocket, StdSocketAddr, StdTcpListener, ToSocketAddrs,
create_mio_tcp_listener, MioListener, MioTcpListener, StdSocketAddr, StdTcpListener,
ToSocketAddrs,
},
worker::ServerWorkerConfig,
Server,
@@ -263,7 +264,7 @@ pub(super) fn bind_addr<S: ToSocketAddrs>(
let mut success = false;
let mut sockets = Vec::new();
for addr in addr.to_socket_addrs()? {
match create_tcp_listener(addr, backlog) {
match create_mio_tcp_listener(addr, backlog) {
Ok(lst) => {
success = true;
sockets.push(lst);
@@ -283,14 +284,3 @@ pub(super) fn bind_addr<S: ToSocketAddrs>(
))
}
}
fn create_tcp_listener(addr: StdSocketAddr, backlog: u32) -> io::Result<MioTcpListener> {
let socket = match addr {
StdSocketAddr::V4(_) => MioTcpSocket::new_v4()?,
StdSocketAddr::V6(_) => MioTcpSocket::new_v6()?,
};
socket.set_reuseaddr(true)?;
socket.bind(addr)?;
socket.listen(backlog)
}

View File

@@ -35,7 +35,7 @@ impl fmt::Display for Signal {
/// Process signal listener.
pub(crate) struct Signals {
#[cfg(not(unix))]
signals: futures_core::future::LocalBoxFuture<'static, std::io::Result<()>>,
signals: futures_core::future::BoxFuture<'static, std::io::Result<()>>,
#[cfg(unix)]
signals: Vec<(Signal, actix_rt::signal::unix::Signal)>,

View File

@@ -2,7 +2,7 @@ pub(crate) use std::net::{
SocketAddr as StdSocketAddr, TcpListener as StdTcpListener, ToSocketAddrs,
};
pub(crate) use mio::net::{TcpListener as MioTcpListener, TcpSocket as MioTcpSocket};
pub(crate) use mio::net::TcpListener as MioTcpListener;
#[cfg(unix)]
pub(crate) use {
mio::net::UnixListener as MioUnixListener,
@@ -223,6 +223,22 @@ mod unix_impl {
}
}
pub(crate) fn create_mio_tcp_listener(
addr: StdSocketAddr,
backlog: u32,
) -> io::Result<MioTcpListener> {
use socket2::{Domain, Protocol, Socket, Type};
let socket = Socket::new(Domain::for_address(addr), Type::STREAM, Some(Protocol::TCP))?;
socket.set_reuse_address(true)?;
socket.set_nonblocking(true)?;
socket.bind(&addr.into())?;
socket.listen(backlog as i32)?;
Ok(MioTcpListener::from_std(StdTcpListener::from(socket)))
}
#[cfg(test)]
mod tests {
use super::*;
@@ -234,11 +250,8 @@ mod tests {
assert_eq!(format!("{}", addr), "127.0.0.1:8080");
let addr: StdSocketAddr = "127.0.0.1:0".parse().unwrap();
let socket = MioTcpSocket::new_v4().unwrap();
socket.set_reuseaddr(true).unwrap();
socket.bind(addr).unwrap();
let tcp = socket.listen(128).unwrap();
let lst = MioListener::Tcp(tcp);
let lst = create_mio_tcp_listener(addr, 128).unwrap();
let lst = MioListener::Tcp(lst);
assert!(format!("{:?}", lst).contains("TcpListener"));
assert!(format!("{}", lst).contains("127.0.0.1"));
}

View File

@@ -283,15 +283,6 @@ impl ServerWorker {
let counter = Counter::new(config.max_concurrent_connections);
let counter_clone = counter.clone();
// every worker runs in it's own arbiter.
// use a custom tokio runtime builder to change the settings of runtime.
#[cfg(all(target_os = "linux", feature = "io-uring"))]
let arbiter = {
// TODO: pass max blocking thread config when tokio-uring enable configuration
// on building runtime.
let _ = config.max_blocking_threads;
Arbiter::new()
};
// get actix system context if it is set
let sys = System::try_current();
@@ -299,6 +290,8 @@ impl ServerWorker {
// service factories initialization channel
let (factory_tx, factory_rx) = std::sync::mpsc::sync_channel(1);
// every worker runs in it's own thread and tokio runtime.
// use a custom tokio runtime builder to change the settings of runtime.
std::thread::Builder::new()
.name(format!("actix-server worker {}", idx))
.spawn(move || {
@@ -307,13 +300,7 @@ impl ServerWorker {
System::set_current(sys);
}
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.max_blocking_threads(config.max_blocking_threads)
.build()
.unwrap();
rt.block_on(tokio::task::LocalSet::new().run_until(async move {
let worker_fut = async move {
let fut = factories
.iter()
.enumerate()
@@ -368,7 +355,26 @@ impl ServerWorker {
})
.await
.expect("task 2 panic");
}))
};
#[cfg(all(target_os = "linux", feature = "io-uring"))]
{
// TODO: pass max blocking thread config when tokio-uring enable configuration
// on building runtime.
let _ = config.max_blocking_threads;
tokio_uring::start(worker_fut)
}
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
{
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.max_blocking_threads(config.max_blocking_threads)
.build()
.unwrap();
rt.block_on(tokio::task::LocalSet::new().run_until(worker_fut))
}
})
.expect("worker thread error/panic");
@@ -552,6 +558,14 @@ impl Future for ServerWorker {
self.poll(cx)
}
WorkerState::Shutdown(ref mut shutdown) => {
// drop all pending connections in rx channel.
while let Poll::Ready(Some(conn)) = Pin::new(&mut this.rx).poll_recv(cx) {
// WorkerCounterGuard is needed as Accept thread has incremented counter.
// It's guard's job to decrement the counter together with drop of Conn.
let guard = this.counter.guard();
drop((conn, guard));
}
// wait for 1 second
ready!(shutdown.timer.as_mut().poll(cx));

View File

@@ -5,14 +5,17 @@ use std::{net, thread, time::Duration};
use actix_rt::{net::TcpStream, time::sleep};
use actix_server::Server;
use actix_service::fn_service;
use socket2::{Domain, Protocol, Socket, Type};
fn unused_addr() -> net::SocketAddr {
let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap();
let socket = mio::net::TcpSocket::new_v4().unwrap();
socket.bind(addr).unwrap();
socket.set_reuseaddr(true).unwrap();
let tcp = socket.listen(32).unwrap();
tcp.local_addr().unwrap()
let socket =
Socket::new(Domain::for_address(addr), Type::STREAM, Some(Protocol::TCP)).unwrap();
socket.set_reuse_address(true).unwrap();
socket.set_nonblocking(true).unwrap();
socket.bind(&addr.into()).unwrap();
socket.listen(32).unwrap();
net::TcpListener::from(socket).local_addr().unwrap()
}
#[test]

View File

@@ -62,7 +62,7 @@ tokio-native-tls = { version = "0.3", optional = true }
[dev-dependencies]
actix-rt = "2.2.0"
actix-server = "2.0.0-beta.6"
actix-server = "2.0.0-beta.8"
bytes = "1"
env_logger = "0.9"
futures-util = { version = "0.3.7", default-features = false, features = ["sink"] }