mirror of
https://github.com/actix/actix-extras.git
synced 2025-06-26 18:37:41 +02:00
adopt actix-ws crate (#361)
This commit is contained in:
183
actix-ws/src/fut.rs
Normal file
183
actix-ws/src/fut.rs
Normal file
@ -0,0 +1,183 @@
|
||||
use std::{
|
||||
collections::VecDeque,
|
||||
future::poll_fn,
|
||||
io,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use actix_codec::{Decoder, Encoder};
|
||||
use actix_http::{
|
||||
ws::{Codec, Frame, Message, ProtocolError},
|
||||
Payload,
|
||||
};
|
||||
use actix_web::{
|
||||
web::{Bytes, BytesMut},
|
||||
Error,
|
||||
};
|
||||
use futures_core::stream::Stream;
|
||||
use tokio::sync::mpsc::Receiver;
|
||||
|
||||
/// A response body for Websocket HTTP Requests
|
||||
pub struct StreamingBody {
|
||||
session_rx: Receiver<Message>,
|
||||
|
||||
messages: VecDeque<Message>,
|
||||
buf: BytesMut,
|
||||
codec: Codec,
|
||||
closing: bool,
|
||||
}
|
||||
|
||||
/// A stream of Messages from a websocket client
|
||||
///
|
||||
/// Messages can be accessed via the stream's `.next()` method
|
||||
pub struct MessageStream {
|
||||
payload: Payload,
|
||||
|
||||
messages: VecDeque<Message>,
|
||||
buf: BytesMut,
|
||||
codec: Codec,
|
||||
closing: bool,
|
||||
}
|
||||
|
||||
impl StreamingBody {
|
||||
pub(super) fn new(session_rx: Receiver<Message>) -> Self {
|
||||
StreamingBody {
|
||||
session_rx,
|
||||
messages: VecDeque::new(),
|
||||
buf: BytesMut::new(),
|
||||
codec: Codec::new(),
|
||||
closing: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl MessageStream {
|
||||
pub(super) fn new(payload: Payload) -> Self {
|
||||
MessageStream {
|
||||
payload,
|
||||
messages: VecDeque::new(),
|
||||
buf: BytesMut::new(),
|
||||
codec: Codec::new(),
|
||||
closing: false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Wait for the next item from the message stream
|
||||
///
|
||||
/// ```rust,ignore
|
||||
/// while let Some(Ok(msg)) = stream.recv().await {
|
||||
/// // handle message
|
||||
/// }
|
||||
/// ```
|
||||
pub async fn recv(&mut self) -> Option<Result<Message, ProtocolError>> {
|
||||
poll_fn(|cx| Pin::new(&mut *self).poll_next(cx)).await
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for StreamingBody {
|
||||
type Item = Result<Bytes, Error>;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
let this = self.get_mut();
|
||||
|
||||
if this.closing {
|
||||
return Poll::Ready(None);
|
||||
}
|
||||
|
||||
loop {
|
||||
match Pin::new(&mut this.session_rx).poll_recv(cx) {
|
||||
Poll::Ready(Some(msg)) => {
|
||||
this.messages.push_back(msg);
|
||||
}
|
||||
Poll::Ready(None) => {
|
||||
this.closing = true;
|
||||
break;
|
||||
}
|
||||
Poll::Pending => break,
|
||||
}
|
||||
}
|
||||
|
||||
while let Some(msg) = this.messages.pop_front() {
|
||||
if let Err(e) = this.codec.encode(msg, &mut this.buf) {
|
||||
return Poll::Ready(Some(Err(e.into())));
|
||||
}
|
||||
}
|
||||
|
||||
if !this.buf.is_empty() {
|
||||
return Poll::Ready(Some(Ok(this.buf.split().freeze())));
|
||||
}
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for MessageStream {
|
||||
type Item = Result<Message, ProtocolError>;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
let this = self.get_mut();
|
||||
|
||||
// Return the first message in the queue if one exists
|
||||
//
|
||||
// This is faster than polling and parsing
|
||||
if let Some(msg) = this.messages.pop_front() {
|
||||
return Poll::Ready(Some(Ok(msg)));
|
||||
}
|
||||
|
||||
if !this.closing {
|
||||
// Read in bytes until there's nothing left to read
|
||||
loop {
|
||||
match Pin::new(&mut this.payload).poll_next(cx) {
|
||||
Poll::Ready(Some(Ok(bytes))) => {
|
||||
this.buf.extend_from_slice(&bytes);
|
||||
}
|
||||
Poll::Ready(Some(Err(e))) => {
|
||||
return Poll::Ready(Some(Err(ProtocolError::Io(io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
e.to_string(),
|
||||
)))));
|
||||
}
|
||||
Poll::Ready(None) => {
|
||||
this.closing = true;
|
||||
break;
|
||||
}
|
||||
Poll::Pending => break,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Create messages until there's no more bytes left
|
||||
while let Some(frame) = this.codec.decode(&mut this.buf)? {
|
||||
let message = match frame {
|
||||
Frame::Text(bytes) => {
|
||||
let s = std::str::from_utf8(&bytes)
|
||||
.map_err(|e| {
|
||||
ProtocolError::Io(io::Error::new(io::ErrorKind::Other, e.to_string()))
|
||||
})?
|
||||
.to_string();
|
||||
Message::Text(s.into())
|
||||
}
|
||||
Frame::Binary(bytes) => Message::Binary(bytes),
|
||||
Frame::Ping(bytes) => Message::Ping(bytes),
|
||||
Frame::Pong(bytes) => Message::Pong(bytes),
|
||||
Frame::Close(reason) => Message::Close(reason),
|
||||
Frame::Continuation(item) => Message::Continuation(item),
|
||||
};
|
||||
|
||||
this.messages.push_back(message);
|
||||
}
|
||||
|
||||
// Return the first message in the queue
|
||||
if let Some(msg) = this.messages.pop_front() {
|
||||
return Poll::Ready(Some(Ok(msg)));
|
||||
}
|
||||
|
||||
// If we've exhausted our message queue and we're closing, close the stream
|
||||
if this.closing {
|
||||
return Poll::Ready(None);
|
||||
}
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
84
actix-ws/src/lib.rs
Normal file
84
actix-ws/src/lib.rs
Normal file
@ -0,0 +1,84 @@
|
||||
//! WebSockets for Actix Web, without actors.
|
||||
//!
|
||||
//! For usage, see documentation on [`handle()`].
|
||||
|
||||
#![deny(rust_2018_idioms, nonstandard_style, future_incompatible)]
|
||||
#![warn(missing_docs)]
|
||||
#![doc(html_logo_url = "https://actix.rs/img/logo.png")]
|
||||
#![doc(html_favicon_url = "https://actix.rs/favicon.ico")]
|
||||
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
|
||||
|
||||
pub use actix_http::ws::{CloseCode, CloseReason, Message, ProtocolError};
|
||||
use actix_http::{
|
||||
body::{BodyStream, MessageBody},
|
||||
ws::handshake,
|
||||
};
|
||||
use actix_web::{web, HttpRequest, HttpResponse};
|
||||
use tokio::sync::mpsc::channel;
|
||||
|
||||
mod fut;
|
||||
mod session;
|
||||
|
||||
pub use self::{
|
||||
fut::{MessageStream, StreamingBody},
|
||||
session::{Closed, Session},
|
||||
};
|
||||
|
||||
/// Begin handling websocket traffic
|
||||
///
|
||||
/// ```no_run
|
||||
/// use actix_web::{middleware::Logger, web, App, Error, HttpRequest, HttpResponse, HttpServer};
|
||||
/// use actix_ws::Message;
|
||||
/// use futures::stream::StreamExt as _;
|
||||
///
|
||||
/// async fn ws(req: HttpRequest, body: web::Payload) -> Result<HttpResponse, Error> {
|
||||
/// let (response, mut session, mut msg_stream) = actix_ws::handle(&req, body)?;
|
||||
///
|
||||
/// actix_rt::spawn(async move {
|
||||
/// while let Some(Ok(msg)) = msg_stream.next().await {
|
||||
/// match msg {
|
||||
/// Message::Ping(bytes) => {
|
||||
/// if session.pong(&bytes).await.is_err() {
|
||||
/// return;
|
||||
/// }
|
||||
/// }
|
||||
/// Message::Text(s) => println!("Got text, {}", s),
|
||||
/// _ => break,
|
||||
/// }
|
||||
/// }
|
||||
///
|
||||
/// let _ = session.close(None).await;
|
||||
/// });
|
||||
///
|
||||
/// Ok(response)
|
||||
/// }
|
||||
///
|
||||
/// #[actix_rt::main]
|
||||
/// async fn main() -> Result<(), anyhow::Error> {
|
||||
/// HttpServer::new(move || {
|
||||
/// App::new()
|
||||
/// .wrap(Logger::default())
|
||||
/// .route("/ws", web::get().to(ws))
|
||||
/// })
|
||||
/// .bind("127.0.0.1:8080")?
|
||||
/// .run()
|
||||
/// .await?;
|
||||
///
|
||||
/// Ok(())
|
||||
/// }
|
||||
/// ```
|
||||
pub fn handle(
|
||||
req: &HttpRequest,
|
||||
body: web::Payload,
|
||||
) -> Result<(HttpResponse, Session, MessageStream), actix_web::Error> {
|
||||
let mut response = handshake(req.head())?;
|
||||
let (tx, rx) = channel(32);
|
||||
|
||||
Ok((
|
||||
response
|
||||
.message_body(BodyStream::new(StreamingBody::new(rx)).boxed())?
|
||||
.into(),
|
||||
Session::new(tx),
|
||||
MessageStream::new(body.into_inner()),
|
||||
))
|
||||
}
|
143
actix-ws/src/session.rs
Normal file
143
actix-ws/src/session.rs
Normal file
@ -0,0 +1,143 @@
|
||||
use std::sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc,
|
||||
};
|
||||
|
||||
use actix_http::ws::{CloseReason, Message};
|
||||
use actix_web::web::Bytes;
|
||||
use bytestring::ByteString;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
|
||||
/// A handle into the websocket session.
|
||||
///
|
||||
/// This type can be used to send messages into the websocket.
|
||||
#[derive(Clone)]
|
||||
pub struct Session {
|
||||
inner: Option<Sender<Message>>,
|
||||
closed: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
/// The error representing a closed websocket session
|
||||
#[derive(Debug)]
|
||||
pub struct Closed;
|
||||
|
||||
impl std::fmt::Display for Closed {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "Session is closed")
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for Closed {}
|
||||
|
||||
impl Session {
|
||||
pub(super) fn new(inner: Sender<Message>) -> Self {
|
||||
Session {
|
||||
inner: Some(inner),
|
||||
closed: Arc::new(AtomicBool::new(false)),
|
||||
}
|
||||
}
|
||||
|
||||
fn pre_check(&mut self) {
|
||||
if self.closed.load(Ordering::Relaxed) {
|
||||
self.inner.take();
|
||||
}
|
||||
}
|
||||
|
||||
/// Send text into the websocket
|
||||
///
|
||||
/// ```rust,ignore
|
||||
/// if session.text("Some text").await.is_err() {
|
||||
/// // session closed
|
||||
/// }
|
||||
/// ```
|
||||
pub async fn text(&mut self, msg: impl Into<ByteString>) -> Result<(), Closed> {
|
||||
self.pre_check();
|
||||
if let Some(inner) = self.inner.as_mut() {
|
||||
inner
|
||||
.send(Message::Text(msg.into()))
|
||||
.await
|
||||
.map_err(|_| Closed)
|
||||
} else {
|
||||
Err(Closed)
|
||||
}
|
||||
}
|
||||
|
||||
/// Send raw bytes into the websocket
|
||||
///
|
||||
/// ```rust,ignore
|
||||
/// if session.binary(b"some bytes").await.is_err() {
|
||||
/// // session closed
|
||||
/// }
|
||||
/// ```
|
||||
pub async fn binary(&mut self, msg: impl Into<Bytes>) -> Result<(), Closed> {
|
||||
self.pre_check();
|
||||
if let Some(inner) = self.inner.as_mut() {
|
||||
inner
|
||||
.send(Message::Binary(msg.into()))
|
||||
.await
|
||||
.map_err(|_| Closed)
|
||||
} else {
|
||||
Err(Closed)
|
||||
}
|
||||
}
|
||||
|
||||
/// Ping the client
|
||||
///
|
||||
/// For many applications, it will be important to send regular pings to keep track of if the
|
||||
/// client has disconnected
|
||||
///
|
||||
/// ```rust,ignore
|
||||
/// if session.ping(b"").await.is_err() {
|
||||
/// // session is closed
|
||||
/// }
|
||||
/// ```
|
||||
pub async fn ping(&mut self, msg: &[u8]) -> Result<(), Closed> {
|
||||
self.pre_check();
|
||||
if let Some(inner) = self.inner.as_mut() {
|
||||
inner
|
||||
.send(Message::Ping(Bytes::copy_from_slice(msg)))
|
||||
.await
|
||||
.map_err(|_| Closed)
|
||||
} else {
|
||||
Err(Closed)
|
||||
}
|
||||
}
|
||||
|
||||
/// Pong the client
|
||||
///
|
||||
/// ```rust,ignore
|
||||
/// match msg {
|
||||
/// Message::Ping(bytes) => {
|
||||
/// let _ = session.pong(&bytes).await;
|
||||
/// }
|
||||
/// _ => (),
|
||||
/// }
|
||||
pub async fn pong(&mut self, msg: &[u8]) -> Result<(), Closed> {
|
||||
self.pre_check();
|
||||
if let Some(inner) = self.inner.as_mut() {
|
||||
inner
|
||||
.send(Message::Pong(Bytes::copy_from_slice(msg)))
|
||||
.await
|
||||
.map_err(|_| Closed)
|
||||
} else {
|
||||
Err(Closed)
|
||||
}
|
||||
}
|
||||
|
||||
/// Send a close message, and consume the session
|
||||
///
|
||||
/// All clones will return `Err(Closed)` if used after this call
|
||||
///
|
||||
/// ```rust,ignore
|
||||
/// session.close(None).await
|
||||
/// ```
|
||||
pub async fn close(mut self, reason: Option<CloseReason>) -> Result<(), Closed> {
|
||||
self.pre_check();
|
||||
if let Some(inner) = self.inner.take() {
|
||||
self.closed.store(true, Ordering::Relaxed);
|
||||
inner.send(Message::Close(reason)).await.map_err(|_| Closed)
|
||||
} else {
|
||||
Err(Closed)
|
||||
}
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user