From 4c5a63965e72afc838d8a6d4e9346ff33420451d Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Wed, 4 Jul 2018 17:04:23 +0600 Subject: [PATCH] use new actix context api --- Cargo.toml | 2 - src/context.rs | 79 +++++++++++++++++++------------- src/lib.rs | 1 - src/pipeline.rs | 69 ---------------------------- src/ws/context.rs | 113 ++++++++++++++++++++++++++++------------------ src/ws/mod.rs | 6 +-- 6 files changed, 119 insertions(+), 151 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a2aea4fd..fa243bc9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -104,8 +104,6 @@ tokio-tls = { version="0.1", optional = true } openssl = { version="0.10", optional = true } tokio-openssl = { version="0.2", optional = true } -backtrace="*" - [dev-dependencies] env_logger = "0.5" serde_derive = "1.0" diff --git a/src/context.rs b/src/context.rs index 601cfe58..d13cd417 100644 --- a/src/context.rs +++ b/src/context.rs @@ -6,7 +6,9 @@ use futures::{Async, Future, Poll}; use smallvec::SmallVec; use std::marker::PhantomData; -use self::actix::dev::{ContextImpl, Envelope, ToEnvelope}; +use self::actix::dev::{ + AsyncContextParts, ContextFut, ContextParts, Envelope, Mailbox, ToEnvelope, +}; use self::actix::fut::ActorFuture; use self::actix::{ Actor, ActorContext, ActorState, Addr, AsyncContext, Handler, Message, SpawnHandle, @@ -41,7 +43,7 @@ pub struct HttpContext where A: Actor>, { - inner: ContextImpl, + inner: ContextParts, stream: Option>, request: HttpRequest, disconnected: bool, @@ -103,30 +105,32 @@ where { #[inline] /// Create a new HTTP Context from a request and an actor - pub fn new(req: HttpRequest, actor: A) -> HttpContext { - HttpContext { - inner: ContextImpl::new(Some(actor)), + pub fn new(req: HttpRequest, actor: A) -> Body { + let mb = Mailbox::default(); + let ctx = HttpContext { + inner: ContextParts::new(mb.sender_producer()), stream: None, request: req, disconnected: false, - } + }; + Body::Actor(Box::new(HttpContextFut::new(ctx, actor, mb))) } /// Create a new HTTP Context - pub fn with_factory(req: HttpRequest, f: F) -> HttpContext + pub fn with_factory(req: HttpRequest, f: F) -> Body where F: FnOnce(&mut Self) -> A + 'static, { + let mb = Mailbox::default(); let mut ctx = HttpContext { - inner: ContextImpl::new(None), + inner: ContextParts::new(mb.sender_producer()), stream: None, request: req, disconnected: false, }; let act = f(&mut ctx); - ctx.inner.set_actor(act); - ctx + Body::Actor(Box::new(HttpContextFut::new(ctx, act, mb))) } } @@ -165,7 +169,6 @@ where /// Returns drain future pub fn drain(&mut self) -> Drain { let (tx, rx) = oneshot::channel(); - self.inner.modify(); self.add_frame(Frame::Drain(tx)); Drain::new(rx) } @@ -184,7 +187,6 @@ where if let Some(s) = self.stream.as_mut() { s.push(frame) } - self.inner.modify(); } /// Handle of the running future @@ -195,32 +197,55 @@ where } } -impl ActorHttpContext for HttpContext +impl AsyncContextParts for HttpContext where A: Actor, +{ + fn parts(&mut self) -> &mut ContextParts { + &mut self.inner + } +} + +struct HttpContextFut +where + A: Actor>, +{ + fut: ContextFut>, +} + +impl HttpContextFut +where + A: Actor>, +{ + fn new(ctx: HttpContext, act: A, mailbox: Mailbox) -> Self { + let fut = ContextFut::new(ctx, act, mailbox); + HttpContextFut { fut } + } +} + +impl ActorHttpContext for HttpContextFut +where + A: Actor>, S: 'static, { #[inline] fn disconnected(&mut self) { - self.disconnected = true; - self.stop(); + self.fut.ctx().disconnected = true; + self.fut.ctx().stop(); } fn poll(&mut self) -> Poll>, Error> { - let ctx: &mut HttpContext = - unsafe { &mut *(self as &mut HttpContext as *mut _) }; - - if self.inner.alive() { - match self.inner.poll(ctx) { + if self.fut.alive() { + match self.fut.poll() { Ok(Async::NotReady) | Ok(Async::Ready(())) => (), Err(_) => return Err(ErrorInternalServerError("error")), } } // frames - if let Some(data) = self.stream.take() { + if let Some(data) = self.fut.ctx().stream.take() { Ok(Async::Ready(Some(data))) - } else if self.inner.alive() { + } else if self.fut.alive() { Ok(Async::NotReady) } else { Ok(Async::Ready(None)) @@ -239,16 +264,6 @@ where } } -impl From> for Body -where - A: Actor>, - S: 'static, -{ - fn from(ctx: HttpContext) -> Body { - Body::Actor(Box::new(ctx)) - } -} - /// Consume a future pub struct Drain { fut: oneshot::Receiver<()>, diff --git a/src/lib.rs b/src/lib.rs index 5ed1bcef..218cabf9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -84,7 +84,6 @@ allow(decimal_literal_representation, suspicious_arithmetic_impl) )] #![warn(missing_docs)] -#![allow(unused_mut, unused_imports, unused_variables, dead_code)] #[macro_use] extern crate log; diff --git a/src/pipeline.rs b/src/pipeline.rs index 6f3d4807..0ba25806 100644 --- a/src/pipeline.rs +++ b/src/pipeline.rs @@ -780,72 +780,3 @@ impl Completed { } } } - -#[cfg(test)] -mod tests { - use super::*; - use actix::*; - use context::HttpContext; - use futures::future::{lazy, result}; - use tokio::runtime::current_thread::Runtime; - - use test::TestRequest; - - impl PipelineState { - fn is_none(&self) -> Option { - if let PipelineState::None = *self { - Some(true) - } else { - None - } - } - fn completed(self) -> Option> { - if let PipelineState::Completed(c) = self { - Some(c) - } else { - None - } - } - } - - struct MyActor; - impl Actor for MyActor { - type Context = HttpContext; - } - - #[test] - fn test_completed() { - Runtime::new() - .unwrap() - .block_on(lazy(|| { - let req = TestRequest::default().finish(); - let mut info = PipelineInfo::new(req); - Completed::<(), Inner<()>>::init(&mut info) - .is_none() - .unwrap(); - - let req = TestRequest::default().finish(); - let ctx = HttpContext::new(req.clone(), MyActor); - let addr = ctx.address(); - let mut info = PipelineInfo::new(req); - info.context = Some(Box::new(ctx)); - let mut state = Completed::<(), Inner<()>>::init(&mut info) - .completed() - .unwrap(); - - assert!(state.poll(&mut info).is_none()); - let pp = - Pipeline(info, PipelineState::Completed(state), Rc::new(Vec::new())); - assert!(!pp.is_done()); - - let Pipeline(mut info, st, _) = pp; - let mut st = st.completed().unwrap(); - drop(addr); - - assert!(st.poll(&mut info).unwrap().is_none().unwrap()); - - result(Ok::<_, ()>(())) - })) - .unwrap(); - } -} diff --git a/src/ws/context.rs b/src/ws/context.rs index 34346f1e..91a23e0f 100644 --- a/src/ws/context.rs +++ b/src/ws/context.rs @@ -1,30 +1,35 @@ extern crate actix; +use bytes::Bytes; use futures::sync::oneshot::{self, Sender}; -use futures::{Async, Poll}; +use futures::{Async, Future, Poll, Stream}; use smallvec::SmallVec; -use self::actix::dev::{ContextImpl, Envelope, ToEnvelope}; +use self::actix::dev::{ + AsyncContextParts, ContextFut, ContextParts, Envelope, Mailbox, StreamHandler, + ToEnvelope, +}; use self::actix::fut::ActorFuture; use self::actix::{ - Actor, ActorContext, ActorState, Addr, AsyncContext, Handler, Message, SpawnHandle, + Actor, ActorContext, ActorState, Addr, AsyncContext, Handler, + Message as ActixMessage, SpawnHandle, }; use body::{Binary, Body}; use context::{ActorHttpContext, Drain, Frame as ContextFrame}; -use error::{Error, ErrorInternalServerError}; +use error::{Error, ErrorInternalServerError, PayloadError}; use httprequest::HttpRequest; use ws::frame::Frame; use ws::proto::{CloseReason, OpCode}; -use ws::WsWriter; +use ws::{Message, ProtocolError, WsStream, WsWriter}; /// Execution context for `WebSockets` actors pub struct WebsocketContext where A: Actor>, { - inner: ContextImpl, + inner: ContextParts, stream: Option>, request: HttpRequest, disconnected: bool, @@ -87,30 +92,41 @@ where { #[inline] /// Create a new Websocket context from a request and an actor - pub fn new(req: HttpRequest, actor: A) -> WebsocketContext { - WebsocketContext { - inner: ContextImpl::new(Some(actor)), - stream: None, - request: req, - disconnected: false, - } - } - - /// Create a new Websocket context - pub fn with_factory(req: HttpRequest, f: F) -> Self + pub fn new

(req: HttpRequest, actor: A, stream: WsStream

) -> Body where - F: FnOnce(&mut Self) -> A + 'static, + A: StreamHandler, + P: Stream + 'static, { + let mb = Mailbox::default(); let mut ctx = WebsocketContext { - inner: ContextImpl::new(None), + inner: ContextParts::new(mb.sender_producer()), stream: None, request: req, disconnected: false, }; + ctx.add_stream(stream); + + Body::Actor(Box::new(WebsocketContextFut::new(ctx, actor, mb))) + } + + /// Create a new Websocket context + pub fn with_factory(req: HttpRequest, stream: WsStream

, f: F) -> Body + where + F: FnOnce(&mut Self) -> A + 'static, + A: StreamHandler, + P: Stream + 'static, + { + let mb = Mailbox::default(); + let mut ctx = WebsocketContext { + inner: ContextParts::new(mb.sender_producer()), + stream: None, + request: req, + disconnected: false, + }; + ctx.add_stream(stream); let act = f(&mut ctx); - ctx.inner.set_actor(act); - ctx + Body::Actor(Box::new(WebsocketContextFut::new(ctx, act, mb))) } } @@ -127,7 +143,6 @@ where } let stream = self.stream.as_mut().unwrap(); stream.push(ContextFrame::Chunk(Some(data))); - self.inner.modify(); } else { warn!("Trying to write to disconnected response"); } @@ -148,7 +163,6 @@ where /// Returns drain future pub fn drain(&mut self) -> Drain { let (tx, rx) = oneshot::channel(); - self.inner.modify(); self.add_frame(ContextFrame::Drain(tx)); Drain::new(rx) } @@ -207,7 +221,6 @@ where if let Some(s) = self.stream.as_mut() { s.push(frame) } - self.inner.modify(); } /// Handle of the running future @@ -254,28 +267,52 @@ where } } -impl ActorHttpContext for WebsocketContext +impl AsyncContextParts for WebsocketContext where A: Actor, +{ + fn parts(&mut self) -> &mut ContextParts { + &mut self.inner + } +} + +struct WebsocketContextFut +where + A: Actor>, +{ + fut: ContextFut>, +} + +impl WebsocketContextFut +where + A: Actor>, +{ + fn new(ctx: WebsocketContext, act: A, mailbox: Mailbox) -> Self { + let fut = ContextFut::new(ctx, act, mailbox); + WebsocketContextFut { fut } + } +} + +impl ActorHttpContext for WebsocketContextFut +where + A: Actor>, S: 'static, { #[inline] fn disconnected(&mut self) { - self.disconnected = true; - self.stop(); + self.fut.ctx().disconnected = true; + self.fut.ctx().stop(); } fn poll(&mut self) -> Poll>, Error> { - let ctx: &mut WebsocketContext = unsafe { &mut *(self as *mut _) }; - - if self.inner.alive() && self.inner.poll(ctx).is_err() { + if self.fut.alive() && self.fut.poll().is_err() { return Err(ErrorInternalServerError("error")); } // frames - if let Some(data) = self.stream.take() { + if let Some(data) = self.fut.ctx().stream.take() { Ok(Async::Ready(Some(data))) - } else if self.inner.alive() { + } else if self.fut.alive() { Ok(Async::NotReady) } else { Ok(Async::Ready(None)) @@ -286,20 +323,10 @@ where impl ToEnvelope for WebsocketContext where A: Actor> + Handler, - M: Message + Send + 'static, + M: ActixMessage + Send + 'static, M::Result: Send, { fn pack(msg: M, tx: Option>) -> Envelope { Envelope::new(msg, tx) } } - -impl From> for Body -where - A: Actor>, - S: 'static, -{ - fn from(ctx: WebsocketContext) -> Body { - Body::Actor(Box::new(ctx)) - } -} diff --git a/src/ws/mod.rs b/src/ws/mod.rs index bc99414d..63b7ab0a 100644 --- a/src/ws/mod.rs +++ b/src/ws/mod.rs @@ -179,10 +179,8 @@ where let mut resp = handshake(req)?; let stream = WsStream::new(req.payload()); - let mut ctx = WebsocketContext::new(req.clone(), actor); - ctx.add_stream(stream); - - Ok(resp.body(ctx)) + let body = WebsocketContext::new(req.clone(), actor, stream); + Ok(resp.body(body)) } /// Prepare `WebSocket` handshake response.