diff --git a/src/client/mod.rs b/src/client/mod.rs index 36a876ccb..96033a211 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -1,16 +1,15 @@ //! Http client api //! -//! ```rust +//! ```rust,ignore //! # extern crate actix; //! # extern crate actix_web; //! # extern crate futures; +//! # extern crate tokio; //! # use futures::Future; //! use actix_web::client; //! //! fn main() { -//! let sys = actix::System::new("test"); -//! -//! actix::Arbiter::spawn({ +//! tokio::run({ //! client::get("http://www.rust-lang.org") // <- Create request builder //! .header("User-Agent", "Actix-web") //! .finish().unwrap() @@ -18,12 +17,9 @@ //! .map_err(|_| ()) //! .and_then(|response| { // <- server http response //! println!("Response: {:?}", response); -//! # actix::Arbiter::system().do_send(actix::msgs::SystemExit(0)); //! Ok(()) //! }) //! }); -//! -//! sys.run(); //! } //! ``` mod connector; @@ -60,30 +56,24 @@ impl ResponseError for SendRequestError { /// Create request builder for `GET` requests /// -/// ```rust +/// ```rust,ignore /// # extern crate actix; /// # extern crate actix_web; /// # extern crate futures; -/// # use futures::Future; +/// # use futures::{future, Future}; /// use actix_web::client; /// /// fn main() { -/// let sys = actix::System::new("test"); -/// -/// actix::Arbiter::spawn({ +/// tokio::run( /// client::get("http://www.rust-lang.org") // <- Create request builder /// .header("User-Agent", "Actix-web") /// .finish().unwrap() /// .send() // <- Send http request /// .map_err(|_| ()) -/// .and_then(|response| { // <- server http response +/// .and_then(|response| { // <- server http response /// println!("Response: {:?}", response); -/// # actix::Arbiter::system().do_send(actix::msgs::SystemExit(0)); /// Ok(()) -/// }) -/// }); -/// -/// sys.run(); +/// })); /// } /// ``` pub fn get>(uri: U) -> ClientRequestBuilder { diff --git a/src/client/pipeline.rs b/src/client/pipeline.rs index 3f3d425d9..5543aa4c3 100644 --- a/src/client/pipeline.rs +++ b/src/client/pipeline.rs @@ -380,7 +380,10 @@ impl Pipeline { match self.timeout.as_mut().unwrap().poll() { Ok(Async::Ready(())) => return Err(SendRequestError::Timeout), Ok(Async::NotReady) => (), - Err(_) => unreachable!(), + Err(e) => { + println!("err: {:?}", e); + return Err(SendRequestError::Timeout); + } } } Ok(()) diff --git a/src/client/request.rs b/src/client/request.rs index adb1b29fe..d4baa2546 100644 --- a/src/client/request.rs +++ b/src/client/request.rs @@ -24,7 +24,7 @@ use httprequest::HttpRequest; /// An HTTP Client Request /// -/// ```rust +/// ```rust,ignore /// # extern crate actix; /// # extern crate actix_web; /// # extern crate futures; @@ -32,22 +32,17 @@ use httprequest::HttpRequest; /// use actix_web::client::ClientRequest; /// /// fn main() { -/// let sys = actix::System::new("test"); -/// -/// actix::Arbiter::spawn({ +/// tokio::run( /// ClientRequest::get("http://www.rust-lang.org") // <- Create request builder /// .header("User-Agent", "Actix-web") /// .finish().unwrap() /// .send() // <- Send http request /// .map_err(|_| ()) -/// .and_then(|response| { // <- server http response +/// .and_then(|response| { // <- server http response /// println!("Response: {:?}", response); -/// # actix::Arbiter::system().do_send(actix::msgs::SystemExit(0)); /// Ok(()) /// }) -/// }); -/// -/// sys.run(); +/// ); /// } /// ``` pub struct ClientRequest { diff --git a/src/context.rs b/src/context.rs index e298287bc..76594e7bf 100644 --- a/src/context.rs +++ b/src/context.rs @@ -90,7 +90,7 @@ where self.inner.cancel_future(handle) } #[inline] - fn address(&mut self) -> Addr { + fn address(&self) -> Addr { self.inner.address() } } diff --git a/src/middleware/session.rs b/src/middleware/session.rs index 3e6e98906..57f42a117 100644 --- a/src/middleware/session.rs +++ b/src/middleware/session.rs @@ -50,17 +50,17 @@ //! } //! //! fn main() { -//! let sys = actix::System::new("basic-example"); -//! server::new( -//! || App::new().middleware( -//! SessionStorage::new( // <- create session middleware -//! CookieSessionBackend::signed(&[0; 32]) // <- create signed cookie session backend -//! .secure(false) -//! ))) -//! .bind("127.0.0.1:59880").unwrap() -//! .start(); -//! # actix::Arbiter::system().do_send(actix::msgs::SystemExit(0)); -//! let _ = sys.run(); +//! actix::System::run(|| { +//! server::new( +//! || App::new().middleware( +//! SessionStorage::new( // <- create session middleware +//! CookieSessionBackend::signed(&[0; 32]) // <- create signed cookie session backend +//! .secure(false) +//! ))) +//! .bind("127.0.0.1:59880").unwrap() +//! .start(); +//! # actix::Arbiter::system().do_send(actix::msgs::SystemExit(0)); +//! }); //! } //! ``` use std::cell::RefCell; diff --git a/src/pipeline.rs b/src/pipeline.rs index e5152de50..be85dc330 100644 --- a/src/pipeline.rs +++ b/src/pipeline.rs @@ -805,7 +805,7 @@ mod tests { .unwrap(); let req = HttpRequest::default(); - let mut ctx = HttpContext::new(req.clone(), MyActor); + let ctx = HttpContext::new(req.clone(), MyActor); let addr = ctx.address(); let mut info = PipelineInfo::new(req); info.context = Some(Box::new(ctx)); diff --git a/src/server/mod.rs b/src/server/mod.rs index 36f7cfb15..268764830 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -48,16 +48,16 @@ pub(crate) const MAX_WRITE_BUFFER_SIZE: usize = 65_536; /// use actix_web::{server, App, HttpResponse}; /// /// fn main() { -/// let sys = actix::System::new("guide"); +/// actix::System::run(|| { /// -/// server::new( -/// || App::new() -/// .resource("/", |r| r.f(|_| HttpResponse::Ok()))) -/// .bind("127.0.0.1:59090").unwrap() -/// .start(); +/// server::new( +/// || App::new() +/// .resource("/", |r| r.f(|_| HttpResponse::Ok()))) +/// .bind("127.0.0.1:59090").unwrap() +/// .start(); /// /// # actix::Arbiter::system().do_send(actix::msgs::SystemExit(0)); -/// let _ = sys.run(); +/// }); /// } /// ``` pub fn new(factory: F) -> HttpServer diff --git a/src/server/srv.rs b/src/server/srv.rs index 42b9d77d0..df6a4b9d4 100644 --- a/src/server/srv.rs +++ b/src/server/srv.rs @@ -410,16 +410,16 @@ impl HttpServer { /// use actix_web::{server, App, HttpResponse}; /// /// fn main() { - /// let sys = actix::System::new("example"); // <- create Actix system + /// // Run actix system, this method actually starts all async processes + /// actix::System::run(|| { /// - /// server::new( - /// || App::new() - /// .resource("/", |r| r.h(|_| HttpResponse::Ok()))) - /// .bind("127.0.0.1:0").expect("Can not bind to 127.0.0.1:0") - /// .start(); + /// server::new( + /// || App::new() + /// .resource("/", |r| r.h(|_| HttpResponse::Ok()))) + /// .bind("127.0.0.1:0").expect("Can not bind to 127.0.0.1:0") + /// .start(); /// # actix::Arbiter::system().do_send(actix::msgs::SystemExit(0)); - /// - /// let _ = sys.run(); // <- Run actix system, this method actually starts all async processes + /// }); /// } /// ``` pub fn start(mut self) -> Addr { @@ -496,9 +496,11 @@ impl HttpServer { self.no_signals = false; let _ = thread::spawn(move || { - let sys = System::new("http-server"); - self.start(); - let _ = sys.run(); + System::new("http-server") + .config(|| { + self.start(); + }) + .run(); }).join(); } } @@ -565,7 +567,7 @@ impl HttpServer { /// This method uses only one thread for handling incoming connections. pub fn start_incoming(mut self, stream: S, secure: bool) -> Addr where - S: Stream + 'static, + S: Stream + Send + 'static, T: AsyncRead + AsyncWrite + 'static, { // set server settings @@ -588,6 +590,7 @@ impl HttpServer { })); self }); + if let Some(signals) = signals { signals.do_send(signal::Subscribe(addr.clone().recipient())) } @@ -686,12 +689,13 @@ where type Result = (); fn handle(&mut self, msg: Conn, _: &mut Context) -> Self::Result { - Arbiter::spawn(HttpChannel::new( + unimplemented!(); + /*Arbiter::spawn(HttpChannel::new( Rc::clone(self.h.as_ref().unwrap()), msg.io, msg.peer, msg.http2, - )); + ));*/ } } diff --git a/src/server/worker.rs b/src/server/worker.rs index 64e4c403e..1ca42ccc3 100644 --- a/src/server/worker.rs +++ b/src/server/worker.rs @@ -1,9 +1,10 @@ -use futures::unsync::oneshot; +use futures::sync::oneshot; use futures::Future; use net2::TcpStreamExt; use slab::Slab; use std::rc::Rc; use std::{net, time}; +use tokio::executor::current_thread; use tokio_reactor::Handle; use tokio_tcp::TcpStream; @@ -180,7 +181,7 @@ impl StreamHandlerType { let io = TcpStream::from_std(msg.io, &Handle::default()) .expect("failed to associate TCP stream"); - Arbiter::spawn(HttpChannel::new(h, io, msg.peer, msg.http2)); + current_thread::spawn(HttpChannel::new(h, io, msg.peer, msg.http2)); } #[cfg(feature = "tls")] StreamHandlerType::Tls(ref acceptor) => { @@ -194,9 +195,9 @@ impl StreamHandlerType { Arbiter::spawn(TlsAcceptorExt::accept_async(acceptor, io).then( move |res| { match res { - Ok(io) => { - Arbiter::spawn(HttpChannel::new(h, io, peer, http2)) - } + Ok(io) => current_thread::spawn(HttpChannel::new( + h, io, peer, http2, + )), Err(err) => { trace!("Error during handling tls connection: {}", err) } @@ -223,7 +224,9 @@ impl StreamHandlerType { } else { false }; - Arbiter::spawn(HttpChannel::new(h, io, peer, http2)); + current_thread::spawn(HttpChannel::new( + h, io, peer, http2, + )); } Err(err) => { trace!("Error during handling tls connection: {}", err) diff --git a/src/test.rs b/src/test.rs index bd2135cc4..89fa632e1 100644 --- a/src/test.rs +++ b/src/test.rs @@ -5,15 +5,13 @@ use std::str::FromStr; use std::sync::mpsc; use std::{net, thread}; -use actix::{msgs, Actor, Addr, Arbiter, System, SystemRunner}; +use actix::{msgs, Actor, Addr, Arbiter, System}; use cookie::Cookie; use futures::Future; use http::header::HeaderName; use http::{HeaderMap, HttpTryFrom, Method, Uri, Version}; use net2::TcpBuilder; use tokio::runtime::current_thread::Runtime; -use tokio_reactor::Handle; -use tokio_tcp::TcpListener; #[cfg(feature = "alpn")] use openssl::ssl::SslAcceptor; @@ -63,10 +61,10 @@ use ws; pub struct TestServer { addr: net::SocketAddr, thread: Option>, - system: SystemRunner, server_sys: Addr, ssl: bool, conn: Addr, + rt: Runtime, } impl TestServer { @@ -113,25 +111,31 @@ impl TestServer { let sys = System::new("actix-test-server"); let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap(); let local_addr = tcp.local_addr().unwrap(); - let tcp = TcpListener::from_std(tcp, &Handle::default()).unwrap(); - HttpServer::new(factory) - .disable_signals() - .start_incoming(tcp.incoming(), false); + sys.config(move || { + HttpServer::new(factory) + .disable_signals() + .listen(tcp) + .start(); - tx.send((Arbiter::system(), local_addr)).unwrap(); - let _ = sys.run(); + tx.send(( + Arbiter::system(), + local_addr, + TestServer::get_conn(), + Arbiter::registry().clone(), + )).unwrap(); + }).run(); }); - let sys = System::new("actix-test"); - let (server_sys, addr) = rx.recv().unwrap(); + let (server_sys, addr, conn, reg) = rx.recv().unwrap(); + Arbiter::set_system_reg(reg); TestServer { addr, server_sys, + conn, ssl: false, - conn: TestServer::get_conn(), thread: Some(join), - system: sys, + rt: Runtime::new().unwrap(), } } @@ -197,7 +201,7 @@ impl TestServer { where F: Future, { - self.system.run_until_complete(fut) + self.rt.block_on(fut) } /// Connect to websocket server @@ -205,9 +209,8 @@ impl TestServer { &mut self, ) -> Result<(ws::ClientReader, ws::ClientWriter), ws::ClientError> { let url = self.url("/"); - self.system.run_until_complete( - ws::Client::with_connector(url, self.conn.clone()).connect(), - ) + self.rt + .block_on(ws::Client::with_connector(url, self.conn.clone()).connect()) } /// Create `GET` request @@ -285,57 +288,64 @@ impl TestServerBuilder { // run server in separate thread let join = thread::spawn(move || { - let sys = System::new("actix-test-server"); - let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap(); let local_addr = tcp.local_addr().unwrap(); - let tcp = TcpListener::from_std(tcp, &Handle::default()).unwrap(); let state = self.state; - let srv = HttpServer::new(move || { - let mut app = TestApp::new(state()); - config(&mut app); - vec![app] - }).disable_signals(); + System::new("actix-test-server") + .config(move || { + let srv = HttpServer::new(move || { + let mut app = TestApp::new(state()); + config(&mut app); + vec![app] + }).workers(1) + .disable_signals(); - #[cfg(feature = "alpn")] - { - use futures::Stream; - use std::io; - use tokio_openssl::SslAcceptorExt; + tx.send(( + Arbiter::system(), + local_addr, + TestServer::get_conn(), + Arbiter::registry().clone(), + )).unwrap(); - let ssl = self.ssl.take(); - if let Some(ssl) = ssl { - srv.start_incoming( - tcp.incoming().and_then(move |sock| { - ssl.accept_async(sock) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) - }), - false, - ); - } else { - srv.start_incoming(tcp.incoming(), false); - } - } - #[cfg(not(feature = "alpn"))] - { - srv.start_incoming(tcp.incoming(), false); - } + #[cfg(feature = "alpn")] + { + use futures::Stream; + use std::io; + use tokio_openssl::SslAcceptorExt; - tx.send((Arbiter::system(), local_addr)).unwrap(); - let _ = sys.run(); + let ssl = self.ssl.take(); + if let Some(ssl) = ssl { + srv.start_incoming( + tcp.incoming().and_then(move |sock| { + ssl.accept_async(sock).map_err(|e| { + io::Error::new(io::ErrorKind::Other, e) + }) + }), + false, + ); + } else { + srv.start_incoming(tcp.incoming(), false); + } + } + #[cfg(not(feature = "alpn"))] + { + srv.listen(tcp).start(); + } + }) + .run(); }); - let system = System::new("actix-test"); - let (server_sys, addr) = rx.recv().unwrap(); + let (server_sys, addr, conn, reg) = rx.recv().unwrap(); + Arbiter::set_system_reg(reg); TestServer { addr, - server_sys, ssl, - system, - conn: TestServer::get_conn(), + conn, + server_sys, thread: Some(join), + rt: Runtime::new().unwrap(), } } } diff --git a/src/ws/context.rs b/src/ws/context.rs index 03af169d6..22323f49f 100644 --- a/src/ws/context.rs +++ b/src/ws/context.rs @@ -75,7 +75,7 @@ where } #[inline] - fn address(&mut self) -> Addr { + fn address(&self) -> Addr { self.inner.address() } } diff --git a/tests/test_server.rs b/tests/test_server.rs index c02642a19..120eef06c 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -1,5 +1,7 @@ extern crate actix; extern crate actix_web; +#[cfg(feature = "brotli")] +extern crate brotli2; extern crate bytes; extern crate flate2; extern crate futures; @@ -10,8 +12,9 @@ extern crate tokio; extern crate tokio_reactor; extern crate tokio_tcp; -#[cfg(feature = "brotli")] -extern crate brotli2; +use std::io::{Read, Write}; +use std::sync::{mpsc, Arc}; +use std::{net, thread, time}; #[cfg(feature = "brotli")] use brotli2::write::{BrotliDecoder, BrotliEncoder}; @@ -23,10 +26,8 @@ use futures::stream::once; use futures::{Future, Stream}; use h2::client as h2client; use modhttp::Request; +use rand::distributions::Alphanumeric; use rand::Rng; -use std::io::{Read, Write}; -use std::sync::{mpsc, Arc}; -use std::{net, thread, time}; use tokio::executor::current_thread; use tokio::runtime::current_thread::Runtime; use tokio_tcp::TcpStream; @@ -62,28 +63,29 @@ fn test_start() { let _ = test::TestServer::unused_addr(); let (tx, rx) = mpsc::channel(); - thread::spawn(move || { - let sys = System::new("test"); - let srv = server::new(|| { - vec![App::new().resource("/", |r| { - r.method(http::Method::GET).f(|_| HttpResponse::Ok()) - })] - }); + thread::spawn(|| { + System::run(move || { + let srv = server::new(|| { + vec![App::new().resource("/", |r| { + r.method(http::Method::GET).f(|_| HttpResponse::Ok()) + })] + }); - let srv = srv.bind("127.0.0.1:0").unwrap(); - let addr = srv.addrs()[0]; - let srv_addr = srv.start(); - let _ = tx.send((addr, srv_addr)); - sys.run(); + let srv = srv.bind("127.0.0.1:0").unwrap(); + let addr = srv.addrs()[0]; + let srv_addr = srv.start(); + let _ = tx.send((addr, srv_addr)); + }); }); let (addr, srv_addr) = rx.recv().unwrap(); - let mut sys = System::new("test-server"); + let _sys = System::new("test-server"); + let mut rt = Runtime::new().unwrap(); { let req = client::ClientRequest::get(format!("http://{}/", addr).as_str()) .finish() .unwrap(); - let response = sys.run_until_complete(req.send()).unwrap(); + let response = rt.block_on(req.send()).unwrap(); assert!(response.status().is_success()); } @@ -95,7 +97,7 @@ fn test_start() { .timeout(time::Duration::from_millis(200)) .finish() .unwrap(); - assert!(sys.run_until_complete(req.send()).is_err()); + assert!(rt.block_on(req.send()).is_err()); } // resume @@ -105,7 +107,7 @@ fn test_start() { let req = client::ClientRequest::get(format!("http://{}/", addr).as_str()) .finish() .unwrap(); - let response = sys.run_until_complete(req.send()).unwrap(); + let response = rt.block_on(req.send()).unwrap(); assert!(response.status().is_success()); } } @@ -116,29 +118,29 @@ fn test_shutdown() { let _ = test::TestServer::unused_addr(); let (tx, rx) = mpsc::channel(); - thread::spawn(move || { - let sys = System::new("test"); - let srv = server::new(|| { - vec![App::new().resource("/", |r| { - r.method(http::Method::GET).f(|_| HttpResponse::Ok()) - })] - }); + thread::spawn(|| { + System::run(move || { + let srv = server::new(|| { + vec![App::new().resource("/", |r| { + r.method(http::Method::GET).f(|_| HttpResponse::Ok()) + })] + }); - let srv = srv.bind("127.0.0.1:0").unwrap(); - let addr = srv.addrs()[0]; - let srv_addr = srv.shutdown_timeout(1).start(); - let _ = tx.send((addr, srv_addr)); - sys.run(); + let srv = srv.bind("127.0.0.1:0").unwrap(); + let addr = srv.addrs()[0]; + let srv_addr = srv.shutdown_timeout(1).start(); + let _ = tx.send((addr, srv_addr)); + }); }); let (addr, srv_addr) = rx.recv().unwrap(); - let mut sys = System::new("test-server"); - + let _sys = System::new("test-server"); + let mut rt = Runtime::new().unwrap(); { let req = client::ClientRequest::get(format!("http://{}/", addr).as_str()) .finish() .unwrap(); - let response = sys.run_until_complete(req.send()).unwrap(); + let response = rt.block_on(req.send()).unwrap(); srv_addr.do_send(server::StopServer { graceful: true }); assert!(response.status().is_success()); } @@ -263,7 +265,7 @@ fn test_body_gzip_large() { #[test] fn test_body_gzip_large_random() { let data = rand::thread_rng() - .gen_ascii_chars() + .sample_iter(&Alphanumeric) .take(70_000) .collect::(); let srv_data = Arc::new(data.clone()); @@ -583,7 +585,7 @@ fn test_gzip_encoding_large() { #[test] fn test_reading_gzip_encoding_large_random() { let data = rand::thread_rng() - .gen_ascii_chars() + .sample_iter(&Alphanumeric) .take(60_000) .collect::(); @@ -686,7 +688,7 @@ fn test_reading_deflate_encoding_large() { #[test] fn test_reading_deflate_encoding_large_random() { let data = rand::thread_rng() - .gen_ascii_chars() + .sample_iter(&Alphanumeric) .take(160_000) .collect::(); diff --git a/tests/test_ws.rs b/tests/test_ws.rs index 0d75bc3f2..4f9565ddc 100644 --- a/tests/test_ws.rs +++ b/tests/test_ws.rs @@ -7,6 +7,7 @@ extern crate rand; use bytes::Bytes; use futures::Stream; +use rand::distributions::Alphanumeric; use rand::Rng; #[cfg(feature = "alpn")] @@ -86,7 +87,7 @@ fn test_close_description() { #[test] fn test_large_text() { let data = rand::thread_rng() - .gen_ascii_chars() + .sample_iter(&Alphanumeric) .take(65_536) .collect::(); @@ -104,7 +105,7 @@ fn test_large_text() { #[test] fn test_large_bin() { let data = rand::thread_rng() - .gen_ascii_chars() + .sample_iter(&Alphanumeric) .take(65_536) .collect::();