diff --git a/.gitignore b/.gitignore index 50281a4..fcaefd7 100644 --- a/.gitignore +++ b/.gitignore @@ -1,10 +1,10 @@ # Generated by Cargo # will have compiled files and executables -/target/ +/*/target/ # Remove Cargo.lock from gitignore if creating an executable, leave it for libraries # More information here http://doc.crates.io/guide.html#cargotoml-vs-cargolock -Cargo.lock +/*/Cargo.lock # These are backup files generated by rustfmt **/*.rs.bk diff --git a/basics/Cargo.toml b/basics/Cargo.toml new file mode 100644 index 0000000..3f53e5b --- /dev/null +++ b/basics/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "basics" +version = "0.1.0" +authors = ["Nikolay Kim "] + +[dependencies] +futures = "0.1" +env_logger = "0.5" +actix = "0.5" +actix-web = "^0.5" diff --git a/basics/README.md b/basics/README.md new file mode 100644 index 0000000..82e35e0 --- /dev/null +++ b/basics/README.md @@ -0,0 +1,20 @@ +# basics + +## Usage + +### server + +```bash +cd actix-web/examples/basics +cargo run +# Started http server: 127.0.0.1:8080 +``` + +### web client + +- [http://localhost:8080/index.html](http://localhost:8080/index.html) +- [http://localhost:8080/async/bob](http://localhost:8080/async/bob) +- [http://localhost:8080/user/bob/](http://localhost:8080/user/bob/) plain/text download +- [http://localhost:8080/test](http://localhost:8080/test) (return status switch GET or POST or other) +- [http://localhost:8080/static/index.html](http://localhost:8080/static/index.html) +- [http://localhost:8080/static/notexit](http://localhost:8080/static/notexit) display 404 page diff --git a/basics/src/main.rs b/basics/src/main.rs new file mode 100644 index 0000000..633f482 --- /dev/null +++ b/basics/src/main.rs @@ -0,0 +1,136 @@ +#![allow(unused_variables)] +#![cfg_attr(feature="cargo-clippy", allow(needless_pass_by_value))] + +extern crate actix; +extern crate actix_web; +extern crate env_logger; +extern crate futures; +use futures::Stream; + +use std::{io, env}; +use actix_web::{error, fs, pred, server, + App, HttpRequest, HttpResponse, Result, Error}; +use actix_web::http::{header, Method, StatusCode}; +use actix_web::middleware::{self, RequestSession}; +use futures::future::{FutureResult, result}; + +/// favicon handler +fn favicon(req: HttpRequest) -> Result { + Ok(fs::NamedFile::open("../static/favicon.ico")?) +} + +/// simple index handler +fn index(mut req: HttpRequest) -> Result { + println!("{:?}", req); + + // example of ... + if let Ok(ch) = req.poll() { + if let futures::Async::Ready(Some(d)) = ch { + println!("{}", String::from_utf8_lossy(d.as_ref())); + } + } + + // session + let mut counter = 1; + if let Some(count) = req.session().get::("counter")? { + println!("SESSION value: {}", count); + counter = count + 1; + req.session().set("counter", counter)?; + } else { + req.session().set("counter", counter)?; + } + + + // response + Ok(HttpResponse::build(StatusCode::OK) + .content_type("text/html; charset=utf-8") + .body(include_str!("../static/welcome.html"))) + +} + +/// 404 handler +fn p404(req: HttpRequest) -> Result { + Ok(fs::NamedFile::open("./static/404.html")? + .set_status_code(StatusCode::NOT_FOUND)) +} + + +/// async handler +fn index_async(req: HttpRequest) -> FutureResult +{ + println!("{:?}", req); + + result(Ok(HttpResponse::Ok() + .content_type("text/html") + .body(format!("Hello {}!", req.match_info().get("name").unwrap())))) +} + +/// handler with path parameters like `/user/{name}/` +fn with_param(req: HttpRequest) -> HttpResponse +{ + println!("{:?}", req); + + HttpResponse::Ok() + .content_type("test/plain") + .body(format!("Hello {}!", req.match_info().get("name").unwrap())) +} + +fn main() { + env::set_var("RUST_LOG", "actix_web=debug"); + env::set_var("RUST_BACKTRACE", "1"); + env_logger::init(); + let sys = actix::System::new("basic-example"); + + let addr = server::new( + || App::new() + // enable logger + .middleware(middleware::Logger::default()) + // cookie session middleware + .middleware(middleware::SessionStorage::new( + middleware::CookieSessionBackend::signed(&[0; 32]).secure(false) + )) + // register favicon + .resource("/favicon.ico", |r| r.f(favicon)) + // register simple route, handle all methods + .resource("/index.html", |r| r.f(index)) + // with path parameters + .resource("/user/{name}/", |r| r.method(Method::GET).f(with_param)) + // async handler + .resource("/async/{name}", |r| r.method(Method::GET).a(index_async)) + .resource("/test", |r| r.f(|req| { + match *req.method() { + Method::GET => HttpResponse::Ok(), + Method::POST => HttpResponse::MethodNotAllowed(), + _ => HttpResponse::NotFound(), + } + })) + .resource("/error.html", |r| r.f(|req| { + error::InternalError::new( + io::Error::new(io::ErrorKind::Other, "test"), StatusCode::OK) + })) + // static files + .handler("/static/", fs::StaticFiles::new("../static/")) + // redirect + .resource("/", |r| r.method(Method::GET).f(|req| { + println!("{:?}", req); + HttpResponse::Found() + .header(header::LOCATION, "/index.html") + .finish() + })) + // default + .default_resource(|r| { + // 404 for GET request + r.method(Method::GET).f(p404); + + // all requests that are not `GET` + r.route().filter(pred::Not(pred::Get())).f( + |req| HttpResponse::MethodNotAllowed()); + })) + + .bind("127.0.0.1:8080").expect("Can not bind to 127.0.0.1:8080") + .shutdown_timeout(0) // <- Set shutdown timeout to 0 seconds (default 60s) + .start(); + + println!("Starting http server: 127.0.0.1:8080"); + let _ = sys.run(); +} diff --git a/basics/static/404.html b/basics/static/404.html new file mode 100644 index 0000000..eda58c3 --- /dev/null +++ b/basics/static/404.html @@ -0,0 +1,7 @@ +actix - basics + + + back to home +

404

+ + diff --git a/basics/static/welcome.html b/basics/static/welcome.html new file mode 100644 index 0000000..b85527f --- /dev/null +++ b/basics/static/welcome.html @@ -0,0 +1,6 @@ +actix - basics + + +

Welcome

+ + diff --git a/diesel/.env b/diesel/.env new file mode 100644 index 0000000..1fbc5af --- /dev/null +++ b/diesel/.env @@ -0,0 +1 @@ +DATABASE_URL=file:test.db diff --git a/diesel/Cargo.toml b/diesel/Cargo.toml new file mode 100644 index 0000000..bda8e6e --- /dev/null +++ b/diesel/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "diesel-example" +version = "0.1.0" +authors = ["Nikolay Kim "] + +[dependencies] +env_logger = "0.5" +actix = "0.5" +actix-web = "^0.5" + +futures = "0.1" +uuid = { version = "0.5", features = ["serde", "v4"] } +serde = "1.0" +serde_json = "1.0" +serde_derive = "1.0" + +diesel = { version = "^1.1.0", features = ["sqlite", "r2d2"] } +r2d2 = "0.8" +dotenv = "0.10" diff --git a/diesel/README.md b/diesel/README.md new file mode 100644 index 0000000..922ba1e --- /dev/null +++ b/diesel/README.md @@ -0,0 +1,43 @@ +# diesel + +Diesel's `Getting Started` guide using SQLite for Actix web + +## Usage + +### init database sqlite + +```bash +cargo install diesel_cli --no-default-features --features sqlite +cd actix-web/examples/diesel +echo "DATABASE_URL=file:test.db" > .env +diesel migration run +``` + +### server + +```bash +# if ubuntu : sudo apt-get install libsqlite3-dev +# if fedora : sudo dnf install libsqlite3x-devel +cd actix-web/examples/diesel +cargo run (or ``cargo watch -x run``) +# Started http server: 127.0.0.1:8080 +``` + +### web client + +[http://127.0.0.1:8080/NAME](http://127.0.0.1:8080/NAME) + +### sqlite client + +```bash +# if ubuntu : sudo apt-get install sqlite3 +# if fedora : sudo dnf install sqlite3x +sqlite3 test.db +sqlite> .tables +sqlite> select * from users; +``` + + +## Postgresql + +You will also find another complete example of diesel+postgresql on [https://github.com/TechEmpower/FrameworkBenchmarks/tree/master/frameworks/Rust/actix](https://github.com/TechEmpower/FrameworkBenchmarks/tree/master/frameworks/Rust/actix) \ No newline at end of file diff --git a/diesel/migrations/20170124012402_create_users/down.sql b/diesel/migrations/20170124012402_create_users/down.sql new file mode 100644 index 0000000..9951735 --- /dev/null +++ b/diesel/migrations/20170124012402_create_users/down.sql @@ -0,0 +1 @@ +DROP TABLE users diff --git a/diesel/migrations/20170124012402_create_users/up.sql b/diesel/migrations/20170124012402_create_users/up.sql new file mode 100644 index 0000000..d88d44f --- /dev/null +++ b/diesel/migrations/20170124012402_create_users/up.sql @@ -0,0 +1,4 @@ +CREATE TABLE users ( + id VARCHAR NOT NULL PRIMARY KEY, + name VARCHAR NOT NULL +) diff --git a/diesel/src/db.rs b/diesel/src/db.rs new file mode 100644 index 0000000..78806c2 --- /dev/null +++ b/diesel/src/db.rs @@ -0,0 +1,55 @@ +//! Db executor actor +use uuid; +use diesel; +use actix_web::*; +use actix::prelude::*; +use diesel::prelude::*; +use diesel::r2d2::{Pool, ConnectionManager}; + +use models; +use schema; + +/// This is db executor actor. We are going to run 3 of them in parallel. +pub struct DbExecutor(pub Pool>); + +/// This is only message that this actor can handle, but it is easy to extend number of +/// messages. +pub struct CreateUser { + pub name: String, +} + +impl Message for CreateUser { + type Result = Result; +} + +impl Actor for DbExecutor { + type Context = SyncContext; +} + +impl Handler for DbExecutor { + type Result = Result; + + fn handle(&mut self, msg: CreateUser, _: &mut Self::Context) -> Self::Result { + use self::schema::users::dsl::*; + + let uuid = format!("{}", uuid::Uuid::new_v4()); + let new_user = models::NewUser { + id: &uuid, + name: &msg.name, + }; + + let conn: &SqliteConnection = &self.0.get().unwrap(); + + diesel::insert_into(users) + .values(&new_user) + .execute(conn) + .expect("Error inserting person"); + + let mut items = users + .filter(id.eq(&uuid)) + .load::(conn) + .expect("Error loading person"); + + Ok(items.pop().unwrap()) + } +} diff --git a/diesel/src/main.rs b/diesel/src/main.rs new file mode 100644 index 0000000..2fd7087 --- /dev/null +++ b/diesel/src/main.rs @@ -0,0 +1,78 @@ +//! Actix web diesel example +//! +//! Diesel does not support tokio, so we have to run it in separate threads. +//! Actix supports sync actors by default, so we going to create sync actor that use diesel. +//! Technically sync actors are worker style actors, multiple of them +//! can run in parallel and process messages from same queue. +extern crate serde; +extern crate serde_json; +#[macro_use] +extern crate serde_derive; +#[macro_use] +extern crate diesel; +extern crate r2d2; +extern crate uuid; +extern crate futures; +extern crate actix; +extern crate actix_web; +extern crate env_logger; + +use actix::prelude::*; +use actix_web::{http, server, middleware, + App, Path, State, HttpResponse, AsyncResponder, FutureResponse}; + +use diesel::prelude::*; +use diesel::r2d2::{ Pool, ConnectionManager }; +use futures::future::Future; + +mod db; +mod models; +mod schema; + +use db::{CreateUser, DbExecutor}; + + +/// State with DbExecutor address +struct AppState { + db: Addr, +} + +/// Async request handler +fn index(name: Path, state: State) -> FutureResponse { + // send async `CreateUser` message to a `DbExecutor` + state.db.send(CreateUser{name: name.into_inner()}) + .from_err() + .and_then(|res| { + match res { + Ok(user) => Ok(HttpResponse::Ok().json(user)), + Err(_) => Ok(HttpResponse::InternalServerError().into()) + } + }) + .responder() +} + +fn main() { + ::std::env::set_var("RUST_LOG", "actix_web=info"); + env_logger::init(); + let sys = actix::System::new("diesel-example"); + + // Start 3 db executor actors + let manager = ConnectionManager::::new("test.db"); + let pool = r2d2::Pool::builder().build(manager).expect("Failed to create pool."); + + let addr = SyncArbiter::start(3, move || { + DbExecutor(pool.clone()) + }); + + // Start http server + server::new(move || { + App::with_state(AppState{db: addr.clone()}) + // enable logger + .middleware(middleware::Logger::default()) + .resource("/{name}", |r| r.method(http::Method::GET).with2(index))}) + .bind("127.0.0.1:8080").unwrap() + .start(); + + println!("Started http server: 127.0.0.1:8080"); + let _ = sys.run(); +} diff --git a/diesel/src/models.rs b/diesel/src/models.rs new file mode 100644 index 0000000..315d59f --- /dev/null +++ b/diesel/src/models.rs @@ -0,0 +1,14 @@ +use super::schema::users; + +#[derive(Serialize, Queryable)] +pub struct User { + pub id: String, + pub name: String, +} + +#[derive(Insertable)] +#[table_name = "users"] +pub struct NewUser<'a> { + pub id: &'a str, + pub name: &'a str, +} diff --git a/diesel/src/schema.rs b/diesel/src/schema.rs new file mode 100644 index 0000000..51aa40b --- /dev/null +++ b/diesel/src/schema.rs @@ -0,0 +1,6 @@ +table! { + users (id) { + id -> Text, + name -> Text, + } +} diff --git a/diesel/test.db b/diesel/test.db new file mode 100644 index 0000000..65e590a Binary files /dev/null and b/diesel/test.db differ diff --git a/hello-world/Cargo.toml b/hello-world/Cargo.toml new file mode 100644 index 0000000..e195e20 --- /dev/null +++ b/hello-world/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "hello-world" +version = "0.1.0" +authors = ["Nikolay Kim "] + +[dependencies] +env_logger = "0.5" +actix = "0.5" +actix-web = "^0.5" diff --git a/hello-world/src/main.rs b/hello-world/src/main.rs new file mode 100644 index 0000000..2af4789 --- /dev/null +++ b/hello-world/src/main.rs @@ -0,0 +1,28 @@ +extern crate actix; +extern crate actix_web; +extern crate env_logger; + +use actix_web::{App, HttpRequest, server, middleware}; + + +fn index(_req: HttpRequest) -> &'static str { + "Hello world!" +} + +fn main() { + ::std::env::set_var("RUST_LOG", "actix_web=info"); + env_logger::init(); + let sys = actix::System::new("hello-world"); + + server::new( + || App::new() + // enable logger + .middleware(middleware::Logger::default()) + .resource("/index.html", |r| r.f(|_| "Hello world!")) + .resource("/", |r| r.f(index))) + .bind("127.0.0.1:8080").unwrap() + .start(); + + println!("Started http server: 127.0.0.1:8080"); + let _ = sys.run(); +} diff --git a/http-proxy/Cargo.toml b/http-proxy/Cargo.toml new file mode 100644 index 0000000..4a151c6 --- /dev/null +++ b/http-proxy/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "http-proxy" +version = "0.1.0" +authors = ["Nikolay Kim "] + +[dependencies] +env_logger = "0.5" +futures = "0.1" +actix = "0.5" +actix-web = { version = "^0.5", features=["alpn"] } diff --git a/http-proxy/src/main.rs b/http-proxy/src/main.rs new file mode 100644 index 0000000..0a392ed --- /dev/null +++ b/http-proxy/src/main.rs @@ -0,0 +1,58 @@ +extern crate actix; +extern crate actix_web; +extern crate futures; +extern crate env_logger; + +use futures::{Future, Stream}; +use actix_web::{ + client, server, middleware, + App, AsyncResponder, Body, HttpRequest, HttpResponse, HttpMessage, Error}; + + +/// Stream client request response and then send body to a server response +fn index(_req: HttpRequest) -> Box> { + client::ClientRequest::get("https://www.rust-lang.org/en-US/") + .finish().unwrap() + .send() + .map_err(Error::from) // <- convert SendRequestError to an Error + .and_then( + |resp| resp.body() // <- this is MessageBody type, resolves to complete body + .from_err() // <- convert PayloadError to a Error + .and_then(|body| { // <- we got complete body, now send as server response + Ok(HttpResponse::Ok().body(body)) + })) + .responder() +} + +/// streaming client request to a streaming server response +fn streaming(_req: HttpRequest) -> Box> { + // send client request + client::ClientRequest::get("https://www.rust-lang.org/en-US/") + .finish().unwrap() + .send() // <- connect to host and send request + .map_err(Error::from) // <- convert SendRequestError to an Error + .and_then(|resp| { // <- we received client response + Ok(HttpResponse::Ok() + // read one chunk from client response and send this chunk to a server response + // .from_err() converts PayloadError to a Error + .body(Body::Streaming(Box::new(resp.from_err())))) + }) + .responder() +} + +fn main() { + ::std::env::set_var("RUST_LOG", "actix_web=info"); + env_logger::init(); + let sys = actix::System::new("http-proxy"); + + server::new( + || App::new() + .middleware(middleware::Logger::default()) + .resource("/streaming", |r| r.f(streaming)) + .resource("/", |r| r.f(index))) + .bind("127.0.0.1:8080").unwrap() + .start(); + + println!("Started http server: 127.0.0.1:8080"); + let _ = sys.run(); +} diff --git a/json/Cargo.toml b/json/Cargo.toml new file mode 100644 index 0000000..290da28 --- /dev/null +++ b/json/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "json-example" +version = "0.1.0" +authors = ["Nikolay Kim "] + +[dependencies] +bytes = "0.4" +futures = "0.1" +env_logger = "*" + +serde = "1.0" +serde_json = "1.0" +serde_derive = "1.0" +json = "*" + +actix = "0.5" +actix-web = "^0.5" diff --git a/json/README.md b/json/README.md new file mode 100644 index 0000000..167c390 --- /dev/null +++ b/json/README.md @@ -0,0 +1,48 @@ +# json + +Json's `Getting Started` guide using json (serde-json or json-rust) for Actix web + +## Usage + +### server + +```bash +cd actix-web/examples/json +cargo run +# Started http server: 127.0.0.1:8080 +``` + +### web client + +With [Postman](https://www.getpostman.com/) or [Rested](moz-extension://60daeb1c-5b1b-4afd-9842-0579ed34dfcb/dist/index.html) + +- POST / (embed serde-json): + + - method : ``POST`` + - url : ``http://127.0.0.1:8080/`` + - header : ``Content-Type`` = ``application/json`` + - body (raw) : ``{"name": "Test user", "number": 100}`` + +- POST /manual (manual serde-json): + + - method : ``POST`` + - url : ``http://127.0.0.1:8080/manual`` + - header : ``Content-Type`` = ``application/json`` + - body (raw) : ``{"name": "Test user", "number": 100}`` + +- POST /mjsonrust (manual json-rust): + + - method : ``POST`` + - url : ``http://127.0.0.1:8080/mjsonrust`` + - header : ``Content-Type`` = ``application/json`` + - body (raw) : ``{"name": "Test user", "number": 100}`` (you can also test ``{notjson}``) + +### python client + +- ``pip install aiohttp`` +- ``python client.py`` + +if ubuntu : + +- ``pip3 install aiohttp`` +- ``python3 client.py`` diff --git a/json/client.py b/json/client.py new file mode 100644 index 0000000..e89ffe0 --- /dev/null +++ b/json/client.py @@ -0,0 +1,18 @@ +# This script could be used for actix-web multipart example test +# just start server and run client.py + +import json +import asyncio +import aiohttp + +async def req(): + resp = await aiohttp.ClientSession().request( + "post", 'http://localhost:8080/', + data=json.dumps({"name": "Test user", "number": 100}), + headers={"content-type": "application/json"}) + print(str(resp)) + print(await resp.text()) + assert 200 == resp.status + + +asyncio.get_event_loop().run_until_complete(req()) diff --git a/json/src/main.rs b/json/src/main.rs new file mode 100644 index 0000000..f864e00 --- /dev/null +++ b/json/src/main.rs @@ -0,0 +1,110 @@ +extern crate actix; +extern crate actix_web; +extern crate bytes; +extern crate futures; +extern crate env_logger; +extern crate serde_json; +#[macro_use] extern crate serde_derive; +#[macro_use] extern crate json; + +use actix_web::{ + middleware, http, error, server, + App, AsyncResponder, HttpRequest, HttpResponse, HttpMessage, Error, Json}; + +use bytes::BytesMut; +use futures::{Future, Stream}; +use json::JsonValue; + +#[derive(Debug, Serialize, Deserialize)] +struct MyObj { + name: String, + number: i32, +} + +/// This handler uses `HttpRequest::json()` for loading json object. +fn index(req: HttpRequest) -> Box> { + req.json() + .from_err() // convert all errors into `Error` + .and_then(|val: MyObj| { + println!("model: {:?}", val); + Ok(HttpResponse::Ok().json(val)) // <- send response + }) + .responder() +} + +/// This handler uses json extractor +fn extract_item(item: Json) -> HttpResponse { + println!("model: {:?}", &item); + HttpResponse::Ok().json(item.0) // <- send response +} + +const MAX_SIZE: usize = 262_144; // max payload size is 256k + +/// This handler manually load request payload and parse json object +fn index_manual(req: HttpRequest) -> Box> { + // HttpRequest is stream of Bytes objects + req + // `Future::from_err` acts like `?` in that it coerces the error type from + // the future into the final error type + .from_err() + + // `fold` will asynchronously read each chunk of the request body and + // call supplied closure, then it resolves to result of closure + .fold(BytesMut::new(), move |mut body, chunk| { + // limit max size of in-memory payload + if (body.len() + chunk.len()) > MAX_SIZE { + Err(error::ErrorBadRequest("overflow")) + } else { + body.extend_from_slice(&chunk); + Ok(body) + } + }) + // `Future::and_then` can be used to merge an asynchronous workflow with a + // synchronous workflow + .and_then(|body| { + // body is loaded, now we can deserialize serde-json + let obj = serde_json::from_slice::(&body)?; + Ok(HttpResponse::Ok().json(obj)) // <- send response + }) + .responder() +} + +/// This handler manually load request payload and parse json-rust +fn index_mjsonrust(req: HttpRequest) -> Box> { + req.concat2() + .from_err() + .and_then(|body| { + // body is loaded, now we can deserialize json-rust + let result = json::parse(std::str::from_utf8(&body).unwrap()); // return Result + let injson: JsonValue = match result { Ok(v) => v, Err(e) => object!{"err" => e.to_string() } }; + Ok(HttpResponse::Ok() + .content_type("application/json") + .body(injson.dump())) + }) + .responder() +} + +fn main() { + ::std::env::set_var("RUST_LOG", "actix_web=info"); + env_logger::init(); + let sys = actix::System::new("json-example"); + + server::new(|| { + App::new() + // enable logger + .middleware(middleware::Logger::default()) + .resource("/extractor", |r| { + r.method(http::Method::POST) + .with(extract_item) + .limit(4096); // <- limit size of the payload + }) + .resource("/manual", |r| r.method(http::Method::POST).f(index_manual)) + .resource("/mjsonrust", |r| r.method(http::Method::POST).f(index_mjsonrust)) + .resource("/", |r| r.method(http::Method::POST).f(index))}) + .bind("127.0.0.1:8080").unwrap() + .shutdown_timeout(1) + .start(); + + println!("Started http server: 127.0.0.1:8080"); + let _ = sys.run(); +} diff --git a/juniper/Cargo.toml b/juniper/Cargo.toml new file mode 100644 index 0000000..8c0ec36 --- /dev/null +++ b/juniper/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "juniper-example" +version = "0.1.0" +authors = ["pyros2097 "] + +[dependencies] +env_logger = "0.5" +actix = "0.5" +actix-web = "^0.5" + +futures = "0.1" +serde = "1.0" +serde_json = "1.0" +serde_derive = "1.0" + +juniper = "0.9.2" diff --git a/juniper/README.md b/juniper/README.md new file mode 100644 index 0000000..2ac0eac --- /dev/null +++ b/juniper/README.md @@ -0,0 +1,15 @@ +# juniper + +Juniper integration for Actix web + +### server + +```bash +cd actix-web/examples/juniper +cargo run (or ``cargo watch -x run``) +# Started http server: 127.0.0.1:8080 +``` + +### web client + +[http://127.0.0.1:8080/graphiql](http://127.0.0.1:8080/graphiql) diff --git a/juniper/src/main.rs b/juniper/src/main.rs new file mode 100644 index 0000000..a92ce3f --- /dev/null +++ b/juniper/src/main.rs @@ -0,0 +1,108 @@ +//! Actix web juniper example +//! +//! A simple example integrating juniper in actix-web +extern crate serde; +extern crate serde_json; +#[macro_use] +extern crate serde_derive; +#[macro_use] +extern crate juniper; +extern crate futures; +extern crate actix; +extern crate actix_web; +extern crate env_logger; + +use actix::prelude::*; +use actix_web::{ + middleware, http, server, + App, AsyncResponder, HttpRequest, HttpResponse, FutureResponse, Error, State, Json}; +use juniper::http::graphiql::graphiql_source; +use juniper::http::GraphQLRequest; +use futures::future::Future; + +mod schema; + +use schema::Schema; +use schema::create_schema; + +struct AppState { + executor: Addr, +} + +#[derive(Serialize, Deserialize)] +pub struct GraphQLData(GraphQLRequest); + +impl Message for GraphQLData { + type Result = Result; +} + +pub struct GraphQLExecutor { + schema: std::sync::Arc +} + +impl GraphQLExecutor { + fn new(schema: std::sync::Arc) -> GraphQLExecutor { + GraphQLExecutor { + schema: schema, + } + } +} + +impl Actor for GraphQLExecutor { + type Context = SyncContext; +} + +impl Handler for GraphQLExecutor { + type Result = Result; + + fn handle(&mut self, msg: GraphQLData, _: &mut Self::Context) -> Self::Result { + let res = msg.0.execute(&self.schema, &()); + let res_text = serde_json::to_string(&res)?; + Ok(res_text) + } +} + +fn graphiql(_req: HttpRequest) -> Result { + let html = graphiql_source("http://127.0.0.1:8080/graphql"); + Ok(HttpResponse::Ok() + .content_type("text/html; charset=utf-8") + .body(html)) +} + +fn graphql(st: State, data: Json) -> FutureResponse { + st.executor.send(data.0) + .from_err() + .and_then(|res| { + match res { + Ok(user) => Ok(HttpResponse::Ok() + .content_type("application/json") + .body(user)), + Err(_) => Ok(HttpResponse::InternalServerError().into()) + } + }) + .responder() +} + +fn main() { + ::std::env::set_var("RUST_LOG", "actix_web=info"); + env_logger::init(); + let sys = actix::System::new("juniper-example"); + + let schema = std::sync::Arc::new(create_schema()); + let addr = SyncArbiter::start(3, move || { + GraphQLExecutor::new(schema.clone()) + }); + + // Start http server + server::new(move || { + App::with_state(AppState{executor: addr.clone()}) + // enable logger + .middleware(middleware::Logger::default()) + .resource("/graphql", |r| r.method(http::Method::POST).with2(graphql)) + .resource("/graphiql", |r| r.method(http::Method::GET).h(graphiql))}) + .bind("127.0.0.1:8080").unwrap() + .start(); + + println!("Started http server: 127.0.0.1:8080"); + let _ = sys.run(); +} diff --git a/juniper/src/schema.rs b/juniper/src/schema.rs new file mode 100644 index 0000000..2b4cf30 --- /dev/null +++ b/juniper/src/schema.rs @@ -0,0 +1,58 @@ +use juniper::FieldResult; +use juniper::RootNode; + +#[derive(GraphQLEnum)] +enum Episode { + NewHope, + Empire, + Jedi, +} + +#[derive(GraphQLObject)] +#[graphql(description = "A humanoid creature in the Star Wars universe")] +struct Human { + id: String, + name: String, + appears_in: Vec, + home_planet: String, +} + +#[derive(GraphQLInputObject)] +#[graphql(description = "A humanoid creature in the Star Wars universe")] +struct NewHuman { + name: String, + appears_in: Vec, + home_planet: String, +} + +pub struct QueryRoot; + +graphql_object!(QueryRoot: () |&self| { + field human(&executor, id: String) -> FieldResult { + Ok(Human{ + id: "1234".to_owned(), + name: "Luke".to_owned(), + appears_in: vec![Episode::NewHope], + home_planet: "Mars".to_owned(), + }) + } +}); + +pub struct MutationRoot; + +graphql_object!(MutationRoot: () |&self| { + field createHuman(&executor, new_human: NewHuman) -> FieldResult { + Ok(Human{ + id: "1234".to_owned(), + name: new_human.name, + appears_in: new_human.appears_in, + home_planet: new_human.home_planet, + }) + } +}); + +pub type Schema = RootNode<'static, QueryRoot, MutationRoot>; + +pub fn create_schema() -> Schema { + Schema::new(QueryRoot {}, MutationRoot {}) +} diff --git a/multipart/Cargo.toml b/multipart/Cargo.toml new file mode 100644 index 0000000..233fbe4 --- /dev/null +++ b/multipart/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "multipart-example" +version = "0.1.0" +authors = ["Nikolay Kim "] + +[[bin]] +name = "multipart" +path = "src/main.rs" + +[dependencies] +env_logger = "*" +futures = "0.1" +actix = "0.5" +actix-web = "^0.5" diff --git a/multipart/README.md b/multipart/README.md new file mode 100644 index 0000000..348d286 --- /dev/null +++ b/multipart/README.md @@ -0,0 +1,24 @@ +# multipart + +Multipart's `Getting Started` guide for Actix web + +## Usage + +### server + +```bash +cd actix-web/examples/multipart +cargo run (or ``cargo watch -x run``) +# Started http server: 127.0.0.1:8080 +``` + +### client + +- ``pip install aiohttp`` +- ``python client.py`` +- you must see in server console multipart fields + +if ubuntu : + +- ``pip3 install aiohttp`` +- ``python3 client.py`` diff --git a/multipart/client.py b/multipart/client.py new file mode 100644 index 0000000..afc07f1 --- /dev/null +++ b/multipart/client.py @@ -0,0 +1,34 @@ +# This script could be used for actix-web multipart example test +# just start server and run client.py + +import asyncio +import aiohttp + +async def req1(): + with aiohttp.MultipartWriter() as writer: + writer.append('test') + writer.append_json({'passed': True}) + + resp = await aiohttp.ClientSession().request( + "post", 'http://localhost:8080/multipart', + data=writer, headers=writer.headers) + print(resp) + assert 200 == resp.status + + +async def req2(): + with aiohttp.MultipartWriter() as writer: + writer.append('test') + writer.append_json({'passed': True}) + writer.append(open('src/main.rs')) + + resp = await aiohttp.ClientSession().request( + "post", 'http://localhost:8080/multipart', + data=writer, headers=writer.headers) + print(resp) + assert 200 == resp.status + + +loop = asyncio.get_event_loop() +loop.run_until_complete(req1()) +loop.run_until_complete(req2()) diff --git a/multipart/src/main.rs b/multipart/src/main.rs new file mode 100644 index 0000000..75f2896 --- /dev/null +++ b/multipart/src/main.rs @@ -0,0 +1,61 @@ +#![allow(unused_variables)] +extern crate actix; +extern crate actix_web; +extern crate env_logger; +extern crate futures; + +use actix::*; +use actix_web::{ + http, middleware, multipart, server, + App, AsyncResponder, HttpRequest, HttpResponse, HttpMessage, Error}; + +use futures::{Future, Stream}; +use futures::future::{result, Either}; + + +fn index(req: HttpRequest) -> Box> +{ + println!("{:?}", req); + + req.multipart() // <- get multipart stream for current request + .from_err() // <- convert multipart errors + .and_then(|item| { // <- iterate over multipart items + match item { + // Handle multipart Field + multipart::MultipartItem::Field(field) => { + println!("==== FIELD ==== {:?}", field); + + // Field in turn is stream of *Bytes* object + Either::A( + field.map_err(Error::from) + .map(|chunk| { + println!("-- CHUNK: \n{}", + std::str::from_utf8(&chunk).unwrap());}) + .finish()) + }, + multipart::MultipartItem::Nested(mp) => { + // Or item could be nested Multipart stream + Either::B(result(Ok(()))) + } + } + }) + .finish() // <- Stream::finish() combinator from actix + .map(|_| HttpResponse::Ok().into()) + .responder() +} + +fn main() { + ::std::env::set_var("RUST_LOG", "actix_web=info"); + let _ = env_logger::init(); + let sys = actix::System::new("multipart-example"); + + server::new( + || App::new() + .middleware(middleware::Logger::default()) // <- logger + .resource("/multipart", |r| r.method(http::Method::POST).a(index))) + .bind("127.0.0.1:8080").unwrap() + .start(); + + println!("Starting http server: 127.0.0.1:8080"); + let _ = sys.run(); +} diff --git a/protobuf/Cargo.toml b/protobuf/Cargo.toml new file mode 100644 index 0000000..0e95ea7 --- /dev/null +++ b/protobuf/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "protobuf-example" +version = "0.1.0" +authors = ["kingxsp "] + +[dependencies] +bytes = "0.4" +futures = "0.1" +failure = "0.1" +env_logger = "*" + +prost = "0.2.0" +prost-derive = "0.2.0" + +actix = "0.5" +actix-web = "^0.5" diff --git a/protobuf/client.py b/protobuf/client.py new file mode 100644 index 0000000..ab91365 --- /dev/null +++ b/protobuf/client.py @@ -0,0 +1,66 @@ +# just start server and run client.py + +# wget https://github.com/google/protobuf/releases/download/v3.5.1/protobuf-python-3.5.1.zip +# unzip protobuf-python-3.5.1.zip.1 +# cd protobuf-3.5.1/python/ +# python3.6 setup.py install + +# pip3.6 install --upgrade pip +# pip3.6 install aiohttp + +#!/usr/bin/env python +import test_pb2 +import traceback +import sys + +import asyncio +import aiohttp + +def op(): + try: + obj = test_pb2.MyObj() + obj.number = 9 + obj.name = 'USB' + + #Serialize + sendDataStr = obj.SerializeToString() + #print serialized string value + print('serialized string:', sendDataStr) + #------------------------# + # message transmission # + #------------------------# + receiveDataStr = sendDataStr + receiveData = test_pb2.MyObj() + + #Deserialize + receiveData.ParseFromString(receiveDataStr) + print('pares serialize string, return: devId = ', receiveData.number, ', name = ', receiveData.name) + except(Exception, e): + print(Exception, ':', e) + print(traceback.print_exc()) + errInfo = sys.exc_info() + print(errInfo[0], ':', errInfo[1]) + + +async def fetch(session): + obj = test_pb2.MyObj() + obj.number = 9 + obj.name = 'USB' + async with session.post('http://localhost:8080/', data=obj.SerializeToString(), + headers={"content-type": "application/protobuf"}) as resp: + print(resp.status) + data = await resp.read() + receiveObj = test_pb2.MyObj() + receiveObj.ParseFromString(data) + print(receiveObj) + +async def go(loop): + obj = test_pb2.MyObj() + obj.number = 9 + obj.name = 'USB' + async with aiohttp.ClientSession(loop=loop) as session: + await fetch(session) + +loop = asyncio.get_event_loop() +loop.run_until_complete(go(loop)) +loop.close() \ No newline at end of file diff --git a/protobuf/src/main.rs b/protobuf/src/main.rs new file mode 100644 index 0000000..c0a2abb --- /dev/null +++ b/protobuf/src/main.rs @@ -0,0 +1,57 @@ +extern crate actix; +extern crate actix_web; +extern crate bytes; +extern crate futures; +#[macro_use] +extern crate failure; +extern crate env_logger; +extern crate prost; +#[macro_use] +extern crate prost_derive; + +use futures::Future; +use actix_web::{ + http, middleware, server, + App, AsyncResponder, HttpRequest, HttpResponse, Error}; + +mod protobuf; +use protobuf::ProtoBufResponseBuilder; + + +#[derive(Clone, Debug, PartialEq, Message)] +pub struct MyObj { + #[prost(int32, tag="1")] + pub number: i32, + #[prost(string, tag="2")] + pub name: String, +} + + +/// This handler uses `ProtoBufMessage` for loading protobuf object. +fn index(req: HttpRequest) -> Box> { + protobuf::ProtoBufMessage::new(req) + .from_err() // convert all errors into `Error` + .and_then(|val: MyObj| { + println!("model: {:?}", val); + Ok(HttpResponse::Ok().protobuf(val)?) // <- send response + }) + .responder() +} + + +fn main() { + ::std::env::set_var("RUST_LOG", "actix_web=info"); + env_logger::init(); + let sys = actix::System::new("protobuf-example"); + + server::new(|| { + App::new() + .middleware(middleware::Logger::default()) + .resource("/", |r| r.method(http::Method::POST).f(index))}) + .bind("127.0.0.1:8080").unwrap() + .shutdown_timeout(1) + .start(); + + println!("Started http server: 127.0.0.1:8080"); + let _ = sys.run(); +} diff --git a/protobuf/src/protobuf.rs b/protobuf/src/protobuf.rs new file mode 100644 index 0000000..2b117fe --- /dev/null +++ b/protobuf/src/protobuf.rs @@ -0,0 +1,168 @@ +use bytes::{Bytes, BytesMut}; +use futures::{Poll, Future, Stream}; + +use bytes::IntoBuf; +use prost::Message; +use prost::DecodeError as ProtoBufDecodeError; +use prost::EncodeError as ProtoBufEncodeError; + +use actix_web::http::header::{CONTENT_TYPE, CONTENT_LENGTH}; +use actix_web::{Responder, HttpMessage, HttpRequest, HttpResponse}; +use actix_web::dev::HttpResponseBuilder; +use actix_web::error::{Error, PayloadError, ResponseError}; + + +#[derive(Fail, Debug)] +pub enum ProtoBufPayloadError { + /// Payload size is bigger than 256k + #[fail(display="Payload size is bigger than 256k")] + Overflow, + /// Content type error + #[fail(display="Content type error")] + ContentType, + /// Serialize error + #[fail(display="ProtoBud serialize error: {}", _0)] + Serialize(#[cause] ProtoBufEncodeError), + /// Deserialize error + #[fail(display="ProtoBud deserialize error: {}", _0)] + Deserialize(#[cause] ProtoBufDecodeError), + /// Payload error + #[fail(display="Error that occur during reading payload: {}", _0)] + Payload(#[cause] PayloadError), +} + +impl ResponseError for ProtoBufPayloadError { + + fn error_response(&self) -> HttpResponse { + match *self { + ProtoBufPayloadError::Overflow => HttpResponse::PayloadTooLarge().into(), + _ => HttpResponse::BadRequest().into(), + } + } +} + +impl From for ProtoBufPayloadError { + fn from(err: PayloadError) -> ProtoBufPayloadError { + ProtoBufPayloadError::Payload(err) + } +} + +impl From for ProtoBufPayloadError { + fn from(err: ProtoBufDecodeError) -> ProtoBufPayloadError { + ProtoBufPayloadError::Deserialize(err) + } +} + +#[derive(Debug)] +pub struct ProtoBuf(pub T); + +impl Responder for ProtoBuf { + type Item = HttpResponse; + type Error = Error; + + fn respond_to(self, _: HttpRequest) -> Result { + let mut buf = Vec::new(); + self.0.encode(&mut buf) + .map_err(|e| Error::from(ProtoBufPayloadError::Serialize(e))) + .and_then(|()| { + Ok(HttpResponse::Ok() + .content_type("application/protobuf") + .body(buf) + .into()) + }) + } +} + +pub struct ProtoBufMessage{ + limit: usize, + ct: &'static str, + req: Option, + fut: Option>>, +} + +impl ProtoBufMessage { + + /// Create `ProtoBufMessage` for request. + pub fn new(req: T) -> Self { + ProtoBufMessage{ + limit: 262_144, + req: Some(req), + fut: None, + ct: "application/protobuf", + } + } + + /// Change max size of payload. By default max size is 256Kb + pub fn limit(mut self, limit: usize) -> Self { + self.limit = limit; + self + } + + /// Set allowed content type. + /// + /// By default *application/protobuf* content type is used. Set content type + /// to empty string if you want to disable content type check. + pub fn content_type(mut self, ct: &'static str) -> Self { + self.ct = ct; + self + } +} + +impl Future for ProtoBufMessage + where T: HttpMessage + Stream + 'static +{ + type Item = U; + type Error = ProtoBufPayloadError; + + fn poll(&mut self) -> Poll { + if let Some(req) = self.req.take() { + if let Some(len) = req.headers().get(CONTENT_LENGTH) { + if let Ok(s) = len.to_str() { + if let Ok(len) = s.parse::() { + if len > self.limit { + return Err(ProtoBufPayloadError::Overflow); + } + } else { + return Err(ProtoBufPayloadError::Overflow); + } + } + } + // check content-type + if !self.ct.is_empty() && req.content_type() != self.ct { + return Err(ProtoBufPayloadError::ContentType) + } + + let limit = self.limit; + let fut = req.from_err() + .fold(BytesMut::new(), move |mut body, chunk| { + if (body.len() + chunk.len()) > limit { + Err(ProtoBufPayloadError::Overflow) + } else { + body.extend_from_slice(&chunk); + Ok(body) + } + }) + .and_then(|body| Ok(::decode(&mut body.into_buf())?)); + self.fut = Some(Box::new(fut)); + } + + self.fut.as_mut().expect("ProtoBufBody could not be used second time").poll() + } +} + + +pub trait ProtoBufResponseBuilder { + + fn protobuf(&mut self, value: T) -> Result; +} + +impl ProtoBufResponseBuilder for HttpResponseBuilder { + + fn protobuf(&mut self, value: T) -> Result { + self.header(CONTENT_TYPE, "application/protobuf"); + + let mut body = Vec::new(); + value.encode(&mut body).map_err(|e| ProtoBufPayloadError::Serialize(e))?; + Ok(self.body(body)) + } +} diff --git a/protobuf/test.proto b/protobuf/test.proto new file mode 100644 index 0000000..8ec278c --- /dev/null +++ b/protobuf/test.proto @@ -0,0 +1,6 @@ +syntax = "proto3"; + +message MyObj { + int32 number = 1; + string name = 2; +} \ No newline at end of file diff --git a/protobuf/test_pb2.py b/protobuf/test_pb2.py new file mode 100644 index 0000000..05e71f3 --- /dev/null +++ b/protobuf/test_pb2.py @@ -0,0 +1,76 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: test.proto + +import sys +_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +from google.protobuf import descriptor_pb2 +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor.FileDescriptor( + name='test.proto', + package='', + syntax='proto3', + serialized_pb=_b('\n\ntest.proto\"%\n\x05MyObj\x12\x0e\n\x06number\x18\x01 \x01(\x05\x12\x0c\n\x04name\x18\x02 \x01(\tb\x06proto3') +) +_sym_db.RegisterFileDescriptor(DESCRIPTOR) + + + + +_MYOBJ = _descriptor.Descriptor( + name='MyObj', + full_name='MyObj', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='number', full_name='MyObj.number', index=0, + number=1, type=5, cpp_type=1, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='name', full_name='MyObj.name', index=1, + number=2, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=14, + serialized_end=51, +) + +DESCRIPTOR.message_types_by_name['MyObj'] = _MYOBJ + +MyObj = _reflection.GeneratedProtocolMessageType('MyObj', (_message.Message,), dict( + DESCRIPTOR = _MYOBJ, + __module__ = 'test_pb2' + # @@protoc_insertion_point(class_scope:MyObj) + )) +_sym_db.RegisterMessage(MyObj) + + +# @@protoc_insertion_point(module_scope) diff --git a/r2d2/Cargo.toml b/r2d2/Cargo.toml new file mode 100644 index 0000000..0746db7 --- /dev/null +++ b/r2d2/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "r2d2-example" +version = "0.1.0" +authors = ["Nikolay Kim "] + +[dependencies] +env_logger = "0.5" +actix = "0.5" +actix-web = "^0.5" + +futures = "0.1" +uuid = { version = "0.5", features = ["serde", "v4"] } +serde = "1.0" +serde_json = "1.0" +serde_derive = "1.0" + +r2d2 = "*" +r2d2_sqlite = "*" +rusqlite = "*" diff --git a/r2d2/src/db.rs b/r2d2/src/db.rs new file mode 100644 index 0000000..6e2ddc0 --- /dev/null +++ b/r2d2/src/db.rs @@ -0,0 +1,41 @@ +//! Db executor actor +use std::io; +use uuid; +use actix_web::*; +use actix::prelude::*; +use r2d2::Pool; +use r2d2_sqlite::SqliteConnectionManager; + + +/// This is db executor actor. We are going to run 3 of them in parallel. +pub struct DbExecutor(pub Pool); + +/// This is only message that this actor can handle, but it is easy to extend number of +/// messages. +pub struct CreateUser { + pub name: String, +} + +impl Message for CreateUser { + type Result = Result; +} + +impl Actor for DbExecutor { + type Context = SyncContext; +} + +impl Handler for DbExecutor { + type Result = Result; + + fn handle(&mut self, msg: CreateUser, _: &mut Self::Context) -> Self::Result { + let conn = self.0.get().unwrap(); + + let uuid = format!("{}", uuid::Uuid::new_v4()); + conn.execute("INSERT INTO users (id, name) VALUES ($1, $2)", + &[&uuid, &msg.name]).unwrap(); + + Ok(conn.query_row("SELECT name FROM users WHERE id=$1", &[&uuid], |row| { + row.get(0) + }).map_err(|_| io::Error::new(io::ErrorKind::Other, "db error"))?) + } +} diff --git a/r2d2/src/main.rs b/r2d2/src/main.rs new file mode 100644 index 0000000..5e6d07f --- /dev/null +++ b/r2d2/src/main.rs @@ -0,0 +1,65 @@ +//! Actix web r2d2 example +extern crate serde; +extern crate serde_json; +extern crate uuid; +extern crate futures; +extern crate actix; +extern crate actix_web; +extern crate env_logger; +extern crate r2d2; +extern crate r2d2_sqlite; +extern crate rusqlite; + +use actix::prelude::*; +use actix_web::{ + middleware, http, server, App, AsyncResponder, HttpRequest, HttpResponse, Error}; +use futures::future::Future; +use r2d2_sqlite::SqliteConnectionManager; + +mod db; +use db::{CreateUser, DbExecutor}; + + +/// State with DbExecutor address +struct State { + db: Addr, +} + +/// Async request handler +fn index(req: HttpRequest) -> Box> { + let name = &req.match_info()["name"]; + + req.state().db.send(CreateUser{name: name.to_owned()}) + .from_err() + .and_then(|res| { + match res { + Ok(user) => Ok(HttpResponse::Ok().json(user)), + Err(_) => Ok(HttpResponse::InternalServerError().into()) + } + }) + .responder() +} + +fn main() { + ::std::env::set_var("RUST_LOG", "actix_web=debug"); + env_logger::init(); + let sys = actix::System::new("r2d2-example"); + + // r2d2 pool + let manager = SqliteConnectionManager::file("test.db"); + let pool = r2d2::Pool::new(manager).unwrap(); + + // Start db executor actors + let addr = SyncArbiter::start(3, move || DbExecutor(pool.clone())); + + // Start http server + server::new(move || { + App::with_state(State{db: addr.clone()}) + // enable logger + .middleware(middleware::Logger::default()) + .resource("/{name}", |r| r.method(http::Method::GET).a(index))}) + .bind("127.0.0.1:8080").unwrap() + .start(); + + let _ = sys.run(); +} diff --git a/r2d2/test.db b/r2d2/test.db new file mode 100644 index 0000000..3ea0c83 Binary files /dev/null and b/r2d2/test.db differ diff --git a/redis-session/Cargo.toml b/redis-session/Cargo.toml new file mode 100644 index 0000000..bec8b8a --- /dev/null +++ b/redis-session/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "redis-session" +version = "0.1.0" +authors = ["Nikolay Kim "] + +[dependencies] +env_logger = "0.5" +actix = "0.5" +actix-web = "^0.5" +actix-redis = { version = "0.3", features = ["web"] } diff --git a/redis-session/src/main.rs b/redis-session/src/main.rs new file mode 100644 index 0000000..f61496f --- /dev/null +++ b/redis-session/src/main.rs @@ -0,0 +1,48 @@ +#![allow(unused_variables)] + +extern crate actix; +extern crate actix_web; +extern crate actix_redis; +extern crate env_logger; + +use actix_web::{server, App, HttpRequest, HttpResponse, Result}; +use actix_web::middleware::{Logger, SessionStorage, RequestSession}; +use actix_redis::RedisSessionBackend; + + +/// simple handler +fn index(mut req: HttpRequest) -> Result { + println!("{:?}", req); + + // session + if let Some(count) = req.session().get::("counter")? { + println!("SESSION value: {}", count); + req.session().set("counter", count+1)?; + } else { + req.session().set("counter", 1)?; + } + + Ok("Welcome!".into()) +} + +fn main() { + ::std::env::set_var("RUST_LOG", "actix_web=info,actix_redis=info"); + env_logger::init(); + let sys = actix::System::new("basic-example"); + + server::new( + || App::new() + // enable logger + .middleware(Logger::default()) + // cookie session middleware + .middleware(SessionStorage::new( + RedisSessionBackend::new("127.0.0.1:6379", &[0; 32]) + )) + // register simple route, handle all methods + .resource("/", |r| r.f(index))) + .bind("0.0.0.0:8080").unwrap() + .threads(1) + .start(); + + let _ = sys.run(); +} diff --git a/state/Cargo.toml b/state/Cargo.toml new file mode 100644 index 0000000..ef95d99 --- /dev/null +++ b/state/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "state" +version = "0.1.0" +authors = ["Nikolay Kim "] + +[dependencies] +futures = "0.1" +env_logger = "0.5" +actix = "0.5" +actix-web = "^0.5" diff --git a/state/README.md b/state/README.md new file mode 100644 index 0000000..127ed2a --- /dev/null +++ b/state/README.md @@ -0,0 +1,15 @@ +# state + +## Usage + +### server + +```bash +cd actix-web/examples/state +cargo run +# Started http server: 127.0.0.1:8080 +``` + +### web client + +- [http://localhost:8080/](http://localhost:8080/) diff --git a/state/src/main.rs b/state/src/main.rs new file mode 100644 index 0000000..804b68c --- /dev/null +++ b/state/src/main.rs @@ -0,0 +1,77 @@ +#![cfg_attr(feature="cargo-clippy", allow(needless_pass_by_value))] +//! There are two level of statefulness in actix-web. Application has state +//! that is shared across all handlers within same Application. +//! And individual handler can have state. + +extern crate actix; +extern crate actix_web; +extern crate env_logger; + +use std::cell::Cell; + +use actix::prelude::*; +use actix_web::{ + http, server, ws, middleware, App, HttpRequest, HttpResponse}; + +/// Application state +struct AppState { + counter: Cell, +} + +/// simple handle +fn index(req: HttpRequest) -> HttpResponse { + println!("{:?}", req); + req.state().counter.set(req.state().counter.get() + 1); + + HttpResponse::Ok().body(format!("Num of requests: {}", req.state().counter.get())) +} + +/// `MyWebSocket` counts how many messages it receives from peer, +/// websocket-client.py could be used for tests +struct MyWebSocket { + counter: usize, +} + +impl Actor for MyWebSocket { + type Context = ws::WebsocketContext; +} + +impl StreamHandler for MyWebSocket { + + fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) { + self.counter += 1; + println!("WS({}): {:?}", self.counter, msg); + match msg { + ws::Message::Ping(msg) => ctx.pong(&msg), + ws::Message::Text(text) => ctx.text(text), + ws::Message::Binary(bin) => ctx.binary(bin), + ws::Message::Close(_) => { + ctx.stop(); + } + _ => (), + } + } +} + +fn main() { + ::std::env::set_var("RUST_LOG", "actix_web=info"); + env_logger::init(); + let sys = actix::System::new("ws-example"); + + server::new( + || App::with_state(AppState{counter: Cell::new(0)}) + // enable logger + .middleware(middleware::Logger::default()) + // websocket route + .resource( + "/ws/", |r| + r.method(http::Method::GET).f( + |req| ws::start(req, MyWebSocket{counter: 0}))) + // register simple handler, handle all methods + .resource("/", |r| r.f(index))) + .bind("127.0.0.1:8080").unwrap() + .start(); + + println!("Started http server: 127.0.0.1:8080"); + let _ = sys.run(); +} diff --git a/static/actixLogo.png b/static/actixLogo.png new file mode 100644 index 0000000..1e2509a Binary files /dev/null and b/static/actixLogo.png differ diff --git a/static/favicon.ico b/static/favicon.ico new file mode 100644 index 0000000..03018db Binary files /dev/null and b/static/favicon.ico differ diff --git a/static/index.html b/static/index.html new file mode 100644 index 0000000..e59e13f --- /dev/null +++ b/static/index.html @@ -0,0 +1,90 @@ + + + + + + + + +

Chat!

+
+  | Status: + disconnected +
+
+
+
+ + +
+ + diff --git a/template_tera/Cargo.toml b/template_tera/Cargo.toml new file mode 100644 index 0000000..5b6b032 --- /dev/null +++ b/template_tera/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "template-tera" +version = "0.1.0" +authors = ["Nikolay Kim "] + +[dependencies] +env_logger = "0.5" +actix = "0.5" +actix-web = "^0.5" +tera = "*" diff --git a/template_tera/README.md b/template_tera/README.md new file mode 100644 index 0000000..3582959 --- /dev/null +++ b/template_tera/README.md @@ -0,0 +1,17 @@ +# template_tera + +Minimal example of using the template [tera](https://github.com/Keats/tera) that displays a form. + +## Usage + +### server + +```bash +cd actix-web/examples/template_tera +cargo run (or ``cargo watch -x run``) +# Started http server: 127.0.0.1:8080 +``` + +### web client + +- [http://localhost:8080](http://localhost:8080) diff --git a/template_tera/src/main.rs b/template_tera/src/main.rs new file mode 100644 index 0000000..e1a738d --- /dev/null +++ b/template_tera/src/main.rs @@ -0,0 +1,48 @@ +extern crate actix; +extern crate actix_web; +extern crate env_logger; +#[macro_use] +extern crate tera; + +use actix_web::{ + http, error, middleware, server, App, HttpRequest, HttpResponse, Error}; + + +struct State { + template: tera::Tera, // <- store tera template in application state +} + +fn index(req: HttpRequest) -> Result { + let s = if let Some(name) = req.query().get("name") { // <- submitted form + let mut ctx = tera::Context::new(); + ctx.add("name", &name.to_owned()); + ctx.add("text", &"Welcome!".to_owned()); + req.state().template.render("user.html", &ctx) + .map_err(|_| error::ErrorInternalServerError("Template error"))? + } else { + req.state().template.render("index.html", &tera::Context::new()) + .map_err(|_| error::ErrorInternalServerError("Template error"))? + }; + Ok(HttpResponse::Ok() + .content_type("text/html") + .body(s)) +} + +fn main() { + ::std::env::set_var("RUST_LOG", "actix_web=info"); + env_logger::init(); + let sys = actix::System::new("tera-example"); + + server::new(|| { + let tera = compile_templates!(concat!(env!("CARGO_MANIFEST_DIR"), "/templates/**/*")); + + App::with_state(State{template: tera}) + // enable logger + .middleware(middleware::Logger::default()) + .resource("/", |r| r.method(http::Method::GET).f(index))}) + .bind("127.0.0.1:8080").unwrap() + .start(); + + println!("Started http server: 127.0.0.1:8080"); + let _ = sys.run(); +} diff --git a/template_tera/templates/index.html b/template_tera/templates/index.html new file mode 100644 index 0000000..d8a47bc --- /dev/null +++ b/template_tera/templates/index.html @@ -0,0 +1,17 @@ + + + + + Actix web + + +

Welcome!

+

+

What is your name?

+
+
+

+
+

+ + diff --git a/template_tera/templates/user.html b/template_tera/templates/user.html new file mode 100644 index 0000000..cb53289 --- /dev/null +++ b/template_tera/templates/user.html @@ -0,0 +1,13 @@ + + + + + Actix web + + +

Hi, {{ name }}!

+

+ {{ text }} +

+ + diff --git a/tls/Cargo.toml b/tls/Cargo.toml new file mode 100644 index 0000000..8e2ab66 --- /dev/null +++ b/tls/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "tls-example" +version = "0.1.0" +authors = ["Nikolay Kim "] + +[[bin]] +name = "server" +path = "src/main.rs" + +[dependencies] +env_logger = "0.5" +actix = "0.5" +actix-web = { version = "^0.5", features=["alpn"] } +openssl = { version="0.10" } diff --git a/tls/README.md b/tls/README.md new file mode 100644 index 0000000..1bc9ba3 --- /dev/null +++ b/tls/README.md @@ -0,0 +1,16 @@ +# tls example + +## Usage + +### server + +```bash +cd actix-web/examples/tls +cargo run (or ``cargo watch -x run``) +# Started http server: 127.0.0.1:8443 +``` + +### web client + +- curl: ``curl -v https://127.0.0.1:8443/index.html --compress -k`` +- browser: [https://127.0.0.1:8443/index.html](https://127.0.0.1:8080/index.html) diff --git a/tls/cert.pem b/tls/cert.pem new file mode 100644 index 0000000..159aace --- /dev/null +++ b/tls/cert.pem @@ -0,0 +1,31 @@ +-----BEGIN CERTIFICATE----- +MIIFPjCCAyYCCQDvLYiYD+jqeTANBgkqhkiG9w0BAQsFADBhMQswCQYDVQQGEwJV +UzELMAkGA1UECAwCQ0ExCzAJBgNVBAcMAlNGMRAwDgYDVQQKDAdDb21wYW55MQww +CgYDVQQLDANPcmcxGDAWBgNVBAMMD3d3dy5leGFtcGxlLmNvbTAeFw0xODAxMjUx +NzQ2MDFaFw0xOTAxMjUxNzQ2MDFaMGExCzAJBgNVBAYTAlVTMQswCQYDVQQIDAJD +QTELMAkGA1UEBwwCU0YxEDAOBgNVBAoMB0NvbXBhbnkxDDAKBgNVBAsMA09yZzEY +MBYGA1UEAwwPd3d3LmV4YW1wbGUuY29tMIICIjANBgkqhkiG9w0BAQEFAAOCAg8A +MIICCgKCAgEA2WzIA2IpVR9Tb9EFhITlxuhE5rY2a3S6qzYNzQVgSFggxXEPn8k1 +sQEcer5BfAP986Sck3H0FvB4Bt/I8PwOtUCmhwcc8KtB5TcGPR4fjXnrpC+MIK5U +NLkwuyBDKziYzTdBj8kUFX1WxmvEHEgqToPOZfBgsS71cJAR/zOWraDLSRM54jXy +voLZN4Ti9rQagQrvTQ44Vz5ycDQy7UxtbUGh1CVv69vNVr7/SOOh/Nw5FNOZWLWr +odGyoec5wh9iqRZgRqiTUc6Lt7V2RWc2X2gjwST2UfI+U46Ip3oaQ7ZD4eAkoqND +xdniBZAykVG3c/99ux4BAESTF8fsNch6UticBxYMuTu+ouvP0psfI9wwwNliJDmA +CRUTB9AgRynbL1AzhqQoDfsb98IZfjfNOpwnwuLwpMAPhbgd5KNdZaIJ4Hb6/stI +yFElOExxd3TAxF2Gshd/lq1JcNHAZ1DSXV5MvOWT/NWgXwbIzUgQ8eIi+HuDYX2U +UuaB6R8tbd52H7rbUv6HrfinuSlKWqjSYLkiKHkwUpoMw8y9UycRSzs1E9nPwPTO +vRXb0mNCQeBCV9FvStNVXdCUTT8LGPv87xSD2pmt7LijlE6mHLG8McfcWkzA69un +CEHIFAFDimTuN7EBljc119xWFTcHMyoZAfFF+oTqwSbBGImruCxnaJECAwEAATAN +BgkqhkiG9w0BAQsFAAOCAgEApavsgsn7SpPHfhDSN5iZs1ILZQRewJg0Bty0xPfk +3tynSW6bNH3nSaKbpsdmxxomthNSQgD2heOq1By9YzeOoNR+7Pk3s4FkASnf3ToI +JNTUasBFFfaCG96s4Yvs8KiWS/k84yaWuU8c3Wb1jXs5Rv1qE1Uvuwat1DSGXSoD +JNluuIkCsC4kWkyq5pWCGQrabWPRTWsHwC3PTcwSRBaFgYLJaR72SloHB1ot02zL +d2age9dmFRFLLCBzP+D7RojBvL37qS/HR+rQ4SoQwiVc/JzaeqSe7ZbvEH9sZYEu +ALowJzgbwro7oZflwTWunSeSGDSltkqKjvWvZI61pwfHKDahUTmZ5h2y67FuGEaC +CIOUI8dSVSPKITxaq3JL4ze2e9/0Lt7hj19YK2uUmtMAW5Tirz4Yx5lyGH9U8Wur +y/X8VPxTc4A9TMlJgkyz0hqvhbPOT/zSWB10zXh0glKAsSBryAOEDxV1UygmSir7 +YV8Qaq+oyKUTMc1MFq5vZ07M51EPaietn85t8V2Y+k/8XYltRp32NxsypxAJuyxh +g/ko6RVTrWa1sMvz/F9LFqAdKiK5eM96lh9IU4xiLg4ob8aS/GRAA8oIFkZFhLrt +tOwjIUPmEPyHWFi8dLpNuQKYalLYhuwZftG/9xV+wqhKGZO9iPrpHSYBRTap8w2y +1QU= +-----END CERTIFICATE----- diff --git a/tls/key.pem b/tls/key.pem new file mode 100644 index 0000000..aac387c --- /dev/null +++ b/tls/key.pem @@ -0,0 +1,51 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIJKAIBAAKCAgEA2WzIA2IpVR9Tb9EFhITlxuhE5rY2a3S6qzYNzQVgSFggxXEP +n8k1sQEcer5BfAP986Sck3H0FvB4Bt/I8PwOtUCmhwcc8KtB5TcGPR4fjXnrpC+M +IK5UNLkwuyBDKziYzTdBj8kUFX1WxmvEHEgqToPOZfBgsS71cJAR/zOWraDLSRM5 +4jXyvoLZN4Ti9rQagQrvTQ44Vz5ycDQy7UxtbUGh1CVv69vNVr7/SOOh/Nw5FNOZ +WLWrodGyoec5wh9iqRZgRqiTUc6Lt7V2RWc2X2gjwST2UfI+U46Ip3oaQ7ZD4eAk +oqNDxdniBZAykVG3c/99ux4BAESTF8fsNch6UticBxYMuTu+ouvP0psfI9wwwNli +JDmACRUTB9AgRynbL1AzhqQoDfsb98IZfjfNOpwnwuLwpMAPhbgd5KNdZaIJ4Hb6 +/stIyFElOExxd3TAxF2Gshd/lq1JcNHAZ1DSXV5MvOWT/NWgXwbIzUgQ8eIi+HuD +YX2UUuaB6R8tbd52H7rbUv6HrfinuSlKWqjSYLkiKHkwUpoMw8y9UycRSzs1E9nP +wPTOvRXb0mNCQeBCV9FvStNVXdCUTT8LGPv87xSD2pmt7LijlE6mHLG8McfcWkzA +69unCEHIFAFDimTuN7EBljc119xWFTcHMyoZAfFF+oTqwSbBGImruCxnaJECAwEA +AQKCAgAME3aoeXNCPxMrSri7u4Xnnk71YXl0Tm9vwvjRQlMusXZggP8VKN/KjP0/ +9AE/GhmoxqPLrLCZ9ZE1EIjgmZ9Xgde9+C8rTtfCG2RFUL7/5J2p6NonlocmxoJm +YkxYwjP6ce86RTjQWL3RF3s09u0inz9/efJk5O7M6bOWMQ9VZXDlBiRY5BYvbqUR +6FeSzD4MnMbdyMRoVBeXE88gTvZk8xhB6DJnLzYgc0tKiRoeKT0iYv5JZw25VyRM +ycLzfTrFmXCPfB1ylb483d9Ly4fBlM8nkx37PzEnAuukIawDxsPOb9yZC+hfvNJI +7NFiMN+3maEqG2iC00w4Lep4skHY7eHUEUMl+Wjr+koAy2YGLWAwHZQTm7iXn9Ab +L6adL53zyCKelRuEQOzbeosJAqS+5fpMK0ekXyoFIuskj7bWuIoCX7K/kg6q5IW+ +vC2FrlsrbQ79GztWLVmHFO1I4J9M5r666YS0qdh8c+2yyRl4FmSiHfGxb3eOKpxQ +b6uI97iZlkxPF9LYUCSc7wq0V2gGz+6LnGvTHlHrOfVXqw/5pLAKhXqxvnroDTwz +0Ay/xFF6ei/NSxBY5t8ztGCBm45wCU3l8pW0X6dXqwUipw5b4MRy1VFRu6rqlmbL +OPSCuLxqyqsigiEYsBgS/icvXz9DWmCQMPd2XM9YhsHvUq+R4QKCAQEA98EuMMXI +6UKIt1kK2t/3OeJRyDd4iv/fCMUAnuPjLBvFE4cXD/SbqCxcQYqb+pue3PYkiTIC +71rN8OQAc5yKhzmmnCE5N26br/0pG4pwEjIr6mt8kZHmemOCNEzvhhT83nfKmV0g +9lNtuGEQMiwmZrpUOF51JOMC39bzcVjYX2Cmvb7cFbIq3lR0zwM+aZpQ4P8LHCIu +bgHmwbdlkLyIULJcQmHIbo6nPFB3ZZE4mqmjwY+rA6Fh9rgBa8OFCfTtrgeYXrNb +IgZQ5U8GoYRPNC2ot0vpTinraboa/cgm6oG4M7FW1POCJTl+/ktHEnKuO5oroSga +/BSg7hCNFVaOhwKCAQEA4Kkys0HtwEbV5mY/NnvUD5KwfXX7BxoXc9lZ6seVoLEc +KjgPYxqYRVrC7dB2YDwwp3qcRTi/uBAgFNm3iYlDzI4xS5SeaudUWjglj7BSgXE2 +iOEa7EwcvVPluLaTgiWjlzUKeUCNNHWSeQOt+paBOT+IgwRVemGVpAgkqQzNh/nP +tl3p9aNtgzEm1qVlPclY/XUCtf3bcOR+z1f1b4jBdn0leu5OhnxkC+Htik+2fTXD +jt6JGrMkanN25YzsjnD3Sn+v6SO26H99wnYx5oMSdmb8SlWRrKtfJHnihphjG/YY +l1cyorV6M/asSgXNQfGJm4OuJi0I4/FL2wLUHnU+JwKCAQEAzh4WipcRthYXXcoj +gMKRkMOb3GFh1OpYqJgVExtudNTJmZxq8GhFU51MR27Eo7LycMwKy2UjEfTOnplh +Us2qZiPtW7k8O8S2m6yXlYUQBeNdq9IuuYDTaYD94vsazscJNSAeGodjE+uGvb1q +1wLqE87yoE7dUInYa1cOA3+xy2/CaNuviBFJHtzOrSb6tqqenQEyQf6h9/12+DTW +t5pSIiixHrzxHiFqOoCLRKGToQB+71rSINwTf0nITNpGBWmSj5VcC3VV3TG5/XxI +fPlxV2yhD5WFDPVNGBGvwPDSh4jSMZdZMSNBZCy4XWFNSKjGEWoK4DFYed3DoSt9 +5IG1YwKCAQA63ntHl64KJUWlkwNbboU583FF3uWBjee5VqoGKHhf3CkKMxhtGqnt ++oN7t5VdUEhbinhqdx1dyPPvIsHCS3K1pkjqii4cyzNCVNYa2dQ00Qq+QWZBpwwc +3GAkz8rFXsGIPMDa1vxpU6mnBjzPniKMcsZ9tmQDppCEpBGfLpio2eAA5IkK8eEf +cIDB3CM0Vo94EvI76CJZabaE9IJ+0HIJb2+jz9BJ00yQBIqvJIYoNy9gP5Xjpi+T +qV/tdMkD5jwWjHD3AYHLWKUGkNwwkAYFeqT/gX6jpWBP+ZRPOp011X3KInJFSpKU +DT5GQ1Dux7EMTCwVGtXqjO8Ym5wjwwsfAoIBAEcxlhIW1G6BiNfnWbNPWBdh3v/K +5Ln98Rcrz8UIbWyl7qNPjYb13C1KmifVG1Rym9vWMO3KuG5atK3Mz2yLVRtmWAVc +fxzR57zz9MZFDun66xo+Z1wN3fVxQB4CYpOEI4Lb9ioX4v85hm3D6RpFukNtRQEc +Gfr4scTjJX4jFWDp0h6ffMb8mY+quvZoJ0TJqV9L9Yj6Ksdvqez/bdSraev97bHQ +4gbQxaTZ6WjaD4HjpPQefMdWp97Metg0ZQSS8b8EzmNFgyJ3XcjirzwliKTAQtn6 +I2sd0NCIooelrKRD8EJoDUwxoOctY7R97wpZ7/wEHU45cBCbRV3H4JILS5c= +-----END RSA PRIVATE KEY----- diff --git a/tls/src/main.rs b/tls/src/main.rs new file mode 100644 index 0000000..479ef8c --- /dev/null +++ b/tls/src/main.rs @@ -0,0 +1,49 @@ +#![allow(unused_variables)] +extern crate actix; +extern crate actix_web; +extern crate env_logger; +extern crate openssl; + +use openssl::ssl::{SslMethod, SslAcceptor, SslFiletype}; +use actix_web::{ + http, middleware, server, App, HttpRequest, HttpResponse, Error}; + + +/// simple handle +fn index(req: HttpRequest) -> Result { + println!("{:?}", req); + Ok(HttpResponse::Ok() + .content_type("text/plain") + .body("Welcome!")) +} + +fn main() { + if ::std::env::var("RUST_LOG").is_err() { + ::std::env::set_var("RUST_LOG", "actix_web=info"); + } + env_logger::init(); + let sys = actix::System::new("ws-example"); + + // load ssl keys + let mut builder = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap(); + builder.set_private_key_file("key.pem", SslFiletype::PEM).unwrap(); + builder.set_certificate_chain_file("cert.pem").unwrap(); + + server::new( + || App::new() + // enable logger + .middleware(middleware::Logger::default()) + // register simple handler, handle all methods + .resource("/index.html", |r| r.f(index)) + // with path parameters + .resource("/", |r| r.method(http::Method::GET).f(|req| { + HttpResponse::Found() + .header("LOCATION", "/index.html") + .finish() + }))) + .bind("127.0.0.1:8443").unwrap() + .start_ssl(builder).unwrap(); + + println!("Started http server: 127.0.0.1:8443"); + let _ = sys.run(); +} diff --git a/unix-socket/Cargo.toml b/unix-socket/Cargo.toml new file mode 100644 index 0000000..806a4bc --- /dev/null +++ b/unix-socket/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "unix-socket" +version = "0.1.0" +authors = ["Messense Lv "] + +[dependencies] +env_logger = "0.5" +actix = "0.5" +actix-web = "^0.5" +tokio-uds = "0.1" diff --git a/unix-socket/README.md b/unix-socket/README.md new file mode 100644 index 0000000..03b0066 --- /dev/null +++ b/unix-socket/README.md @@ -0,0 +1,14 @@ +## Unix domain socket example + +```bash +$ curl --unix-socket /tmp/actix-uds.socket http://localhost/ +Hello world! +``` + +Although this will only one thread for handling incoming connections +according to the +[documentation](https://actix.github.io/actix-web/actix_web/struct.HttpServer.html#method.start_incoming). + +And it does not delete the socket file (`/tmp/actix-uds.socket`) when stopping +the server so it will fail to start next time you run it unless you delete +the socket file manually. diff --git a/unix-socket/src/main.rs b/unix-socket/src/main.rs new file mode 100644 index 0000000..c307184 --- /dev/null +++ b/unix-socket/src/main.rs @@ -0,0 +1,32 @@ +extern crate actix; +extern crate actix_web; +extern crate env_logger; +extern crate tokio_uds; + +use actix::*; +use actix_web::{middleware, server, App, HttpRequest}; +use tokio_uds::UnixListener; + + +fn index(_req: HttpRequest) -> &'static str { + "Hello world!" +} + +fn main() { + ::std::env::set_var("RUST_LOG", "actix_web=info"); + env_logger::init(); + let sys = actix::System::new("unix-socket"); + + let listener = UnixListener::bind( + "/tmp/actix-uds.socket", Arbiter::handle()).expect("bind failed"); + server::new( + || App::new() + // enable logger + .middleware(middleware::Logger::default()) + .resource("/index.html", |r| r.f(|_| "Hello world!")) + .resource("/", |r| r.f(index))) + .start_incoming(listener.incoming(), false); + + println!("Started http server: /tmp/actix-uds.socket"); + let _ = sys.run(); +} diff --git a/web-cors/README.md b/web-cors/README.md new file mode 100644 index 0000000..6dd3d77 --- /dev/null +++ b/web-cors/README.md @@ -0,0 +1,15 @@ +# Actix Web CORS example + +## start +1 - backend server +```bash +$ cd web-cors/backend +$ cargo run +``` +2 - frontend server +```bash +$ cd web-cors/frontend +$ npm install +$ npm run dev +``` +then open browser 'http://localhost:1234/' diff --git a/web-cors/backend/.gitignore b/web-cors/backend/.gitignore new file mode 100644 index 0000000..250b626 --- /dev/null +++ b/web-cors/backend/.gitignore @@ -0,0 +1,4 @@ + +/target/ +**/*.rs.bk +Cargo.lock \ No newline at end of file diff --git a/web-cors/backend/Cargo.toml b/web-cors/backend/Cargo.toml new file mode 100644 index 0000000..23eace1 --- /dev/null +++ b/web-cors/backend/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "actix-web-cors" +version = "0.1.0" +authors = ["krircc "] + +[dependencies] +serde = "1.0" +serde_derive = "1.0" +serde_json = "1.0" +http = "0.1" + +actix = "0.5" +actix-web = "^0.5" +dotenv = "0.10" +env_logger = "0.5" +futures = "0.1" diff --git a/web-cors/backend/src/main.rs b/web-cors/backend/src/main.rs new file mode 100644 index 0000000..599be2c --- /dev/null +++ b/web-cors/backend/src/main.rs @@ -0,0 +1,43 @@ +#[macro_use] extern crate serde_derive; +extern crate serde; +extern crate serde_json; +extern crate futures; +extern crate actix; +extern crate actix_web; +extern crate env_logger; + +use std::env; +use actix_web::{http, middleware, server, App}; + +mod user; +use user::info; + + +fn main() { + env::set_var("RUST_LOG", "actix_web=info"); + env_logger::init(); + + let sys = actix::System::new("Actix-web-CORS"); + + server::new( + || App::new() + .middleware(middleware::Logger::default()) + .resource("/user/info", |r| { + middleware::cors::Cors::build() + .allowed_origin("http://localhost:1234") + .allowed_methods(vec!["GET", "POST"]) + .allowed_headers( + vec![http::header::AUTHORIZATION, + http::header::ACCEPT, + http::header::CONTENT_TYPE]) + .max_age(3600) + .finish().expect("Can not create CORS middleware") + .register(r); + r.method(http::Method::POST).a(info); + })) + .bind("127.0.0.1:8000").unwrap() + .shutdown_timeout(200) + .start(); + + let _ = sys.run(); +} diff --git a/web-cors/backend/src/user.rs b/web-cors/backend/src/user.rs new file mode 100644 index 0000000..364430f --- /dev/null +++ b/web-cors/backend/src/user.rs @@ -0,0 +1,19 @@ +use actix_web::{AsyncResponder, Error, HttpMessage, HttpResponse, HttpRequest}; +use futures::Future; + + +#[derive(Deserialize,Serialize, Debug)] +struct Info { + username: String, + email: String, + password: String, + confirm_password: String, +} + +pub fn info(req: HttpRequest) -> Box> { + req.json() + .from_err() + .and_then(|res: Info| { + Ok(HttpResponse::Ok().json(res)) + }).responder() +} diff --git a/web-cors/frontend/.babelrc b/web-cors/frontend/.babelrc new file mode 100644 index 0000000..002b4aa --- /dev/null +++ b/web-cors/frontend/.babelrc @@ -0,0 +1,3 @@ +{ + "presets": ["env"] +} diff --git a/web-cors/frontend/.gitignore b/web-cors/frontend/.gitignore new file mode 100644 index 0000000..8875af8 --- /dev/null +++ b/web-cors/frontend/.gitignore @@ -0,0 +1,14 @@ +.DS_Store +node_modules/ +/dist/ +.cache +npm-debug.log* +yarn-debug.log* +yarn-error.log* + +# Editor directories and files +.idea +*.suo +*.ntvs* +*.njsproj +*.sln diff --git a/web-cors/frontend/index.html b/web-cors/frontend/index.html new file mode 100644 index 0000000..d71de81 --- /dev/null +++ b/web-cors/frontend/index.html @@ -0,0 +1,13 @@ + + + + + + webapp + + +
+ + + + \ No newline at end of file diff --git a/web-cors/frontend/package.json b/web-cors/frontend/package.json new file mode 100644 index 0000000..7ce2f64 --- /dev/null +++ b/web-cors/frontend/package.json @@ -0,0 +1,22 @@ +{ + "name": "actix-web-cors", + "version": "0.1.0", + "description": "webapp", + "main": "main.js", + "scripts": { + "dev": "rm -rf dist/ && NODE_ENV=development parcel index.html", + "build": "NODE_ENV=production parcel build index.html", + "test": "echo \"Error: no test specified\" && exit 1" + }, + "license": "ISC", + "dependencies": { + "vue": "^2.5.13", + "vue-router": "^3.0.1", + "axios": "^0.17.1" + }, + "devDependencies": { + "babel-preset-env": "^1.6.1", + "parcel-bundler": "^1.4.1", + "parcel-plugin-vue": "^1.5.0" + } +} diff --git a/web-cors/frontend/src/app.vue b/web-cors/frontend/src/app.vue new file mode 100644 index 0000000..0c054c2 --- /dev/null +++ b/web-cors/frontend/src/app.vue @@ -0,0 +1,145 @@ + + + + + \ No newline at end of file diff --git a/web-cors/frontend/src/main.js b/web-cors/frontend/src/main.js new file mode 100644 index 0000000..df1e4b7 --- /dev/null +++ b/web-cors/frontend/src/main.js @@ -0,0 +1,11 @@ +import Vue from 'vue' +import App from './app' + +new Vue({ + el: '#app', + render: h => h(App) +}) + +if (module.hot) { + module.hot.accept(); +} \ No newline at end of file diff --git a/websocket-chat/Cargo.toml b/websocket-chat/Cargo.toml new file mode 100644 index 0000000..8109c4a --- /dev/null +++ b/websocket-chat/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "websocket-example" +version = "0.1.0" +authors = ["Nikolay Kim "] + +[[bin]] +name = "server" +path = "src/main.rs" + +[[bin]] +name = "client" +path = "src/client.rs" + +[dependencies] +rand = "*" +bytes = "0.4" +byteorder = "1.1" +futures = "0.1" +tokio-io = "0.1" +tokio-core = "0.1" +env_logger = "*" + +serde = "1.0" +serde_json = "1.0" +serde_derive = "1.0" + +actix = "0.5" +actix-web = "^0.5" diff --git a/websocket-chat/README.md b/websocket-chat/README.md new file mode 100644 index 0000000..a01dd68 --- /dev/null +++ b/websocket-chat/README.md @@ -0,0 +1,32 @@ +# Websocket chat example + +This is extension of the +[actix chat example](https://github.com/actix/actix/tree/master/examples/chat) + +Added features: + +* Browser WebSocket client +* Chat server runs in separate thread +* Tcp listener runs in separate thread + +## Server + +Chat server listens for incoming tcp connections. Server can access several types of message: + +* `\list` - list all available rooms +* `\join name` - join room, if room does not exist, create new one +* `\name name` - set session name +* `some message` - just string, send message to all peers in same room +* client has to send heartbeat `Ping` messages, if server does not receive a heartbeat message for 10 seconds connection gets dropped + +To start server use command: `cargo run --bin server` + +## Client + +Client connects to server. Reads input from stdin and sends to server. + +To run client use command: `cargo run --bin client` + +## WebSocket Browser Client + +Open url: [http://localhost:8080/](http://localhost:8080/) diff --git a/websocket-chat/client.py b/websocket-chat/client.py new file mode 100755 index 0000000..8a1bd9a --- /dev/null +++ b/websocket-chat/client.py @@ -0,0 +1,72 @@ +#!/usr/bin/env python3 +"""websocket cmd client for wssrv.py example.""" +import argparse +import asyncio +import signal +import sys + +import aiohttp + + +def start_client(loop, url): + name = input('Please enter your name: ') + + # send request + ws = yield from aiohttp.ClientSession().ws_connect(url, autoclose=False, autoping=False) + + # input reader + def stdin_callback(): + line = sys.stdin.buffer.readline().decode('utf-8') + if not line: + loop.stop() + else: + ws.send_str(name + ': ' + line) + loop.add_reader(sys.stdin.fileno(), stdin_callback) + + @asyncio.coroutine + def dispatch(): + while True: + msg = yield from ws.receive() + + if msg.type == aiohttp.WSMsgType.TEXT: + print('Text: ', msg.data.strip()) + elif msg.type == aiohttp.WSMsgType.BINARY: + print('Binary: ', msg.data) + elif msg.type == aiohttp.WSMsgType.PING: + ws.pong() + elif msg.type == aiohttp.WSMsgType.PONG: + print('Pong received') + else: + if msg.type == aiohttp.WSMsgType.CLOSE: + yield from ws.close() + elif msg.type == aiohttp.WSMsgType.ERROR: + print('Error during receive %s' % ws.exception()) + elif msg.type == aiohttp.WSMsgType.CLOSED: + pass + + break + + yield from dispatch() + + +ARGS = argparse.ArgumentParser( + description="websocket console client for wssrv.py example.") +ARGS.add_argument( + '--host', action="store", dest='host', + default='127.0.0.1', help='Host name') +ARGS.add_argument( + '--port', action="store", dest='port', + default=8080, type=int, help='Port number') + +if __name__ == '__main__': + args = ARGS.parse_args() + if ':' in args.host: + args.host, port = args.host.split(':', 1) + args.port = int(port) + + url = 'http://{}:{}/ws/'.format(args.host, args.port) + + loop = asyncio.get_event_loop() + loop.add_signal_handler(signal.SIGINT, loop.stop) + asyncio.Task(start_client(loop, url)) + loop.run_forever() diff --git a/websocket-chat/src/client.rs b/websocket-chat/src/client.rs new file mode 100644 index 0000000..e2e6a7c --- /dev/null +++ b/websocket-chat/src/client.rs @@ -0,0 +1,153 @@ +#[macro_use] extern crate actix; +extern crate bytes; +extern crate byteorder; +extern crate futures; +extern crate tokio_io; +extern crate tokio_core; +extern crate serde; +extern crate serde_json; +#[macro_use] extern crate serde_derive; + +use std::{io, net, process, thread}; +use std::str::FromStr; +use std::time::Duration; +use futures::Future; +use tokio_io::AsyncRead; +use tokio_io::io::WriteHalf; +use tokio_io::codec::FramedRead; +use tokio_core::net::TcpStream; +use actix::prelude::*; + +mod codec; + + +fn main() { + let sys = actix::System::new("chat-client"); + + // Connect to server + let addr = net::SocketAddr::from_str("127.0.0.1:12345").unwrap(); + Arbiter::handle().spawn( + TcpStream::connect(&addr, Arbiter::handle()) + .and_then(|stream| { + let addr: Addr = ChatClient::create(|ctx| { + let (r, w) = stream.split(); + ChatClient::add_stream(FramedRead::new(r, codec::ClientChatCodec), ctx); + ChatClient{ + framed: actix::io::FramedWrite::new( + w, codec::ClientChatCodec, ctx)}}); + + // start console loop + thread::spawn(move|| { + loop { + let mut cmd = String::new(); + if io::stdin().read_line(&mut cmd).is_err() { + println!("error"); + return + } + + addr.do_send(ClientCommand(cmd)); + } + }); + + futures::future::ok(()) + }) + .map_err(|e| { + println!("Can not connect to server: {}", e); + process::exit(1) + }) + ); + + println!("Running chat client"); + sys.run(); +} + + +struct ChatClient { + framed: actix::io::FramedWrite, codec::ClientChatCodec>, +} + +#[derive(Message)] +struct ClientCommand(String); + +impl Actor for ChatClient { + type Context = Context; + + fn started(&mut self, ctx: &mut Context) { + // start heartbeats otherwise server will disconnect after 10 seconds + self.hb(ctx) + } + + fn stopped(&mut self, _: &mut Context) { + println!("Disconnected"); + + // Stop application on disconnect + Arbiter::system().do_send(actix::msgs::SystemExit(0)); + } +} + +impl ChatClient { + fn hb(&self, ctx: &mut Context) { + ctx.run_later(Duration::new(1, 0), |act, ctx| { + act.framed.write(codec::ChatRequest::Ping); + act.hb(ctx); + }); + } +} + +impl actix::io::WriteHandler for ChatClient {} + +/// Handle stdin commands +impl Handler for ChatClient { + type Result = (); + + fn handle(&mut self, msg: ClientCommand, _: &mut Context) { + let m = msg.0.trim(); + if m.is_empty() { + return + } + + // we check for /sss type of messages + if m.starts_with('/') { + let v: Vec<&str> = m.splitn(2, ' ').collect(); + match v[0] { + "/list" => { + self.framed.write(codec::ChatRequest::List); + }, + "/join" => { + if v.len() == 2 { + self.framed.write(codec::ChatRequest::Join(v[1].to_owned())); + } else { + println!("!!! room name is required"); + } + }, + _ => println!("!!! unknown command"), + } + } else { + self.framed.write(codec::ChatRequest::Message(m.to_owned())); + } + } +} + +/// Server communication + +impl StreamHandler for ChatClient { + + fn handle(&mut self, msg: codec::ChatResponse, _: &mut Context) { + match msg { + codec::ChatResponse::Message(ref msg) => { + println!("message: {}", msg); + } + codec::ChatResponse::Joined(ref msg) => { + println!("!!! joined: {}", msg); + } + codec::ChatResponse::Rooms(rooms) => { + println!("\n!!! Available rooms:"); + for room in rooms { + println!("{}", room); + } + println!(""); + } + _ => (), + } + } +} diff --git a/websocket-chat/src/codec.rs b/websocket-chat/src/codec.rs new file mode 100644 index 0000000..0363824 --- /dev/null +++ b/websocket-chat/src/codec.rs @@ -0,0 +1,123 @@ +#![allow(dead_code)] +use std::io; +use serde_json as json; +use byteorder::{BigEndian , ByteOrder}; +use bytes::{BytesMut, BufMut}; +use tokio_io::codec::{Encoder, Decoder}; + +/// Client request +#[derive(Serialize, Deserialize, Debug, Message)] +#[serde(tag="cmd", content="data")] +pub enum ChatRequest { + /// List rooms + List, + /// Join rooms + Join(String), + /// Send message + Message(String), + /// Ping + Ping +} + +/// Server response +#[derive(Serialize, Deserialize, Debug, Message)] +#[serde(tag="cmd", content="data")] +pub enum ChatResponse { + Ping, + + /// List of rooms + Rooms(Vec), + + /// Joined + Joined(String), + + /// Message + Message(String), +} + +/// Codec for Client -> Server transport +pub struct ChatCodec; + +impl Decoder for ChatCodec +{ + type Item = ChatRequest; + type Error = io::Error; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + let size = { + if src.len() < 2 { + return Ok(None) + } + BigEndian::read_u16(src.as_ref()) as usize + }; + + if src.len() >= size + 2 { + src.split_to(2); + let buf = src.split_to(size); + Ok(Some(json::from_slice::(&buf)?)) + } else { + Ok(None) + } + } +} + +impl Encoder for ChatCodec +{ + type Item = ChatResponse; + type Error = io::Error; + + fn encode(&mut self, msg: ChatResponse, dst: &mut BytesMut) -> Result<(), Self::Error> { + let msg = json::to_string(&msg).unwrap(); + let msg_ref: &[u8] = msg.as_ref(); + + dst.reserve(msg_ref.len() + 2); + dst.put_u16::(msg_ref.len() as u16); + dst.put(msg_ref); + + Ok(()) + } +} + + +/// Codec for Server -> Client transport +pub struct ClientChatCodec; + +impl Decoder for ClientChatCodec +{ + type Item = ChatResponse; + type Error = io::Error; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + let size = { + if src.len() < 2 { + return Ok(None) + } + BigEndian::read_u16(src.as_ref()) as usize + }; + + if src.len() >= size + 2 { + src.split_to(2); + let buf = src.split_to(size); + Ok(Some(json::from_slice::(&buf)?)) + } else { + Ok(None) + } + } +} + +impl Encoder for ClientChatCodec +{ + type Item = ChatRequest; + type Error = io::Error; + + fn encode(&mut self, msg: ChatRequest, dst: &mut BytesMut) -> Result<(), Self::Error> { + let msg = json::to_string(&msg).unwrap(); + let msg_ref: &[u8] = msg.as_ref(); + + dst.reserve(msg_ref.len() + 2); + dst.put_u16::(msg_ref.len() as u16); + dst.put(msg_ref); + + Ok(()) + } +} diff --git a/websocket-chat/src/main.rs b/websocket-chat/src/main.rs new file mode 100644 index 0000000..5cd3e6e --- /dev/null +++ b/websocket-chat/src/main.rs @@ -0,0 +1,209 @@ +#![allow(unused_variables)] +extern crate rand; +extern crate bytes; +extern crate byteorder; +extern crate futures; +extern crate tokio_io; +extern crate tokio_core; +extern crate env_logger; +extern crate serde; +extern crate serde_json; +#[macro_use] extern crate serde_derive; + +#[macro_use] +extern crate actix; +extern crate actix_web; + +use std::time::Instant; + +use actix::*; +use actix_web::server::HttpServer; +use actix_web::{http, fs, ws, App, HttpRequest, HttpResponse, Error}; + +mod codec; +mod server; +mod session; + +/// This is our websocket route state, this state is shared with all route instances +/// via `HttpContext::state()` +struct WsChatSessionState { + addr: Addr, +} + +/// Entry point for our route +fn chat_route(req: HttpRequest) -> Result { + ws::start( + req, + WsChatSession { + id: 0, + hb: Instant::now(), + room: "Main".to_owned(), + name: None}) +} + +struct WsChatSession { + /// unique session id + id: usize, + /// Client must send ping at least once per 10 seconds, otherwise we drop connection. + hb: Instant, + /// joined room + room: String, + /// peer name + name: Option, +} + +impl Actor for WsChatSession { + type Context = ws::WebsocketContext; + + /// Method is called on actor start. + /// We register ws session with ChatServer + fn started(&mut self, ctx: &mut Self::Context) { + // register self in chat server. `AsyncContext::wait` register + // future within context, but context waits until this future resolves + // before processing any other events. + // HttpContext::state() is instance of WsChatSessionState, state is shared across all + // routes within application + let addr: Addr = ctx.address(); + ctx.state().addr.send(server::Connect{addr: addr.recipient()}) + .into_actor(self) + .then(|res, act, ctx| { + match res { + Ok(res) => act.id = res, + // something is wrong with chat server + _ => ctx.stop(), + } + fut::ok(()) + }).wait(ctx); + } + + fn stopping(&mut self, ctx: &mut Self::Context) -> Running { + // notify chat server + ctx.state().addr.do_send(server::Disconnect{id: self.id}); + Running::Stop + } +} + +/// Handle messages from chat server, we simply send it to peer websocket +impl Handler for WsChatSession { + type Result = (); + + fn handle(&mut self, msg: session::Message, ctx: &mut Self::Context) { + ctx.text(msg.0); + } +} + +/// WebSocket message handler +impl StreamHandler for WsChatSession { + + fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) { + println!("WEBSOCKET MESSAGE: {:?}", msg); + match msg { + ws::Message::Ping(msg) => ctx.pong(&msg), + ws::Message::Pong(msg) => self.hb = Instant::now(), + ws::Message::Text(text) => { + let m = text.trim(); + // we check for /sss type of messages + if m.starts_with('/') { + let v: Vec<&str> = m.splitn(2, ' ').collect(); + match v[0] { + "/list" => { + // Send ListRooms message to chat server and wait for response + println!("List rooms"); + ctx.state().addr.send(server::ListRooms) + .into_actor(self) + .then(|res, _, ctx| { + match res { + Ok(rooms) => { + for room in rooms { + ctx.text(room); + } + }, + _ => println!("Something is wrong"), + } + fut::ok(()) + }).wait(ctx) + // .wait(ctx) pauses all events in context, + // so actor wont receive any new messages until it get list + // of rooms back + }, + "/join" => { + if v.len() == 2 { + self.room = v[1].to_owned(); + ctx.state().addr.do_send( + server::Join{id: self.id, name: self.room.clone()}); + + ctx.text("joined"); + } else { + ctx.text("!!! room name is required"); + } + }, + "/name" => { + if v.len() == 2 { + self.name = Some(v[1].to_owned()); + } else { + ctx.text("!!! name is required"); + } + }, + _ => ctx.text(format!("!!! unknown command: {:?}", m)), + } + } else { + let msg = if let Some(ref name) = self.name { + format!("{}: {}", name, m) + } else { + m.to_owned() + }; + // send message to chat server + ctx.state().addr.do_send( + server::Message{id: self.id, + msg: msg, + room: self.room.clone()}) + } + }, + ws::Message::Binary(bin) => + println!("Unexpected binary"), + ws::Message::Close(_) => { + ctx.stop(); + } + } + } +} + +fn main() { + let _ = env_logger::init(); + let sys = actix::System::new("websocket-example"); + + // Start chat server actor in separate thread + let server: Addr = Arbiter::start(|_| server::ChatServer::default()); + + // Start tcp server in separate thread + let srv = server.clone(); + Arbiter::new("tcp-server").do_send::( + msgs::Execute::new(move || { + session::TcpServer::new("127.0.0.1:12345", srv); + Ok(()) + })); + + // Create Http server with websocket support + HttpServer::new( + move || { + // Websocket sessions state + let state = WsChatSessionState { addr: server.clone() }; + + App::with_state(state) + // redirect to websocket.html + .resource("/", |r| r.method(http::Method::GET).f(|_| { + HttpResponse::Found() + .header("LOCATION", "/static/websocket.html") + .finish() + })) + // websocket + .resource("/ws/", |r| r.route().f(chat_route)) + // static resources + .handler("/static/", fs::StaticFiles::new("static/")) + }) + .bind("127.0.0.1:8080").unwrap() + .start(); + + println!("Started http server: 127.0.0.1:8080"); + let _ = sys.run(); +} diff --git a/websocket-chat/src/server.rs b/websocket-chat/src/server.rs new file mode 100644 index 0000000..8b735b8 --- /dev/null +++ b/websocket-chat/src/server.rs @@ -0,0 +1,197 @@ +//! `ChatServer` is an actor. It maintains list of connection client session. +//! And manages available rooms. Peers send messages to other peers in same +//! room through `ChatServer`. + +use std::cell::RefCell; +use std::collections::{HashMap, HashSet}; +use rand::{self, Rng, ThreadRng}; +use actix::prelude::*; + +use session; + +/// Message for chat server communications + +/// New chat session is created +#[derive(Message)] +#[rtype(usize)] +pub struct Connect { + pub addr: Recipient, +} + +/// Session is disconnected +#[derive(Message)] +pub struct Disconnect { + pub id: usize, +} + +/// Send message to specific room +#[derive(Message)] +pub struct Message { + /// Id of the client session + pub id: usize, + /// Peer message + pub msg: String, + /// Room name + pub room: String, +} + +/// List of available rooms +pub struct ListRooms; + +impl actix::Message for ListRooms { + type Result = Vec; +} + +/// Join room, if room does not exists create new one. +#[derive(Message)] +pub struct Join { + /// Client id + pub id: usize, + /// Room name + pub name: String, +} + +/// `ChatServer` manages chat rooms and responsible for coordinating chat session. +/// implementation is super primitive +pub struct ChatServer { + sessions: HashMap>, + rooms: HashMap>, + rng: RefCell, +} + +impl Default for ChatServer { + fn default() -> ChatServer { + // default room + let mut rooms = HashMap::new(); + rooms.insert("Main".to_owned(), HashSet::new()); + + ChatServer { + sessions: HashMap::new(), + rooms: rooms, + rng: RefCell::new(rand::thread_rng()), + } + } +} + +impl ChatServer { + /// Send message to all users in the room + fn send_message(&self, room: &str, message: &str, skip_id: usize) { + if let Some(sessions) = self.rooms.get(room) { + for id in sessions { + if *id != skip_id { + if let Some(addr) = self.sessions.get(id) { + let _ = addr.do_send(session::Message(message.to_owned())); + } + } + } + } + } +} + +/// Make actor from `ChatServer` +impl Actor for ChatServer { + /// We are going to use simple Context, we just need ability to communicate + /// with other actors. + type Context = Context; +} + +/// Handler for Connect message. +/// +/// Register new session and assign unique id to this session +impl Handler for ChatServer { + type Result = usize; + + fn handle(&mut self, msg: Connect, _: &mut Context) -> Self::Result { + println!("Someone joined"); + + // notify all users in same room + self.send_message(&"Main".to_owned(), "Someone joined", 0); + + // register session with random id + let id = self.rng.borrow_mut().gen::(); + self.sessions.insert(id, msg.addr); + + // auto join session to Main room + self.rooms.get_mut(&"Main".to_owned()).unwrap().insert(id); + + // send id back + id + } +} + +/// Handler for Disconnect message. +impl Handler for ChatServer { + type Result = (); + + fn handle(&mut self, msg: Disconnect, _: &mut Context) { + println!("Someone disconnected"); + + let mut rooms: Vec = Vec::new(); + + // remove address + if self.sessions.remove(&msg.id).is_some() { + // remove session from all rooms + for (name, sessions) in &mut self.rooms { + if sessions.remove(&msg.id) { + rooms.push(name.to_owned()); + } + } + } + // send message to other users + for room in rooms { + self.send_message(&room, "Someone disconnected", 0); + } + } +} + +/// Handler for Message message. +impl Handler for ChatServer { + type Result = (); + + fn handle(&mut self, msg: Message, _: &mut Context) { + self.send_message(&msg.room, msg.msg.as_str(), msg.id); + } +} + +/// Handler for `ListRooms` message. +impl Handler for ChatServer { + type Result = MessageResult; + + fn handle(&mut self, _: ListRooms, _: &mut Context) -> Self::Result { + let mut rooms = Vec::new(); + + for key in self.rooms.keys() { + rooms.push(key.to_owned()) + } + + MessageResult(rooms) + } +} + +/// Join room, send disconnect message to old room +/// send join message to new room +impl Handler for ChatServer { + type Result = (); + + fn handle(&mut self, msg: Join, _: &mut Context) { + let Join {id, name} = msg; + let mut rooms = Vec::new(); + + // remove session from all rooms + for (n, sessions) in &mut self.rooms { + if sessions.remove(&id) { + rooms.push(n.to_owned()); + } + } + // send message to other users + for room in rooms { + self.send_message(&room, "Someone disconnected", 0); + } + + if self.rooms.get_mut(&name).is_none() { + self.rooms.insert(name.clone(), HashSet::new()); + } + self.send_message(&name, "Someone connected", id); + self.rooms.get_mut(&name).unwrap().insert(id); + } +} diff --git a/websocket-chat/src/session.rs b/websocket-chat/src/session.rs new file mode 100644 index 0000000..7f28c6a --- /dev/null +++ b/websocket-chat/src/session.rs @@ -0,0 +1,207 @@ +//! `ClientSession` is an actor, it manages peer tcp connection and +//! proxies commands from peer to `ChatServer`. +use std::{io, net}; +use std::str::FromStr; +use std::time::{Instant, Duration}; +use futures::Stream; +use tokio_io::AsyncRead; +use tokio_io::io::WriteHalf; +use tokio_io::codec::FramedRead; +use tokio_core::net::{TcpStream, TcpListener}; + +use actix::prelude::*; + +use server::{self, ChatServer}; +use codec::{ChatRequest, ChatResponse, ChatCodec}; + + +/// Chat server sends this messages to session +#[derive(Message)] +pub struct Message(pub String); + +/// `ChatSession` actor is responsible for tcp peer communications. +pub struct ChatSession { + /// unique session id + id: usize, + /// this is address of chat server + addr: Addr, + /// Client must send ping at least once per 10 seconds, otherwise we drop connection. + hb: Instant, + /// joined room + room: String, + /// Framed wrapper + framed: actix::io::FramedWrite, ChatCodec>, +} + +impl Actor for ChatSession { + /// For tcp communication we are going to use `FramedContext`. + /// It is convenient wrapper around `Framed` object from `tokio_io` + type Context = Context; + + fn started(&mut self, ctx: &mut Self::Context) { + // we'll start heartbeat process on session start. + self.hb(ctx); + + // register self in chat server. `AsyncContext::wait` register + // future within context, but context waits until this future resolves + // before processing any other events. + let addr: Addr = ctx.address(); + self.addr.send(server::Connect{addr: addr.recipient()}) + .into_actor(self) + .then(|res, act, ctx| { + match res { + Ok(res) => act.id = res, + // something is wrong with chat server + _ => ctx.stop(), + } + actix::fut::ok(()) + }).wait(ctx); + } + + fn stopping(&mut self, ctx: &mut Self::Context) -> Running { + // notify chat server + self.addr.do_send(server::Disconnect{id: self.id}); + Running::Stop + } +} + +impl actix::io::WriteHandler for ChatSession {} + +/// To use `Framed` we have to define Io type and Codec +impl StreamHandler for ChatSession { + + /// This is main event loop for client requests + fn handle(&mut self, msg: ChatRequest, ctx: &mut Context) { + match msg { + ChatRequest::List => { + // Send ListRooms message to chat server and wait for response + println!("List rooms"); + self.addr.send(server::ListRooms) + .into_actor(self) + .then(|res, act, ctx| { + match res { + Ok(rooms) => { + act.framed.write(ChatResponse::Rooms(rooms)); + }, + _ => println!("Something is wrong"), + } + actix::fut::ok(()) + }).wait(ctx) + // .wait(ctx) pauses all events in context, + // so actor wont receive any new messages until it get list of rooms back + }, + ChatRequest::Join(name) => { + println!("Join to room: {}", name); + self.room = name.clone(); + self.addr.do_send(server::Join{id: self.id, name: name.clone()}); + self.framed.write(ChatResponse::Joined(name)); + }, + ChatRequest::Message(message) => { + // send message to chat server + println!("Peer message: {}", message); + self.addr.do_send( + server::Message{id: self.id, + msg: message, room: + self.room.clone()}) + } + // we update heartbeat time on ping from peer + ChatRequest::Ping => + self.hb = Instant::now(), + } + } +} + +/// Handler for Message, chat server sends this message, we just send string to peer +impl Handler for ChatSession { + type Result = (); + + fn handle(&mut self, msg: Message, ctx: &mut Context) { + // send message to peer + self.framed.write(ChatResponse::Message(msg.0)); + } +} + +/// Helper methods +impl ChatSession { + + pub fn new(addr: Addr, + framed: actix::io::FramedWrite, ChatCodec>) -> ChatSession { + ChatSession {id: 0, addr: addr, hb: Instant::now(), + room: "Main".to_owned(), framed: framed} + } + + /// helper method that sends ping to client every second. + /// + /// also this method check heartbeats from client + fn hb(&self, ctx: &mut Context) { + ctx.run_later(Duration::new(1, 0), |act, ctx| { + // check client heartbeats + if Instant::now().duration_since(act.hb) > Duration::new(10, 0) { + // heartbeat timed out + println!("Client heartbeat failed, disconnecting!"); + + // notify chat server + act.addr.do_send(server::Disconnect{id: act.id}); + + // stop actor + ctx.stop(); + } + + act.framed.write(ChatResponse::Ping); + // if we can not send message to sink, sink is closed (disconnected) + act.hb(ctx); + }); + } +} + + +/// Define tcp server that will accept incoming tcp connection and create +/// chat actors. +pub struct TcpServer { + chat: Addr, +} + +impl TcpServer { + pub fn new(s: &str, chat: Addr) { + // Create server listener + let addr = net::SocketAddr::from_str("127.0.0.1:12345").unwrap(); + let listener = TcpListener::bind(&addr, Arbiter::handle()).unwrap(); + + // Our chat server `Server` is an actor, first we need to start it + // and then add stream on incoming tcp connections to it. + // TcpListener::incoming() returns stream of the (TcpStream, net::SocketAddr) items + // So to be able to handle this events `Server` actor has to implement + // stream handler `StreamHandler<(TcpStream, net::SocketAddr), io::Error>` + let _: () = TcpServer::create(|ctx| { + ctx.add_message_stream(listener.incoming() + .map_err(|_| ()) + .map(|(t, a)| TcpConnect(t, a))); + TcpServer{chat: chat} + }); + } +} + +/// Make actor from `Server` +impl Actor for TcpServer { + /// Every actor has to provide execution `Context` in which it can run. + type Context = Context; +} + +#[derive(Message)] +struct TcpConnect(TcpStream, net::SocketAddr); + +/// Handle stream of TcpStream's +impl Handler for TcpServer { + type Result = (); + + fn handle(&mut self, msg: TcpConnect, _: &mut Context) { + // For each incoming connection we create `ChatSession` actor + // with out chat server address. + let server = self.chat.clone(); + let _: () = ChatSession::create(|ctx| { + let (r, w) = msg.0.split(); + ChatSession::add_stream(FramedRead::new(r, ChatCodec), ctx); + ChatSession::new(server, actix::io::FramedWrite::new(w, ChatCodec, ctx)) + }); + } +} diff --git a/websocket-chat/static/websocket.html b/websocket-chat/static/websocket.html new file mode 100644 index 0000000..e59e13f --- /dev/null +++ b/websocket-chat/static/websocket.html @@ -0,0 +1,90 @@ + + + + + + + + +

Chat!

+
+  | Status: + disconnected +
+
+
+
+ + +
+ + diff --git a/websocket/Cargo.toml b/websocket/Cargo.toml new file mode 100644 index 0000000..2fe63b2 --- /dev/null +++ b/websocket/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "websocket" +version = "0.1.0" +authors = ["Nikolay Kim "] + +[[bin]] +name = "server" +path = "src/main.rs" + +[[bin]] +name = "client" +path = "src/client.rs" + +[dependencies] +env_logger = "*" +futures = "0.1" +tokio-core = "0.1" +actix = "0.5" +actix-web = "^0.5" diff --git a/websocket/README.md b/websocket/README.md new file mode 100644 index 0000000..8ffcba8 --- /dev/null +++ b/websocket/README.md @@ -0,0 +1,27 @@ +# websocket + +Simple echo websocket server. + +## Usage + +### server + +```bash +cd actix-web/examples/websocket +cargo run +# Started http server: 127.0.0.1:8080 +``` + +### web client + +- [http://localhost:8080/ws/index.html](http://localhost:8080/ws/index.html) + +### python client + +- ``pip install aiohttp`` +- ``python websocket-client.py`` + +if ubuntu : + +- ``pip3 install aiohttp`` +- ``python3 websocket-client.py`` diff --git a/websocket/src/client.rs b/websocket/src/client.rs new file mode 100644 index 0000000..34ff243 --- /dev/null +++ b/websocket/src/client.rs @@ -0,0 +1,113 @@ +//! Simple websocket client. + +#![allow(unused_variables)] +extern crate actix; +extern crate actix_web; +extern crate env_logger; +extern crate futures; +extern crate tokio_core; + +use std::{io, thread}; +use std::time::Duration; + +use actix::*; +use futures::Future; +use actix_web::ws::{Message, ProtocolError, Client, ClientWriter}; + + +fn main() { + ::std::env::set_var("RUST_LOG", "actix_web=info"); + let _ = env_logger::init(); + let sys = actix::System::new("ws-example"); + + Arbiter::handle().spawn( + Client::new("http://127.0.0.1:8080/ws/") + .connect() + .map_err(|e| { + println!("Error: {}", e); + () + }) + .map(|(reader, writer)| { + let addr: Addr = ChatClient::create(|ctx| { + ChatClient::add_stream(reader, ctx); + ChatClient(writer) + }); + + // start console loop + thread::spawn(move|| { + loop { + let mut cmd = String::new(); + if io::stdin().read_line(&mut cmd).is_err() { + println!("error"); + return + } + addr.do_send(ClientCommand(cmd)); + } + }); + + () + }) + ); + + let _ = sys.run(); +} + + +struct ChatClient(ClientWriter); + +#[derive(Message)] +struct ClientCommand(String); + +impl Actor for ChatClient { + type Context = Context; + + fn started(&mut self, ctx: &mut Context) { + // start heartbeats otherwise server will disconnect after 10 seconds + self.hb(ctx) + } + + fn stopped(&mut self, _: &mut Context) { + println!("Disconnected"); + + // Stop application on disconnect + Arbiter::system().do_send(actix::msgs::SystemExit(0)); + } +} + +impl ChatClient { + fn hb(&self, ctx: &mut Context) { + ctx.run_later(Duration::new(1, 0), |act, ctx| { + act.0.ping(""); + act.hb(ctx); + }); + } +} + +/// Handle stdin commands +impl Handler for ChatClient { + type Result = (); + + fn handle(&mut self, msg: ClientCommand, ctx: &mut Context) { + self.0.text(msg.0) + } +} + +/// Handle server websocket messages +impl StreamHandler for ChatClient { + + fn handle(&mut self, msg: Message, ctx: &mut Context) { + match msg { + Message::Text(txt) => println!("Server: {:?}", txt), + _ => () + } + } + + fn started(&mut self, ctx: &mut Context) { + println!("Connected"); + } + + fn finished(&mut self, ctx: &mut Context) { + println!("Server disconnected"); + ctx.stop() + } +} diff --git a/websocket/src/main.rs b/websocket/src/main.rs new file mode 100644 index 0000000..11292a9 --- /dev/null +++ b/websocket/src/main.rs @@ -0,0 +1,66 @@ +//! Simple echo websocket server. +//! Open `http://localhost:8080/ws/index.html` in browser +//! or [python console client](https://github.com/actix/actix-web/blob/master/examples/websocket-client.py) +//! could be used for testing. + +#![allow(unused_variables)] +extern crate actix; +extern crate actix_web; +extern crate env_logger; + +use actix::prelude::*; +use actix_web::{ + http, middleware, server, fs, ws, App, HttpRequest, HttpResponse, Error}; + +/// do websocket handshake and start `MyWebSocket` actor +fn ws_index(r: HttpRequest) -> Result { + ws::start(r, MyWebSocket) +} + +/// websocket connection is long running connection, it easier +/// to handle with an actor +struct MyWebSocket; + +impl Actor for MyWebSocket { + type Context = ws::WebsocketContext; +} + +/// Handler for `ws::Message` +impl StreamHandler for MyWebSocket { + + fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) { + // process websocket messages + println!("WS: {:?}", msg); + match msg { + ws::Message::Ping(msg) => ctx.pong(&msg), + ws::Message::Text(text) => ctx.text(text), + ws::Message::Binary(bin) => ctx.binary(bin), + ws::Message::Close(_) => { + ctx.stop(); + } + _ => (), + } + } +} + +fn main() { + ::std::env::set_var("RUST_LOG", "actix_web=info"); + env_logger::init(); + let sys = actix::System::new("ws-example"); + + server::new( + || App::new() + // enable logger + .middleware(middleware::Logger::default()) + // websocket route + .resource("/ws/", |r| r.method(http::Method::GET).f(ws_index)) + // static files + .handler("/", fs::StaticFiles::new("../static/") + .index_file("index.html"))) + // start http server on 127.0.0.1:8080 + .bind("127.0.0.1:8080").unwrap() + .start(); + + println!("Started http server: 127.0.0.1:8080"); + let _ = sys.run(); +} diff --git a/websocket/websocket-client.py b/websocket/websocket-client.py new file mode 100755 index 0000000..8a1bd9a --- /dev/null +++ b/websocket/websocket-client.py @@ -0,0 +1,72 @@ +#!/usr/bin/env python3 +"""websocket cmd client for wssrv.py example.""" +import argparse +import asyncio +import signal +import sys + +import aiohttp + + +def start_client(loop, url): + name = input('Please enter your name: ') + + # send request + ws = yield from aiohttp.ClientSession().ws_connect(url, autoclose=False, autoping=False) + + # input reader + def stdin_callback(): + line = sys.stdin.buffer.readline().decode('utf-8') + if not line: + loop.stop() + else: + ws.send_str(name + ': ' + line) + loop.add_reader(sys.stdin.fileno(), stdin_callback) + + @asyncio.coroutine + def dispatch(): + while True: + msg = yield from ws.receive() + + if msg.type == aiohttp.WSMsgType.TEXT: + print('Text: ', msg.data.strip()) + elif msg.type == aiohttp.WSMsgType.BINARY: + print('Binary: ', msg.data) + elif msg.type == aiohttp.WSMsgType.PING: + ws.pong() + elif msg.type == aiohttp.WSMsgType.PONG: + print('Pong received') + else: + if msg.type == aiohttp.WSMsgType.CLOSE: + yield from ws.close() + elif msg.type == aiohttp.WSMsgType.ERROR: + print('Error during receive %s' % ws.exception()) + elif msg.type == aiohttp.WSMsgType.CLOSED: + pass + + break + + yield from dispatch() + + +ARGS = argparse.ArgumentParser( + description="websocket console client for wssrv.py example.") +ARGS.add_argument( + '--host', action="store", dest='host', + default='127.0.0.1', help='Host name') +ARGS.add_argument( + '--port', action="store", dest='port', + default=8080, type=int, help='Port number') + +if __name__ == '__main__': + args = ARGS.parse_args() + if ':' in args.host: + args.host, port = args.host.split(':', 1) + args.port = int(port) + + url = 'http://{}:{}/ws/'.format(args.host, args.port) + + loop = asyncio.get_event_loop() + loop.add_signal_handler(signal.SIGINT, loop.stop) + asyncio.Task(start_client(loop, url)) + loop.run_forever()