1
0
mirror of https://github.com/actix/actix-extras.git synced 2025-01-23 07:14:35 +01:00

simplify application factory

This commit is contained in:
Nikolay Kim 2018-09-09 14:33:45 -07:00
parent a63d3f9a7a
commit 6f3e70a92a
6 changed files with 80 additions and 110 deletions

View File

@ -410,45 +410,52 @@ where
self.keepalive_timer.take();
// search handler for request
for h in self.settings.handlers().iter() {
msg = match h.handle(msg) {
Ok(mut pipe) => {
if self.tasks.is_empty() {
match pipe.poll_io(&mut self.stream) {
Ok(Async::Ready(ready)) => {
// override keep-alive state
if self.stream.keepalive() {
self.flags.insert(Flags::KEEPALIVE);
} else {
self.flags.remove(Flags::KEEPALIVE);
}
// prepare stream for next response
self.stream.reset();
match self.settings.handler().handle(msg) {
Ok(mut pipe) => {
if self.tasks.is_empty() {
match pipe.poll_io(&mut self.stream) {
Ok(Async::Ready(ready)) => {
// override keep-alive state
if self.stream.keepalive() {
self.flags.insert(Flags::KEEPALIVE);
} else {
self.flags.remove(Flags::KEEPALIVE);
}
// prepare stream for next response
self.stream.reset();
if !ready {
let item = Entry {
pipe: EntryPipe::Task(pipe),
flags: EntryFlags::EOF,
};
self.tasks.push_back(item);
}
continue 'outer;
}
Ok(Async::NotReady) => {}
Err(err) => {
error!("Unhandled error: {}", err);
self.flags.insert(Flags::ERROR);
return;
if !ready {
let item = Entry {
pipe: EntryPipe::Task(pipe),
flags: EntryFlags::EOF,
};
self.tasks.push_back(item);
}
continue 'outer;
}
Ok(Async::NotReady) => {}
Err(err) => {
error!("Unhandled error: {}", err);
self.flags.insert(Flags::ERROR);
return;
}
}
self.tasks.push_back(Entry {
pipe: EntryPipe::Task(pipe),
flags: EntryFlags::empty(),
});
continue 'outer;
}
Err(msg) => msg,
self.tasks.push_back(Entry {
pipe: EntryPipe::Task(pipe),
flags: EntryFlags::empty(),
});
continue 'outer;
}
Err(msg) => {
// handler is not found
self.tasks.push_back(Entry {
pipe: EntryPipe::Error(ServerError::err(
Version::HTTP_11,
StatusCode::NOT_FOUND,
)),
flags: EntryFlags::empty(),
});
}
}

View File

@ -368,28 +368,20 @@ impl<H: HttpHandler + 'static> Entry<H> {
let psender = PayloadType::new(msg.headers(), psender);
// start request processing
let mut task = None;
for h in settings.handlers().iter() {
msg = match h.handle(msg) {
Ok(t) => {
task = Some(t);
break;
}
Err(msg) => msg,
}
}
let task = match settings.handler().handle(msg) {
Ok(task) => EntryPipe::Task(task),
Err(msg) => EntryPipe::Error(ServerError::err(
Version::HTTP_2,
StatusCode::NOT_FOUND,
)),
};
Entry {
task: task.map(EntryPipe::Task).unwrap_or_else(|| {
EntryPipe::Error(ServerError::err(
Version::HTTP_2,
StatusCode::NOT_FOUND,
))
}),
task,
recv,
payload: psender,
stream: H2Writer::new(resp, settings),
flags: EntryFlags::empty(),
recv,
}
}

View File

@ -39,7 +39,7 @@ struct Socket<H: IntoHttpHandler> {
pub struct HttpServer<H, F>
where
H: IntoHttpHandler + 'static,
F: Fn() -> Vec<H> + Send + Clone,
F: Fn() -> H + Send + Clone,
{
factory: F,
host: Option<String>,
@ -58,33 +58,10 @@ where
impl<H, F> HttpServer<H, F>
where
H: IntoHttpHandler + 'static,
F: Fn() -> Vec<H> + Send + Clone + 'static,
F: Fn() -> H + Send + Clone + 'static,
{
/// Create new http server with application factory
pub fn new<F1, U>(factory: F1) -> HttpServer<H, impl Fn() -> Vec<H> + Send + Clone>
where
F1: Fn() -> U + Send + Clone,
U: IntoIterator<Item = H> + 'static,
{
let f = move || (factory.clone())().into_iter().collect();
HttpServer {
threads: num_cpus::get(),
factory: f,
host: None,
backlog: 2048,
keep_alive: KeepAlive::Os,
shutdown_timeout: 30,
exit: false,
no_http2: false,
no_signals: false,
maxconn: 25_600,
maxconnrate: 256,
sockets: Vec::new(),
}
}
pub(crate) fn with_factory(factory: F) -> HttpServer<H, F> {
pub fn new(factory: F) -> HttpServer<H, F> {
HttpServer {
factory,
threads: num_cpus::get(),
@ -489,7 +466,7 @@ where
// }
}
impl<H: IntoHttpHandler, F: Fn() -> Vec<H> + Send + Clone> HttpServer<H, F> {
impl<H: IntoHttpHandler, F: Fn() -> H + Send + Clone> HttpServer<H, F> {
/// Start listening for incoming connections.
///
/// This method starts number of http workers in separate threads.
@ -629,7 +606,7 @@ impl<H: IntoHttpHandler, F: Fn() -> Vec<H> + Send + Clone> HttpServer<H, F> {
struct HttpService<F, H, Io>
where
F: Fn() -> Vec<H>,
F: Fn() -> H,
H: IntoHttpHandler,
Io: IoStream,
{
@ -642,7 +619,7 @@ where
impl<F, H, Io> NewService for HttpService<F, H, Io>
where
F: Fn() -> Vec<H>,
F: Fn() -> H,
H: IntoHttpHandler,
Io: IoStream,
{
@ -655,12 +632,9 @@ where
fn new_service(&self) -> Self::Future {
let s = ServerSettings::new(Some(self.addr), &self.host, false);
let apps: Vec<_> = (self.factory)()
.into_iter()
.map(|h| h.into_handler())
.collect();
let app = (self.factory)().into_handler();
ok(HttpServiceHandler::new(apps, self.keep_alive, s))
ok(HttpServiceHandler::new(app, self.keep_alive, s))
}
}
@ -680,14 +654,14 @@ where
Io: IoStream,
{
fn new(
apps: Vec<H>, keep_alive: KeepAlive, settings: ServerSettings,
app: H, keep_alive: KeepAlive, settings: ServerSettings,
) -> HttpServiceHandler<H, Io> {
let tcp_ka = if let KeepAlive::Tcp(val) = keep_alive {
Some(time::Duration::new(val as u64, 0))
} else {
None
};
let settings = WorkerSettings::new(apps, keep_alive, settings);
let settings = WorkerSettings::new(app, keep_alive, settings);
HttpServiceHandler {
tcp_ka,
@ -733,7 +707,7 @@ where
struct SimpleFactory<H, F, P>
where
H: IntoHttpHandler,
F: Fn() -> Vec<H> + Send + Clone,
F: Fn() -> H + Send + Clone,
P: HttpPipelineFactory<Io = TcpStream>,
{
pub addr: net::SocketAddr,
@ -744,7 +718,7 @@ where
impl<H: IntoHttpHandler, F, P> Clone for SimpleFactory<H, F, P>
where
P: HttpPipelineFactory<Io = TcpStream>,
F: Fn() -> Vec<H> + Send + Clone,
F: Fn() -> H + Send + Clone,
{
fn clone(&self) -> Self {
SimpleFactory {
@ -758,7 +732,7 @@ where
impl<H, F, P> ServiceFactory<H> for SimpleFactory<H, F, P>
where
H: IntoHttpHandler + 'static,
F: Fn() -> Vec<H> + Send + Clone + 'static,
F: Fn() -> H + Send + Clone + 'static,
P: HttpPipelineFactory<Io = TcpStream>,
{
fn register(&self, server: Server, lst: net::TcpListener) -> Server {
@ -894,7 +868,7 @@ where
struct DefaultPipelineFactory<F, H, Io>
where
F: Fn() -> Vec<H> + Send + Clone,
F: Fn() -> H + Send + Clone,
{
factory: F,
host: Option<String>,
@ -906,7 +880,7 @@ where
impl<F, H, Io> DefaultPipelineFactory<F, H, Io>
where
Io: IoStream + Send,
F: Fn() -> Vec<H> + Send + Clone + 'static,
F: Fn() -> H + Send + Clone + 'static,
H: IntoHttpHandler + 'static,
{
fn new(
@ -925,7 +899,7 @@ where
impl<F, H, Io> Clone for DefaultPipelineFactory<F, H, Io>
where
Io: IoStream,
F: Fn() -> Vec<H> + Send + Clone,
F: Fn() -> H + Send + Clone,
H: IntoHttpHandler,
{
fn clone(&self) -> Self {
@ -942,7 +916,7 @@ where
impl<F, H, Io> HttpPipelineFactory for DefaultPipelineFactory<F, H, Io>
where
Io: IoStream + Send,
F: Fn() -> Vec<H> + Send + Clone + 'static,
F: Fn() -> H + Send + Clone + 'static,
H: IntoHttpHandler + 'static,
{
type Io = Io;

View File

@ -174,13 +174,12 @@ const HW_BUFFER_SIZE: usize = 32_768;
/// sys.run();
/// }
/// ```
pub fn new<F, U, H>(factory: F) -> HttpServer<H, impl Fn() -> Vec<H> + Send + Clone>
pub fn new<F, H>(factory: F) -> HttpServer<H, F>
where
F: Fn() -> U + Send + Clone + 'static,
U: IntoIterator<Item = H>,
F: Fn() -> H + Send + Clone + 'static,
H: IntoHttpHandler + 'static,
{
HttpServer::with_factory(move || (factory.clone())().into_iter().collect())
HttpServer::new(factory)
}
#[doc(hidden)]

View File

@ -136,7 +136,7 @@ const DATE_VALUE_LENGTH: usize = 29;
pub(crate) struct WorkerSettings<H>(Rc<Inner<H>>);
struct Inner<H> {
h: Vec<H>,
handler: H,
keep_alive: u64,
ka_enabled: bool,
bytes: Rc<SharedBytesPool>,
@ -153,7 +153,7 @@ impl<H> Clone for WorkerSettings<H> {
impl<H> WorkerSettings<H> {
pub(crate) fn new(
h: Vec<H>, keep_alive: KeepAlive, settings: ServerSettings,
handler: H, keep_alive: KeepAlive, settings: ServerSettings,
) -> WorkerSettings<H> {
let (keep_alive, ka_enabled) = match keep_alive {
KeepAlive::Timeout(val) => (val as u64, true),
@ -162,7 +162,7 @@ impl<H> WorkerSettings<H> {
};
WorkerSettings(Rc::new(Inner {
h,
handler,
keep_alive,
ka_enabled,
bytes: Rc::new(SharedBytesPool::new()),
@ -176,8 +176,8 @@ impl<H> WorkerSettings<H> {
self.0.node.borrow_mut()
}
pub fn handlers(&self) -> &Vec<H> {
&self.0.h
pub fn handler(&self) -> &H {
&self.0.handler
}
pub fn keep_alive_timer(&self) -> Option<Delay> {

View File

@ -103,14 +103,12 @@ impl TestServer {
}
/// Start new test server with application factory
pub fn with_factory<F, U, H>(factory: F) -> Self
pub fn with_factory<F, H>(factory: F) -> Self
where
F: Fn() -> U + Send + Clone + 'static,
U: IntoIterator<Item = H>,
F: Fn() -> H + Send + Clone + 'static,
H: IntoHttpHandler + 'static,
{
let (tx, rx) = mpsc::channel();
let factory = move || (factory.clone())().into_iter().collect();
// run server in separate thread
thread::spawn(move || {
@ -118,7 +116,7 @@ impl TestServer {
let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap();
let local_addr = tcp.local_addr().unwrap();
let _ = HttpServer::with_factory(factory)
let _ = HttpServer::new(factory)
.disable_signals()
.listen(tcp)
.keep_alive(5)
@ -328,10 +326,10 @@ where
let sys = System::new("actix-test-server");
let state = self.state;
let mut srv = HttpServer::with_factory(move || {
let mut srv = HttpServer::new(move || {
let mut app = TestApp::new(state());
config(&mut app);
vec![app]
app
}).workers(1)
.keep_alive(5)
.disable_signals();