1
0
mirror of https://github.com/actix/actix-extras.git synced 2025-01-23 15:24:36 +01:00

update actix usage

This commit is contained in:
Nikolay Kim 2018-06-17 02:58:56 +06:00
parent 342a194605
commit e4443226f6
4 changed files with 59 additions and 62 deletions

View File

@ -105,7 +105,7 @@ pub struct HttpRequest<S = ()>(SharedHttpInnerMessage, Option<Rc<S>>, Option<Rou
impl HttpRequest<()> { impl HttpRequest<()> {
/// Construct a new Request. /// Construct a new Request.
#[inline] #[inline]
pub fn new( pub(crate) fn new(
method: Method, uri: Uri, version: Version, headers: HeaderMap, method: Method, uri: Uri, version: Version, headers: HeaderMap,
payload: Option<Payload>, payload: Option<Payload>,
) -> HttpRequest { ) -> HttpRequest {

View File

@ -4,8 +4,8 @@ use std::time::Duration;
use std::{io, net, thread}; use std::{io, net, thread};
use actix::{ use actix::{
fut, signal, Actor, ActorFuture, Addr, Arbiter, AsyncContext, Context, fut, signal, Actor, ActorFuture, Addr, Arbiter, AsyncContext, Context, Handler,
ContextFutureSpawner, Handler, Response, StreamHandler, System, WrapFuture, Response, StreamHandler, System, WrapFuture,
}; };
use futures::sync::mpsc; use futures::sync::mpsc;
@ -64,8 +64,7 @@ where
no_signals: bool, no_signals: bool,
} }
unsafe impl<H> Sync for HttpServer<H> where H: IntoHttpHandler {} unsafe impl<H: IntoHttpHandler + 'static> Send for HttpServer<H> {}
unsafe impl<H> Send for HttpServer<H> where H: IntoHttpHandler {}
enum ServerCommand { enum ServerCommand {
WorkerDied(usize, Slab<SocketInfo>), WorkerDied(usize, Slab<SocketInfo>),
@ -485,11 +484,9 @@ impl<H: IntoHttpHandler> HttpServer<H> {
self.no_signals = false; self.no_signals = false;
let _ = thread::spawn(move || { let _ = thread::spawn(move || {
System::new("http-server") let sys = System::new("http-server");
.config(|| { self.start();
self.start(); sys.run();
})
.run();
}).join(); }).join();
} }
} }
@ -557,7 +554,7 @@ impl<H: IntoHttpHandler> HttpServer<H> {
pub fn start_incoming<T, S>(mut self, stream: S, secure: bool) -> Addr<Self> pub fn start_incoming<T, S>(mut self, stream: S, secure: bool) -> Addr<Self>
where where
S: Stream<Item = T, Error = io::Error> + Send + 'static, S: Stream<Item = T, Error = io::Error> + Send + 'static,
T: AsyncRead + AsyncWrite + 'static, T: AsyncRead + AsyncWrite + Send + 'static,
{ {
// set server settings // set server settings
let addr: net::SocketAddr = "127.0.0.1:8080".parse().unwrap(); let addr: net::SocketAddr = "127.0.0.1:8080".parse().unwrap();
@ -730,25 +727,26 @@ impl<H: IntoHttpHandler> Handler<StopServer> for HttpServer<H> {
}; };
for worker in &self.workers { for worker in &self.workers {
let tx2 = tx.clone(); let tx2 = tx.clone();
worker ctx.spawn(
.1 worker
.send(StopWorker { graceful: dur }) .1
.into_actor(self) .send(StopWorker { graceful: dur })
.then(move |_, slf, ctx| { .into_actor(self)
slf.workers.pop(); .then(move |_, slf, ctx| {
if slf.workers.is_empty() { slf.workers.pop();
let _ = tx2.send(()); if slf.workers.is_empty() {
let _ = tx2.send(());
// we need to stop system if server was spawned // we need to stop system if server was spawned
if slf.exit { if slf.exit {
ctx.run_later(Duration::from_millis(300), |_, _| { ctx.run_later(Duration::from_millis(300), |_, _| {
System::current().stop(); System::current().stop();
}); });
}
} }
} fut::ok(())
fut::ok(()) }),
}) );
.spawn(ctx);
} }
if !self.workers.is_empty() { if !self.workers.is_empty() {

View File

@ -65,6 +65,8 @@ where
tcp_ka: Option<time::Duration>, tcp_ka: Option<time::Duration>,
} }
unsafe impl<H: HttpHandler + 'static> Send for Worker<H> {}
impl<H: HttpHandler + 'static> Worker<H> { impl<H: HttpHandler + 'static> Worker<H> {
pub(crate) fn new( pub(crate) fn new(
h: Vec<H>, socks: Slab<SocketInfo>, keep_alive: KeepAlive, h: Vec<H>, socks: Slab<SocketInfo>, keep_alive: KeepAlive,

View File

@ -109,15 +109,14 @@ impl TestServer {
let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap(); let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap();
let local_addr = tcp.local_addr().unwrap(); let local_addr = tcp.local_addr().unwrap();
sys.config(move || { HttpServer::new(factory)
HttpServer::new(factory) .disable_signals()
.disable_signals() .listen(tcp)
.listen(tcp) .start();
.start();
tx.send((System::current(), local_addr, TestServer::get_conn())) tx.send((System::current(), local_addr, TestServer::get_conn()))
.unwrap(); .unwrap();
}).run(); sys.run();
}); });
let (system, addr, conn) = rx.recv().unwrap(); let (system, addr, conn) = rx.recv().unwrap();
@ -280,34 +279,32 @@ impl<S: 'static> TestServerBuilder<S> {
let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap(); let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap();
let local_addr = tcp.local_addr().unwrap(); let local_addr = tcp.local_addr().unwrap();
System::new("actix-test-server") let sys = System::new("actix-test-server");
.config(move || { let state = self.state;
let state = self.state; let srv = HttpServer::new(move || {
let srv = HttpServer::new(move || { let mut app = TestApp::new(state());
let mut app = TestApp::new(state()); config(&mut app);
config(&mut app); vec![app]
vec![app] }).workers(1)
}).workers(1) .disable_signals();
.disable_signals();
tx.send((System::current(), local_addr, TestServer::get_conn())) tx.send((System::current(), local_addr, TestServer::get_conn()))
.unwrap(); .unwrap();
#[cfg(feature = "alpn")] #[cfg(feature = "alpn")]
{ {
let ssl = self.ssl.take(); let ssl = self.ssl.take();
if let Some(ssl) = ssl { if let Some(ssl) = ssl {
srv.listen_ssl(tcp, ssl).unwrap().start(); srv.listen_ssl(tcp, ssl).unwrap().start();
} else { } else {
srv.listen(tcp).start(); srv.listen(tcp).start();
} }
} }
#[cfg(not(feature = "alpn"))] #[cfg(not(feature = "alpn"))]
{ {
srv.listen(tcp).start(); srv.listen(tcp).start();
} }
}) sys.run();
.run();
}); });
let (system, addr, conn) = rx.recv().unwrap(); let (system, addr, conn) = rx.recv().unwrap();