diff --git a/async_db/src/main.rs b/async_db/src/main.rs
index 1102703..e958399 100644
--- a/async_db/src/main.rs
+++ b/async_db/src/main.rs
@@ -11,9 +11,7 @@ This project illustrates two examples:
 */
 use std::io;
 
-use actix_web::{
-    middleware, web, App, Error as AWError, HttpResponse, HttpServer, State,
-};
+use actix_web::{middleware, web, App, Error as AWError, HttpResponse, HttpServer};
 use futures::future::{join_all, ok as fut_ok, Future};
 use r2d2_sqlite;
 use r2d2_sqlite::SqliteConnectionManager;
@@ -23,7 +21,7 @@ use db::{Pool, Queries, WeatherAgg};
 
 /// Version 1: Calls 4 queries in sequential order, as an asynchronous handler
 fn asyncio_weather(
-    db: State<Pool>,
+    db: web::State<Pool>,
 ) -> impl Future<Item = HttpResponse, Error = AWError> {
     let mut result: Vec<Vec<WeatherAgg>> = vec![];
 
@@ -54,7 +52,7 @@ fn asyncio_weather(
 /// Version 2: Calls 4 queries in parallel, as an asynchronous handler
 /// Returning Error types turn into None values in the response
 fn parallel_weather(
-    db: State<Pool>,
+    db: web::State<Pool>,
 ) -> impl Future<Item = HttpResponse, Error = AWError> {
     let fut_result = vec![
         Box::new(db::execute(&db, Queries::GetTopTenHottestYears)),
diff --git a/basics/src/main.rs b/basics/src/main.rs
index 8768718..55f25bb 100644
--- a/basics/src/main.rs
+++ b/basics/src/main.rs
@@ -1,8 +1,10 @@
+#[macro_use]
+extern crate actix_web;
+
 use std::{env, io};
 
 use actix_files as fs;
 use actix_session::{CookieSession, Session};
-use actix_web::extract::Path;
 use actix_web::http::{header, Method, StatusCode};
 use actix_web::{
     error, guard, middleware, web, App, Error, HttpRequest, HttpResponse, HttpServer,
@@ -13,11 +15,13 @@
 use futures::unsync::mpsc;
 use futures::{future::ok, Future, Stream};
 
 /// favicon handler
+#[get("/favicon")]
 fn favicon() -> Result<fs::NamedFile> {
     Ok(fs::NamedFile::open("static/favicon.ico")?)
 }
 
 /// simple index handler
+#[get("/welcome")]
 fn welcome(session: Session, req: HttpRequest) -> Result<HttpResponse> {
     println!("{:?}", req);
 
@@ -52,7 +56,7 @@ fn index_async(req: HttpRequest) -> impl Future<Item = HttpResponse, Error = Er
 }
 
 /// async body
-fn index_async_body(path: Path<String>) -> HttpResponse {
+fn index_async_body(path: web::Path<String>) -> HttpResponse {
     let text = format!("Hello {}!", *path);
 
     let (tx, rx_body) = mpsc::unbounded();
@@ -63,7 +67,7 @@ fn index_async_body(path: Path<String>) -> HttpResponse {
 }
 
 /// handler with path parameters like `/user/{name}/`
-fn with_param(req: HttpRequest, path: Path<(String,)>) -> HttpResponse {
+fn with_param(req: HttpRequest, path: web::Path<(String,)>) -> HttpResponse {
     println!("{:?}", req);
 
     HttpResponse::Ok()
@@ -84,9 +88,9 @@ fn main() -> io::Result<()> {
             // cookie session middleware
             .middleware(CookieSession::signed(&[0; 32]).secure(false))
             // register favicon
-            .service(web::resource("/favicon").to(favicon))
+            .service(favicon)
             // register simple route, handle all methods
-            .service(web::resource("/welcome").to(welcome))
+            .service(welcome)
             // with path parameters
             .service(web::resource("/user/{name}").route(web::get().to(with_param)))
             // async handler
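The basics migration above switches the favicon and welcome handlers to route attribute macros. A minimal sketch of that registration style, assuming the same actix-web 1.0 branch the patch pins; the handler body here is illustrative and not part of the example:

    #[macro_use]
    extern crate actix_web;

    use actix_web::{App, HttpResponse, HttpServer};

    // The route lives on the handler itself, so registration becomes
    // `.service(welcome)` instead of `.service(web::resource("/welcome").to(welcome))`.
    #[get("/welcome")]
    fn welcome() -> HttpResponse {
        HttpResponse::Ok().body("Welcome!")
    }

    fn main() -> std::io::Result<()> {
        HttpServer::new(|| App::new().service(welcome))
            .bind("127.0.0.1:8080")?
            .run()
    }
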
diff --git a/diesel/Cargo.toml b/diesel/Cargo.toml
index a2f309b..af94c95 100644
--- a/diesel/Cargo.toml
+++ b/diesel/Cargo.toml
@@ -2,15 +2,14 @@
 name = "diesel-example"
 version = "0.1.0"
 authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
-workspace = "../"
+workspace = ".."
+edition = "2018"
 
 [dependencies]
+actix-web = { git="https://github.com/actix/actix-web.git", branch = "1.0" }
+
 bytes = "0.4"
-env_logger = "0.5"
-
-actix = "0.7"
-actix-web = "0.7"
-
+env_logger = "0.6"
 futures = "0.1"
 uuid = { version = "0.5", features = ["serde", "v4"] }
 serde = "1.0"
diff --git a/diesel/src/db.rs b/diesel/src/db.rs
deleted file mode 100644
index de3dd17..0000000
--- a/diesel/src/db.rs
+++ /dev/null
@@ -1,55 +0,0 @@
-//! Db executor actor
-use actix::prelude::*;
-use actix_web::*;
-use diesel;
-use diesel::prelude::*;
-use diesel::r2d2::{ConnectionManager, Pool};
-use uuid;
-
-use models;
-use schema;
-
-/// This is db executor actor. We are going to run 3 of them in parallel.
-pub struct DbExecutor(pub Pool<ConnectionManager<SqliteConnection>>);
-
-/// This is only message that this actor can handle, but it is easy to extend
-/// number of messages.
-pub struct CreateUser {
-    pub name: String,
-}
-
-impl Message for CreateUser {
-    type Result = Result<models::User, Error>;
-}
-
-impl Actor for DbExecutor {
-    type Context = SyncContext<Self>;
-}
-
-impl Handler<CreateUser> for DbExecutor {
-    type Result = Result<models::User, Error>;
-
-    fn handle(&mut self, msg: CreateUser, _: &mut Self::Context) -> Self::Result {
-        use self::schema::users::dsl::*;
-
-        let uuid = format!("{}", uuid::Uuid::new_v4());
-        let new_user = models::NewUser {
-            id: &uuid,
-            name: &msg.name,
-        };
-
-        let conn: &SqliteConnection = &self.0.get().unwrap();
-
-        diesel::insert_into(users)
-            .values(&new_user)
-            .execute(conn)
-            .map_err(|_| error::ErrorInternalServerError("Error inserting person"))?;
-
-        let mut items = users
-            .filter(id.eq(&uuid))
-            .load::<models::User>(conn)
-            .map_err(|_| error::ErrorInternalServerError("Error loading person"))?;
-
-        Ok(items.pop().unwrap())
-    }
-}
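The deleted DbExecutor actor is replaced in the main.rs diff that follows by web::block, which ships a blocking closure to a worker thread pool and hands back a future. A rough sketch of that pattern under the same 1.0-alpha/futures-0.1 assumptions, with a stand-in blocking function rather than the real diesel query:

    use actix_web::{web, Error, HttpResponse};
    use futures::Future;

    // Stand-in for blocking work such as a diesel query; not the example's `query` fn.
    fn slow_lookup(name: String) -> Result<String, std::io::Error> {
        Ok(format!("hello {}", name))
    }

    // `web::block` runs the closure off the event loop; `.then` maps both the
    // success value and any blocking error onto a response.
    fn lookup_handler(name: web::Path<String>) -> impl Future<Item = HttpResponse, Error = Error> {
        web::block(move || slow_lookup(name.into_inner())).then(|res| match res {
            Ok(msg) => Ok(HttpResponse::Ok().body(msg)),
            Err(_) => Ok(HttpResponse::InternalServerError().into()),
        })
    }
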
diff --git a/diesel/src/main.rs b/diesel/src/main.rs
index d30519d..8ff8ce6 100644
--- a/diesel/src/main.rs
+++ b/diesel/src/main.rs
@@ -4,77 +4,74 @@
 //! Actix supports sync actors by default, so we going to create sync actor
 //! that use diesel. Technically sync actors are worker style actors, multiple
 //! of them can run in parallel and process messages from same queue.
-extern crate serde;
-extern crate serde_json;
-#[macro_use]
-extern crate serde_derive;
 #[macro_use]
 extern crate diesel;
-extern crate actix;
-extern crate actix_web;
-extern crate env_logger;
-extern crate futures;
-extern crate r2d2;
-extern crate uuid;
-extern crate bytes;
-// extern crate json;
-
-
-use bytes::BytesMut;
-use actix::prelude::*;
-use actix_web::{
-    http, middleware, server, App, AsyncResponder, FutureResponse, HttpResponse, Path, Error, HttpRequest,
-    State, HttpMessage, error, Json
-};
+#[macro_use]
+extern crate serde_derive;
+use actix_web::{error, middleware, web, App, Error, HttpResponse, HttpServer};
+use bytes::{Bytes, BytesMut};
 
 use diesel::prelude::*;
-use diesel::r2d2::ConnectionManager;
-use futures::{future, Future, Stream};
+use diesel::r2d2::{self, ConnectionManager};
+use futures::future::{err, Either};
+use futures::{Future, Stream};
 
-mod db;
 mod models;
 mod schema;
 
-use db::{CreateUser, DbExecutor};
+type Pool = r2d2::Pool<ConnectionManager<SqliteConnection>>;
 
-/// State with DbExecutor address
-struct AppState {
-    db: Addr<DbExecutor>,
+/// Diesel query
+fn query(
+    nm: String,
+    pool: web::State<Pool>,
+) -> Result<models::User, diesel::result::Error> {
+    use self::schema::users::dsl::*;
+
+    let uuid = format!("{}", uuid::Uuid::new_v4());
+    let new_user = models::NewUser {
+        id: &uuid,
+        name: nm.as_str(),
+    };
+    let conn: &SqliteConnection = &pool.get().unwrap();
+
+    diesel::insert_into(users).values(&new_user).execute(conn)?;
+
+    let mut items = users.filter(id.eq(&uuid)).load::<models::User>(conn)?;
+    Ok(items.pop().unwrap())
 }
 
 /// Async request handler
 fn add(
-    (name, state): (Path<String>, State<AppState>),
-) -> FutureResponse<HttpResponse> {
-    // send async `CreateUser` message to a `DbExecutor`
-    state
-        .db
-        .send(CreateUser {
-            name: name.into_inner(),
-        })
-        .from_err()
-        .and_then(|res| match res {
-            Ok(user) => Ok(HttpResponse::Ok().json(user)),
-            Err(_) => Ok(HttpResponse::InternalServerError().into()),
-        })
-        .responder()
+    name: web::Path<String>,
+    pool: web::State<Pool>,
+) -> impl Future<Item = HttpResponse, Error = Error> {
+    // run diesel blocking code
+    web::block(move || query(name.into_inner(), pool)).then(|res| match res {
+        Ok(user) => Ok(HttpResponse::Ok().json(user)),
+        Err(_) => Ok(HttpResponse::InternalServerError().into()),
+    })
 }
 
 #[derive(Debug, Serialize, Deserialize)]
 struct MyUser {
-    name: String
+    name: String,
 }
 
 const MAX_SIZE: usize = 262_144; // max payload size is 256k
 
 /// This handler manually load request payload and parse json object
-fn index_add((req, state): (HttpRequest<AppState>, State<AppState>)) -> impl Future<Item = HttpResponse, Error = Error> {
-    // HttpRequest::payload() is stream of Bytes objects
-    req.payload()
+fn index_add<P>(
+    pl: web::Payload<P>,
+    pool: web::State<Pool>,
+) -> impl Future<Item = HttpResponse, Error = Error>
+where
+    P: Stream<Item = Bytes, Error = error::PayloadError>,
+{
+    pl
         // `Future::from_err` acts like `?` in that it coerces the error type from
         // the future into the final error type
         .from_err()
-
         // `fold` will asynchronously read each chunk of the request body and
        // call supplied closure, then it resolves to result of closure
         .fold(BytesMut::new(), move |mut body, chunk| {
@@ -92,45 +89,39 @@ fn index_add((req, state): (HttpRequest<AppState>, State<AppState>)) -> impl Fut
         // Douman NOTE:
         // The return value in this closure helps, to clarify result for compiler
         // as otheriwse it cannot understand it
-        .and_then(move |body| -> Box<Future<Item = HttpResponse, Error = Error>> {
+        .and_then(move |body| {
             // body is loaded, now we can deserialize serde-json
             let r_obj = serde_json::from_slice::<MyUser>(&body);
 
             // Send to the db for create
             match r_obj {
                 Ok(obj) => {
-                    let res = state.db.send(CreateUser { name: obj.name, })
-                        .from_err()
-                        .and_then(|res| match res {
-                            Ok(user) => Ok(HttpResponse::Ok().json(user)),
-                            Err(_) => Ok(HttpResponse::InternalServerError().into()),
-                        });
-
-                    Box::new(res)
+                    Either::A(web::block(move || query(obj.name, pool)).then(|res| {
+                        match res {
+                            Ok(user) => Ok(HttpResponse::Ok().json(user)),
+                            Err(_) => Ok(HttpResponse::InternalServerError().into()),
+                        }
+                    }))
                 }
-                Err(_) => Box::new(future::err(error::ErrorBadRequest("Json Decode Failed")))
+                Err(_) => Either::B(err(error::ErrorBadRequest("Json Decode Failed"))),
             }
         })
 }
 
-fn add2((item, state): (Json<MyUser>, State<AppState>)) -> impl Future<Item = HttpResponse, Error = Error> {
-    state.db
-        .send(CreateUser {
-            // into_inner to move into the reference, then accessing name to
-            // move the name out.
-            name: item.into_inner().name,
-        })
-        .from_err()
-        .and_then(|res| match res {
-            Ok(user) => Ok(HttpResponse::Ok().json(user)),
-            Err(_) => Ok(HttpResponse::InternalServerError().into()),
-        })
+fn add2(
+    item: web::Json<MyUser>,
+    pool: web::State<Pool>,
+) -> impl Future<Item = HttpResponse, Error = Error> {
+    // run diesel blocking code
+    web::block(move || query(item.into_inner().name, pool)).then(|res| match res {
+        Ok(user) => Ok(HttpResponse::Ok().json(user)),
+        Err(_) => Ok(HttpResponse::InternalServerError().into()),
+    })
 }
 
-fn main() {
-    ::std::env::set_var("RUST_LOG", "actix_web=info");
+fn main() -> std::io::Result<()> {
+    std::env::set_var("RUST_LOG", "actix_web=info");
     env_logger::init();
-    let sys = actix::System::new("diesel-example");
 
     // Start 3 db executor actors
     let manager = ConnectionManager::<SqliteConnection>::new("test.db");
@@ -138,32 +129,29 @@ fn main() {
         .build(manager)
         .expect("Failed to create pool.");
 
-    let addr = SyncArbiter::start(3, move || DbExecutor(pool.clone()));
-
     // Start http server
-    server::new(move || {
-        App::with_state(AppState{db: addr.clone()})
+    HttpServer::new(move || {
+        App::new()
+            .state(pool.clone())
             // enable logger
             .middleware(middleware::Logger::default())
             // This can be called with:
             // curl -S --header "Content-Type: application/json" --request POST --data '{"name":"xyz"}' http://127.0.0.1:8080/add
             // Use of the extractors makes some post conditions simpler such
             // as size limit protections and built in json validation.
-            .resource("/add2", |r| {
-                r.method(http::Method::POST)
-                    .with_async_config(add2, |(json_cfg, )| {
-                        json_cfg.0.limit(4096); // <- limit size of the payload
-                    })
-            })
+            .service(
+                web::resource("/add2").route(
+                    web::post()
+                        .config(web::JsonConfig::default().limit(4096)) // <- limit size of the payload
+                        .to_async(add2),
+                ),
+            )
             // Manual parsing would allow custom error construction, use of
             // other parsers *beside* json (for example CBOR, protobuf, xml), and allows
             // an application to standardise on a single parser implementation.
-            .resource("/add", |r| r.method(http::Method::POST).with_async(index_add))
-            .resource("/add/{name}", |r| r.method(http::Method::GET).with(add))
-    }).bind("127.0.0.1:8080")
-        .unwrap()
-        .start();
-
-    println!("Started http server: 127.0.0.1:8080");
-    let _ = sys.run();
+            .service(web::resource("/add").route(web::post().to_async(index_add)))
+            .service(web::resource("/add/{name}").route(web::get().to_async(add)))
+    })
+    .bind("127.0.0.1:8080")?
+    .run()
 }
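index_add above returns one of two different future types from a single match, which is what futures 0.1 Either is for. A small standalone illustration of the same branching, detached from the diesel types in the patch:

    use actix_web::{error, Error, HttpResponse};
    use futures::future::{err, ok, Either};
    use futures::Future;

    // Either::A and Either::B may wrap different concrete futures as long as
    // their Item/Error types line up, so no boxing is needed.
    fn branch(valid: bool) -> impl Future<Item = HttpResponse, Error = Error> {
        if valid {
            Either::A(ok(HttpResponse::Ok().body("accepted")))
        } else {
            Either::B(err(error::ErrorBadRequest("rejected")))
        }
    }
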
diff --git a/json/Cargo.toml b/json/Cargo.toml
index c358f4a..ff724d2 100644
--- a/json/Cargo.toml
+++ b/json/Cargo.toml
@@ -2,9 +2,12 @@
 name = "json-example"
 version = "0.1.0"
 authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
-workspace = "../"
+workspace = ".."
+edition = "2018"
 
 [dependencies]
+actix-web = { git="https://github.com/actix/actix-web.git", branch = "1.0" }
+
 bytes = "0.4"
 futures = "0.1"
 env_logger = "*"
@@ -13,6 +16,3 @@ serde = "1.0"
 serde_json = "1.0"
 serde_derive = "1.0"
 json = "*"
-
-actix = "0.7"
-actix-web = "^0.7"
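The json example below drops the old HttpRequest::json() helper in favour of the web::Json extractor plus a per-route JsonConfig limit. A minimal hedged sketch of that pairing; the struct and route path are made up for illustration:

    #[macro_use]
    extern crate serde_derive;

    use actix_web::{web, App, HttpResponse, HttpServer};

    #[derive(Debug, Serialize, Deserialize)]
    struct Item {
        name: String,
    }

    // The body is deserialized before the handler runs; `item.0` is the parsed value.
    fn create(item: web::Json<Item>) -> HttpResponse {
        HttpResponse::Ok().json(item.0)
    }

    fn main() -> std::io::Result<()> {
        HttpServer::new(|| {
            App::new().service(
                web::resource("/items").route(
                    web::post()
                        .config(web::JsonConfig::default().limit(4096)) // cap the payload at 4 KiB
                        .to(create),
                ),
            )
        })
        .bind("127.0.0.1:8080")?
        .run()
    }

Oversized or malformed bodies are rejected before create is ever called.
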

diff --git a/json/src/main.rs b/json/src/main.rs
index a93f761..6c9d7a5 100644
--- a/json/src/main.rs
+++ b/json/src/main.rs
@@ -1,22 +1,13 @@
-extern crate actix;
-extern crate actix_web;
-extern crate bytes;
-extern crate env_logger;
-extern crate futures;
-extern crate serde_json;
-#[macro_use]
-extern crate serde_derive;
 #[macro_use]
 extern crate json;
 
 use actix_web::{
-    error, http, middleware, server, App, AsyncResponder, Error, HttpMessage,
-    HttpRequest, HttpResponse, Json,
+    error, middleware, web, App, Error, HttpRequest, HttpResponse, HttpServer,
 };
-
-use bytes::BytesMut;
+use bytes::{Bytes, BytesMut};
 use futures::{Future, Stream};
 use json::JsonValue;
+use serde_derive::{Deserialize, Serialize};
 
 #[derive(Debug, Serialize, Deserialize)]
 struct MyObj {
@@ -24,39 +15,34 @@ struct MyObj {
     number: i32,
 }
 
-/// This handler uses `HttpRequest::json()` for loading json object.
-fn index(req: &HttpRequest) -> Box<Future<Item = HttpResponse, Error = Error>> {
-    req.json()
-        .from_err() // convert all errors into `Error`
-        .and_then(|val: MyObj| {
-            println!("model: {:?}", val);
-            Ok(HttpResponse::Ok().json(val)) // <- send response
-        })
-        .responder()
-}
-
 /// This handler uses json extractor
-fn extract_item(item: Json<MyObj>) -> HttpResponse {
+fn index(item: web::Json<MyObj>) -> HttpResponse {
     println!("model: {:?}", &item);
     HttpResponse::Ok().json(item.0) // <- send response
 }
 
 /// This handler uses json extractor with limit
-fn extract_item_limit((item, _req): (Json<MyObj>, HttpRequest)) -> HttpResponse {
-    println!("model: {:?}", &item);
-    HttpResponse::Ok().json(item.0) // <- send response
+fn extract_item(item: web::Json<MyObj>, req: HttpRequest) -> HttpResponse {
+    println!("request: {:?}", req);
+    println!("model: {:?}", item);
+
+    HttpResponse::Ok().json(item.0) // <- send json response
 }
 
 const MAX_SIZE: usize = 262_144; // max payload size is 256k
 
 /// This handler manually load request payload and parse json object
-fn index_manual(req: &HttpRequest) -> Box<Future<Item = HttpResponse, Error = Error>> {
-    // HttpRequest::payload() is stream of Bytes objects
-    req.payload()
+fn index_manual<P>(
+    payload: web::Payload<P>,
+) -> impl Future<Item = HttpResponse, Error = Error>
+where
+    P: Stream<Item = Bytes, Error = error::PayloadError>,
+{
+    // payload is a stream of Bytes objects
+    payload
         // `Future::from_err` acts like `?` in that it coerces the error type from
         // the future into the final error type
         .from_err()
-
         // `fold` will asynchronously read each chunk of the request body and
         // call supplied closure, then it resolves to result of closure
         .fold(BytesMut::new(), move |mut body, chunk| {
@@ -75,59 +61,56 @@ fn index_manual(req: &HttpRequest) -> Box<Future<Item = HttpResponse, Error = E
             let obj = serde_json::from_slice::<MyObj>(&body)?;
             Ok(HttpResponse::Ok().json(obj)) // <- send response
         })
-        .responder()
 }
 
 /// This handler manually load request payload and parse json-rust
-fn index_mjsonrust(
-    req: &HttpRequest,
-) -> Box<Future<Item = HttpResponse, Error = Error>> {
-    req.payload()
-        .concat2()
-        .from_err()
-        .and_then(|body| {
-            // body is loaded, now we can deserialize json-rust
-            let result = json::parse(std::str::from_utf8(&body).unwrap()); // return Result
-            let injson: JsonValue = match result {
-                Ok(v) => v,
-                Err(e) => object!{"err" => e.to_string() },
-            };
-            Ok(HttpResponse::Ok()
-                .content_type("application/json")
-                .body(injson.dump()))
-        })
-        .responder()
+fn index_mjsonrust<P>(
+    pl: web::Payload<P>,
+) -> impl Future<Item = HttpResponse, Error = Error>
+where
+    P: Stream<Item = Bytes, Error = error::PayloadError>,
+{
+    pl.concat2().from_err().and_then(|body| {
+        // body is loaded, now we can deserialize json-rust
+        let result = json::parse(std::str::from_utf8(&body).unwrap()); // return Result
+        let injson: JsonValue = match result {
+            Ok(v) => v,
+            Err(e) => json::object! {"err" => e.to_string() },
+        };
+        Ok(HttpResponse::Ok()
+            .content_type("application/json")
+            .body(injson.dump()))
+    })
 }
 
-fn main() {
-    ::std::env::set_var("RUST_LOG", "actix_web=info");
+fn main() -> std::io::Result<()> {
+    std::env::set_var("RUST_LOG", "actix_web=info");
     env_logger::init();
-    let sys = actix::System::new("json-example");
 
-    server::new(|| {
+    HttpServer::new(|| {
         App::new()
             // enable logger
             .middleware(middleware::Logger::default())
-            .resource("/extractor", |r| {
-                r.method(http::Method::POST)
-                    .with_config(extract_item, |(cfg,)| {
-                        cfg.limit(4096); // <- limit size of the payload
-                    })
-            })
-            .resource("/extractor2", |r| {
-                r.method(http::Method::POST)
-                    .with_config(extract_item_limit, |((cfg, _),)| {
-                        cfg.limit(4096); // <- limit size of the payload
-                    })
-            })
-            .resource("/manual", |r| r.method(http::Method::POST).f(index_manual))
-            .resource("/mjsonrust", |r| r.method(http::Method::POST).f(index_mjsonrust))
-            .resource("/", |r| r.method(http::Method::POST).f(index))
-    }).bind("127.0.0.1:8080")
-        .unwrap()
-        .shutdown_timeout(1)
-        .start();
-
-    println!("Started http server: 127.0.0.1:8080");
-    let _ = sys.run();
+            .service(
+                web::resource("/extractor").route(
+                    web::post()
+                        .config(web::JsonConfig::default().limit(4096)) // <- limit size of the payload
+                        .to(index),
+                ),
+            )
+            .service(
+                web::resource("/extractor2").route(
+                    web::post()
+                        .config(web::JsonConfig::default().limit(4096)) // <- limit size of the payload
+                        .to_async(extract_item),
+                ),
+            )
+            .service(web::resource("/manual").route(web::post().to_async(index_manual)))
+            .service(
+                web::resource("/mjsonrust").route(web::post().to_async(index_mjsonrust)),
+            )
+            .service(web::resource("/").route(web::post().to(index)))
+    })
+    .bind("127.0.0.1:8080")?
+    .run()
 }
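For cases where an extractor is too rigid, index_manual above folds the raw payload stream itself. A compact sketch of the same accumulate-with-a-cap idea, under the payload and stream bounds shown in the diff:

    use actix_web::{error, web, Error, HttpResponse};
    use bytes::{Bytes, BytesMut};
    use futures::{Future, Stream};

    const LIMIT: usize = 16_384; // illustrative cap, not the example's MAX_SIZE

    fn collect_body<P>(pl: web::Payload<P>) -> impl Future<Item = HttpResponse, Error = Error>
    where
        P: Stream<Item = Bytes, Error = error::PayloadError>,
    {
        pl.from_err()
            // accumulate chunks into one buffer, failing fast once the cap is exceeded
            .fold(BytesMut::new(), |mut body, chunk| {
                if body.len() + chunk.len() > LIMIT {
                    Err(error::ErrorBadRequest("overflow"))
                } else {
                    body.extend_from_slice(&chunk);
                    Ok(body)
                }
            })
            .and_then(|body| Ok(HttpResponse::Ok().body(format!("{} bytes", body.len()))))
    }
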

diff --git a/r2d2/src/main.rs b/r2d2/src/main.rs
index da83a8a..9b1060d 100644
--- a/r2d2/src/main.rs
+++ b/r2d2/src/main.rs
@@ -1,10 +1,7 @@
 //! Actix web r2d2 example
 use std::io;
 
-use actix_web::{
-    blocking, extract::Path, middleware, web, App, Error, HttpResponse, HttpServer,
-    State,
-};
+use actix_web::{middleware, web, App, Error, HttpResponse, HttpServer};
 use futures::Future;
 use r2d2::Pool;
 use r2d2_sqlite::SqliteConnectionManager;
@@ -12,11 +9,11 @@ use uuid;
 
 /// Async request handler. Ddb pool is stored in application state.
 fn index(
-    path: Path<String>,
-    db: State<Pool<SqliteConnectionManager>>,
+    path: web::Path<String>,
+    db: web::State<Pool<SqliteConnectionManager>>,
 ) -> impl Future<Item = HttpResponse, Error = Error> {
     // execute sync code in threadpool
-    blocking::run(move || {
+    web::block(move || {
         let conn = db.get().unwrap();
 
         let uuid = format!("{}", uuid::Uuid::new_v4());
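The r2d2 handler above simply lists web::Path and web::State as parameters and lets the framework resolve each one per request. A small sketch of that extractor combination with a stand-in state type instead of the connection pool:

    use actix_web::{web, App, HttpResponse, HttpServer};

    // Stand-in application state; the r2d2 example stores Pool<SqliteConnectionManager>.
    struct Greeting {
        prefix: &'static str,
    }

    // Each typed argument is an extractor resolved from the incoming request.
    fn greet(name: web::Path<String>, state: web::State<Greeting>) -> HttpResponse {
        HttpResponse::Ok().body(format!("{} {}!", state.prefix, *name))
    }

    fn main() -> std::io::Result<()> {
        HttpServer::new(|| {
            App::new()
                .state(Greeting { prefix: "Hello" })
                .service(web::resource("/{name}").route(web::get().to(greet)))
        })
        .bind("127.0.0.1:8080")?
        .run()
    }
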

diff --git a/state/Cargo.toml b/state/Cargo.toml
index edf0e38..7d8e0a0 100644
--- a/state/Cargo.toml
+++ b/state/Cargo.toml
@@ -2,11 +2,11 @@
 name = "state"
 version = "0.1.0"
 authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
-workspace = "../"
+workspace = ".."
+edition = "2018"
 
 [dependencies]
-futures = "0.1"
-env_logger = "0.5"
+actix-web = { git="https://github.com/actix/actix-web.git", branch = "1.0" }
 
-actix = "0.7"
-actix-web = "0.7"
+futures = "0.1"
+env_logger = "0.6"
diff --git a/state/src/main.rs b/state/src/main.rs
index be1bd46..f67e929 100644
--- a/state/src/main.rs
+++ b/state/src/main.rs
@@ -1,7 +1,7 @@
 #![cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))]
-//! There are two level of statefulness in actix-web. Application has state
-//! that is shared across all handlers within same Application.
-//! And individual handler can have state.
+//! Application may have multiple states that are shared across
+//! all handlers within same Application. State could be added
+//! with `App::state()` method, multiple different states could be added.
 //!
 //! > **Note**: http server accepts an application factory rather than an
 //! application > instance. Http server constructs an application instance for
@@ -11,45 +11,34 @@
 //!
 //! Check [user guide](https://actix.rs/book/actix-web/sec-2-application.html) for more info.
 
-extern crate actix;
-extern crate actix_web;
-extern crate env_logger;
+use std::io;
+use std::sync::{Arc, Mutex};
 
-use std::sync::Arc;
-use std::sync::Mutex;
-
-use actix_web::{middleware, server, App, HttpRequest, HttpResponse};
-
-/// Application state
-struct AppState {
-    counter: Arc<Mutex<usize>>,
-}
+use actix_web::{middleware, web, App, HttpRequest, HttpResponse, HttpServer};
 
 /// simple handle
-fn index(req: &HttpRequest<AppState>) -> HttpResponse {
+fn index(state: web::State<Arc<Mutex<usize>>>, req: HttpRequest) -> HttpResponse {
     println!("{:?}", req);
-    *(req.state().counter.lock().unwrap()) += 1;
+    *(state.lock().unwrap()) += 1;
 
-    HttpResponse::Ok().body(format!("Num of requests: {}", req.state().counter.lock().unwrap()))
+    HttpResponse::Ok().body(format!("Num of requests: {}", state.lock().unwrap()))
 }
 
-fn main() {
-    ::std::env::set_var("RUST_LOG", "actix_web=info");
+fn main() -> io::Result<()> {
+    std::env::set_var("RUST_LOG", "actix_web=info");
     env_logger::init();
-    let sys = actix::System::new("ws-example");
 
     let counter = Arc::new(Mutex::new(0));
+
     //move is necessary to give closure below ownership of counter
-    server::new(move || {
-        App::with_state(AppState{counter: counter.clone()}) // <- create app with shared state
+    HttpServer::new(move || {
+        App::new()
+            .state(counter.clone()) // <- create app with shared state
             // enable logger
             .middleware(middleware::Logger::default())
             // register simple handler, handle all methods
-            .resource("/", |r| r.f(index))
-    }).bind("127.0.0.1:8080")
-        .unwrap()
-        .start();
-
-    println!("Started http server: 127.0.0.1:8080");
-    let _ = sys.run();
+            .service(web::resource("/").to(index))
+    })
+    .bind("127.0.0.1:8080")?
+    .run()
 }
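Read end to end, the state example after this patch reduces to roughly the following consolidated sketch (logger env setup omitted); it restates the diff rather than adding anything new:

    use std::io;
    use std::sync::{Arc, Mutex};

    use actix_web::{middleware, web, App, HttpRequest, HttpResponse, HttpServer};

    // Every worker's App clones the Arc, so all of them share one Mutex-guarded counter.
    fn index(state: web::State<Arc<Mutex<usize>>>, req: HttpRequest) -> HttpResponse {
        println!("{:?}", req);
        *(state.lock().unwrap()) += 1;

        HttpResponse::Ok().body(format!("Num of requests: {}", state.lock().unwrap()))
    }

    fn main() -> io::Result<()> {
        let counter = Arc::new(Mutex::new(0));

        HttpServer::new(move || {
            App::new()
                .state(counter.clone())
                .middleware(middleware::Logger::default())
                .service(web::resource("/").to(index))
        })
        .bind("127.0.0.1:8080")?
        .run()
    }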