From 690169db8917c3e47b297edce8af4fd8c48210eb Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Thu, 24 May 2018 21:03:16 -0700 Subject: [PATCH] migrate to tokio --- .travis.yml | 6 +-- CHANGES.md | 4 ++ Cargo.toml | 10 +++-- README.md | 2 +- src/client/connector.rs | 15 ++++--- src/client/mod.rs | 4 +- src/client/pipeline.rs | 12 +++--- src/client/request.rs | 2 +- src/error.rs | 4 ++ src/lib.rs | 5 ++- src/multipart.rs | 6 +-- src/payload.rs | 30 +++++++------- src/pipeline.rs | 6 +-- src/server/h1.rs | 10 ++--- src/server/h2.rs | 14 +++---- src/server/mod.rs | 2 +- src/server/srv.rs | 9 ++-- src/server/worker.rs | 89 ++++++++++++++++++++-------------------- src/test.rs | 18 ++++---- tests/test_handlers.rs | 36 +++++++--------- tests/test_middleware.rs | 22 ++++------ tests/test_server.rs | 18 ++++---- 22 files changed, 162 insertions(+), 162 deletions(-) diff --git a/.travis.yml b/.travis.yml index f930f508f..fd9bfc0bf 100644 --- a/.travis.yml +++ b/.travis.yml @@ -8,7 +8,7 @@ cache: matrix: include: - - rust: 1.24.0 + - rust: 1.25.0 - rust: stable - rust: beta - rust: nightly @@ -31,12 +31,12 @@ before_script: script: - | - if [[ "$TRAVIS_RUST_VERSION" != "1.24.0" ]]; then + if [[ "$TRAVIS_RUST_VERSION" != "1.25.0" ]]; then cargo clean cargo test --features="alpn,tls" -- --nocapture fi - | - if [[ "$TRAVIS_RUST_VERSION" == "1.24.0" ]]; then + if [[ "$TRAVIS_RUST_VERSION" == "1.25.0" ]]; then bash <(curl https://raw.githubusercontent.com/xd009642/tarpaulin/master/travis-install.sh) USE_SKEPTIC=1 cargo tarpaulin --out Xml --no-count bash <(curl -s https://codecov.io/bash) diff --git a/CHANGES.md b/CHANGES.md index 4f267d6dc..8fd1689be 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -2,6 +2,10 @@ ## [0.7.0] - 2018-xx-xx +### Changed + +* Migrate to tokio + ## [0.6.10] - 2018-05-24 diff --git a/Cargo.toml b/Cargo.toml index e1a11c2ab..603bd1bff 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,7 +47,8 @@ flate2-c = ["flate2/miniz-sys"] flate2-rust = ["flate2/rust_backend"] [dependencies] -actix = "^0.5.5" +#actix = "^0.6.0" +actix = { git="https://github.com/actix/actix.git" } base64 = "0.9" bitflags = "1.0" @@ -62,7 +63,7 @@ mime = "0.3" mime_guess = "2.0.0-alpha" num_cpus = "1.0" percent-encoding = "1.0" -rand = "0.4" +rand = "0.5" regex = "1.0" serde = "1.0" serde_json = "1.0" @@ -86,8 +87,11 @@ byteorder = "1" futures = "0.1" futures-cpupool = "0.1" slab = "0.4" +tokio = "0.1" tokio-io = "0.1" -tokio-core = "0.1" +tokio-tcp = "0.1" +tokio-timer = "0.2" +tokio-reactor = "0.1" # native-tls native-tls = { version="0.1", optional = true } diff --git a/README.md b/README.md index ae0bc645e..9629da4e4 100644 --- a/README.md +++ b/README.md @@ -28,7 +28,7 @@ Actix web is a simple, pragmatic and extremely fast web framework for Rust. * [API Documentation (Releases)](https://docs.rs/actix-web/) * [Chat on gitter](https://gitter.im/actix/actix) * Cargo package: [actix-web](https://crates.io/crates/actix-web) -* Minimum supported Rust version: 1.24 or later +* Minimum supported Rust version: 1.25 or later ## Example diff --git a/src/client/connector.rs b/src/client/connector.rs index 6389b8972..e9301ae9d 100644 --- a/src/client/connector.rs +++ b/src/client/connector.rs @@ -9,16 +9,16 @@ use actix::actors::{Connect as ResolveConnect, Connector, ConnectorError}; use actix::fut::WrapFuture; use actix::registry::ArbiterService; use actix::{ - fut, Actor, ActorFuture, ActorResponse, Arbiter, AsyncContext, Context, - ContextFutureSpawner, Handler, Message, Recipient, Supervised, Syn, + fut, Actor, ActorFuture, ActorResponse, AsyncContext, Context, ContextFutureSpawner, + Handler, Message, Recipient, Supervised, Syn, }; use futures::task::{current as current_task, Task}; use futures::unsync::oneshot; use futures::{Async, Future, Poll}; use http::{Error as HttpError, HttpTryFrom, Uri}; -use tokio_core::reactor::Timeout; use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_timer::Delay; #[cfg(feature = "alpn")] use openssl::ssl::{Error as OpensslError, SslConnector, SslMethod}; @@ -190,8 +190,8 @@ pub struct ClientConnector { available: HashMap>, to_close: Vec, waiters: HashMap>, - wait_timeout: Option<(Instant, Timeout)>, - paused: Option>, + wait_timeout: Option<(Instant, Delay)>, + paused: Option>, } impl Actor for ClientConnector { @@ -563,8 +563,7 @@ impl ClientConnector { } } - let mut timeout = - Timeout::new(time - Instant::now(), Arbiter::handle()).unwrap(); + let mut timeout = Delay::new(time); let _ = timeout.poll(); self.wait_timeout = Some((time, timeout)); } @@ -597,7 +596,7 @@ impl Handler for ClientConnector { fn handle(&mut self, msg: Pause, _: &mut Self::Context) { if let Some(time) = msg.time { let when = Instant::now() + time; - let mut timeout = Timeout::new(time, Arbiter::handle()).unwrap(); + let mut timeout = Delay::new(when); let _ = timeout.poll(); self.paused = Some(Some((when, timeout))); } else if self.paused.is_none() { diff --git a/src/client/mod.rs b/src/client/mod.rs index 9fd885faa..36a876ccb 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -10,7 +10,7 @@ //! fn main() { //! let sys = actix::System::new("test"); //! -//! actix::Arbiter::handle().spawn({ +//! actix::Arbiter::spawn({ //! client::get("http://www.rust-lang.org") // <- Create request builder //! .header("User-Agent", "Actix-web") //! .finish().unwrap() @@ -70,7 +70,7 @@ impl ResponseError for SendRequestError { /// fn main() { /// let sys = actix::System::new("test"); /// -/// actix::Arbiter::handle().spawn({ +/// actix::Arbiter::spawn({ /// client::get("http://www.rust-lang.org") // <- Create request builder /// .header("User-Agent", "Actix-web") /// .finish().unwrap() diff --git a/src/client/pipeline.rs b/src/client/pipeline.rs index dae7bbaf8..c75280739 100644 --- a/src/client/pipeline.rs +++ b/src/client/pipeline.rs @@ -2,9 +2,9 @@ use bytes::{Bytes, BytesMut}; use futures::unsync::oneshot; use futures::{Async, Future, Poll}; use http::header::CONTENT_ENCODING; -use std::time::Duration; +use std::time::{Duration, Instant}; use std::{io, mem}; -use tokio_core::reactor::Timeout; +use tokio_timer::Delay; use actix::prelude::*; @@ -71,7 +71,7 @@ pub struct SendRequest { conn: Addr, conn_timeout: Duration, wait_timeout: Duration, - timeout: Option, + timeout: Option, } impl SendRequest { @@ -108,7 +108,7 @@ impl SendRequest { /// Request timeout is the total time before a response must be received. /// Default value is 5 seconds. pub fn timeout(mut self, timeout: Duration) -> Self { - self.timeout = Some(Timeout::new(timeout, Arbiter::handle()).unwrap()); + self.timeout = Some(Delay::new(Instant::now() + timeout)); self } @@ -174,7 +174,7 @@ impl Future for SendRequest { }; let timeout = self.timeout.take().unwrap_or_else(|| { - Timeout::new(Duration::from_secs(5), Arbiter::handle()).unwrap() + Delay::new(Instant::now() + Duration::from_secs(5)) }); let pl = Box::new(Pipeline { @@ -229,7 +229,7 @@ pub(crate) struct Pipeline { decompress: Option, should_decompress: bool, write_state: RunningState, - timeout: Option, + timeout: Option, } enum IoBody { diff --git a/src/client/request.rs b/src/client/request.rs index 2f9ce12f9..9a3d0fb1d 100644 --- a/src/client/request.rs +++ b/src/client/request.rs @@ -34,7 +34,7 @@ use httprequest::HttpRequest; /// fn main() { /// let sys = actix::System::new("test"); /// -/// actix::Arbiter::handle().spawn({ +/// actix::Arbiter::spawn({ /// ClientRequest::get("http://www.rust-lang.org") // <- Create request builder /// .header("User-Agent", "Actix-web") /// .finish().unwrap() diff --git a/src/error.rs b/src/error.rs index 1ec394e33..6d739702d 100644 --- a/src/error.rs +++ b/src/error.rs @@ -16,6 +16,7 @@ use http_range::HttpRangeParseError; use httparse; use serde::de::value::Error as DeError; use serde_json::error::Error as JsonError; +use tokio_timer::Error as TimerError; pub use url::ParseError as UrlParseError; // re-exports @@ -126,6 +127,9 @@ impl From for Error { /// `InternalServerError` for `JsonError` impl ResponseError for JsonError {} +/// `InternalServerError` for `TimerError` +impl ResponseError for TimerError {} + /// `InternalServerError` for `UrlParseError` impl ResponseError for UrlParseError {} diff --git a/src/lib.rs b/src/lib.rs index 9912d4ada..4dd1b215c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -112,8 +112,11 @@ extern crate mio; extern crate net2; extern crate rand; extern crate slab; -extern crate tokio_core; +extern crate tokio; extern crate tokio_io; +extern crate tokio_reactor; +extern crate tokio_tcp; +extern crate tokio_timer; extern crate url; #[macro_use] extern crate serde; diff --git a/src/multipart.rs b/src/multipart.rs index 365a101c1..106961aec 100644 --- a/src/multipart.rs +++ b/src/multipart.rs @@ -663,7 +663,7 @@ mod tests { use bytes::Bytes; use futures::future::{lazy, result}; use payload::{Payload, PayloadWriter}; - use tokio_core::reactor::Core; + use tokio::runtime::current_thread::Runtime; #[test] fn test_boundary() { @@ -710,9 +710,9 @@ mod tests { #[test] fn test_multipart() { - Core::new() + Runtime::new() .unwrap() - .run(lazy(|| { + .block_on(lazy(|| { let (mut sender, payload) = Payload::new(false); let bytes = Bytes::from( diff --git a/src/payload.rs b/src/payload.rs index dd0b197b4..12a4ae268 100644 --- a/src/payload.rs +++ b/src/payload.rs @@ -524,7 +524,7 @@ mod tests { use failure::Fail; use futures::future::{lazy, result}; use std::io; - use tokio_core::reactor::Core; + use tokio::runtime::current_thread::Runtime; #[test] fn test_error() { @@ -542,9 +542,9 @@ mod tests { #[test] fn test_basic() { - Core::new() + Runtime::new() .unwrap() - .run(lazy(|| { + .block_on(lazy(|| { let (_, payload) = Payload::new(false); let mut payload = PayloadHelper::new(payload); @@ -559,9 +559,9 @@ mod tests { #[test] fn test_eof() { - Core::new() + Runtime::new() .unwrap() - .run(lazy(|| { + .block_on(lazy(|| { let (mut sender, payload) = Payload::new(false); let mut payload = PayloadHelper::new(payload); @@ -584,9 +584,9 @@ mod tests { #[test] fn test_err() { - Core::new() + Runtime::new() .unwrap() - .run(lazy(|| { + .block_on(lazy(|| { let (mut sender, payload) = Payload::new(false); let mut payload = PayloadHelper::new(payload); @@ -602,9 +602,9 @@ mod tests { #[test] fn test_readany() { - Core::new() + Runtime::new() .unwrap() - .run(lazy(|| { + .block_on(lazy(|| { let (mut sender, payload) = Payload::new(false); let mut payload = PayloadHelper::new(payload); @@ -631,9 +631,9 @@ mod tests { #[test] fn test_readexactly() { - Core::new() + Runtime::new() .unwrap() - .run(lazy(|| { + .block_on(lazy(|| { let (mut sender, payload) = Payload::new(false); let mut payload = PayloadHelper::new(payload); @@ -665,9 +665,9 @@ mod tests { #[test] fn test_readuntil() { - Core::new() + Runtime::new() .unwrap() - .run(lazy(|| { + .block_on(lazy(|| { let (mut sender, payload) = Payload::new(false); let mut payload = PayloadHelper::new(payload); @@ -699,9 +699,9 @@ mod tests { #[test] fn test_unread_data() { - Core::new() + Runtime::new() .unwrap() - .run(lazy(|| { + .block_on(lazy(|| { let (_, mut payload) = Payload::new(false); payload.unread_data(Bytes::from("data")); diff --git a/src/pipeline.rs b/src/pipeline.rs index f5c338e6b..e315d4c09 100644 --- a/src/pipeline.rs +++ b/src/pipeline.rs @@ -770,7 +770,7 @@ mod tests { use actix::*; use context::HttpContext; use futures::future::{lazy, result}; - use tokio_core::reactor::Core; + use tokio::runtime::current_thread::Runtime; impl PipelineState { fn is_none(&self) -> Option { @@ -796,9 +796,9 @@ mod tests { #[test] fn test_completed() { - Core::new() + Runtime::new() .unwrap() - .run(lazy(|| { + .block_on(lazy(|| { let mut info = PipelineInfo::new(HttpRequest::default()); Completed::<(), Inner<()>>::init(&mut info) .is_none() diff --git a/src/server/h1.rs b/src/server/h1.rs index 7b4d8a973..e5fa2fe92 100644 --- a/src/server/h1.rs +++ b/src/server/h1.rs @@ -2,12 +2,11 @@ use std::collections::VecDeque; use std::io; use std::net::SocketAddr; use std::rc::Rc; -use std::time::Duration; +use std::time::{Duration, Instant}; -use actix::Arbiter; use bytes::{BufMut, BytesMut}; use futures::{Async, Future, Poll}; -use tokio_core::reactor::Timeout; +use tokio_timer::Delay; use error::PayloadError; use httprequest::HttpRequest; @@ -53,7 +52,7 @@ pub(crate) struct Http1 { payload: Option, buf: BytesMut, tasks: VecDeque, - keepalive_timer: Option, + keepalive_timer: Option, } struct Entry { @@ -295,8 +294,7 @@ where if self.keepalive_timer.is_none() && keep_alive > 0 { trace!("Start keep-alive timer"); let mut timer = - Timeout::new(Duration::new(keep_alive, 0), Arbiter::handle()) - .unwrap(); + Delay::new(Instant::now() + Duration::new(keep_alive, 0)); // register timer let _ = timer.poll(); self.keepalive_timer = Some(timer); diff --git a/src/server/h2.rs b/src/server/h2.rs index c730ac409..a73cc599a 100644 --- a/src/server/h2.rs +++ b/src/server/h2.rs @@ -4,17 +4,16 @@ use std::collections::VecDeque; use std::io::{Read, Write}; use std::net::SocketAddr; use std::rc::Rc; -use std::time::Duration; +use std::time::{Duration, Instant}; use std::{cmp, io, mem}; -use actix::Arbiter; use bytes::{Buf, Bytes}; use futures::{Async, Future, Poll, Stream}; use http2::server::{self, Connection, Handshake, SendResponse}; use http2::{Reason, RecvStream}; use modhttp::request::Parts; -use tokio_core::reactor::Timeout; use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_timer::Delay; use error::PayloadError; use httpmessage::HttpMessage; @@ -46,7 +45,7 @@ where addr: Option, state: State>, tasks: VecDeque>, - keepalive_timer: Option, + keepalive_timer: Option, } enum State { @@ -218,9 +217,10 @@ where let keep_alive = self.settings.keep_alive(); if keep_alive > 0 && self.keepalive_timer.is_none() { trace!("Start keep-alive timer"); - let mut timeout = Timeout::new( - Duration::new(keep_alive, 0), - Arbiter::handle()).unwrap(); + let mut timeout = Delay::new( + Instant::now() + + Duration::new(keep_alive, 0), + ); // register timeout let _ = timeout.poll(); self.keepalive_timer = Some(timeout); diff --git a/src/server/mod.rs b/src/server/mod.rs index 7347b7254..36f7cfb15 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -5,8 +5,8 @@ use std::{io, time}; use actix; use bytes::BytesMut; use futures::{Async, Poll}; -use tokio_core::net::TcpStream; use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_tcp::TcpStream; mod channel; pub(crate) mod encoding; diff --git a/src/server/srv.rs b/src/server/srv.rs index 18663103b..94e132e15 100644 --- a/src/server/srv.rs +++ b/src/server/srv.rs @@ -563,11 +563,10 @@ impl HttpServer { /// Start listening for incoming connections from a stream. /// /// This method uses only one thread for handling incoming connections. - pub fn start_incoming(mut self, stream: S, secure: bool) -> Addr + pub fn start_incoming(mut self, stream: S, secure: bool) -> Addr where - S: Stream + 'static, + S: Stream + 'static, T: AsyncRead + AsyncWrite + 'static, - A: 'static, { // set server settings let addr: net::SocketAddr = "127.0.0.1:8080".parse().unwrap(); @@ -581,7 +580,7 @@ impl HttpServer { // start server let signals = self.subscribe_to_signals(); let addr: Addr = HttpServer::create(move |ctx| { - ctx.add_message_stream(stream.map_err(|_| ()).map(move |(t, _)| Conn { + ctx.add_message_stream(stream.map_err(|_| ()).map(move |t| Conn { io: WrapperStream::new(t), token: 0, peer: None, @@ -687,7 +686,7 @@ where type Result = (); fn handle(&mut self, msg: Conn, _: &mut Context) -> Self::Result { - Arbiter::handle().spawn(HttpChannel::new( + Arbiter::spawn(HttpChannel::new( Rc::clone(self.h.as_ref().unwrap()), msg.io, msg.peer, diff --git a/src/server/worker.rs b/src/server/worker.rs index f045074de..64e4c403e 100644 --- a/src/server/worker.rs +++ b/src/server/worker.rs @@ -4,8 +4,8 @@ use net2::TcpStreamExt; use slab::Slab; use std::rc::Rc; use std::{net, time}; -use tokio_core::net::TcpStream; -use tokio_core::reactor::Handle; +use tokio_reactor::Handle; +use tokio_tcp::TcpStream; #[cfg(any(feature = "tls", feature = "alpn"))] use futures::future; @@ -60,7 +60,6 @@ where H: HttpHandler + 'static, { settings: Rc>, - hnd: Handle, socks: Slab, tcp_ka: Option, } @@ -77,7 +76,6 @@ impl Worker { Worker { settings: Rc::new(WorkerSettings::new(h, keep_alive)), - hnd: Arbiter::handle().clone(), socks, tcp_ka, } @@ -130,11 +128,11 @@ where if self.tcp_ka.is_some() && msg.io.set_keepalive(self.tcp_ka).is_err() { error!("Can not set socket keep-alive option"); } - self.socks.get_mut(msg.token).unwrap().htype.handle( - Rc::clone(&self.settings), - &self.hnd, - msg, - ); + self.socks + .get_mut(msg.token) + .unwrap() + .htype + .handle(Rc::clone(&self.settings), msg); } } @@ -174,15 +172,15 @@ pub(crate) enum StreamHandlerType { impl StreamHandlerType { fn handle( - &mut self, h: Rc>, hnd: &Handle, msg: Conn, + &mut self, h: Rc>, msg: Conn, ) { match *self { StreamHandlerType::Normal => { let _ = msg.io.set_nodelay(true); - let io = TcpStream::from_stream(msg.io, hnd) + let io = TcpStream::from_std(msg.io, &Handle::default()) .expect("failed to associate TCP stream"); - hnd.spawn(HttpChannel::new(h, io, msg.peer, msg.http2)); + Arbiter::spawn(HttpChannel::new(h, io, msg.peer, msg.http2)); } #[cfg(feature = "tls")] StreamHandlerType::Tls(ref acceptor) => { @@ -190,47 +188,50 @@ impl StreamHandlerType { io, peer, http2, .. } = msg; let _ = io.set_nodelay(true); - let io = TcpStream::from_stream(io, hnd) + let io = TcpStream::from_std(io, &Handle::default()) .expect("failed to associate TCP stream"); - hnd.spawn(TlsAcceptorExt::accept_async(acceptor, io).then(move |res| { - match res { - Ok(io) => { - Arbiter::handle().spawn(HttpChannel::new(h, io, peer, http2)) - } - Err(err) => { - trace!("Error during handling tls connection: {}", err) - } - }; - future::result(Ok(())) - })); + Arbiter::spawn(TlsAcceptorExt::accept_async(acceptor, io).then( + move |res| { + match res { + Ok(io) => { + Arbiter::spawn(HttpChannel::new(h, io, peer, http2)) + } + Err(err) => { + trace!("Error during handling tls connection: {}", err) + } + }; + future::result(Ok(())) + }, + )); } #[cfg(feature = "alpn")] StreamHandlerType::Alpn(ref acceptor) => { let Conn { io, peer, .. } = msg; let _ = io.set_nodelay(true); - let io = TcpStream::from_stream(io, hnd) + let io = TcpStream::from_std(io, &Handle::default()) .expect("failed to associate TCP stream"); - hnd.spawn(SslAcceptorExt::accept_async(acceptor, io).then(move |res| { - match res { - Ok(io) => { - let http2 = if let Some(p) = - io.get_ref().ssl().selected_alpn_protocol() - { - p.len() == 2 && &p == b"h2" - } else { - false - }; - Arbiter::handle() - .spawn(HttpChannel::new(h, io, peer, http2)); - } - Err(err) => { - trace!("Error during handling tls connection: {}", err) - } - }; - future::result(Ok(())) - })); + Arbiter::spawn(SslAcceptorExt::accept_async(acceptor, io).then( + move |res| { + match res { + Ok(io) => { + let http2 = if let Some(p) = + io.get_ref().ssl().selected_alpn_protocol() + { + p.len() == 2 && &p == b"h2" + } else { + false + }; + Arbiter::spawn(HttpChannel::new(h, io, peer, http2)); + } + Err(err) => { + trace!("Error during handling tls connection: {}", err) + } + }; + future::result(Ok(())) + }, + )); } } } diff --git a/src/test.rs b/src/test.rs index 5e99652b2..f950c17b5 100644 --- a/src/test.rs +++ b/src/test.rs @@ -11,8 +11,9 @@ use futures::Future; use http::header::HeaderName; use http::{HeaderMap, HttpTryFrom, Method, Uri, Version}; use net2::TcpBuilder; -use tokio_core::net::TcpListener; -use tokio_core::reactor::Core; +use tokio::runtime::current_thread::Runtime; +use tokio_reactor::Handle; +use tokio_tcp::TcpListener; #[cfg(feature = "alpn")] use openssl::ssl::SslAcceptor; @@ -112,8 +113,7 @@ impl TestServer { let sys = System::new("actix-test-server"); let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap(); let local_addr = tcp.local_addr().unwrap(); - let tcp = - TcpListener::from_listener(tcp, &local_addr, Arbiter::handle()).unwrap(); + let tcp = TcpListener::from_std(tcp, &Handle::default()).unwrap(); HttpServer::new(factory) .disable_signals() @@ -289,8 +289,7 @@ impl TestServerBuilder { let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap(); let local_addr = tcp.local_addr().unwrap(); - let tcp = - TcpListener::from_listener(tcp, &local_addr, Arbiter::handle()).unwrap(); + let tcp = TcpListener::from_std(tcp, &Handle::default()).unwrap(); let state = self.state; @@ -309,10 +308,9 @@ impl TestServerBuilder { let ssl = self.ssl.take(); if let Some(ssl) = ssl { srv.start_incoming( - tcp.incoming().and_then(move |(sock, addr)| { + tcp.incoming().and_then(move |sock| { ssl.accept_async(sock) .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) - .map(move |s| (s, addr)) }), false, ); @@ -616,8 +614,8 @@ impl TestRequest { let req = self.finish(); let fut = h(req.clone()); - let mut core = Core::new().unwrap(); - match core.run(fut) { + let mut core = Runtime::new().unwrap(); + match core.block_on(fut) { Ok(r) => match r.respond_to(&req) { Ok(reply) => match reply.into().into() { AsyncResultItem::Ok(resp) => Ok(resp), diff --git a/tests/test_handlers.rs b/tests/test_handlers.rs index 9507f1e9a..8544b9bf2 100644 --- a/tests/test_handlers.rs +++ b/tests/test_handlers.rs @@ -4,21 +4,20 @@ extern crate bytes; extern crate futures; extern crate h2; extern crate http; -extern crate tokio_core; +extern crate tokio_timer; #[macro_use] extern crate serde_derive; extern crate serde_json; use std::io; -use std::time::Duration; +use std::time::{Duration, Instant}; -use actix::*; use actix_web::*; use bytes::Bytes; use futures::Future; use http::StatusCode; use serde_json::Value; -use tokio_core::reactor::Timeout; +use tokio_timer::Delay; #[derive(Deserialize)] struct PParam { @@ -75,8 +74,7 @@ fn test_async_extractor_async() { let mut srv = test::TestServer::new(|app| { app.resource("/{username}/index.html", |r| { r.route().with(|data: Json| { - Timeout::new(Duration::from_millis(10), &Arbiter::handle()) - .unwrap() + Delay::new(Instant::now() + Duration::from_millis(10)) .and_then(move |_| Ok(format!("{}", data.0))) .responder() }) @@ -171,8 +169,7 @@ fn test_path_and_query_extractor2_async() { app.resource("/{username}/index.html", |r| { r.route() .with3(|p: Path, _: Query, data: Json| { - Timeout::new(Duration::from_millis(10), &Arbiter::handle()) - .unwrap() + Delay::new(Instant::now() + Duration::from_millis(10)) .and_then(move |_| { Ok(format!("Welcome {} - {}!", p.username, data.0)) }) @@ -201,8 +198,7 @@ fn test_path_and_query_extractor3_async() { let mut srv = test::TestServer::new(|app| { app.resource("/{username}/index.html", |r| { r.route().with2(|p: Path, data: Json| { - Timeout::new(Duration::from_millis(10), &Arbiter::handle()) - .unwrap() + Delay::new(Instant::now() + Duration::from_millis(10)) .and_then(move |_| { Ok(format!("Welcome {} - {}!", p.username, data.0)) }) @@ -227,8 +223,7 @@ fn test_path_and_query_extractor4_async() { let mut srv = test::TestServer::new(|app| { app.resource("/{username}/index.html", |r| { r.route().with2(|data: Json, p: Path| { - Timeout::new(Duration::from_millis(10), &Arbiter::handle()) - .unwrap() + Delay::new(Instant::now() + Duration::from_millis(10)) .and_then(move |_| { Ok(format!("Welcome {} - {}!", p.username, data.0)) }) @@ -254,8 +249,7 @@ fn test_path_and_query_extractor2_async2() { app.resource("/{username}/index.html", |r| { r.route() .with3(|p: Path, data: Json, _: Query| { - Timeout::new(Duration::from_millis(10), &Arbiter::handle()) - .unwrap() + Delay::new(Instant::now() + Duration::from_millis(10)) .and_then(move |_| { Ok(format!("Welcome {} - {}!", p.username, data.0)) }) @@ -294,8 +288,7 @@ fn test_path_and_query_extractor2_async3() { app.resource("/{username}/index.html", |r| { r.route() .with3(|data: Json, p: Path, _: Query| { - Timeout::new(Duration::from_millis(10), &Arbiter::handle()) - .unwrap() + Delay::new(Instant::now() + Duration::from_millis(10)) .and_then(move |_| { Ok(format!("Welcome {} - {}!", p.username, data.0)) }) @@ -334,8 +327,7 @@ fn test_path_and_query_extractor2_async4() { app.resource("/{username}/index.html", |r| { r.route() .with(|data: (Json, Path, Query)| { - Timeout::new(Duration::from_millis(10), &Arbiter::handle()) - .unwrap() + Delay::new(Instant::now() + Duration::from_millis(10)) .and_then(move |_| { Ok(format!("Welcome {} - {}!", data.1.username, (data.0).0)) }) @@ -443,8 +435,8 @@ fn test_nested_scope_and_path_extractor() { fn test_impl_trait( data: (Json, Path, Query), ) -> impl Future { - Timeout::new(Duration::from_millis(10), &Arbiter::handle()) - .unwrap() + Delay::new(Instant::now() + Duration::from_millis(10)) + .map_err(|_| io::Error::new(io::ErrorKind::Other, "timeout")) .and_then(move |_| Ok(format!("Welcome {} - {}!", data.1.username, (data.0).0))) } @@ -452,8 +444,8 @@ fn test_impl_trait( fn test_impl_trait_err( _data: (Json, Path, Query), ) -> impl Future { - Timeout::new(Duration::from_millis(10), &Arbiter::handle()) - .unwrap() + Delay::new(Instant::now() + Duration::from_millis(10)) + .map_err(|_| io::Error::new(io::ErrorKind::Other, "timeout")) .and_then(move |_| Err(io::Error::new(io::ErrorKind::Other, "other"))) } diff --git a/tests/test_middleware.rs b/tests/test_middleware.rs index 8435e7464..4996542e2 100644 --- a/tests/test_middleware.rs +++ b/tests/test_middleware.rs @@ -1,17 +1,16 @@ extern crate actix; extern crate actix_web; extern crate futures; -extern crate tokio_core; +extern crate tokio_timer; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::thread; -use std::time::Duration; +use std::time::{Duration, Instant}; -use actix::*; use actix_web::*; use futures::{future, Future}; -use tokio_core::reactor::Timeout; +use tokio_timer::Delay; struct MiddlewareTest { start: Arc, @@ -245,8 +244,7 @@ fn test_middleware_async_handler() { }) .resource("/", |r| { r.route().a(|_| { - Timeout::new(Duration::from_millis(10), &Arbiter::handle()) - .unwrap() + Delay::new(Instant::now() + Duration::from_millis(10)) .and_then(|_| Ok(HttpResponse::Ok())) }) }) @@ -281,8 +279,7 @@ fn test_resource_middleware_async_handler() { App::new().resource("/test", |r| { r.middleware(mw); r.route().a(|_| { - Timeout::new(Duration::from_millis(10), &Arbiter::handle()) - .unwrap() + Delay::new(Instant::now() + Duration::from_millis(10)) .and_then(|_| Ok(HttpResponse::Ok())) }) }) @@ -317,8 +314,7 @@ fn test_scope_middleware_async_handler() { }) .resource("/test", |r| { r.route().a(|_| { - Timeout::new(Duration::from_millis(10), &Arbiter::handle()) - .unwrap() + Delay::new(Instant::now() + Duration::from_millis(10)) .and_then(|_| Ok(HttpResponse::Ok())) }) }) @@ -436,7 +432,7 @@ struct MiddlewareAsyncTest { impl middleware::Middleware for MiddlewareAsyncTest { fn start(&self, _: &mut HttpRequest) -> Result { - let to = Timeout::new(Duration::from_millis(10), &Arbiter::handle()).unwrap(); + let to = Delay::new(Instant::now() + Duration::from_millis(10)); let start = Arc::clone(&self.start); Ok(middleware::Started::Future(Box::new( @@ -450,7 +446,7 @@ impl middleware::Middleware for MiddlewareAsyncTest { fn response( &self, _: &mut HttpRequest, resp: HttpResponse, ) -> Result { - let to = Timeout::new(Duration::from_millis(10), &Arbiter::handle()).unwrap(); + let to = Delay::new(Instant::now() + Duration::from_millis(10)); let response = Arc::clone(&self.response); Ok(middleware::Response::Future(Box::new( @@ -462,7 +458,7 @@ impl middleware::Middleware for MiddlewareAsyncTest { } fn finish(&self, _: &mut HttpRequest, _: &HttpResponse) -> middleware::Finished { - let to = Timeout::new(Duration::from_millis(10), &Arbiter::handle()).unwrap(); + let to = Delay::new(Instant::now() + Duration::from_millis(10)); let finish = Arc::clone(&self.finish); middleware::Finished::Future(Box::new(to.from_err().and_then(move |_| { diff --git a/tests/test_server.rs b/tests/test_server.rs index 1fcbfd5e4..c02642a19 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -6,7 +6,9 @@ extern crate futures; extern crate h2; extern crate http as modhttp; extern crate rand; -extern crate tokio_core; +extern crate tokio; +extern crate tokio_reactor; +extern crate tokio_tcp; #[cfg(feature = "brotli")] extern crate brotli2; @@ -25,8 +27,9 @@ use rand::Rng; use std::io::{Read, Write}; use std::sync::{mpsc, Arc}; use std::{net, thread, time}; -use tokio_core::net::TcpStream; -use tokio_core::reactor::Core; +use tokio::executor::current_thread; +use tokio::runtime::current_thread::Runtime; +use tokio_tcp::TcpStream; use actix::System; use actix_web::*; @@ -790,9 +793,8 @@ fn test_h2() { let srv = test::TestServer::new(|app| app.handler(|_| HttpResponse::Ok().body(STR))); let addr = srv.addr(); - let mut core = Core::new().unwrap(); - let handle = core.handle(); - let tcp = TcpStream::connect(&addr, &handle); + let mut core = Runtime::new().unwrap(); + let tcp = TcpStream::connect(&addr); let tcp = tcp .then(|res| h2client::handshake(res.unwrap())) @@ -806,7 +808,7 @@ fn test_h2() { let (response, _) = client.send_request(request, false).unwrap(); // Spawn a task to run the conn... - handle.spawn(h2.map_err(|e| println!("GOT ERR={:?}", e))); + current_thread::spawn(h2.map_err(|e| println!("GOT ERR={:?}", e))); response.and_then(|response| { assert_eq!(response.status(), http::StatusCode::OK); @@ -819,7 +821,7 @@ fn test_h2() { }) }) }); - let _res = core.run(tcp); + let _res = core.block_on(tcp); // assert_eq!(_res.unwrap(), Bytes::from_static(STR.as_ref())); }