From 3a858feaec19e3f34cfe5238a7161b0330aa3933 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Thu, 5 Dec 2019 16:40:24 +0600 Subject: [PATCH] migrate to tokio 0.2.2 --- actix-connect/CHANGES.md | 6 + actix-connect/Cargo.toml | 22 ++-- actix-connect/src/lib.rs | 15 ++- actix-connect/src/{resolver.rs => resolve.rs} | 0 actix-connect/src/service.rs | 2 +- actix-connect/src/ssl/rustls.rs | 6 +- actix-connect/tests/test_connect.rs | 6 +- actix-ioframe/CHANGES.md | 6 +- actix-ioframe/Cargo.toml | 2 +- actix-rt/CHANGES.md | 4 + actix-rt/Cargo.toml | 8 +- actix-rt/src/arbiter.rs | 26 ++-- actix-rt/src/builder.rs | 75 ++---------- actix-rt/src/lib.rs | 25 +--- actix-rt/src/mod.rs | 92 -------------- actix-rt/src/runtime.rs | 115 ++++-------------- actix-rt/src/system.rs | 12 +- actix-server/CHANGES.md | 4 + actix-server/Cargo.toml | 15 +-- actix-server/src/accept.rs | 16 +-- actix-server/src/builder.rs | 34 +++--- actix-server/src/signals.rs | 25 ++-- actix-server/src/socket.rs | 8 +- actix-server/src/worker.rs | 110 +++++++++-------- actix-server/tests/test_server.rs | 17 ++- actix-testing/CHANGES.md | 4 + actix-testing/Cargo.toml | 8 +- actix-testing/src/lib.rs | 7 +- actix-tls/CHANGES.md | 8 ++ actix-tls/Cargo.toml | 17 ++- actix-utils/CHANGES.md | 2 + actix-utils/Cargo.toml | 8 +- actix-utils/src/keepalive.rs | 12 +- router/CHANGES.txt | 6 + router/Cargo.toml | 13 +- router/src/resource.rs | 3 +- router/src/url.rs | 3 +- 37 files changed, 281 insertions(+), 461 deletions(-) rename actix-connect/src/{resolver.rs => resolve.rs} (100%) delete mode 100644 actix-rt/src/mod.rs diff --git a/actix-connect/CHANGES.md b/actix-connect/CHANGES.md index 190c11a2..08bda7ce 100644 --- a/actix-connect/CHANGES.md +++ b/actix-connect/CHANGES.md @@ -1,5 +1,11 @@ # Changes +## [1.0.0-alpha.3] - 2019-12-xx + +### Changed + +* Migrate to `tokio=0.2.2` + ## [1.0.0-alpha.2] - 2019-12-02 ### Changed diff --git a/actix-connect/Cargo.toml b/actix-connect/Cargo.toml index 23219e7a..5ddaf262 100644 --- a/actix-connect/Cargo.toml +++ b/actix-connect/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-connect" -version = "1.0.0-alpha.2" +version = "1.0.0-alpha.3" authors = ["Nikolay Kim "] description = "Actix connect - tcp connector service" keywords = ["network", "framework", "async", "futures"] @@ -27,33 +27,33 @@ default = ["uri"] openssl = ["open-ssl", "tokio-openssl"] # rustls -# rustls = ["rust-tls", "tokio-rustls", "webpki"] +rustls = ["rust-tls", "tokio-rustls", "webpki"] # support http::Uri as connect address uri = ["http"] [dependencies] actix-service = "1.0.0-alpha.3" -actix-codec = "0.2.0-alpha.2" -actix-utils = "1.0.0-alpha.2" -actix-rt = "1.0.0-alpha.2" +actix-codec = "0.2.0-alpha.3" +actix-utils = "1.0.0-alpha.3" +actix-rt = "1.0.0-alpha.3" derive_more = "0.99.2" either = "1.5.2" futures = "0.3.1" -http = { version = "0.1.17", optional = true } +http = { version = "0.2.0", optional = true } log = "0.4" -trust-dns-resolver = { version="0.18.0-alpha.1", default-features = false } +# trust-dns-resolver = { version="0.18.0", default-features = false } +trust-dns-resolver = { git = "https://github.com/bluejekyll/trust-dns.git", branch = "update-tokio-0.2-alpha.2" } # openssl open-ssl = { version="0.10", package = "openssl", optional = true } -tokio-openssl = { version = "0.4.0-alpha.6", optional = true } +tokio-openssl = { version = "0.4.0", optional = true } # rustls rust-tls = { version = "0.16.0", package = "rustls", optional = true } -# tokio-rustls = { version = "0.10.0", optional = true } -# tokio-rustls = { git = "https://github.com/quininer/tokio-rustls.git", branch = "tokio-0.2", optional = true } +tokio-rustls = { version = "0.12.0", optional = true } webpki = { version = "0.21", optional = true } [dev-dependencies] -bytes = "0.4" +bytes = "0.5.2" actix-testing = { version="1.0.0-alpha.2" } diff --git a/actix-connect/src/lib.rs b/actix-connect/src/lib.rs index 33e32fae..89b51b61 100644 --- a/actix-connect/src/lib.rs +++ b/actix-connect/src/lib.rs @@ -14,7 +14,7 @@ extern crate log; mod connect; mod connector; mod error; -mod resolver; +mod resolve; mod service; pub mod ssl; @@ -23,15 +23,20 @@ mod uri; use actix_rt::{net::TcpStream, Arbiter}; use actix_service::{pipeline, pipeline_factory, Service, ServiceFactory}; +use trust_dns_resolver::config::{ResolverConfig, ResolverOpts}; +use trust_dns_resolver::system_conf::read_system_conf; +use trust_dns_resolver::AsyncResolver; -pub use trust_dns_resolver::config::{ResolverConfig, ResolverOpts}; -pub use trust_dns_resolver::system_conf::read_system_conf; -pub use trust_dns_resolver::{error::ResolveError, AsyncResolver}; +pub mod resolver { + pub use trust_dns_resolver::config::{ResolverConfig, ResolverOpts}; + pub use trust_dns_resolver::system_conf::read_system_conf; + pub use trust_dns_resolver::{error::ResolveError, AsyncResolver}; +} pub use self::connect::{Address, Connect, Connection}; pub use self::connector::{TcpConnector, TcpConnectorFactory}; pub use self::error::ConnectError; -pub use self::resolver::{Resolver, ResolverFactory}; +pub use self::resolve::{Resolver, ResolverFactory}; pub use self::service::{ConnectService, ConnectServiceFactory, TcpConnectService}; pub fn start_resolver(cfg: ResolverConfig, opts: ResolverOpts) -> AsyncResolver { diff --git a/actix-connect/src/resolver.rs b/actix-connect/src/resolve.rs similarity index 100% rename from actix-connect/src/resolver.rs rename to actix-connect/src/resolve.rs diff --git a/actix-connect/src/service.rs b/actix-connect/src/service.rs index be42a568..6a290929 100644 --- a/actix-connect/src/service.rs +++ b/actix-connect/src/service.rs @@ -11,7 +11,7 @@ use trust_dns_resolver::AsyncResolver; use crate::connect::{Address, Connect, Connection}; use crate::connector::{TcpConnector, TcpConnectorFactory}; use crate::error::ConnectError; -use crate::resolver::{Resolver, ResolverFactory}; +use crate::resolve::{Resolver, ResolverFactory}; pub struct ConnectServiceFactory { tcp: TcpConnectorFactory, diff --git a/actix-connect/src/ssl/rustls.rs b/actix-connect/src/ssl/rustls.rs index ef6c8278..3616c44d 100644 --- a/actix-connect/src/ssl/rustls.rs +++ b/actix-connect/src/ssl/rustls.rs @@ -62,7 +62,7 @@ where type InitError = (); type Future = Ready>; - fn new_service(&self, _: &()) -> Self::Future { + fn new_service(&self, _: ()) -> Self::Future { ok(RustlsConnectorService { connector: self.connector.clone(), _t: PhantomData, @@ -93,7 +93,7 @@ where type Error = std::io::Error; type Future = ConnectAsyncExt; - fn poll_ready(&mut self, _: &mut Context) -> Poll> { + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } @@ -120,7 +120,7 @@ where { type Output = Result>, std::io::Error>; - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); Poll::Ready( futures::ready!(Pin::new(&mut this.fut).poll(cx)).map(|stream| { diff --git a/actix-connect/tests/test_connect.rs b/actix-connect/tests/test_connect.rs index 41ebd7a5..994daeb1 100644 --- a/actix-connect/tests/test_connect.rs +++ b/actix-connect/tests/test_connect.rs @@ -6,8 +6,8 @@ use actix_service::{service_fn, Service, ServiceFactory}; use actix_testing::TestServer; use bytes::Bytes; use futures::SinkExt; -use trust_dns_resolver::config::{ResolverConfig, ResolverOpts}; +use actix_connect::resolver::{ResolverConfig, ResolverOpts}; use actix_connect::Connect; #[cfg(feature = "openssl")] @@ -97,7 +97,7 @@ async fn test_new_service() { #[cfg(feature = "openssl")] #[actix_rt::test] async fn test_uri() { - use http::HttpTryFrom; + use std::convert::TryFrom; let srv = TestServer::with(|| { service_fn(|io: TcpStream| { @@ -118,7 +118,7 @@ async fn test_uri() { #[cfg(feature = "rustls")] #[actix_rt::test] async fn test_rustls_uri() { - use http::HttpTryFrom; + use std::convert::TryFrom; let srv = TestServer::with(|| { service_fn(|io: TcpStream| { diff --git a/actix-ioframe/CHANGES.md b/actix-ioframe/CHANGES.md index 204d799c..67c63f59 100644 --- a/actix-ioframe/CHANGES.md +++ b/actix-ioframe/CHANGES.md @@ -1,15 +1,17 @@ # Changes +## [0.3.0-alpha.3] - 2019-12-xx + +* Migrate to `tokio=0.2.2` + ## [0.3.0-alpha.2] - 2019-12-02 * Migrate to `std::future` - ## [0.1.1] - 2019-10-14 * Re-register task on every dispatcher poll. - ## [0.1.0] - 2019-09-25 * Initial release diff --git a/actix-ioframe/Cargo.toml b/actix-ioframe/Cargo.toml index c7f3f9a2..f4315b6f 100644 --- a/actix-ioframe/Cargo.toml +++ b/actix-ioframe/Cargo.toml @@ -22,7 +22,7 @@ actix-service = "1.0.0-alpha.3" actix-codec = "0.2.0-alpha.2" actix-utils = "1.0.0-alpha.2" actix-rt = "1.0.0-alpha.2" -bytes = "0.4" +bytes = "0.5" either = "1.5.2" futures = "0.3.1" pin-project = "0.4.6" diff --git a/actix-rt/CHANGES.md b/actix-rt/CHANGES.md index 522ff892..23aba775 100644 --- a/actix-rt/CHANGES.md +++ b/actix-rt/CHANGES.md @@ -6,6 +6,10 @@ * Fix compilation on non-unix platforms +### Changed + +* Migrate to `tokio=0.2.2` + ## [1.0.0-alpha.2] - 2019-12-02 diff --git a/actix-rt/Cargo.toml b/actix-rt/Cargo.toml index b2315ae6..8b1a058d 100644 --- a/actix-rt/Cargo.toml +++ b/actix-rt/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-rt" -version = "1.0.0-alpha.2" +version = "1.0.0-alpha.3" authors = ["Nikolay Kim "] description = "Actix runtime" keywords = ["network", "framework", "async", "futures"] @@ -22,8 +22,4 @@ actix-macros = "0.1.0-alpha.1" actix-threadpool = "0.3" futures = "0.3.1" copyless = "0.1.4" - -tokio = { version = "=0.2.0-alpha.6", features=["rt-current-thread","tcp","uds","udp","timer","signal"] } -tokio-executor = "=0.2.0-alpha.6" -tokio-net = "=0.2.0-alpha.6" -tokio-timer = "=0.3.0-alpha.6" \ No newline at end of file +tokio = { version = "0.2.2", default-features=false, features=["rt-core", "rt-util", "io-driver", "tcp", "uds", "udp", "time", "signal"] } diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index 600128c0..a5cbe8e0 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -9,9 +9,8 @@ use std::{fmt, thread}; use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; use futures::channel::oneshot::{channel, Canceled, Sender}; use futures::{future, Future, FutureExt, Stream}; -use tokio_executor::current_thread::spawn; -use crate::builder::Builder; +use crate::runtime::Runtime; use crate::system::System; use copyless::BoxHelper; @@ -19,7 +18,7 @@ use copyless::BoxHelper; thread_local!( static ADDR: RefCell> = RefCell::new(None); static RUNNING: Cell = Cell::new(false); - static Q: RefCell>>> = RefCell::new(Vec::new()); + static Q: RefCell>>>> = RefCell::new(Vec::new()); static STORAGE: RefCell>> = RefCell::new(HashMap::new()); ); @@ -101,7 +100,7 @@ impl Arbiter { let handle = thread::Builder::new() .name(name.clone()) .spawn(move || { - let mut rt = Builder::new().build_rt().expect("Can not create Runtime"); + let mut rt = Runtime::new().expect("Can not create Runtime"); let arb = Arbiter::with_sender(arb_tx); let (stop, stop_rx) = channel(); @@ -143,14 +142,16 @@ impl Arbiter { } } - pub(crate) fn run_system() { + pub(crate) fn run_system(rt: Option<&Runtime>) { RUNNING.with(|cell| cell.set(true)); Q.with(|cell| { let mut v = cell.borrow_mut(); for fut in v.drain(..) { - // We pin the boxed future, so it can never again be moved. - let fut = unsafe { Pin::new_unchecked(fut) }; - tokio_executor::current_thread::spawn(fut); + if let Some(rt) = rt { + rt.spawn(fut); + } else { + tokio::task::spawn_local(fut); + } } }); } @@ -169,11 +170,14 @@ impl Arbiter { RUNNING.with(move |cell| { if cell.get() { // Spawn the future on running executor - spawn(future); + tokio::task::spawn_local(future); } else { // Box the future and push it to the queue, this results in double boxing // because the executor boxes the future again, but works for now - Q.with(move |cell| cell.borrow_mut().push(Box::alloc().init(future))); + Q.with(move |cell| { + cell.borrow_mut() + .push(unsafe { Pin::new_unchecked(Box::alloc().init(future)) }) + }); } }); } @@ -325,7 +329,7 @@ impl Future for ArbiterController { return Poll::Ready(()); } ArbiterCommand::Execute(fut) => { - spawn(fut); + tokio::task::spawn_local(fut); } ArbiterCommand::ExecuteFn(f) => { f.call_box(); diff --git a/actix-rt/src/builder.rs b/actix-rt/src/builder.rs index 8814decf..2ca77b3c 100644 --- a/actix-rt/src/builder.rs +++ b/actix-rt/src/builder.rs @@ -4,11 +4,7 @@ use std::io; use futures::channel::mpsc::unbounded; use futures::channel::oneshot::{channel, Receiver}; use futures::future::{lazy, Future, FutureExt}; - -use tokio::runtime::current_thread::Handle; -use tokio_executor::current_thread::CurrentThread; -use tokio_net::driver::Reactor; -use tokio_timer::{clock::Clock, timer::Timer}; +use tokio::task::LocalSet; use crate::arbiter::{Arbiter, SystemArbiter}; use crate::runtime::Runtime; @@ -23,9 +19,6 @@ pub struct Builder { /// Name of the System. Defaults to "actix" if unset. name: Cow<'static, str>, - /// The clock to use - clock: Clock, - /// Whether the Arbiter will stop the whole System on uncaught panic. Defaults to false. stop_on_panic: bool, } @@ -34,7 +27,6 @@ impl Builder { pub(crate) fn new() -> Self { Builder { name: Cow::Borrowed("actix"), - clock: Clock::new(), stop_on_panic: false, } } @@ -45,14 +37,6 @@ impl Builder { self } - /// Set the Clock instance that will be used by this System. - /// - /// Defaults to the system clock. - pub fn clock(mut self, clock: Clock) -> Self { - self.clock = clock; - self - } - /// Sets the option 'stop_on_panic' which controls whether the System is stopped when an /// uncaught panic is thrown from a worker thread. /// @@ -72,8 +56,8 @@ impl Builder { /// Create new System that can run asynchronously. /// /// This method panics if it cannot start the system arbiter - pub(crate) fn build_async(self, executor: Handle) -> AsyncSystemRunner { - self.create_async_runtime(executor) + pub(crate) fn build_async(self, local: &LocalSet) -> AsyncSystemRunner { + self.create_async_runtime(local) } /// This function will start tokio runtime and will finish once the @@ -86,7 +70,7 @@ impl Builder { self.create_runtime(f).run() } - fn create_async_runtime(self, executor: Handle) -> AsyncSystemRunner { + fn create_async_runtime(self, local: &LocalSet) -> AsyncSystemRunner { let (stop_tx, stop) = channel(); let (sys_sender, sys_receiver) = unbounded(); @@ -96,7 +80,7 @@ impl Builder { let arb = SystemArbiter::new(stop_tx, sys_receiver); // start the system arbiter - executor.spawn(arb).expect("could not start system arbiter"); + let _ = local.spawn_local(arb); AsyncSystemRunner { stop, system } } @@ -113,40 +97,14 @@ impl Builder { // system arbiter let arb = SystemArbiter::new(stop_tx, sys_receiver); - let mut rt = self.build_rt().unwrap(); + let mut rt = Runtime::new().unwrap(); rt.spawn(arb); // init system arbiter and run configuration method - let _ = rt.block_on(lazy(move |_| { - f(); - Ok::<_, ()>(()) - })); + rt.block_on(lazy(move |_| f())); SystemRunner { rt, stop, system } } - - pub(crate) fn build_rt(&self) -> io::Result { - // We need a reactor to receive events about IO objects from kernel - let reactor = Reactor::new()?; - let reactor_handle = reactor.handle(); - - // Place a timer wheel on top of the reactor. If there are no timeouts to fire, it'll let the - // reactor pick up some new external events. - let timer = Timer::new_with_now(reactor, self.clock.clone()); - let timer_handle = timer.handle(); - - // And now put a single-threaded executor on top of the timer. When there are no futures ready - // to do something, it'll let the timer or the reactor to generate some new stimuli for the - // futures to continue in their life. - let executor = CurrentThread::new_with_park(timer); - - Ok(Runtime::new2( - reactor_handle, - timer_handle, - self.clock.clone(), - executor, - )) - } } #[derive(Debug)] @@ -163,7 +121,7 @@ impl AsyncSystemRunner { // run loop lazy(|_| { - Arbiter::run_system(); + Arbiter::run_system(None); async { let res = match stop.await { Ok(code) => { @@ -202,10 +160,7 @@ impl SystemRunner { let SystemRunner { mut rt, stop, .. } = self; // run loop - let _ = rt.block_on(async { - Arbiter::run_system(); - Ok::<_, ()>(()) - }); + Arbiter::run_system(Some(&rt)); let result = match rt.block_on(stop) { Ok(code) => { if code != 0 { @@ -226,17 +181,11 @@ impl SystemRunner { /// Execute a future and wait for result. pub fn block_on(&mut self, fut: F) -> O where - F: Future, + F: Future + 'static, { - self.rt.block_on(async { - Arbiter::run_system(); - }); - + Arbiter::run_system(Some(&self.rt)); let res = self.rt.block_on(fut); - self.rt.block_on(async { - Arbiter::stop_system(); - }); - + Arbiter::stop_system(); res } } diff --git a/actix-rt/src/lib.rs b/actix-rt/src/lib.rs index d2dd13c0..3ef7460e 100644 --- a/actix-rt/src/lib.rs +++ b/actix-rt/src/lib.rs @@ -38,9 +38,9 @@ where pub mod signal { #[cfg(unix)] pub mod unix { - pub use tokio_net::signal::unix::*; + pub use tokio::signal::unix::*; } - pub use tokio_net::signal::{ctrl_c, CtrlC}; + pub use tokio::signal::ctrl_c; } /// TCP/UDP/Unix bindings @@ -59,21 +59,8 @@ pub mod net { /// Utilities for tracking time. pub mod time { - use std::time::{Duration, Instant}; - - pub use tokio_timer::Interval; - pub use tokio_timer::{delay, delay_for, Delay}; - pub use tokio_timer::{timeout, Timeout}; - - /// Creates new `Interval` that yields with interval of `duration`. The first - /// tick completes immediately. - pub fn interval(duration: Duration) -> Interval { - Interval::new(Instant::now(), duration) - } - - /// Creates new `Interval` that yields with interval of `period` with the - /// first tick completing at `at`. - pub fn interval_at(start: Instant, duration: Duration) -> Interval { - Interval::new(start, duration) - } + pub use tokio::time::Instant; + pub use tokio::time::{delay_for, delay_until, Delay}; + pub use tokio::time::{interval, interval_at, Interval}; + pub use tokio::time::{timeout, Timeout}; } diff --git a/actix-rt/src/mod.rs b/actix-rt/src/mod.rs deleted file mode 100644 index dca41711..00000000 --- a/actix-rt/src/mod.rs +++ /dev/null @@ -1,92 +0,0 @@ -//! A runtime implementation that runs everything on the current thread. -//! -//! [`current_thread::Runtime`][rt] is similar to the primary -//! [`Runtime`][concurrent-rt] except that it runs all components on the current -//! thread instead of using a thread pool. This means that it is able to spawn -//! futures that do not implement `Send`. -//! -//! Same as the default [`Runtime`][concurrent-rt], the -//! [`current_thread::Runtime`][rt] includes: -//! -//! * A [reactor] to drive I/O resources. -//! * An [executor] to execute tasks that use these I/O resources. -//! * A [timer] for scheduling work to run after a set period of time. -//! -//! Note that [`current_thread::Runtime`][rt] does not implement `Send` itself -//! and cannot be safely moved to other threads. -//! -//! # Spawning from other threads -//! -//! While [`current_thread::Runtime`][rt] does not implement `Send` and cannot -//! safely be moved to other threads, it provides a `Handle` that can be sent -//! to other threads and allows to spawn new tasks from there. -//! -//! For example: -//! -//! ``` -//! # extern crate tokio; -//! # extern crate futures; -//! use tokio::runtime::current_thread::Runtime; -//! use tokio::prelude::*; -//! use std::thread; -//! -//! # fn main() { -//! let mut runtime = Runtime::new().unwrap(); -//! let handle = runtime.handle(); -//! -//! thread::spawn(move || { -//! handle.spawn(future::ok(())); -//! }).join().unwrap(); -//! -//! # /* -//! runtime.run().unwrap(); -//! # */ -//! # } -//! ``` -//! -//! # Examples -//! -//! Creating a new `Runtime` and running a future `f` until its completion and -//! returning its result. -//! -//! ``` -//! use tokio::runtime::current_thread::Runtime; -//! use tokio::prelude::*; -//! -//! let mut runtime = Runtime::new().unwrap(); -//! -//! // Use the runtime... -//! // runtime.block_on(f); // where f is a future -//! ``` -//! -//! [rt]: struct.Runtime.html -//! [concurrent-rt]: ../struct.Runtime.html -//! [chan]: https://docs.rs/futures/0.1/futures/sync/mpsc/fn.channel.html -//! [reactor]: ../../reactor/struct.Reactor.html -//! [executor]: https://tokio.rs/docs/getting-started/runtime-model/#executors -//! [timer]: ../../timer/index.html - -mod builder; -mod runtime; - -pub use self::builder::Builder; -pub use self::runtime::{Runtime, Handle}; -pub use tokio_current_thread::spawn; -pub use tokio_current_thread::TaskExecutor; - -use futures::Future; - -/// Run the provided future to completion using a runtime running on the current thread. -/// -/// This first creates a new [`Runtime`], and calls [`Runtime::block_on`] with the provided future, -/// which blocks the current thread until the provided future completes. It then calls -/// [`Runtime::run`] to wait for any other spawned futures to resolve. -pub fn block_on_all(future: F) -> Result -where - F: Future, -{ - let mut r = Runtime::new().expect("failed to start runtime on current thread"); - let v = r.block_on(future)?; - r.run().expect("failed to resolve remaining futures"); - Ok(v) -} diff --git a/actix-rt/src/runtime.rs b/actix-rt/src/runtime.rs index 79d6477d..a03361d6 100644 --- a/actix-rt/src/runtime.rs +++ b/actix-rt/src/runtime.rs @@ -1,73 +1,36 @@ -use std::error::Error; -use std::{fmt, io}; - -use futures::Future; -use tokio_executor::current_thread::{self, CurrentThread}; -use tokio_net::driver::{Handle as ReactorHandle, Reactor}; -use tokio_timer::{ - clock::Clock, - timer::{self, Timer}, -}; - -use crate::builder::Builder; +use std::future::Future; +use std::io; +use tokio::{runtime, task::LocalSet}; /// Single-threaded runtime provides a way to start reactor -/// and executor on the current thread. +/// and runtime on the current thread. /// /// See [module level][mod] documentation for more details. /// /// [mod]: index.html #[derive(Debug)] pub struct Runtime { - reactor_handle: ReactorHandle, - timer_handle: timer::Handle, - clock: Clock, - executor: CurrentThread>, -} - -/// Error returned by the `run` function. -#[derive(Debug)] -pub struct RunError { - inner: current_thread::RunError, -} - -impl fmt::Display for RunError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(fmt, "{}", self.inner) - } -} - -impl Error for RunError { - fn description(&self) -> &str { - self.inner.description() - } - fn cause(&self) -> Option<&dyn Error> { - self.inner.source() - } + local: LocalSet, + rt: runtime::Runtime, } impl Runtime { #[allow(clippy::new_ret_no_self)] /// Returns a new runtime initialized with default configuration values. pub fn new() -> io::Result { - Builder::new().build_rt() + let rt = runtime::Builder::new() + .enable_io() + .enable_time() + .basic_scheduler() + .build()?; + + Ok(Runtime { + rt, + local: LocalSet::new(), + }) } - pub(super) fn new2( - reactor_handle: ReactorHandle, - timer_handle: timer::Handle, - clock: Clock, - executor: CurrentThread>, - ) -> Runtime { - Runtime { - reactor_handle, - timer_handle, - clock, - executor, - } - } - - /// Spawn a future onto the single-threaded Tokio runtime. + /// Spawn a future onto the single-threaded runtime. /// /// See [module level][mod] documentation for more details. /// @@ -75,7 +38,7 @@ impl Runtime { /// /// # Examples /// - /// ```rust + /// ```rust,ignore /// # use futures::{future, Future, Stream}; /// use actix_rt::Runtime; /// @@ -95,11 +58,11 @@ impl Runtime { /// /// This function panics if the spawn fails. Failure occurs if the executor /// is currently at capacity and is unable to spawn a new future. - pub fn spawn(&mut self, future: F) -> &mut Self + pub fn spawn(&self, future: F) -> &Self where F: Future + 'static, { - self.executor.spawn(future); + self.local.spawn_local(future); self } @@ -121,41 +84,9 @@ impl Runtime { /// complete execution by calling `block_on` or `run`. pub fn block_on(&mut self, f: F) -> F::Output where - F: Future, + F: Future + 'static, { - self.enter(|executor| { - // Run the provided future - executor.block_on(f) - }) - } - - /// Run the executor to completion, blocking the thread until **all** - /// spawned futures have completed. - pub fn run(&mut self) -> Result<(), RunError> { - self.enter(|executor| executor.run()) - .map_err(|e| RunError { inner: e }) - } - - fn enter(&mut self, f: F) -> R - where - F: FnOnce(&mut CurrentThread>) -> R, - { - let Runtime { - ref reactor_handle, - ref timer_handle, - ref clock, - ref mut executor, - .. - } = *self; - - // WARN: We do not enter the executor here, since in tokio 0.2 the executor is entered - // automatically inside its `block_on` and `run` methods - tokio_executor::with_default(&mut current_thread::TaskExecutor::current(), || { - tokio_timer::clock::with_default(clock, || { - let _reactor_guard = tokio_net::driver::set_default(reactor_handle); - let _timer_guard = tokio_timer::set_default(timer_handle); - f(executor) - }) - }) + let res = self.local.block_on(&mut self.rt, f); + res } } diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index 7f643095..383471cd 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -4,7 +4,7 @@ use std::io; use std::sync::atomic::{AtomicUsize, Ordering}; use futures::channel::mpsc::UnboundedSender; -use tokio::runtime::current_thread::Handle; +use tokio::task::LocalSet; use crate::arbiter::{Arbiter, SystemCommand}; use crate::builder::{Builder, SystemRunner}; @@ -58,16 +58,16 @@ impl System { } #[allow(clippy::new_ret_no_self)] - /// Create new system using provided CurrentThread Handle. + /// Create new system using provided tokio Handle. /// /// This method panics if it can not spawn system arbiter - pub fn run_in_executor>( + pub fn run_in_tokio>( name: T, - executor: Handle, - ) -> impl Future> + Send { + local: &LocalSet, + ) -> impl Future> { Self::builder() .name(name) - .build_async(executor) + .build_async(local) .run_nonblocking() } diff --git a/actix-server/CHANGES.md b/actix-server/CHANGES.md index 84818261..de1e9da1 100644 --- a/actix-server/CHANGES.md +++ b/actix-server/CHANGES.md @@ -2,6 +2,10 @@ ## [1.0.0-alpha.3] - 2019-12-xx +### Changed + +* Migrate to `tokio=0.2.2` + ### Fixed * Fix compilation on non-unix platforms diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index cb35c7c0..7f32d0da 100644 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-server" -version = "1.0.0-alpha.2" +version = "1.0.0-alpha.3" authors = ["Nikolay Kim "] description = "Actix server - General purpose tcp server" keywords = ["network", "framework", "async", "futures"] @@ -22,9 +22,9 @@ default = [] [dependencies] actix-service = "1.0.0-alpha.3" -actix-rt = "1.0.0-alpha.2" -actix-codec = "0.2.0-alpha.2" -actix-utils = "1.0.0-alpha.2" +actix-rt = "1.0.0-alpha.3" +actix-codec = "0.2.0-alpha.3" +actix-utils = "1.0.0-alpha.3" log = "0.4" num_cpus = "1.0" @@ -33,13 +33,10 @@ net2 = "0.2" futures = "0.3.1" slab = "0.4" -tokio-net = { version = "0.2.0-alpha.6", features = ["signal", "tcp", "uds"] } -futures-core-preview = "0.3.0-alpha.19" - # unix domain sockets mio-uds = { version = "0.6.7" } [dev-dependencies] -bytes = "0.4" +bytes = "0.5" env_logger = "0.6" -actix-testing = "1.0.0-alpha.2" \ No newline at end of file +actix-testing = "1.0.0-alpha.3" \ No newline at end of file diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 1ee38f85..993ecab4 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -1,10 +1,9 @@ use std::sync::mpsc as sync_mpsc; -use std::time::{Duration, Instant}; +use std::time::Duration; use std::{io, thread}; -use actix_rt::time::delay; +use actix_rt::time::{delay_until, Instant}; use actix_rt::System; -use futures::FutureExt; use log::{error, info}; use slab::Slab; @@ -440,13 +439,10 @@ impl Accept { info.timeout = Some(Instant::now() + Duration::from_millis(500)); let r = self.timer.1.clone(); - System::current().arbiter().send( - async move { - delay(Instant::now() + Duration::from_millis(510)).await; - let _ = r.set_readiness(mio::Ready::readable()); - } - .boxed(), - ); + System::current().arbiter().send(Box::pin(async move { + delay_until(Instant::now() + Duration::from_millis(510)).await; + let _ = r.set_readiness(mio::Ready::readable()); + })); return; } } diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index 02f94533..0ab8b8a2 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -1,10 +1,11 @@ use std::pin::Pin; use std::task::{Context, Poll}; -use std::time::{Duration, Instant}; +use std::time::Duration; use std::{io, mem, net}; use actix_rt::net::TcpStream; -use actix_rt::{spawn, time::delay, Arbiter, System}; +use actix_rt::time::{delay_until, Instant}; +use actix_rt::{spawn, System}; use futures::channel::mpsc::{unbounded, UnboundedReceiver}; use futures::channel::oneshot; use futures::future::ready; @@ -305,22 +306,11 @@ impl ServerBuilder { } fn start_worker(&self, idx: usize, notify: AcceptNotify) -> WorkerClient { - let (tx1, rx1) = unbounded(); - let (tx2, rx2) = unbounded(); - let timeout = self.shutdown_timeout; let avail = WorkerAvailability::new(notify); - let worker = WorkerClient::new(idx, tx1, tx2, avail.clone()); let services: Vec> = self.services.iter().map(|v| v.clone_factory()).collect(); - Arbiter::new().send( - async move { - Worker::start(rx1, rx2, services, avail, timeout); - } - .boxed(), - ); - - worker + Worker::start(idx, services, avail, self.shutdown_timeout) } fn handle_cmd(&mut self, item: ServerCommand) { @@ -395,8 +385,10 @@ impl ServerBuilder { if exit { spawn( async { - delay(Instant::now() + Duration::from_millis(300)) - .await; + delay_until( + Instant::now() + Duration::from_millis(300), + ) + .await; System::current().stop(); } .boxed(), @@ -409,10 +401,12 @@ impl ServerBuilder { // we need to stop system if server was spawned if self.exit { spawn( - delay(Instant::now() + Duration::from_millis(300)).then(|_| { - System::current().stop(); - ready(()) - }), + delay_until(Instant::now() + Duration::from_millis(300)).then( + |_| { + System::current().stop(); + ready(()) + }, + ), ); } if let Some(tx) = completion { diff --git a/actix-server/src/signals.rs b/actix-server/src/signals.rs index c001e237..7da8ea44 100644 --- a/actix-server/src/signals.rs +++ b/actix-server/src/signals.rs @@ -3,7 +3,7 @@ use std::io; use std::pin::Pin; use std::task::{Context, Poll}; -use futures_core::stream::Stream; +use futures::future::lazy; use crate::server::Server; @@ -33,11 +33,13 @@ pub(crate) struct Signals { impl Signals { pub(crate) fn start(srv: Server) -> io::Result<()> { - actix_rt::spawn({ + actix_rt::spawn(lazy(|_| { #[cfg(not(unix))] { - let stream = actix_rt::signal::ctrl_c()?; - Signals { srv, stream } + match actix_rt::signal::ctrl_c() { + Ok(stream) => actix_rt::spawn(Signals { srv, stream }), + Err(e) => log::error!("Can not initialize ctrl-c handler err: {}", e), + } } #[cfg(unix)] @@ -54,12 +56,19 @@ impl Signals { ]; for (kind, sig) in sig_map.iter() { - streams.push((*sig, unix::signal(*kind)?)); + match unix::signal(*kind) { + Ok(stream) => streams.push((*sig, stream)), + Err(e) => log::error!( + "Can not initialize stream handler for {:?} err: {}", + sig, + e + ), + } } - Signals { srv, streams } + actix_rt::spawn(Signals { srv, streams }) } - }); + })); Ok(()) } @@ -81,7 +90,7 @@ impl Future for Signals { { for idx in 0..self.streams.len() { loop { - match Pin::new(&mut self.streams[idx].1).poll_next(cx) { + match self.streams[idx].1.poll_recv(cx) { Poll::Ready(None) => return Poll::Ready(()), Poll::Pending => break, Poll::Ready(Some(_)) => { diff --git a/actix-server/src/socket.rs b/actix-server/src/socket.rs index 743ba462..3025660a 100644 --- a/actix-server/src/socket.rs +++ b/actix-server/src/socket.rs @@ -3,8 +3,6 @@ use std::{fmt, io, net}; use actix_codec::{AsyncRead, AsyncWrite}; use actix_rt::net::TcpStream; -use tokio_net::driver::Handle; - pub(crate) enum StdListener { Tcp(net::TcpListener), #[cfg(all(unix))] @@ -152,7 +150,7 @@ pub trait FromStream: AsyncRead + AsyncWrite + Sized { impl FromStream for TcpStream { fn from_stdstream(sock: StdStream) -> io::Result { match sock { - StdStream::Tcp(stream) => TcpStream::from_std(stream, &Handle::default()), + StdStream::Tcp(stream) => TcpStream::from_std(stream), #[cfg(all(unix))] StdStream::Uds(_) => { panic!("Should not happen, bug in server impl"); @@ -166,9 +164,7 @@ impl FromStream for actix_rt::net::UnixStream { fn from_stdstream(sock: StdStream) -> io::Result { match sock { StdStream::Tcp(_) => panic!("Should not happen, bug in server impl"), - StdStream::Uds(stream) => { - actix_rt::net::UnixStream::from_std(stream, &Handle::default()) - } + StdStream::Uds(stream) => actix_rt::net::UnixStream::from_std(stream), } } } diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index a64a3f05..9049e91a 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -4,10 +4,10 @@ use std::sync::Arc; use std::task::{Context, Poll}; use std::time; -use actix_rt::time::{delay, Delay}; +use actix_rt::time::{delay_until, Delay, Instant}; use actix_rt::{spawn, Arbiter}; use actix_utils::counter::Counter; -use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender}; +use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; use futures::channel::oneshot; use futures::future::{join_all, LocalBoxFuture, MapOk}; use futures::{Future, FutureExt, Stream, TryFutureExt}; @@ -161,56 +161,66 @@ enum WorkerServiceStatus { impl Worker { pub(crate) fn start( - rx: UnboundedReceiver, - rx2: UnboundedReceiver, + idx: usize, factories: Vec>, availability: WorkerAvailability, shutdown_timeout: time::Duration, - ) { - availability.set(false); - let mut wrk = MAX_CONNS_COUNTER.with(|conns| Worker { - rx, - rx2, - availability, - factories, - shutdown_timeout, - services: Vec::new(), - conns: conns.clone(), - state: WorkerState::Unavailable(Vec::new()), - }); + ) -> WorkerClient { + let (tx1, rx) = unbounded(); + let (tx2, rx2) = unbounded(); + let avail = availability.clone(); - let mut fut: Vec, _>> = Vec::new(); - for (idx, factory) in wrk.factories.iter().enumerate() { - fut.push(factory.create().map_ok(move |r| { - r.into_iter() - .map(|(t, s): (Token, _)| (idx, t, s)) - .collect::>() - })); - } + Arbiter::new().send( + async move { + availability.set(false); + let mut wrk = MAX_CONNS_COUNTER.with(move |conns| Worker { + rx, + rx2, + availability, + factories, + shutdown_timeout, + services: Vec::new(), + conns: conns.clone(), + state: WorkerState::Unavailable(Vec::new()), + }); - spawn(async move { - let res = join_all(fut).await; - let res: Result, _> = res.into_iter().collect(); - match res { - Ok(services) => { - for item in services { - for (factory, token, service) in item { - assert_eq!(token.0, wrk.services.len()); - wrk.services.push(WorkerService { - factory, - service, - status: WorkerServiceStatus::Unavailable, - }); + let mut fut: Vec, _>> = Vec::new(); + for (idx, factory) in wrk.factories.iter().enumerate() { + fut.push(factory.create().map_ok(move |r| { + r.into_iter() + .map(|(t, s): (Token, _)| (idx, t, s)) + .collect::>() + })); + } + + spawn(async move { + let res = join_all(fut).await; + let res: Result, _> = res.into_iter().collect(); + match res { + Ok(services) => { + for item in services { + for (factory, token, service) in item { + assert_eq!(token.0, wrk.services.len()); + wrk.services.push(WorkerService { + factory, + service, + status: WorkerServiceStatus::Unavailable, + }); + } + } + } + Err(e) => { + error!("Can not start worker: {:?}", e); + Arbiter::current().stop(); } } - } - Err(e) => { - error!("Can not start worker: {:?}", e); - Arbiter::current().stop(); - } + wrk.await + }); } - wrk.await - }); + .boxed(), + ); + + WorkerClient::new(idx, tx1, tx2, avail) } fn shutdown(&mut self, force: bool) { @@ -322,8 +332,8 @@ impl Future for Worker { if num != 0 { info!("Graceful worker shutdown, {} connections", num); self.state = WorkerState::Shutdown( - Box::pin(delay(time::Instant::now() + time::Duration::from_secs(1))), - Box::pin(delay(time::Instant::now() + self.shutdown_timeout)), + Box::pin(delay_until(Instant::now() + time::Duration::from_secs(1))), + Box::pin(delay_until(Instant::now() + self.shutdown_timeout)), Some(result), ); } else { @@ -399,7 +409,6 @@ impl Future for Worker { ); } Poll::Pending => { - // self.state = WorkerState::Restarting(idx, token, fut); return Poll::Pending; } } @@ -428,19 +437,18 @@ impl Future for Worker { match t1.as_mut().poll(cx) { Poll::Pending => (), Poll::Ready(_) => { - *t1 = Box::pin(delay( - time::Instant::now() + time::Duration::from_secs(1), + *t1 = Box::pin(delay_until( + Instant::now() + time::Duration::from_secs(1), )); let _ = t1.as_mut().poll(cx); } } - // self.state = WorkerState::Shutdown(t1, t2, tx); Poll::Pending } WorkerState::Available => { loop { match Pin::new(&mut self.rx).poll_next(cx) { - // handle incoming tcp stream + // handle incoming io stream Poll::Ready(Some(WorkerCommand(msg))) => { match self.check_readiness(cx) { Ok(true) => { diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index 26ad9ff9..b84b0396 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -29,6 +29,8 @@ fn test_bind() { let h = thread::spawn(move || { let sys = actix_rt::System::new("test"); let srv = Server::build() + .workers(1) + .disable_signals() .bind("test", addr, move || service_fn(|_| ok::<_, ()>(()))) .unwrap() .start(); @@ -51,14 +53,16 @@ fn test_listen() { let h = thread::spawn(move || { let sys = actix_rt::System::new("test"); let lst = net::TcpListener::bind(addr).unwrap(); - let srv = Server::build() + Server::build() + .disable_signals() + .workers(1) .listen("test", lst, move || service_fn(|_| ok::<_, ()>(()))) .unwrap() .start(); - let _ = tx.send((srv, actix_rt::System::current())); + let _ = tx.send(actix_rt::System::current()); let _ = sys.run(); }); - let (_, sys) = rx.recv().unwrap(); + let sys = rx.recv().unwrap(); thread::sleep(time::Duration::from_millis(500)); assert!(net::TcpStream::connect(addr).is_ok()); @@ -72,10 +76,12 @@ fn test_start() { let addr = unused_addr(); let (tx, rx) = mpsc::channel(); - let h = thread::spawn(move || { + let _ = thread::spawn(move || { let sys = actix_rt::System::new("test"); let srv: Server = Server::build() .backlog(100) + .workers(1) + .disable_signals() .bind("test", addr, move || { service_fn(|io: TcpStream| { async move { @@ -126,7 +132,7 @@ fn test_start() { thread::sleep(time::Duration::from_millis(100)); let _ = sys.stop(); - let _ = h.join(); + // let _ = h.join(); } #[test] @@ -142,6 +148,7 @@ fn test_configure() { let num = num2.clone(); let sys = actix_rt::System::new("test"); let srv = Server::build() + .disable_signals() .configure(move |cfg| { let num = num.clone(); let lst = net::TcpListener::bind(addr3).unwrap(); diff --git a/actix-testing/CHANGES.md b/actix-testing/CHANGES.md index 4c14e8be..7156fb55 100644 --- a/actix-testing/CHANGES.md +++ b/actix-testing/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [1.0.0-alpha.3] - 2019-12-xx + +* Migrate to `tokio=0.2.2` + ## [1.0.0-alpha.2] - 2019-12-02 * Re-export `test` attribute macros diff --git a/actix-testing/Cargo.toml b/actix-testing/Cargo.toml index 58e48014..574f42ce 100644 --- a/actix-testing/Cargo.toml +++ b/actix-testing/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-testing" -version = "1.0.0-alpha.2" +version = "1.0.0-alpha.3" authors = ["Nikolay Kim "] description = "Actix testing utils" keywords = ["network", "framework", "async", "futures"] @@ -17,13 +17,11 @@ name = "actix_testing" path = "src/lib.rs" [dependencies] -actix-rt = "1.0.0-alpha.2" +actix-rt = "1.0.0-alpha.3" actix-macros = "0.1.0-alpha.1" -actix-server = "1.0.0-alpha.2" +actix-server = "1.0.0-alpha.3" actix-service = "1.0.0-alpha.3" log = "0.4" net2 = "0.2" futures = "0.3.1" - -tokio-net = { version = "=0.2.0-alpha.6" } diff --git a/actix-testing/src/lib.rs b/actix-testing/src/lib.rs index f1e82e67..42452d73 100644 --- a/actix-testing/src/lib.rs +++ b/actix-testing/src/lib.rs @@ -7,9 +7,7 @@ use std::{net, thread}; use actix_rt::{net::TcpStream, System}; use actix_server::{Server, ServerBuilder, ServiceFactory}; - use net2::TcpBuilder; -use tokio_net::driver::Handle; #[cfg(not(test))] // Work around for rust-lang/rust#62127 pub use actix_macros::test; @@ -25,7 +23,8 @@ pub use actix_macros::test; /// use actix_service::{service_fn}; /// use actix_testing::TestServer; /// -/// fn main() { +/// #[actix_rt::main] +/// async fn main() { /// let srv = TestServer::with(|| service_fn( /// |sock| async move { /// println!("New connection: {:?}", sock); @@ -142,7 +141,7 @@ impl TestServerRuntime { /// Connect to server, return tokio TcpStream pub fn connect(&self) -> std::io::Result { - TcpStream::from_std(net::TcpStream::connect(self.addr)?, &Handle::default()) + TcpStream::from_std(net::TcpStream::connect(self.addr)?) } } diff --git a/actix-tls/CHANGES.md b/actix-tls/CHANGES.md index 11cf6cf7..67fe1a97 100644 --- a/actix-tls/CHANGES.md +++ b/actix-tls/CHANGES.md @@ -1,5 +1,13 @@ # Changes +[1.0.0-alpha.3] - 2019-12-xx + +### Changed + +* Migrate to `tokio=0.2.2` + +* Enable rustls acceptor service + ## [1.0.0-alpha.1] - 2019-12-02 * Split openssl accetor from actix-server package diff --git a/actix-tls/Cargo.toml b/actix-tls/Cargo.toml index d70d16cf..4f3dbf57 100644 --- a/actix-tls/Cargo.toml +++ b/actix-tls/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-tls" -version = "1.0.0-alpha.1" +version = "1.0.0-alpha.3" authors = ["Nikolay Kim "] description = "Actix tls services" keywords = ["network", "framework", "async", "futures"] @@ -30,9 +30,9 @@ rustls = ["rust-tls", "webpki"] [dependencies] actix-service = "1.0.0-alpha.3" -actix-codec = "0.2.0-alpha.2" -actix-utils = "1.0.0-alpha.2" -actix-rt = "1.0.0-alpha.2" +actix-codec = "0.2.0-alpha.3" +actix-utils = "1.0.0-alpha.3" +actix-rt = "1.0.0-alpha.3" derive_more = "0.99.2" either = "1.5.2" futures = "0.3.1" @@ -40,15 +40,14 @@ log = "0.4" # openssl open-ssl = { version="0.10", package = "openssl", optional = true } -tokio-openssl = { version = "=0.4.0-alpha.6", optional = true } +tokio-openssl = { version = "0.4.0", optional = true } # rustls rust-tls = { version = "0.16.0", package = "rustls", optional = true } webpki = { version = "0.21", optional = true } webpki-roots = { version = "0.17", optional = true } -# tokio-rustls = { version = "0.12.0-alpha.2", optional = true } -# tokio-rustls = { git = "https://github.com/quininer/tokio-rustls.git", branch = "tokio-0.2", optional = true } +tokio-rustls = { version = "0.12.0", optional = true } [dev-dependencies] -bytes = "0.4" -actix-testing = { version="1.0.0-alpha.2" } +bytes = "0.5" +actix-testing = { version="1.0.0-alpha.3" } diff --git a/actix-utils/CHANGES.md b/actix-utils/CHANGES.md index 91a197f9..2766ef05 100644 --- a/actix-utils/CHANGES.md +++ b/actix-utils/CHANGES.md @@ -2,6 +2,8 @@ ## [1.0.0-alpha.3] - 2019-12-xx +* Migrate to `tokio=0.2.2` + * Fix oneshot ## [1.0.0-alpha.2] - 2019-12-02 diff --git a/actix-utils/Cargo.toml b/actix-utils/Cargo.toml index 08e48df7..620219be 100644 --- a/actix-utils/Cargo.toml +++ b/actix-utils/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-utils" -version = "1.0.0-alpha.2" +version = "1.0.0-alpha.3" authors = ["Nikolay Kim "] description = "Actix utils - various actix net related services" keywords = ["network", "framework", "async", "futures"] @@ -19,9 +19,9 @@ path = "src/lib.rs" [dependencies] actix-service = "1.0.0-alpha.3" -actix-rt = "1.0.0-alpha.2" -actix-codec = "0.2.0-alpha.2" -bytes = "0.4" +actix-rt = "1.0.0-alpha.3" +actix-codec = "0.2.0-alpha.3" +bytes = "0.5.2" either = "1.5.2" futures = "0.3.1" pin-project = "0.4.6" diff --git a/actix-utils/src/keepalive.rs b/actix-utils/src/keepalive.rs index ebfcfe7a..9d4f8712 100644 --- a/actix-utils/src/keepalive.rs +++ b/actix-utils/src/keepalive.rs @@ -3,9 +3,9 @@ use std::future::Future; use std::marker::PhantomData; use std::pin::Pin; use std::task::{Context, Poll}; -use std::time::{Duration, Instant}; +use std::time::Duration; -use actix_rt::time::{delay, Delay}; +use actix_rt::time::{delay_until, Delay, Instant}; use actix_service::{Service, ServiceFactory}; use futures::future::{ok, Ready}; @@ -81,13 +81,13 @@ where F: Fn() -> E, { pub fn new(ka: Duration, time: LowResTimeService, f: F) -> Self { - let expire = time.now() + ka; + let expire = Instant::from_std(time.now() + ka); KeepAliveService { f, ka, time, expire, - delay: delay(expire), + delay: delay_until(expire), _t: PhantomData, } } @@ -105,7 +105,7 @@ where fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { match Pin::new(&mut self.delay).poll(cx) { Poll::Ready(_) => { - let now = self.time.now(); + let now = Instant::from_std(self.time.now()); if self.expire <= now { Poll::Ready(Err((self.f)())) } else { @@ -119,7 +119,7 @@ where } fn call(&mut self, req: R) -> Self::Future { - self.expire = self.time.now() + self.ka; + self.expire = Instant::from_std(self.time.now() + self.ka); ok(req) } } diff --git a/router/CHANGES.txt b/router/CHANGES.txt index 516048d7..b0c52bf0 100644 --- a/router/CHANGES.txt +++ b/router/CHANGES.txt @@ -1,5 +1,11 @@ # Changes +## [0.2.0] - 2019-12-05 + +* Update http to 0.2 + +* Update regex to 1.3 + ## [0.1.5] - 2019-05-15 * Remove debug prints diff --git a/router/Cargo.toml b/router/Cargo.toml index d9e18314..7ed2dabc 100644 --- a/router/Cargo.toml +++ b/router/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-router" -version = "0.1.5" +version = "0.2.0" authors = ["Nikolay Kim "] description = "Path router" keywords = ["actix"] @@ -20,13 +20,12 @@ path = "src/lib.rs" default = ["http"] [dependencies] -bytes = "0.4" -regex = "1.0" +regex = "1.3.1" serde = "1.0.80" -string = "0.2.0" -log = "0.4" -http = { version="0.1.14", optional=true } +string = "0.2.1" +log = "0.4.8" +http = { version="0.2.0", optional=true } [dev-dependencies] -http = "0.1.14" +http = "0.2.0" serde_derive = "1.0" diff --git a/router/src/resource.rs b/router/src/resource.rs index 02d803bc..2f6437a4 100644 --- a/router/src/resource.rs +++ b/router/src/resource.rs @@ -491,7 +491,8 @@ pub(crate) fn insert_slash(path: &str) -> String { #[cfg(test)] mod tests { use super::*; - use http::{HttpTryFrom, Uri}; + use http::Uri; + use std::convert::TryFrom; #[test] fn test_parse_static() { diff --git a/router/src/url.rs b/router/src/url.rs index a45e2cd8..1c068199 100644 --- a/router/src/url.rs +++ b/router/src/url.rs @@ -200,7 +200,8 @@ fn restore_ch(d1: u8, d2: u8) -> Option { #[cfg(test)] mod tests { - use http::{HttpTryFrom, Uri}; + use http::Uri; + use std::convert::TryFrom; use super::*; use crate::{Path, ResourceDef};