diff --git a/async_db/Cargo.toml b/async_db/Cargo.toml index 56e97c2f..9dee0d2f 100644 --- a/async_db/Cargo.toml +++ b/async_db/Cargo.toml @@ -2,10 +2,12 @@ name = "async_db" version = "0.1.0" authors = ["Darin Gordon "] +edition = "2018" +workspace = ".." [dependencies] -actix = "0.7" -actix-web = "0.7" +actix-rt = "0.2" +actix-web = { git="https://github.com/actix/actix-web.git", branch = "1.0" } dotenv = "0.10" env_logger = "0.5" diff --git a/async_db/src/db.rs b/async_db/src/db.rs index a7903d71..9b700237 100644 --- a/async_db/src/db.rs +++ b/async_db/src/db.rs @@ -1,17 +1,14 @@ -use actix::prelude::*; +use actix_web::{blocking, Error as AWError}; use failure::Error; +use futures::Future; use r2d2; use r2d2_sqlite; +use serde_derive::{Deserialize, Serialize}; use std::{thread::sleep, time::Duration}; pub type Pool = r2d2::Pool; pub type Connection = r2d2::PooledConnection; -pub struct DbExecutor(pub Pool); -impl Actor for DbExecutor { - type Context = SyncContext; -} - #[derive(Debug, Serialize, Deserialize)] pub enum WeatherAgg { AnnualAgg { year: i32, total: f64 }, @@ -25,29 +22,24 @@ pub enum Queries { GetTopTenColdestMonths, } -//pub struct GetTopTenHottestYears; -impl Message for Queries { - type Result = Result, Error>; -} -impl Handler for DbExecutor { - type Result = Result, Error>; - - fn handle(&mut self, msg: Queries, _: &mut Self::Context) -> Self::Result { - let conn: Connection = self.0.get()?; - - match msg { - Queries::GetTopTenHottestYears => get_hottest_years(conn), - Queries::GetTopTenColdestYears => get_coldest_years(conn), - Queries::GetTopTenHottestMonths => get_hottest_months(conn), - Queries::GetTopTenColdestMonths => get_coldest_months(conn), - } - } +pub fn execute( + pool: &Pool, + query: Queries, +) -> impl Future, Error = AWError> { + let pool = pool.clone(); + blocking::run(move || match query { + Queries::GetTopTenHottestYears => get_hottest_years(pool.get()?), + Queries::GetTopTenColdestYears => get_coldest_years(pool.get()?), + Queries::GetTopTenHottestMonths => get_hottest_months(pool.get()?), + Queries::GetTopTenColdestMonths => get_coldest_months(pool.get()?), + }) + .from_err() } fn get_hottest_years(conn: Connection) -> Result, Error> { let stmt = " SELECT cast(strftime('%Y', date) as int) as theyear, - sum(tmax) as total + sum(tmax) as total FROM nyc_weather WHERE tmax <> 'TMAX' GROUP BY theyear @@ -73,7 +65,7 @@ fn get_hottest_years(conn: Connection) -> Result, Error> { fn get_coldest_years(conn: Connection) -> Result, Error> { let stmt = " SELECT cast(strftime('%Y', date) as int) as theyear, - sum(tmax) as total + sum(tmax) as total FROM nyc_weather WHERE tmax <> 'TMAX' GROUP BY theyear @@ -98,8 +90,8 @@ fn get_coldest_years(conn: Connection) -> Result, Error> { fn get_hottest_months(conn: Connection) -> Result, Error> { let stmt = "SELECT cast(strftime('%Y', date) as int) as theyear, - cast(strftime('%m', date) as int) as themonth, - sum(tmax) as total + cast(strftime('%m', date) as int) as themonth, + sum(tmax) as total FROM nyc_weather WHERE tmax <> 'TMAX' GROUP BY theyear, themonth @@ -124,8 +116,8 @@ fn get_hottest_months(conn: Connection) -> Result, Error> { fn get_coldest_months(conn: Connection) -> Result, Error> { let stmt = "SELECT cast(strftime('%Y', date) as int) as theyear, - cast(strftime('%m', date) as int) as themonth, - sum(tmax) as total + cast(strftime('%m', date) as int) as themonth, + sum(tmax) as total FROM nyc_weather WHERE tmax <> 'TMAX' GROUP BY theyear, themonth diff --git a/async_db/src/main.rs b/async_db/src/main.rs index 42dcb1bd..cd66d3f0 100644 --- a/async_db/src/main.rs +++ b/async_db/src/main.rs @@ -8,122 +8,89 @@ This project illustrates two examples: 2. An asynchronous handler that executes 4 queries in *parallel*, collecting the results and returning them as a single serialized json object -*/ + */ +use std::io; -extern crate actix; -extern crate actix_web; -extern crate env_logger; -extern crate failure; -extern crate futures; -extern crate num_cpus; -extern crate r2d2; -extern crate r2d2_sqlite; -extern crate serde; -#[macro_use] -extern crate serde_derive; -extern crate serde_json; - -use actix::prelude::*; -use actix_web::{ - http, middleware, server, App, AsyncResponder, Error as AWError, FutureResponse, - HttpResponse, State, -}; +use actix_web::{web, App, Error as AWError, HttpResponse, HttpServer, State}; use futures::future::{join_all, ok as fut_ok, Future}; +use r2d2_sqlite; use r2d2_sqlite::SqliteConnectionManager; mod db; -use db::{DbExecutor, Pool, Queries, WeatherAgg}; - -/// State with DbExecutor address -struct AppState { - db: Addr, -} +use db::{Pool, Queries, WeatherAgg}; /// Version 1: Calls 4 queries in sequential order, as an asynchronous handler -fn asyncio_weather(state: State) -> FutureResponse { +fn asyncio_weather( + db: State, +) -> impl Future { let mut result: Vec> = vec![]; - state - .db - .send(Queries::GetTopTenHottestYears) + db::execute(&db, Queries::GetTopTenHottestYears) .from_err() .and_then(move |res| { - result.push(res.unwrap()); - state - .db - .send(Queries::GetTopTenColdestYears) + result.push(res); + db::execute(&db, Queries::GetTopTenColdestYears) .from_err() .and_then(move |res| { - result.push(res.unwrap()); - state - .db - .send(Queries::GetTopTenHottestMonths) + result.push(res); + db::execute(&db, Queries::GetTopTenHottestMonths) .from_err() .and_then(move |res| { - result.push(res.unwrap()); - state - .db - .send(Queries::GetTopTenColdestMonths) + result.push(res); + db::execute(&db, Queries::GetTopTenColdestMonths) .from_err() .and_then(move |res| { - result.push(res.unwrap()); + result.push(res); fut_ok(result) }) }) }) }) .and_then(|res| Ok(HttpResponse::Ok().json(res))) - .responder() } /// Version 2: Calls 4 queries in parallel, as an asynchronous handler /// Returning Error types turn into None values in the response -fn parallel_weather(state: State) -> FutureResponse { +fn parallel_weather( + db: State, +) -> impl Future { let fut_result = vec![ - Box::new(state.db.send(Queries::GetTopTenHottestYears)), - Box::new(state.db.send(Queries::GetTopTenColdestYears)), - Box::new(state.db.send(Queries::GetTopTenHottestMonths)), - Box::new(state.db.send(Queries::GetTopTenColdestMonths)), + Box::new(db::execute(&db, Queries::GetTopTenHottestYears)), + Box::new(db::execute(&db, Queries::GetTopTenColdestYears)), + Box::new(db::execute(&db, Queries::GetTopTenHottestMonths)), + Box::new(db::execute(&db, Queries::GetTopTenColdestMonths)), ]; join_all(fut_result) .map_err(AWError::from) - .and_then(|result| { - let res: Vec>> = - result.into_iter().map(|x| x.ok()).collect(); - - Ok(HttpResponse::Ok().json(res)) - }) - .responder() + .map(|result| HttpResponse::Ok().json(result)) } -fn main() { - ::std::env::set_var("RUST_LOG", "actix_web=info"); +fn main() -> io::Result<()> { + std::env::set_var("RUST_LOG", "actix_web=info"); env_logger::init(); - let sys = actix::System::new("parallel_db_example"); + let sys = actix_rt::System::new("parallel_db_example"); // Start N db executor actors (N = number of cores avail) - let manager = SqliteConnectionManager::file("weather.db"); let pool = Pool::new(manager).unwrap(); - let addr = SyncArbiter::start(num_cpus::get(), move || DbExecutor(pool.clone())); - // Start http server - server::new(move || { - App::with_state(AppState{db: addr.clone()}) + HttpServer::new(move || { + App::new() + .state(pool.clone()) // enable logger - .middleware(middleware::Logger::default()) - .resource("/asyncio_weather", |r| - r.method(http::Method::GET) - .with(asyncio_weather)) - .resource("/parallel_weather", |r| - r.method(http::Method::GET) - .with(parallel_weather)) - }).bind("127.0.0.1:8080") - .unwrap() - .start(); + // .middleware(middleware::Logger::default()) + .resource("/asyncio_weather", |r| { + r.route(web::get().to_async(asyncio_weather)) + }) + .resource("/parallel_weather", |r| { + r.route(web::get().to_async(parallel_weather)) + }) + }) + .bind("127.0.0.1:8080")? + .start(); println!("Started http server: 127.0.0.1:8080"); - let _ = sys.run(); + sys.run() }