diff --git a/actix-web-actors/src/ws.rs b/actix-web-actors/src/ws.rs index b5f5c08c2..546326272 100644 --- a/actix-web-actors/src/ws.rs +++ b/actix-web-actors/src/ws.rs @@ -12,9 +12,9 @@ use actix::{ Message as ActixMessage, SpawnHandle, }; use actix_codec::{Decoder, Encoder}; -use actix_http::ws::hash_key; +use actix_http::ws::{hash_key, Codec}; pub use actix_http::ws::{ - CloseCode, CloseReason, Codec, Frame, HandshakeError, Message, ProtocolError, + CloseCode, CloseReason, Frame, HandshakeError, Message, ProtocolError, }; use actix_web::dev::{Head, HttpResponseBuilder}; @@ -28,7 +28,7 @@ use futures::{Async, Future, Poll, Stream}; /// Do websocket handshake and start ws actor. pub fn start(actor: A, req: &HttpRequest, stream: T) -> Result where - A: Actor> + StreamHandler, + A: Actor> + StreamHandler, T: Stream + 'static, { let mut res = handshake(req)?; @@ -170,7 +170,7 @@ where /// Create a new Websocket context from a request and an actor pub fn create(actor: A, stream: S) -> impl Stream where - A: StreamHandler, + A: StreamHandler, S: Stream + 'static, { let mb = Mailbox::default(); @@ -190,7 +190,7 @@ where ) -> impl Stream where F: FnOnce(&mut Self) -> A + 'static, - A: StreamHandler, + A: StreamHandler, S: Stream + 'static, { let mb = Mailbox::default(); @@ -368,7 +368,7 @@ impl Stream for WsStream where S: Stream, { - type Item = Frame; + type Item = Message; type Error = ProtocolError; fn poll(&mut self) -> Poll, Self::Error> { @@ -401,7 +401,28 @@ where Ok(Async::NotReady) } } - Some(frm) => Ok(Async::Ready(Some(frm))), + Some(frm) => { + let msg = match frm { + Frame::Text(data) => { + if let Some(data) = data { + Message::Text( + std::str::from_utf8(&data) + .map_err(|_| ProtocolError::BadEncoding)? + .to_string(), + ) + } else { + Message::Text(String::new()) + } + } + Frame::Binary(data) => Message::Binary( + data.map(|b| b.freeze()).unwrap_or_else(|| Bytes::new()), + ), + Frame::Ping(s) => Message::Ping(s), + Frame::Pong(s) => Message::Pong(s), + Frame::Close(reason) => Message::Close(reason), + }; + Ok(Async::Ready(Some(msg))) + } } } } diff --git a/actix-web-actors/tests/test_ws.rs b/actix-web-actors/tests/test_ws.rs index ea9c8d8fe..202d562ca 100644 --- a/actix-web-actors/tests/test_ws.rs +++ b/actix-web-actors/tests/test_ws.rs @@ -12,15 +12,13 @@ impl Actor for Ws { type Context = ws::WebsocketContext; } -impl StreamHandler for Ws { - fn handle(&mut self, msg: ws::Frame, ctx: &mut Self::Context) { +impl StreamHandler for Ws { + fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) { match msg { - ws::Frame::Ping(msg) => ctx.pong(&msg), - ws::Frame::Text(text) => { - ctx.text(String::from_utf8_lossy(&text.unwrap())).to_owned() - } - ws::Frame::Binary(bin) => ctx.binary(bin.unwrap()), - ws::Frame::Close(reason) => ctx.close(reason), + ws::Message::Ping(msg) => ctx.pong(&msg), + ws::Message::Text(text) => ctx.text(text), + ws::Message::Binary(bin) => ctx.binary(bin), + ws::Message::Close(reason) => ctx.close(reason), _ => (), } }