diff --git a/diesel/Cargo.toml b/diesel/Cargo.toml index 69e3db66..a2f309b8 100644 --- a/diesel/Cargo.toml +++ b/diesel/Cargo.toml @@ -5,6 +5,7 @@ authors = ["Nikolay Kim "] workspace = "../" [dependencies] +bytes = "0.4" env_logger = "0.5" actix = "0.7" diff --git a/diesel/src/main.rs b/diesel/src/main.rs index a94f65ab..d30519d6 100644 --- a/diesel/src/main.rs +++ b/diesel/src/main.rs @@ -16,16 +16,20 @@ extern crate env_logger; extern crate futures; extern crate r2d2; extern crate uuid; +extern crate bytes; +// extern crate json; + +use bytes::BytesMut; use actix::prelude::*; use actix_web::{ - http, middleware, server, App, AsyncResponder, FutureResponse, HttpResponse, Path, - State, + http, middleware, server, App, AsyncResponder, FutureResponse, HttpResponse, Path, Error, HttpRequest, + State, HttpMessage, error, Json }; use diesel::prelude::*; use diesel::r2d2::ConnectionManager; -use futures::Future; +use futures::{future, Future, Stream}; mod db; mod models; @@ -39,7 +43,7 @@ struct AppState { } /// Async request handler -fn index( +fn add( (name, state): (Path, State), ) -> FutureResponse { // send async `CreateUser` message to a `DbExecutor` @@ -56,6 +60,73 @@ fn index( .responder() } +#[derive(Debug, Serialize, Deserialize)] +struct MyUser { + name: String +} + +const MAX_SIZE: usize = 262_144; // max payload size is 256k + +/// This handler manually load request payload and parse json object +fn index_add((req, state): (HttpRequest, State)) -> impl Future { + // HttpRequest::payload() is stream of Bytes objects + req.payload() + // `Future::from_err` acts like `?` in that it coerces the error type from + // the future into the final error type + .from_err() + + // `fold` will asynchronously read each chunk of the request body and + // call supplied closure, then it resolves to result of closure + .fold(BytesMut::new(), move |mut body, chunk| { + // limit max size of in-memory payload + if (body.len() + chunk.len()) > MAX_SIZE { + Err(error::ErrorBadRequest("overflow")) + } else { + body.extend_from_slice(&chunk); + Ok(body) + } + }) + // `Future::and_then` can be used to merge an asynchronous workflow with a + // synchronous workflow + // + // Douman NOTE: + // The return value in this closure helps, to clarify result for compiler + // as otheriwse it cannot understand it + .and_then(move |body| -> Box> { + // body is loaded, now we can deserialize serde-json + let r_obj = serde_json::from_slice::(&body); + + // Send to the db for create + match r_obj { + Ok(obj) => { + let res = state.db.send(CreateUser { name: obj.name, }) + .from_err() + .and_then(|res| match res { + Ok(user) => Ok(HttpResponse::Ok().json(user)), + Err(_) => Ok(HttpResponse::InternalServerError().into()), + }); + + Box::new(res) + } + Err(_) => Box::new(future::err(error::ErrorBadRequest("Json Decode Failed"))) + } + }) +} + +fn add2((item, state): (Json, State)) -> impl Future { + state.db + .send(CreateUser { + // into_inner to move into the reference, then accessing name to + // move the name out. + name: item.into_inner().name, + }) + .from_err() + .and_then(|res| match res { + Ok(user) => Ok(HttpResponse::Ok().json(user)), + Err(_) => Ok(HttpResponse::InternalServerError().into()), + }) +} + fn main() { ::std::env::set_var("RUST_LOG", "actix_web=info"); env_logger::init(); @@ -74,7 +145,21 @@ fn main() { App::with_state(AppState{db: addr.clone()}) // enable logger .middleware(middleware::Logger::default()) - .resource("/{name}", |r| r.method(http::Method::GET).with(index)) + // This can be called with: + // curl -S --header "Content-Type: application/json" --request POST --data '{"name":"xyz"}' http://127.0.0.1:8080/add + // Use of the extractors makes some post conditions simpler such + // as size limit protections and built in json validation. + .resource("/add2", |r| { + r.method(http::Method::POST) + .with_async_config(add2, |(json_cfg, )| { + json_cfg.0.limit(4096); // <- limit size of the payload + }) + }) + // Manual parsing would allow custom error construction, use of + // other parsers *beside* json (for example CBOR, protobuf, xml), and allows + // an application to standardise on a single parser implementation. + .resource("/add", |r| r.method(http::Method::POST).with_async(index_add)) + .resource("/add/{name}", |r| r.method(http::Method::GET).with(add)) }).bind("127.0.0.1:8080") .unwrap() .start(); diff --git a/diesel/test.db b/diesel/test.db deleted file mode 100644 index 65e590a6..00000000 Binary files a/diesel/test.db and /dev/null differ