diff --git a/guide/src/qs_9.md b/guide/src/qs_9.md index 9c45fbd04..1b612a163 100644 --- a/guide/src/qs_9.md +++ b/guide/src/qs_9.md @@ -6,10 +6,11 @@ a [*ws::WsStream*](../actix_web/ws/struct.WsStream.html) and then use stream combinators to handle actual messages. But it is simplier to handle websocket communications with http actor. -```rust -extern crate actix; -extern crate actix_web; +This is example of simple websocket echo server: +```rust +# extern crate actix; +# extern crate actix_web; use actix::*; use actix_web::*; @@ -17,18 +18,18 @@ use actix_web::*; struct Ws; impl Actor for Ws { - type Context = HttpContext; + type Context = ws::WebsocketContext; } /// Define Handler for ws::Message message impl Handler for Ws { type Result=(); - fn handle(&mut self, msg: ws::Message, ctx: &mut HttpContext) { + fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) { match msg { - ws::Message::Ping(msg) => ws::WsWriter::pong(ctx, &msg), - ws::Message::Text(text) => ws::WsWriter::text(ctx, &text), - ws::Message::Binary(bin) => ws::WsWriter::binary(ctx, bin), + ws::Message::Ping(msg) => ctx.pong(&msg), + ws::Message::Text(text) => ctx.text(&text), + ws::Message::Binary(bin) => ctx.binary(bin), _ => (), } } diff --git a/src/context.rs b/src/context.rs index 19269af8f..a86a1fbfc 100644 --- a/src/context.rs +++ b/src/context.rs @@ -38,17 +38,12 @@ pub struct HttpContext where A: Actor>, impl ActorContext for HttpContext where A: Actor { - /// Stop actor execution fn stop(&mut self) { self.inner.stop(); } - - /// Terminate actor execution fn terminate(&mut self) { self.inner.terminate() } - - /// Actor execution state fn state(&self) -> ActorState { self.inner.state() } @@ -61,13 +56,11 @@ impl AsyncContext for HttpContext where A: Actor { self.inner.spawn(fut) } - fn wait(&mut self, fut: F) where F: ActorFuture + 'static { self.inner.wait(fut) } - fn cancel_future(&mut self, handle: SpawnHandle) -> bool { self.inner.cancel_future(handle) } @@ -79,12 +72,10 @@ impl AsyncContextApi for HttpContext where A: Actor fn unsync_sender(&mut self) -> queue::unsync::UnboundedSender> { self.inner.unsync_sender() } - #[inline] fn unsync_address(&mut self) -> Address { self.inner.unsync_address() } - #[inline] fn sync_address(&mut self) -> SyncAddress { self.inner.sync_address() @@ -97,7 +88,6 @@ impl HttpContext where A: Actor { pub fn new(req: HttpRequest, actor: A) -> HttpContext { HttpContext::from_request(req).actor(actor) } - pub fn from_request(req: HttpRequest) -> HttpContext { HttpContext { inner: ContextImpl::new(None), @@ -106,7 +96,6 @@ impl HttpContext where A: Actor { disconnected: false, } } - #[inline] pub fn actor(mut self, actor: A) -> HttpContext { self.inner.set_actor(actor); @@ -217,9 +206,7 @@ impl ToEnvelope for HttpContext fn pack(msg: M, tx: Option>>, channel_on_drop: bool) -> Envelope where A: Handler, - M: ResponseType + Send + 'static, - M::Item: Send, - M::Error: Send + M: ResponseType + Send + 'static, M::Item: Send, M::Error: Send { RemoteEnvelope::new(msg, tx, channel_on_drop).into() } @@ -240,7 +227,7 @@ pub struct Drain { } impl Drain { - fn new(fut: oneshot::Receiver<()>) -> Self { + pub fn new(fut: oneshot::Receiver<()>) -> Self { Drain { fut: fut, _a: PhantomData diff --git a/src/lib.rs b/src/lib.rs index 08df8206a..36d7d0e0f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -109,6 +109,7 @@ mod worker; mod channel; mod wsframe; mod wsproto; +mod wscontext; mod h1; mod h2; mod h1writer; diff --git a/src/middleware/cors.rs b/src/middleware/cors.rs index 2533da458..630a6eb70 100644 --- a/src/middleware/cors.rs +++ b/src/middleware/cors.rs @@ -34,7 +34,7 @@ //! .allowed_headers(vec![header::AUTHORIZATION, header::ACCEPT]) //! .allowed_header(header::CONTENT_TYPE) //! .max_age(3600) -//! .finish().expect("Can not create CORS middleware")) +//! .finish().expect("Can not create CORS middleware")); //! r.method(Method::GET).f(|_| httpcodes::HTTPOk); //! r.method(Method::HEAD).f(|_| httpcodes::HTTPMethodNotAllowed); //! }) @@ -96,10 +96,7 @@ pub enum Error { impl ResponseError for Error { fn error_response(&self) -> HttpResponse { - match *self { - Error::BadOrigin => HTTPBadRequest.into(), - _ => HTTPBadRequest.into() - } + HTTPBadRequest.into() } } @@ -355,7 +352,7 @@ impl CorsBuilder { { self.methods = true; if let Some(cors) = cors(&mut self.cors, &self.error) { - for m in methods.into_iter() { + for m in methods { match Method::try_from(m) { Ok(method) => { cors.methods.insert(method); @@ -404,7 +401,7 @@ impl CorsBuilder { where U: IntoIterator, HeaderName: HttpTryFrom { if let Some(cors) = cors(&mut self.cors, &self.error) { - for h in headers.into_iter() { + for h in headers { match HeaderName::try_from(h) { Ok(method) => { if cors.headers.is_all() { diff --git a/src/route.rs b/src/route.rs index 2779bd693..542b4b18e 100644 --- a/src/route.rs +++ b/src/route.rs @@ -103,7 +103,7 @@ impl Route { } } -/// RouteHandler wrapper. This struct is required because it needs to be shared +/// `RouteHandler` wrapper. This struct is required because it needs to be shared /// for resource level middlewares. struct InnerHandler(Rc>>); diff --git a/src/ws.rs b/src/ws.rs index c49cb7d4e..8b762798b 100644 --- a/src/ws.rs +++ b/src/ws.rs @@ -8,8 +8,9 @@ //! ```rust //! # extern crate actix; //! # extern crate actix_web; -//! use actix::*; -//! use actix_web::*; +//! # use actix::*; +//! # use actix_web::*; +//! use actix_web::ws; //! //! // do websocket handshake and start actor //! fn ws_index(req: HttpRequest) -> Result { @@ -19,18 +20,18 @@ //! struct Ws; //! //! impl Actor for Ws { -//! type Context = HttpContext; +//! type Context = ws::WebsocketContext; //! } //! //! // Define Handler for ws::Message message //! impl Handler for Ws { //! type Result = (); //! -//! fn handle(&mut self, msg: ws::Message, ctx: &mut HttpContext) { +//! fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) { //! match msg { -//! ws::Message::Ping(msg) => ws::WsWriter::pong(ctx, &msg), -//! ws::Message::Text(text) => ws::WsWriter::text(ctx, &text), -//! ws::Message::Binary(bin) => ws::WsWriter::binary(ctx, bin), +//! ws::Message::Ping(msg) => ctx.pong(&msg), +//! ws::Message::Text(text) => ctx.text(&text), +//! ws::Message::Binary(bin) => ctx.binary(bin), //! _ => (), //! } //! } @@ -42,22 +43,22 @@ //! # .finish(); //! # } //! ``` -use std::vec::Vec; -use http::{Method, StatusCode, header}; use bytes::BytesMut; +use http::{Method, StatusCode, header}; use futures::{Async, Poll, Stream}; use actix::{Actor, AsyncContext, ResponseType, Handler}; +use body::Binary; use payload::ReadAny; use error::{Error, WsHandshakeError}; -use context::HttpContext; use httprequest::HttpRequest; use httpresponse::{ConnectionType, HttpResponse, HttpResponseBuilder}; use wsframe; use wsproto::*; pub use wsproto::CloseCode; +pub use wscontext::WebsocketContext; const SEC_WEBSOCKET_ACCEPT: &str = "SEC-WEBSOCKET-ACCEPT"; const SEC_WEBSOCKET_KEY: &str = "SEC-WEBSOCKET-KEY"; @@ -69,7 +70,7 @@ const SEC_WEBSOCKET_VERSION: &str = "SEC-WEBSOCKET-VERSION"; #[derive(Debug)] pub enum Message { Text(String), - Binary(Vec), + Binary(Binary), Ping(String), Pong(String), Close, @@ -84,13 +85,13 @@ impl ResponseType for Message { /// Do websocket handshake and start actor pub fn start(mut req: HttpRequest, actor: A) -> Result - where A: Actor> + Handler, + where A: Actor> + Handler, S: 'static { let mut resp = handshake(&req)?; let stream = WsStream::new(req.payload_mut().readany()); - let mut ctx = HttpContext::new(req, actor); + let mut ctx = WebsocketContext::new(req, actor); ctx.add_message_stream(stream); Ok(resp.body(ctx)?) @@ -222,14 +223,17 @@ impl Stream for WsStream { }, OpCode::Ping => return Ok(Async::Ready(Some( - Message::Ping(String::from_utf8_lossy(&payload).into())))), + Message::Ping( + String::from_utf8_lossy(payload.as_ref()).into())))), OpCode::Pong => return Ok(Async::Ready(Some( - Message::Pong(String::from_utf8_lossy(&payload).into())))), + Message::Pong( + String::from_utf8_lossy(payload.as_ref()).into())))), OpCode::Binary => return Ok(Async::Ready(Some(Message::Binary(payload)))), OpCode::Text => { - match String::from_utf8(payload) { + let tmp = Vec::from(payload.as_ref()); + match String::from_utf8(tmp) { Ok(s) => return Ok(Async::Ready(Some(Message::Text(s)))), Err(_) => @@ -262,67 +266,6 @@ impl Stream for WsStream { } } - -/// `WebSocket` writer -pub struct WsWriter; - -impl WsWriter { - - /// Send text frame - pub fn text(ctx: &mut HttpContext, text: &str) - where A: Actor> - { - let mut frame = wsframe::Frame::message(Vec::from(text), OpCode::Text, true); - let mut buf = Vec::new(); - frame.format(&mut buf).unwrap(); - - ctx.write(buf); - } - - /// Send binary frame - pub fn binary(ctx: &mut HttpContext, data: Vec) - where A: Actor> - { - let mut frame = wsframe::Frame::message(data, OpCode::Binary, true); - let mut buf = Vec::new(); - frame.format(&mut buf).unwrap(); - - ctx.write(buf); - } - - /// Send ping frame - pub fn ping(ctx: &mut HttpContext, message: &str) - where A: Actor> - { - let mut frame = wsframe::Frame::message(Vec::from(message), OpCode::Ping, true); - let mut buf = Vec::new(); - frame.format(&mut buf).unwrap(); - - ctx.write(buf); - } - - /// Send pong frame - pub fn pong(ctx: &mut HttpContext, message: &str) - where A: Actor> - { - let mut frame = wsframe::Frame::message(Vec::from(message), OpCode::Pong, true); - let mut buf = Vec::new(); - frame.format(&mut buf).unwrap(); - - ctx.write(buf); - } - - /// Send close frame - pub fn close(ctx: &mut HttpContext, code: CloseCode, reason: &str) - where A: Actor> - { - let mut frame = wsframe::Frame::close(code, reason); - let mut buf = Vec::new(); - frame.format(&mut buf).unwrap(); - ctx.write(buf); - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/src/wscontext.rs b/src/wscontext.rs new file mode 100644 index 000000000..41206e457 --- /dev/null +++ b/src/wscontext.rs @@ -0,0 +1,257 @@ +use std::mem; +use std::collections::VecDeque; +use futures::{Async, Poll}; +use futures::sync::oneshot::Sender; +use futures::unsync::oneshot; + +use actix::{Actor, ActorState, ActorContext, AsyncContext, + Address, SyncAddress, Handler, Subscriber, ResponseType, SpawnHandle}; +use actix::fut::ActorFuture; +use actix::dev::{queue, AsyncContextApi, + ContextImpl, ContextProtocol, Envelope, ToEnvelope, RemoteEnvelope}; + +use body::{Body, Binary}; +use error::{Error, Result, ErrorInternalServerError}; +use httprequest::HttpRequest; +use context::{Frame, ActorHttpContext, Drain}; + +use wsframe; +use wsproto::*; +pub use wsproto::CloseCode; + + +/// Http actor execution context +pub struct WebsocketContext where A: Actor>, +{ + inner: ContextImpl, + stream: VecDeque, + request: HttpRequest, + disconnected: bool, +} + +impl ActorContext for WebsocketContext where A: Actor +{ + fn stop(&mut self) { + self.inner.stop(); + } + fn terminate(&mut self) { + self.inner.terminate() + } + fn state(&self) -> ActorState { + self.inner.state() + } +} + +impl AsyncContext for WebsocketContext where A: Actor +{ + fn spawn(&mut self, fut: F) -> SpawnHandle + where F: ActorFuture + 'static + { + self.inner.spawn(fut) + } + + fn wait(&mut self, fut: F) + where F: ActorFuture + 'static + { + self.inner.wait(fut) + } + + fn cancel_future(&mut self, handle: SpawnHandle) -> bool { + self.inner.cancel_future(handle) + } +} + +#[doc(hidden)] +impl AsyncContextApi for WebsocketContext where A: Actor { + #[inline] + fn unsync_sender(&mut self) -> queue::unsync::UnboundedSender> { + self.inner.unsync_sender() + } + + #[inline] + fn unsync_address(&mut self) -> Address { + self.inner.unsync_address() + } + + #[inline] + fn sync_address(&mut self) -> SyncAddress { + self.inner.sync_address() + } +} + +impl WebsocketContext where A: Actor { + + #[inline] + pub fn new(req: HttpRequest, actor: A) -> WebsocketContext { + WebsocketContext::from_request(req).actor(actor) + } + + pub fn from_request(req: HttpRequest) -> WebsocketContext { + WebsocketContext { + inner: ContextImpl::new(None), + stream: VecDeque::new(), + request: req, + disconnected: false, + } + } + + #[inline] + pub fn actor(mut self, actor: A) -> WebsocketContext { + self.inner.set_actor(actor); + self + } +} + +impl WebsocketContext where A: Actor { + + /// Write payload + #[inline] + fn write>(&mut self, data: B) { + if !self.disconnected { + self.stream.push_back(Frame::Payload(Some(data.into()))); + } else { + warn!("Trying to write to disconnected response"); + } + } + + /// Shared application state + #[inline] + pub fn state(&self) -> &S { + self.request.state() + } + + /// Incoming request + #[inline] + pub fn request(&mut self) -> &mut HttpRequest { + &mut self.request + } + + /// Send text frame + pub fn text(&mut self, text: &str) { + let mut frame = wsframe::Frame::message(Vec::from(text), OpCode::Text, true); + let mut buf = Vec::new(); + frame.format(&mut buf).unwrap(); + + self.write(buf); + } + + /// Send binary frame + pub fn binary>(&mut self, data: B) { + let mut frame = wsframe::Frame::message(data, OpCode::Binary, true); + let mut buf = Vec::new(); + frame.format(&mut buf).unwrap(); + + self.write(buf); + } + + /// Send ping frame + pub fn ping(&mut self, message: &str) { + let mut frame = wsframe::Frame::message(Vec::from(message), OpCode::Ping, true); + let mut buf = Vec::new(); + frame.format(&mut buf).unwrap(); + + self.write(buf); + } + + /// Send pong frame + pub fn pong(&mut self, message: &str) { + let mut frame = wsframe::Frame::message(Vec::from(message), OpCode::Pong, true); + let mut buf = Vec::new(); + frame.format(&mut buf).unwrap(); + + self.write(buf); + } + + /// Send close frame + pub fn close(&mut self, code: CloseCode, reason: &str) { + let mut frame = wsframe::Frame::close(code, reason); + let mut buf = Vec::new(); + frame.format(&mut buf).unwrap(); + self.write(buf); + } + + /// Returns drain future + pub fn drain(&mut self) -> Drain { + let (tx, rx) = oneshot::channel(); + self.inner.modify(); + self.stream.push_back(Frame::Drain(tx)); + Drain::new(rx) + } + + /// Check if connection still open + #[inline] + pub fn connected(&self) -> bool { + !self.disconnected + } +} + +impl WebsocketContext where A: Actor { + + #[inline] + #[doc(hidden)] + pub fn subscriber(&mut self) -> Box> + where A: Handler, M: ResponseType + 'static + { + self.inner.subscriber() + } + + #[inline] + #[doc(hidden)] + pub fn sync_subscriber(&mut self) -> Box + Send> + where A: Handler, + M: ResponseType + Send + 'static, M::Item: Send, M::Error: Send, + { + self.inner.sync_subscriber() + } +} + +impl ActorHttpContext for WebsocketContext where A: Actor, S: 'static { + + #[inline] + fn disconnected(&mut self) { + self.disconnected = true; + self.stop(); + } + + fn poll(&mut self) -> Poll, Error> { + let ctx: &mut WebsocketContext = unsafe { + mem::transmute(self as &mut WebsocketContext) + }; + + if self.inner.alive() { + match self.inner.poll(ctx) { + Ok(Async::NotReady) | Ok(Async::Ready(())) => (), + Err(_) => return Err(ErrorInternalServerError("error").into()), + } + } + + // frames + if let Some(frame) = self.stream.pop_front() { + Ok(Async::Ready(Some(frame))) + } else if self.inner.alive() { + Ok(Async::NotReady) + } else { + Ok(Async::Ready(None)) + } + } +} + +impl ToEnvelope for WebsocketContext + where A: Actor>, +{ + #[inline] + fn pack(msg: M, tx: Option>>, + channel_on_drop: bool) -> Envelope + where A: Handler, + M: ResponseType + Send + 'static, M::Item: Send, M::Error: Send { + RemoteEnvelope::new(msg, tx, channel_on_drop).into() + } +} + +impl From> for Body + where A: Actor>, S: 'static +{ + fn from(ctx: WebsocketContext) -> Body { + Body::Actor(Box::new(ctx)) + } +} diff --git a/src/wsframe.rs b/src/wsframe.rs index 3fd09ef4e..be036a4e8 100644 --- a/src/wsframe.rs +++ b/src/wsframe.rs @@ -3,6 +3,7 @@ use std::io::{Write, Error, ErrorKind}; use std::iter::FromIterator; use bytes::BytesMut; +use body::Binary; use wsproto::{OpCode, CloseCode}; @@ -14,7 +15,7 @@ fn apply_mask(buf: &mut [u8], mask: &[u8; 4]) { } /// A struct representing a `WebSocket` frame. -#[derive(Debug, Clone)] +#[derive(Debug)] pub(crate) struct Frame { finished: bool, rsv1: bool, @@ -22,13 +23,13 @@ pub(crate) struct Frame { rsv3: bool, opcode: OpCode, mask: Option<[u8; 4]>, - payload: Vec, + payload: Binary, } impl Frame { /// Desctructe frame - pub fn unpack(self) -> (bool, OpCode, Vec) { + pub fn unpack(self) -> (bool, OpCode, Binary) { (self.finished, self.opcode, self.payload) } @@ -55,11 +56,11 @@ impl Frame { /// Create a new data frame. #[inline] - pub fn message(data: Vec, code: OpCode, finished: bool) -> Frame { + pub fn message>(data: B, code: OpCode, finished: bool) -> Frame { Frame { finished: finished, opcode: code, - payload: data, + payload: data.into(), .. Frame::default() } } @@ -82,7 +83,7 @@ impl Frame { }; Frame { - payload: payload, + payload: payload.into(), .. Frame::default() } } @@ -212,7 +213,7 @@ impl Frame { rsv3: rsv3, opcode: opcode, mask: mask, - payload: data, + payload: data.into(), }; (frame, header_length + length) @@ -251,7 +252,7 @@ impl Frame { if self.payload.len() < 126 { two |= self.payload.len() as u8; let headers = [one, two]; - try!(w.write_all(&headers)); + w.write_all(&headers)?; } else if self.payload.len() <= 65_535 { two |= 126; let length_bytes: [u8; 2] = unsafe { @@ -259,7 +260,7 @@ impl Frame { mem::transmute(short.to_be()) }; let headers = [one, two, length_bytes[0], length_bytes[1]]; - try!(w.write_all(&headers)); + w.write_all(&headers)?; } else { two |= 127; let length_bytes: [u8; 8] = unsafe { @@ -278,16 +279,18 @@ impl Frame { length_bytes[6], length_bytes[7], ]; - try!(w.write_all(&headers)); + w.write_all(&headers)?; } if self.mask.is_some() { let mask = self.mask.take().unwrap(); - apply_mask(&mut self.payload, &mask); - try!(w.write_all(&mask)); + let mut payload = Vec::from(self.payload.as_ref()); + apply_mask(&mut payload, &mask); + w.write_all(&mask)?; + w.write_all(payload.as_ref())?; + } else { + w.write_all(self.payload.as_ref())?; } - - try!(w.write_all(&self.payload)); Ok(()) } } @@ -301,7 +304,7 @@ impl Default for Frame { rsv3: false, opcode: OpCode::Close, mask: None, - payload: Vec::new(), + payload: Binary::from(&b""[..]), } } } @@ -318,15 +321,16 @@ impl fmt::Display for Frame { payload length: {} payload: 0x{} ", - self.finished, - self.rsv1, - self.rsv2, - self.rsv3, - self.opcode, - // self.mask.map(|mask| format!("{:?}", mask)).unwrap_or("NONE".into()), - self.len(), - self.payload.len(), - self.payload.iter().map(|byte| format!("{:x}", byte)).collect::()) + self.finished, + self.rsv1, + self.rsv2, + self.rsv3, + self.opcode, + // self.mask.map(|mask| format!("{:?}", mask)).unwrap_or("NONE".into()), + self.len(), + self.payload.len(), + self.payload.as_ref().iter().map( + |byte| format!("{:x}", byte)).collect::()) } } @@ -343,7 +347,7 @@ mod tests { println!("FRAME: {}", frame); assert!(!frame.finished); assert_eq!(frame.opcode, OpCode::Text); - assert_eq!(frame.payload, &b"1"[..]); + assert_eq!(frame.payload.as_ref(), &b"1"[..]); } #[test] @@ -365,7 +369,7 @@ mod tests { let frame = Frame::parse(&mut buf).unwrap().unwrap(); assert!(!frame.finished); assert_eq!(frame.opcode, OpCode::Text); - assert_eq!(frame.payload, &b"1234"[..]); + assert_eq!(frame.payload.as_ref(), &b"1234"[..]); } #[test] @@ -378,7 +382,7 @@ mod tests { let frame = Frame::parse(&mut buf).unwrap().unwrap(); assert!(!frame.finished); assert_eq!(frame.opcode, OpCode::Text); - assert_eq!(frame.payload, &b"1234"[..]); + assert_eq!(frame.payload.as_ref(), &b"1234"[..]); } #[test] @@ -390,7 +394,7 @@ mod tests { let frame = Frame::parse(&mut buf).unwrap().unwrap(); assert!(!frame.finished); assert_eq!(frame.opcode, OpCode::Text); - assert_eq!(frame.payload, vec![1u8]); + assert_eq!(frame.payload, vec![1u8].into()); } #[test]