1
0
mirror of https://github.com/fafhrd91/actix-web synced 2024-11-30 18:44:35 +01:00
actix-web/actix-web-actors/src/context.rs

249 lines
5.9 KiB
Rust
Raw Normal View History

2019-03-17 21:47:20 +01:00
use std::collections::VecDeque;
2019-12-15 17:45:38 +01:00
use std::pin::Pin;
use std::task::{Context, Poll};
2019-03-17 21:47:20 +01:00
use actix::dev::{
AsyncContextParts, ContextFut, ContextParts, Envelope, Mailbox, ToEnvelope,
};
use actix::fut::ActorFuture;
use actix::{
Actor, ActorContext, ActorState, Addr, AsyncContext, Handler, Message, SpawnHandle,
};
2019-12-15 17:45:38 +01:00
use actix_web::error::Error;
2019-03-17 21:47:20 +01:00
use bytes::Bytes;
2019-12-15 17:45:38 +01:00
use futures::channel::oneshot::Sender;
use futures::{Future, Stream};
2019-03-17 21:47:20 +01:00
/// Execution context for http actors
pub struct HttpContext<A>
where
A: Actor<Context = HttpContext<A>>,
{
inner: ContextParts<A>,
stream: VecDeque<Option<Bytes>>,
}
impl<A> ActorContext for HttpContext<A>
where
A: Actor<Context = Self>,
{
fn stop(&mut self) {
self.inner.stop();
}
fn terminate(&mut self) {
self.inner.terminate()
}
fn state(&self) -> ActorState {
self.inner.state()
}
}
impl<A> AsyncContext<A> for HttpContext<A>
where
A: Actor<Context = Self>,
{
#[inline]
fn spawn<F>(&mut self, fut: F) -> SpawnHandle
where
2019-12-15 17:45:38 +01:00
F: ActorFuture<Output = (), Actor = A> + 'static,
2019-03-17 21:47:20 +01:00
{
self.inner.spawn(fut)
}
#[inline]
fn wait<F>(&mut self, fut: F)
where
2019-12-15 17:45:38 +01:00
F: ActorFuture<Output = (), Actor = A> + 'static,
2019-03-17 21:47:20 +01:00
{
self.inner.wait(fut)
}
#[doc(hidden)]
#[inline]
fn waiting(&self) -> bool {
self.inner.waiting()
|| self.inner.state() == ActorState::Stopping
|| self.inner.state() == ActorState::Stopped
}
#[inline]
fn cancel_future(&mut self, handle: SpawnHandle) -> bool {
self.inner.cancel_future(handle)
}
#[inline]
fn address(&self) -> Addr<A> {
self.inner.address()
}
}
impl<A> HttpContext<A>
where
A: Actor<Context = Self>,
{
#[inline]
/// Create a new HTTP Context from a request and an actor
2019-12-15 17:45:38 +01:00
pub fn create(actor: A) -> impl Stream<Item = Result<Bytes, Error>> {
2019-03-17 21:47:20 +01:00
let mb = Mailbox::default();
let ctx = HttpContext {
inner: ContextParts::new(mb.sender_producer()),
stream: VecDeque::new(),
};
HttpContextFut::new(ctx, actor, mb)
}
/// Create a new HTTP Context
2019-12-15 17:45:38 +01:00
pub fn with_factory<F>(f: F) -> impl Stream<Item = Result<Bytes, Error>>
2019-03-17 21:47:20 +01:00
where
F: FnOnce(&mut Self) -> A + 'static,
{
let mb = Mailbox::default();
let mut ctx = HttpContext {
inner: ContextParts::new(mb.sender_producer()),
stream: VecDeque::new(),
};
let act = f(&mut ctx);
HttpContextFut::new(ctx, act, mb)
}
}
impl<A> HttpContext<A>
where
A: Actor<Context = Self>,
{
/// Write payload
#[inline]
pub fn write(&mut self, data: Bytes) {
self.stream.push_back(Some(data));
}
/// Indicate end of streaming payload. Also this method calls `Self::close`.
#[inline]
pub fn write_eof(&mut self) {
self.stream.push_back(None);
}
/// Handle of the running future
///
/// SpawnHandle is the handle returned by `AsyncContext::spawn()` method.
pub fn handle(&self) -> SpawnHandle {
self.inner.curr_handle()
}
}
impl<A> AsyncContextParts<A> for HttpContext<A>
where
A: Actor<Context = Self>,
{
fn parts(&mut self) -> &mut ContextParts<A> {
&mut self.inner
}
}
struct HttpContextFut<A>
where
A: Actor<Context = HttpContext<A>>,
{
fut: ContextFut<A, HttpContext<A>>,
}
impl<A> HttpContextFut<A>
where
A: Actor<Context = HttpContext<A>>,
{
fn new(ctx: HttpContext<A>, act: A, mailbox: Mailbox<A>) -> Self {
let fut = ContextFut::new(ctx, act, mailbox);
HttpContextFut { fut }
}
}
impl<A> Stream for HttpContextFut<A>
where
A: Actor<Context = HttpContext<A>>,
{
2019-12-15 17:45:38 +01:00
type Item = Result<Bytes, Error>;
2019-03-17 21:47:20 +01:00
2019-12-15 17:45:38 +01:00
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
2019-03-17 21:47:20 +01:00
if self.fut.alive() {
2019-12-15 17:45:38 +01:00
let _ = Pin::new(&mut self.fut).poll(cx);
2019-03-17 21:47:20 +01:00
}
// frames
if let Some(data) = self.fut.ctx().stream.pop_front() {
2020-05-07 20:33:29 +02:00
Poll::Ready(data.map(Ok))
2019-03-17 21:47:20 +01:00
} else if self.fut.alive() {
2019-12-15 17:45:38 +01:00
Poll::Pending
2019-03-17 21:47:20 +01:00
} else {
2019-12-15 17:45:38 +01:00
Poll::Ready(None)
2019-03-17 21:47:20 +01:00
}
}
}
impl<A, M> ToEnvelope<A, M> for HttpContext<A>
where
A: Actor<Context = HttpContext<A>> + Handler<M>,
M: Message + Send + 'static,
M::Result: Send,
{
fn pack(msg: M, tx: Option<Sender<M::Result>>) -> Envelope<A> {
Envelope::new(msg, tx)
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use actix::Actor;
use actix_web::http::StatusCode;
2019-12-15 17:45:38 +01:00
use actix_web::test::{call_service, init_service, read_body, TestRequest};
2019-03-18 06:02:03 +01:00
use actix_web::{web, App, HttpResponse};
2019-12-15 17:45:38 +01:00
use bytes::Bytes;
2019-03-17 21:47:20 +01:00
use super::*;
struct MyActor {
count: usize,
}
impl Actor for MyActor {
type Context = HttpContext<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
ctx.run_later(Duration::from_millis(100), |slf, ctx| slf.write(ctx));
}
}
impl MyActor {
fn write(&mut self, ctx: &mut HttpContext<Self>) {
self.count += 1;
if self.count > 3 {
ctx.write_eof()
} else {
2019-12-15 17:45:38 +01:00
ctx.write(Bytes::from(format!("LINE-{}", self.count)));
2019-03-17 21:47:20 +01:00
ctx.run_later(Duration::from_millis(100), |slf, ctx| slf.write(ctx));
}
}
}
2019-12-15 17:45:38 +01:00
#[actix_rt::test]
async fn test_default_resource() {
2019-03-17 21:47:20 +01:00
let mut srv =
init_service(App::new().service(web::resource("/test").to(|| {
HttpResponse::Ok().streaming(HttpContext::create(MyActor { count: 0 }))
2019-12-15 17:45:38 +01:00
})))
.await;
2019-03-17 21:47:20 +01:00
let req = TestRequest::with_uri("/test").to_request();
2019-12-15 17:45:38 +01:00
let resp = call_service(&mut srv, req).await;
2019-03-17 21:47:20 +01:00
assert_eq!(resp.status(), StatusCode::OK);
2019-12-15 17:45:38 +01:00
let body = read_body(resp).await;
assert_eq!(body, Bytes::from_static(b"LINE-1LINE-2LINE-3"));
2019-03-17 21:47:20 +01:00
}
}