1
0
mirror of https://github.com/fafhrd91/actix-web synced 2024-12-18 01:43:58 +01:00

simplify ws stream interface

This commit is contained in:
Nikolay Kim 2019-03-17 22:31:10 -07:00
parent 6ab7665868
commit b0343eb22d
2 changed files with 34 additions and 15 deletions

View File

@ -12,9 +12,9 @@ use actix::{
Message as ActixMessage, SpawnHandle, Message as ActixMessage, SpawnHandle,
}; };
use actix_codec::{Decoder, Encoder}; use actix_codec::{Decoder, Encoder};
use actix_http::ws::hash_key; use actix_http::ws::{hash_key, Codec};
pub use actix_http::ws::{ pub use actix_http::ws::{
CloseCode, CloseReason, Codec, Frame, HandshakeError, Message, ProtocolError, CloseCode, CloseReason, Frame, HandshakeError, Message, ProtocolError,
}; };
use actix_web::dev::{Head, HttpResponseBuilder}; use actix_web::dev::{Head, HttpResponseBuilder};
@ -28,7 +28,7 @@ use futures::{Async, Future, Poll, Stream};
/// Do websocket handshake and start ws actor. /// Do websocket handshake and start ws actor.
pub fn start<A, T>(actor: A, req: &HttpRequest, stream: T) -> Result<HttpResponse, Error> pub fn start<A, T>(actor: A, req: &HttpRequest, stream: T) -> Result<HttpResponse, Error>
where where
A: Actor<Context = WebsocketContext<A>> + StreamHandler<Frame, ProtocolError>, A: Actor<Context = WebsocketContext<A>> + StreamHandler<Message, ProtocolError>,
T: Stream<Item = Bytes, Error = PayloadError> + 'static, T: Stream<Item = Bytes, Error = PayloadError> + 'static,
{ {
let mut res = handshake(req)?; let mut res = handshake(req)?;
@ -170,7 +170,7 @@ where
/// Create a new Websocket context from a request and an actor /// Create a new Websocket context from a request and an actor
pub fn create<S>(actor: A, stream: S) -> impl Stream<Item = Bytes, Error = Error> pub fn create<S>(actor: A, stream: S) -> impl Stream<Item = Bytes, Error = Error>
where where
A: StreamHandler<Frame, ProtocolError>, A: StreamHandler<Message, ProtocolError>,
S: Stream<Item = Bytes, Error = PayloadError> + 'static, S: Stream<Item = Bytes, Error = PayloadError> + 'static,
{ {
let mb = Mailbox::default(); let mb = Mailbox::default();
@ -190,7 +190,7 @@ where
) -> impl Stream<Item = Bytes, Error = Error> ) -> impl Stream<Item = Bytes, Error = Error>
where where
F: FnOnce(&mut Self) -> A + 'static, F: FnOnce(&mut Self) -> A + 'static,
A: StreamHandler<Frame, ProtocolError>, A: StreamHandler<Message, ProtocolError>,
S: Stream<Item = Bytes, Error = PayloadError> + 'static, S: Stream<Item = Bytes, Error = PayloadError> + 'static,
{ {
let mb = Mailbox::default(); let mb = Mailbox::default();
@ -368,7 +368,7 @@ impl<S> Stream for WsStream<S>
where where
S: Stream<Item = Bytes, Error = PayloadError>, S: Stream<Item = Bytes, Error = PayloadError>,
{ {
type Item = Frame; type Item = Message;
type Error = ProtocolError; type Error = ProtocolError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
@ -401,7 +401,28 @@ where
Ok(Async::NotReady) Ok(Async::NotReady)
} }
} }
Some(frm) => Ok(Async::Ready(Some(frm))), Some(frm) => {
let msg = match frm {
Frame::Text(data) => {
if let Some(data) = data {
Message::Text(
std::str::from_utf8(&data)
.map_err(|_| ProtocolError::BadEncoding)?
.to_string(),
)
} else {
Message::Text(String::new())
}
}
Frame::Binary(data) => Message::Binary(
data.map(|b| b.freeze()).unwrap_or_else(|| Bytes::new()),
),
Frame::Ping(s) => Message::Ping(s),
Frame::Pong(s) => Message::Pong(s),
Frame::Close(reason) => Message::Close(reason),
};
Ok(Async::Ready(Some(msg)))
}
} }
} }
} }

View File

@ -12,15 +12,13 @@ impl Actor for Ws {
type Context = ws::WebsocketContext<Self>; type Context = ws::WebsocketContext<Self>;
} }
impl StreamHandler<ws::Frame, ws::ProtocolError> for Ws { impl StreamHandler<ws::Message, ws::ProtocolError> for Ws {
fn handle(&mut self, msg: ws::Frame, ctx: &mut Self::Context) { fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) {
match msg { match msg {
ws::Frame::Ping(msg) => ctx.pong(&msg), ws::Message::Ping(msg) => ctx.pong(&msg),
ws::Frame::Text(text) => { ws::Message::Text(text) => ctx.text(text),
ctx.text(String::from_utf8_lossy(&text.unwrap())).to_owned() ws::Message::Binary(bin) => ctx.binary(bin),
} ws::Message::Close(reason) => ctx.close(reason),
ws::Frame::Binary(bin) => ctx.binary(bin.unwrap()),
ws::Frame::Close(reason) => ctx.close(reason),
_ => (), _ => (),
} }
} }