diff --git a/src/client/connector.rs b/src/client/connector.rs index 1d4f6dbf6..12a36dee2 100644 --- a/src/client/connector.rs +++ b/src/client/connector.rs @@ -1,20 +1,17 @@ -use std::cell::{Cell, RefCell}; use std::collections::{HashMap, VecDeque}; use std::net::Shutdown; -use std::rc::Rc; use std::time::{Duration, Instant}; use std::{fmt, io, mem, time}; use actix::actors::{Connect as ResolveConnect, Connector, ConnectorError}; use actix::fut::WrapFuture; -use actix::registry::ArbiterService; +use actix::registry::SystemService; use actix::{ fut, Actor, ActorFuture, ActorResponse, AsyncContext, Context, ContextFutureSpawner, - Handler, Message, Recipient, Supervised, Syn, + Handler, Message, Recipient, StreamHandler, Supervised, }; -use futures::task::{current as current_task, Task}; -use futures::unsync::oneshot; +use futures::sync::{mpsc, oneshot}; use futures::{Async, Future, Poll}; use http::{Error as HttpError, HttpTryFrom, Uri}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -167,6 +164,21 @@ struct Waiter { conn_timeout: Duration, } +enum Paused { + No, + Yes, + Timeout(Instant, Delay), +} + +impl Paused { + fn is_paused(&self) -> bool { + match *self { + Paused::No => false, + _ => true, + } + } +} + /// `ClientConnector` type is responsible for transport layer of a /// client connection. pub struct ClientConnector { @@ -176,10 +188,10 @@ pub struct ClientConnector { connector: TlsConnector, stats: ClientConnectorStats, - subscriber: Option>, + subscriber: Option>, - pool: Rc, - pool_modified: Rc>, + acq_tx: mpsc::UnboundedSender, + acq_rx: Option>, conn_lifetime: Duration, conn_keep_alive: Duration, @@ -191,7 +203,7 @@ pub struct ClientConnector { to_close: Vec, waiters: HashMap>, wait_timeout: Option<(Instant, Delay)>, - paused: Option>, + paused: Paused, } impl Actor for ClientConnector { @@ -199,17 +211,18 @@ impl Actor for ClientConnector { fn started(&mut self, ctx: &mut Self::Context) { self.collect_periodic(ctx); + ctx.add_stream(self.acq_rx.take().unwrap()); ctx.spawn(Maintenance); } } impl Supervised for ClientConnector {} -impl ArbiterService for ClientConnector {} +impl SystemService for ClientConnector {} impl Default for ClientConnector { fn default() -> ClientConnector { - let _modified = Rc::new(Cell::new(false)); + let (tx, rx) = mpsc::unbounded(); #[cfg(all(feature = "alpn"))] { @@ -222,8 +235,8 @@ impl Default for ClientConnector { ClientConnector { stats: ClientConnectorStats::default(), subscriber: None, - pool: Rc::new(Pool::new(Rc::clone(&_modified))), - pool_modified: _modified, + acq_tx: tx, + acq_rx: Some(rx), connector: builder.build().unwrap(), conn_lifetime: Duration::from_secs(75), conn_keep_alive: Duration::from_secs(15), @@ -235,7 +248,7 @@ impl Default for ClientConnector { to_close: Vec::new(), waiters: HashMap::new(), wait_timeout: None, - paused: None, + paused: Paused::No, } } @@ -243,8 +256,8 @@ impl Default for ClientConnector { ClientConnector { stats: ClientConnectorStats::default(), subscriber: None, - pool: Rc::new(Pool::new(Rc::clone(&_modified))), - pool_modified: _modified, + acq_tx: tx, + acq_rx: Some(rx), conn_lifetime: Duration::from_secs(75), conn_keep_alive: Duration::from_secs(15), limit: 100, @@ -255,7 +268,7 @@ impl Default for ClientConnector { to_close: Vec::new(), waiters: HashMap::new(), wait_timeout: None, - paused: None, + paused: Paused::No, } } } @@ -286,7 +299,7 @@ impl ClientConnector { /// /// // Start `ClientConnector` with custom `SslConnector` /// let ssl_conn = SslConnector::builder(SslMethod::tls()).unwrap().build(); - /// let conn: Addr = ClientConnector::with_connector(ssl_conn).start(); + /// let conn = ClientConnector::with_connector(ssl_conn).start(); /// /// Arbiter::spawn( /// conn.send( @@ -305,13 +318,14 @@ impl ClientConnector { /// } /// ``` pub fn with_connector(connector: SslConnector) -> ClientConnector { - let modified = Rc::new(Cell::new(false)); + let (tx, rx) = mpsc::unbounded(); + ClientConnector { connector, stats: ClientConnectorStats::default(), subscriber: None, - pool: Rc::new(Pool::new(Rc::clone(&modified))), - pool_modified: modified, + pool_tx: tx, + pool_rx: Some(rx), conn_lifetime: Duration::from_secs(75), conn_keep_alive: Duration::from_secs(15), limit: 100, @@ -322,7 +336,7 @@ impl ClientConnector { to_close: Vec::new(), waiters: HashMap::new(), wait_timeout: None, - paused: None, + paused: Paused::No, } } @@ -366,7 +380,7 @@ impl ClientConnector { } /// Subscribe for connector stats. Only one subscriber is supported. - pub fn stats(mut self, subs: Recipient) -> Self { + pub fn stats(mut self, subs: Recipient) -> Self { self.subscriber = Some(subs); self } @@ -448,75 +462,18 @@ impl ClientConnector { } } - fn collect(&mut self, periodic: bool) { - let now = Instant::now(); - - if self.pool_modified.get() { - // collect half acquire keys - if let Some(keys) = self.pool.collect_keys() { - for key in keys { - self.release_key(&key); - } - } - - // collect connections for close - if let Some(to_close) = self.pool.collect_close() { - for conn in to_close { - self.release_key(&conn.key); - self.to_close.push(conn); - self.stats.closed += 1; - } - } - - // connection connections - if let Some(to_release) = self.pool.collect_release() { - for conn in to_release { - self.release_key(&conn.key); - - // check connection lifetime and the return to available pool - if (now - conn.ts) < self.conn_lifetime { - self.available - .entry(conn.key.clone()) - .or_insert_with(VecDeque::new) - .push_back(Conn(Instant::now(), conn)); - } - } - } - } - - // check keep-alive - for conns in self.available.values_mut() { - while !conns.is_empty() { - if (now > conns[0].0) && (now - conns[0].0) > self.conn_keep_alive - || (now - conns[0].1.ts) > self.conn_lifetime - { - let conn = conns.pop_front().unwrap().1; - self.to_close.push(conn); - self.stats.closed += 1; - } else { - break; - } - } - } - - // check connections for shutdown - if periodic { - let mut idx = 0; - while idx < self.to_close.len() { - match AsyncWrite::shutdown(&mut self.to_close[idx]) { - Ok(Async::NotReady) => idx += 1, - _ => { - self.to_close.swap_remove(idx); - } - } - } - } - - self.pool_modified.set(false); - } - fn collect_periodic(&mut self, ctx: &mut Context) { - self.collect(true); + // check connections for shutdown + let mut idx = 0; + while idx < self.to_close.len() { + match AsyncWrite::shutdown(&mut self.to_close[idx]) { + Ok(Async::NotReady) => idx += 1, + _ => { + self.to_close.swap_remove(idx); + } + } + } + // re-schedule next collect period ctx.run_later(Duration::from_secs(1), |act, ctx| act.collect_periodic(ctx)); @@ -598,9 +555,9 @@ impl Handler for ClientConnector { let when = Instant::now() + time; let mut timeout = Delay::new(when); let _ = timeout.poll(); - self.paused = Some(Some((when, timeout))); - } else if self.paused.is_none() { - self.paused = Some(None); + self.paused = Paused::Timeout(when, timeout); + } else { + self.paused = Paused::Yes; } } } @@ -609,7 +566,7 @@ impl Handler for ClientConnector { type Result = (); fn handle(&mut self, _: Resume, _: &mut Self::Context) { - self.paused.take(); + self.paused = Paused::No; } } @@ -617,10 +574,6 @@ impl Handler for ClientConnector { type Result = ActorResponse; fn handle(&mut self, msg: Connect, _: &mut Self::Context) -> Self::Result { - if self.pool_modified.get() { - self.collect(false); - } - let uri = &msg.uri; let wait_timeout = msg.wait_timeout; let conn_timeout = msg.conn_timeout; @@ -646,11 +599,6 @@ impl Handler for ClientConnector { return ActorResponse::reply(Err(ClientConnectorError::SslIsNotSupported)); } - // check if pool has task reference - if self.pool.task.borrow().is_none() { - *self.pool.task.borrow_mut() = Some(current_task()); - } - let host = uri.host().unwrap().to_owned(); let port = uri.port().unwrap_or_else(|| proto.port()); let key = Key { @@ -660,7 +608,7 @@ impl Handler for ClientConnector { }; // check pause state - if self.paused.is_some() { + if self.paused.is_paused() { let rx = self.wait_for(key, wait_timeout, conn_timeout); self.stats.waits += 1; return ActorResponse::async( @@ -678,7 +626,7 @@ impl Handler for ClientConnector { match self.acquire(&key) { Acquire::Acquired(mut conn) => { // use existing connection - conn.pool = Some(AcquiredConn(key, Some(Rc::clone(&self.pool)))); + conn.pool = Some(AcquiredConn(key, Some(self.acq_tx.clone()))); self.stats.reused += 1; return ActorResponse::async(fut::ok(conn)); } @@ -695,7 +643,7 @@ impl Handler for ClientConnector { }), ); } - Acquire::Available => Some(Rc::clone(&self.pool)), + Acquire::Available => Some(self.acq_tx.clone()), } } else { None @@ -801,6 +749,49 @@ impl Handler for ClientConnector { } } +impl StreamHandler for ClientConnector { + fn handle(&mut self, msg: AcquiredConnOperation, _: &mut Context) { + let now = Instant::now(); + + match msg { + AcquiredConnOperation::Close(conn) => { + self.release_key(&conn.key); + self.to_close.push(conn); + self.stats.closed += 1; + } + AcquiredConnOperation::Release(conn) => { + self.release_key(&conn.key); + + // check connection lifetime and the return to available pool + if (Instant::now() - conn.ts) < self.conn_lifetime { + self.available + .entry(conn.key.clone()) + .or_insert_with(VecDeque::new) + .push_back(Conn(Instant::now(), conn)); + } + } + AcquiredConnOperation::ReleaseKey(key) => { + self.release_key(&key); + } + } + + // check keep-alive + for conns in self.available.values_mut() { + while !conns.is_empty() { + if (now > conns[0].0) && (now - conns[0].0) > self.conn_keep_alive + || (now - conns[0].1.ts) > self.conn_lifetime + { + let conn = conns.pop_front().unwrap().1; + self.to_close.push(conn); + self.stats.closed += 1; + } else { + break; + } + } + } + } +} + struct Maintenance; impl fut::ActorFuture for Maintenance { @@ -812,18 +803,10 @@ impl fut::ActorFuture for Maintenance { &mut self, act: &mut ClientConnector, ctx: &mut Context, ) -> Poll { // check pause duration - let done = if let Some(Some(ref pause)) = act.paused { - pause.0 <= Instant::now() - } else { - false - }; - if done { - act.paused.take(); - } - - // collect connections - if act.pool_modified.get() { - act.collect(false); + if let Paused::Timeout(inst, _) = act.paused { + if inst <= Instant::now() { + act.paused = Paused::No; + } } // collect wait timers @@ -843,7 +826,7 @@ impl fut::ActorFuture for Maintenance { // use existing connection act.stats.reused += 1; conn.pool = - Some(AcquiredConn(key.clone(), Some(Rc::clone(&act.pool)))); + Some(AcquiredConn(key.clone(), Some(act.acq_tx.clone()))); let _ = waiter.tx.send(Ok(conn)); } Acquire::NotAvailable => { @@ -851,7 +834,7 @@ impl fut::ActorFuture for Maintenance { break; } Acquire::Available => { - let conn = AcquiredConn(key.clone(), Some(Rc::clone(&act.pool))); + let conn = AcquiredConn(key.clone(), Some(act.acq_tx.clone())); fut::WrapFuture::::actfuture( Connector::from_registry().send( @@ -1050,100 +1033,38 @@ enum Acquire { NotAvailable, } -struct AcquiredConn(Key, Option>); +enum AcquiredConnOperation { + Close(Connection), + Release(Connection), + ReleaseKey(Key), +} + +struct AcquiredConn(Key, Option>); impl AcquiredConn { fn close(&mut self, conn: Connection) { - if let Some(pool) = self.1.take() { - pool.close(conn); + if let Some(tx) = self.1.take() { + let _ = tx.unbounded_send(AcquiredConnOperation::Close(conn)); } } fn release(&mut self, conn: Connection) { - if let Some(pool) = self.1.take() { - pool.release(conn); + if let Some(tx) = self.1.take() { + let _ = tx.unbounded_send(AcquiredConnOperation::Release(conn)); } } } impl Drop for AcquiredConn { fn drop(&mut self) { - if let Some(pool) = self.1.take() { - pool.release_key(self.0.clone()); - } - } -} - -pub struct Pool { - keys: RefCell>, - to_close: RefCell>, - to_release: RefCell>, - task: RefCell>, - modified: Rc>, -} - -impl Pool { - fn new(modified: Rc>) -> Pool { - Pool { - modified, - keys: RefCell::new(Vec::new()), - to_close: RefCell::new(Vec::new()), - to_release: RefCell::new(Vec::new()), - task: RefCell::new(None), - } - } - - fn collect_keys(&self) -> Option> { - if self.keys.borrow().is_empty() { - None - } else { - Some(mem::replace(&mut *self.keys.borrow_mut(), Vec::new())) - } - } - - fn collect_close(&self) -> Option> { - if self.to_close.borrow().is_empty() { - None - } else { - Some(mem::replace(&mut *self.to_close.borrow_mut(), Vec::new())) - } - } - - fn collect_release(&self) -> Option> { - if self.to_release.borrow().is_empty() { - None - } else { - Some(mem::replace(&mut *self.to_release.borrow_mut(), Vec::new())) - } - } - - fn close(&self, conn: Connection) { - self.modified.set(true); - self.to_close.borrow_mut().push(conn); - if let Some(ref task) = *self.task.borrow() { - task.notify() - } - } - - fn release(&self, conn: Connection) { - self.modified.set(true); - self.to_release.borrow_mut().push(conn); - if let Some(ref task) = *self.task.borrow() { - task.notify() - } - } - - fn release_key(&self, key: Key) { - self.modified.set(true); - self.keys.borrow_mut().push(key); - if let Some(ref task) = *self.task.borrow() { - task.notify() + if let Some(tx) = self.1.take() { + let _ = tx.unbounded_send(AcquiredConnOperation::ReleaseKey(self.0.clone())); } } } pub struct Connection { key: Key, - stream: Box, + stream: Box, pool: Option, ts: Instant, } @@ -1155,7 +1076,7 @@ impl fmt::Debug for Connection { } impl Connection { - fn new(key: Key, pool: Option, stream: Box) -> Self { + fn new(key: Key, pool: Option, stream: Box) -> Self { Connection { key, stream, @@ -1168,7 +1089,7 @@ impl Connection { &mut *self.stream } - pub fn from_stream(io: T) -> Connection { + pub fn from_stream(io: T) -> Connection { Connection::new(Key::empty(), None, Box::new(io)) } diff --git a/src/client/pipeline.rs b/src/client/pipeline.rs index c75280739..3f3d425d9 100644 --- a/src/client/pipeline.rs +++ b/src/client/pipeline.rs @@ -56,7 +56,7 @@ impl From for SendRequestError { enum State { New, - Connect(actix::dev::Request), + Connect(actix::dev::Request), Connection(Connection), Send(Box), None, @@ -68,7 +68,7 @@ enum State { pub struct SendRequest { req: ClientRequest, state: State, - conn: Addr, + conn: Addr, conn_timeout: Duration, wait_timeout: Duration, timeout: Option, @@ -80,7 +80,7 @@ impl SendRequest { } pub(crate) fn with_connector( - req: ClientRequest, conn: Addr, + req: ClientRequest, conn: Addr, ) -> SendRequest { SendRequest { req, diff --git a/src/client/request.rs b/src/client/request.rs index 9a3d0fb1d..adb1b29fe 100644 --- a/src/client/request.rs +++ b/src/client/request.rs @@ -3,7 +3,7 @@ use std::io::Write; use std::time::Duration; use std::{fmt, mem}; -use actix::{Addr, Unsync}; +use actix::Addr; use bytes::{BufMut, Bytes, BytesMut}; use cookie::{Cookie, CookieJar}; use futures::Stream; @@ -67,7 +67,7 @@ pub struct ClientRequest { enum ConnectionType { Default, - Connector(Addr), + Connector(Addr), Connection(Connection), } @@ -541,7 +541,7 @@ impl ClientRequestBuilder { } /// Send request using custom connector - pub fn with_connector(&mut self, conn: Addr) -> &mut Self { + pub fn with_connector(&mut self, conn: Addr) -> &mut Self { if let Some(parts) = parts(&mut self.request, &self.err) { parts.conn = ConnectionType::Connector(conn); } diff --git a/src/context.rs b/src/context.rs index 375e8ef1d..e298287bc 100644 --- a/src/context.rs +++ b/src/context.rs @@ -4,11 +4,10 @@ use futures::{Async, Future, Poll}; use smallvec::SmallVec; use std::marker::PhantomData; -use actix::dev::{ContextImpl, SyncEnvelope, ToEnvelope}; +use actix::dev::{ContextImpl, Envelope, ToEnvelope}; use actix::fut::ActorFuture; use actix::{ Actor, ActorContext, ActorState, Addr, AsyncContext, Handler, Message, SpawnHandle, - Syn, Unsync, }; use body::{Binary, Body}; @@ -90,15 +89,9 @@ where fn cancel_future(&mut self, handle: SpawnHandle) -> bool { self.inner.cancel_future(handle) } - #[doc(hidden)] #[inline] - fn unsync_address(&mut self) -> Addr { - self.inner.unsync_address() - } - #[doc(hidden)] - #[inline] - fn sync_address(&mut self) -> Addr { - self.inner.sync_address() + fn address(&mut self) -> Addr { + self.inner.address() } } @@ -223,14 +216,14 @@ where } } -impl ToEnvelope for HttpContext +impl ToEnvelope for HttpContext where A: Actor> + Handler, M: Message + Send + 'static, M::Result: Send, { - fn pack(msg: M, tx: Option>) -> SyncEnvelope { - SyncEnvelope::new(msg, tx) + fn pack(msg: M, tx: Option>) -> Envelope { + Envelope::new(msg, tx) } } diff --git a/src/pipeline.rs b/src/pipeline.rs index e315d4c09..e5152de50 100644 --- a/src/pipeline.rs +++ b/src/pipeline.rs @@ -806,7 +806,7 @@ mod tests { let req = HttpRequest::default(); let mut ctx = HttpContext::new(req.clone(), MyActor); - let addr: Addr = ctx.address(); + let addr = ctx.address(); let mut info = PipelineInfo::new(req); info.context = Some(Box::new(ctx)); let mut state = Completed::<(), Inner<()>>::init(&mut info) diff --git a/src/server/srv.rs b/src/server/srv.rs index 94e132e15..42b9d77d0 100644 --- a/src/server/srv.rs +++ b/src/server/srv.rs @@ -51,12 +51,12 @@ where keep_alive: KeepAlive, factory: Arc Vec + Send + Sync>, #[cfg_attr(feature = "cargo-clippy", allow(type_complexity))] - workers: Vec<(usize, Addr>)>, + workers: Vec<(usize, Addr>)>, sockets: Vec, accept: Vec<(mio::SetReadiness, sync_mpsc::Sender)>, exit: bool, shutdown_timeout: u16, - signals: Option>, + signals: Option>, no_http2: bool, no_signals: bool, } @@ -177,7 +177,7 @@ where } /// Set alternative address for `ProcessSignals` actor. - pub fn signals(mut self, addr: Addr) -> Self { + pub fn signals(mut self, addr: Addr) -> Self { self.signals = Some(addr); self } @@ -380,12 +380,12 @@ where } // subscribe to os signals - fn subscribe_to_signals(&self) -> Option> { + fn subscribe_to_signals(&self) -> Option> { if !self.no_signals { if let Some(ref signals) = self.signals { Some(signals.clone()) } else { - Some(Arbiter::system_registry().get::()) + Some(Arbiter::registry().get::()) } } else { None @@ -422,7 +422,7 @@ impl HttpServer { /// let _ = sys.run(); // <- Run actix system, this method actually starts all async processes /// } /// ``` - pub fn start(mut self) -> Addr { + pub fn start(mut self) -> Addr { if self.sockets.is_empty() { panic!("HttpServer::bind() has to be called before start()"); } else { @@ -458,7 +458,7 @@ impl HttpServer { // start http server actor let signals = self.subscribe_to_signals(); - let addr: Addr = Actor::create(move |ctx| { + let addr = Actor::create(move |ctx| { ctx.add_stream(rx); self }); @@ -510,7 +510,7 @@ impl HttpServer { )] impl HttpServer { /// Start listening for incoming tls connections. - pub fn start_tls(mut self, acceptor: TlsAcceptor) -> io::Result> { + pub fn start_tls(mut self, acceptor: TlsAcceptor) -> io::Result> { for sock in &mut self.sockets { match sock.tp { StreamHandlerType::Normal => (), @@ -533,7 +533,7 @@ impl HttpServer { /// This method sets alpn protocols to "h2" and "http/1.1" pub fn start_ssl( mut self, mut builder: SslAcceptorBuilder, - ) -> io::Result> { + ) -> io::Result> { // alpn support if !self.no_http2 { builder.set_alpn_protos(b"\x02h2\x08http/1.1")?; @@ -563,7 +563,7 @@ impl HttpServer { /// Start listening for incoming connections from a stream. /// /// This method uses only one thread for handling incoming connections. - pub fn start_incoming(mut self, stream: S, secure: bool) -> Addr + pub fn start_incoming(mut self, stream: S, secure: bool) -> Addr where S: Stream + 'static, T: AsyncRead + AsyncWrite + 'static, @@ -579,7 +579,7 @@ impl HttpServer { // start server let signals = self.subscribe_to_signals(); - let addr: Addr = HttpServer::create(move |ctx| { + let addr = HttpServer::create(move |ctx| { ctx.add_message_stream(stream.map_err(|_| ()).map(move |t| Conn { io: WrapperStream::new(t), token: 0, diff --git a/src/test.rs b/src/test.rs index f950c17b5..bd2135cc4 100644 --- a/src/test.rs +++ b/src/test.rs @@ -5,7 +5,7 @@ use std::str::FromStr; use std::sync::mpsc; use std::{net, thread}; -use actix::{msgs, Actor, Addr, Arbiter, Syn, System, SystemRunner, Unsync}; +use actix::{msgs, Actor, Addr, Arbiter, System, SystemRunner}; use cookie::Cookie; use futures::Future; use http::header::HeaderName; @@ -64,9 +64,9 @@ pub struct TestServer { addr: net::SocketAddr, thread: Option>, system: SystemRunner, - server_sys: Addr, + server_sys: Addr, ssl: bool, - conn: Addr, + conn: Addr, } impl TestServer { @@ -135,7 +135,7 @@ impl TestServer { } } - fn get_conn() -> Addr { + fn get_conn() -> Addr { #[cfg(feature = "alpn")] { use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode}; diff --git a/src/ws/client.rs b/src/ws/client.rs index 1f35c1867..f8366752e 100644 --- a/src/ws/client.rs +++ b/src/ws/client.rs @@ -111,7 +111,7 @@ pub struct Client { http_err: Option, origin: Option, protocols: Option, - conn: Addr, + conn: Addr, max_size: usize, } @@ -122,9 +122,7 @@ impl Client { } /// Create new websocket connection with custom `ClientConnector` - pub fn with_connector>( - uri: S, conn: Addr, - ) -> Client { + pub fn with_connector>(uri: S, conn: Addr) -> Client { let mut cl = Client { request: ClientRequest::build(), err: None, diff --git a/src/ws/context.rs b/src/ws/context.rs index 226d93a14..03af169d6 100644 --- a/src/ws/context.rs +++ b/src/ws/context.rs @@ -3,11 +3,10 @@ use futures::unsync::oneshot; use futures::{Async, Poll}; use smallvec::SmallVec; -use actix::dev::{ContextImpl, SyncEnvelope, ToEnvelope}; +use actix::dev::{ContextImpl, Envelope, ToEnvelope}; use actix::fut::ActorFuture; use actix::{ Actor, ActorContext, ActorState, Addr, AsyncContext, Handler, Message, SpawnHandle, - Syn, Unsync, }; use body::{Binary, Body}; @@ -75,16 +74,9 @@ where self.inner.cancel_future(handle) } - #[doc(hidden)] #[inline] - fn unsync_address(&mut self) -> Addr { - self.inner.unsync_address() - } - - #[doc(hidden)] - #[inline] - fn sync_address(&mut self) -> Addr { - self.inner.sync_address() + fn address(&mut self) -> Addr { + self.inner.address() } } @@ -282,14 +274,14 @@ where } } -impl ToEnvelope for WebsocketContext +impl ToEnvelope for WebsocketContext where A: Actor> + Handler, M: Message + Send + 'static, M::Result: Send, { - fn pack(msg: M, tx: Option>) -> SyncEnvelope { - SyncEnvelope::new(msg, tx) + fn pack(msg: M, tx: Option>) -> Envelope { + Envelope::new(msg, tx) } }