diff --git a/.travis.yml b/.travis.yml index 6b4c8fd7b..e7ed24526 100644 --- a/.travis.yml +++ b/.travis.yml @@ -57,6 +57,7 @@ script: cd examples/hello-world && cargo check && cd ../.. cd examples/multipart && cargo check && cd ../.. cd examples/json && cargo check && cd ../.. + cd examples/state && cargo check && cd ../.. cd examples/template_tera && cargo check && cd ../.. cd examples/diesel && cargo check && cd ../.. cd examples/tls && cargo check && cd ../.. diff --git a/Cargo.toml b/Cargo.toml index 96104a700..36301036b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -78,7 +78,7 @@ openssl = { version="0.10", optional = true } tokio-openssl = { version="0.2", optional = true } [dependencies.actix] -#version = "^0.4.6" +#version = "0.5" git = "https://github.com/actix/actix.git" [dev-dependencies] diff --git a/examples/diesel/src/db.rs b/examples/diesel/src/db.rs index 4ca85130e..afbf43dea 100644 --- a/examples/diesel/src/db.rs +++ b/examples/diesel/src/db.rs @@ -17,9 +17,8 @@ pub struct CreateUser { pub name: String, } -impl ResponseType for CreateUser { - type Item = models::User; - type Error = Error; +impl Message for CreateUser { + type Result = Result; } impl Actor for DbExecutor { @@ -27,7 +26,7 @@ impl Actor for DbExecutor { } impl Handler for DbExecutor { - type Result = MessageResult; + type Result = Result; fn handle(&mut self, msg: CreateUser, _: &mut Self::Context) -> Self::Result { use self::schema::users::dsl::*; diff --git a/examples/diesel/src/main.rs b/examples/diesel/src/main.rs index 4c4bc4cda..75c201558 100644 --- a/examples/diesel/src/main.rs +++ b/examples/diesel/src/main.rs @@ -31,14 +31,14 @@ use db::{CreateUser, DbExecutor}; /// State with DbExecutor address struct State { - db: SyncAddress, + db: Addr, } /// Async request handler fn index(req: HttpRequest) -> Box> { let name = &req.match_info()["name"]; - req.state().db.call_fut(CreateUser{name: name.to_owned()}) + req.state().db.send(CreateUser{name: name.to_owned()}) .from_err() .and_then(|res| { match res { diff --git a/examples/state/src/main.rs b/examples/state/src/main.rs index 5bb7e5043..21eb50483 100644 --- a/examples/state/src/main.rs +++ b/examples/state/src/main.rs @@ -44,7 +44,7 @@ impl Handler for MyWebSocket { println!("WS({}): {:?}", self.counter, msg); match msg { ws::Message::Ping(msg) => ctx.pong(&msg), - ws::Message::Text(text) => ctx.text(&text), + ws::Message::Text(text) => ctx.text(text), ws::Message::Binary(bin) => ctx.binary(bin), ws::Message::Closed | ws::Message::Error => { ctx.stop(); diff --git a/examples/websocket-chat/src/client.rs b/examples/websocket-chat/src/client.rs index 5da1f37f6..4b69e6bf5 100644 --- a/examples/websocket-chat/src/client.rs +++ b/examples/websocket-chat/src/client.rs @@ -29,7 +29,7 @@ fn main() { Arbiter::handle().spawn( TcpStream::connect(&addr, Arbiter::handle()) .and_then(|stream| { - let addr: SyncAddress<_> = ChatClient::create(|ctx| { + let addr: Addr = ChatClient::create(|ctx| { let (r, w) = stream.split(); ChatClient::add_stream(FramedRead::new(r, codec::ClientChatCodec), ctx); ChatClient{ @@ -45,7 +45,7 @@ fn main() { return } - addr.send(ClientCommand(cmd)); + addr.do_send(ClientCommand(cmd)); } }); @@ -81,7 +81,7 @@ impl Actor for ChatClient { println!("Disconnected"); // Stop application on disconnect - Arbiter::system().send(actix::msgs::SystemExit(0)); + Arbiter::system().do_send(actix::msgs::SystemExit(0)); true } diff --git a/examples/websocket-chat/src/codec.rs b/examples/websocket-chat/src/codec.rs index 718c3c82f..03638241b 100644 --- a/examples/websocket-chat/src/codec.rs +++ b/examples/websocket-chat/src/codec.rs @@ -4,10 +4,9 @@ use serde_json as json; use byteorder::{BigEndian , ByteOrder}; use bytes::{BytesMut, BufMut}; use tokio_io::codec::{Encoder, Decoder}; -use actix::ResponseType; /// Client request -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Message)] #[serde(tag="cmd", content="data")] pub enum ChatRequest { /// List rooms @@ -20,13 +19,8 @@ pub enum ChatRequest { Ping } -impl ResponseType for ChatRequest { - type Item = (); - type Error = (); -} - /// Server response -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Message)] #[serde(tag="cmd", content="data")] pub enum ChatResponse { Ping, @@ -41,11 +35,6 @@ pub enum ChatResponse { Message(String), } -impl ResponseType for ChatResponse { - type Item = (); - type Error = (); -} - /// Codec for Client -> Server transport pub struct ChatCodec; diff --git a/examples/websocket-chat/src/main.rs b/examples/websocket-chat/src/main.rs index 0252f141e..4fefedc55 100644 --- a/examples/websocket-chat/src/main.rs +++ b/examples/websocket-chat/src/main.rs @@ -26,7 +26,7 @@ mod session; /// This is our websocket route state, this state is shared with all route instances /// via `HttpContext::state()` struct WsChatSessionState { - addr: SyncAddress, + addr: Addr, } /// Entry point for our route @@ -62,12 +62,12 @@ impl Actor for WsChatSession { // before processing any other events. // HttpContext::state() is instance of WsChatSessionState, state is shared across all // routes within application - let addr: SyncAddress<_> = ctx.address(); - ctx.state().addr.call( - self, server::Connect{addr: addr.into()}).then( - |res, act, ctx| { + let addr: Addr = ctx.address(); + ctx.state().addr.send(server::Connect{addr: addr.recipient()}) + .into_actor(self) + .then(|res, act, ctx| { match res { - Ok(Ok(res)) => act.id = res, + Ok(res) => act.id = res, // something is wrong with chat server _ => ctx.stop(), } @@ -77,7 +77,7 @@ impl Actor for WsChatSession { fn stopping(&mut self, ctx: &mut Self::Context) -> bool { // notify chat server - ctx.state().addr.send(server::Disconnect{id: self.id}); + ctx.state().addr.do_send(server::Disconnect{id: self.id}); true } } @@ -109,17 +109,19 @@ impl Handler for WsChatSession { "/list" => { // Send ListRooms message to chat server and wait for response println!("List rooms"); - ctx.state().addr.call(self, server::ListRooms).then(|res, _, ctx| { - match res { - Ok(Ok(rooms)) => { - for room in rooms { - ctx.text(room); - } - }, - _ => println!("Something is wrong"), - } - fut::ok(()) - }).wait(ctx) + ctx.state().addr.send(server::ListRooms) + .into_actor(self) + .then(|res, _, ctx| { + match res { + Ok(rooms) => { + for room in rooms { + ctx.text(room); + } + }, + _ => println!("Something is wrong"), + } + fut::ok(()) + }).wait(ctx) // .wait(ctx) pauses all events in context, // so actor wont receive any new messages until it get list // of rooms back @@ -127,7 +129,7 @@ impl Handler for WsChatSession { "/join" => { if v.len() == 2 { self.room = v[1].to_owned(); - ctx.state().addr.send( + ctx.state().addr.do_send( server::Join{id: self.id, name: self.room.clone()}); ctx.text("joined"); @@ -151,7 +153,7 @@ impl Handler for WsChatSession { m.to_owned() }; // send message to chat server - ctx.state().addr.send( + ctx.state().addr.do_send( server::Message{id: self.id, msg: msg, room: self.room.clone()}) @@ -172,12 +174,11 @@ fn main() { let sys = actix::System::new("websocket-example"); // Start chat server actor in separate thread - let server: SyncAddress<_> = - Arbiter::start(|_| server::ChatServer::default()); + let server: Addr = Arbiter::start(|_| server::ChatServer::default()); // Start tcp server in separate thread let srv = server.clone(); - Arbiter::new("tcp-server").send::( + Arbiter::new("tcp-server").do_send::( msgs::Execute::new(move || { session::TcpServer::new("127.0.0.1:12345", srv); Ok(()) diff --git a/examples/websocket-chat/src/server.rs b/examples/websocket-chat/src/server.rs index 477a401be..8b735b852 100644 --- a/examples/websocket-chat/src/server.rs +++ b/examples/websocket-chat/src/server.rs @@ -12,16 +12,10 @@ use session; /// Message for chat server communications /// New chat session is created +#[derive(Message)] +#[rtype(usize)] pub struct Connect { - pub addr: SyncSubscriber, -} - -/// Response type for Connect message -/// -/// Chat server returns unique session id -impl ResponseType for Connect { - type Item = usize; - type Error = (); + pub addr: Recipient, } /// Session is disconnected @@ -44,9 +38,8 @@ pub struct Message { /// List of available rooms pub struct ListRooms; -impl ResponseType for ListRooms { - type Item = Vec; - type Error = (); +impl actix::Message for ListRooms { + type Result = Vec; } /// Join room, if room does not exists create new one. @@ -61,7 +54,7 @@ pub struct Join { /// `ChatServer` manages chat rooms and responsible for coordinating chat session. /// implementation is super primitive pub struct ChatServer { - sessions: HashMap>, + sessions: HashMap>, rooms: HashMap>, rng: RefCell, } @@ -87,7 +80,7 @@ impl ChatServer { for id in sessions { if *id != skip_id { if let Some(addr) = self.sessions.get(id) { - let _ = addr.send(session::Message(message.to_owned())); + let _ = addr.do_send(session::Message(message.to_owned())); } } } @@ -106,7 +99,7 @@ impl Actor for ChatServer { /// /// Register new session and assign unique id to this session impl Handler for ChatServer { - type Result = MessageResult; + type Result = usize; fn handle(&mut self, msg: Connect, _: &mut Context) -> Self::Result { println!("Someone joined"); @@ -122,7 +115,7 @@ impl Handler for ChatServer { self.rooms.get_mut(&"Main".to_owned()).unwrap().insert(id); // send id back - Ok(id) + id } } @@ -171,7 +164,7 @@ impl Handler for ChatServer { rooms.push(key.to_owned()) } - Ok(rooms) + MessageResult(rooms) } } diff --git a/examples/websocket-chat/src/session.rs b/examples/websocket-chat/src/session.rs index 29fcb9895..a1a86bbbe 100644 --- a/examples/websocket-chat/src/session.rs +++ b/examples/websocket-chat/src/session.rs @@ -24,7 +24,7 @@ pub struct ChatSession { /// unique session id id: usize, /// this is address of chat server - addr: SyncAddress, + addr: Addr, /// Client must send ping at least once per 10 seconds, otherwise we drop connection. hb: Instant, /// joined room @@ -45,11 +45,12 @@ impl Actor for ChatSession { // register self in chat server. `AsyncContext::wait` register // future within context, but context waits until this future resolves // before processing any other events. - let addr: SyncAddress<_> = ctx.address(); - self.addr.call(self, server::Connect{addr: addr.into()}) + let addr: Addr = ctx.address(); + self.addr.send(server::Connect{addr: addr.recipient()}) + .into_actor(self) .then(|res, act, ctx| { match res { - Ok(Ok(res)) => act.id = res, + Ok(res) => act.id = res, // something is wrong with chat server _ => ctx.stop(), } @@ -59,7 +60,7 @@ impl Actor for ChatSession { fn stopping(&mut self, ctx: &mut Self::Context) -> bool { // notify chat server - self.addr.send(server::Disconnect{id: self.id}); + self.addr.do_send(server::Disconnect{id: self.id}); true } } @@ -75,28 +76,30 @@ impl StreamHandler for ChatSession { ChatRequest::List => { // Send ListRooms message to chat server and wait for response println!("List rooms"); - self.addr.call(self, server::ListRooms).then(|res, act, ctx| { - match res { - Ok(Ok(rooms)) => { - act.framed.write(ChatResponse::Rooms(rooms)); - }, - _ => println!("Something is wrong"), - } - actix::fut::ok(()) - }).wait(ctx) + self.addr.send(server::ListRooms) + .into_actor(self) + .then(|res, act, ctx| { + match res { + Ok(rooms) => { + act.framed.write(ChatResponse::Rooms(rooms)); + }, + _ => println!("Something is wrong"), + } + actix::fut::ok(()) + }).wait(ctx) // .wait(ctx) pauses all events in context, // so actor wont receive any new messages until it get list of rooms back }, ChatRequest::Join(name) => { println!("Join to room: {}", name); self.room = name.clone(); - self.addr.send(server::Join{id: self.id, name: name.clone()}); + self.addr.do_send(server::Join{id: self.id, name: name.clone()}); self.framed.write(ChatResponse::Joined(name)); }, ChatRequest::Message(message) => { // send message to chat server println!("Peer message: {}", message); - self.addr.send( + self.addr.do_send( server::Message{id: self.id, msg: message, room: self.room.clone()}) @@ -121,7 +124,7 @@ impl Handler for ChatSession { /// Helper methods impl ChatSession { - pub fn new(addr: SyncAddress, + pub fn new(addr: Addr, framed: actix::io::FramedWrite, ChatCodec>) -> ChatSession { ChatSession {id: 0, addr: addr, hb: Instant::now(), room: "Main".to_owned(), framed: framed} @@ -138,7 +141,7 @@ impl ChatSession { println!("Client heartbeat failed, disconnecting!"); // notify chat server - act.addr.send(server::Disconnect{id: act.id}); + act.addr.do_send(server::Disconnect{id: act.id}); // stop actor ctx.stop(); @@ -155,11 +158,11 @@ impl ChatSession { /// Define tcp server that will accept incoming tcp connection and create /// chat actors. pub struct TcpServer { - chat: SyncAddress, + chat: Addr, } impl TcpServer { - pub fn new(s: &str, chat: SyncAddress) { + pub fn new(s: &str, chat: Addr) { // Create server listener let addr = net::SocketAddr::from_str("127.0.0.1:12345").unwrap(); let listener = TcpListener::bind(&addr, Arbiter::handle()).unwrap(); diff --git a/examples/websocket/src/client.rs b/examples/websocket/src/client.rs index 31fe614df..32b96873a 100644 --- a/examples/websocket/src/client.rs +++ b/examples/websocket/src/client.rs @@ -28,7 +28,7 @@ fn main() { () }) .map(|(reader, writer)| { - let addr: SyncAddress<_> = ChatClient::create(|ctx| { + let addr: Addr = ChatClient::create(|ctx| { ChatClient::add_stream(reader, ctx); ChatClient(writer) }); @@ -41,7 +41,7 @@ fn main() { println!("error"); return } - addr.send(ClientCommand(cmd)); + addr.do_send(ClientCommand(cmd)); } }); @@ -70,7 +70,7 @@ impl Actor for ChatClient { println!("Stopping"); // Stop application on disconnect - Arbiter::system().send(actix::msgs::SystemExit(0)); + Arbiter::system().do_send(actix::msgs::SystemExit(0)); true } diff --git a/guide/src/qs_10.md b/guide/src/qs_10.md index 1334ecdbe..b3c5a8e06 100644 --- a/guide/src/qs_10.md +++ b/guide/src/qs_10.md @@ -200,7 +200,7 @@ fn main() { ))) .bind("127.0.0.1:59880").unwrap() .start(); -# actix::Arbiter::system().send(actix::msgs::SystemExit(0)); +# actix::Arbiter::system().do_send(actix::msgs::SystemExit(0)); # let _ = sys.run(); } ``` diff --git a/guide/src/qs_3_5.md b/guide/src/qs_3_5.md index 62f21b61b..99c2bcd9a 100644 --- a/guide/src/qs_3_5.md +++ b/guide/src/qs_3_5.md @@ -24,7 +24,7 @@ fn main() { .bind("127.0.0.1:59080").unwrap() .start(); -# actix::Arbiter::system().send(actix::msgs::SystemExit(0)); +# actix::Arbiter::system().do_send(actix::msgs::SystemExit(0)); let _ = sys.run(); } ``` @@ -52,7 +52,7 @@ use std::sync::mpsc; fn main() { let (tx, rx) = mpsc::channel(); - + thread::spawn(move || { let sys = actix::System::new("http-server"); let addr = HttpServer::new( @@ -66,7 +66,7 @@ fn main() { }); let addr = rx.recv().unwrap(); - let _ = addr.call_fut( + let _ = addr.send( server::StopServer{graceful:true}).wait(); // <- Send `StopServer` message to server. } ``` diff --git a/guide/src/qs_4.md b/guide/src/qs_4.md index e7193ae55..c7cbc6c94 100644 --- a/guide/src/qs_4.md +++ b/guide/src/qs_4.md @@ -89,8 +89,7 @@ impl Handler for MyHandler { /// Handle request fn handle(&mut self, req: HttpRequest) -> Self::Result { - let num = self.0.load(Ordering::Relaxed) + 1; - self.0.store(num, Ordering::Relaxed); + self.0.fetch_add(1, Ordering::Relaxed); httpcodes::HTTPOk.into() } } @@ -110,7 +109,7 @@ fn main() { .start(); println!("Started http server: 127.0.0.1:8088"); -# actix::Arbiter::system().send(actix::msgs::SystemExit(0)); +# actix::Arbiter::system().do_send(actix::msgs::SystemExit(0)); let _ = sys.run(); } ``` @@ -168,7 +167,7 @@ fn main() { .start(); println!("Started http server: 127.0.0.1:8088"); -# actix::Arbiter::system().send(actix::msgs::SystemExit(0)); +# actix::Arbiter::system().do_send(actix::msgs::SystemExit(0)); let _ = sys.run(); } ``` diff --git a/src/client/connector.rs b/src/client/connector.rs index 6ecf91f73..fe5fa75c7 100644 --- a/src/client/connector.rs +++ b/src/client/connector.rs @@ -5,7 +5,7 @@ use std::collections::VecDeque; use std::time::Duration; use actix::{fut, Actor, ActorFuture, Arbiter, Context, - Handler, Response, ResponseType, Supervised}; + Handler, Message, ActorResponse, Supervised}; use actix::registry::ArbiterService; use actix::fut::WrapFuture; use actix::actors::{Connector, ConnectorError, Connect as ResolveConnect}; @@ -37,9 +37,8 @@ impl Connect { } } -impl ResponseType for Connect { - type Item = Connection; - type Error = ClientConnectorError; +impl Message for Connect { + type Result = Result; } /// A set of errors that can occur during connecting to a http host @@ -140,14 +139,14 @@ impl ClientConnector { /// let conn: Address<_> = ClientConnector::with_connector(ssl_conn).start(); /// /// Arbiter::handle().spawn({ - /// conn.call_fut( + /// conn.send( /// Connect::new("https://www.rust-lang.org").unwrap()) // <- connect to host /// .map_err(|_| ()) /// .and_then(|res| { /// if let Ok(mut stream) = res { /// stream.write_all(b"GET / HTTP/1.0\r\n\r\n").unwrap(); /// } - /// # Arbiter::system().send(actix::msgs::SystemExit(0)); + /// # Arbiter::system().do_send(actix::msgs::SystemExit(0)); /// Ok(()) /// }) /// }); @@ -163,36 +162,37 @@ impl ClientConnector { } impl Handler for ClientConnector { - type Result = Response; + type Result = ActorResponse; fn handle(&mut self, msg: Connect, _: &mut Self::Context) -> Self::Result { let uri = &msg.0; // host name is required if uri.host().is_none() { - return Response::reply(Err(ClientConnectorError::InvalidUrl)) + return ActorResponse::reply(Err(ClientConnectorError::InvalidUrl)) } // supported protocols let proto = match uri.scheme_part() { Some(scheme) => match Protocol::from(scheme.as_str()) { Some(proto) => proto, - None => return Response::reply(Err(ClientConnectorError::InvalidUrl)), + None => return ActorResponse::reply(Err(ClientConnectorError::InvalidUrl)), }, - None => return Response::reply(Err(ClientConnectorError::InvalidUrl)), + None => return ActorResponse::reply(Err(ClientConnectorError::InvalidUrl)), }; // check ssl availability if proto.is_secure() && !HAS_OPENSSL { //&& !HAS_TLS { - return Response::reply(Err(ClientConnectorError::SslIsNotSupported)) + return ActorResponse::reply(Err(ClientConnectorError::SslIsNotSupported)) } let host = uri.host().unwrap().to_owned(); let port = uri.port().unwrap_or_else(|| proto.port()); - Response::async_reply( + ActorResponse::async( Connector::from_registry() - .call(self, ResolveConnect::host_and_port(&host, port)) + .send(ResolveConnect::host_and_port(&host, port)) + .into_actor(self) .map_err(|_, _, _| ClientConnectorError::Disconnected) .and_then(move |res, _act, _| { #[cfg(feature="alpn")] diff --git a/src/context.rs b/src/context.rs index 28cf5d7d9..a3e168f6d 100644 --- a/src/context.rs +++ b/src/context.rs @@ -1,4 +1,4 @@ -use std; +use std::mem; use std::marker::PhantomData; use futures::{Async, Future, Poll}; use futures::sync::oneshot::Sender; @@ -6,9 +6,9 @@ use futures::unsync::oneshot; use smallvec::SmallVec; use actix::{Actor, ActorState, ActorContext, AsyncContext, - Addr, Handler, ResponseType, MessageResult, SpawnHandle, Syn, Unsync}; + Addr, Handler, Message, SpawnHandle, Syn, Unsync}; use actix::fut::ActorFuture; -use actix::dev::{ContextImpl, ToEnvelope, RemoteEnvelope}; +use actix::dev::{ContextImpl, ToEnvelope, SyncEnvelope}; use body::{Body, Binary}; use error::{Error, ErrorInternalServerError}; @@ -83,12 +83,12 @@ impl AsyncContext for HttpContext where A: Actor } #[doc(hidden)] #[inline] - fn unsync_address(&mut self) -> Addr> { + fn unsync_address(&mut self) -> Addr { self.inner.unsync_address() } #[doc(hidden)] #[inline] - fn sync_address(&mut self) -> Addr> { + fn sync_address(&mut self) -> Addr { self.inner.sync_address() } } @@ -184,7 +184,7 @@ impl ActorHttpContext for HttpContext where A: Actor, fn poll(&mut self) -> Poll>, Error> { let ctx: &mut HttpContext = unsafe { - std::mem::transmute(self as &mut HttpContext) + mem::transmute(self as &mut HttpContext) }; if self.inner.alive() { @@ -205,12 +205,12 @@ impl ActorHttpContext for HttpContext where A: Actor, } } -impl ToEnvelope, M> for HttpContext +impl ToEnvelope for HttpContext where A: Actor> + Handler, - M: ResponseType + Send + 'static, M::Item: Send, M::Error: Send, + M: Message + Send + 'static, M::Result: Send, { - fn pack(msg: M, tx: Option>>) -> Syn { - Syn::new(Box::new(RemoteEnvelope::envelope(msg, tx))) + fn pack(msg: M, tx: Option>) -> SyncEnvelope { + SyncEnvelope::new(msg, tx) } } diff --git a/src/error.rs b/src/error.rs index d166636a7..da6745fd5 100644 --- a/src/error.rs +++ b/src/error.rs @@ -403,8 +403,8 @@ pub enum UrlencodedError { #[fail(display="Content type error")] ContentType, /// Payload error - #[fail(display="Error that occur during reading payload")] - Payload(PayloadError), + #[fail(display="Error that occur during reading payload: {}", _0)] + Payload(#[cause] PayloadError), } /// Return `BadRequest` for `UrlencodedError` @@ -435,11 +435,11 @@ pub enum JsonPayloadError { #[fail(display="Content type error")] ContentType, /// Deserialize error - #[fail(display="Json deserialize error")] - Deserialize(JsonError), + #[fail(display="Json deserialize error: {}", _0)] + Deserialize(#[cause] JsonError), /// Payload error - #[fail(display="Error that occur during reading payload")] - Payload(PayloadError), + #[fail(display="Error that occur during reading payload: {}", _0)] + Payload(#[cause] PayloadError), } /// Return `BadRequest` for `UrlencodedError` diff --git a/src/pipeline.rs b/src/pipeline.rs index babd92199..18f9f261c 100644 --- a/src/pipeline.rs +++ b/src/pipeline.rs @@ -739,7 +739,7 @@ mod tests { let req = HttpRequest::default(); let mut ctx = HttpContext::new(req.clone(), MyActor); - let addr: Addr> = ctx.address(); + let addr: Addr = ctx.address(); let mut info = PipelineInfo::new(req); info.context = Some(Box::new(ctx)); let mut state = Completed::<(), Inner<()>>::init(&mut info).completed().unwrap(); diff --git a/src/server/mod.rs b/src/server/mod.rs index 39df2fc8d..1a7b846b6 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -2,6 +2,7 @@ use std::{time, io}; use std::net::Shutdown; +use actix; use futures::Poll; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_core::net::TcpStream; @@ -43,11 +44,14 @@ pub struct ResumeServer; /// Stop incoming connection processing, stop all workers and exit. /// /// If server starts with `spawn()` method, then spawned thread get terminated. -#[derive(Message)] pub struct StopServer { pub graceful: bool } +impl actix::Message for StopServer { + type Result = Result<(), ()>; +} + /// Low level http request handler #[allow(unused_variables)] pub trait HttpHandler: 'static { diff --git a/src/server/srv.rs b/src/server/srv.rs index 3cd6da122..3b828a1c5 100644 --- a/src/server/srv.rs +++ b/src/server/srv.rs @@ -36,12 +36,12 @@ pub struct HttpServer where H: IntoHttpHandler + 'static host: Option, keep_alive: Option, factory: Arc Vec + Send + Sync>, - workers: Vec>>>, + workers: Vec>>, sockets: HashMap, accept: Vec<(mio::SetReadiness, sync_mpsc::Sender)>, exit: bool, shutdown_timeout: u16, - signals: Option>>, + signals: Option>, no_signals: bool, } @@ -146,7 +146,7 @@ impl HttpServer where H: IntoHttpHandler + 'static } /// Set alternative address for `ProcessSignals` actor. - pub fn signals(mut self, addr: Addr>) -> Self { + pub fn signals(mut self, addr: Addr) -> Self { self.signals = Some(addr); self } @@ -227,7 +227,7 @@ impl HttpServer where H: IntoHttpHandler + 'static } // subscribe to os signals - fn subscribe_to_signals(&self) -> Option>> { + fn subscribe_to_signals(&self) -> Option> { if !self.no_signals { if let Some(ref signals) = self.signals { Some(signals.clone()) @@ -264,12 +264,12 @@ impl HttpServer /// .resource("/", |r| r.h(httpcodes::HTTPOk))) /// .bind("127.0.0.1:0").expect("Can not bind to 127.0.0.1:0") /// .start(); - /// # actix::Arbiter::system().send(actix::msgs::SystemExit(0)); + /// # actix::Arbiter::system().do_send(actix::msgs::SystemExit(0)); /// /// let _ = sys.run(); // <- Run actix system, this method actually starts all async processes /// } /// ``` - pub fn start(mut self) -> Addr> + pub fn start(mut self) -> Addr { if self.sockets.is_empty() { panic!("HttpServer::bind() has to be called before start()"); @@ -288,9 +288,9 @@ impl HttpServer // start http server actor let signals = self.subscribe_to_signals(); - let addr: Addr> = Actor::start(self); - signals.map(|signals| signals.send( - signal::Subscribe(addr.clone().subscriber()))); + let addr: Addr = Actor::start(self); + signals.map(|signals| signals.do_send( + signal::Subscribe(addr.clone().recipient()))); addr } } @@ -333,7 +333,7 @@ impl HttpServer impl HttpServer { /// Start listening for incoming tls connections. - pub fn start_tls(mut self, acceptor: TlsAcceptor) -> io::Result> { + pub fn start_tls(mut self, acceptor: TlsAcceptor) -> io::Result> { if self.sockets.is_empty() { Err(io::Error::new(io::ErrorKind::Other, "No socket addresses are bound")) } else { @@ -350,9 +350,9 @@ impl HttpServer // start http server actor let signals = self.subscribe_to_signals(); - let addr: SyncAddress<_> = Actor::start(self); - signals.map(|signals| signals.send( - signal::Subscribe(addr.clone().into()))); + let addr: Addr = Actor::start(self); + signals.map(|signals| signals.do_send( + signal::Subscribe(addr.clone().recipient()))); Ok(addr) } } @@ -364,7 +364,7 @@ impl HttpServer /// Start listening for incoming tls connections. /// /// This method sets alpn protocols to "h2" and "http/1.1" - pub fn start_ssl(mut self, mut builder: SslAcceptorBuilder) -> io::Result> + pub fn start_ssl(mut self, mut builder: SslAcceptorBuilder) -> io::Result> { if self.sockets.is_empty() { Err(io::Error::new(io::ErrorKind::Other, "No socket addresses are bound")) @@ -394,9 +394,9 @@ impl HttpServer // start http server actor let signals = self.subscribe_to_signals(); - let addr: SyncAddress<_> = Actor::start(self); - signals.map(|signals| signals.send( - signal::Subscribe(addr.clone().into()))); + let addr: Addr = Actor::start(self); + signals.map(|signals| signals.do_send( + signal::Subscribe(addr.clone().recipient()))); Ok(addr) } } @@ -407,7 +407,7 @@ impl HttpServer /// Start listening for incoming connections from a stream. /// /// This method uses only one thread for handling incoming connections. - pub fn start_incoming(mut self, stream: S, secure: bool) -> Addr> + pub fn start_incoming(mut self, stream: S, secure: bool) -> Addr where S: Stream + 'static, T: AsyncRead + AsyncWrite + 'static, A: 'static @@ -435,15 +435,15 @@ impl HttpServer // start server let signals = self.subscribe_to_signals(); - let addr: Addr> = HttpServer::create(move |ctx| { + let addr: Addr = HttpServer::create(move |ctx| { ctx.add_message_stream( stream .map_err(|_| ()) .map(move |(t, _)| Conn{io: WrapperStream::new(t), peer: None, http2: false})); self }); - signals.map(|signals| signals.send( - signal::Subscribe(addr.clone().subscriber()))); + signals.map(|signals| signals.do_send( + signal::Subscribe(addr.clone().recipient()))); addr } } @@ -477,24 +477,6 @@ impl Handler for HttpServer } } -impl Handler>> for HttpServer - where T: IoStream, - H: IntoHttpHandler, -{ - type Result = (); - - fn handle(&mut self, msg: io::Result>, _: &mut Context) -> Self::Result { - match msg { - Ok(msg) => - Arbiter::handle().spawn( - HttpChannel::new( - Rc::clone(self.h.as_ref().unwrap()), msg.io, msg.peer, msg.http2)), - Err(err) => - debug!("Error handling request: {}", err), - } - } -} - impl Handler> for HttpServer where T: IoStream, H: IntoHttpHandler, @@ -535,7 +517,7 @@ impl Handler for HttpServer impl Handler for HttpServer { - type Result = actix::Response; + type Result = actix::Response<(), ()>; fn handle(&mut self, msg: StopServer, ctx: &mut Context) -> Self::Result { // stop accept threads @@ -554,7 +536,7 @@ impl Handler for HttpServer }; for worker in &self.workers { let tx2 = tx.clone(); - let fut = worker.call(self, StopWorker{graceful: dur}); + let fut = worker.send(StopWorker{graceful: dur}).into_actor(self); ActorFuture::then(fut, move |_, slf, _| { slf.workers.pop(); if slf.workers.is_empty() { @@ -562,7 +544,7 @@ impl Handler for HttpServer // we need to stop system if server was spawned if slf.exit { - Arbiter::system().send(actix::msgs::SystemExit(0)) + Arbiter::system().do_send(actix::msgs::SystemExit(0)) } } actix::fut::ok(()) @@ -570,12 +552,12 @@ impl Handler for HttpServer } if !self.workers.is_empty() { - Response::async_reply( - rx.into_future().map(|_| ()).map_err(|_| ()).actfuture()) + Response::async( + rx.into_future().map(|_| ()).map_err(|_| ())) } else { // we need to stop system if server was spawned if self.exit { - Arbiter::system().send(actix::msgs::SystemExit(0)) + Arbiter::system().do_send(actix::msgs::SystemExit(0)) } Response::reply(Ok(())) } diff --git a/src/server/worker.rs b/src/server/worker.rs index 6ee48e2d0..23e8a6c61 100644 --- a/src/server/worker.rs +++ b/src/server/worker.rs @@ -37,12 +37,14 @@ pub(crate) struct Conn { /// Stop worker message. Returns `true` on successful shutdown /// and `false` if some connections still alive. -#[derive(Message)] -#[rtype(bool)] pub(crate) struct StopWorker { pub graceful: Option, } +impl Message for StopWorker { + type Result = Result; +} + /// Http worker /// /// Worker accepts Socket objects via unbounded channel and start requests processing. @@ -76,14 +78,14 @@ impl Worker { let num = slf.settings.num_channels(); if num == 0 { let _ = tx.send(true); - Arbiter::arbiter().send(StopArbiter(0)); + Arbiter::arbiter().do_send(StopArbiter(0)); } else if let Some(d) = dur.checked_sub(time::Duration::new(1, 0)) { slf.shutdown_timeout(ctx, tx, d); } else { info!("Force shutdown http worker, {} connections", num); slf.settings.head().traverse::(); let _ = tx.send(false); - Arbiter::arbiter().send(StopArbiter(0)); + Arbiter::arbiter().do_send(StopArbiter(0)); } }); } @@ -117,7 +119,7 @@ impl Handler> for Worker impl Handler for Worker where H: HttpHandler + 'static, { - type Result = Response; + type Result = Response; fn handle(&mut self, msg: StopWorker, ctx: &mut Context) -> Self::Result { let num = self.settings.num_channels(); @@ -128,7 +130,7 @@ impl Handler for Worker info!("Graceful http worker shutdown, {} connections", num); let (tx, rx) = oneshot::channel(); self.shutdown_timeout(ctx, tx, dur); - Response::async_reply(rx.map_err(|_| ()).actfuture()) + Response::async(rx.map_err(|_| ())) } else { info!("Force shutdown http worker, {} connections", num); self.settings.head().traverse::(); diff --git a/src/test.rs b/src/test.rs index c98929f0a..29f01ab31 100644 --- a/src/test.rs +++ b/src/test.rs @@ -56,7 +56,7 @@ pub struct TestServer { addr: net::SocketAddr, thread: Option>, system: SystemRunner, - server_sys: Addr>, + server_sys: Addr, } impl TestServer { @@ -165,7 +165,7 @@ impl TestServer { /// Stop http server fn stop(&mut self) { if let Some(handle) = self.thread.take() { - self.server_sys.send(msgs::SystemExit(0)); + self.server_sys.do_send(msgs::SystemExit(0)); let _ = handle.join(); } } diff --git a/src/ws/client.rs b/src/ws/client.rs index 7800ab02f..ebb2f3849 100644 --- a/src/ws/client.rs +++ b/src/ws/client.rs @@ -103,7 +103,7 @@ pub struct WsClient { http_err: Option, origin: Option, protocols: Option, - conn: Addr>, + conn: Addr, } impl WsClient { @@ -114,7 +114,7 @@ impl WsClient { } /// Create new websocket connection with custom `ClientConnector` - pub fn with_connector>(uri: S, conn: Addr>) -> WsClient { + pub fn with_connector>(uri: S, conn: Addr) -> WsClient { let mut cl = WsClient { request: ClientRequest::build(), err: None, @@ -200,7 +200,7 @@ impl WsClient { // get connection and start handshake Ok(Box::new( - self.conn.call_fut(Connect(request.uri().clone())) + self.conn.send(Connect(request.uri().clone())) .map_err(|_| WsClientError::Disconnected) .and_then(|res| match res { Ok(stream) => Either::A(WsHandshake::new(stream, request)), diff --git a/src/ws/context.rs b/src/ws/context.rs index 1eb78c7e5..b9214b749 100644 --- a/src/ws/context.rs +++ b/src/ws/context.rs @@ -5,9 +5,9 @@ use futures::unsync::oneshot; use smallvec::SmallVec; use actix::{Actor, ActorState, ActorContext, AsyncContext, - Addr, Handler, ResponseType, SpawnHandle, MessageResult, Syn, Unsync}; + Addr, Handler, Message, Syn, Unsync, SpawnHandle}; use actix::fut::ActorFuture; -use actix::dev::{ContextImpl, ToEnvelope, RemoteEnvelope}; +use actix::dev::{ContextImpl, ToEnvelope, SyncEnvelope}; use body::{Body, Binary}; use error::{Error, ErrorInternalServerError}; @@ -67,13 +67,13 @@ impl AsyncContext for WebsocketContext where A: Actor Addr> { + fn unsync_address(&mut self) -> Addr { self.inner.unsync_address() } #[doc(hidden)] #[inline] - fn sync_address(&mut self) -> Addr> { + fn sync_address(&mut self) -> Addr { self.inner.sync_address() } } @@ -217,12 +217,12 @@ impl ActorHttpContext for WebsocketContext where A: Actor ToEnvelope, M> for WebsocketContext +impl ToEnvelope for WebsocketContext where A: Actor> + Handler, - M: ResponseType + Send + 'static, M::Item: Send, M::Error: Send, + M: Message + Send + 'static, M::Result: Send { - fn pack(msg: M, tx: Option>>) -> Syn { - Syn::new(Box::new(RemoteEnvelope::envelope(msg, tx))) + fn pack(msg: M, tx: Option>) -> SyncEnvelope { + SyncEnvelope::new(msg, tx) } } diff --git a/src/ws/mod.rs b/src/ws/mod.rs index 07b845cc6..d9bf0f103 100644 --- a/src/ws/mod.rs +++ b/src/ws/mod.rs @@ -47,7 +47,7 @@ use bytes::BytesMut; use http::{Method, StatusCode, header}; use futures::{Async, Poll, Stream}; -use actix::{Actor, AsyncContext, ResponseType, Handler}; +use actix::{Actor, AsyncContext, Handler}; use body::Binary; use payload::ReadAny; @@ -74,7 +74,7 @@ const SEC_WEBSOCKET_VERSION: &str = "SEC-WEBSOCKET-VERSION"; /// `WebSocket` Message -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Message)] pub enum Message { Text(String), Binary(Binary), @@ -85,11 +85,6 @@ pub enum Message { Error } -impl ResponseType for Message { - type Item = (); - type Error = (); -} - /// Do websocket handshake and start actor pub fn start(mut req: HttpRequest, actor: A) -> Result where A: Actor> + Handler, diff --git a/tests/test_server.rs b/tests/test_server.rs index 3a8321c83..e84c9211b 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -72,12 +72,12 @@ fn test_start() { assert!(reqwest::get(&format!("http://{}/", addr)).unwrap().status().is_success()); // pause - let _ = srv_addr.call_fut(server::PauseServer).wait(); + let _ = srv_addr.send(server::PauseServer).wait(); thread::sleep(time::Duration::from_millis(100)); assert!(net::TcpStream::connect(addr).is_err()); // resume - let _ = srv_addr.call_fut(server::ResumeServer).wait(); + let _ = srv_addr.send(server::ResumeServer).wait(); assert!(reqwest::get(&format!("http://{}/", addr)).unwrap().status().is_success()); }