1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-28 01:32:57 +01:00

use newer api

This commit is contained in:
Nikolay Kim 2018-02-12 22:56:47 -08:00
parent a544034c06
commit b1eec3131f
9 changed files with 22 additions and 22 deletions

View File

@ -200,7 +200,7 @@ fn main() {
))) )))
.bind("127.0.0.1:59880").unwrap() .bind("127.0.0.1:59880").unwrap()
.start(); .start();
# actix::Arbiter::system().send(actix::msgs::SystemExit(0)); # actix::Arbiter::system().do_send(actix::msgs::SystemExit(0));
# let _ = sys.run(); # let _ = sys.run();
} }
``` ```

View File

@ -24,7 +24,7 @@ fn main() {
.bind("127.0.0.1:59080").unwrap() .bind("127.0.0.1:59080").unwrap()
.start(); .start();
# actix::Arbiter::system().send(actix::msgs::SystemExit(0)); # actix::Arbiter::system().do_send(actix::msgs::SystemExit(0));
let _ = sys.run(); let _ = sys.run();
} }
``` ```
@ -52,7 +52,7 @@ use std::sync::mpsc;
fn main() { fn main() {
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
thread::spawn(move || { thread::spawn(move || {
let sys = actix::System::new("http-server"); let sys = actix::System::new("http-server");
let addr = HttpServer::new( let addr = HttpServer::new(
@ -66,7 +66,7 @@ fn main() {
}); });
let addr = rx.recv().unwrap(); let addr = rx.recv().unwrap();
let _ = addr.call( let _ = addr.send(
server::StopServer{graceful:true}).wait(); // <- Send `StopServer` message to server. server::StopServer{graceful:true}).wait(); // <- Send `StopServer` message to server.
} }
``` ```

View File

@ -109,7 +109,7 @@ fn main() {
.start(); .start();
println!("Started http server: 127.0.0.1:8088"); println!("Started http server: 127.0.0.1:8088");
# actix::Arbiter::system().send(actix::msgs::SystemExit(0)); # actix::Arbiter::system().do_send(actix::msgs::SystemExit(0));
let _ = sys.run(); let _ = sys.run();
} }
``` ```
@ -167,7 +167,7 @@ fn main() {
.start(); .start();
println!("Started http server: 127.0.0.1:8088"); println!("Started http server: 127.0.0.1:8088");
# actix::Arbiter::system().send(actix::msgs::SystemExit(0)); # actix::Arbiter::system().do_send(actix::msgs::SystemExit(0));
let _ = sys.run(); let _ = sys.run();
} }
``` ```

View File

@ -146,7 +146,7 @@ impl ClientConnector {
/// if let Ok(mut stream) = res { /// if let Ok(mut stream) = res {
/// stream.write_all(b"GET / HTTP/1.0\r\n\r\n").unwrap(); /// stream.write_all(b"GET / HTTP/1.0\r\n\r\n").unwrap();
/// } /// }
/// # Arbiter::system().send(actix::msgs::SystemExit(0)); /// # Arbiter::system().do_send(actix::msgs::SystemExit(0));
/// Ok(()) /// Ok(())
/// }) /// })
/// }); /// });
@ -191,7 +191,7 @@ impl Handler<Connect> for ClientConnector {
ActorResponse::async( ActorResponse::async(
Connector::from_registry() Connector::from_registry()
.call(ResolveConnect::host_and_port(&host, port)) .send(ResolveConnect::host_and_port(&host, port))
.into_actor(self) .into_actor(self)
.map_err(|_, _, _| ClientConnectorError::Disconnected) .map_err(|_, _, _| ClientConnectorError::Disconnected)
.and_then(move |res, _act, _| { .and_then(move |res, _act, _| {

View File

@ -264,7 +264,7 @@ impl<H: IntoHttpHandler> HttpServer<H>
/// .resource("/", |r| r.h(httpcodes::HTTPOk))) /// .resource("/", |r| r.h(httpcodes::HTTPOk)))
/// .bind("127.0.0.1:0").expect("Can not bind to 127.0.0.1:0") /// .bind("127.0.0.1:0").expect("Can not bind to 127.0.0.1:0")
/// .start(); /// .start();
/// # actix::Arbiter::system().send(actix::msgs::SystemExit(0)); /// # actix::Arbiter::system().do_send(actix::msgs::SystemExit(0));
/// ///
/// let _ = sys.run(); // <- Run actix system, this method actually starts all async processes /// let _ = sys.run(); // <- Run actix system, this method actually starts all async processes
/// } /// }
@ -289,7 +289,7 @@ impl<H: IntoHttpHandler> HttpServer<H>
// start http server actor // start http server actor
let signals = self.subscribe_to_signals(); let signals = self.subscribe_to_signals();
let addr: Addr<Syn, _> = Actor::start(self); let addr: Addr<Syn, _> = Actor::start(self);
signals.map(|signals| signals.send( signals.map(|signals| signals.do_send(
signal::Subscribe(addr.clone().recipient()))); signal::Subscribe(addr.clone().recipient())));
addr addr
} }
@ -351,7 +351,7 @@ impl<H: IntoHttpHandler> HttpServer<H>
// start http server actor // start http server actor
let signals = self.subscribe_to_signals(); let signals = self.subscribe_to_signals();
let addr: Addr<Syn, _> = Actor::start(self); let addr: Addr<Syn, _> = Actor::start(self);
signals.map(|signals| signals.send( signals.map(|signals| signals.do_send(
signal::Subscribe(addr.clone().recipient()))); signal::Subscribe(addr.clone().recipient())));
Ok(addr) Ok(addr)
} }
@ -395,7 +395,7 @@ impl<H: IntoHttpHandler> HttpServer<H>
// start http server actor // start http server actor
let signals = self.subscribe_to_signals(); let signals = self.subscribe_to_signals();
let addr: Addr<Syn, _> = Actor::start(self); let addr: Addr<Syn, _> = Actor::start(self);
signals.map(|signals| signals.send( signals.map(|signals| signals.do_send(
signal::Subscribe(addr.clone().recipient()))); signal::Subscribe(addr.clone().recipient())));
Ok(addr) Ok(addr)
} }
@ -442,7 +442,7 @@ impl<H: IntoHttpHandler> HttpServer<H>
.map(move |(t, _)| Conn{io: WrapperStream::new(t), peer: None, http2: false})); .map(move |(t, _)| Conn{io: WrapperStream::new(t), peer: None, http2: false}));
self self
}); });
signals.map(|signals| signals.send( signals.map(|signals| signals.do_send(
signal::Subscribe(addr.clone().recipient()))); signal::Subscribe(addr.clone().recipient())));
addr addr
} }
@ -536,7 +536,7 @@ impl<H: IntoHttpHandler> Handler<StopServer> for HttpServer<H>
}; };
for worker in &self.workers { for worker in &self.workers {
let tx2 = tx.clone(); let tx2 = tx.clone();
let fut = worker.call(StopWorker{graceful: dur}).into_actor(self); let fut = worker.send(StopWorker{graceful: dur}).into_actor(self);
ActorFuture::then(fut, move |_, slf, _| { ActorFuture::then(fut, move |_, slf, _| {
slf.workers.pop(); slf.workers.pop();
if slf.workers.is_empty() { if slf.workers.is_empty() {
@ -544,7 +544,7 @@ impl<H: IntoHttpHandler> Handler<StopServer> for HttpServer<H>
// we need to stop system if server was spawned // we need to stop system if server was spawned
if slf.exit { if slf.exit {
Arbiter::system().send(actix::msgs::SystemExit(0)) Arbiter::system().do_send(actix::msgs::SystemExit(0))
} }
} }
actix::fut::ok(()) actix::fut::ok(())
@ -557,7 +557,7 @@ impl<H: IntoHttpHandler> Handler<StopServer> for HttpServer<H>
} else { } else {
// we need to stop system if server was spawned // we need to stop system if server was spawned
if self.exit { if self.exit {
Arbiter::system().send(actix::msgs::SystemExit(0)) Arbiter::system().do_send(actix::msgs::SystemExit(0))
} }
Response::reply(Ok(())) Response::reply(Ok(()))
} }

View File

@ -78,14 +78,14 @@ impl<H: HttpHandler + 'static> Worker<H> {
let num = slf.settings.num_channels(); let num = slf.settings.num_channels();
if num == 0 { if num == 0 {
let _ = tx.send(true); let _ = tx.send(true);
Arbiter::arbiter().send(StopArbiter(0)); Arbiter::arbiter().do_send(StopArbiter(0));
} else if let Some(d) = dur.checked_sub(time::Duration::new(1, 0)) { } else if let Some(d) = dur.checked_sub(time::Duration::new(1, 0)) {
slf.shutdown_timeout(ctx, tx, d); slf.shutdown_timeout(ctx, tx, d);
} else { } else {
info!("Force shutdown http worker, {} connections", num); info!("Force shutdown http worker, {} connections", num);
slf.settings.head().traverse::<TcpStream, H>(); slf.settings.head().traverse::<TcpStream, H>();
let _ = tx.send(false); let _ = tx.send(false);
Arbiter::arbiter().send(StopArbiter(0)); Arbiter::arbiter().do_send(StopArbiter(0));
} }
}); });
} }

View File

@ -165,7 +165,7 @@ impl TestServer {
/// Stop http server /// Stop http server
fn stop(&mut self) { fn stop(&mut self) {
if let Some(handle) = self.thread.take() { if let Some(handle) = self.thread.take() {
self.server_sys.send(msgs::SystemExit(0)); self.server_sys.do_send(msgs::SystemExit(0));
let _ = handle.join(); let _ = handle.join();
} }
} }

View File

@ -200,7 +200,7 @@ impl WsClient {
// get connection and start handshake // get connection and start handshake
Ok(Box::new( Ok(Box::new(
self.conn.call(Connect(request.uri().clone())) self.conn.send(Connect(request.uri().clone()))
.map_err(|_| WsClientError::Disconnected) .map_err(|_| WsClientError::Disconnected)
.and_then(|res| match res { .and_then(|res| match res {
Ok(stream) => Either::A(WsHandshake::new(stream, request)), Ok(stream) => Either::A(WsHandshake::new(stream, request)),

View File

@ -72,12 +72,12 @@ fn test_start() {
assert!(reqwest::get(&format!("http://{}/", addr)).unwrap().status().is_success()); assert!(reqwest::get(&format!("http://{}/", addr)).unwrap().status().is_success());
// pause // pause
let _ = srv_addr.call(server::PauseServer).wait(); let _ = srv_addr.send(server::PauseServer).wait();
thread::sleep(time::Duration::from_millis(100)); thread::sleep(time::Duration::from_millis(100));
assert!(net::TcpStream::connect(addr).is_err()); assert!(net::TcpStream::connect(addr).is_err());
// resume // resume
let _ = srv_addr.call(server::ResumeServer).wait(); let _ = srv_addr.send(server::ResumeServer).wait();
assert!(reqwest::get(&format!("http://{}/", addr)).unwrap().status().is_success()); assert!(reqwest::get(&format!("http://{}/", addr)).unwrap().status().is_success());
} }