From 8c1b5fa94549d6710b2b4fb3776058ceeba19604 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 12 Feb 2018 12:17:30 -0800 Subject: [PATCH 01/12] sync with latest actix --- Cargo.toml | 2 +- examples/diesel/src/db.rs | 7 +++---- examples/diesel/src/main.rs | 2 +- examples/websocket-chat/src/client.rs | 2 +- examples/websocket-chat/src/codec.rs | 15 ++------------ examples/websocket-chat/src/main.rs | 13 ++++++------ examples/websocket-chat/src/server.rs | 21 +++++++------------ examples/websocket-chat/src/session.rs | 16 +++++++-------- examples/websocket/src/client.rs | 2 +- src/client/connector.rs | 19 +++++++++-------- src/context.rs | 10 ++++----- src/server/mod.rs | 6 +++++- src/server/srv.rs | 28 +++++--------------------- src/server/worker.rs | 10 +++++---- src/ws/context.rs | 6 +++--- src/ws/mod.rs | 9 ++------- 16 files changed, 65 insertions(+), 103 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 96104a700..36301036b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -78,7 +78,7 @@ openssl = { version="0.10", optional = true } tokio-openssl = { version="0.2", optional = true } [dependencies.actix] -#version = "^0.4.6" +#version = "0.5" git = "https://github.com/actix/actix.git" [dev-dependencies] diff --git a/examples/diesel/src/db.rs b/examples/diesel/src/db.rs index 4ca85130e..afbf43dea 100644 --- a/examples/diesel/src/db.rs +++ b/examples/diesel/src/db.rs @@ -17,9 +17,8 @@ pub struct CreateUser { pub name: String, } -impl ResponseType for CreateUser { - type Item = models::User; - type Error = Error; +impl Message for CreateUser { + type Result = Result; } impl Actor for DbExecutor { @@ -27,7 +26,7 @@ impl Actor for DbExecutor { } impl Handler for DbExecutor { - type Result = MessageResult; + type Result = Result; fn handle(&mut self, msg: CreateUser, _: &mut Self::Context) -> Self::Result { use self::schema::users::dsl::*; diff --git a/examples/diesel/src/main.rs b/examples/diesel/src/main.rs index 4c4bc4cda..881cbe1c5 100644 --- a/examples/diesel/src/main.rs +++ b/examples/diesel/src/main.rs @@ -31,7 +31,7 @@ use db::{CreateUser, DbExecutor}; /// State with DbExecutor address struct State { - db: SyncAddress, + db: Addr>, } /// Async request handler diff --git a/examples/websocket-chat/src/client.rs b/examples/websocket-chat/src/client.rs index 5da1f37f6..4fe18d8e9 100644 --- a/examples/websocket-chat/src/client.rs +++ b/examples/websocket-chat/src/client.rs @@ -29,7 +29,7 @@ fn main() { Arbiter::handle().spawn( TcpStream::connect(&addr, Arbiter::handle()) .and_then(|stream| { - let addr: SyncAddress<_> = ChatClient::create(|ctx| { + let addr: Addr> = ChatClient::create(|ctx| { let (r, w) = stream.split(); ChatClient::add_stream(FramedRead::new(r, codec::ClientChatCodec), ctx); ChatClient{ diff --git a/examples/websocket-chat/src/codec.rs b/examples/websocket-chat/src/codec.rs index 718c3c82f..03638241b 100644 --- a/examples/websocket-chat/src/codec.rs +++ b/examples/websocket-chat/src/codec.rs @@ -4,10 +4,9 @@ use serde_json as json; use byteorder::{BigEndian , ByteOrder}; use bytes::{BytesMut, BufMut}; use tokio_io::codec::{Encoder, Decoder}; -use actix::ResponseType; /// Client request -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Message)] #[serde(tag="cmd", content="data")] pub enum ChatRequest { /// List rooms @@ -20,13 +19,8 @@ pub enum ChatRequest { Ping } -impl ResponseType for ChatRequest { - type Item = (); - type Error = (); -} - /// Server response -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Message)] #[serde(tag="cmd", content="data")] pub enum ChatResponse { Ping, @@ -41,11 +35,6 @@ pub enum ChatResponse { Message(String), } -impl ResponseType for ChatResponse { - type Item = (); - type Error = (); -} - /// Codec for Client -> Server transport pub struct ChatCodec; diff --git a/examples/websocket-chat/src/main.rs b/examples/websocket-chat/src/main.rs index 0252f141e..27957f69d 100644 --- a/examples/websocket-chat/src/main.rs +++ b/examples/websocket-chat/src/main.rs @@ -26,7 +26,7 @@ mod session; /// This is our websocket route state, this state is shared with all route instances /// via `HttpContext::state()` struct WsChatSessionState { - addr: SyncAddress, + addr: Addr>, } /// Entry point for our route @@ -62,12 +62,12 @@ impl Actor for WsChatSession { // before processing any other events. // HttpContext::state() is instance of WsChatSessionState, state is shared across all // routes within application - let addr: SyncAddress<_> = ctx.address(); + let addr: Addr> = ctx.address(); ctx.state().addr.call( - self, server::Connect{addr: addr.into()}).then( + self, server::Connect{addr: addr.subscriber()}).then( |res, act, ctx| { match res { - Ok(Ok(res)) => act.id = res, + Ok(res) => act.id = res, // something is wrong with chat server _ => ctx.stop(), } @@ -111,7 +111,7 @@ impl Handler for WsChatSession { println!("List rooms"); ctx.state().addr.call(self, server::ListRooms).then(|res, _, ctx| { match res { - Ok(Ok(rooms)) => { + Ok(rooms) => { for room in rooms { ctx.text(room); } @@ -172,8 +172,7 @@ fn main() { let sys = actix::System::new("websocket-example"); // Start chat server actor in separate thread - let server: SyncAddress<_> = - Arbiter::start(|_| server::ChatServer::default()); + let server: Addr> = Arbiter::start(|_| server::ChatServer::default()); // Start tcp server in separate thread let srv = server.clone(); diff --git a/examples/websocket-chat/src/server.rs b/examples/websocket-chat/src/server.rs index 477a401be..b38fb4601 100644 --- a/examples/websocket-chat/src/server.rs +++ b/examples/websocket-chat/src/server.rs @@ -12,18 +12,12 @@ use session; /// Message for chat server communications /// New chat session is created +#[derive(Message)] +#[rtype(usize)] pub struct Connect { pub addr: SyncSubscriber, } -/// Response type for Connect message -/// -/// Chat server returns unique session id -impl ResponseType for Connect { - type Item = usize; - type Error = (); -} - /// Session is disconnected #[derive(Message)] pub struct Disconnect { @@ -44,9 +38,8 @@ pub struct Message { /// List of available rooms pub struct ListRooms; -impl ResponseType for ListRooms { - type Item = Vec; - type Error = (); +impl actix::Message for ListRooms { + type Result = Vec; } /// Join room, if room does not exists create new one. @@ -106,7 +99,7 @@ impl Actor for ChatServer { /// /// Register new session and assign unique id to this session impl Handler for ChatServer { - type Result = MessageResult; + type Result = usize; fn handle(&mut self, msg: Connect, _: &mut Context) -> Self::Result { println!("Someone joined"); @@ -122,7 +115,7 @@ impl Handler for ChatServer { self.rooms.get_mut(&"Main".to_owned()).unwrap().insert(id); // send id back - Ok(id) + id } } @@ -171,7 +164,7 @@ impl Handler for ChatServer { rooms.push(key.to_owned()) } - Ok(rooms) + MessageResult(rooms) } } diff --git a/examples/websocket-chat/src/session.rs b/examples/websocket-chat/src/session.rs index 29fcb9895..50c2701ef 100644 --- a/examples/websocket-chat/src/session.rs +++ b/examples/websocket-chat/src/session.rs @@ -24,7 +24,7 @@ pub struct ChatSession { /// unique session id id: usize, /// this is address of chat server - addr: SyncAddress, + addr: Addr>, /// Client must send ping at least once per 10 seconds, otherwise we drop connection. hb: Instant, /// joined room @@ -45,11 +45,11 @@ impl Actor for ChatSession { // register self in chat server. `AsyncContext::wait` register // future within context, but context waits until this future resolves // before processing any other events. - let addr: SyncAddress<_> = ctx.address(); - self.addr.call(self, server::Connect{addr: addr.into()}) + let addr: Addr> = ctx.address(); + self.addr.call(self, server::Connect{addr: addr.subscriber()}) .then(|res, act, ctx| { match res { - Ok(Ok(res)) => act.id = res, + Ok(res) => act.id = res, // something is wrong with chat server _ => ctx.stop(), } @@ -77,7 +77,7 @@ impl StreamHandler for ChatSession { println!("List rooms"); self.addr.call(self, server::ListRooms).then(|res, act, ctx| { match res { - Ok(Ok(rooms)) => { + Ok(rooms) => { act.framed.write(ChatResponse::Rooms(rooms)); }, _ => println!("Something is wrong"), @@ -121,7 +121,7 @@ impl Handler for ChatSession { /// Helper methods impl ChatSession { - pub fn new(addr: SyncAddress, + pub fn new(addr: Addr>, framed: actix::io::FramedWrite, ChatCodec>) -> ChatSession { ChatSession {id: 0, addr: addr, hb: Instant::now(), room: "Main".to_owned(), framed: framed} @@ -155,11 +155,11 @@ impl ChatSession { /// Define tcp server that will accept incoming tcp connection and create /// chat actors. pub struct TcpServer { - chat: SyncAddress, + chat: Addr>, } impl TcpServer { - pub fn new(s: &str, chat: SyncAddress) { + pub fn new(s: &str, chat: Addr>) { // Create server listener let addr = net::SocketAddr::from_str("127.0.0.1:12345").unwrap(); let listener = TcpListener::bind(&addr, Arbiter::handle()).unwrap(); diff --git a/examples/websocket/src/client.rs b/examples/websocket/src/client.rs index 31fe614df..3eba7277e 100644 --- a/examples/websocket/src/client.rs +++ b/examples/websocket/src/client.rs @@ -28,7 +28,7 @@ fn main() { () }) .map(|(reader, writer)| { - let addr: SyncAddress<_> = ChatClient::create(|ctx| { + let addr: Addr> = ChatClient::create(|ctx| { ChatClient::add_stream(reader, ctx); ChatClient(writer) }); diff --git a/src/client/connector.rs b/src/client/connector.rs index 6ecf91f73..2a0466cf2 100644 --- a/src/client/connector.rs +++ b/src/client/connector.rs @@ -5,7 +5,7 @@ use std::collections::VecDeque; use std::time::Duration; use actix::{fut, Actor, ActorFuture, Arbiter, Context, - Handler, Response, ResponseType, Supervised}; + Handler, Message, ActorResponse, Supervised}; use actix::registry::ArbiterService; use actix::fut::WrapFuture; use actix::actors::{Connector, ConnectorError, Connect as ResolveConnect}; @@ -37,9 +37,8 @@ impl Connect { } } -impl ResponseType for Connect { - type Item = Connection; - type Error = ClientConnectorError; +impl Message for Connect { + type Result = Result; } /// A set of errors that can occur during connecting to a http host @@ -163,34 +162,34 @@ impl ClientConnector { } impl Handler for ClientConnector { - type Result = Response; + type Result = ActorResponse; fn handle(&mut self, msg: Connect, _: &mut Self::Context) -> Self::Result { let uri = &msg.0; // host name is required if uri.host().is_none() { - return Response::reply(Err(ClientConnectorError::InvalidUrl)) + return ActorResponse::reply(Err(ClientConnectorError::InvalidUrl)) } // supported protocols let proto = match uri.scheme_part() { Some(scheme) => match Protocol::from(scheme.as_str()) { Some(proto) => proto, - None => return Response::reply(Err(ClientConnectorError::InvalidUrl)), + None => return ActorResponse::reply(Err(ClientConnectorError::InvalidUrl)), }, - None => return Response::reply(Err(ClientConnectorError::InvalidUrl)), + None => return ActorResponse::reply(Err(ClientConnectorError::InvalidUrl)), }; // check ssl availability if proto.is_secure() && !HAS_OPENSSL { //&& !HAS_TLS { - return Response::reply(Err(ClientConnectorError::SslIsNotSupported)) + return ActorResponse::reply(Err(ClientConnectorError::SslIsNotSupported)) } let host = uri.host().unwrap().to_owned(); let port = uri.port().unwrap_or_else(|| proto.port()); - Response::async_reply( + ActorResponse::async( Connector::from_registry() .call(self, ResolveConnect::host_and_port(&host, port)) .map_err(|_, _, _| ClientConnectorError::Disconnected) diff --git a/src/context.rs b/src/context.rs index 28cf5d7d9..7603ea059 100644 --- a/src/context.rs +++ b/src/context.rs @@ -1,4 +1,4 @@ -use std; +use std::mem; use std::marker::PhantomData; use futures::{Async, Future, Poll}; use futures::sync::oneshot::Sender; @@ -6,7 +6,7 @@ use futures::unsync::oneshot; use smallvec::SmallVec; use actix::{Actor, ActorState, ActorContext, AsyncContext, - Addr, Handler, ResponseType, MessageResult, SpawnHandle, Syn, Unsync}; + Addr, Handler, Message, SpawnHandle, Syn, Unsync}; use actix::fut::ActorFuture; use actix::dev::{ContextImpl, ToEnvelope, RemoteEnvelope}; @@ -184,7 +184,7 @@ impl ActorHttpContext for HttpContext where A: Actor, fn poll(&mut self) -> Poll>, Error> { let ctx: &mut HttpContext = unsafe { - std::mem::transmute(self as &mut HttpContext) + mem::transmute(self as &mut HttpContext) }; if self.inner.alive() { @@ -207,9 +207,9 @@ impl ActorHttpContext for HttpContext where A: Actor, impl ToEnvelope, M> for HttpContext where A: Actor> + Handler, - M: ResponseType + Send + 'static, M::Item: Send, M::Error: Send, + M: Message + Send + 'static, M::Result: Send, { - fn pack(msg: M, tx: Option>>) -> Syn { + fn pack(msg: M, tx: Option>) -> Syn { Syn::new(Box::new(RemoteEnvelope::envelope(msg, tx))) } } diff --git a/src/server/mod.rs b/src/server/mod.rs index 39df2fc8d..1a7b846b6 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -2,6 +2,7 @@ use std::{time, io}; use std::net::Shutdown; +use actix; use futures::Poll; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_core::net::TcpStream; @@ -43,11 +44,14 @@ pub struct ResumeServer; /// Stop incoming connection processing, stop all workers and exit. /// /// If server starts with `spawn()` method, then spawned thread get terminated. -#[derive(Message)] pub struct StopServer { pub graceful: bool } +impl actix::Message for StopServer { + type Result = Result<(), ()>; +} + /// Low level http request handler #[allow(unused_variables)] pub trait HttpHandler: 'static { diff --git a/src/server/srv.rs b/src/server/srv.rs index 3cd6da122..4217ad5a1 100644 --- a/src/server/srv.rs +++ b/src/server/srv.rs @@ -352,7 +352,7 @@ impl HttpServer let signals = self.subscribe_to_signals(); let addr: SyncAddress<_> = Actor::start(self); signals.map(|signals| signals.send( - signal::Subscribe(addr.clone().into()))); + signal::Subscribe(addr.clone().subscriber()))); Ok(addr) } } @@ -396,7 +396,7 @@ impl HttpServer let signals = self.subscribe_to_signals(); let addr: SyncAddress<_> = Actor::start(self); signals.map(|signals| signals.send( - signal::Subscribe(addr.clone().into()))); + signal::Subscribe(addr.clone().subscriber()))); Ok(addr) } } @@ -477,24 +477,6 @@ impl Handler for HttpServer } } -impl Handler>> for HttpServer - where T: IoStream, - H: IntoHttpHandler, -{ - type Result = (); - - fn handle(&mut self, msg: io::Result>, _: &mut Context) -> Self::Result { - match msg { - Ok(msg) => - Arbiter::handle().spawn( - HttpChannel::new( - Rc::clone(self.h.as_ref().unwrap()), msg.io, msg.peer, msg.http2)), - Err(err) => - debug!("Error handling request: {}", err), - } - } -} - impl Handler> for HttpServer where T: IoStream, H: IntoHttpHandler, @@ -535,7 +517,7 @@ impl Handler for HttpServer impl Handler for HttpServer { - type Result = actix::Response; + type Result = actix::Response<(), ()>; fn handle(&mut self, msg: StopServer, ctx: &mut Context) -> Self::Result { // stop accept threads @@ -570,8 +552,8 @@ impl Handler for HttpServer } if !self.workers.is_empty() { - Response::async_reply( - rx.into_future().map(|_| ()).map_err(|_| ()).actfuture()) + Response::async( + rx.into_future().map(|_| ()).map_err(|_| ())) } else { // we need to stop system if server was spawned if self.exit { diff --git a/src/server/worker.rs b/src/server/worker.rs index 6ee48e2d0..34c5324ff 100644 --- a/src/server/worker.rs +++ b/src/server/worker.rs @@ -37,12 +37,14 @@ pub(crate) struct Conn { /// Stop worker message. Returns `true` on successful shutdown /// and `false` if some connections still alive. -#[derive(Message)] -#[rtype(bool)] pub(crate) struct StopWorker { pub graceful: Option, } +impl Message for StopWorker { + type Result = Result; +} + /// Http worker /// /// Worker accepts Socket objects via unbounded channel and start requests processing. @@ -117,7 +119,7 @@ impl Handler> for Worker impl Handler for Worker where H: HttpHandler + 'static, { - type Result = Response; + type Result = Response; fn handle(&mut self, msg: StopWorker, ctx: &mut Context) -> Self::Result { let num = self.settings.num_channels(); @@ -128,7 +130,7 @@ impl Handler for Worker info!("Graceful http worker shutdown, {} connections", num); let (tx, rx) = oneshot::channel(); self.shutdown_timeout(ctx, tx, dur); - Response::async_reply(rx.map_err(|_| ()).actfuture()) + Response::async(rx.map_err(|_| ())) } else { info!("Force shutdown http worker, {} connections", num); self.settings.head().traverse::(); diff --git a/src/ws/context.rs b/src/ws/context.rs index 1eb78c7e5..6792e51cd 100644 --- a/src/ws/context.rs +++ b/src/ws/context.rs @@ -5,7 +5,7 @@ use futures::unsync::oneshot; use smallvec::SmallVec; use actix::{Actor, ActorState, ActorContext, AsyncContext, - Addr, Handler, ResponseType, SpawnHandle, MessageResult, Syn, Unsync}; + Addr, Handler, Message, Syn, Unsync, SpawnHandle}; use actix::fut::ActorFuture; use actix::dev::{ContextImpl, ToEnvelope, RemoteEnvelope}; @@ -219,9 +219,9 @@ impl ActorHttpContext for WebsocketContext where A: Actor ToEnvelope, M> for WebsocketContext where A: Actor> + Handler, - M: ResponseType + Send + 'static, M::Item: Send, M::Error: Send, + M: Message + Send + 'static, M::Result: Send { - fn pack(msg: M, tx: Option>>) -> Syn { + fn pack(msg: M, tx: Option>) -> Syn { Syn::new(Box::new(RemoteEnvelope::envelope(msg, tx))) } } diff --git a/src/ws/mod.rs b/src/ws/mod.rs index 07b845cc6..d9bf0f103 100644 --- a/src/ws/mod.rs +++ b/src/ws/mod.rs @@ -47,7 +47,7 @@ use bytes::BytesMut; use http::{Method, StatusCode, header}; use futures::{Async, Poll, Stream}; -use actix::{Actor, AsyncContext, ResponseType, Handler}; +use actix::{Actor, AsyncContext, Handler}; use body::Binary; use payload::ReadAny; @@ -74,7 +74,7 @@ const SEC_WEBSOCKET_VERSION: &str = "SEC-WEBSOCKET-VERSION"; /// `WebSocket` Message -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Message)] pub enum Message { Text(String), Binary(Binary), @@ -85,11 +85,6 @@ pub enum Message { Error } -impl ResponseType for Message { - type Item = (); - type Error = (); -} - /// Do websocket handshake and start actor pub fn start(mut req: HttpRequest, actor: A) -> Result where A: Actor> + Handler, From 720d8c36c1e664e741e087baa651fc263cf8dd75 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 12 Feb 2018 12:45:08 -0800 Subject: [PATCH 02/12] update names --- src/context.rs | 4 ++-- src/ws/context.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/context.rs b/src/context.rs index 7603ea059..a8ccfd4e4 100644 --- a/src/context.rs +++ b/src/context.rs @@ -8,7 +8,7 @@ use smallvec::SmallVec; use actix::{Actor, ActorState, ActorContext, AsyncContext, Addr, Handler, Message, SpawnHandle, Syn, Unsync}; use actix::fut::ActorFuture; -use actix::dev::{ContextImpl, ToEnvelope, RemoteEnvelope}; +use actix::dev::{ContextImpl, ToEnvelope, SyncEnvelope}; use body::{Body, Binary}; use error::{Error, ErrorInternalServerError}; @@ -210,7 +210,7 @@ impl ToEnvelope, M> for HttpContext M: Message + Send + 'static, M::Result: Send, { fn pack(msg: M, tx: Option>) -> Syn { - Syn::new(Box::new(RemoteEnvelope::envelope(msg, tx))) + Syn::new(Box::new(SyncEnvelope::envelope(msg, tx))) } } diff --git a/src/ws/context.rs b/src/ws/context.rs index 6792e51cd..6f84ea483 100644 --- a/src/ws/context.rs +++ b/src/ws/context.rs @@ -7,7 +7,7 @@ use smallvec::SmallVec; use actix::{Actor, ActorState, ActorContext, AsyncContext, Addr, Handler, Message, Syn, Unsync, SpawnHandle}; use actix::fut::ActorFuture; -use actix::dev::{ContextImpl, ToEnvelope, RemoteEnvelope}; +use actix::dev::{ContextImpl, ToEnvelope, SyncEnvelope}; use body::{Body, Binary}; use error::{Error, ErrorInternalServerError}; @@ -222,7 +222,7 @@ impl ToEnvelope, M> for WebsocketContext M: Message + Send + 'static, M::Result: Send { fn pack(msg: M, tx: Option>) -> Syn { - Syn::new(Box::new(RemoteEnvelope::envelope(msg, tx))) + Syn::new(Box::new(SyncEnvelope::envelope(msg, tx))) } } From 335ca8ff3388c3b83e68c71ef9b02fbcdb001caa Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 12 Feb 2018 16:08:04 -0800 Subject: [PATCH 03/12] use new actix api --- guide/src/qs_3_5.md | 2 +- src/client/connector.rs | 3 ++- src/context.rs | 10 +++++----- src/pipeline.rs | 2 +- src/server/srv.rs | 18 +++++++++--------- src/test.rs | 2 +- src/ws/client.rs | 6 +++--- src/ws/context.rs | 10 +++++----- tests/test_server.rs | 4 ++-- 9 files changed, 29 insertions(+), 28 deletions(-) diff --git a/guide/src/qs_3_5.md b/guide/src/qs_3_5.md index 62f21b61b..ef35973d4 100644 --- a/guide/src/qs_3_5.md +++ b/guide/src/qs_3_5.md @@ -66,7 +66,7 @@ fn main() { }); let addr = rx.recv().unwrap(); - let _ = addr.call_fut( + let _ = addr.call( server::StopServer{graceful:true}).wait(); // <- Send `StopServer` message to server. } ``` diff --git a/src/client/connector.rs b/src/client/connector.rs index 2a0466cf2..8aa9322a4 100644 --- a/src/client/connector.rs +++ b/src/client/connector.rs @@ -191,7 +191,8 @@ impl Handler for ClientConnector { ActorResponse::async( Connector::from_registry() - .call(self, ResolveConnect::host_and_port(&host, port)) + .call(ResolveConnect::host_and_port(&host, port)) + .into_actor(self) .map_err(|_, _, _| ClientConnectorError::Disconnected) .and_then(move |res, _act, _| { #[cfg(feature="alpn")] diff --git a/src/context.rs b/src/context.rs index a8ccfd4e4..a3e168f6d 100644 --- a/src/context.rs +++ b/src/context.rs @@ -83,12 +83,12 @@ impl AsyncContext for HttpContext where A: Actor } #[doc(hidden)] #[inline] - fn unsync_address(&mut self) -> Addr> { + fn unsync_address(&mut self) -> Addr { self.inner.unsync_address() } #[doc(hidden)] #[inline] - fn sync_address(&mut self) -> Addr> { + fn sync_address(&mut self) -> Addr { self.inner.sync_address() } } @@ -205,12 +205,12 @@ impl ActorHttpContext for HttpContext where A: Actor, } } -impl ToEnvelope, M> for HttpContext +impl ToEnvelope for HttpContext where A: Actor> + Handler, M: Message + Send + 'static, M::Result: Send, { - fn pack(msg: M, tx: Option>) -> Syn { - Syn::new(Box::new(SyncEnvelope::envelope(msg, tx))) + fn pack(msg: M, tx: Option>) -> SyncEnvelope { + SyncEnvelope::new(msg, tx) } } diff --git a/src/pipeline.rs b/src/pipeline.rs index babd92199..18f9f261c 100644 --- a/src/pipeline.rs +++ b/src/pipeline.rs @@ -739,7 +739,7 @@ mod tests { let req = HttpRequest::default(); let mut ctx = HttpContext::new(req.clone(), MyActor); - let addr: Addr> = ctx.address(); + let addr: Addr = ctx.address(); let mut info = PipelineInfo::new(req); info.context = Some(Box::new(ctx)); let mut state = Completed::<(), Inner<()>>::init(&mut info).completed().unwrap(); diff --git a/src/server/srv.rs b/src/server/srv.rs index 4217ad5a1..a41e17c7f 100644 --- a/src/server/srv.rs +++ b/src/server/srv.rs @@ -36,12 +36,12 @@ pub struct HttpServer where H: IntoHttpHandler + 'static host: Option, keep_alive: Option, factory: Arc Vec + Send + Sync>, - workers: Vec>>>, + workers: Vec>>, sockets: HashMap, accept: Vec<(mio::SetReadiness, sync_mpsc::Sender)>, exit: bool, shutdown_timeout: u16, - signals: Option>>, + signals: Option>, no_signals: bool, } @@ -146,7 +146,7 @@ impl HttpServer where H: IntoHttpHandler + 'static } /// Set alternative address for `ProcessSignals` actor. - pub fn signals(mut self, addr: Addr>) -> Self { + pub fn signals(mut self, addr: Addr) -> Self { self.signals = Some(addr); self } @@ -227,7 +227,7 @@ impl HttpServer where H: IntoHttpHandler + 'static } // subscribe to os signals - fn subscribe_to_signals(&self) -> Option>> { + fn subscribe_to_signals(&self) -> Option> { if !self.no_signals { if let Some(ref signals) = self.signals { Some(signals.clone()) @@ -269,7 +269,7 @@ impl HttpServer /// let _ = sys.run(); // <- Run actix system, this method actually starts all async processes /// } /// ``` - pub fn start(mut self) -> Addr> + pub fn start(mut self) -> Addr { if self.sockets.is_empty() { panic!("HttpServer::bind() has to be called before start()"); @@ -288,7 +288,7 @@ impl HttpServer // start http server actor let signals = self.subscribe_to_signals(); - let addr: Addr> = Actor::start(self); + let addr: Addr = Actor::start(self); signals.map(|signals| signals.send( signal::Subscribe(addr.clone().subscriber()))); addr @@ -407,7 +407,7 @@ impl HttpServer /// Start listening for incoming connections from a stream. /// /// This method uses only one thread for handling incoming connections. - pub fn start_incoming(mut self, stream: S, secure: bool) -> Addr> + pub fn start_incoming(mut self, stream: S, secure: bool) -> Addr where S: Stream + 'static, T: AsyncRead + AsyncWrite + 'static, A: 'static @@ -435,7 +435,7 @@ impl HttpServer // start server let signals = self.subscribe_to_signals(); - let addr: Addr> = HttpServer::create(move |ctx| { + let addr: Addr = HttpServer::create(move |ctx| { ctx.add_message_stream( stream .map_err(|_| ()) @@ -536,7 +536,7 @@ impl Handler for HttpServer }; for worker in &self.workers { let tx2 = tx.clone(); - let fut = worker.call(self, StopWorker{graceful: dur}); + let fut = worker.call(StopWorker{graceful: dur}).into_actor(self); ActorFuture::then(fut, move |_, slf, _| { slf.workers.pop(); if slf.workers.is_empty() { diff --git a/src/test.rs b/src/test.rs index c98929f0a..48c1d9508 100644 --- a/src/test.rs +++ b/src/test.rs @@ -56,7 +56,7 @@ pub struct TestServer { addr: net::SocketAddr, thread: Option>, system: SystemRunner, - server_sys: Addr>, + server_sys: Addr, } impl TestServer { diff --git a/src/ws/client.rs b/src/ws/client.rs index 7800ab02f..4201c20e2 100644 --- a/src/ws/client.rs +++ b/src/ws/client.rs @@ -103,7 +103,7 @@ pub struct WsClient { http_err: Option, origin: Option, protocols: Option, - conn: Addr>, + conn: Addr, } impl WsClient { @@ -114,7 +114,7 @@ impl WsClient { } /// Create new websocket connection with custom `ClientConnector` - pub fn with_connector>(uri: S, conn: Addr>) -> WsClient { + pub fn with_connector>(uri: S, conn: Addr) -> WsClient { let mut cl = WsClient { request: ClientRequest::build(), err: None, @@ -200,7 +200,7 @@ impl WsClient { // get connection and start handshake Ok(Box::new( - self.conn.call_fut(Connect(request.uri().clone())) + self.conn.call(Connect(request.uri().clone())) .map_err(|_| WsClientError::Disconnected) .and_then(|res| match res { Ok(stream) => Either::A(WsHandshake::new(stream, request)), diff --git a/src/ws/context.rs b/src/ws/context.rs index 6f84ea483..b9214b749 100644 --- a/src/ws/context.rs +++ b/src/ws/context.rs @@ -67,13 +67,13 @@ impl AsyncContext for WebsocketContext where A: Actor Addr> { + fn unsync_address(&mut self) -> Addr { self.inner.unsync_address() } #[doc(hidden)] #[inline] - fn sync_address(&mut self) -> Addr> { + fn sync_address(&mut self) -> Addr { self.inner.sync_address() } } @@ -217,12 +217,12 @@ impl ActorHttpContext for WebsocketContext where A: Actor ToEnvelope, M> for WebsocketContext +impl ToEnvelope for WebsocketContext where A: Actor> + Handler, M: Message + Send + 'static, M::Result: Send { - fn pack(msg: M, tx: Option>) -> Syn { - Syn::new(Box::new(SyncEnvelope::envelope(msg, tx))) + fn pack(msg: M, tx: Option>) -> SyncEnvelope { + SyncEnvelope::new(msg, tx) } } diff --git a/tests/test_server.rs b/tests/test_server.rs index 3a8321c83..6c784ca9c 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -72,12 +72,12 @@ fn test_start() { assert!(reqwest::get(&format!("http://{}/", addr)).unwrap().status().is_success()); // pause - let _ = srv_addr.call_fut(server::PauseServer).wait(); + let _ = srv_addr.call(server::PauseServer).wait(); thread::sleep(time::Duration::from_millis(100)); assert!(net::TcpStream::connect(addr).is_err()); // resume - let _ = srv_addr.call_fut(server::ResumeServer).wait(); + let _ = srv_addr.call(server::ResumeServer).wait(); assert!(reqwest::get(&format!("http://{}/", addr)).unwrap().status().is_success()); } From 57655d8153c3976ce14985f67417e2dc1a50095e Mon Sep 17 00:00:00 2001 From: Robert Collins Date: Tue, 13 Feb 2018 13:47:59 +1300 Subject: [PATCH 04/12] Use AtomicUsize properly doing a read+write on an atomic int will lose updates from other threads. --- guide/src/qs_4.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/guide/src/qs_4.md b/guide/src/qs_4.md index e7193ae55..a8424a4a0 100644 --- a/guide/src/qs_4.md +++ b/guide/src/qs_4.md @@ -89,8 +89,7 @@ impl Handler for MyHandler { /// Handle request fn handle(&mut self, req: HttpRequest) -> Self::Result { - let num = self.0.load(Ordering::Relaxed) + 1; - self.0.store(num, Ordering::Relaxed); + self.0.fetch_add(1, Ordering::Relaxed); httpcodes::HTTPOk.into() } } From 7ccacb92ce80aaf5716141d1e81a8f531a0c72fa Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 12 Feb 2018 17:42:10 -0800 Subject: [PATCH 05/12] update websocket-chat example --- examples/websocket-chat/src/client.rs | 2 +- examples/websocket-chat/src/main.rs | 36 ++++++++++++++------------ examples/websocket-chat/src/server.rs | 4 +-- examples/websocket-chat/src/session.rs | 33 ++++++++++++----------- 4 files changed, 40 insertions(+), 35 deletions(-) diff --git a/examples/websocket-chat/src/client.rs b/examples/websocket-chat/src/client.rs index 4fe18d8e9..d3b556b6f 100644 --- a/examples/websocket-chat/src/client.rs +++ b/examples/websocket-chat/src/client.rs @@ -29,7 +29,7 @@ fn main() { Arbiter::handle().spawn( TcpStream::connect(&addr, Arbiter::handle()) .and_then(|stream| { - let addr: Addr> = ChatClient::create(|ctx| { + let addr: Addr = ChatClient::create(|ctx| { let (r, w) = stream.split(); ChatClient::add_stream(FramedRead::new(r, codec::ClientChatCodec), ctx); ChatClient{ diff --git a/examples/websocket-chat/src/main.rs b/examples/websocket-chat/src/main.rs index 27957f69d..88e8590ef 100644 --- a/examples/websocket-chat/src/main.rs +++ b/examples/websocket-chat/src/main.rs @@ -26,7 +26,7 @@ mod session; /// This is our websocket route state, this state is shared with all route instances /// via `HttpContext::state()` struct WsChatSessionState { - addr: Addr>, + addr: Addr, } /// Entry point for our route @@ -62,10 +62,10 @@ impl Actor for WsChatSession { // before processing any other events. // HttpContext::state() is instance of WsChatSessionState, state is shared across all // routes within application - let addr: Addr> = ctx.address(); - ctx.state().addr.call( - self, server::Connect{addr: addr.subscriber()}).then( - |res, act, ctx| { + let addr: Addr = ctx.address(); + ctx.state().addr.call(server::Connect{addr: addr.subscriber()}) + .into_actor(self) + .then(|res, act, ctx| { match res { Ok(res) => act.id = res, // something is wrong with chat server @@ -109,17 +109,19 @@ impl Handler for WsChatSession { "/list" => { // Send ListRooms message to chat server and wait for response println!("List rooms"); - ctx.state().addr.call(self, server::ListRooms).then(|res, _, ctx| { - match res { - Ok(rooms) => { - for room in rooms { - ctx.text(room); - } - }, - _ => println!("Something is wrong"), - } - fut::ok(()) - }).wait(ctx) + ctx.state().addr.call(server::ListRooms) + .into_actor(self) + .then(|res, _, ctx| { + match res { + Ok(rooms) => { + for room in rooms { + ctx.text(room); + } + }, + _ => println!("Something is wrong"), + } + fut::ok(()) + }).wait(ctx) // .wait(ctx) pauses all events in context, // so actor wont receive any new messages until it get list // of rooms back @@ -172,7 +174,7 @@ fn main() { let sys = actix::System::new("websocket-example"); // Start chat server actor in separate thread - let server: Addr> = Arbiter::start(|_| server::ChatServer::default()); + let server: Addr = Arbiter::start(|_| server::ChatServer::default()); // Start tcp server in separate thread let srv = server.clone(); diff --git a/examples/websocket-chat/src/server.rs b/examples/websocket-chat/src/server.rs index b38fb4601..6cae978f6 100644 --- a/examples/websocket-chat/src/server.rs +++ b/examples/websocket-chat/src/server.rs @@ -15,7 +15,7 @@ use session; #[derive(Message)] #[rtype(usize)] pub struct Connect { - pub addr: SyncSubscriber, + pub addr: Subscriber, } /// Session is disconnected @@ -54,7 +54,7 @@ pub struct Join { /// `ChatServer` manages chat rooms and responsible for coordinating chat session. /// implementation is super primitive pub struct ChatServer { - sessions: HashMap>, + sessions: HashMap>, rooms: HashMap>, rng: RefCell, } diff --git a/examples/websocket-chat/src/session.rs b/examples/websocket-chat/src/session.rs index 50c2701ef..a5642db9f 100644 --- a/examples/websocket-chat/src/session.rs +++ b/examples/websocket-chat/src/session.rs @@ -24,7 +24,7 @@ pub struct ChatSession { /// unique session id id: usize, /// this is address of chat server - addr: Addr>, + addr: Addr, /// Client must send ping at least once per 10 seconds, otherwise we drop connection. hb: Instant, /// joined room @@ -45,8 +45,9 @@ impl Actor for ChatSession { // register self in chat server. `AsyncContext::wait` register // future within context, but context waits until this future resolves // before processing any other events. - let addr: Addr> = ctx.address(); - self.addr.call(self, server::Connect{addr: addr.subscriber()}) + let addr: Addr = ctx.address(); + self.addr.call(server::Connect{addr: addr.subscriber()}) + .into_actor(self) .then(|res, act, ctx| { match res { Ok(res) => act.id = res, @@ -75,15 +76,17 @@ impl StreamHandler for ChatSession { ChatRequest::List => { // Send ListRooms message to chat server and wait for response println!("List rooms"); - self.addr.call(self, server::ListRooms).then(|res, act, ctx| { - match res { - Ok(rooms) => { - act.framed.write(ChatResponse::Rooms(rooms)); - }, - _ => println!("Something is wrong"), - } - actix::fut::ok(()) - }).wait(ctx) + self.addr.call(server::ListRooms) + .into_actor(self) + .then(|res, act, ctx| { + match res { + Ok(rooms) => { + act.framed.write(ChatResponse::Rooms(rooms)); + }, + _ => println!("Something is wrong"), + } + actix::fut::ok(()) + }).wait(ctx) // .wait(ctx) pauses all events in context, // so actor wont receive any new messages until it get list of rooms back }, @@ -121,7 +124,7 @@ impl Handler for ChatSession { /// Helper methods impl ChatSession { - pub fn new(addr: Addr>, + pub fn new(addr: Addr, framed: actix::io::FramedWrite, ChatCodec>) -> ChatSession { ChatSession {id: 0, addr: addr, hb: Instant::now(), room: "Main".to_owned(), framed: framed} @@ -155,11 +158,11 @@ impl ChatSession { /// Define tcp server that will accept incoming tcp connection and create /// chat actors. pub struct TcpServer { - chat: Addr>, + chat: Addr, } impl TcpServer { - pub fn new(s: &str, chat: Addr>) { + pub fn new(s: &str, chat: Addr) { // Create server listener let addr = net::SocketAddr::from_str("127.0.0.1:12345").unwrap(); let listener = TcpListener::bind(&addr, Arbiter::handle()).unwrap(); From 80285f2a322a126a23c4cd14449d3da9249e275b Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 12 Feb 2018 18:38:13 -0800 Subject: [PATCH 06/12] fix doc test --- src/client/connector.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client/connector.rs b/src/client/connector.rs index 8aa9322a4..af708bbbe 100644 --- a/src/client/connector.rs +++ b/src/client/connector.rs @@ -139,7 +139,7 @@ impl ClientConnector { /// let conn: Address<_> = ClientConnector::with_connector(ssl_conn).start(); /// /// Arbiter::handle().spawn({ - /// conn.call_fut( + /// conn.call( /// Connect::new("https://www.rust-lang.org").unwrap()) // <- connect to host /// .map_err(|_| ()) /// .and_then(|res| { From eb041de36d99935903cf8cc70790a352a9876bd2 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 12 Feb 2018 19:15:39 -0800 Subject: [PATCH 07/12] update examples --- .travis.yml | 1 + examples/diesel/src/main.rs | 4 ++-- examples/state/src/main.rs | 2 +- examples/websocket/src/client.rs | 2 +- src/server/srv.rs | 8 ++++---- 5 files changed, 9 insertions(+), 8 deletions(-) diff --git a/.travis.yml b/.travis.yml index 6b4c8fd7b..e7ed24526 100644 --- a/.travis.yml +++ b/.travis.yml @@ -57,6 +57,7 @@ script: cd examples/hello-world && cargo check && cd ../.. cd examples/multipart && cargo check && cd ../.. cd examples/json && cargo check && cd ../.. + cd examples/state && cargo check && cd ../.. cd examples/template_tera && cargo check && cd ../.. cd examples/diesel && cargo check && cd ../.. cd examples/tls && cargo check && cd ../.. diff --git a/examples/diesel/src/main.rs b/examples/diesel/src/main.rs index 881cbe1c5..4f61fa0b4 100644 --- a/examples/diesel/src/main.rs +++ b/examples/diesel/src/main.rs @@ -31,14 +31,14 @@ use db::{CreateUser, DbExecutor}; /// State with DbExecutor address struct State { - db: Addr>, + db: Addr, } /// Async request handler fn index(req: HttpRequest) -> Box> { let name = &req.match_info()["name"]; - req.state().db.call_fut(CreateUser{name: name.to_owned()}) + req.state().db.call(CreateUser{name: name.to_owned()}) .from_err() .and_then(|res| { match res { diff --git a/examples/state/src/main.rs b/examples/state/src/main.rs index 5bb7e5043..21eb50483 100644 --- a/examples/state/src/main.rs +++ b/examples/state/src/main.rs @@ -44,7 +44,7 @@ impl Handler for MyWebSocket { println!("WS({}): {:?}", self.counter, msg); match msg { ws::Message::Ping(msg) => ctx.pong(&msg), - ws::Message::Text(text) => ctx.text(&text), + ws::Message::Text(text) => ctx.text(text), ws::Message::Binary(bin) => ctx.binary(bin), ws::Message::Closed | ws::Message::Error => { ctx.stop(); diff --git a/examples/websocket/src/client.rs b/examples/websocket/src/client.rs index 3eba7277e..38a7bd5eb 100644 --- a/examples/websocket/src/client.rs +++ b/examples/websocket/src/client.rs @@ -28,7 +28,7 @@ fn main() { () }) .map(|(reader, writer)| { - let addr: Addr> = ChatClient::create(|ctx| { + let addr: Addr = ChatClient::create(|ctx| { ChatClient::add_stream(reader, ctx); ChatClient(writer) }); diff --git a/src/server/srv.rs b/src/server/srv.rs index a41e17c7f..69b46bc7f 100644 --- a/src/server/srv.rs +++ b/src/server/srv.rs @@ -333,7 +333,7 @@ impl HttpServer impl HttpServer { /// Start listening for incoming tls connections. - pub fn start_tls(mut self, acceptor: TlsAcceptor) -> io::Result> { + pub fn start_tls(mut self, acceptor: TlsAcceptor) -> io::Result> { if self.sockets.is_empty() { Err(io::Error::new(io::ErrorKind::Other, "No socket addresses are bound")) } else { @@ -350,7 +350,7 @@ impl HttpServer // start http server actor let signals = self.subscribe_to_signals(); - let addr: SyncAddress<_> = Actor::start(self); + let addr: Addr = Actor::start(self); signals.map(|signals| signals.send( signal::Subscribe(addr.clone().subscriber()))); Ok(addr) @@ -364,7 +364,7 @@ impl HttpServer /// Start listening for incoming tls connections. /// /// This method sets alpn protocols to "h2" and "http/1.1" - pub fn start_ssl(mut self, mut builder: SslAcceptorBuilder) -> io::Result> + pub fn start_ssl(mut self, mut builder: SslAcceptorBuilder) -> io::Result> { if self.sockets.is_empty() { Err(io::Error::new(io::ErrorKind::Other, "No socket addresses are bound")) @@ -394,7 +394,7 @@ impl HttpServer // start http server actor let signals = self.subscribe_to_signals(); - let addr: SyncAddress<_> = Actor::start(self); + let addr: Addr = Actor::start(self); signals.map(|signals| signals.send( signal::Subscribe(addr.clone().subscriber()))); Ok(addr) From 4b8181476c8f6ccedf85f20e9085ae607197e7d8 Mon Sep 17 00:00:00 2001 From: Christopher Armstrong Date: Mon, 12 Feb 2018 23:55:44 -0600 Subject: [PATCH 08/12] consistently use `#[cause]` and display causing errors (#73) --- src/error.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/error.rs b/src/error.rs index d166636a7..da6745fd5 100644 --- a/src/error.rs +++ b/src/error.rs @@ -403,8 +403,8 @@ pub enum UrlencodedError { #[fail(display="Content type error")] ContentType, /// Payload error - #[fail(display="Error that occur during reading payload")] - Payload(PayloadError), + #[fail(display="Error that occur during reading payload: {}", _0)] + Payload(#[cause] PayloadError), } /// Return `BadRequest` for `UrlencodedError` @@ -435,11 +435,11 @@ pub enum JsonPayloadError { #[fail(display="Content type error")] ContentType, /// Deserialize error - #[fail(display="Json deserialize error")] - Deserialize(JsonError), + #[fail(display="Json deserialize error: {}", _0)] + Deserialize(#[cause] JsonError), /// Payload error - #[fail(display="Error that occur during reading payload")] - Payload(PayloadError), + #[fail(display="Error that occur during reading payload: {}", _0)] + Payload(#[cause] PayloadError), } /// Return `BadRequest` for `UrlencodedError` From a544034c06cde998f621044199531492f92c4927 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 12 Feb 2018 22:09:31 -0800 Subject: [PATCH 09/12] use Recipient --- src/server/srv.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/server/srv.rs b/src/server/srv.rs index 69b46bc7f..2df542883 100644 --- a/src/server/srv.rs +++ b/src/server/srv.rs @@ -290,7 +290,7 @@ impl HttpServer let signals = self.subscribe_to_signals(); let addr: Addr = Actor::start(self); signals.map(|signals| signals.send( - signal::Subscribe(addr.clone().subscriber()))); + signal::Subscribe(addr.clone().recipient()))); addr } } @@ -352,7 +352,7 @@ impl HttpServer let signals = self.subscribe_to_signals(); let addr: Addr = Actor::start(self); signals.map(|signals| signals.send( - signal::Subscribe(addr.clone().subscriber()))); + signal::Subscribe(addr.clone().recipient()))); Ok(addr) } } @@ -396,7 +396,7 @@ impl HttpServer let signals = self.subscribe_to_signals(); let addr: Addr = Actor::start(self); signals.map(|signals| signals.send( - signal::Subscribe(addr.clone().subscriber()))); + signal::Subscribe(addr.clone().recipient()))); Ok(addr) } } @@ -443,7 +443,7 @@ impl HttpServer self }); signals.map(|signals| signals.send( - signal::Subscribe(addr.clone().subscriber()))); + signal::Subscribe(addr.clone().recipient()))); addr } } From b1eec3131fda1c05a08f2b6e39f14f5cc3d2960b Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 12 Feb 2018 22:56:47 -0800 Subject: [PATCH 10/12] use newer api --- guide/src/qs_10.md | 2 +- guide/src/qs_3_5.md | 6 +++--- guide/src/qs_4.md | 4 ++-- src/client/connector.rs | 4 ++-- src/server/srv.rs | 16 ++++++++-------- src/server/worker.rs | 4 ++-- src/test.rs | 2 +- src/ws/client.rs | 2 +- tests/test_server.rs | 4 ++-- 9 files changed, 22 insertions(+), 22 deletions(-) diff --git a/guide/src/qs_10.md b/guide/src/qs_10.md index 1334ecdbe..b3c5a8e06 100644 --- a/guide/src/qs_10.md +++ b/guide/src/qs_10.md @@ -200,7 +200,7 @@ fn main() { ))) .bind("127.0.0.1:59880").unwrap() .start(); -# actix::Arbiter::system().send(actix::msgs::SystemExit(0)); +# actix::Arbiter::system().do_send(actix::msgs::SystemExit(0)); # let _ = sys.run(); } ``` diff --git a/guide/src/qs_3_5.md b/guide/src/qs_3_5.md index ef35973d4..99c2bcd9a 100644 --- a/guide/src/qs_3_5.md +++ b/guide/src/qs_3_5.md @@ -24,7 +24,7 @@ fn main() { .bind("127.0.0.1:59080").unwrap() .start(); -# actix::Arbiter::system().send(actix::msgs::SystemExit(0)); +# actix::Arbiter::system().do_send(actix::msgs::SystemExit(0)); let _ = sys.run(); } ``` @@ -52,7 +52,7 @@ use std::sync::mpsc; fn main() { let (tx, rx) = mpsc::channel(); - + thread::spawn(move || { let sys = actix::System::new("http-server"); let addr = HttpServer::new( @@ -66,7 +66,7 @@ fn main() { }); let addr = rx.recv().unwrap(); - let _ = addr.call( + let _ = addr.send( server::StopServer{graceful:true}).wait(); // <- Send `StopServer` message to server. } ``` diff --git a/guide/src/qs_4.md b/guide/src/qs_4.md index a8424a4a0..c7cbc6c94 100644 --- a/guide/src/qs_4.md +++ b/guide/src/qs_4.md @@ -109,7 +109,7 @@ fn main() { .start(); println!("Started http server: 127.0.0.1:8088"); -# actix::Arbiter::system().send(actix::msgs::SystemExit(0)); +# actix::Arbiter::system().do_send(actix::msgs::SystemExit(0)); let _ = sys.run(); } ``` @@ -167,7 +167,7 @@ fn main() { .start(); println!("Started http server: 127.0.0.1:8088"); -# actix::Arbiter::system().send(actix::msgs::SystemExit(0)); +# actix::Arbiter::system().do_send(actix::msgs::SystemExit(0)); let _ = sys.run(); } ``` diff --git a/src/client/connector.rs b/src/client/connector.rs index af708bbbe..2d49a06d6 100644 --- a/src/client/connector.rs +++ b/src/client/connector.rs @@ -146,7 +146,7 @@ impl ClientConnector { /// if let Ok(mut stream) = res { /// stream.write_all(b"GET / HTTP/1.0\r\n\r\n").unwrap(); /// } - /// # Arbiter::system().send(actix::msgs::SystemExit(0)); + /// # Arbiter::system().do_send(actix::msgs::SystemExit(0)); /// Ok(()) /// }) /// }); @@ -191,7 +191,7 @@ impl Handler for ClientConnector { ActorResponse::async( Connector::from_registry() - .call(ResolveConnect::host_and_port(&host, port)) + .send(ResolveConnect::host_and_port(&host, port)) .into_actor(self) .map_err(|_, _, _| ClientConnectorError::Disconnected) .and_then(move |res, _act, _| { diff --git a/src/server/srv.rs b/src/server/srv.rs index 2df542883..3b828a1c5 100644 --- a/src/server/srv.rs +++ b/src/server/srv.rs @@ -264,7 +264,7 @@ impl HttpServer /// .resource("/", |r| r.h(httpcodes::HTTPOk))) /// .bind("127.0.0.1:0").expect("Can not bind to 127.0.0.1:0") /// .start(); - /// # actix::Arbiter::system().send(actix::msgs::SystemExit(0)); + /// # actix::Arbiter::system().do_send(actix::msgs::SystemExit(0)); /// /// let _ = sys.run(); // <- Run actix system, this method actually starts all async processes /// } @@ -289,7 +289,7 @@ impl HttpServer // start http server actor let signals = self.subscribe_to_signals(); let addr: Addr = Actor::start(self); - signals.map(|signals| signals.send( + signals.map(|signals| signals.do_send( signal::Subscribe(addr.clone().recipient()))); addr } @@ -351,7 +351,7 @@ impl HttpServer // start http server actor let signals = self.subscribe_to_signals(); let addr: Addr = Actor::start(self); - signals.map(|signals| signals.send( + signals.map(|signals| signals.do_send( signal::Subscribe(addr.clone().recipient()))); Ok(addr) } @@ -395,7 +395,7 @@ impl HttpServer // start http server actor let signals = self.subscribe_to_signals(); let addr: Addr = Actor::start(self); - signals.map(|signals| signals.send( + signals.map(|signals| signals.do_send( signal::Subscribe(addr.clone().recipient()))); Ok(addr) } @@ -442,7 +442,7 @@ impl HttpServer .map(move |(t, _)| Conn{io: WrapperStream::new(t), peer: None, http2: false})); self }); - signals.map(|signals| signals.send( + signals.map(|signals| signals.do_send( signal::Subscribe(addr.clone().recipient()))); addr } @@ -536,7 +536,7 @@ impl Handler for HttpServer }; for worker in &self.workers { let tx2 = tx.clone(); - let fut = worker.call(StopWorker{graceful: dur}).into_actor(self); + let fut = worker.send(StopWorker{graceful: dur}).into_actor(self); ActorFuture::then(fut, move |_, slf, _| { slf.workers.pop(); if slf.workers.is_empty() { @@ -544,7 +544,7 @@ impl Handler for HttpServer // we need to stop system if server was spawned if slf.exit { - Arbiter::system().send(actix::msgs::SystemExit(0)) + Arbiter::system().do_send(actix::msgs::SystemExit(0)) } } actix::fut::ok(()) @@ -557,7 +557,7 @@ impl Handler for HttpServer } else { // we need to stop system if server was spawned if self.exit { - Arbiter::system().send(actix::msgs::SystemExit(0)) + Arbiter::system().do_send(actix::msgs::SystemExit(0)) } Response::reply(Ok(())) } diff --git a/src/server/worker.rs b/src/server/worker.rs index 34c5324ff..23e8a6c61 100644 --- a/src/server/worker.rs +++ b/src/server/worker.rs @@ -78,14 +78,14 @@ impl Worker { let num = slf.settings.num_channels(); if num == 0 { let _ = tx.send(true); - Arbiter::arbiter().send(StopArbiter(0)); + Arbiter::arbiter().do_send(StopArbiter(0)); } else if let Some(d) = dur.checked_sub(time::Duration::new(1, 0)) { slf.shutdown_timeout(ctx, tx, d); } else { info!("Force shutdown http worker, {} connections", num); slf.settings.head().traverse::(); let _ = tx.send(false); - Arbiter::arbiter().send(StopArbiter(0)); + Arbiter::arbiter().do_send(StopArbiter(0)); } }); } diff --git a/src/test.rs b/src/test.rs index 48c1d9508..29f01ab31 100644 --- a/src/test.rs +++ b/src/test.rs @@ -165,7 +165,7 @@ impl TestServer { /// Stop http server fn stop(&mut self) { if let Some(handle) = self.thread.take() { - self.server_sys.send(msgs::SystemExit(0)); + self.server_sys.do_send(msgs::SystemExit(0)); let _ = handle.join(); } } diff --git a/src/ws/client.rs b/src/ws/client.rs index 4201c20e2..ebb2f3849 100644 --- a/src/ws/client.rs +++ b/src/ws/client.rs @@ -200,7 +200,7 @@ impl WsClient { // get connection and start handshake Ok(Box::new( - self.conn.call(Connect(request.uri().clone())) + self.conn.send(Connect(request.uri().clone())) .map_err(|_| WsClientError::Disconnected) .and_then(|res| match res { Ok(stream) => Either::A(WsHandshake::new(stream, request)), diff --git a/tests/test_server.rs b/tests/test_server.rs index 6c784ca9c..e84c9211b 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -72,12 +72,12 @@ fn test_start() { assert!(reqwest::get(&format!("http://{}/", addr)).unwrap().status().is_success()); // pause - let _ = srv_addr.call(server::PauseServer).wait(); + let _ = srv_addr.send(server::PauseServer).wait(); thread::sleep(time::Duration::from_millis(100)); assert!(net::TcpStream::connect(addr).is_err()); // resume - let _ = srv_addr.call(server::ResumeServer).wait(); + let _ = srv_addr.send(server::ResumeServer).wait(); assert!(reqwest::get(&format!("http://{}/", addr)).unwrap().status().is_success()); } From 96b87761d10dc238485590709bd563bea116b0a5 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 12 Feb 2018 23:13:06 -0800 Subject: [PATCH 11/12] update examples --- examples/diesel/src/main.rs | 2 +- examples/websocket-chat/src/client.rs | 4 ++-- examples/websocket-chat/src/main.rs | 12 ++++++------ examples/websocket-chat/src/server.rs | 6 +++--- examples/websocket-chat/src/session.rs | 12 ++++++------ examples/websocket/src/client.rs | 4 ++-- 6 files changed, 20 insertions(+), 20 deletions(-) diff --git a/examples/diesel/src/main.rs b/examples/diesel/src/main.rs index 4f61fa0b4..75c201558 100644 --- a/examples/diesel/src/main.rs +++ b/examples/diesel/src/main.rs @@ -38,7 +38,7 @@ struct State { fn index(req: HttpRequest) -> Box> { let name = &req.match_info()["name"]; - req.state().db.call(CreateUser{name: name.to_owned()}) + req.state().db.send(CreateUser{name: name.to_owned()}) .from_err() .and_then(|res| { match res { diff --git a/examples/websocket-chat/src/client.rs b/examples/websocket-chat/src/client.rs index d3b556b6f..4b69e6bf5 100644 --- a/examples/websocket-chat/src/client.rs +++ b/examples/websocket-chat/src/client.rs @@ -45,7 +45,7 @@ fn main() { return } - addr.send(ClientCommand(cmd)); + addr.do_send(ClientCommand(cmd)); } }); @@ -81,7 +81,7 @@ impl Actor for ChatClient { println!("Disconnected"); // Stop application on disconnect - Arbiter::system().send(actix::msgs::SystemExit(0)); + Arbiter::system().do_send(actix::msgs::SystemExit(0)); true } diff --git a/examples/websocket-chat/src/main.rs b/examples/websocket-chat/src/main.rs index 88e8590ef..4fefedc55 100644 --- a/examples/websocket-chat/src/main.rs +++ b/examples/websocket-chat/src/main.rs @@ -63,7 +63,7 @@ impl Actor for WsChatSession { // HttpContext::state() is instance of WsChatSessionState, state is shared across all // routes within application let addr: Addr = ctx.address(); - ctx.state().addr.call(server::Connect{addr: addr.subscriber()}) + ctx.state().addr.send(server::Connect{addr: addr.recipient()}) .into_actor(self) .then(|res, act, ctx| { match res { @@ -77,7 +77,7 @@ impl Actor for WsChatSession { fn stopping(&mut self, ctx: &mut Self::Context) -> bool { // notify chat server - ctx.state().addr.send(server::Disconnect{id: self.id}); + ctx.state().addr.do_send(server::Disconnect{id: self.id}); true } } @@ -109,7 +109,7 @@ impl Handler for WsChatSession { "/list" => { // Send ListRooms message to chat server and wait for response println!("List rooms"); - ctx.state().addr.call(server::ListRooms) + ctx.state().addr.send(server::ListRooms) .into_actor(self) .then(|res, _, ctx| { match res { @@ -129,7 +129,7 @@ impl Handler for WsChatSession { "/join" => { if v.len() == 2 { self.room = v[1].to_owned(); - ctx.state().addr.send( + ctx.state().addr.do_send( server::Join{id: self.id, name: self.room.clone()}); ctx.text("joined"); @@ -153,7 +153,7 @@ impl Handler for WsChatSession { m.to_owned() }; // send message to chat server - ctx.state().addr.send( + ctx.state().addr.do_send( server::Message{id: self.id, msg: msg, room: self.room.clone()}) @@ -178,7 +178,7 @@ fn main() { // Start tcp server in separate thread let srv = server.clone(); - Arbiter::new("tcp-server").send::( + Arbiter::new("tcp-server").do_send::( msgs::Execute::new(move || { session::TcpServer::new("127.0.0.1:12345", srv); Ok(()) diff --git a/examples/websocket-chat/src/server.rs b/examples/websocket-chat/src/server.rs index 6cae978f6..8b735b852 100644 --- a/examples/websocket-chat/src/server.rs +++ b/examples/websocket-chat/src/server.rs @@ -15,7 +15,7 @@ use session; #[derive(Message)] #[rtype(usize)] pub struct Connect { - pub addr: Subscriber, + pub addr: Recipient, } /// Session is disconnected @@ -54,7 +54,7 @@ pub struct Join { /// `ChatServer` manages chat rooms and responsible for coordinating chat session. /// implementation is super primitive pub struct ChatServer { - sessions: HashMap>, + sessions: HashMap>, rooms: HashMap>, rng: RefCell, } @@ -80,7 +80,7 @@ impl ChatServer { for id in sessions { if *id != skip_id { if let Some(addr) = self.sessions.get(id) { - let _ = addr.send(session::Message(message.to_owned())); + let _ = addr.do_send(session::Message(message.to_owned())); } } } diff --git a/examples/websocket-chat/src/session.rs b/examples/websocket-chat/src/session.rs index a5642db9f..a1a86bbbe 100644 --- a/examples/websocket-chat/src/session.rs +++ b/examples/websocket-chat/src/session.rs @@ -46,7 +46,7 @@ impl Actor for ChatSession { // future within context, but context waits until this future resolves // before processing any other events. let addr: Addr = ctx.address(); - self.addr.call(server::Connect{addr: addr.subscriber()}) + self.addr.send(server::Connect{addr: addr.recipient()}) .into_actor(self) .then(|res, act, ctx| { match res { @@ -60,7 +60,7 @@ impl Actor for ChatSession { fn stopping(&mut self, ctx: &mut Self::Context) -> bool { // notify chat server - self.addr.send(server::Disconnect{id: self.id}); + self.addr.do_send(server::Disconnect{id: self.id}); true } } @@ -76,7 +76,7 @@ impl StreamHandler for ChatSession { ChatRequest::List => { // Send ListRooms message to chat server and wait for response println!("List rooms"); - self.addr.call(server::ListRooms) + self.addr.send(server::ListRooms) .into_actor(self) .then(|res, act, ctx| { match res { @@ -93,13 +93,13 @@ impl StreamHandler for ChatSession { ChatRequest::Join(name) => { println!("Join to room: {}", name); self.room = name.clone(); - self.addr.send(server::Join{id: self.id, name: name.clone()}); + self.addr.do_send(server::Join{id: self.id, name: name.clone()}); self.framed.write(ChatResponse::Joined(name)); }, ChatRequest::Message(message) => { // send message to chat server println!("Peer message: {}", message); - self.addr.send( + self.addr.do_send( server::Message{id: self.id, msg: message, room: self.room.clone()}) @@ -141,7 +141,7 @@ impl ChatSession { println!("Client heartbeat failed, disconnecting!"); // notify chat server - act.addr.send(server::Disconnect{id: act.id}); + act.addr.do_send(server::Disconnect{id: act.id}); // stop actor ctx.stop(); diff --git a/examples/websocket/src/client.rs b/examples/websocket/src/client.rs index 38a7bd5eb..32b96873a 100644 --- a/examples/websocket/src/client.rs +++ b/examples/websocket/src/client.rs @@ -41,7 +41,7 @@ fn main() { println!("error"); return } - addr.send(ClientCommand(cmd)); + addr.do_send(ClientCommand(cmd)); } }); @@ -70,7 +70,7 @@ impl Actor for ChatClient { println!("Stopping"); // Stop application on disconnect - Arbiter::system().send(actix::msgs::SystemExit(0)); + Arbiter::system().do_send(actix::msgs::SystemExit(0)); true } From 8f9ec5c23cb675f3813c7859382cb6eaa8a58591 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 13 Feb 2018 07:50:49 -0800 Subject: [PATCH 12/12] fix doc test --- src/client/connector.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client/connector.rs b/src/client/connector.rs index 2d49a06d6..fe5fa75c7 100644 --- a/src/client/connector.rs +++ b/src/client/connector.rs @@ -139,7 +139,7 @@ impl ClientConnector { /// let conn: Address<_> = ClientConnector::with_connector(ssl_conn).start(); /// /// Arbiter::handle().spawn({ - /// conn.call( + /// conn.send( /// Connect::new("https://www.rust-lang.org").unwrap()) // <- connect to host /// .map_err(|_| ()) /// .and_then(|res| {