From 8c1b5fa94549d6710b2b4fb3776058ceeba19604 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 12 Feb 2018 12:17:30 -0800 Subject: [PATCH] sync with latest actix --- Cargo.toml | 2 +- examples/diesel/src/db.rs | 7 +++---- examples/diesel/src/main.rs | 2 +- examples/websocket-chat/src/client.rs | 2 +- examples/websocket-chat/src/codec.rs | 15 ++------------ examples/websocket-chat/src/main.rs | 13 ++++++------ examples/websocket-chat/src/server.rs | 21 +++++++------------ examples/websocket-chat/src/session.rs | 16 +++++++-------- examples/websocket/src/client.rs | 2 +- src/client/connector.rs | 19 +++++++++-------- src/context.rs | 10 ++++----- src/server/mod.rs | 6 +++++- src/server/srv.rs | 28 +++++--------------------- src/server/worker.rs | 10 +++++---- src/ws/context.rs | 6 +++--- src/ws/mod.rs | 9 ++------- 16 files changed, 65 insertions(+), 103 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 96104a70..36301036 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -78,7 +78,7 @@ openssl = { version="0.10", optional = true } tokio-openssl = { version="0.2", optional = true } [dependencies.actix] -#version = "^0.4.6" +#version = "0.5" git = "https://github.com/actix/actix.git" [dev-dependencies] diff --git a/examples/diesel/src/db.rs b/examples/diesel/src/db.rs index 4ca85130..afbf43de 100644 --- a/examples/diesel/src/db.rs +++ b/examples/diesel/src/db.rs @@ -17,9 +17,8 @@ pub struct CreateUser { pub name: String, } -impl ResponseType for CreateUser { - type Item = models::User; - type Error = Error; +impl Message for CreateUser { + type Result = Result; } impl Actor for DbExecutor { @@ -27,7 +26,7 @@ impl Actor for DbExecutor { } impl Handler for DbExecutor { - type Result = MessageResult; + type Result = Result; fn handle(&mut self, msg: CreateUser, _: &mut Self::Context) -> Self::Result { use self::schema::users::dsl::*; diff --git a/examples/diesel/src/main.rs b/examples/diesel/src/main.rs index 4c4bc4cd..881cbe1c 100644 --- a/examples/diesel/src/main.rs +++ b/examples/diesel/src/main.rs @@ -31,7 +31,7 @@ use db::{CreateUser, DbExecutor}; /// State with DbExecutor address struct State { - db: SyncAddress, + db: Addr>, } /// Async request handler diff --git a/examples/websocket-chat/src/client.rs b/examples/websocket-chat/src/client.rs index 5da1f37f..4fe18d8e 100644 --- a/examples/websocket-chat/src/client.rs +++ b/examples/websocket-chat/src/client.rs @@ -29,7 +29,7 @@ fn main() { Arbiter::handle().spawn( TcpStream::connect(&addr, Arbiter::handle()) .and_then(|stream| { - let addr: SyncAddress<_> = ChatClient::create(|ctx| { + let addr: Addr> = ChatClient::create(|ctx| { let (r, w) = stream.split(); ChatClient::add_stream(FramedRead::new(r, codec::ClientChatCodec), ctx); ChatClient{ diff --git a/examples/websocket-chat/src/codec.rs b/examples/websocket-chat/src/codec.rs index 718c3c82..03638241 100644 --- a/examples/websocket-chat/src/codec.rs +++ b/examples/websocket-chat/src/codec.rs @@ -4,10 +4,9 @@ use serde_json as json; use byteorder::{BigEndian , ByteOrder}; use bytes::{BytesMut, BufMut}; use tokio_io::codec::{Encoder, Decoder}; -use actix::ResponseType; /// Client request -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Message)] #[serde(tag="cmd", content="data")] pub enum ChatRequest { /// List rooms @@ -20,13 +19,8 @@ pub enum ChatRequest { Ping } -impl ResponseType for ChatRequest { - type Item = (); - type Error = (); -} - /// Server response -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Message)] #[serde(tag="cmd", content="data")] pub enum ChatResponse { Ping, @@ -41,11 +35,6 @@ pub enum ChatResponse { Message(String), } -impl ResponseType for ChatResponse { - type Item = (); - type Error = (); -} - /// Codec for Client -> Server transport pub struct ChatCodec; diff --git a/examples/websocket-chat/src/main.rs b/examples/websocket-chat/src/main.rs index 0252f141..27957f69 100644 --- a/examples/websocket-chat/src/main.rs +++ b/examples/websocket-chat/src/main.rs @@ -26,7 +26,7 @@ mod session; /// This is our websocket route state, this state is shared with all route instances /// via `HttpContext::state()` struct WsChatSessionState { - addr: SyncAddress, + addr: Addr>, } /// Entry point for our route @@ -62,12 +62,12 @@ impl Actor for WsChatSession { // before processing any other events. // HttpContext::state() is instance of WsChatSessionState, state is shared across all // routes within application - let addr: SyncAddress<_> = ctx.address(); + let addr: Addr> = ctx.address(); ctx.state().addr.call( - self, server::Connect{addr: addr.into()}).then( + self, server::Connect{addr: addr.subscriber()}).then( |res, act, ctx| { match res { - Ok(Ok(res)) => act.id = res, + Ok(res) => act.id = res, // something is wrong with chat server _ => ctx.stop(), } @@ -111,7 +111,7 @@ impl Handler for WsChatSession { println!("List rooms"); ctx.state().addr.call(self, server::ListRooms).then(|res, _, ctx| { match res { - Ok(Ok(rooms)) => { + Ok(rooms) => { for room in rooms { ctx.text(room); } @@ -172,8 +172,7 @@ fn main() { let sys = actix::System::new("websocket-example"); // Start chat server actor in separate thread - let server: SyncAddress<_> = - Arbiter::start(|_| server::ChatServer::default()); + let server: Addr> = Arbiter::start(|_| server::ChatServer::default()); // Start tcp server in separate thread let srv = server.clone(); diff --git a/examples/websocket-chat/src/server.rs b/examples/websocket-chat/src/server.rs index 477a401b..b38fb460 100644 --- a/examples/websocket-chat/src/server.rs +++ b/examples/websocket-chat/src/server.rs @@ -12,18 +12,12 @@ use session; /// Message for chat server communications /// New chat session is created +#[derive(Message)] +#[rtype(usize)] pub struct Connect { pub addr: SyncSubscriber, } -/// Response type for Connect message -/// -/// Chat server returns unique session id -impl ResponseType for Connect { - type Item = usize; - type Error = (); -} - /// Session is disconnected #[derive(Message)] pub struct Disconnect { @@ -44,9 +38,8 @@ pub struct Message { /// List of available rooms pub struct ListRooms; -impl ResponseType for ListRooms { - type Item = Vec; - type Error = (); +impl actix::Message for ListRooms { + type Result = Vec; } /// Join room, if room does not exists create new one. @@ -106,7 +99,7 @@ impl Actor for ChatServer { /// /// Register new session and assign unique id to this session impl Handler for ChatServer { - type Result = MessageResult; + type Result = usize; fn handle(&mut self, msg: Connect, _: &mut Context) -> Self::Result { println!("Someone joined"); @@ -122,7 +115,7 @@ impl Handler for ChatServer { self.rooms.get_mut(&"Main".to_owned()).unwrap().insert(id); // send id back - Ok(id) + id } } @@ -171,7 +164,7 @@ impl Handler for ChatServer { rooms.push(key.to_owned()) } - Ok(rooms) + MessageResult(rooms) } } diff --git a/examples/websocket-chat/src/session.rs b/examples/websocket-chat/src/session.rs index 29fcb989..50c2701e 100644 --- a/examples/websocket-chat/src/session.rs +++ b/examples/websocket-chat/src/session.rs @@ -24,7 +24,7 @@ pub struct ChatSession { /// unique session id id: usize, /// this is address of chat server - addr: SyncAddress, + addr: Addr>, /// Client must send ping at least once per 10 seconds, otherwise we drop connection. hb: Instant, /// joined room @@ -45,11 +45,11 @@ impl Actor for ChatSession { // register self in chat server. `AsyncContext::wait` register // future within context, but context waits until this future resolves // before processing any other events. - let addr: SyncAddress<_> = ctx.address(); - self.addr.call(self, server::Connect{addr: addr.into()}) + let addr: Addr> = ctx.address(); + self.addr.call(self, server::Connect{addr: addr.subscriber()}) .then(|res, act, ctx| { match res { - Ok(Ok(res)) => act.id = res, + Ok(res) => act.id = res, // something is wrong with chat server _ => ctx.stop(), } @@ -77,7 +77,7 @@ impl StreamHandler for ChatSession { println!("List rooms"); self.addr.call(self, server::ListRooms).then(|res, act, ctx| { match res { - Ok(Ok(rooms)) => { + Ok(rooms) => { act.framed.write(ChatResponse::Rooms(rooms)); }, _ => println!("Something is wrong"), @@ -121,7 +121,7 @@ impl Handler for ChatSession { /// Helper methods impl ChatSession { - pub fn new(addr: SyncAddress, + pub fn new(addr: Addr>, framed: actix::io::FramedWrite, ChatCodec>) -> ChatSession { ChatSession {id: 0, addr: addr, hb: Instant::now(), room: "Main".to_owned(), framed: framed} @@ -155,11 +155,11 @@ impl ChatSession { /// Define tcp server that will accept incoming tcp connection and create /// chat actors. pub struct TcpServer { - chat: SyncAddress, + chat: Addr>, } impl TcpServer { - pub fn new(s: &str, chat: SyncAddress) { + pub fn new(s: &str, chat: Addr>) { // Create server listener let addr = net::SocketAddr::from_str("127.0.0.1:12345").unwrap(); let listener = TcpListener::bind(&addr, Arbiter::handle()).unwrap(); diff --git a/examples/websocket/src/client.rs b/examples/websocket/src/client.rs index 31fe614d..3eba7277 100644 --- a/examples/websocket/src/client.rs +++ b/examples/websocket/src/client.rs @@ -28,7 +28,7 @@ fn main() { () }) .map(|(reader, writer)| { - let addr: SyncAddress<_> = ChatClient::create(|ctx| { + let addr: Addr> = ChatClient::create(|ctx| { ChatClient::add_stream(reader, ctx); ChatClient(writer) }); diff --git a/src/client/connector.rs b/src/client/connector.rs index 6ecf91f7..2a0466cf 100644 --- a/src/client/connector.rs +++ b/src/client/connector.rs @@ -5,7 +5,7 @@ use std::collections::VecDeque; use std::time::Duration; use actix::{fut, Actor, ActorFuture, Arbiter, Context, - Handler, Response, ResponseType, Supervised}; + Handler, Message, ActorResponse, Supervised}; use actix::registry::ArbiterService; use actix::fut::WrapFuture; use actix::actors::{Connector, ConnectorError, Connect as ResolveConnect}; @@ -37,9 +37,8 @@ impl Connect { } } -impl ResponseType for Connect { - type Item = Connection; - type Error = ClientConnectorError; +impl Message for Connect { + type Result = Result; } /// A set of errors that can occur during connecting to a http host @@ -163,34 +162,34 @@ impl ClientConnector { } impl Handler for ClientConnector { - type Result = Response; + type Result = ActorResponse; fn handle(&mut self, msg: Connect, _: &mut Self::Context) -> Self::Result { let uri = &msg.0; // host name is required if uri.host().is_none() { - return Response::reply(Err(ClientConnectorError::InvalidUrl)) + return ActorResponse::reply(Err(ClientConnectorError::InvalidUrl)) } // supported protocols let proto = match uri.scheme_part() { Some(scheme) => match Protocol::from(scheme.as_str()) { Some(proto) => proto, - None => return Response::reply(Err(ClientConnectorError::InvalidUrl)), + None => return ActorResponse::reply(Err(ClientConnectorError::InvalidUrl)), }, - None => return Response::reply(Err(ClientConnectorError::InvalidUrl)), + None => return ActorResponse::reply(Err(ClientConnectorError::InvalidUrl)), }; // check ssl availability if proto.is_secure() && !HAS_OPENSSL { //&& !HAS_TLS { - return Response::reply(Err(ClientConnectorError::SslIsNotSupported)) + return ActorResponse::reply(Err(ClientConnectorError::SslIsNotSupported)) } let host = uri.host().unwrap().to_owned(); let port = uri.port().unwrap_or_else(|| proto.port()); - Response::async_reply( + ActorResponse::async( Connector::from_registry() .call(self, ResolveConnect::host_and_port(&host, port)) .map_err(|_, _, _| ClientConnectorError::Disconnected) diff --git a/src/context.rs b/src/context.rs index 28cf5d7d..7603ea05 100644 --- a/src/context.rs +++ b/src/context.rs @@ -1,4 +1,4 @@ -use std; +use std::mem; use std::marker::PhantomData; use futures::{Async, Future, Poll}; use futures::sync::oneshot::Sender; @@ -6,7 +6,7 @@ use futures::unsync::oneshot; use smallvec::SmallVec; use actix::{Actor, ActorState, ActorContext, AsyncContext, - Addr, Handler, ResponseType, MessageResult, SpawnHandle, Syn, Unsync}; + Addr, Handler, Message, SpawnHandle, Syn, Unsync}; use actix::fut::ActorFuture; use actix::dev::{ContextImpl, ToEnvelope, RemoteEnvelope}; @@ -184,7 +184,7 @@ impl ActorHttpContext for HttpContext where A: Actor, fn poll(&mut self) -> Poll>, Error> { let ctx: &mut HttpContext = unsafe { - std::mem::transmute(self as &mut HttpContext) + mem::transmute(self as &mut HttpContext) }; if self.inner.alive() { @@ -207,9 +207,9 @@ impl ActorHttpContext for HttpContext where A: Actor, impl ToEnvelope, M> for HttpContext where A: Actor> + Handler, - M: ResponseType + Send + 'static, M::Item: Send, M::Error: Send, + M: Message + Send + 'static, M::Result: Send, { - fn pack(msg: M, tx: Option>>) -> Syn { + fn pack(msg: M, tx: Option>) -> Syn { Syn::new(Box::new(RemoteEnvelope::envelope(msg, tx))) } } diff --git a/src/server/mod.rs b/src/server/mod.rs index 39df2fc8..1a7b846b 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -2,6 +2,7 @@ use std::{time, io}; use std::net::Shutdown; +use actix; use futures::Poll; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_core::net::TcpStream; @@ -43,11 +44,14 @@ pub struct ResumeServer; /// Stop incoming connection processing, stop all workers and exit. /// /// If server starts with `spawn()` method, then spawned thread get terminated. -#[derive(Message)] pub struct StopServer { pub graceful: bool } +impl actix::Message for StopServer { + type Result = Result<(), ()>; +} + /// Low level http request handler #[allow(unused_variables)] pub trait HttpHandler: 'static { diff --git a/src/server/srv.rs b/src/server/srv.rs index 3cd6da12..4217ad5a 100644 --- a/src/server/srv.rs +++ b/src/server/srv.rs @@ -352,7 +352,7 @@ impl HttpServer let signals = self.subscribe_to_signals(); let addr: SyncAddress<_> = Actor::start(self); signals.map(|signals| signals.send( - signal::Subscribe(addr.clone().into()))); + signal::Subscribe(addr.clone().subscriber()))); Ok(addr) } } @@ -396,7 +396,7 @@ impl HttpServer let signals = self.subscribe_to_signals(); let addr: SyncAddress<_> = Actor::start(self); signals.map(|signals| signals.send( - signal::Subscribe(addr.clone().into()))); + signal::Subscribe(addr.clone().subscriber()))); Ok(addr) } } @@ -477,24 +477,6 @@ impl Handler for HttpServer } } -impl Handler>> for HttpServer - where T: IoStream, - H: IntoHttpHandler, -{ - type Result = (); - - fn handle(&mut self, msg: io::Result>, _: &mut Context) -> Self::Result { - match msg { - Ok(msg) => - Arbiter::handle().spawn( - HttpChannel::new( - Rc::clone(self.h.as_ref().unwrap()), msg.io, msg.peer, msg.http2)), - Err(err) => - debug!("Error handling request: {}", err), - } - } -} - impl Handler> for HttpServer where T: IoStream, H: IntoHttpHandler, @@ -535,7 +517,7 @@ impl Handler for HttpServer impl Handler for HttpServer { - type Result = actix::Response; + type Result = actix::Response<(), ()>; fn handle(&mut self, msg: StopServer, ctx: &mut Context) -> Self::Result { // stop accept threads @@ -570,8 +552,8 @@ impl Handler for HttpServer } if !self.workers.is_empty() { - Response::async_reply( - rx.into_future().map(|_| ()).map_err(|_| ()).actfuture()) + Response::async( + rx.into_future().map(|_| ()).map_err(|_| ())) } else { // we need to stop system if server was spawned if self.exit { diff --git a/src/server/worker.rs b/src/server/worker.rs index 6ee48e2d..34c5324f 100644 --- a/src/server/worker.rs +++ b/src/server/worker.rs @@ -37,12 +37,14 @@ pub(crate) struct Conn { /// Stop worker message. Returns `true` on successful shutdown /// and `false` if some connections still alive. -#[derive(Message)] -#[rtype(bool)] pub(crate) struct StopWorker { pub graceful: Option, } +impl Message for StopWorker { + type Result = Result; +} + /// Http worker /// /// Worker accepts Socket objects via unbounded channel and start requests processing. @@ -117,7 +119,7 @@ impl Handler> for Worker impl Handler for Worker where H: HttpHandler + 'static, { - type Result = Response; + type Result = Response; fn handle(&mut self, msg: StopWorker, ctx: &mut Context) -> Self::Result { let num = self.settings.num_channels(); @@ -128,7 +130,7 @@ impl Handler for Worker info!("Graceful http worker shutdown, {} connections", num); let (tx, rx) = oneshot::channel(); self.shutdown_timeout(ctx, tx, dur); - Response::async_reply(rx.map_err(|_| ()).actfuture()) + Response::async(rx.map_err(|_| ())) } else { info!("Force shutdown http worker, {} connections", num); self.settings.head().traverse::(); diff --git a/src/ws/context.rs b/src/ws/context.rs index 1eb78c7e..6792e51c 100644 --- a/src/ws/context.rs +++ b/src/ws/context.rs @@ -5,7 +5,7 @@ use futures::unsync::oneshot; use smallvec::SmallVec; use actix::{Actor, ActorState, ActorContext, AsyncContext, - Addr, Handler, ResponseType, SpawnHandle, MessageResult, Syn, Unsync}; + Addr, Handler, Message, Syn, Unsync, SpawnHandle}; use actix::fut::ActorFuture; use actix::dev::{ContextImpl, ToEnvelope, RemoteEnvelope}; @@ -219,9 +219,9 @@ impl ActorHttpContext for WebsocketContext where A: Actor ToEnvelope, M> for WebsocketContext where A: Actor> + Handler, - M: ResponseType + Send + 'static, M::Item: Send, M::Error: Send, + M: Message + Send + 'static, M::Result: Send { - fn pack(msg: M, tx: Option>>) -> Syn { + fn pack(msg: M, tx: Option>) -> Syn { Syn::new(Box::new(RemoteEnvelope::envelope(msg, tx))) } } diff --git a/src/ws/mod.rs b/src/ws/mod.rs index 07b845cc..d9bf0f10 100644 --- a/src/ws/mod.rs +++ b/src/ws/mod.rs @@ -47,7 +47,7 @@ use bytes::BytesMut; use http::{Method, StatusCode, header}; use futures::{Async, Poll, Stream}; -use actix::{Actor, AsyncContext, ResponseType, Handler}; +use actix::{Actor, AsyncContext, Handler}; use body::Binary; use payload::ReadAny; @@ -74,7 +74,7 @@ const SEC_WEBSOCKET_VERSION: &str = "SEC-WEBSOCKET-VERSION"; /// `WebSocket` Message -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Message)] pub enum Message { Text(String), Binary(Binary), @@ -85,11 +85,6 @@ pub enum Message { Error } -impl ResponseType for Message { - type Item = (); - type Error = (); -} - /// Do websocket handshake and start actor pub fn start(mut req: HttpRequest, actor: A) -> Result where A: Actor> + Handler,