1
0
mirror of https://github.com/fafhrd91/actix-web synced 2025-08-31 08:57:00 +02:00

refactor ws frame parser

This commit is contained in:
Nikolay Kim
2018-02-26 13:58:23 -08:00
parent 56ae565688
commit 644f1a9518
11 changed files with 304 additions and 310 deletions

View File

@@ -43,15 +43,16 @@
//! # .finish();
//! # }
//! ```
use bytes::BytesMut;
use bytes::Bytes;
use http::{Method, StatusCode, header};
use futures::{Async, Poll, Stream};
use byteorder::{ByteOrder, NetworkEndian};
use actix::{Actor, AsyncContext, Handler};
use body::Binary;
use payload::Payload;
use error::{Error, WsHandshakeError};
use payload::PayloadHelper;
use error::{Error, WsHandshakeError, PayloadError};
use httprequest::HttpRequest;
use httpresponse::{ConnectionType, HttpResponse, HttpResponseBuilder};
@@ -80,8 +81,7 @@ pub enum Message {
Binary(Binary),
Ping(String),
Pong(String),
Close,
Closed,
Close(CloseCode),
Error
}
@@ -165,104 +165,67 @@ pub fn handshake<S>(req: &HttpRequest<S>) -> Result<HttpResponseBuilder, WsHands
}
/// Maps `Payload` stream into stream of `ws::Message` items
pub struct WsStream {
rx: Payload,
buf: BytesMut,
pub struct WsStream<S> {
rx: PayloadHelper<S>,
closed: bool,
error_sent: bool,
}
impl WsStream {
pub fn new(payload: Payload) -> WsStream {
WsStream { rx: payload,
buf: BytesMut::new(),
closed: false,
error_sent: false }
impl<S> WsStream<S> where S: Stream<Item=Bytes, Error=PayloadError> {
pub fn new(stream: S) -> WsStream<S> {
WsStream { rx: PayloadHelper::new(stream),
closed: false }
}
}
impl Stream for WsStream {
impl<S> Stream for WsStream<S> where S: Stream<Item=Bytes, Error=PayloadError> {
type Item = Message;
type Error = ();
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let mut done = false;
if self.closed {
return Ok(Async::Ready(None))
}
if !self.closed {
loop {
match self.rx.poll() {
Ok(Async::Ready(Some(chunk))) => {
self.buf.extend_from_slice(&chunk)
}
Ok(Async::Ready(None)) => {
done = true;
match Frame::parse(&mut self.rx, true) {
Ok(Async::Ready(Some(frame))) => {
// trace!("WsFrame {}", frame);
let (_finished, opcode, payload) = frame.unpack();
match opcode {
OpCode::Continue => unimplemented!(),
OpCode::Bad =>
Ok(Async::Ready(Some(Message::Error))),
OpCode::Close => {
self.closed = true;
break;
}
Ok(Async::NotReady) => break,
Err(_) => {
self.closed = true;
break;
let code = NetworkEndian::read_uint(payload.as_ref(), 2) as u16;
Ok(Async::Ready(
Some(Message::Close(CloseCode::from(code)))))
},
OpCode::Ping =>
Ok(Async::Ready(Some(
Message::Ping(
String::from_utf8_lossy(payload.as_ref()).into())))),
OpCode::Pong =>
Ok(Async::Ready(Some(
Message::Pong(String::from_utf8_lossy(payload.as_ref()).into())))),
OpCode::Binary =>
Ok(Async::Ready(Some(Message::Binary(payload)))),
OpCode::Text => {
let tmp = Vec::from(payload.as_ref());
match String::from_utf8(tmp) {
Ok(s) =>
Ok(Async::Ready(Some(Message::Text(s)))),
Err(_) =>
Ok(Async::Ready(Some(Message::Error))),
}
}
}
}
}
loop {
match Frame::parse(&mut self.buf, true) {
Ok(Some(frame)) => {
// trace!("WsFrame {}", frame);
let (_finished, opcode, payload) = frame.unpack();
match opcode {
OpCode::Continue => continue,
OpCode::Bad =>
return Ok(Async::Ready(Some(Message::Error))),
OpCode::Close => {
self.closed = true;
self.error_sent = true;
return Ok(Async::Ready(Some(Message::Closed)))
},
OpCode::Ping =>
return Ok(Async::Ready(Some(
Message::Ping(
String::from_utf8_lossy(payload.as_ref()).into())))),
OpCode::Pong =>
return Ok(Async::Ready(Some(
Message::Pong(
String::from_utf8_lossy(payload.as_ref()).into())))),
OpCode::Binary =>
return Ok(Async::Ready(Some(Message::Binary(payload)))),
OpCode::Text => {
let tmp = Vec::from(payload.as_ref());
match String::from_utf8(tmp) {
Ok(s) =>
return Ok(Async::Ready(Some(Message::Text(s)))),
Err(_) =>
return Ok(Async::Ready(Some(Message::Error))),
}
}
}
}
Ok(None) => {
if done {
return Ok(Async::Ready(None))
} else if self.closed {
if !self.error_sent {
self.error_sent = true;
return Ok(Async::Ready(Some(Message::Closed)))
} else {
return Ok(Async::Ready(None))
}
} else {
return Ok(Async::NotReady)
}
},
Err(_) => {
self.closed = true;
self.error_sent = true;
return Ok(Async::Ready(Some(Message::Error)));
}
Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(_) => {
self.closed = true;
Ok(Async::Ready(Some(Message::Error)))
}
}
}