extern crate actix; use bytes::Bytes; use futures::sync::oneshot::{self, Sender}; use futures::{Async, Future, Poll, Stream}; use smallvec::SmallVec; use self::actix::dev::{ AsyncContextParts, ContextFut, ContextParts, Envelope, Mailbox, StreamHandler, ToEnvelope, }; use self::actix::fut::ActorFuture; use self::actix::{ Actor, ActorContext, ActorState, Addr, AsyncContext, Handler, Message as ActixMessage, SpawnHandle, }; use body::{Binary, Body}; use context::{ActorHttpContext, Drain, Frame as ContextFrame}; use error::{Error, ErrorInternalServerError, PayloadError}; use httprequest::HttpRequest; use ws::frame::Frame; use ws::proto::{CloseReason, OpCode}; use ws::{Message, ProtocolError, WsStream, WsWriter}; /// Execution context for `WebSockets` actors pub struct WebsocketContext where A: Actor>, { inner: ContextParts, stream: Option>, request: HttpRequest, disconnected: bool, } impl ActorContext for WebsocketContext where A: Actor, { fn stop(&mut self) { self.inner.stop(); } fn terminate(&mut self) { self.inner.terminate() } fn state(&self) -> ActorState { self.inner.state() } } impl AsyncContext for WebsocketContext where A: Actor, { fn spawn(&mut self, fut: F) -> SpawnHandle where F: ActorFuture + 'static, { self.inner.spawn(fut) } fn wait(&mut self, fut: F) where F: ActorFuture + 'static, { self.inner.wait(fut) } #[doc(hidden)] #[inline] fn waiting(&self) -> bool { self.inner.waiting() || self.inner.state() == ActorState::Stopping || self.inner.state() == ActorState::Stopped } fn cancel_future(&mut self, handle: SpawnHandle) -> bool { self.inner.cancel_future(handle) } #[inline] fn address(&self) -> Addr { self.inner.address() } } impl WebsocketContext where A: Actor, { #[inline] /// Create a new Websocket context from a request and an actor pub fn new

(req: HttpRequest, actor: A, stream: WsStream

) -> Body where A: StreamHandler, P: Stream + 'static, { let mb = Mailbox::default(); let mut ctx = WebsocketContext { inner: ContextParts::new(mb.sender_producer()), stream: None, request: req, disconnected: false, }; ctx.add_stream(stream); Body::Actor(Box::new(WebsocketContextFut::new(ctx, actor, mb))) } /// Create a new Websocket context pub fn with_factory(req: HttpRequest, stream: WsStream

, f: F) -> Body where F: FnOnce(&mut Self) -> A + 'static, A: StreamHandler, P: Stream + 'static, { let mb = Mailbox::default(); let mut ctx = WebsocketContext { inner: ContextParts::new(mb.sender_producer()), stream: None, request: req, disconnected: false, }; ctx.add_stream(stream); let act = f(&mut ctx); Body::Actor(Box::new(WebsocketContextFut::new(ctx, act, mb))) } } impl WebsocketContext where A: Actor, { /// Write payload #[inline] fn write(&mut self, data: Binary) { if !self.disconnected { if self.stream.is_none() { self.stream = Some(SmallVec::new()); } let stream = self.stream.as_mut().unwrap(); stream.push(ContextFrame::Chunk(Some(data))); } else { warn!("Trying to write to disconnected response"); } } /// Shared application state #[inline] pub fn state(&self) -> &S { self.request.state() } /// Incoming request #[inline] pub fn request(&mut self) -> &mut HttpRequest { &mut self.request } /// Returns drain future pub fn drain(&mut self) -> Drain { let (tx, rx) = oneshot::channel(); self.add_frame(ContextFrame::Drain(tx)); Drain::new(rx) } /// Send text frame #[inline] pub fn text>(&mut self, text: T) { self.write(Frame::message(text.into(), OpCode::Text, true, false)); } /// Send binary frame #[inline] pub fn binary>(&mut self, data: B) { self.write(Frame::message(data, OpCode::Binary, true, false)); } /// Send ping frame #[inline] pub fn ping(&mut self, message: &str) { self.write(Frame::message( Vec::from(message), OpCode::Ping, true, false, )); } /// Send pong frame #[inline] pub fn pong(&mut self, message: &str) { self.write(Frame::message( Vec::from(message), OpCode::Pong, true, false, )); } /// Send close frame #[inline] pub fn close(&mut self, reason: Option) { self.write(Frame::close(reason, false)); } /// Check if connection still open #[inline] pub fn connected(&self) -> bool { !self.disconnected } #[inline] fn add_frame(&mut self, frame: ContextFrame) { if self.stream.is_none() { self.stream = Some(SmallVec::new()); } if let Some(s) = self.stream.as_mut() { s.push(frame) } } /// Handle of the running future /// /// SpawnHandle is the handle returned by `AsyncContext::spawn()` method. pub fn handle(&self) -> SpawnHandle { self.inner.curr_handle() } } impl WsWriter for WebsocketContext where A: Actor, S: 'static, { /// Send text frame #[inline] fn send_text>(&mut self, text: T) { self.text(text) } /// Send binary frame #[inline] fn send_binary>(&mut self, data: B) { self.binary(data) } /// Send ping frame #[inline] fn send_ping(&mut self, message: &str) { self.ping(message) } /// Send pong frame #[inline] fn send_pong(&mut self, message: &str) { self.pong(message) } /// Send close frame #[inline] fn send_close(&mut self, reason: Option) { self.close(reason) } } impl AsyncContextParts for WebsocketContext where A: Actor, { fn parts(&mut self) -> &mut ContextParts { &mut self.inner } } struct WebsocketContextFut where A: Actor>, { fut: ContextFut>, } impl WebsocketContextFut where A: Actor>, { fn new(ctx: WebsocketContext, act: A, mailbox: Mailbox) -> Self { let fut = ContextFut::new(ctx, act, mailbox); WebsocketContextFut { fut } } } impl ActorHttpContext for WebsocketContextFut where A: Actor>, S: 'static, { #[inline] fn disconnected(&mut self) { self.fut.ctx().disconnected = true; self.fut.ctx().stop(); } fn poll(&mut self) -> Poll>, Error> { if self.fut.alive() && self.fut.poll().is_err() { return Err(ErrorInternalServerError("error")); } // frames if let Some(data) = self.fut.ctx().stream.take() { Ok(Async::Ready(Some(data))) } else if self.fut.alive() { Ok(Async::NotReady) } else { Ok(Async::Ready(None)) } } } impl ToEnvelope for WebsocketContext where A: Actor> + Handler, M: ActixMessage + Send + 'static, M::Result: Send, { fn pack(msg: M, tx: Option>) -> Envelope { Envelope::new(msg, tx) } }