1
0
mirror of https://github.com/fafhrd91/actix-web synced 2025-06-30 08:24:28 +02:00

migrate actix-web-actors

This commit is contained in:
Nikolay Kim
2019-12-15 22:45:38 +06:00
parent a791aab418
commit a153374b61
7 changed files with 213 additions and 198 deletions

View File

@ -1,4 +1,6 @@
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};
use actix::dev::{
AsyncContextParts, ContextFut, ContextParts, Envelope, Mailbox, ToEnvelope,
@ -7,10 +9,10 @@ use actix::fut::ActorFuture;
use actix::{
Actor, ActorContext, ActorState, Addr, AsyncContext, Handler, Message, SpawnHandle,
};
use actix_web::error::{Error, ErrorInternalServerError};
use actix_web::error::Error;
use bytes::Bytes;
use futures::sync::oneshot::Sender;
use futures::{Async, Future, Poll, Stream};
use futures::channel::oneshot::Sender;
use futures::{Future, Stream};
/// Execution context for http actors
pub struct HttpContext<A>
@ -43,7 +45,7 @@ where
#[inline]
fn spawn<F>(&mut self, fut: F) -> SpawnHandle
where
F: ActorFuture<Item = (), Error = (), Actor = A> + 'static,
F: ActorFuture<Output = (), Actor = A> + 'static,
{
self.inner.spawn(fut)
}
@ -51,7 +53,7 @@ where
#[inline]
fn wait<F>(&mut self, fut: F)
where
F: ActorFuture<Item = (), Error = (), Actor = A> + 'static,
F: ActorFuture<Output = (), Actor = A> + 'static,
{
self.inner.wait(fut)
}
@ -81,7 +83,7 @@ where
{
#[inline]
/// Create a new HTTP Context from a request and an actor
pub fn create(actor: A) -> impl Stream<Item = Bytes, Error = Error> {
pub fn create(actor: A) -> impl Stream<Item = Result<Bytes, Error>> {
let mb = Mailbox::default();
let ctx = HttpContext {
inner: ContextParts::new(mb.sender_producer()),
@ -91,7 +93,7 @@ where
}
/// Create a new HTTP Context
pub fn with_factory<F>(f: F) -> impl Stream<Item = Bytes, Error = Error>
pub fn with_factory<F>(f: F) -> impl Stream<Item = Result<Bytes, Error>>
where
F: FnOnce(&mut Self) -> A + 'static,
{
@ -160,24 +162,23 @@ impl<A> Stream for HttpContextFut<A>
where
A: Actor<Context = HttpContext<A>>,
{
type Item = Bytes;
type Error = Error;
type Item = Result<Bytes, Error>;
fn poll(&mut self) -> Poll<Option<Bytes>, Error> {
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
if self.fut.alive() {
match self.fut.poll() {
Ok(Async::NotReady) | Ok(Async::Ready(())) => (),
Err(_) => return Err(ErrorInternalServerError("error")),
}
let _ = Pin::new(&mut self.fut).poll(cx);
}
// frames
if let Some(data) = self.fut.ctx().stream.pop_front() {
Ok(Async::Ready(data))
Poll::Ready(data.map(|b| Ok(b)))
} else if self.fut.alive() {
Ok(Async::NotReady)
Poll::Pending
} else {
Ok(Async::Ready(None))
Poll::Ready(None)
}
}
}
@ -199,9 +200,9 @@ mod tests {
use actix::Actor;
use actix_web::http::StatusCode;
use actix_web::test::{block_on, call_service, init_service, TestRequest};
use actix_web::test::{call_service, init_service, read_body, TestRequest};
use actix_web::{web, App, HttpResponse};
use bytes::{Bytes, BytesMut};
use bytes::Bytes;
use super::*;
@ -223,31 +224,25 @@ mod tests {
if self.count > 3 {
ctx.write_eof()
} else {
ctx.write(Bytes::from(format!("LINE-{}", self.count).as_bytes()));
ctx.write(Bytes::from(format!("LINE-{}", self.count)));
ctx.run_later(Duration::from_millis(100), |slf, ctx| slf.write(ctx));
}
}
}
#[test]
fn test_default_resource() {
#[actix_rt::test]
async fn test_default_resource() {
let mut srv =
init_service(App::new().service(web::resource("/test").to(|| {
HttpResponse::Ok().streaming(HttpContext::create(MyActor { count: 0 }))
})));
})))
.await;
let req = TestRequest::with_uri("/test").to_request();
let mut resp = call_service(&mut srv, req);
let resp = call_service(&mut srv, req).await;
assert_eq!(resp.status(), StatusCode::OK);
let body = block_on(resp.take_body().fold(
BytesMut::new(),
move |mut body, chunk| {
body.extend_from_slice(&chunk);
Ok::<_, Error>(body)
},
))
.unwrap();
assert_eq!(body.freeze(), Bytes::from_static(b"LINE-1LINE-2LINE-3"));
let body = read_body(resp).await;
assert_eq!(body, Bytes::from_static(b"LINE-1LINE-2LINE-3"));
}
}

View File

@ -1,6 +1,8 @@
//! Websocket integration
use std::collections::VecDeque;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use actix::dev::{
AsyncContextParts, ContextFut, ContextParts, Envelope, Mailbox, StreamHandler,
@ -16,20 +18,20 @@ use actix_http::ws::{hash_key, Codec};
pub use actix_http::ws::{
CloseCode, CloseReason, Frame, HandshakeError, Message, ProtocolError,
};
use actix_web::dev::HttpResponseBuilder;
use actix_web::error::{Error, ErrorInternalServerError, PayloadError};
use actix_web::error::{Error, PayloadError};
use actix_web::http::{header, Method, StatusCode};
use actix_web::{HttpRequest, HttpResponse};
use bytes::{Bytes, BytesMut};
use futures::sync::oneshot::Sender;
use futures::{Async, Future, Poll, Stream};
use futures::channel::oneshot::Sender;
use futures::{Future, Stream};
/// Do websocket handshake and start ws actor.
pub fn start<A, T>(actor: A, req: &HttpRequest, stream: T) -> Result<HttpResponse, Error>
where
A: Actor<Context = WebsocketContext<A>> + StreamHandler<Message, ProtocolError>,
T: Stream<Item = Bytes, Error = PayloadError> + 'static,
A: Actor<Context = WebsocketContext<A>>
+ StreamHandler<Result<Message, ProtocolError>>,
T: Stream<Item = Result<Bytes, PayloadError>> + 'static,
{
let mut res = handshake(req)?;
Ok(res.streaming(WebsocketContext::create(actor, stream)))
@ -52,8 +54,9 @@ pub fn start_with_addr<A, T>(
stream: T,
) -> Result<(Addr<A>, HttpResponse), Error>
where
A: Actor<Context = WebsocketContext<A>> + StreamHandler<Message, ProtocolError>,
T: Stream<Item = Bytes, Error = PayloadError> + 'static,
A: Actor<Context = WebsocketContext<A>>
+ StreamHandler<Result<Message, ProtocolError>>,
T: Stream<Item = Result<Bytes, PayloadError>> + 'static,
{
let mut res = handshake(req)?;
let (addr, out_stream) = WebsocketContext::create_with_addr(actor, stream);
@ -70,8 +73,9 @@ pub fn start_with_protocols<A, T>(
stream: T,
) -> Result<HttpResponse, Error>
where
A: Actor<Context = WebsocketContext<A>> + StreamHandler<Message, ProtocolError>,
T: Stream<Item = Bytes, Error = PayloadError> + 'static,
A: Actor<Context = WebsocketContext<A>>
+ StreamHandler<Result<Message, ProtocolError>>,
T: Stream<Item = Result<Bytes, PayloadError>> + 'static,
{
let mut res = handshake_with_protocols(req, protocols)?;
Ok(res.streaming(WebsocketContext::create(actor, stream)))
@ -202,14 +206,14 @@ where
{
fn spawn<F>(&mut self, fut: F) -> SpawnHandle
where
F: ActorFuture<Item = (), Error = (), Actor = A> + 'static,
F: ActorFuture<Output = (), Actor = A> + 'static,
{
self.inner.spawn(fut)
}
fn wait<F>(&mut self, fut: F)
where
F: ActorFuture<Item = (), Error = (), Actor = A> + 'static,
F: ActorFuture<Output = (), Actor = A> + 'static,
{
self.inner.wait(fut)
}
@ -238,10 +242,10 @@ where
{
#[inline]
/// Create a new Websocket context from a request and an actor
pub fn create<S>(actor: A, stream: S) -> impl Stream<Item = Bytes, Error = Error>
pub fn create<S>(actor: A, stream: S) -> impl Stream<Item = Result<Bytes, Error>>
where
A: StreamHandler<Message, ProtocolError>,
S: Stream<Item = Bytes, Error = PayloadError> + 'static,
A: StreamHandler<Result<Message, ProtocolError>>,
S: Stream<Item = Result<Bytes, PayloadError>> + 'static,
{
let (_, stream) = WebsocketContext::create_with_addr(actor, stream);
stream
@ -256,10 +260,10 @@ where
pub fn create_with_addr<S>(
actor: A,
stream: S,
) -> (Addr<A>, impl Stream<Item = Bytes, Error = Error>)
) -> (Addr<A>, impl Stream<Item = Result<Bytes, Error>>)
where
A: StreamHandler<Message, ProtocolError>,
S: Stream<Item = Bytes, Error = PayloadError> + 'static,
A: StreamHandler<Result<Message, ProtocolError>>,
S: Stream<Item = Result<Bytes, PayloadError>> + 'static,
{
let mb = Mailbox::default();
let mut ctx = WebsocketContext {
@ -279,10 +283,10 @@ where
actor: A,
stream: S,
codec: Codec,
) -> impl Stream<Item = Bytes, Error = Error>
) -> impl Stream<Item = Result<Bytes, Error>>
where
A: StreamHandler<Message, ProtocolError>,
S: Stream<Item = Bytes, Error = PayloadError> + 'static,
A: StreamHandler<Result<Message, ProtocolError>>,
S: Stream<Item = Result<Bytes, PayloadError>> + 'static,
{
let mb = Mailbox::default();
let mut ctx = WebsocketContext {
@ -298,11 +302,11 @@ where
pub fn with_factory<S, F>(
stream: S,
f: F,
) -> impl Stream<Item = Bytes, Error = Error>
) -> impl Stream<Item = Result<Bytes, Error>>
where
F: FnOnce(&mut Self) -> A + 'static,
A: StreamHandler<Message, ProtocolError>,
S: Stream<Item = Bytes, Error = PayloadError> + 'static,
A: StreamHandler<Result<Message, ProtocolError>>,
S: Stream<Item = Result<Bytes, PayloadError>> + 'static,
{
let mb = Mailbox::default();
let mut ctx = WebsocketContext {
@ -346,14 +350,14 @@ where
/// Send ping frame
#[inline]
pub fn ping(&mut self, message: &str) {
self.write_raw(Message::Ping(message.to_string()));
pub fn ping(&mut self, message: &[u8]) {
self.write_raw(Message::Ping(Bytes::copy_from_slice(message)));
}
/// Send pong frame
#[inline]
pub fn pong(&mut self, message: &str) {
self.write_raw(Message::Pong(message.to_string()));
pub fn pong(&mut self, message: &[u8]) {
self.write_raw(Message::Pong(Bytes::copy_from_slice(message)));
}
/// Send close frame
@ -415,30 +419,34 @@ impl<A> Stream for WebsocketContextFut<A>
where
A: Actor<Context = WebsocketContext<A>>,
{
type Item = Bytes;
type Error = Error;
type Item = Result<Bytes, Error>;
fn poll(&mut self) -> Poll<Option<Bytes>, Error> {
if self.fut.alive() && self.fut.poll().is_err() {
return Err(ErrorInternalServerError("error"));
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
if this.fut.alive() {
let _ = Pin::new(&mut this.fut).poll(cx);
}
// encode messages
while let Some(item) = self.fut.ctx().messages.pop_front() {
while let Some(item) = this.fut.ctx().messages.pop_front() {
if let Some(msg) = item {
self.encoder.encode(msg, &mut self.buf)?;
this.encoder.encode(msg, &mut this.buf)?;
} else {
self.closed = true;
this.closed = true;
break;
}
}
if !self.buf.is_empty() {
Ok(Async::Ready(Some(self.buf.take().freeze())))
} else if self.fut.alive() && !self.closed {
Ok(Async::NotReady)
if !this.buf.is_empty() {
Poll::Ready(Some(Ok(this.buf.split().freeze())))
} else if this.fut.alive() && !this.closed {
Poll::Pending
} else {
Ok(Async::Ready(None))
Poll::Ready(None)
}
}
}
@ -454,7 +462,9 @@ where
}
}
#[pin_project::pin_project]
struct WsStream<S> {
#[pin]
stream: S,
decoder: Codec,
buf: BytesMut,
@ -463,7 +473,7 @@ struct WsStream<S> {
impl<S> WsStream<S>
where
S: Stream<Item = Bytes, Error = PayloadError>,
S: Stream<Item = Result<Bytes, PayloadError>>,
{
fn new(stream: S, codec: Codec) -> Self {
Self {
@ -477,62 +487,64 @@ where
impl<S> Stream for WsStream<S>
where
S: Stream<Item = Bytes, Error = PayloadError>,
S: Stream<Item = Result<Bytes, PayloadError>>,
{
type Item = Message;
type Error = ProtocolError;
type Item = Result<Message, ProtocolError>;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
if !self.closed {
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
let mut this = self.as_mut().project();
if !*this.closed {
loop {
match self.stream.poll() {
Ok(Async::Ready(Some(chunk))) => {
self.buf.extend_from_slice(&chunk[..]);
this = self.as_mut().project();
match Pin::new(&mut this.stream).poll_next(cx) {
Poll::Ready(Some(Ok(chunk))) => {
this.buf.extend_from_slice(&chunk[..]);
}
Ok(Async::Ready(None)) => {
self.closed = true;
Poll::Ready(None) => {
*this.closed = true;
break;
}
Ok(Async::NotReady) => break,
Err(e) => {
return Err(ProtocolError::Io(io::Error::new(
io::ErrorKind::Other,
format!("{}", e),
)));
Poll::Pending => break,
Poll::Ready(Some(Err(e))) => {
return Poll::Ready(Some(Err(ProtocolError::Io(
io::Error::new(io::ErrorKind::Other, format!("{}", e)),
))));
}
}
}
}
match self.decoder.decode(&mut self.buf)? {
match this.decoder.decode(this.buf)? {
None => {
if self.closed {
Ok(Async::Ready(None))
if *this.closed {
Poll::Ready(None)
} else {
Ok(Async::NotReady)
Poll::Pending
}
}
Some(frm) => {
let msg = match frm {
Frame::Text(data) => {
if let Some(data) = data {
Message::Text(
std::str::from_utf8(&data)
.map_err(|_| ProtocolError::BadEncoding)?
.to_string(),
)
} else {
Message::Text(String::new())
}
}
Frame::Binary(data) => Message::Binary(
data.map(|b| b.freeze()).unwrap_or_else(Bytes::new),
Frame::Text(data) => Message::Text(
std::str::from_utf8(&data)
.map_err(|e| {
ProtocolError::Io(io::Error::new(
io::ErrorKind::Other,
format!("{}", e),
))
})?
.to_string(),
),
Frame::Binary(data) => Message::Binary(data),
Frame::Ping(s) => Message::Ping(s),
Frame::Pong(s) => Message::Pong(s),
Frame::Close(reason) => Message::Close(reason),
Frame::Continuation(item) => Message::Continuation(item),
};
Ok(Async::Ready(Some(msg)))
Poll::Ready(Some(Ok(msg)))
}
}
}