1
0
mirror of https://github.com/actix/examples synced 2024-11-27 16:02:57 +01:00

Merge pull request #12 from Dowwie/master

added async_db project
This commit is contained in:
Nikolay Kim 2018-05-26 14:34:36 -07:00 committed by GitHub
commit e582249686
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 14231 additions and 0 deletions

View File

@ -1,6 +1,7 @@
[workspace] [workspace]
members = [ members = [
"./", "./",
"async_db",
"async_ex1", "async_ex1",
"basics", "basics",
"cookie-auth", "cookie-auth",

18
async_db/Cargo.toml Normal file
View File

@ -0,0 +1,18 @@
[package]
name = "async_db"
version = "0.1.0"
authors = ["Darin Gordon <dkcdkg@gmail.com>"]
[dependencies]
actix = "0.5"
actix-web = "^0.6"
dotenv = "0.10"
env_logger = "0.5"
failure = "0.1.1"
futures = "0.1"
num_cpus = "1.8.0"
r2d2 = "0.8.2"
r2d2_sqlite = "0.5.0"
serde = "1.0"
serde_json = "1.0"
serde_derive = "1.0"

40
async_db/README.md Normal file
View File

@ -0,0 +1,40 @@
Getting started using databases with Actix web, asynchronously.
## Usage
### init database sqlite
From the root directory of this project:
```bash
bash db/setup_db.sh
```
This creates a sqlite database, weather.db, in the root.
### server
```bash
# if ubuntu : sudo apt-get install libsqlite3-dev
# if fedora : sudo dnf install libsqlite3x-devel
cargo run (or ``cargo watch -x run``)
# Started http server: 127.0.0.1:8080
```
### web client
[http://127.0.0.1:8080/asyncio_weather](http://127.0.0.1:8080/asyncio_weather)
[http://127.0.0.1:8080/parallel_weather](http://127.0.0.1:8080/parallel_weather)
### sqlite client
```bash
# if ubuntu : sudo apt-get install sqlite3
# if fedora : sudo dnf install sqlite3x
sqlite3 weather.db
sqlite> .tables
sqlite> select * from nyc_weather;
```

Binary file not shown.

6
async_db/db/README.md Normal file
View File

@ -0,0 +1,6 @@
This directory includes weather information obtained from NOAA for NYC Central Park: https://www.ncdc.noaa.gov/cdo-web/
# Setup Instructions
Set up a sqlite3 database by executing the setup_db.sh file: ``bash sqlite_db.sh``

28
async_db/db/db.sql Executable file
View File

@ -0,0 +1,28 @@
CREATE TABLE nyc_weather(
STATION TEXT,
NAME TEXT,
DATE TEXT,
ACMH DOUBLE,
AWND DOUBLE,
FMTM DOUBLE,
PGTM DOUBLE,
PRCP DOUBLE,
PSUN DOUBLE,
SNOW DOUBLE,
SNWD DOUBLE,
TAVG DOUBLE,
TMAX DOUBLE,
TMIN DOUBLE,
TSUN DOUBLE,
WDF1 DOUBLE,
WDF2 DOUBLE,
WDF5 DOUBLE,
WDFG DOUBLE,
WDFM DOUBLE,
WSF1 DOUBLE,
WSF2 DOUBLE,
WSF5 DOUBLE,
WSFG DOUBLE,
WSFM DOUBLE
);

File diff suppressed because it is too large Load Diff

2
async_db/db/setup_db.sh Executable file
View File

@ -0,0 +1,2 @@
sqlite3 weather.db < db/db.sql ; \
sqlite3 -csv weather.db ".import db/nyc_centralpark_weather.csv nyc_weather"

136
async_db/src/db.rs Normal file
View File

@ -0,0 +1,136 @@
use actix::prelude::*;
use actix_web::*;
use std::{time::Duration, thread::sleep};
use failure::Error;
use r2d2;
use r2d2_sqlite;
pub type Pool = r2d2::Pool<r2d2_sqlite::SqliteConnectionManager>;
pub type Connection = r2d2::PooledConnection<r2d2_sqlite::SqliteConnectionManager>;
pub struct DbExecutor(pub Pool);
impl Actor for DbExecutor {
type Context = SyncContext<Self>;
}
#[derive(Debug, Serialize, Deserialize)]
pub enum WeatherAgg {
AnnualAgg {year: i32, total: f64},
MonthAgg {year: i32, month: i32, total: f64}
}
pub enum Queries {
GetTopTenHottestYears,
GetTopTenColdestYears,
GetTopTenHottestMonths,
GetTopTenColdestMonths
}
//pub struct GetTopTenHottestYears;
impl Message for Queries {
type Result = Result<Vec<WeatherAgg>, Error>;
}
impl Handler<Queries> for DbExecutor {
type Result = Result<Vec<WeatherAgg>, Error>;
fn handle(&mut self, msg: Queries, _: &mut Self::Context) -> Self::Result {
let conn: Connection = self.0.get()?;
match msg {
Queries::GetTopTenHottestYears => get_hottest_years(conn),
Queries::GetTopTenColdestYears => get_coldest_years(conn),
Queries::GetTopTenHottestMonths =>get_hottest_months(conn),
Queries::GetTopTenColdestMonths => get_coldest_months(conn),
}
}
}
fn get_hottest_years(conn: Connection) -> Result<Vec<WeatherAgg>, Error> {
let stmt = "
SELECT cast(strftime('%Y', date) as int) as theyear,
sum(tmax) as total
FROM nyc_weather
WHERE tmax <> 'TMAX'
GROUP BY theyear
ORDER BY total DESC LIMIT 10;";
let mut prep_stmt = conn.prepare(stmt)?;
let annuals = prep_stmt.query_map(&[], |row| {
WeatherAgg::AnnualAgg{year: row.get(0),
total: row.get(1)}})
.and_then(|mapped_rows|
Ok(mapped_rows.map(|row| row.unwrap()).collect::<Vec<WeatherAgg>>()))?;
sleep(Duration::from_secs(2));
Ok(annuals)
}
fn get_coldest_years(conn: Connection) -> Result<Vec<WeatherAgg>, Error> {
let stmt = "
SELECT cast(strftime('%Y', date) as int) as theyear,
sum(tmax) as total
FROM nyc_weather
WHERE tmax <> 'TMAX'
GROUP BY theyear
ORDER BY total ASC LIMIT 10;";
let mut prep_stmt = conn.prepare(stmt)?;
let annuals = prep_stmt.query_map(&[], |row| {
WeatherAgg::AnnualAgg{year: row.get(0),
total: row.get(1)}})
.and_then(|mapped_rows|
Ok(mapped_rows.map(|row| row.unwrap()).collect::<Vec<WeatherAgg>>()))?;
sleep(Duration::from_secs(2));
Ok(annuals)
}
fn get_hottest_months(conn: Connection) -> Result<Vec<WeatherAgg>, Error> {
let stmt =
"SELECT cast(strftime('%Y', date) as int) as theyear,
cast(strftime('%m', date) as int) as themonth,
sum(tmax) as total
FROM nyc_weather
WHERE tmax <> 'TMAX'
GROUP BY theyear, themonth
ORDER BY total DESC LIMIT 10;";
let mut prep_stmt = conn.prepare(stmt)?;
let annuals = prep_stmt.query_map(&[], |row| {
WeatherAgg::MonthAgg{year: row.get(0),
month: row.get(1),
total: row.get(2)}})
.and_then(|mapped_rows|
Ok(mapped_rows.map(|row| row.unwrap()).collect::<Vec<WeatherAgg>>()))?;
sleep(Duration::from_secs(2));
Ok(annuals)
}
fn get_coldest_months(conn: Connection) -> Result<Vec<WeatherAgg>, Error> {
let stmt =
"SELECT cast(strftime('%Y', date) as int) as theyear,
cast(strftime('%m', date) as int) as themonth,
sum(tmax) as total
FROM nyc_weather
WHERE tmax <> 'TMAX'
GROUP BY theyear, themonth
ORDER BY total ASC LIMIT 10;";
let mut prep_stmt = conn.prepare(stmt)?;
let annuals = prep_stmt.query_map(&[], |row| {
WeatherAgg::MonthAgg{year: row.get(0),
month: row.get(1),
total: row.get(2)}})
.and_then(|mapped_rows|
Ok(mapped_rows.map(|row| row.unwrap()).collect::<Vec<WeatherAgg>>()))?;
sleep(Duration::from_secs(2));
Ok(annuals)
}

118
async_db/src/main.rs Normal file
View File

@ -0,0 +1,118 @@
/* Actix-Web Asynchronous Database Example
This project illustrates two examples:
1. An asynchronous handler that executes 4 queries in *sequential order*,
collecting the results and returning them as a single serialized json object
2. An asynchronous handler that executes 4 queries in *parallel*,
collecting the results and returning them as a single serialized json object
*/
#[macro_use] extern crate actix;
extern crate actix_web;
extern crate env_logger;
extern crate failure;
extern crate futures;
extern crate num_cpus;
extern crate r2d2;
extern crate r2d2_sqlite;
extern crate serde;
#[macro_use] extern crate serde_derive;
extern crate serde_json;
use actix::prelude::*;
use actix_web::{
http, middleware, server, App, AsyncResponder, FutureResponse, HttpResponse,
State, Error as AWError
};
use std::error::Error as StdError;
use failure::Error;
use futures::future::{Future, join_all, ok as fut_ok, err as fut_err};
use r2d2_sqlite::SqliteConnectionManager;
mod db;
use db::{DbExecutor, Queries, WeatherAgg, Pool};
/// State with DbExecutor address
struct AppState {
db: Addr<Syn, DbExecutor>,
}
/// Version 1: Calls 4 queries in sequential order, as an asynchronous handler
fn asyncio_weather(state: State<AppState>) -> FutureResponse<HttpResponse> {
let mut result: Vec<Vec<WeatherAgg>> = vec![];
state.db.send(Queries::GetTopTenHottestYears).from_err()
.and_then(move |res| {
result.push(res.unwrap());
state.db.send(Queries::GetTopTenColdestYears).from_err()
.and_then(move |res| {
result.push(res.unwrap());
state.db.send(Queries::GetTopTenHottestMonths).from_err()
.and_then(move |res| {
result.push(res.unwrap());
state.db.send(Queries::GetTopTenColdestMonths).from_err()
.and_then(move |res| {
result.push(res.unwrap());
fut_ok(result)
})
})
})
})
.and_then(|res| Ok(HttpResponse::Ok().json(res)))
.responder()
}
/// Version 2: Calls 4 queries in parallel, as an asynchronous handler
/// Returning Error types turn into None values in the response
fn parallel_weather(state: State<AppState>) -> FutureResponse<HttpResponse> {
let fut_result = vec![
Box::new(state.db.send(Queries::GetTopTenHottestYears)),
Box::new(state.db.send(Queries::GetTopTenColdestYears)),
Box::new(state.db.send(Queries::GetTopTenHottestMonths)),
Box::new(state.db.send(Queries::GetTopTenColdestMonths))];
join_all(fut_result)
.map_err(AWError::from)
.and_then(|result| {
let res: Vec<Option<Vec<WeatherAgg>>> =
result.into_iter().map(|x| x.ok()).collect();
Ok(HttpResponse::Ok().json(res))
})
.responder()
}
fn main() {
::std::env::set_var("RUST_LOG", "actix_web=info");
env_logger::init();
let sys = actix::System::new("parallel_db_example");
// Start N db executor actors (N = number of cores avail)
let manager = SqliteConnectionManager::file("weather.db");
let pool = Pool::new(manager).unwrap();
let addr = SyncArbiter::start(num_cpus::get(), move || DbExecutor(pool.clone()));
// Start http server
server::new(move || {
App::with_state(AppState{db: addr.clone()})
// enable logger
.middleware(middleware::Logger::default())
.resource("/asyncio_weather", |r|
r.method(http::Method::GET)
.with(asyncio_weather))
.resource("/parallel_weather", |r|
r.method(http::Method::GET)
.with(parallel_weather))
}).bind("127.0.0.1:8080")
.unwrap()
.start();
println!("Started http server: 127.0.0.1:8080");
let _ = sys.run();
}