diff --git a/examples/multipart/src/main.rs b/examples/multipart/src/main.rs index 21d890434..dc0d6fef8 100644 --- a/examples/multipart/src/main.rs +++ b/examples/multipart/src/main.rs @@ -18,8 +18,13 @@ impl Route for MyRoute { fn request(req: HttpRequest, payload: Payload, ctx: &mut HttpContext) -> Reply { println!("{:?}", req); + let multipart = match req.multipart(payload) { + Ok(mp) => mp, + Err(e) => return Reply::reply(e), + }; + // get Multipart stream - WrapStream::::actstream(req.multipart(payload)?) + WrapStream::::actstream(multipart) .and_then(|item, act, ctx| { // Multipart stream is a stream of Fields and nested Multiparts match item { diff --git a/examples/websocket-chat/src/client.rs b/examples/websocket-chat/src/client.rs index 0b96a1e2d..c46a4875c 100644 --- a/examples/websocket-chat/src/client.rs +++ b/examples/websocket-chat/src/client.rs @@ -58,6 +58,11 @@ struct ChatClient; struct ClientCommand(String); +impl ResponseType for ClientCommand { + type Item = (); + type Error = (); +} + impl Actor for ChatClient { type Context = FramedContext; @@ -112,11 +117,6 @@ impl Handler for ChatClient } } -impl ResponseType for ChatClient { - type Item = (); - type Error = (); -} - /// Server communication impl FramedActor for ChatClient { @@ -134,11 +134,6 @@ impl StreamHandler for ChatClient { } } -impl ResponseType for ChatClient { - type Item = (); - type Error = (); -} - impl Handler for ChatClient { fn handle(&mut self, msg: codec::ChatResponse, _: &mut FramedContext) diff --git a/examples/websocket-chat/src/codec.rs b/examples/websocket-chat/src/codec.rs index 47e02a376..718c3c82f 100644 --- a/examples/websocket-chat/src/codec.rs +++ b/examples/websocket-chat/src/codec.rs @@ -4,7 +4,7 @@ use serde_json as json; use byteorder::{BigEndian , ByteOrder}; use bytes::{BytesMut, BufMut}; use tokio_io::codec::{Encoder, Decoder}; - +use actix::ResponseType; /// Client request #[derive(Serialize, Deserialize, Debug)] @@ -20,6 +20,11 @@ pub enum ChatRequest { Ping } +impl ResponseType for ChatRequest { + type Item = (); + type Error = (); +} + /// Server response #[derive(Serialize, Deserialize, Debug)] #[serde(tag="cmd", content="data")] @@ -36,6 +41,10 @@ pub enum ChatResponse { Message(String), } +impl ResponseType for ChatResponse { + type Item = (); + type Error = (); +} /// Codec for Client -> Server transport pub struct ChatCodec; diff --git a/examples/websocket-chat/src/main.rs b/examples/websocket-chat/src/main.rs index f646b8690..6f706ac64 100644 --- a/examples/websocket-chat/src/main.rs +++ b/examples/websocket-chat/src/main.rs @@ -2,6 +2,7 @@ extern crate rand; extern crate bytes; extern crate byteorder; +extern crate futures; extern crate tokio_io; extern crate tokio_core; extern crate env_logger; @@ -78,11 +79,6 @@ impl Handler for WsChatSession { } } -impl ResponseType for WsChatSession { - type Item = (); - type Error = (); -} - /// WebSocket message handler impl Handler for WsChatSession { fn handle(&mut self, msg: ws::Message, ctx: &mut HttpContext) @@ -194,11 +190,6 @@ impl StreamHandler for WsChatSession } } -impl ResponseType for WsChatSession { - type Item = (); - type Error = (); -} - fn main() { let _ = env_logger::init(); diff --git a/examples/websocket-chat/src/server.rs b/examples/websocket-chat/src/server.rs index e20735f1e..c15644a6a 100644 --- a/examples/websocket-chat/src/server.rs +++ b/examples/websocket-chat/src/server.rs @@ -16,11 +16,24 @@ pub struct Connect { pub addr: Box + Send>, } +/// Response type for Connect message +/// +/// Chat server returns unique session id +impl ResponseType for Connect { + type Item = usize; + type Error = (); +} + /// Session is disconnected pub struct Disconnect { pub id: usize, } +impl ResponseType for Disconnect { + type Item = (); + type Error = (); +} + /// Send message to specific room pub struct Message { /// Id of the client session @@ -31,9 +44,19 @@ pub struct Message { pub room: String, } +impl ResponseType for Message { + type Item = (); + type Error = (); +} + /// List of available rooms pub struct ListRooms; +impl ResponseType for ListRooms { + type Item = Vec; + type Error = (); +} + /// Join room, if room does not exists create new one. pub struct Join { /// Client id @@ -42,6 +65,11 @@ pub struct Join { pub name: String, } +impl ResponseType for Join { + type Item = (); + type Error = (); +} + /// `ChatServer` manages chat rooms and responsible for coordinating chat session. /// implementation is super primitive pub struct ChatServer { @@ -109,15 +137,6 @@ impl Handler for ChatServer { } } -impl ResponseType for ChatServer { - /// Response type for Connect message - /// - /// Chat server returns unique session id - type Item = usize; - type Error = (); -} - - /// Handler for Disconnect message. impl Handler for ChatServer { @@ -144,11 +163,6 @@ impl Handler for ChatServer { } } -impl ResponseType for ChatServer { - type Item = (); - type Error = (); -} - /// Handler for Message message. impl Handler for ChatServer { @@ -159,11 +173,6 @@ impl Handler for ChatServer { } } -impl ResponseType for ChatServer { - type Item = (); - type Error = (); -} - /// Handler for `ListRooms` message. impl Handler for ChatServer { @@ -178,11 +187,6 @@ impl Handler for ChatServer { } } -impl ResponseType for ChatServer { - type Item = Vec; - type Error = (); -} - /// Join room, send disconnect message to old room /// send join message to new room impl Handler for ChatServer { @@ -211,8 +215,3 @@ impl Handler for ChatServer { Self::empty() } } - -impl ResponseType for ChatServer { - type Item = (); - type Error = (); -} diff --git a/examples/websocket-chat/src/session.rs b/examples/websocket-chat/src/session.rs index 41cf1ea4c..961955a59 100644 --- a/examples/websocket-chat/src/session.rs +++ b/examples/websocket-chat/src/session.rs @@ -3,6 +3,7 @@ use std::{io, net}; use std::str::FromStr; use std::time::{Instant, Duration}; +use futures::Stream; use tokio_core::net::{TcpStream, TcpListener}; use actix::*; @@ -14,6 +15,10 @@ use codec::{ChatRequest, ChatResponse, ChatCodec}; /// Chat server sends this messages to session pub struct Message(pub String); +impl ResponseType for Message { + type Item = (); + type Error = (); +} /// `ChatSession` actor is responsible for tcp peer communitions. pub struct ChatSession { @@ -68,11 +73,6 @@ impl StreamHandler for ChatSession { } } -impl ResponseType for ChatSession { - type Item = (); - type Error = (); -} - impl Handler for ChatSession { /// We'll stop chat session actor on any error, high likely it is just @@ -137,12 +137,6 @@ impl Handler for ChatSession { } } -impl ResponseType for ChatSession { - type Item = (); - type Error = (); -} - - /// Helper methods impl ChatSession { @@ -194,7 +188,7 @@ impl TcpServer { // So to be able to handle this events `Server` actor has to implement // stream handler `StreamHandler<(TcpStream, net::SocketAddr), io::Error>` let _: () = TcpServer::create(|ctx| { - ctx.add_stream(listener.incoming()); + ctx.add_stream(listener.incoming().map(|(t, a)| TcpConnect(t, a))); TcpServer{chat: chat} }); } @@ -206,18 +200,19 @@ impl Actor for TcpServer { type Context = Context; } -/// Handle stream of TcpStream's -impl StreamHandler<(TcpStream, net::SocketAddr), io::Error> for TcpServer {} +struct TcpConnect(TcpStream, net::SocketAddr); -impl ResponseType<(TcpStream, net::SocketAddr)> for TcpServer { +impl ResponseType for TcpConnect { type Item = (); type Error = (); } -impl Handler<(TcpStream, net::SocketAddr), io::Error> for TcpServer { +/// Handle stream of TcpStream's +impl StreamHandler for TcpServer {} - fn handle(&mut self, msg: (TcpStream, net::SocketAddr), _: &mut Context) - -> Response +impl Handler for TcpServer { + + fn handle(&mut self, msg: TcpConnect, _: &mut Context) -> Response { // For each incoming connection we create `ChatSession` actor // with out chat server address. diff --git a/examples/websocket/src/main.rs b/examples/websocket/src/main.rs index af7b33ef2..48d335086 100644 --- a/examples/websocket/src/main.rs +++ b/examples/websocket/src/main.rs @@ -30,12 +30,15 @@ impl Route for MyWebSocket { } } -impl ResponseType for MyWebSocket { - type Item = (); - type Error = (); -} +impl StreamHandler for MyWebSocket { + fn started(&mut self, ctx: &mut Self::Context) { + println!("WebSocket session openned"); + } -impl StreamHandler for MyWebSocket {} + fn finished(&mut self, ctx: &mut Self::Context) { + println!("WebSocket session closed"); + } +} impl Handler for MyWebSocket { fn handle(&mut self, msg: ws::Message, ctx: &mut HttpContext) diff --git a/src/context.rs b/src/context.rs index abb2c97d8..9172f77e2 100644 --- a/src/context.rs +++ b/src/context.rs @@ -20,6 +20,7 @@ pub struct HttpContext where A: Actor> + Route, { act: Option, state: ActorState, + modified: bool, items: ActorItemsCell, address: ActorAddressCell, stream: VecDeque, @@ -57,16 +58,19 @@ impl AsyncContext for HttpContext where A: Actor + Route fn spawn(&mut self, fut: F) -> SpawnHandle where F: ActorFuture + 'static { + self.modified = true; self.items.spawn(fut) } fn wait(&mut self, fut: F) where F: ActorFuture + 'static { + self.modified = true; self.wait.add(fut); } fn cancel_future(&mut self, handle: SpawnHandle) -> bool { + self.modified = true; self.items.cancel_future(handle) } } @@ -85,6 +89,7 @@ impl HttpContext where A: Actor + Route { HttpContext { act: None, state: ActorState::Started, + modified: false, items: ActorItemsCell::default(), address: ActorAddressCell::default(), wait: ActorWaitCell::default(), @@ -124,17 +129,19 @@ impl HttpContext where A: Actor + Route { impl HttpContext where A: Actor + Route { #[doc(hidden)] - pub fn subscriber(&mut self) -> Box> - where A: Handler + pub fn subscriber(&mut self) -> Box> + where A: Handler, + M: ResponseType + 'static, { Box::new(self.address.unsync_address()) } #[doc(hidden)] - pub fn sync_subscriber(&mut self) -> Box + Send> + pub fn sync_subscriber(&mut self) -> Box + Send> where A: Handler, - A::Item: Send, - A::Error: Send, + M: ResponseType + Send + 'static, + M::Item: Send, + M::Error: Send, { Box::new(self.address.sync_address()) } @@ -170,28 +177,23 @@ impl Stream for HttpContext where A: Actor + Route _ => () } - // check wait futures - if self.wait.poll(act, ctx) { - return Ok(Async::NotReady) - } - let mut prep_stop = false; loop { - let mut not_ready = true; - - if self.address.poll(act, ctx) { - not_ready = false - } - - self.items.poll(act, ctx); + self.modified = false; // check wait futures if self.wait.poll(act, ctx) { return Ok(Async::NotReady) } + // incoming messages + self.address.poll(act, ctx); + + // spawned futures and streams + self.items.poll(act, ctx); + // are we done - if !not_ready { + if self.modified { continue } @@ -239,15 +241,13 @@ impl Stream for HttpContext where A: Actor + Route } } -type ToEnvelopeSender = - Sender>::Item, >::Error>>; - impl ToEnvelope for HttpContext - where M: Send + 'static, - A: Actor> + Route + Handler, - >::Item: Send, >::Item: Send + where A: Actor> + Route + Handler, + M: ResponseType + Send + 'static, + M::Item: Send, + M::Error: Send, { - fn pack(msg: M, tx: Option>) -> Envelope + fn pack(msg: M, tx: Option>>) -> Envelope { RemoteEnvelope::new(msg, tx).into() } diff --git a/src/server.rs b/src/server.rs index d3c11f453..43bd06acf 100644 --- a/src/server.rs +++ b/src/server.rs @@ -46,7 +46,7 @@ impl HttpServer S: Stream + 'static { Ok(HttpServer::create(move |ctx| { - ctx.add_stream(stream); + ctx.add_stream(stream.map(|(t, a)| IoStream(t, a))); self })) } @@ -81,7 +81,7 @@ impl HttpServer { } else { Ok(HttpServer::create(move |ctx| { for tcp in addrs { - ctx.add_stream(tcp.incoming()); + ctx.add_stream(tcp.incoming().map(|(t, a)| IoStream(t, a))); } self })) @@ -89,7 +89,9 @@ impl HttpServer { } } -impl ResponseType<(T, A)> for HttpServer +struct IoStream(T, A); + +impl ResponseType for IoStream where T: AsyncRead + AsyncWrite + 'static, A: 'static { @@ -97,16 +99,15 @@ impl ResponseType<(T, A)> for HttpServer type Error = (); } -impl StreamHandler<(T, A), io::Error> for HttpServer - where T: AsyncRead + AsyncWrite + 'static, - A: 'static { -} +impl StreamHandler, io::Error> for HttpServer + where T: AsyncRead + AsyncWrite + 'static, A: 'static {} -impl Handler<(T, A), io::Error> for HttpServer +impl Handler, io::Error> for HttpServer where T: AsyncRead + AsyncWrite + 'static, A: 'static { - fn handle(&mut self, msg: (T, A), _: &mut Context) -> Response + fn handle(&mut self, msg: IoStream, _: &mut Context) + -> Response> { Arbiter::handle().spawn( HttpChannel{router: Rc::clone(&self.router), diff --git a/src/ws.rs b/src/ws.rs index b88711713..582c09e14 100644 --- a/src/ws.rs +++ b/src/ws.rs @@ -69,7 +69,7 @@ use http::{Method, StatusCode, header}; use bytes::{Bytes, BytesMut}; use futures::{Async, Poll, Stream}; -use actix::Actor; +use actix::{Actor, ResponseType}; use context::HttpContext; use route::Route; @@ -103,6 +103,11 @@ pub enum Message { Error } +impl ResponseType for Message { + type Item = (); + type Error = (); +} + /// Prepare `WebSocket` handshake response. /// /// This function returns handshake `HttpResponse`, ready to send to peer.