1
0
mirror of https://github.com/fafhrd91/actix-web synced 2025-01-18 13:51:50 +01:00

Add websockets continuation frame support

This commit is contained in:
Nikolay Kim 2019-12-12 14:06:54 +06:00
parent 4a1695f719
commit b4b3350b3e
7 changed files with 269 additions and 68 deletions

View File

@ -1,5 +1,11 @@
# Changes # Changes
## [1.0.0] - 2019-12-xx
### Added
* Add websockets continuation frame support
## [1.0.0-alpha.5] - 2019-12-09 ## [1.0.0-alpha.5] - 2019-12-09
### Fixed ### Fixed

View File

@ -1,6 +1,6 @@
[package] [package]
name = "actix-http" name = "actix-http"
version = "1.0.0-alpha.6" version = "1.0.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix http primitives" description = "Actix http primitives"
readme = "README.md" readme = "README.md"
@ -13,7 +13,6 @@ categories = ["network-programming", "asynchronous",
"web-programming::websocket"] "web-programming::websocket"]
license = "MIT/Apache-2.0" license = "MIT/Apache-2.0"
edition = "2018" edition = "2018"
workspace = ".."
[package.metadata.docs.rs] [package.metadata.docs.rs]
features = ["openssl", "rustls", "fail", "flate2-zlib", "secure-cookies"] features = ["openssl", "rustls", "fail", "flate2-zlib", "secure-cookies"]
@ -47,26 +46,26 @@ secure-cookies = ["ring"]
actix-service = "1.0.0" actix-service = "1.0.0"
actix-codec = "0.2.0" actix-codec = "0.2.0"
actix-connect = "1.0.0" actix-connect = "1.0.0"
actix-utils = "1.0.1" actix-utils = "1.0.3"
actix-rt = "1.0.0" actix-rt = "1.0.0"
actix-threadpool = "0.3.0" actix-threadpool = "0.3.1"
actix-tls = { version = "1.0.0", optional = true } actix-tls = { version = "1.0.0", optional = true }
base64 = "0.11" base64 = "0.11"
bitflags = "1.0" bitflags = "1.2"
bytes = "0.5.2" bytes = "0.5.2"
copyless = "0.1.4" copyless = "0.1.4"
chrono = "0.4.6" chrono = "0.4.6"
derive_more = "0.99.2" derive_more = "0.99.2"
either = "1.5.2" either = "1.5.3"
encoding_rs = "0.8" encoding_rs = "0.8"
futures = "0.3.1" futures = "0.3.1"
fxhash = "0.2.1" fxhash = "0.2.1"
h2 = "0.2.1" h2 = "0.2.1"
http = "0.2.0" http = "0.2.0"
httparse = "1.3" httparse = "1.3"
indexmap = "1.2" indexmap = "1.3"
lazy_static = "1.0" lazy_static = "1.4"
language-tags = "0.2" language-tags = "0.2"
log = "0.4" log = "0.4"
mime = "0.3" mime = "0.3"

View File

@ -12,6 +12,8 @@ pub enum Message {
Text(String), Text(String),
/// Binary message /// Binary message
Binary(Bytes), Binary(Bytes),
/// Continuation
Continuation(Item),
/// Ping message /// Ping message
Ping(Bytes), Ping(Bytes),
/// Pong message /// Pong message
@ -26,9 +28,11 @@ pub enum Message {
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
pub enum Frame { pub enum Frame {
/// Text frame, codec does not verify utf8 encoding /// Text frame, codec does not verify utf8 encoding
Text(Option<BytesMut>), Text(Bytes),
/// Binary frame /// Binary frame
Binary(Option<BytesMut>), Binary(Bytes),
/// Continuation
Continuation(Item),
/// Ping message /// Ping message
Ping(Bytes), Ping(Bytes),
/// Pong message /// Pong message
@ -37,11 +41,28 @@ pub enum Frame {
Close(Option<CloseReason>), Close(Option<CloseReason>),
} }
/// `WebSocket` continuation item
#[derive(Debug, PartialEq)]
pub enum Item {
FirstText(Bytes),
FirstBinary(Bytes),
Continue(Bytes),
Last(Bytes),
}
#[derive(Debug, Copy, Clone)] #[derive(Debug, Copy, Clone)]
/// WebSockets protocol codec /// WebSockets protocol codec
pub struct Codec { pub struct Codec {
flags: Flags,
max_size: usize, max_size: usize,
server: bool, }
bitflags::bitflags! {
struct Flags: u8 {
const SERVER = 0b0000_0001;
const CONTINUATION = 0b0000_0010;
const W_CONTINUATION = 0b0000_0100;
}
} }
impl Codec { impl Codec {
@ -49,7 +70,7 @@ impl Codec {
pub fn new() -> Codec { pub fn new() -> Codec {
Codec { Codec {
max_size: 65_536, max_size: 65_536,
server: true, flags: Flags::SERVER,
} }
} }
@ -65,7 +86,7 @@ impl Codec {
/// ///
/// By default decoder works in server mode. /// By default decoder works in server mode.
pub fn client_mode(mut self) -> Self { pub fn client_mode(mut self) -> Self {
self.server = false; self.flags.remove(Flags::SERVER);
self self
} }
} }
@ -76,19 +97,94 @@ impl Encoder for Codec {
fn encode(&mut self, item: Message, dst: &mut BytesMut) -> Result<(), Self::Error> { fn encode(&mut self, item: Message, dst: &mut BytesMut) -> Result<(), Self::Error> {
match item { match item {
Message::Text(txt) => { Message::Text(txt) => Parser::write_message(
Parser::write_message(dst, txt, OpCode::Text, true, !self.server) dst,
txt,
OpCode::Text,
true,
!self.flags.contains(Flags::SERVER),
),
Message::Binary(bin) => Parser::write_message(
dst,
bin,
OpCode::Binary,
true,
!self.flags.contains(Flags::SERVER),
),
Message::Ping(txt) => Parser::write_message(
dst,
txt,
OpCode::Ping,
true,
!self.flags.contains(Flags::SERVER),
),
Message::Pong(txt) => Parser::write_message(
dst,
txt,
OpCode::Pong,
true,
!self.flags.contains(Flags::SERVER),
),
Message::Close(reason) => {
Parser::write_close(dst, reason, !self.flags.contains(Flags::SERVER))
} }
Message::Binary(bin) => { Message::Continuation(cont) => match cont {
Parser::write_message(dst, bin, OpCode::Binary, true, !self.server) Item::FirstText(data) => {
} if self.flags.contains(Flags::W_CONTINUATION) {
Message::Ping(txt) => { return Err(ProtocolError::ContinuationStarted);
Parser::write_message(dst, txt, OpCode::Ping, true, !self.server) } else {
} self.flags.insert(Flags::W_CONTINUATION);
Message::Pong(txt) => { Parser::write_message(
Parser::write_message(dst, txt, OpCode::Pong, true, !self.server) dst,
} &data[..],
Message::Close(reason) => Parser::write_close(dst, reason, !self.server), OpCode::Binary,
false,
!self.flags.contains(Flags::SERVER),
)
}
}
Item::FirstBinary(data) => {
if self.flags.contains(Flags::W_CONTINUATION) {
return Err(ProtocolError::ContinuationStarted);
} else {
self.flags.insert(Flags::W_CONTINUATION);
Parser::write_message(
dst,
&data[..],
OpCode::Text,
false,
!self.flags.contains(Flags::SERVER),
)
}
}
Item::Continue(data) => {
if self.flags.contains(Flags::W_CONTINUATION) {
Parser::write_message(
dst,
&data[..],
OpCode::Continue,
false,
!self.flags.contains(Flags::SERVER),
)
} else {
return Err(ProtocolError::ContinuationNotStarted);
}
}
Item::Last(data) => {
if self.flags.contains(Flags::W_CONTINUATION) {
self.flags.remove(Flags::W_CONTINUATION);
Parser::write_message(
dst,
&data[..],
OpCode::Continue,
true,
!self.flags.contains(Flags::SERVER),
)
} else {
return Err(ProtocolError::ContinuationNotStarted);
}
}
},
Message::Nop => (), Message::Nop => (),
} }
Ok(()) Ok(())
@ -100,15 +196,64 @@ impl Decoder for Codec {
type Error = ProtocolError; type Error = ProtocolError;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
match Parser::parse(src, self.server, self.max_size) { match Parser::parse(src, self.flags.contains(Flags::SERVER), self.max_size) {
Ok(Some((finished, opcode, payload))) => { Ok(Some((finished, opcode, payload))) => {
// continuation is not supported // continuation is not supported
if !finished { if !finished {
return Err(ProtocolError::NoContinuation); return match opcode {
OpCode::Continue => {
if self.flags.contains(Flags::CONTINUATION) {
Ok(Some(Frame::Continuation(Item::Continue(
payload
.map(|pl| pl.freeze())
.unwrap_or_else(Bytes::new),
))))
} else {
Err(ProtocolError::ContinuationNotStarted)
}
}
OpCode::Binary => {
if !self.flags.contains(Flags::CONTINUATION) {
self.flags.insert(Flags::CONTINUATION);
Ok(Some(Frame::Continuation(Item::FirstBinary(
payload
.map(|pl| pl.freeze())
.unwrap_or_else(Bytes::new),
))))
} else {
Err(ProtocolError::ContinuationStarted)
}
}
OpCode::Text => {
if !self.flags.contains(Flags::CONTINUATION) {
self.flags.insert(Flags::CONTINUATION);
Ok(Some(Frame::Continuation(Item::FirstText(
payload
.map(|pl| pl.freeze())
.unwrap_or_else(Bytes::new),
))))
} else {
Err(ProtocolError::ContinuationStarted)
}
}
_ => {
error!("Unfinished fragment {:?}", opcode);
Err(ProtocolError::ContinuationFragment(opcode))
}
};
} }
match opcode { match opcode {
OpCode::Continue => Err(ProtocolError::NoContinuation), OpCode::Continue => {
if self.flags.contains(Flags::CONTINUATION) {
self.flags.remove(Flags::CONTINUATION);
Ok(Some(Frame::Continuation(Item::Last(
payload.map(|pl| pl.freeze()).unwrap_or_else(Bytes::new),
))))
} else {
Err(ProtocolError::ContinuationNotStarted)
}
}
OpCode::Bad => Err(ProtocolError::BadOpCode), OpCode::Bad => Err(ProtocolError::BadOpCode),
OpCode::Close => { OpCode::Close => {
if let Some(ref pl) = payload { if let Some(ref pl) = payload {
@ -118,29 +263,18 @@ impl Decoder for Codec {
Ok(Some(Frame::Close(None))) Ok(Some(Frame::Close(None)))
} }
} }
OpCode::Ping => { OpCode::Ping => Ok(Some(Frame::Ping(
if let Some(pl) = payload { payload.map(|pl| pl.freeze()).unwrap_or_else(Bytes::new),
Ok(Some(Frame::Ping(pl.freeze()))) ))),
} else { OpCode::Pong => Ok(Some(Frame::Pong(
Ok(Some(Frame::Ping(Bytes::new()))) payload.map(|pl| pl.freeze()).unwrap_or_else(Bytes::new),
} ))),
} OpCode::Binary => Ok(Some(Frame::Binary(
OpCode::Pong => { payload.map(|pl| pl.freeze()).unwrap_or_else(Bytes::new),
if let Some(pl) = payload { ))),
Ok(Some(Frame::Pong(pl.freeze()))) OpCode::Text => Ok(Some(Frame::Text(
} else { payload.map(|pl| pl.freeze()).unwrap_or_else(Bytes::new),
Ok(Some(Frame::Pong(Bytes::new()))) ))),
}
}
OpCode::Binary => Ok(Some(Frame::Binary(payload))),
OpCode::Text => {
Ok(Some(Frame::Text(payload)))
//let tmp = Vec::from(payload.as_ref());
//match String::from_utf8(tmp) {
// Ok(s) => Ok(Some(Message::Text(s))),
// Err(_) => Err(ProtocolError::BadEncoding),
//}
}
} }
} }
Ok(None) => Ok(None), Ok(None) => Ok(None),

View File

@ -1,6 +1,6 @@
use std::convert::TryFrom; use std::convert::TryFrom;
use bytes::{BufMut, Bytes, BytesMut}; use bytes::{BufMut, BytesMut};
use log::debug; use log::debug;
use rand; use rand;
@ -154,14 +154,14 @@ impl Parser {
} }
/// Generate binary representation /// Generate binary representation
pub fn write_message<B: Into<Bytes>>( pub fn write_message<B: AsRef<[u8]>>(
dst: &mut BytesMut, dst: &mut BytesMut,
pl: B, pl: B,
op: OpCode, op: OpCode,
fin: bool, fin: bool,
mask: bool, mask: bool,
) { ) {
let payload = pl.into(); let payload = pl.as_ref();
let one: u8 = if fin { let one: u8 = if fin {
0x80 | Into::<u8>::into(op) 0x80 | Into::<u8>::into(op)
} else { } else {

View File

@ -18,7 +18,7 @@ mod frame;
mod mask; mod mask;
mod proto; mod proto;
pub use self::codec::{Codec, Frame, Message}; pub use self::codec::{Codec, Frame, Item, Message};
pub use self::dispatcher::Dispatcher; pub use self::dispatcher::Dispatcher;
pub use self::frame::Parser; pub use self::frame::Parser;
pub use self::proto::{hash_key, CloseCode, CloseReason, OpCode}; pub use self::proto::{hash_key, CloseCode, CloseReason, OpCode};
@ -44,12 +44,15 @@ pub enum ProtocolError {
/// A payload reached size limit. /// A payload reached size limit.
#[display(fmt = "A payload reached size limit.")] #[display(fmt = "A payload reached size limit.")]
Overflow, Overflow,
/// Continuation is not supported /// Continuation is not started
#[display(fmt = "Continuation is not supported.")] #[display(fmt = "Continuation is not started.")]
NoContinuation, ContinuationNotStarted,
/// Bad utf-8 encoding /// Received new continuation but it is already started
#[display(fmt = "Bad utf-8 encoding.")] #[display(fmt = "Received new continuation but it is already started")]
BadEncoding, ContinuationStarted,
/// Unknown continuation fragment
#[display(fmt = "Unknown continuation fragment.")]
ContinuationFragment(OpCode),
/// Io error /// Io error
#[display(fmt = "io error: {}", _0)] #[display(fmt = "io error: {}", _0)]
Io(io::Error), Io(io::Error),

View File

@ -2,7 +2,7 @@ use actix_codec::{AsyncRead, AsyncWrite, Framed};
use actix_http::{body, h1, ws, Error, HttpService, Request, Response}; use actix_http::{body, h1, ws, Error, HttpService, Request, Response};
use actix_http_test::TestServer; use actix_http_test::TestServer;
use actix_utils::framed::Dispatcher; use actix_utils::framed::Dispatcher;
use bytes::BytesMut; use bytes::Bytes;
use futures::future; use futures::future;
use futures::{SinkExt, StreamExt}; use futures::{SinkExt, StreamExt};
@ -25,9 +25,10 @@ async fn service(msg: ws::Frame) -> Result<ws::Message, Error> {
let msg = match msg { let msg = match msg {
ws::Frame::Ping(msg) => ws::Message::Pong(msg), ws::Frame::Ping(msg) => ws::Message::Pong(msg),
ws::Frame::Text(text) => { ws::Frame::Text(text) => {
ws::Message::Text(String::from_utf8_lossy(&text.unwrap()).to_string()) ws::Message::Text(String::from_utf8_lossy(&text).to_string())
} }
ws::Frame::Binary(bin) => ws::Message::Binary(bin.unwrap().freeze()), ws::Frame::Binary(bin) => ws::Message::Binary(bin),
ws::Frame::Continuation(item) => ws::Message::Continuation(item),
ws::Frame::Close(reason) => ws::Message::Close(reason), ws::Frame::Close(reason) => ws::Message::Close(reason),
_ => panic!(), _ => panic!(),
}; };
@ -52,7 +53,7 @@ async fn test_simple() {
let (item, mut framed) = framed.into_future().await; let (item, mut framed) = framed.into_future().await;
assert_eq!( assert_eq!(
item.unwrap().unwrap(), item.unwrap().unwrap(),
ws::Frame::Text(Some(BytesMut::from("text"))) ws::Frame::Text(Bytes::from_static(b"text"))
); );
framed framed
@ -62,7 +63,7 @@ async fn test_simple() {
let (item, mut framed) = framed.into_future().await; let (item, mut framed) = framed.into_future().await;
assert_eq!( assert_eq!(
item.unwrap().unwrap(), item.unwrap().unwrap(),
ws::Frame::Binary(Some(BytesMut::from(&b"text"[..]).into())) ws::Frame::Binary(Bytes::from_static(&b"text"[..]))
); );
framed.send(ws::Message::Ping("text".into())).await.unwrap(); framed.send(ws::Message::Ping("text".into())).await.unwrap();
@ -72,6 +73,61 @@ async fn test_simple() {
ws::Frame::Pong("text".to_string().into()) ws::Frame::Pong("text".to_string().into())
); );
framed
.send(ws::Message::Continuation(ws::Item::FirstText(
"text".into(),
)))
.await
.unwrap();
let (item, mut framed) = framed.into_future().await;
assert_eq!(
item.unwrap().unwrap(),
ws::Frame::Continuation(ws::Item::FirstText(Bytes::from_static(b"text")))
);
assert!(framed
.send(ws::Message::Continuation(ws::Item::FirstText(
"text".into()
)))
.await
.is_err());
assert!(framed
.send(ws::Message::Continuation(ws::Item::FirstBinary(
"text".into()
)))
.await
.is_err());
framed
.send(ws::Message::Continuation(ws::Item::Continue("text".into())))
.await
.unwrap();
let (item, mut framed) = framed.into_future().await;
assert_eq!(
item.unwrap().unwrap(),
ws::Frame::Continuation(ws::Item::Continue(Bytes::from_static(b"text")))
);
framed
.send(ws::Message::Continuation(ws::Item::Last("text".into())))
.await
.unwrap();
let (item, mut framed) = framed.into_future().await;
assert_eq!(
item.unwrap().unwrap(),
ws::Frame::Continuation(ws::Item::Last(Bytes::from_static(b"text")))
);
assert!(framed
.send(ws::Message::Continuation(ws::Item::Continue("text".into())))
.await
.is_err());
assert!(framed
.send(ws::Message::Continuation(ws::Item::Last("text".into())))
.await
.is_err());
framed framed
.send(ws::Message::Close(Some(ws::CloseCode::Normal.into()))) .send(ws::Message::Close(Some(ws::CloseCode::Normal.into())))
.await .await

View File

@ -880,14 +880,17 @@ mod tests {
bytes: bytes, bytes: bytes,
pos: 0, pos: 0,
ready: false, ready: false,
} };
} }
} }
impl Stream for SlowStream { impl Stream for SlowStream {
type Item = Result<Bytes, PayloadError>; type Item = Result<Bytes, PayloadError>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<Self::Item>> {
let this = self.get_mut(); let this = self.get_mut();
if !this.ready { if !this.ready {
this.ready = true; this.ready = true;