1
0
mirror of https://github.com/actix/actix-extras.git synced 2025-01-23 07:14:35 +01:00

use new actix system api

This commit is contained in:
Nikolay Kim 2018-05-29 10:31:37 -07:00
parent fb582a6bca
commit ecd05662c0
13 changed files with 173 additions and 165 deletions

View File

@ -1,16 +1,15 @@
//! Http client api
//!
//! ```rust
//! ```rust,ignore
//! # extern crate actix;
//! # extern crate actix_web;
//! # extern crate futures;
//! # extern crate tokio;
//! # use futures::Future;
//! use actix_web::client;
//!
//! fn main() {
//! let sys = actix::System::new("test");
//!
//! actix::Arbiter::spawn({
//! tokio::run({
//! client::get("http://www.rust-lang.org") // <- Create request builder
//! .header("User-Agent", "Actix-web")
//! .finish().unwrap()
@ -18,12 +17,9 @@
//! .map_err(|_| ())
//! .and_then(|response| { // <- server http response
//! println!("Response: {:?}", response);
//! # actix::Arbiter::system().do_send(actix::msgs::SystemExit(0));
//! Ok(())
//! })
//! });
//!
//! sys.run();
//! }
//! ```
mod connector;
@ -60,30 +56,24 @@ impl ResponseError for SendRequestError {
/// Create request builder for `GET` requests
///
/// ```rust
/// ```rust,ignore
/// # extern crate actix;
/// # extern crate actix_web;
/// # extern crate futures;
/// # use futures::Future;
/// # use futures::{future, Future};
/// use actix_web::client;
///
/// fn main() {
/// let sys = actix::System::new("test");
///
/// actix::Arbiter::spawn({
/// tokio::run(
/// client::get("http://www.rust-lang.org") // <- Create request builder
/// .header("User-Agent", "Actix-web")
/// .finish().unwrap()
/// .send() // <- Send http request
/// .map_err(|_| ())
/// .and_then(|response| { // <- server http response
/// .and_then(|response| { // <- server http response
/// println!("Response: {:?}", response);
/// # actix::Arbiter::system().do_send(actix::msgs::SystemExit(0));
/// Ok(())
/// })
/// });
///
/// sys.run();
/// }));
/// }
/// ```
pub fn get<U: AsRef<str>>(uri: U) -> ClientRequestBuilder {

View File

@ -380,7 +380,10 @@ impl Pipeline {
match self.timeout.as_mut().unwrap().poll() {
Ok(Async::Ready(())) => return Err(SendRequestError::Timeout),
Ok(Async::NotReady) => (),
Err(_) => unreachable!(),
Err(e) => {
println!("err: {:?}", e);
return Err(SendRequestError::Timeout);
}
}
}
Ok(())

View File

@ -24,7 +24,7 @@ use httprequest::HttpRequest;
/// An HTTP Client Request
///
/// ```rust
/// ```rust,ignore
/// # extern crate actix;
/// # extern crate actix_web;
/// # extern crate futures;
@ -32,22 +32,17 @@ use httprequest::HttpRequest;
/// use actix_web::client::ClientRequest;
///
/// fn main() {
/// let sys = actix::System::new("test");
///
/// actix::Arbiter::spawn({
/// tokio::run(
/// ClientRequest::get("http://www.rust-lang.org") // <- Create request builder
/// .header("User-Agent", "Actix-web")
/// .finish().unwrap()
/// .send() // <- Send http request
/// .map_err(|_| ())
/// .and_then(|response| { // <- server http response
/// .and_then(|response| { // <- server http response
/// println!("Response: {:?}", response);
/// # actix::Arbiter::system().do_send(actix::msgs::SystemExit(0));
/// Ok(())
/// })
/// });
///
/// sys.run();
/// );
/// }
/// ```
pub struct ClientRequest {

View File

@ -90,7 +90,7 @@ where
self.inner.cancel_future(handle)
}
#[inline]
fn address(&mut self) -> Addr<A> {
fn address(&self) -> Addr<A> {
self.inner.address()
}
}

View File

@ -50,17 +50,17 @@
//! }
//!
//! fn main() {
//! let sys = actix::System::new("basic-example");
//! server::new(
//! || App::new().middleware(
//! SessionStorage::new( // <- create session middleware
//! CookieSessionBackend::signed(&[0; 32]) // <- create signed cookie session backend
//! .secure(false)
//! )))
//! .bind("127.0.0.1:59880").unwrap()
//! .start();
//! # actix::Arbiter::system().do_send(actix::msgs::SystemExit(0));
//! let _ = sys.run();
//! actix::System::run(|| {
//! server::new(
//! || App::new().middleware(
//! SessionStorage::new( // <- create session middleware
//! CookieSessionBackend::signed(&[0; 32]) // <- create signed cookie session backend
//! .secure(false)
//! )))
//! .bind("127.0.0.1:59880").unwrap()
//! .start();
//! # actix::Arbiter::system().do_send(actix::msgs::SystemExit(0));
//! });
//! }
//! ```
use std::cell::RefCell;

View File

@ -805,7 +805,7 @@ mod tests {
.unwrap();
let req = HttpRequest::default();
let mut ctx = HttpContext::new(req.clone(), MyActor);
let ctx = HttpContext::new(req.clone(), MyActor);
let addr = ctx.address();
let mut info = PipelineInfo::new(req);
info.context = Some(Box::new(ctx));

View File

@ -48,16 +48,16 @@ pub(crate) const MAX_WRITE_BUFFER_SIZE: usize = 65_536;
/// use actix_web::{server, App, HttpResponse};
///
/// fn main() {
/// let sys = actix::System::new("guide");
/// actix::System::run(|| {
///
/// server::new(
/// || App::new()
/// .resource("/", |r| r.f(|_| HttpResponse::Ok())))
/// .bind("127.0.0.1:59090").unwrap()
/// .start();
/// server::new(
/// || App::new()
/// .resource("/", |r| r.f(|_| HttpResponse::Ok())))
/// .bind("127.0.0.1:59090").unwrap()
/// .start();
///
/// # actix::Arbiter::system().do_send(actix::msgs::SystemExit(0));
/// let _ = sys.run();
/// });
/// }
/// ```
pub fn new<F, U, H>(factory: F) -> HttpServer<H>

View File

@ -410,16 +410,16 @@ impl<H: IntoHttpHandler> HttpServer<H> {
/// use actix_web::{server, App, HttpResponse};
///
/// fn main() {
/// let sys = actix::System::new("example"); // <- create Actix system
/// // Run actix system, this method actually starts all async processes
/// actix::System::run(|| {
///
/// server::new(
/// || App::new()
/// .resource("/", |r| r.h(|_| HttpResponse::Ok())))
/// .bind("127.0.0.1:0").expect("Can not bind to 127.0.0.1:0")
/// .start();
/// server::new(
/// || App::new()
/// .resource("/", |r| r.h(|_| HttpResponse::Ok())))
/// .bind("127.0.0.1:0").expect("Can not bind to 127.0.0.1:0")
/// .start();
/// # actix::Arbiter::system().do_send(actix::msgs::SystemExit(0));
///
/// let _ = sys.run(); // <- Run actix system, this method actually starts all async processes
/// });
/// }
/// ```
pub fn start(mut self) -> Addr<Self> {
@ -496,9 +496,11 @@ impl<H: IntoHttpHandler> HttpServer<H> {
self.no_signals = false;
let _ = thread::spawn(move || {
let sys = System::new("http-server");
self.start();
let _ = sys.run();
System::new("http-server")
.config(|| {
self.start();
})
.run();
}).join();
}
}
@ -565,7 +567,7 @@ impl<H: IntoHttpHandler> HttpServer<H> {
/// This method uses only one thread for handling incoming connections.
pub fn start_incoming<T, S>(mut self, stream: S, secure: bool) -> Addr<Self>
where
S: Stream<Item = T, Error = io::Error> + 'static,
S: Stream<Item = T, Error = io::Error> + Send + 'static,
T: AsyncRead + AsyncWrite + 'static,
{
// set server settings
@ -588,6 +590,7 @@ impl<H: IntoHttpHandler> HttpServer<H> {
}));
self
});
if let Some(signals) = signals {
signals.do_send(signal::Subscribe(addr.clone().recipient()))
}
@ -686,12 +689,13 @@ where
type Result = ();
fn handle(&mut self, msg: Conn<T>, _: &mut Context<Self>) -> Self::Result {
Arbiter::spawn(HttpChannel::new(
unimplemented!();
/*Arbiter::spawn(HttpChannel::new(
Rc::clone(self.h.as_ref().unwrap()),
msg.io,
msg.peer,
msg.http2,
));
));*/
}
}

View File

@ -1,9 +1,10 @@
use futures::unsync::oneshot;
use futures::sync::oneshot;
use futures::Future;
use net2::TcpStreamExt;
use slab::Slab;
use std::rc::Rc;
use std::{net, time};
use tokio::executor::current_thread;
use tokio_reactor::Handle;
use tokio_tcp::TcpStream;
@ -180,7 +181,7 @@ impl StreamHandlerType {
let io = TcpStream::from_std(msg.io, &Handle::default())
.expect("failed to associate TCP stream");
Arbiter::spawn(HttpChannel::new(h, io, msg.peer, msg.http2));
current_thread::spawn(HttpChannel::new(h, io, msg.peer, msg.http2));
}
#[cfg(feature = "tls")]
StreamHandlerType::Tls(ref acceptor) => {
@ -194,9 +195,9 @@ impl StreamHandlerType {
Arbiter::spawn(TlsAcceptorExt::accept_async(acceptor, io).then(
move |res| {
match res {
Ok(io) => {
Arbiter::spawn(HttpChannel::new(h, io, peer, http2))
}
Ok(io) => current_thread::spawn(HttpChannel::new(
h, io, peer, http2,
)),
Err(err) => {
trace!("Error during handling tls connection: {}", err)
}
@ -223,7 +224,9 @@ impl StreamHandlerType {
} else {
false
};
Arbiter::spawn(HttpChannel::new(h, io, peer, http2));
current_thread::spawn(HttpChannel::new(
h, io, peer, http2,
));
}
Err(err) => {
trace!("Error during handling tls connection: {}", err)

View File

@ -5,15 +5,13 @@ use std::str::FromStr;
use std::sync::mpsc;
use std::{net, thread};
use actix::{msgs, Actor, Addr, Arbiter, System, SystemRunner};
use actix::{msgs, Actor, Addr, Arbiter, System};
use cookie::Cookie;
use futures::Future;
use http::header::HeaderName;
use http::{HeaderMap, HttpTryFrom, Method, Uri, Version};
use net2::TcpBuilder;
use tokio::runtime::current_thread::Runtime;
use tokio_reactor::Handle;
use tokio_tcp::TcpListener;
#[cfg(feature = "alpn")]
use openssl::ssl::SslAcceptor;
@ -63,10 +61,10 @@ use ws;
pub struct TestServer {
addr: net::SocketAddr,
thread: Option<thread::JoinHandle<()>>,
system: SystemRunner,
server_sys: Addr<System>,
ssl: bool,
conn: Addr<ClientConnector>,
rt: Runtime,
}
impl TestServer {
@ -113,25 +111,31 @@ impl TestServer {
let sys = System::new("actix-test-server");
let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap();
let local_addr = tcp.local_addr().unwrap();
let tcp = TcpListener::from_std(tcp, &Handle::default()).unwrap();
HttpServer::new(factory)
.disable_signals()
.start_incoming(tcp.incoming(), false);
sys.config(move || {
HttpServer::new(factory)
.disable_signals()
.listen(tcp)
.start();
tx.send((Arbiter::system(), local_addr)).unwrap();
let _ = sys.run();
tx.send((
Arbiter::system(),
local_addr,
TestServer::get_conn(),
Arbiter::registry().clone(),
)).unwrap();
}).run();
});
let sys = System::new("actix-test");
let (server_sys, addr) = rx.recv().unwrap();
let (server_sys, addr, conn, reg) = rx.recv().unwrap();
Arbiter::set_system_reg(reg);
TestServer {
addr,
server_sys,
conn,
ssl: false,
conn: TestServer::get_conn(),
thread: Some(join),
system: sys,
rt: Runtime::new().unwrap(),
}
}
@ -197,7 +201,7 @@ impl TestServer {
where
F: Future<Item = I, Error = E>,
{
self.system.run_until_complete(fut)
self.rt.block_on(fut)
}
/// Connect to websocket server
@ -205,9 +209,8 @@ impl TestServer {
&mut self,
) -> Result<(ws::ClientReader, ws::ClientWriter), ws::ClientError> {
let url = self.url("/");
self.system.run_until_complete(
ws::Client::with_connector(url, self.conn.clone()).connect(),
)
self.rt
.block_on(ws::Client::with_connector(url, self.conn.clone()).connect())
}
/// Create `GET` request
@ -285,57 +288,64 @@ impl<S: 'static> TestServerBuilder<S> {
// run server in separate thread
let join = thread::spawn(move || {
let sys = System::new("actix-test-server");
let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap();
let local_addr = tcp.local_addr().unwrap();
let tcp = TcpListener::from_std(tcp, &Handle::default()).unwrap();
let state = self.state;
let srv = HttpServer::new(move || {
let mut app = TestApp::new(state());
config(&mut app);
vec![app]
}).disable_signals();
System::new("actix-test-server")
.config(move || {
let srv = HttpServer::new(move || {
let mut app = TestApp::new(state());
config(&mut app);
vec![app]
}).workers(1)
.disable_signals();
#[cfg(feature = "alpn")]
{
use futures::Stream;
use std::io;
use tokio_openssl::SslAcceptorExt;
tx.send((
Arbiter::system(),
local_addr,
TestServer::get_conn(),
Arbiter::registry().clone(),
)).unwrap();
let ssl = self.ssl.take();
if let Some(ssl) = ssl {
srv.start_incoming(
tcp.incoming().and_then(move |sock| {
ssl.accept_async(sock)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
}),
false,
);
} else {
srv.start_incoming(tcp.incoming(), false);
}
}
#[cfg(not(feature = "alpn"))]
{
srv.start_incoming(tcp.incoming(), false);
}
#[cfg(feature = "alpn")]
{
use futures::Stream;
use std::io;
use tokio_openssl::SslAcceptorExt;
tx.send((Arbiter::system(), local_addr)).unwrap();
let _ = sys.run();
let ssl = self.ssl.take();
if let Some(ssl) = ssl {
srv.start_incoming(
tcp.incoming().and_then(move |sock| {
ssl.accept_async(sock).map_err(|e| {
io::Error::new(io::ErrorKind::Other, e)
})
}),
false,
);
} else {
srv.start_incoming(tcp.incoming(), false);
}
}
#[cfg(not(feature = "alpn"))]
{
srv.listen(tcp).start();
}
})
.run();
});
let system = System::new("actix-test");
let (server_sys, addr) = rx.recv().unwrap();
let (server_sys, addr, conn, reg) = rx.recv().unwrap();
Arbiter::set_system_reg(reg);
TestServer {
addr,
server_sys,
ssl,
system,
conn: TestServer::get_conn(),
conn,
server_sys,
thread: Some(join),
rt: Runtime::new().unwrap(),
}
}
}

View File

@ -75,7 +75,7 @@ where
}
#[inline]
fn address(&mut self) -> Addr<A> {
fn address(&self) -> Addr<A> {
self.inner.address()
}
}

View File

@ -1,5 +1,7 @@
extern crate actix;
extern crate actix_web;
#[cfg(feature = "brotli")]
extern crate brotli2;
extern crate bytes;
extern crate flate2;
extern crate futures;
@ -10,8 +12,9 @@ extern crate tokio;
extern crate tokio_reactor;
extern crate tokio_tcp;
#[cfg(feature = "brotli")]
extern crate brotli2;
use std::io::{Read, Write};
use std::sync::{mpsc, Arc};
use std::{net, thread, time};
#[cfg(feature = "brotli")]
use brotli2::write::{BrotliDecoder, BrotliEncoder};
@ -23,10 +26,8 @@ use futures::stream::once;
use futures::{Future, Stream};
use h2::client as h2client;
use modhttp::Request;
use rand::distributions::Alphanumeric;
use rand::Rng;
use std::io::{Read, Write};
use std::sync::{mpsc, Arc};
use std::{net, thread, time};
use tokio::executor::current_thread;
use tokio::runtime::current_thread::Runtime;
use tokio_tcp::TcpStream;
@ -62,28 +63,29 @@ fn test_start() {
let _ = test::TestServer::unused_addr();
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let sys = System::new("test");
let srv = server::new(|| {
vec![App::new().resource("/", |r| {
r.method(http::Method::GET).f(|_| HttpResponse::Ok())
})]
});
thread::spawn(|| {
System::run(move || {
let srv = server::new(|| {
vec![App::new().resource("/", |r| {
r.method(http::Method::GET).f(|_| HttpResponse::Ok())
})]
});
let srv = srv.bind("127.0.0.1:0").unwrap();
let addr = srv.addrs()[0];
let srv_addr = srv.start();
let _ = tx.send((addr, srv_addr));
sys.run();
let srv = srv.bind("127.0.0.1:0").unwrap();
let addr = srv.addrs()[0];
let srv_addr = srv.start();
let _ = tx.send((addr, srv_addr));
});
});
let (addr, srv_addr) = rx.recv().unwrap();
let mut sys = System::new("test-server");
let _sys = System::new("test-server");
let mut rt = Runtime::new().unwrap();
{
let req = client::ClientRequest::get(format!("http://{}/", addr).as_str())
.finish()
.unwrap();
let response = sys.run_until_complete(req.send()).unwrap();
let response = rt.block_on(req.send()).unwrap();
assert!(response.status().is_success());
}
@ -95,7 +97,7 @@ fn test_start() {
.timeout(time::Duration::from_millis(200))
.finish()
.unwrap();
assert!(sys.run_until_complete(req.send()).is_err());
assert!(rt.block_on(req.send()).is_err());
}
// resume
@ -105,7 +107,7 @@ fn test_start() {
let req = client::ClientRequest::get(format!("http://{}/", addr).as_str())
.finish()
.unwrap();
let response = sys.run_until_complete(req.send()).unwrap();
let response = rt.block_on(req.send()).unwrap();
assert!(response.status().is_success());
}
}
@ -116,29 +118,29 @@ fn test_shutdown() {
let _ = test::TestServer::unused_addr();
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let sys = System::new("test");
let srv = server::new(|| {
vec![App::new().resource("/", |r| {
r.method(http::Method::GET).f(|_| HttpResponse::Ok())
})]
});
thread::spawn(|| {
System::run(move || {
let srv = server::new(|| {
vec![App::new().resource("/", |r| {
r.method(http::Method::GET).f(|_| HttpResponse::Ok())
})]
});
let srv = srv.bind("127.0.0.1:0").unwrap();
let addr = srv.addrs()[0];
let srv_addr = srv.shutdown_timeout(1).start();
let _ = tx.send((addr, srv_addr));
sys.run();
let srv = srv.bind("127.0.0.1:0").unwrap();
let addr = srv.addrs()[0];
let srv_addr = srv.shutdown_timeout(1).start();
let _ = tx.send((addr, srv_addr));
});
});
let (addr, srv_addr) = rx.recv().unwrap();
let mut sys = System::new("test-server");
let _sys = System::new("test-server");
let mut rt = Runtime::new().unwrap();
{
let req = client::ClientRequest::get(format!("http://{}/", addr).as_str())
.finish()
.unwrap();
let response = sys.run_until_complete(req.send()).unwrap();
let response = rt.block_on(req.send()).unwrap();
srv_addr.do_send(server::StopServer { graceful: true });
assert!(response.status().is_success());
}
@ -263,7 +265,7 @@ fn test_body_gzip_large() {
#[test]
fn test_body_gzip_large_random() {
let data = rand::thread_rng()
.gen_ascii_chars()
.sample_iter(&Alphanumeric)
.take(70_000)
.collect::<String>();
let srv_data = Arc::new(data.clone());
@ -583,7 +585,7 @@ fn test_gzip_encoding_large() {
#[test]
fn test_reading_gzip_encoding_large_random() {
let data = rand::thread_rng()
.gen_ascii_chars()
.sample_iter(&Alphanumeric)
.take(60_000)
.collect::<String>();
@ -686,7 +688,7 @@ fn test_reading_deflate_encoding_large() {
#[test]
fn test_reading_deflate_encoding_large_random() {
let data = rand::thread_rng()
.gen_ascii_chars()
.sample_iter(&Alphanumeric)
.take(160_000)
.collect::<String>();

View File

@ -7,6 +7,7 @@ extern crate rand;
use bytes::Bytes;
use futures::Stream;
use rand::distributions::Alphanumeric;
use rand::Rng;
#[cfg(feature = "alpn")]
@ -86,7 +87,7 @@ fn test_close_description() {
#[test]
fn test_large_text() {
let data = rand::thread_rng()
.gen_ascii_chars()
.sample_iter(&Alphanumeric)
.take(65_536)
.collect::<String>();
@ -104,7 +105,7 @@ fn test_large_text() {
#[test]
fn test_large_bin() {
let data = rand::thread_rng()
.gen_ascii_chars()
.sample_iter(&Alphanumeric)
.take(65_536)
.collect::<String>();