From 30bdf9cb5ebed33207fa8ec4de1196e31f2e7d88 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 12 Feb 2018 01:13:06 -0800 Subject: [PATCH] update actix api --- src/context.rs | 21 +++++++++------------ src/pipeline.rs | 2 +- src/server/srv.rs | 20 ++++++++++---------- src/test.rs | 4 ++-- src/ws/client.rs | 4 ++-- src/ws/context.rs | 20 +++++++++----------- 6 files changed, 33 insertions(+), 38 deletions(-) diff --git a/src/context.rs b/src/context.rs index 19a2b4a0..28cf5d7d 100644 --- a/src/context.rs +++ b/src/context.rs @@ -6,9 +6,9 @@ use futures::unsync::oneshot; use smallvec::SmallVec; use actix::{Actor, ActorState, ActorContext, AsyncContext, - Address, SyncAddress, Handler, ResponseType, MessageResult, SpawnHandle}; + Addr, Handler, ResponseType, MessageResult, SpawnHandle, Syn, Unsync}; use actix::fut::ActorFuture; -use actix::dev::{ContextImpl, Envelope, ToEnvelope, RemoteEnvelope}; +use actix::dev::{ContextImpl, ToEnvelope, RemoteEnvelope}; use body::{Body, Binary}; use error::{Error, ErrorInternalServerError}; @@ -83,12 +83,12 @@ impl AsyncContext for HttpContext where A: Actor } #[doc(hidden)] #[inline] - fn local_address(&mut self) -> Address { + fn unsync_address(&mut self) -> Addr> { self.inner.unsync_address() } #[doc(hidden)] #[inline] - fn sync_address(&mut self) -> SyncAddress { + fn sync_address(&mut self) -> Addr> { self.inner.sync_address() } } @@ -205,15 +205,12 @@ impl ActorHttpContext for HttpContext where A: Actor, } } -impl ToEnvelope for HttpContext - where A: Actor>, +impl ToEnvelope, M> for HttpContext + where A: Actor> + Handler, + M: ResponseType + Send + 'static, M::Item: Send, M::Error: Send, { - #[inline] - fn pack(msg: M, tx: Option>>) -> Envelope - where A: Handler, - M: ResponseType + Send + 'static, M::Item: Send, M::Error: Send - { - RemoteEnvelope::envelope(msg, tx).into() + fn pack(msg: M, tx: Option>>) -> Syn { + Syn::new(Box::new(RemoteEnvelope::envelope(msg, tx))) } } diff --git a/src/pipeline.rs b/src/pipeline.rs index d1328ad9..babd9219 100644 --- a/src/pipeline.rs +++ b/src/pipeline.rs @@ -739,7 +739,7 @@ mod tests { let req = HttpRequest::default(); let mut ctx = HttpContext::new(req.clone(), MyActor); - let addr: Address<_> = ctx.address(); + let addr: Addr> = ctx.address(); let mut info = PipelineInfo::new(req); info.context = Some(Box::new(ctx)); let mut state = Completed::<(), Inner<()>>::init(&mut info).completed().unwrap(); diff --git a/src/server/srv.rs b/src/server/srv.rs index 79f90774..3cd6da12 100644 --- a/src/server/srv.rs +++ b/src/server/srv.rs @@ -36,12 +36,12 @@ pub struct HttpServer where H: IntoHttpHandler + 'static host: Option, keep_alive: Option, factory: Arc Vec + Send + Sync>, - workers: Vec>>, + workers: Vec>>>, sockets: HashMap, accept: Vec<(mio::SetReadiness, sync_mpsc::Sender)>, exit: bool, shutdown_timeout: u16, - signals: Option>, + signals: Option>>, no_signals: bool, } @@ -146,7 +146,7 @@ impl HttpServer where H: IntoHttpHandler + 'static } /// Set alternative address for `ProcessSignals` actor. - pub fn signals(mut self, addr: SyncAddress) -> Self { + pub fn signals(mut self, addr: Addr>) -> Self { self.signals = Some(addr); self } @@ -227,7 +227,7 @@ impl HttpServer where H: IntoHttpHandler + 'static } // subscribe to os signals - fn subscribe_to_signals(&self) -> Option> { + fn subscribe_to_signals(&self) -> Option>> { if !self.no_signals { if let Some(ref signals) = self.signals { Some(signals.clone()) @@ -269,7 +269,7 @@ impl HttpServer /// let _ = sys.run(); // <- Run actix system, this method actually starts all async processes /// } /// ``` - pub fn start(mut self) -> SyncAddress + pub fn start(mut self) -> Addr> { if self.sockets.is_empty() { panic!("HttpServer::bind() has to be called before start()"); @@ -288,9 +288,9 @@ impl HttpServer // start http server actor let signals = self.subscribe_to_signals(); - let addr: SyncAddress<_> = Actor::start(self); + let addr: Addr> = Actor::start(self); signals.map(|signals| signals.send( - signal::Subscribe(addr.clone().into()))); + signal::Subscribe(addr.clone().subscriber()))); addr } } @@ -407,7 +407,7 @@ impl HttpServer /// Start listening for incoming connections from a stream. /// /// This method uses only one thread for handling incoming connections. - pub fn start_incoming(mut self, stream: S, secure: bool) -> SyncAddress + pub fn start_incoming(mut self, stream: S, secure: bool) -> Addr> where S: Stream + 'static, T: AsyncRead + AsyncWrite + 'static, A: 'static @@ -435,7 +435,7 @@ impl HttpServer // start server let signals = self.subscribe_to_signals(); - let addr: SyncAddress<_> = HttpServer::create(move |ctx| { + let addr: Addr> = HttpServer::create(move |ctx| { ctx.add_message_stream( stream .map_err(|_| ()) @@ -443,7 +443,7 @@ impl HttpServer self }); signals.map(|signals| signals.send( - signal::Subscribe(addr.clone().into()))); + signal::Subscribe(addr.clone().subscriber()))); addr } } diff --git a/src/test.rs b/src/test.rs index bc8da075..c98929f0 100644 --- a/src/test.rs +++ b/src/test.rs @@ -6,7 +6,7 @@ use std::sync::mpsc; use std::str::FromStr; use std::collections::HashMap; -use actix::{Arbiter, SyncAddress, System, SystemRunner, msgs}; +use actix::{Arbiter, Addr, Syn, System, SystemRunner, msgs}; use cookie::Cookie; use http::{Uri, Method, Version, HeaderMap, HttpTryFrom}; use http::header::{HeaderName, HeaderValue}; @@ -56,7 +56,7 @@ pub struct TestServer { addr: net::SocketAddr, thread: Option>, system: SystemRunner, - server_sys: SyncAddress, + server_sys: Addr>, } impl TestServer { diff --git a/src/ws/client.rs b/src/ws/client.rs index 8b4837c1..7800ab02 100644 --- a/src/ws/client.rs +++ b/src/ws/client.rs @@ -103,7 +103,7 @@ pub struct WsClient { http_err: Option, origin: Option, protocols: Option, - conn: Address, + conn: Addr>, } impl WsClient { @@ -114,7 +114,7 @@ impl WsClient { } /// Create new websocket connection with custom `ClientConnector` - pub fn with_connector>(uri: S, conn: Address) -> WsClient { + pub fn with_connector>(uri: S, conn: Addr>) -> WsClient { let mut cl = WsClient { request: ClientRequest::build(), err: None, diff --git a/src/ws/context.rs b/src/ws/context.rs index 835c1774..1eb78c7e 100644 --- a/src/ws/context.rs +++ b/src/ws/context.rs @@ -5,9 +5,9 @@ use futures::unsync::oneshot; use smallvec::SmallVec; use actix::{Actor, ActorState, ActorContext, AsyncContext, - Address, SyncAddress, Handler, ResponseType, SpawnHandle, MessageResult}; + Addr, Handler, ResponseType, SpawnHandle, MessageResult, Syn, Unsync}; use actix::fut::ActorFuture; -use actix::dev::{ContextImpl, Envelope, ToEnvelope, RemoteEnvelope}; +use actix::dev::{ContextImpl, ToEnvelope, RemoteEnvelope}; use body::{Body, Binary}; use error::{Error, ErrorInternalServerError}; @@ -67,13 +67,13 @@ impl AsyncContext for WebsocketContext where A: Actor Address { + fn unsync_address(&mut self) -> Addr> { self.inner.unsync_address() } #[doc(hidden)] #[inline] - fn sync_address(&mut self) -> SyncAddress { + fn sync_address(&mut self) -> Addr> { self.inner.sync_address() } } @@ -217,14 +217,12 @@ impl ActorHttpContext for WebsocketContext where A: Actor ToEnvelope for WebsocketContext - where A: Actor>, +impl ToEnvelope, M> for WebsocketContext + where A: Actor> + Handler, + M: ResponseType + Send + 'static, M::Item: Send, M::Error: Send, { - #[inline] - fn pack(msg: M, tx: Option>>) -> Envelope - where A: Handler, - M: ResponseType + Send + 'static, M::Item: Send, M::Error: Send { - RemoteEnvelope::envelope(msg, tx).into() + fn pack(msg: M, tx: Option>>) -> Syn { + Syn::new(Box::new(RemoteEnvelope::envelope(msg, tx))) } }