1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-06-26 19:47:43 +02:00

migrate to tokio 0.2.2

This commit is contained in:
Nikolay Kim
2019-12-05 16:40:24 +06:00
parent d49aca9595
commit 3a858feaec
37 changed files with 281 additions and 461 deletions

View File

@ -2,6 +2,10 @@
## [1.0.0-alpha.3] - 2019-12-xx
### Changed
* Migrate to `tokio=0.2.2`
### Fixed
* Fix compilation on non-unix platforms

View File

@ -1,6 +1,6 @@
[package]
name = "actix-server"
version = "1.0.0-alpha.2"
version = "1.0.0-alpha.3"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix server - General purpose tcp server"
keywords = ["network", "framework", "async", "futures"]
@ -22,9 +22,9 @@ default = []
[dependencies]
actix-service = "1.0.0-alpha.3"
actix-rt = "1.0.0-alpha.2"
actix-codec = "0.2.0-alpha.2"
actix-utils = "1.0.0-alpha.2"
actix-rt = "1.0.0-alpha.3"
actix-codec = "0.2.0-alpha.3"
actix-utils = "1.0.0-alpha.3"
log = "0.4"
num_cpus = "1.0"
@ -33,13 +33,10 @@ net2 = "0.2"
futures = "0.3.1"
slab = "0.4"
tokio-net = { version = "0.2.0-alpha.6", features = ["signal", "tcp", "uds"] }
futures-core-preview = "0.3.0-alpha.19"
# unix domain sockets
mio-uds = { version = "0.6.7" }
[dev-dependencies]
bytes = "0.4"
bytes = "0.5"
env_logger = "0.6"
actix-testing = "1.0.0-alpha.2"
actix-testing = "1.0.0-alpha.3"

View File

@ -1,10 +1,9 @@
use std::sync::mpsc as sync_mpsc;
use std::time::{Duration, Instant};
use std::time::Duration;
use std::{io, thread};
use actix_rt::time::delay;
use actix_rt::time::{delay_until, Instant};
use actix_rt::System;
use futures::FutureExt;
use log::{error, info};
use slab::Slab;
@ -440,13 +439,10 @@ impl Accept {
info.timeout = Some(Instant::now() + Duration::from_millis(500));
let r = self.timer.1.clone();
System::current().arbiter().send(
async move {
delay(Instant::now() + Duration::from_millis(510)).await;
let _ = r.set_readiness(mio::Ready::readable());
}
.boxed(),
);
System::current().arbiter().send(Box::pin(async move {
delay_until(Instant::now() + Duration::from_millis(510)).await;
let _ = r.set_readiness(mio::Ready::readable());
}));
return;
}
}

View File

@ -1,10 +1,11 @@
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use std::time::Duration;
use std::{io, mem, net};
use actix_rt::net::TcpStream;
use actix_rt::{spawn, time::delay, Arbiter, System};
use actix_rt::time::{delay_until, Instant};
use actix_rt::{spawn, System};
use futures::channel::mpsc::{unbounded, UnboundedReceiver};
use futures::channel::oneshot;
use futures::future::ready;
@ -305,22 +306,11 @@ impl ServerBuilder {
}
fn start_worker(&self, idx: usize, notify: AcceptNotify) -> WorkerClient {
let (tx1, rx1) = unbounded();
let (tx2, rx2) = unbounded();
let timeout = self.shutdown_timeout;
let avail = WorkerAvailability::new(notify);
let worker = WorkerClient::new(idx, tx1, tx2, avail.clone());
let services: Vec<Box<dyn InternalServiceFactory>> =
self.services.iter().map(|v| v.clone_factory()).collect();
Arbiter::new().send(
async move {
Worker::start(rx1, rx2, services, avail, timeout);
}
.boxed(),
);
worker
Worker::start(idx, services, avail, self.shutdown_timeout)
}
fn handle_cmd(&mut self, item: ServerCommand) {
@ -395,8 +385,10 @@ impl ServerBuilder {
if exit {
spawn(
async {
delay(Instant::now() + Duration::from_millis(300))
.await;
delay_until(
Instant::now() + Duration::from_millis(300),
)
.await;
System::current().stop();
}
.boxed(),
@ -409,10 +401,12 @@ impl ServerBuilder {
// we need to stop system if server was spawned
if self.exit {
spawn(
delay(Instant::now() + Duration::from_millis(300)).then(|_| {
System::current().stop();
ready(())
}),
delay_until(Instant::now() + Duration::from_millis(300)).then(
|_| {
System::current().stop();
ready(())
},
),
);
}
if let Some(tx) = completion {

View File

@ -3,7 +3,7 @@ use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures_core::stream::Stream;
use futures::future::lazy;
use crate::server::Server;
@ -33,11 +33,13 @@ pub(crate) struct Signals {
impl Signals {
pub(crate) fn start(srv: Server) -> io::Result<()> {
actix_rt::spawn({
actix_rt::spawn(lazy(|_| {
#[cfg(not(unix))]
{
let stream = actix_rt::signal::ctrl_c()?;
Signals { srv, stream }
match actix_rt::signal::ctrl_c() {
Ok(stream) => actix_rt::spawn(Signals { srv, stream }),
Err(e) => log::error!("Can not initialize ctrl-c handler err: {}", e),
}
}
#[cfg(unix)]
@ -54,12 +56,19 @@ impl Signals {
];
for (kind, sig) in sig_map.iter() {
streams.push((*sig, unix::signal(*kind)?));
match unix::signal(*kind) {
Ok(stream) => streams.push((*sig, stream)),
Err(e) => log::error!(
"Can not initialize stream handler for {:?} err: {}",
sig,
e
),
}
}
Signals { srv, streams }
actix_rt::spawn(Signals { srv, streams })
}
});
}));
Ok(())
}
@ -81,7 +90,7 @@ impl Future for Signals {
{
for idx in 0..self.streams.len() {
loop {
match Pin::new(&mut self.streams[idx].1).poll_next(cx) {
match self.streams[idx].1.poll_recv(cx) {
Poll::Ready(None) => return Poll::Ready(()),
Poll::Pending => break,
Poll::Ready(Some(_)) => {

View File

@ -3,8 +3,6 @@ use std::{fmt, io, net};
use actix_codec::{AsyncRead, AsyncWrite};
use actix_rt::net::TcpStream;
use tokio_net::driver::Handle;
pub(crate) enum StdListener {
Tcp(net::TcpListener),
#[cfg(all(unix))]
@ -152,7 +150,7 @@ pub trait FromStream: AsyncRead + AsyncWrite + Sized {
impl FromStream for TcpStream {
fn from_stdstream(sock: StdStream) -> io::Result<Self> {
match sock {
StdStream::Tcp(stream) => TcpStream::from_std(stream, &Handle::default()),
StdStream::Tcp(stream) => TcpStream::from_std(stream),
#[cfg(all(unix))]
StdStream::Uds(_) => {
panic!("Should not happen, bug in server impl");
@ -166,9 +164,7 @@ impl FromStream for actix_rt::net::UnixStream {
fn from_stdstream(sock: StdStream) -> io::Result<Self> {
match sock {
StdStream::Tcp(_) => panic!("Should not happen, bug in server impl"),
StdStream::Uds(stream) => {
actix_rt::net::UnixStream::from_std(stream, &Handle::default())
}
StdStream::Uds(stream) => actix_rt::net::UnixStream::from_std(stream),
}
}
}

View File

@ -4,10 +4,10 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use std::time;
use actix_rt::time::{delay, Delay};
use actix_rt::time::{delay_until, Delay, Instant};
use actix_rt::{spawn, Arbiter};
use actix_utils::counter::Counter;
use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
use futures::channel::oneshot;
use futures::future::{join_all, LocalBoxFuture, MapOk};
use futures::{Future, FutureExt, Stream, TryFutureExt};
@ -161,56 +161,66 @@ enum WorkerServiceStatus {
impl Worker {
pub(crate) fn start(
rx: UnboundedReceiver<WorkerCommand>,
rx2: UnboundedReceiver<StopCommand>,
idx: usize,
factories: Vec<Box<dyn InternalServiceFactory>>,
availability: WorkerAvailability,
shutdown_timeout: time::Duration,
) {
availability.set(false);
let mut wrk = MAX_CONNS_COUNTER.with(|conns| Worker {
rx,
rx2,
availability,
factories,
shutdown_timeout,
services: Vec::new(),
conns: conns.clone(),
state: WorkerState::Unavailable(Vec::new()),
});
) -> WorkerClient {
let (tx1, rx) = unbounded();
let (tx2, rx2) = unbounded();
let avail = availability.clone();
let mut fut: Vec<MapOk<LocalBoxFuture<'static, _>, _>> = Vec::new();
for (idx, factory) in wrk.factories.iter().enumerate() {
fut.push(factory.create().map_ok(move |r| {
r.into_iter()
.map(|(t, s): (Token, _)| (idx, t, s))
.collect::<Vec<_>>()
}));
}
Arbiter::new().send(
async move {
availability.set(false);
let mut wrk = MAX_CONNS_COUNTER.with(move |conns| Worker {
rx,
rx2,
availability,
factories,
shutdown_timeout,
services: Vec::new(),
conns: conns.clone(),
state: WorkerState::Unavailable(Vec::new()),
});
spawn(async move {
let res = join_all(fut).await;
let res: Result<Vec<_>, _> = res.into_iter().collect();
match res {
Ok(services) => {
for item in services {
for (factory, token, service) in item {
assert_eq!(token.0, wrk.services.len());
wrk.services.push(WorkerService {
factory,
service,
status: WorkerServiceStatus::Unavailable,
});
let mut fut: Vec<MapOk<LocalBoxFuture<'static, _>, _>> = Vec::new();
for (idx, factory) in wrk.factories.iter().enumerate() {
fut.push(factory.create().map_ok(move |r| {
r.into_iter()
.map(|(t, s): (Token, _)| (idx, t, s))
.collect::<Vec<_>>()
}));
}
spawn(async move {
let res = join_all(fut).await;
let res: Result<Vec<_>, _> = res.into_iter().collect();
match res {
Ok(services) => {
for item in services {
for (factory, token, service) in item {
assert_eq!(token.0, wrk.services.len());
wrk.services.push(WorkerService {
factory,
service,
status: WorkerServiceStatus::Unavailable,
});
}
}
}
Err(e) => {
error!("Can not start worker: {:?}", e);
Arbiter::current().stop();
}
}
}
Err(e) => {
error!("Can not start worker: {:?}", e);
Arbiter::current().stop();
}
wrk.await
});
}
wrk.await
});
.boxed(),
);
WorkerClient::new(idx, tx1, tx2, avail)
}
fn shutdown(&mut self, force: bool) {
@ -322,8 +332,8 @@ impl Future for Worker {
if num != 0 {
info!("Graceful worker shutdown, {} connections", num);
self.state = WorkerState::Shutdown(
Box::pin(delay(time::Instant::now() + time::Duration::from_secs(1))),
Box::pin(delay(time::Instant::now() + self.shutdown_timeout)),
Box::pin(delay_until(Instant::now() + time::Duration::from_secs(1))),
Box::pin(delay_until(Instant::now() + self.shutdown_timeout)),
Some(result),
);
} else {
@ -399,7 +409,6 @@ impl Future for Worker {
);
}
Poll::Pending => {
// self.state = WorkerState::Restarting(idx, token, fut);
return Poll::Pending;
}
}
@ -428,19 +437,18 @@ impl Future for Worker {
match t1.as_mut().poll(cx) {
Poll::Pending => (),
Poll::Ready(_) => {
*t1 = Box::pin(delay(
time::Instant::now() + time::Duration::from_secs(1),
*t1 = Box::pin(delay_until(
Instant::now() + time::Duration::from_secs(1),
));
let _ = t1.as_mut().poll(cx);
}
}
// self.state = WorkerState::Shutdown(t1, t2, tx);
Poll::Pending
}
WorkerState::Available => {
loop {
match Pin::new(&mut self.rx).poll_next(cx) {
// handle incoming tcp stream
// handle incoming io stream
Poll::Ready(Some(WorkerCommand(msg))) => {
match self.check_readiness(cx) {
Ok(true) => {

View File

@ -29,6 +29,8 @@ fn test_bind() {
let h = thread::spawn(move || {
let sys = actix_rt::System::new("test");
let srv = Server::build()
.workers(1)
.disable_signals()
.bind("test", addr, move || service_fn(|_| ok::<_, ()>(())))
.unwrap()
.start();
@ -51,14 +53,16 @@ fn test_listen() {
let h = thread::spawn(move || {
let sys = actix_rt::System::new("test");
let lst = net::TcpListener::bind(addr).unwrap();
let srv = Server::build()
Server::build()
.disable_signals()
.workers(1)
.listen("test", lst, move || service_fn(|_| ok::<_, ()>(())))
.unwrap()
.start();
let _ = tx.send((srv, actix_rt::System::current()));
let _ = tx.send(actix_rt::System::current());
let _ = sys.run();
});
let (_, sys) = rx.recv().unwrap();
let sys = rx.recv().unwrap();
thread::sleep(time::Duration::from_millis(500));
assert!(net::TcpStream::connect(addr).is_ok());
@ -72,10 +76,12 @@ fn test_start() {
let addr = unused_addr();
let (tx, rx) = mpsc::channel();
let h = thread::spawn(move || {
let _ = thread::spawn(move || {
let sys = actix_rt::System::new("test");
let srv: Server = Server::build()
.backlog(100)
.workers(1)
.disable_signals()
.bind("test", addr, move || {
service_fn(|io: TcpStream| {
async move {
@ -126,7 +132,7 @@ fn test_start() {
thread::sleep(time::Duration::from_millis(100));
let _ = sys.stop();
let _ = h.join();
// let _ = h.join();
}
#[test]
@ -142,6 +148,7 @@ fn test_configure() {
let num = num2.clone();
let sys = actix_rt::System::new("test");
let srv = Server::build()
.disable_signals()
.configure(move |cfg| {
let num = num.clone();
let lst = net::TcpListener::bind(addr3).unwrap();