1
0
mirror of https://github.com/fafhrd91/actix-web synced 2025-01-18 05:41:50 +01:00

use new actix api

This commit is contained in:
Nikolay Kim 2018-02-12 16:08:04 -08:00
parent 720d8c36c1
commit 335ca8ff33
9 changed files with 29 additions and 28 deletions

View File

@ -66,7 +66,7 @@ fn main() {
}); });
let addr = rx.recv().unwrap(); let addr = rx.recv().unwrap();
let _ = addr.call_fut( let _ = addr.call(
server::StopServer{graceful:true}).wait(); // <- Send `StopServer` message to server. server::StopServer{graceful:true}).wait(); // <- Send `StopServer` message to server.
} }
``` ```

View File

@ -191,7 +191,8 @@ impl Handler<Connect> for ClientConnector {
ActorResponse::async( ActorResponse::async(
Connector::from_registry() Connector::from_registry()
.call(self, ResolveConnect::host_and_port(&host, port)) .call(ResolveConnect::host_and_port(&host, port))
.into_actor(self)
.map_err(|_, _, _| ClientConnectorError::Disconnected) .map_err(|_, _, _| ClientConnectorError::Disconnected)
.and_then(move |res, _act, _| { .and_then(move |res, _act, _| {
#[cfg(feature="alpn")] #[cfg(feature="alpn")]

View File

@ -83,12 +83,12 @@ impl<A, S> AsyncContext<A> for HttpContext<A, S> where A: Actor<Context=Self>
} }
#[doc(hidden)] #[doc(hidden)]
#[inline] #[inline]
fn unsync_address(&mut self) -> Addr<Unsync<A>> { fn unsync_address(&mut self) -> Addr<Unsync, A> {
self.inner.unsync_address() self.inner.unsync_address()
} }
#[doc(hidden)] #[doc(hidden)]
#[inline] #[inline]
fn sync_address(&mut self) -> Addr<Syn<A>> { fn sync_address(&mut self) -> Addr<Syn, A> {
self.inner.sync_address() self.inner.sync_address()
} }
} }
@ -205,12 +205,12 @@ impl<A, S> ActorHttpContext for HttpContext<A, S> where A: Actor<Context=Self>,
} }
} }
impl<A, M, S> ToEnvelope<Syn<A>, M> for HttpContext<A, S> impl<A, M, S> ToEnvelope<Syn, A, M> for HttpContext<A, S>
where A: Actor<Context=HttpContext<A, S>> + Handler<M>, where A: Actor<Context=HttpContext<A, S>> + Handler<M>,
M: Message + Send + 'static, M::Result: Send, M: Message + Send + 'static, M::Result: Send,
{ {
fn pack(msg: M, tx: Option<Sender<M::Result>>) -> Syn<A> { fn pack(msg: M, tx: Option<Sender<M::Result>>) -> SyncEnvelope<A> {
Syn::new(Box::new(SyncEnvelope::envelope(msg, tx))) SyncEnvelope::new(msg, tx)
} }
} }

View File

@ -739,7 +739,7 @@ mod tests {
let req = HttpRequest::default(); let req = HttpRequest::default();
let mut ctx = HttpContext::new(req.clone(), MyActor); let mut ctx = HttpContext::new(req.clone(), MyActor);
let addr: Addr<Unsync<_>> = ctx.address(); let addr: Addr<Unsync, _> = ctx.address();
let mut info = PipelineInfo::new(req); let mut info = PipelineInfo::new(req);
info.context = Some(Box::new(ctx)); info.context = Some(Box::new(ctx));
let mut state = Completed::<(), Inner<()>>::init(&mut info).completed().unwrap(); let mut state = Completed::<(), Inner<()>>::init(&mut info).completed().unwrap();

View File

@ -36,12 +36,12 @@ pub struct HttpServer<H> where H: IntoHttpHandler + 'static
host: Option<String>, host: Option<String>,
keep_alive: Option<u64>, keep_alive: Option<u64>,
factory: Arc<Fn() -> Vec<H> + Send + Sync>, factory: Arc<Fn() -> Vec<H> + Send + Sync>,
workers: Vec<Addr<Syn<Worker<H::Handler>>>>, workers: Vec<Addr<Syn, Worker<H::Handler>>>,
sockets: HashMap<net::SocketAddr, net::TcpListener>, sockets: HashMap<net::SocketAddr, net::TcpListener>,
accept: Vec<(mio::SetReadiness, sync_mpsc::Sender<Command>)>, accept: Vec<(mio::SetReadiness, sync_mpsc::Sender<Command>)>,
exit: bool, exit: bool,
shutdown_timeout: u16, shutdown_timeout: u16,
signals: Option<Addr<Syn<signal::ProcessSignals>>>, signals: Option<Addr<Syn, signal::ProcessSignals>>,
no_signals: bool, no_signals: bool,
} }
@ -146,7 +146,7 @@ impl<H> HttpServer<H> where H: IntoHttpHandler + 'static
} }
/// Set alternative address for `ProcessSignals` actor. /// Set alternative address for `ProcessSignals` actor.
pub fn signals(mut self, addr: Addr<Syn<signal::ProcessSignals>>) -> Self { pub fn signals(mut self, addr: Addr<Syn, signal::ProcessSignals>) -> Self {
self.signals = Some(addr); self.signals = Some(addr);
self self
} }
@ -227,7 +227,7 @@ impl<H> HttpServer<H> where H: IntoHttpHandler + 'static
} }
// subscribe to os signals // subscribe to os signals
fn subscribe_to_signals(&self) -> Option<Addr<Syn<signal::ProcessSignals>>> { fn subscribe_to_signals(&self) -> Option<Addr<Syn, signal::ProcessSignals>> {
if !self.no_signals { if !self.no_signals {
if let Some(ref signals) = self.signals { if let Some(ref signals) = self.signals {
Some(signals.clone()) Some(signals.clone())
@ -269,7 +269,7 @@ impl<H: IntoHttpHandler> HttpServer<H>
/// let _ = sys.run(); // <- Run actix system, this method actually starts all async processes /// let _ = sys.run(); // <- Run actix system, this method actually starts all async processes
/// } /// }
/// ``` /// ```
pub fn start(mut self) -> Addr<Syn<Self>> pub fn start(mut self) -> Addr<Syn, Self>
{ {
if self.sockets.is_empty() { if self.sockets.is_empty() {
panic!("HttpServer::bind() has to be called before start()"); panic!("HttpServer::bind() has to be called before start()");
@ -288,7 +288,7 @@ impl<H: IntoHttpHandler> HttpServer<H>
// start http server actor // start http server actor
let signals = self.subscribe_to_signals(); let signals = self.subscribe_to_signals();
let addr: Addr<Syn<_>> = Actor::start(self); let addr: Addr<Syn, _> = Actor::start(self);
signals.map(|signals| signals.send( signals.map(|signals| signals.send(
signal::Subscribe(addr.clone().subscriber()))); signal::Subscribe(addr.clone().subscriber())));
addr addr
@ -407,7 +407,7 @@ impl<H: IntoHttpHandler> HttpServer<H>
/// Start listening for incoming connections from a stream. /// Start listening for incoming connections from a stream.
/// ///
/// This method uses only one thread for handling incoming connections. /// This method uses only one thread for handling incoming connections.
pub fn start_incoming<T, A, S>(mut self, stream: S, secure: bool) -> Addr<Syn<Self>> pub fn start_incoming<T, A, S>(mut self, stream: S, secure: bool) -> Addr<Syn, Self>
where S: Stream<Item=(T, A), Error=io::Error> + 'static, where S: Stream<Item=(T, A), Error=io::Error> + 'static,
T: AsyncRead + AsyncWrite + 'static, T: AsyncRead + AsyncWrite + 'static,
A: 'static A: 'static
@ -435,7 +435,7 @@ impl<H: IntoHttpHandler> HttpServer<H>
// start server // start server
let signals = self.subscribe_to_signals(); let signals = self.subscribe_to_signals();
let addr: Addr<Syn<_>> = HttpServer::create(move |ctx| { let addr: Addr<Syn, _> = HttpServer::create(move |ctx| {
ctx.add_message_stream( ctx.add_message_stream(
stream stream
.map_err(|_| ()) .map_err(|_| ())
@ -536,7 +536,7 @@ impl<H: IntoHttpHandler> Handler<StopServer> for HttpServer<H>
}; };
for worker in &self.workers { for worker in &self.workers {
let tx2 = tx.clone(); let tx2 = tx.clone();
let fut = worker.call(self, StopWorker{graceful: dur}); let fut = worker.call(StopWorker{graceful: dur}).into_actor(self);
ActorFuture::then(fut, move |_, slf, _| { ActorFuture::then(fut, move |_, slf, _| {
slf.workers.pop(); slf.workers.pop();
if slf.workers.is_empty() { if slf.workers.is_empty() {

View File

@ -56,7 +56,7 @@ pub struct TestServer {
addr: net::SocketAddr, addr: net::SocketAddr,
thread: Option<thread::JoinHandle<()>>, thread: Option<thread::JoinHandle<()>>,
system: SystemRunner, system: SystemRunner,
server_sys: Addr<Syn<System>>, server_sys: Addr<Syn, System>,
} }
impl TestServer { impl TestServer {

View File

@ -103,7 +103,7 @@ pub struct WsClient {
http_err: Option<HttpError>, http_err: Option<HttpError>,
origin: Option<HeaderValue>, origin: Option<HeaderValue>,
protocols: Option<String>, protocols: Option<String>,
conn: Addr<Unsync<ClientConnector>>, conn: Addr<Unsync, ClientConnector>,
} }
impl WsClient { impl WsClient {
@ -114,7 +114,7 @@ impl WsClient {
} }
/// Create new websocket connection with custom `ClientConnector` /// Create new websocket connection with custom `ClientConnector`
pub fn with_connector<S: AsRef<str>>(uri: S, conn: Addr<Unsync<ClientConnector>>) -> WsClient { pub fn with_connector<S: AsRef<str>>(uri: S, conn: Addr<Unsync, ClientConnector>) -> WsClient {
let mut cl = WsClient { let mut cl = WsClient {
request: ClientRequest::build(), request: ClientRequest::build(),
err: None, err: None,
@ -200,7 +200,7 @@ impl WsClient {
// get connection and start handshake // get connection and start handshake
Ok(Box::new( Ok(Box::new(
self.conn.call_fut(Connect(request.uri().clone())) self.conn.call(Connect(request.uri().clone()))
.map_err(|_| WsClientError::Disconnected) .map_err(|_| WsClientError::Disconnected)
.and_then(|res| match res { .and_then(|res| match res {
Ok(stream) => Either::A(WsHandshake::new(stream, request)), Ok(stream) => Either::A(WsHandshake::new(stream, request)),

View File

@ -67,13 +67,13 @@ impl<A, S> AsyncContext<A> for WebsocketContext<A, S> where A: Actor<Context=Sel
#[doc(hidden)] #[doc(hidden)]
#[inline] #[inline]
fn unsync_address(&mut self) -> Addr<Unsync<A>> { fn unsync_address(&mut self) -> Addr<Unsync, A> {
self.inner.unsync_address() self.inner.unsync_address()
} }
#[doc(hidden)] #[doc(hidden)]
#[inline] #[inline]
fn sync_address(&mut self) -> Addr<Syn<A>> { fn sync_address(&mut self) -> Addr<Syn, A> {
self.inner.sync_address() self.inner.sync_address()
} }
} }
@ -217,12 +217,12 @@ impl<A, S> ActorHttpContext for WebsocketContext<A, S> where A: Actor<Context=Se
} }
} }
impl<A, M, S> ToEnvelope<Syn<A>, M> for WebsocketContext<A, S> impl<A, M, S> ToEnvelope<Syn, A, M> for WebsocketContext<A, S>
where A: Actor<Context=WebsocketContext<A, S>> + Handler<M>, where A: Actor<Context=WebsocketContext<A, S>> + Handler<M>,
M: Message + Send + 'static, M::Result: Send M: Message + Send + 'static, M::Result: Send
{ {
fn pack(msg: M, tx: Option<Sender<M::Result>>) -> Syn<A> { fn pack(msg: M, tx: Option<Sender<M::Result>>) -> SyncEnvelope<A> {
Syn::new(Box::new(SyncEnvelope::envelope(msg, tx))) SyncEnvelope::new(msg, tx)
} }
} }

View File

@ -72,12 +72,12 @@ fn test_start() {
assert!(reqwest::get(&format!("http://{}/", addr)).unwrap().status().is_success()); assert!(reqwest::get(&format!("http://{}/", addr)).unwrap().status().is_success());
// pause // pause
let _ = srv_addr.call_fut(server::PauseServer).wait(); let _ = srv_addr.call(server::PauseServer).wait();
thread::sleep(time::Duration::from_millis(100)); thread::sleep(time::Duration::from_millis(100));
assert!(net::TcpStream::connect(addr).is_err()); assert!(net::TcpStream::connect(addr).is_err());
// resume // resume
let _ = srv_addr.call_fut(server::ResumeServer).wait(); let _ = srv_addr.call(server::ResumeServer).wait();
assert!(reqwest::get(&format!("http://{}/", addr)).unwrap().status().is_success()); assert!(reqwest::get(&format!("http://{}/", addr)).unwrap().status().is_success());
} }