1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-12-01 02:44:37 +01:00

update actix Addr; make ClientConnector thread safe

This commit is contained in:
Nikolay Kim 2018-05-27 05:02:49 -07:00
parent 7c71171602
commit be2ceb7c66
9 changed files with 157 additions and 253 deletions

View File

@ -1,20 +1,17 @@
use std::cell::{Cell, RefCell};
use std::collections::{HashMap, VecDeque}; use std::collections::{HashMap, VecDeque};
use std::net::Shutdown; use std::net::Shutdown;
use std::rc::Rc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use std::{fmt, io, mem, time}; use std::{fmt, io, mem, time};
use actix::actors::{Connect as ResolveConnect, Connector, ConnectorError}; use actix::actors::{Connect as ResolveConnect, Connector, ConnectorError};
use actix::fut::WrapFuture; use actix::fut::WrapFuture;
use actix::registry::ArbiterService; use actix::registry::SystemService;
use actix::{ use actix::{
fut, Actor, ActorFuture, ActorResponse, AsyncContext, Context, ContextFutureSpawner, fut, Actor, ActorFuture, ActorResponse, AsyncContext, Context, ContextFutureSpawner,
Handler, Message, Recipient, Supervised, Syn, Handler, Message, Recipient, StreamHandler, Supervised,
}; };
use futures::task::{current as current_task, Task}; use futures::sync::{mpsc, oneshot};
use futures::unsync::oneshot;
use futures::{Async, Future, Poll}; use futures::{Async, Future, Poll};
use http::{Error as HttpError, HttpTryFrom, Uri}; use http::{Error as HttpError, HttpTryFrom, Uri};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
@ -167,6 +164,21 @@ struct Waiter {
conn_timeout: Duration, conn_timeout: Duration,
} }
enum Paused {
No,
Yes,
Timeout(Instant, Delay),
}
impl Paused {
fn is_paused(&self) -> bool {
match *self {
Paused::No => false,
_ => true,
}
}
}
/// `ClientConnector` type is responsible for transport layer of a /// `ClientConnector` type is responsible for transport layer of a
/// client connection. /// client connection.
pub struct ClientConnector { pub struct ClientConnector {
@ -176,10 +188,10 @@ pub struct ClientConnector {
connector: TlsConnector, connector: TlsConnector,
stats: ClientConnectorStats, stats: ClientConnectorStats,
subscriber: Option<Recipient<Syn, ClientConnectorStats>>, subscriber: Option<Recipient<ClientConnectorStats>>,
pool: Rc<Pool>, acq_tx: mpsc::UnboundedSender<AcquiredConnOperation>,
pool_modified: Rc<Cell<bool>>, acq_rx: Option<mpsc::UnboundedReceiver<AcquiredConnOperation>>,
conn_lifetime: Duration, conn_lifetime: Duration,
conn_keep_alive: Duration, conn_keep_alive: Duration,
@ -191,7 +203,7 @@ pub struct ClientConnector {
to_close: Vec<Connection>, to_close: Vec<Connection>,
waiters: HashMap<Key, VecDeque<Waiter>>, waiters: HashMap<Key, VecDeque<Waiter>>,
wait_timeout: Option<(Instant, Delay)>, wait_timeout: Option<(Instant, Delay)>,
paused: Option<Option<(Instant, Delay)>>, paused: Paused,
} }
impl Actor for ClientConnector { impl Actor for ClientConnector {
@ -199,17 +211,18 @@ impl Actor for ClientConnector {
fn started(&mut self, ctx: &mut Self::Context) { fn started(&mut self, ctx: &mut Self::Context) {
self.collect_periodic(ctx); self.collect_periodic(ctx);
ctx.add_stream(self.acq_rx.take().unwrap());
ctx.spawn(Maintenance); ctx.spawn(Maintenance);
} }
} }
impl Supervised for ClientConnector {} impl Supervised for ClientConnector {}
impl ArbiterService for ClientConnector {} impl SystemService for ClientConnector {}
impl Default for ClientConnector { impl Default for ClientConnector {
fn default() -> ClientConnector { fn default() -> ClientConnector {
let _modified = Rc::new(Cell::new(false)); let (tx, rx) = mpsc::unbounded();
#[cfg(all(feature = "alpn"))] #[cfg(all(feature = "alpn"))]
{ {
@ -222,8 +235,8 @@ impl Default for ClientConnector {
ClientConnector { ClientConnector {
stats: ClientConnectorStats::default(), stats: ClientConnectorStats::default(),
subscriber: None, subscriber: None,
pool: Rc::new(Pool::new(Rc::clone(&_modified))), acq_tx: tx,
pool_modified: _modified, acq_rx: Some(rx),
connector: builder.build().unwrap(), connector: builder.build().unwrap(),
conn_lifetime: Duration::from_secs(75), conn_lifetime: Duration::from_secs(75),
conn_keep_alive: Duration::from_secs(15), conn_keep_alive: Duration::from_secs(15),
@ -235,7 +248,7 @@ impl Default for ClientConnector {
to_close: Vec::new(), to_close: Vec::new(),
waiters: HashMap::new(), waiters: HashMap::new(),
wait_timeout: None, wait_timeout: None,
paused: None, paused: Paused::No,
} }
} }
@ -243,8 +256,8 @@ impl Default for ClientConnector {
ClientConnector { ClientConnector {
stats: ClientConnectorStats::default(), stats: ClientConnectorStats::default(),
subscriber: None, subscriber: None,
pool: Rc::new(Pool::new(Rc::clone(&_modified))), acq_tx: tx,
pool_modified: _modified, acq_rx: Some(rx),
conn_lifetime: Duration::from_secs(75), conn_lifetime: Duration::from_secs(75),
conn_keep_alive: Duration::from_secs(15), conn_keep_alive: Duration::from_secs(15),
limit: 100, limit: 100,
@ -255,7 +268,7 @@ impl Default for ClientConnector {
to_close: Vec::new(), to_close: Vec::new(),
waiters: HashMap::new(), waiters: HashMap::new(),
wait_timeout: None, wait_timeout: None,
paused: None, paused: Paused::No,
} }
} }
} }
@ -286,7 +299,7 @@ impl ClientConnector {
/// ///
/// // Start `ClientConnector` with custom `SslConnector` /// // Start `ClientConnector` with custom `SslConnector`
/// let ssl_conn = SslConnector::builder(SslMethod::tls()).unwrap().build(); /// let ssl_conn = SslConnector::builder(SslMethod::tls()).unwrap().build();
/// let conn: Addr<Unsync, _> = ClientConnector::with_connector(ssl_conn).start(); /// let conn = ClientConnector::with_connector(ssl_conn).start();
/// ///
/// Arbiter::spawn( /// Arbiter::spawn(
/// conn.send( /// conn.send(
@ -305,13 +318,14 @@ impl ClientConnector {
/// } /// }
/// ``` /// ```
pub fn with_connector(connector: SslConnector) -> ClientConnector { pub fn with_connector(connector: SslConnector) -> ClientConnector {
let modified = Rc::new(Cell::new(false)); let (tx, rx) = mpsc::unbounded();
ClientConnector { ClientConnector {
connector, connector,
stats: ClientConnectorStats::default(), stats: ClientConnectorStats::default(),
subscriber: None, subscriber: None,
pool: Rc::new(Pool::new(Rc::clone(&modified))), pool_tx: tx,
pool_modified: modified, pool_rx: Some(rx),
conn_lifetime: Duration::from_secs(75), conn_lifetime: Duration::from_secs(75),
conn_keep_alive: Duration::from_secs(15), conn_keep_alive: Duration::from_secs(15),
limit: 100, limit: 100,
@ -322,7 +336,7 @@ impl ClientConnector {
to_close: Vec::new(), to_close: Vec::new(),
waiters: HashMap::new(), waiters: HashMap::new(),
wait_timeout: None, wait_timeout: None,
paused: None, paused: Paused::No,
} }
} }
@ -366,7 +380,7 @@ impl ClientConnector {
} }
/// Subscribe for connector stats. Only one subscriber is supported. /// Subscribe for connector stats. Only one subscriber is supported.
pub fn stats(mut self, subs: Recipient<Syn, ClientConnectorStats>) -> Self { pub fn stats(mut self, subs: Recipient<ClientConnectorStats>) -> Self {
self.subscriber = Some(subs); self.subscriber = Some(subs);
self self
} }
@ -448,59 +462,8 @@ impl ClientConnector {
} }
} }
fn collect(&mut self, periodic: bool) { fn collect_periodic(&mut self, ctx: &mut Context<Self>) {
let now = Instant::now();
if self.pool_modified.get() {
// collect half acquire keys
if let Some(keys) = self.pool.collect_keys() {
for key in keys {
self.release_key(&key);
}
}
// collect connections for close
if let Some(to_close) = self.pool.collect_close() {
for conn in to_close {
self.release_key(&conn.key);
self.to_close.push(conn);
self.stats.closed += 1;
}
}
// connection connections
if let Some(to_release) = self.pool.collect_release() {
for conn in to_release {
self.release_key(&conn.key);
// check connection lifetime and the return to available pool
if (now - conn.ts) < self.conn_lifetime {
self.available
.entry(conn.key.clone())
.or_insert_with(VecDeque::new)
.push_back(Conn(Instant::now(), conn));
}
}
}
}
// check keep-alive
for conns in self.available.values_mut() {
while !conns.is_empty() {
if (now > conns[0].0) && (now - conns[0].0) > self.conn_keep_alive
|| (now - conns[0].1.ts) > self.conn_lifetime
{
let conn = conns.pop_front().unwrap().1;
self.to_close.push(conn);
self.stats.closed += 1;
} else {
break;
}
}
}
// check connections for shutdown // check connections for shutdown
if periodic {
let mut idx = 0; let mut idx = 0;
while idx < self.to_close.len() { while idx < self.to_close.len() {
match AsyncWrite::shutdown(&mut self.to_close[idx]) { match AsyncWrite::shutdown(&mut self.to_close[idx]) {
@ -510,13 +473,7 @@ impl ClientConnector {
} }
} }
} }
}
self.pool_modified.set(false);
}
fn collect_periodic(&mut self, ctx: &mut Context<Self>) {
self.collect(true);
// re-schedule next collect period // re-schedule next collect period
ctx.run_later(Duration::from_secs(1), |act, ctx| act.collect_periodic(ctx)); ctx.run_later(Duration::from_secs(1), |act, ctx| act.collect_periodic(ctx));
@ -598,9 +555,9 @@ impl Handler<Pause> for ClientConnector {
let when = Instant::now() + time; let when = Instant::now() + time;
let mut timeout = Delay::new(when); let mut timeout = Delay::new(when);
let _ = timeout.poll(); let _ = timeout.poll();
self.paused = Some(Some((when, timeout))); self.paused = Paused::Timeout(when, timeout);
} else if self.paused.is_none() { } else {
self.paused = Some(None); self.paused = Paused::Yes;
} }
} }
} }
@ -609,7 +566,7 @@ impl Handler<Resume> for ClientConnector {
type Result = (); type Result = ();
fn handle(&mut self, _: Resume, _: &mut Self::Context) { fn handle(&mut self, _: Resume, _: &mut Self::Context) {
self.paused.take(); self.paused = Paused::No;
} }
} }
@ -617,10 +574,6 @@ impl Handler<Connect> for ClientConnector {
type Result = ActorResponse<ClientConnector, Connection, ClientConnectorError>; type Result = ActorResponse<ClientConnector, Connection, ClientConnectorError>;
fn handle(&mut self, msg: Connect, _: &mut Self::Context) -> Self::Result { fn handle(&mut self, msg: Connect, _: &mut Self::Context) -> Self::Result {
if self.pool_modified.get() {
self.collect(false);
}
let uri = &msg.uri; let uri = &msg.uri;
let wait_timeout = msg.wait_timeout; let wait_timeout = msg.wait_timeout;
let conn_timeout = msg.conn_timeout; let conn_timeout = msg.conn_timeout;
@ -646,11 +599,6 @@ impl Handler<Connect> for ClientConnector {
return ActorResponse::reply(Err(ClientConnectorError::SslIsNotSupported)); return ActorResponse::reply(Err(ClientConnectorError::SslIsNotSupported));
} }
// check if pool has task reference
if self.pool.task.borrow().is_none() {
*self.pool.task.borrow_mut() = Some(current_task());
}
let host = uri.host().unwrap().to_owned(); let host = uri.host().unwrap().to_owned();
let port = uri.port().unwrap_or_else(|| proto.port()); let port = uri.port().unwrap_or_else(|| proto.port());
let key = Key { let key = Key {
@ -660,7 +608,7 @@ impl Handler<Connect> for ClientConnector {
}; };
// check pause state // check pause state
if self.paused.is_some() { if self.paused.is_paused() {
let rx = self.wait_for(key, wait_timeout, conn_timeout); let rx = self.wait_for(key, wait_timeout, conn_timeout);
self.stats.waits += 1; self.stats.waits += 1;
return ActorResponse::async( return ActorResponse::async(
@ -678,7 +626,7 @@ impl Handler<Connect> for ClientConnector {
match self.acquire(&key) { match self.acquire(&key) {
Acquire::Acquired(mut conn) => { Acquire::Acquired(mut conn) => {
// use existing connection // use existing connection
conn.pool = Some(AcquiredConn(key, Some(Rc::clone(&self.pool)))); conn.pool = Some(AcquiredConn(key, Some(self.acq_tx.clone())));
self.stats.reused += 1; self.stats.reused += 1;
return ActorResponse::async(fut::ok(conn)); return ActorResponse::async(fut::ok(conn));
} }
@ -695,7 +643,7 @@ impl Handler<Connect> for ClientConnector {
}), }),
); );
} }
Acquire::Available => Some(Rc::clone(&self.pool)), Acquire::Available => Some(self.acq_tx.clone()),
} }
} else { } else {
None None
@ -801,6 +749,49 @@ impl Handler<Connect> for ClientConnector {
} }
} }
impl StreamHandler<AcquiredConnOperation, ()> for ClientConnector {
fn handle(&mut self, msg: AcquiredConnOperation, _: &mut Context<Self>) {
let now = Instant::now();
match msg {
AcquiredConnOperation::Close(conn) => {
self.release_key(&conn.key);
self.to_close.push(conn);
self.stats.closed += 1;
}
AcquiredConnOperation::Release(conn) => {
self.release_key(&conn.key);
// check connection lifetime and the return to available pool
if (Instant::now() - conn.ts) < self.conn_lifetime {
self.available
.entry(conn.key.clone())
.or_insert_with(VecDeque::new)
.push_back(Conn(Instant::now(), conn));
}
}
AcquiredConnOperation::ReleaseKey(key) => {
self.release_key(&key);
}
}
// check keep-alive
for conns in self.available.values_mut() {
while !conns.is_empty() {
if (now > conns[0].0) && (now - conns[0].0) > self.conn_keep_alive
|| (now - conns[0].1.ts) > self.conn_lifetime
{
let conn = conns.pop_front().unwrap().1;
self.to_close.push(conn);
self.stats.closed += 1;
} else {
break;
}
}
}
}
}
struct Maintenance; struct Maintenance;
impl fut::ActorFuture for Maintenance { impl fut::ActorFuture for Maintenance {
@ -812,18 +803,10 @@ impl fut::ActorFuture for Maintenance {
&mut self, act: &mut ClientConnector, ctx: &mut Context<ClientConnector>, &mut self, act: &mut ClientConnector, ctx: &mut Context<ClientConnector>,
) -> Poll<Self::Item, Self::Error> { ) -> Poll<Self::Item, Self::Error> {
// check pause duration // check pause duration
let done = if let Some(Some(ref pause)) = act.paused { if let Paused::Timeout(inst, _) = act.paused {
pause.0 <= Instant::now() if inst <= Instant::now() {
} else { act.paused = Paused::No;
false
};
if done {
act.paused.take();
} }
// collect connections
if act.pool_modified.get() {
act.collect(false);
} }
// collect wait timers // collect wait timers
@ -843,7 +826,7 @@ impl fut::ActorFuture for Maintenance {
// use existing connection // use existing connection
act.stats.reused += 1; act.stats.reused += 1;
conn.pool = conn.pool =
Some(AcquiredConn(key.clone(), Some(Rc::clone(&act.pool)))); Some(AcquiredConn(key.clone(), Some(act.acq_tx.clone())));
let _ = waiter.tx.send(Ok(conn)); let _ = waiter.tx.send(Ok(conn));
} }
Acquire::NotAvailable => { Acquire::NotAvailable => {
@ -851,7 +834,7 @@ impl fut::ActorFuture for Maintenance {
break; break;
} }
Acquire::Available => { Acquire::Available => {
let conn = AcquiredConn(key.clone(), Some(Rc::clone(&act.pool))); let conn = AcquiredConn(key.clone(), Some(act.acq_tx.clone()));
fut::WrapFuture::<ClientConnector>::actfuture( fut::WrapFuture::<ClientConnector>::actfuture(
Connector::from_registry().send( Connector::from_registry().send(
@ -1050,100 +1033,38 @@ enum Acquire {
NotAvailable, NotAvailable,
} }
struct AcquiredConn(Key, Option<Rc<Pool>>); enum AcquiredConnOperation {
Close(Connection),
Release(Connection),
ReleaseKey(Key),
}
struct AcquiredConn(Key, Option<mpsc::UnboundedSender<AcquiredConnOperation>>);
impl AcquiredConn { impl AcquiredConn {
fn close(&mut self, conn: Connection) { fn close(&mut self, conn: Connection) {
if let Some(pool) = self.1.take() { if let Some(tx) = self.1.take() {
pool.close(conn); let _ = tx.unbounded_send(AcquiredConnOperation::Close(conn));
} }
} }
fn release(&mut self, conn: Connection) { fn release(&mut self, conn: Connection) {
if let Some(pool) = self.1.take() { if let Some(tx) = self.1.take() {
pool.release(conn); let _ = tx.unbounded_send(AcquiredConnOperation::Release(conn));
} }
} }
} }
impl Drop for AcquiredConn { impl Drop for AcquiredConn {
fn drop(&mut self) { fn drop(&mut self) {
if let Some(pool) = self.1.take() { if let Some(tx) = self.1.take() {
pool.release_key(self.0.clone()); let _ = tx.unbounded_send(AcquiredConnOperation::ReleaseKey(self.0.clone()));
}
}
}
pub struct Pool {
keys: RefCell<Vec<Key>>,
to_close: RefCell<Vec<Connection>>,
to_release: RefCell<Vec<Connection>>,
task: RefCell<Option<Task>>,
modified: Rc<Cell<bool>>,
}
impl Pool {
fn new(modified: Rc<Cell<bool>>) -> Pool {
Pool {
modified,
keys: RefCell::new(Vec::new()),
to_close: RefCell::new(Vec::new()),
to_release: RefCell::new(Vec::new()),
task: RefCell::new(None),
}
}
fn collect_keys(&self) -> Option<Vec<Key>> {
if self.keys.borrow().is_empty() {
None
} else {
Some(mem::replace(&mut *self.keys.borrow_mut(), Vec::new()))
}
}
fn collect_close(&self) -> Option<Vec<Connection>> {
if self.to_close.borrow().is_empty() {
None
} else {
Some(mem::replace(&mut *self.to_close.borrow_mut(), Vec::new()))
}
}
fn collect_release(&self) -> Option<Vec<Connection>> {
if self.to_release.borrow().is_empty() {
None
} else {
Some(mem::replace(&mut *self.to_release.borrow_mut(), Vec::new()))
}
}
fn close(&self, conn: Connection) {
self.modified.set(true);
self.to_close.borrow_mut().push(conn);
if let Some(ref task) = *self.task.borrow() {
task.notify()
}
}
fn release(&self, conn: Connection) {
self.modified.set(true);
self.to_release.borrow_mut().push(conn);
if let Some(ref task) = *self.task.borrow() {
task.notify()
}
}
fn release_key(&self, key: Key) {
self.modified.set(true);
self.keys.borrow_mut().push(key);
if let Some(ref task) = *self.task.borrow() {
task.notify()
} }
} }
} }
pub struct Connection { pub struct Connection {
key: Key, key: Key,
stream: Box<IoStream>, stream: Box<IoStream + Send>,
pool: Option<AcquiredConn>, pool: Option<AcquiredConn>,
ts: Instant, ts: Instant,
} }
@ -1155,7 +1076,7 @@ impl fmt::Debug for Connection {
} }
impl Connection { impl Connection {
fn new(key: Key, pool: Option<AcquiredConn>, stream: Box<IoStream>) -> Self { fn new(key: Key, pool: Option<AcquiredConn>, stream: Box<IoStream + Send>) -> Self {
Connection { Connection {
key, key,
stream, stream,
@ -1168,7 +1089,7 @@ impl Connection {
&mut *self.stream &mut *self.stream
} }
pub fn from_stream<T: IoStream>(io: T) -> Connection { pub fn from_stream<T: IoStream + Send>(io: T) -> Connection {
Connection::new(Key::empty(), None, Box::new(io)) Connection::new(Key::empty(), None, Box::new(io))
} }

View File

@ -56,7 +56,7 @@ impl From<ClientConnectorError> for SendRequestError {
enum State { enum State {
New, New,
Connect(actix::dev::Request<Unsync, ClientConnector, Connect>), Connect(actix::dev::Request<ClientConnector, Connect>),
Connection(Connection), Connection(Connection),
Send(Box<Pipeline>), Send(Box<Pipeline>),
None, None,
@ -68,7 +68,7 @@ enum State {
pub struct SendRequest { pub struct SendRequest {
req: ClientRequest, req: ClientRequest,
state: State, state: State,
conn: Addr<Unsync, ClientConnector>, conn: Addr<ClientConnector>,
conn_timeout: Duration, conn_timeout: Duration,
wait_timeout: Duration, wait_timeout: Duration,
timeout: Option<Delay>, timeout: Option<Delay>,
@ -80,7 +80,7 @@ impl SendRequest {
} }
pub(crate) fn with_connector( pub(crate) fn with_connector(
req: ClientRequest, conn: Addr<Unsync, ClientConnector>, req: ClientRequest, conn: Addr<ClientConnector>,
) -> SendRequest { ) -> SendRequest {
SendRequest { SendRequest {
req, req,

View File

@ -3,7 +3,7 @@ use std::io::Write;
use std::time::Duration; use std::time::Duration;
use std::{fmt, mem}; use std::{fmt, mem};
use actix::{Addr, Unsync}; use actix::Addr;
use bytes::{BufMut, Bytes, BytesMut}; use bytes::{BufMut, Bytes, BytesMut};
use cookie::{Cookie, CookieJar}; use cookie::{Cookie, CookieJar};
use futures::Stream; use futures::Stream;
@ -67,7 +67,7 @@ pub struct ClientRequest {
enum ConnectionType { enum ConnectionType {
Default, Default,
Connector(Addr<Unsync, ClientConnector>), Connector(Addr<ClientConnector>),
Connection(Connection), Connection(Connection),
} }
@ -541,7 +541,7 @@ impl ClientRequestBuilder {
} }
/// Send request using custom connector /// Send request using custom connector
pub fn with_connector(&mut self, conn: Addr<Unsync, ClientConnector>) -> &mut Self { pub fn with_connector(&mut self, conn: Addr<ClientConnector>) -> &mut Self {
if let Some(parts) = parts(&mut self.request, &self.err) { if let Some(parts) = parts(&mut self.request, &self.err) {
parts.conn = ConnectionType::Connector(conn); parts.conn = ConnectionType::Connector(conn);
} }

View File

@ -4,11 +4,10 @@ use futures::{Async, Future, Poll};
use smallvec::SmallVec; use smallvec::SmallVec;
use std::marker::PhantomData; use std::marker::PhantomData;
use actix::dev::{ContextImpl, SyncEnvelope, ToEnvelope}; use actix::dev::{ContextImpl, Envelope, ToEnvelope};
use actix::fut::ActorFuture; use actix::fut::ActorFuture;
use actix::{ use actix::{
Actor, ActorContext, ActorState, Addr, AsyncContext, Handler, Message, SpawnHandle, Actor, ActorContext, ActorState, Addr, AsyncContext, Handler, Message, SpawnHandle,
Syn, Unsync,
}; };
use body::{Binary, Body}; use body::{Binary, Body};
@ -90,15 +89,9 @@ where
fn cancel_future(&mut self, handle: SpawnHandle) -> bool { fn cancel_future(&mut self, handle: SpawnHandle) -> bool {
self.inner.cancel_future(handle) self.inner.cancel_future(handle)
} }
#[doc(hidden)]
#[inline] #[inline]
fn unsync_address(&mut self) -> Addr<Unsync, A> { fn address(&mut self) -> Addr<A> {
self.inner.unsync_address() self.inner.address()
}
#[doc(hidden)]
#[inline]
fn sync_address(&mut self) -> Addr<Syn, A> {
self.inner.sync_address()
} }
} }
@ -223,14 +216,14 @@ where
} }
} }
impl<A, M, S> ToEnvelope<Syn, A, M> for HttpContext<A, S> impl<A, M, S> ToEnvelope<A, M> for HttpContext<A, S>
where where
A: Actor<Context = HttpContext<A, S>> + Handler<M>, A: Actor<Context = HttpContext<A, S>> + Handler<M>,
M: Message + Send + 'static, M: Message + Send + 'static,
M::Result: Send, M::Result: Send,
{ {
fn pack(msg: M, tx: Option<Sender<M::Result>>) -> SyncEnvelope<A> { fn pack(msg: M, tx: Option<Sender<M::Result>>) -> Envelope<A> {
SyncEnvelope::new(msg, tx) Envelope::new(msg, tx)
} }
} }

View File

@ -806,7 +806,7 @@ mod tests {
let req = HttpRequest::default(); let req = HttpRequest::default();
let mut ctx = HttpContext::new(req.clone(), MyActor); let mut ctx = HttpContext::new(req.clone(), MyActor);
let addr: Addr<Unsync, _> = ctx.address(); let addr = ctx.address();
let mut info = PipelineInfo::new(req); let mut info = PipelineInfo::new(req);
info.context = Some(Box::new(ctx)); info.context = Some(Box::new(ctx));
let mut state = Completed::<(), Inner<()>>::init(&mut info) let mut state = Completed::<(), Inner<()>>::init(&mut info)

View File

@ -51,12 +51,12 @@ where
keep_alive: KeepAlive, keep_alive: KeepAlive,
factory: Arc<Fn() -> Vec<H> + Send + Sync>, factory: Arc<Fn() -> Vec<H> + Send + Sync>,
#[cfg_attr(feature = "cargo-clippy", allow(type_complexity))] #[cfg_attr(feature = "cargo-clippy", allow(type_complexity))]
workers: Vec<(usize, Addr<Syn, Worker<H::Handler>>)>, workers: Vec<(usize, Addr<Worker<H::Handler>>)>,
sockets: Vec<Socket>, sockets: Vec<Socket>,
accept: Vec<(mio::SetReadiness, sync_mpsc::Sender<Command>)>, accept: Vec<(mio::SetReadiness, sync_mpsc::Sender<Command>)>,
exit: bool, exit: bool,
shutdown_timeout: u16, shutdown_timeout: u16,
signals: Option<Addr<Syn, signal::ProcessSignals>>, signals: Option<Addr<signal::ProcessSignals>>,
no_http2: bool, no_http2: bool,
no_signals: bool, no_signals: bool,
} }
@ -177,7 +177,7 @@ where
} }
/// Set alternative address for `ProcessSignals` actor. /// Set alternative address for `ProcessSignals` actor.
pub fn signals(mut self, addr: Addr<Syn, signal::ProcessSignals>) -> Self { pub fn signals(mut self, addr: Addr<signal::ProcessSignals>) -> Self {
self.signals = Some(addr); self.signals = Some(addr);
self self
} }
@ -380,12 +380,12 @@ where
} }
// subscribe to os signals // subscribe to os signals
fn subscribe_to_signals(&self) -> Option<Addr<Syn, signal::ProcessSignals>> { fn subscribe_to_signals(&self) -> Option<Addr<signal::ProcessSignals>> {
if !self.no_signals { if !self.no_signals {
if let Some(ref signals) = self.signals { if let Some(ref signals) = self.signals {
Some(signals.clone()) Some(signals.clone())
} else { } else {
Some(Arbiter::system_registry().get::<signal::ProcessSignals>()) Some(Arbiter::registry().get::<signal::ProcessSignals>())
} }
} else { } else {
None None
@ -422,7 +422,7 @@ impl<H: IntoHttpHandler> HttpServer<H> {
/// let _ = sys.run(); // <- Run actix system, this method actually starts all async processes /// let _ = sys.run(); // <- Run actix system, this method actually starts all async processes
/// } /// }
/// ``` /// ```
pub fn start(mut self) -> Addr<Syn, Self> { pub fn start(mut self) -> Addr<Self> {
if self.sockets.is_empty() { if self.sockets.is_empty() {
panic!("HttpServer::bind() has to be called before start()"); panic!("HttpServer::bind() has to be called before start()");
} else { } else {
@ -458,7 +458,7 @@ impl<H: IntoHttpHandler> HttpServer<H> {
// start http server actor // start http server actor
let signals = self.subscribe_to_signals(); let signals = self.subscribe_to_signals();
let addr: Addr<Syn, _> = Actor::create(move |ctx| { let addr = Actor::create(move |ctx| {
ctx.add_stream(rx); ctx.add_stream(rx);
self self
}); });
@ -510,7 +510,7 @@ impl<H: IntoHttpHandler> HttpServer<H> {
)] )]
impl<H: IntoHttpHandler> HttpServer<H> { impl<H: IntoHttpHandler> HttpServer<H> {
/// Start listening for incoming tls connections. /// Start listening for incoming tls connections.
pub fn start_tls(mut self, acceptor: TlsAcceptor) -> io::Result<Addr<Syn, Self>> { pub fn start_tls(mut self, acceptor: TlsAcceptor) -> io::Result<Addr<Self>> {
for sock in &mut self.sockets { for sock in &mut self.sockets {
match sock.tp { match sock.tp {
StreamHandlerType::Normal => (), StreamHandlerType::Normal => (),
@ -533,7 +533,7 @@ impl<H: IntoHttpHandler> HttpServer<H> {
/// This method sets alpn protocols to "h2" and "http/1.1" /// This method sets alpn protocols to "h2" and "http/1.1"
pub fn start_ssl( pub fn start_ssl(
mut self, mut builder: SslAcceptorBuilder, mut self, mut builder: SslAcceptorBuilder,
) -> io::Result<Addr<Syn, Self>> { ) -> io::Result<Addr<Self>> {
// alpn support // alpn support
if !self.no_http2 { if !self.no_http2 {
builder.set_alpn_protos(b"\x02h2\x08http/1.1")?; builder.set_alpn_protos(b"\x02h2\x08http/1.1")?;
@ -563,7 +563,7 @@ impl<H: IntoHttpHandler> HttpServer<H> {
/// Start listening for incoming connections from a stream. /// Start listening for incoming connections from a stream.
/// ///
/// This method uses only one thread for handling incoming connections. /// This method uses only one thread for handling incoming connections.
pub fn start_incoming<T, S>(mut self, stream: S, secure: bool) -> Addr<Syn, Self> pub fn start_incoming<T, S>(mut self, stream: S, secure: bool) -> Addr<Self>
where where
S: Stream<Item = T, Error = io::Error> + 'static, S: Stream<Item = T, Error = io::Error> + 'static,
T: AsyncRead + AsyncWrite + 'static, T: AsyncRead + AsyncWrite + 'static,
@ -579,7 +579,7 @@ impl<H: IntoHttpHandler> HttpServer<H> {
// start server // start server
let signals = self.subscribe_to_signals(); let signals = self.subscribe_to_signals();
let addr: Addr<Syn, _> = HttpServer::create(move |ctx| { let addr = HttpServer::create(move |ctx| {
ctx.add_message_stream(stream.map_err(|_| ()).map(move |t| Conn { ctx.add_message_stream(stream.map_err(|_| ()).map(move |t| Conn {
io: WrapperStream::new(t), io: WrapperStream::new(t),
token: 0, token: 0,

View File

@ -5,7 +5,7 @@ use std::str::FromStr;
use std::sync::mpsc; use std::sync::mpsc;
use std::{net, thread}; use std::{net, thread};
use actix::{msgs, Actor, Addr, Arbiter, Syn, System, SystemRunner, Unsync}; use actix::{msgs, Actor, Addr, Arbiter, System, SystemRunner};
use cookie::Cookie; use cookie::Cookie;
use futures::Future; use futures::Future;
use http::header::HeaderName; use http::header::HeaderName;
@ -64,9 +64,9 @@ pub struct TestServer {
addr: net::SocketAddr, addr: net::SocketAddr,
thread: Option<thread::JoinHandle<()>>, thread: Option<thread::JoinHandle<()>>,
system: SystemRunner, system: SystemRunner,
server_sys: Addr<Syn, System>, server_sys: Addr<System>,
ssl: bool, ssl: bool,
conn: Addr<Unsync, ClientConnector>, conn: Addr<ClientConnector>,
} }
impl TestServer { impl TestServer {
@ -135,7 +135,7 @@ impl TestServer {
} }
} }
fn get_conn() -> Addr<Unsync, ClientConnector> { fn get_conn() -> Addr<ClientConnector> {
#[cfg(feature = "alpn")] #[cfg(feature = "alpn")]
{ {
use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode}; use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};

View File

@ -111,7 +111,7 @@ pub struct Client {
http_err: Option<HttpError>, http_err: Option<HttpError>,
origin: Option<HeaderValue>, origin: Option<HeaderValue>,
protocols: Option<String>, protocols: Option<String>,
conn: Addr<Unsync, ClientConnector>, conn: Addr<ClientConnector>,
max_size: usize, max_size: usize,
} }
@ -122,9 +122,7 @@ impl Client {
} }
/// Create new websocket connection with custom `ClientConnector` /// Create new websocket connection with custom `ClientConnector`
pub fn with_connector<S: AsRef<str>>( pub fn with_connector<S: AsRef<str>>(uri: S, conn: Addr<ClientConnector>) -> Client {
uri: S, conn: Addr<Unsync, ClientConnector>,
) -> Client {
let mut cl = Client { let mut cl = Client {
request: ClientRequest::build(), request: ClientRequest::build(),
err: None, err: None,

View File

@ -3,11 +3,10 @@ use futures::unsync::oneshot;
use futures::{Async, Poll}; use futures::{Async, Poll};
use smallvec::SmallVec; use smallvec::SmallVec;
use actix::dev::{ContextImpl, SyncEnvelope, ToEnvelope}; use actix::dev::{ContextImpl, Envelope, ToEnvelope};
use actix::fut::ActorFuture; use actix::fut::ActorFuture;
use actix::{ use actix::{
Actor, ActorContext, ActorState, Addr, AsyncContext, Handler, Message, SpawnHandle, Actor, ActorContext, ActorState, Addr, AsyncContext, Handler, Message, SpawnHandle,
Syn, Unsync,
}; };
use body::{Binary, Body}; use body::{Binary, Body};
@ -75,16 +74,9 @@ where
self.inner.cancel_future(handle) self.inner.cancel_future(handle)
} }
#[doc(hidden)]
#[inline] #[inline]
fn unsync_address(&mut self) -> Addr<Unsync, A> { fn address(&mut self) -> Addr<A> {
self.inner.unsync_address() self.inner.address()
}
#[doc(hidden)]
#[inline]
fn sync_address(&mut self) -> Addr<Syn, A> {
self.inner.sync_address()
} }
} }
@ -282,14 +274,14 @@ where
} }
} }
impl<A, M, S> ToEnvelope<Syn, A, M> for WebsocketContext<A, S> impl<A, M, S> ToEnvelope<A, M> for WebsocketContext<A, S>
where where
A: Actor<Context = WebsocketContext<A, S>> + Handler<M>, A: Actor<Context = WebsocketContext<A, S>> + Handler<M>,
M: Message + Send + 'static, M: Message + Send + 'static,
M::Result: Send, M::Result: Send,
{ {
fn pack(msg: M, tx: Option<Sender<M::Result>>) -> SyncEnvelope<A> { fn pack(msg: M, tx: Option<Sender<M::Result>>) -> Envelope<A> {
SyncEnvelope::new(msg, tx) Envelope::new(msg, tx)
} }
} }