From 8371d77f7c06fa59efe74e5ba3f250e02d8ad214 Mon Sep 17 00:00:00 2001 From: mohanson Date: Mon, 15 Apr 2019 10:05:22 +0800 Subject: [PATCH 1/3] Add jsonrpc example --- Cargo.toml | 1 + jsonrpc/Cargo.toml | 17 +++++ jsonrpc/README.md | 34 +++++++++ jsonrpc/src/convention.rs | 137 +++++++++++++++++++++++++++++++++ jsonrpc/src/main.rs | 143 +++++++++++++++++++++++++++++++++++ jsonrpc/tests/test_client.py | 38 ++++++++++ 6 files changed, 370 insertions(+) create mode 100644 jsonrpc/Cargo.toml create mode 100644 jsonrpc/README.md create mode 100644 jsonrpc/src/convention.rs create mode 100644 jsonrpc/src/main.rs create mode 100644 jsonrpc/tests/test_client.py diff --git a/Cargo.toml b/Cargo.toml index d6285920..14ad4224 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,7 @@ members = [ "http-proxy", "http-full-proxy", "json", + "jsonrpc", "juniper", "middleware", "multipart", diff --git a/jsonrpc/Cargo.toml b/jsonrpc/Cargo.toml new file mode 100644 index 00000000..65410619 --- /dev/null +++ b/jsonrpc/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "jsonrpc-example" +version = "0.1.0" +authors = ["mohanson "] +edition = "2018" +workspace = ".." + +[dependencies] +actix = "0.8.0-alpha.2" +actix-web = "1.0.0-alpha.4" +env_logger = "0.6" +futures = "0.1.23" +futures-timer = "0.1" +log = "0.4" +serde = "1.0" +serde_derive = "1.0" +serde_json = "1.0" diff --git a/jsonrpc/README.md b/jsonrpc/README.md new file mode 100644 index 00000000..35966663 --- /dev/null +++ b/jsonrpc/README.md @@ -0,0 +1,34 @@ +A simple demo for building a `JSONRPC over HTTP` server in [actix-web](https://github.com/actix/actix-web). + +# Server + +```sh +$ cargo run +# Starting server on 127.0.0.1:8080 +``` + +# Client + +**curl** + +```sh +$ curl -X POST -H "Content-Type: application/json" -d '{"jsonrpc": "2.0", "method": "ping", "params": [], "id": 1}' http://127.0.0.1:8080 +# {"jsonrpc":"2.0","result":"pong","error":null,"id":1} +``` + + +**python** + +```sh +$ python tests\test_client.py +# {'jsonrpc': '2.0', 'result': 'pong', 'error': None, 'id': 1} +``` + +# Methods + +- `ping`: Pong immeditely +- `pong`: Wait `n` seconds, and then pong +- `get`: Get global count +- `inc`: Increment global count + +See `tests\test_client.py` to get more information. diff --git a/jsonrpc/src/convention.rs b/jsonrpc/src/convention.rs new file mode 100644 index 00000000..300b826d --- /dev/null +++ b/jsonrpc/src/convention.rs @@ -0,0 +1,137 @@ +//! JSON-RPC 2.0 Specification +//! See: https://www.jsonrpc.org/specification +use std::error; +use std::fmt; + +use serde_derive::{Deserialize, Serialize}; +use serde_json::Value; + +pub static JSONRPC_VERSION: &str = "2.0"; + +/// When a rpc call encounters an error, the Response Object MUST contain the +/// error member with a value that is a Object with the following members: +#[derive(Debug, Serialize, Deserialize)] +pub struct ErrorData { + /// A Number that indicates the error type that occurred. This MUST be an integer. + pub code: i32, + + /// A String providing a short description of the error. The message SHOULD be + /// limited to a concise single sentence. + pub message: String, + + /// A Primitive or Structured value that contains additional information + /// about the error. This may be omitted. The value of this member is + /// defined by the Server (e.g. detailed error information, nested errors + /// etc.). + pub data: Value, +} + +impl ErrorData { + pub fn new(code: i32, message: &str) -> Self { + Self { + code, + message: String::from(message), + data: Value::Null, + } + } + + pub fn std(code: i32) -> Self { + match code { + // Invalid JSON was received by the server. An error occurred on the server while parsing the JSON text. + -32700 => ErrorData::new(-32700, "Parse error"), + // The JSON sent is not a valid Request object. + -32600 => ErrorData::new(-32600, "Invalid Request"), + // The method does not exist / is not available. + -32601 => ErrorData::new(-32601, "Method not found"), + // Invalid method parameter(s). + -32602 => ErrorData::new(-32602, "Invalid params"), + // Internal JSON-RPC error. + -32603 => ErrorData::new(-32603, "Internal error"), + // The error codes from and including -32768 to -32000 are reserved for pre-defined errors. Any code within + // this range, but not defined explicitly below is reserved for future use. + _ => panic!("Undefined pre-defined error codes"), + } + } + + /// Prints out the value as JSON string. + pub fn dump(&self) -> String { + serde_json::to_string(self).expect("Should never failed") + } +} + +impl error::Error for ErrorData {} +impl fmt::Display for ErrorData { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "({}, {}, {})", self.code, self.message, self.data) + } +} + +/// A rpc call is represented by sending a Request object to a Server. +#[derive(Debug, Serialize, Deserialize)] +pub struct Request { + /// A String specifying the version of the JSON-RPC protocol. MUST be exactly "2.0". + pub jsonrpc: String, + + /// A String containing the name of the method to be invoked. Method names that begin with the word rpc followed by + /// a period character (U+002E or ASCII 46) are reserved for rpc-internal methods and extensions and MUST NOT be + /// used for anything else. + pub method: String, + + /// A Structured value that holds the parameter values to be used during the invocation of the method. This member + /// MAY be omitted. + pub params: Vec, + + /// An identifier established by the Client that MUST contain a String, Number, or NULL value if included. If it is + /// not included it is assumed to be a notification. The value SHOULD normally not be Null [1] and Numbers SHOULD + /// NOT contain fractional parts. + pub id: Value, +} + +impl Request { + /// Prints out the value as JSON string. + pub fn dump(&self) -> String { + serde_json::to_string(self).expect("Should never failed") + } +} + +/// When a rpc call is made, the Server MUST reply with a Response, except for in the case of Notifications. The +/// Response is expressed as a single JSON Object, with the following members: +#[derive(Debug, Serialize, Deserialize)] +pub struct Response { + /// A String specifying the version of the JSON-RPC protocol. MUST be exactly "2.0". + pub jsonrpc: String, + + /// This member is REQUIRED on success. + /// This member MUST NOT exist if there was an error invoking the method. + /// The value of this member is determined by the method invoked on the Server. + pub result: Value, + + // This member is REQUIRED on error. + // This member MUST NOT exist if there was no error triggered during invocation. + // The value for this member MUST be an Object as defined in section 5.1. + pub error: Option, + + /// This member is REQUIRED. + /// It MUST be the same as the value of the id member in the Request Object. + /// If there was an error in detecting the id in the Request object (e.g. Parse error/Invalid Request), + /// it MUST be Null. + pub id: Value, +} + +impl Response { + /// Prints out the value as JSON string. + pub fn dump(&self) -> String { + serde_json::to_string(self).expect("Should never failed") + } +} + +impl Default for Response { + fn default() -> Self { + Self { + jsonrpc: JSONRPC_VERSION.into(), + result: Value::Null, + error: None, + id: Value::Null, + } + } +} diff --git a/jsonrpc/src/main.rs b/jsonrpc/src/main.rs new file mode 100644 index 00000000..08237762 --- /dev/null +++ b/jsonrpc/src/main.rs @@ -0,0 +1,143 @@ +use std::error; +use std::sync::Arc; +use std::sync::RwLock; +use std::time::Duration; + +use actix_web::{middleware, web, App, Error, HttpRequest, HttpResponse, HttpServer}; +use futures::{future, Future, Stream}; +use futures_timer::Delay; +use serde_json; +use serde_json::Value; + +#[allow(dead_code)] +mod convention; + +/// The main handler for JSONRPC server. +fn rpc_handler( + req: HttpRequest, + payload: web::Payload, +) -> impl Future { + payload.concat2().from_err().and_then(move |body| { + let reqjson: convention::Request = match serde_json::from_slice(body.as_ref()) { + Ok(ok) => ok, + Err(_) => { + let r = convention::Response { + jsonrpc: String::from(convention::JSONRPC_VERSION), + result: Value::Null, + error: Some(convention::ErrorData::std(-32700)), + id: Value::Null, + }; + return Ok(HttpResponse::Ok() + .content_type("application/json") + .body(r.dump())); + } + }; + let app_state = req.app_data().unwrap(); + let mut result = convention::Response::default(); + result.id = reqjson.id.clone(); + + match rpc_select(&app_state, reqjson.method.as_str()) { + Ok(ok) => result.result = ok, + Err(e) => result.error = Some(e), + } + + Ok(HttpResponse::Ok() + .content_type("application/json") + .body(result.dump())) + }) +} + +fn rpc_select(app_state: &AppState, method: &str) -> Result { + match method { + "ping" => { + let r = app_state.network.read().unwrap().ping(); + Ok(Value::from(r)) + } + "wait" => match app_state.network.read().unwrap().wait(4).wait() { + Ok(ok) => Ok(Value::from(ok)), + Err(e) => Err(convention::ErrorData::new(500, &format!("{:?}", e)[..])), + }, + "get" => { + let r = app_state.network.read().unwrap().get(); + Ok(Value::from(r)) + } + "inc" => { + app_state.network.write().unwrap().inc(); + Ok(Value::Null) + } + _ => Err(convention::ErrorData::std(-32601)), + } +} + +pub trait ImplNetwork { + fn ping(&self) -> String; + fn wait(&self, d: u64) -> Box>>; + + fn get(&self) -> u32; + fn inc(&mut self); +} + +pub struct ObjNetwork { + c: u32, +} + +impl ObjNetwork { + fn new() -> Self { + Self { c: 0 } + } +} + +impl ImplNetwork for ObjNetwork { + fn ping(&self) -> String { + String::from("pong") + } + + fn wait(&self, d: u64) -> Box>> { + if let Err(e) = Delay::new(Duration::from_secs(d)).wait() { + let e: Box = Box::new(e); + return Box::new(future::err(e)); + }; + Box::new(future::ok(String::from("pong"))) + } + + fn get(&self) -> u32 { + self.c + } + + fn inc(&mut self) { + self.c += 1; + } +} + +#[derive(Clone)] +pub struct AppState { + network: Arc>, +} + +impl AppState { + pub fn new(network: Arc>) -> Self { + Self { network } + } +} + +fn main() { + std::env::set_var("RUST_LOG", "info"); + env_logger::init(); + + let network = Arc::new(RwLock::new(ObjNetwork::new())); + + let sys = actix::System::new("actix_jrpc"); + HttpServer::new(move || { + let app_state = AppState::new(network.clone()); + App::new() + .data(app_state) + .wrap(middleware::Logger::default()) + .service(web::resource("/").route(web::post().to_async(rpc_handler))) + }) + .bind("127.0.0.1:8080") + .unwrap() + .workers(1) + .start(); + + let _ = sys.run(); +} diff --git a/jsonrpc/tests/test_client.py b/jsonrpc/tests/test_client.py new file mode 100644 index 00000000..1df92bbd --- /dev/null +++ b/jsonrpc/tests/test_client.py @@ -0,0 +1,38 @@ +import requests + +print('ping: pong immediately') +r = requests.post('http://127.0.0.1:8080/', json={ + 'jsonrpc': '2.0', + 'method': 'ping', + 'params': [], + 'id': 1 +}) +print(r.json()) + + +print('ping: pong after 4 secs') +r = requests.post('http://127.0.0.1:8080/', json={ + 'jsonrpc': '2.0', + 'method': 'wait', + 'params': [4], + 'id': 1 +}) +print(r.json()) + +for i in range(10): + print(f'inc {i:>02}') + r = requests.post('http://127.0.0.1:8080/', json={ + 'jsonrpc': '2.0', + 'method': 'inc', + 'params': [], + 'id': 1 + }) + +print(f'get') +r = requests.post('http://127.0.0.1:8080/', json={ + 'jsonrpc': '2.0', + 'method': 'get', + 'params': [], + 'id': 1 +}) +print(r.json()) From 397b8e22fa8999e1a415530af040d2eac9aa6bc9 Mon Sep 17 00:00:00 2001 From: mohanson Date: Mon, 15 Apr 2019 10:27:25 +0800 Subject: [PATCH 2/3] Fix an error in method `wait` --- jsonrpc/src/main.rs | 27 +++++++++++++++++++++------ 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/jsonrpc/src/main.rs b/jsonrpc/src/main.rs index 08237762..18ba5545 100644 --- a/jsonrpc/src/main.rs +++ b/jsonrpc/src/main.rs @@ -36,7 +36,7 @@ fn rpc_handler( let mut result = convention::Response::default(); result.id = reqjson.id.clone(); - match rpc_select(&app_state, reqjson.method.as_str()) { + match rpc_select(&app_state, reqjson.method.as_str(), reqjson.params) { Ok(ok) => result.result = ok, Err(e) => result.error = Some(e), } @@ -47,16 +47,31 @@ fn rpc_handler( }) } -fn rpc_select(app_state: &AppState, method: &str) -> Result { +fn rpc_select( + app_state: &AppState, + method: &str, + params: Vec, +) -> Result { match method { "ping" => { let r = app_state.network.read().unwrap().ping(); Ok(Value::from(r)) } - "wait" => match app_state.network.read().unwrap().wait(4).wait() { - Ok(ok) => Ok(Value::from(ok)), - Err(e) => Err(convention::ErrorData::new(500, &format!("{:?}", e)[..])), - }, + "wait" => { + if params.len() != 1 || !params[0].is_u64() { + return Err(convention::ErrorData::std(-32602)); + } + match app_state + .network + .read() + .unwrap() + .wait(params[0].as_u64().unwrap()) + .wait() + { + Ok(ok) => Ok(Value::from(ok)), + Err(e) => Err(convention::ErrorData::new(500, &format!("{:?}", e)[..])), + } + } "get" => { let r = app_state.network.read().unwrap().get(); Ok(Value::from(r)) From 15833104ef6a638254d8b76b297ec6b8fb4b424a Mon Sep 17 00:00:00 2001 From: Mohanson Date: Mon, 15 Apr 2019 14:16:38 +0800 Subject: [PATCH 3/3] Update README.md --- jsonrpc/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jsonrpc/README.md b/jsonrpc/README.md index 35966663..b9a4ceac 100644 --- a/jsonrpc/README.md +++ b/jsonrpc/README.md @@ -27,7 +27,7 @@ $ python tests\test_client.py # Methods - `ping`: Pong immeditely -- `pong`: Wait `n` seconds, and then pong +- `wait`: Wait `n` seconds, and then pong - `get`: Get global count - `inc`: Increment global count