From 48b8e7c335df9f0f195a5c7b0d7393e7f3c6b6fa Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 26 Mar 2019 23:33:13 -0700 Subject: [PATCH] migrate http proxy examples --- async_db/Cargo.toml | 3 +- async_db/src/db.rs | 9 +-- async_ex1/Cargo.toml | 1 - async_ex1/src/main.rs | 59 ++++++++++-------- http-full-proxy/Cargo.toml | 7 ++- http-full-proxy/src/main.rs | 119 +++++++++++++----------------------- http-proxy/Cargo.toml | 9 +-- http-proxy/src/main.rs | 56 +++++++---------- http-proxy/src/server.rs | 39 ++++-------- 9 files changed, 125 insertions(+), 177 deletions(-) diff --git a/async_db/Cargo.toml b/async_db/Cargo.toml index 3161c19d..7848d1b7 100644 --- a/async_db/Cargo.toml +++ b/async_db/Cargo.toml @@ -15,7 +15,8 @@ failure = "0.1.1" futures = "0.1" num_cpus = "1.10.0" r2d2 = "0.8.2" -r2d2_sqlite = "0.5.0" +r2d2_sqlite = "0.8.0" +rusqlite = "0.16" serde = "1.0" serde_json = "1.0" serde_derive = "1.0" diff --git a/async_db/src/db.rs b/async_db/src/db.rs index e23adba2..4ca80daf 100644 --- a/async_db/src/db.rs +++ b/async_db/src/db.rs @@ -3,6 +3,7 @@ use failure::Error; use futures::Future; use r2d2; use r2d2_sqlite; +use rusqlite::NO_PARAMS; use serde_derive::{Deserialize, Serialize}; use std::{thread::sleep, time::Duration}; @@ -47,7 +48,7 @@ fn get_hottest_years(conn: Connection) -> Result, Error> { let mut prep_stmt = conn.prepare(stmt)?; let annuals = prep_stmt - .query_map(&[], |row| WeatherAgg::AnnualAgg { + .query_map(NO_PARAMS, |row| WeatherAgg::AnnualAgg { year: row.get(0), total: row.get(1), }) @@ -73,7 +74,7 @@ fn get_coldest_years(conn: Connection) -> Result, Error> { let mut prep_stmt = conn.prepare(stmt)?; let annuals = prep_stmt - .query_map(&[], |row| WeatherAgg::AnnualAgg { + .query_map(NO_PARAMS, |row| WeatherAgg::AnnualAgg { year: row.get(0), total: row.get(1), }) @@ -99,7 +100,7 @@ fn get_hottest_months(conn: Connection) -> Result, Error> { let mut prep_stmt = conn.prepare(stmt)?; let annuals = prep_stmt - .query_map(&[], |row| WeatherAgg::MonthAgg { + .query_map(NO_PARAMS, |row| WeatherAgg::MonthAgg { year: row.get(0), month: row.get(1), total: row.get(2), @@ -125,7 +126,7 @@ fn get_coldest_months(conn: Connection) -> Result, Error> { let mut prep_stmt = conn.prepare(stmt)?; let annuals = prep_stmt - .query_map(&[], |row| WeatherAgg::MonthAgg { + .query_map(NO_PARAMS, |row| WeatherAgg::MonthAgg { year: row.get(0), month: row.get(1), total: row.get(2), diff --git a/async_ex1/Cargo.toml b/async_ex1/Cargo.toml index 01a79760..f89f97b5 100644 --- a/async_ex1/Cargo.toml +++ b/async_ex1/Cargo.toml @@ -7,7 +7,6 @@ workspace = ".." [dependencies] actix-rt = "0.2" -actix-http = { git="https://github.com/actix/actix-http.git", features=["ssl"] } actix-web = { git="https://github.com/actix/actix-web.git", features=["ssl"] } futures = "0.1" diff --git a/async_ex1/src/main.rs b/async_ex1/src/main.rs index 8781ce9c..a9eb054b 100644 --- a/async_ex1/src/main.rs +++ b/async_ex1/src/main.rs @@ -23,7 +23,7 @@ extern crate serde_derive; use std::collections::HashMap; use std::io; -use actix_http::client; +use actix_web::client::Client; use actix_web::web::BytesMut; use actix_web::{web, App, Error, HttpResponse, HttpServer}; use futures::{Future, Stream}; @@ -54,14 +54,14 @@ struct HttpBinResponse { // ----------------------------------------------------------------------- /// post json to httpbin, get it back in the response body, return deserialized -fn step_x_v1(data: SomeData) -> Box> { - let mut connector = client::Connector::new().service(); - +fn step_x_v1( + data: SomeData, + client: &Client, +) -> Box> { Box::new( - client::ClientRequest::post("https://httpbin.org/post") - .json(data) - .unwrap() - .send(&mut connector) + client + .post("https://httpbin.org/post") + .send_json(data) .map_err(Error::from) // <- convert SendRequestError to an Error .and_then(|resp| { resp // <- this is MessageBody type, resolves to complete body @@ -81,17 +81,20 @@ fn step_x_v1(data: SomeData) -> Box> { fn create_something_v1( some_data: web::Json, + client: web::Data, ) -> Box> { - Box::new(step_x_v1(some_data.into_inner()).and_then(|some_data_2| { - step_x_v1(some_data_2).and_then(|some_data_3| { - step_x_v1(some_data_3).and_then(|d| { - Ok(HttpResponse::Ok() - .content_type("application/json") - .body(serde_json::to_string(&d).unwrap()) - .into()) + Box::new( + step_x_v1(some_data.into_inner(), &client).and_then(move |some_data_2| { + step_x_v1(some_data_2, &client).and_then(move |some_data_3| { + step_x_v1(some_data_3, &client).and_then(|d| { + Ok(HttpResponse::Ok() + .content_type("application/json") + .body(serde_json::to_string(&d).unwrap()) + .into()) + }) }) - }) - })) + }), + ) } // --------------------------------------------------------------- @@ -99,13 +102,13 @@ fn create_something_v1( // --------------------------------------------------------------- /// post json to httpbin, get it back in the response body, return deserialized -fn step_x_v2(data: SomeData) -> impl Future { - let mut connector = client::Connector::new().service(); - - client::ClientRequest::post("https://httpbin.org/post") - .json(data) - .unwrap() - .send(&mut connector) +fn step_x_v2( + data: SomeData, + client: &Client, +) -> impl Future { + client + .post("https://httpbin.org/post") + .send_json(data) .map_err(Error::from) // <- convert SendRequestError to an Error .and_then(|resp| { resp.from_err() @@ -122,10 +125,11 @@ fn step_x_v2(data: SomeData) -> impl Future { fn create_something_v2( some_data: web::Json, + client: web::Data, ) -> impl Future { - step_x_v2(some_data.into_inner()).and_then(|some_data_2| { - step_x_v2(some_data_2).and_then(|some_data_3| { - step_x_v2(some_data_3).and_then(|d| { + step_x_v2(some_data.into_inner(), &client).and_then(move |some_data_2| { + step_x_v2(some_data_2, &client).and_then(move |some_data_3| { + step_x_v2(some_data_3, &client).and_then(|d| { Ok(HttpResponse::Ok() .content_type("application/json") .body(serde_json::to_string(&d).unwrap()) @@ -141,6 +145,7 @@ fn main() -> io::Result<()> { HttpServer::new(|| { App::new() + .data(Client::default()) .service( web::resource("/something_v1") .route(web::post().to(create_something_v1)), diff --git a/http-full-proxy/Cargo.toml b/http-full-proxy/Cargo.toml index f9363344..b7686cfe 100644 --- a/http-full-proxy/Cargo.toml +++ b/http-full-proxy/Cargo.toml @@ -2,10 +2,13 @@ name = "http-full-proxy" version = "0.1.0" authors = ["Rotem Yaari"] +workspace = ".." +edition = "2018" [dependencies] -actix = "0.7.5" -actix-web = "0.7.13" +actix-rt = "0.2" +actix-web = { git="https://github.com/actix/actix-web.git" } + clap = "2.32.0" futures = "0.1.25" failure = "0.1.3" diff --git a/http-full-proxy/src/main.rs b/http-full-proxy/src/main.rs index 05bfb394..f350769e 100644 --- a/http-full-proxy/src/main.rs +++ b/http-full-proxy/src/main.rs @@ -1,85 +1,53 @@ -#![deny(warnings)] -extern crate actix; -extern crate actix_web; -extern crate clap; -extern crate failure; -extern crate futures; -extern crate url; - -use actix_web::{ - client, http, server, App, AsyncResponder, Error, HttpMessage, HttpRequest, - HttpResponse, -}; +use actix_web::client::Client; +use actix_web::{middleware, web, App, Error, HttpRequest, HttpResponse, HttpServer}; use clap::{value_t, Arg}; -use futures::{future, Future}; +use futures::Future; use std::net::ToSocketAddrs; use url::Url; -struct AppState { - forward_url: Url, -} - -impl AppState { - pub fn init(forward_url: Url) -> AppState { - AppState { forward_url } - } -} - fn forward( - req: &HttpRequest, -) -> Box> { - let mut new_url = req.state().forward_url.clone(); + req: HttpRequest, + payload: web::Payload, + url: web::Data, + client: web::Data, +) -> impl Future { + let mut new_url = url.get_ref().clone(); new_url.set_path(req.uri().path()); new_url.set_query(req.uri().query()); - let mut forwarded_req = client::ClientRequest::build_from(req) - .no_default_headers() - .uri(new_url) - .streaming(req.payload()) - .unwrap(); + let forwarded_req = client + .request_from(new_url.as_str(), &req) + .no_default_headers(); - if let Some(addr) = req.peer_addr() { - match forwarded_req.headers_mut().entry("x-forwarded-for") { - Ok(http::header::Entry::Vacant(entry)) => { - let addr = format!("{}", addr.ip()); - entry.insert(addr.parse().unwrap()); - } - Ok(http::header::Entry::Occupied(mut entry)) => { - let addr = format!("{}, {}", entry.get().to_str().unwrap(), addr.ip()); - entry.insert(addr.parse().unwrap()); - } - _ => unreachable!(), - } - } + // if let Some(addr) = req.peer_addr() { + // match forwarded_req.headers_mut().entry("x-forwarded-for") { + // Ok(http::header::Entry::Vacant(entry)) => { + // let addr = format!("{}", addr.ip()); + // entry.insert(addr.parse().unwrap()); + // } + // Ok(http::header::Entry::Occupied(mut entry)) => { + // let addr = format!("{}, {}", entry.get().to_str().unwrap(), addr.ip()); + // entry.insert(addr.parse().unwrap()); + // } + // _ => unreachable!(), + // } + // } forwarded_req - .send() + .send_stream(payload) .map_err(Error::from) - .and_then(construct_response) - .responder() + .map(|res| { + let mut client_resp = HttpResponse::build(res.status()); + for (header_name, header_value) in + res.headers().iter().filter(|(h, _)| *h != "connection") + { + client_resp.header(header_name.clone(), header_value.clone()); + } + client_resp.streaming(res) + }) } -fn construct_response( - resp: client::ClientResponse, -) -> Box> { - let mut client_resp = HttpResponse::build(resp.status()); - for (header_name, header_value) in - resp.headers().iter().filter(|(h, _)| *h != "connection") - { - client_resp.header(header_name.clone(), header_value.clone()); - } - if resp.chunked().unwrap_or(false) { - Box::new(future::ok(client_resp.streaming(resp.payload()))) - } else { - Box::new( - resp.body() - .from_err() - .and_then(move |body| Ok(client_resp.body(body))), - ) - } -} - -fn main() { +fn main() -> std::io::Result<()> { let matches = clap::App::new("HTTP Proxy") .arg( Arg::with_name("listen_addr") @@ -128,14 +96,13 @@ fn main() { )) .unwrap(); - server::new(move || { - App::with_state(AppState::init(forward_url.clone())).default_resource(|r| { - r.f(forward); - }) + HttpServer::new(move || { + App::new() + .data(forward_url.clone()) + .wrap(middleware::Logger::default()) + .default_resource(|r| r.to_async(forward)) }) - .workers(32) - .bind((listen_addr, listen_port)) - .expect("Cannot bind listening port") + .bind((listen_addr, listen_port))? .system_exit() - .run(); + .run() } diff --git a/http-proxy/Cargo.toml b/http-proxy/Cargo.toml index b06a5946..ca1176d4 100644 --- a/http-proxy/Cargo.toml +++ b/http-proxy/Cargo.toml @@ -2,7 +2,8 @@ name = "http-proxy" version = "0.1.0" authors = ["Nikolay Kim "] -workspace = "../" +edition = "2018" +workspace = ".." [[bin]] name = "proxy" @@ -13,8 +14,8 @@ name = "proxy-example-server" path = "src/server.rs" [dependencies] +actix-rt = "0.2" +actix-web = { git="https://github.com/actix/actix-web.git" } + env_logger = "0.5" futures = "0.1" - -actix = "0.7" -actix-web = "^0.7" diff --git a/http-proxy/src/main.rs b/http-proxy/src/main.rs index 9d51aceb..080c848d 100644 --- a/http-proxy/src/main.rs +++ b/http-proxy/src/main.rs @@ -1,19 +1,11 @@ -extern crate actix; -extern crate actix_web; -extern crate env_logger; -extern crate futures; - -use actix_web::{ - client, middleware, server, App, AsyncResponder, Body, Error, HttpMessage, - HttpRequest, HttpResponse, -}; -use futures::{Future, Stream}; +use actix_web::client::Client; +use actix_web::{middleware, web, App, Error, HttpResponse, HttpServer}; +use futures::Future; /// Stream client request response and then send body to a server response -fn index(_req: &HttpRequest) -> Box> { - client::ClientRequest::get("http://127.0.0.1:8081/") - .finish() - .unwrap() +fn index(client: web::Data) -> impl Future { + client + .get("http://127.0.0.1:8081/") .send() .map_err(Error::from) // <- convert SendRequestError to an Error .and_then(|resp| { @@ -24,15 +16,15 @@ fn index(_req: &HttpRequest) -> Box> Ok(HttpResponse::Ok().body(body)) }) }) - .responder() } /// streaming client request to a streaming server response -fn streaming(_req: &HttpRequest) -> Box> { +fn streaming( + client: web::Data, +) -> impl Future> { // send client request - client::ClientRequest::get("https://www.rust-lang.org/en-US/") - .finish() - .unwrap() + client + .get("https://www.rust-lang.org/en-US/") .send() // <- connect to host and send request .map_err(Error::from) // <- convert SendRequestError to an Error .and_then(|resp| { @@ -40,27 +32,21 @@ fn streaming(_req: &HttpRequest) -> Box std::io::Result<()> { + std::env::set_var("RUST_LOG", "actix_server=info,actix_web=trace"); env_logger::init(); - let sys = actix::System::new("http-proxy"); - server::new(|| { + HttpServer::new(|| { App::new() - .middleware(middleware::Logger::default()) - .resource("/streaming", |r| r.f(streaming)) - .resource("/", |r| r.f(index)) + .data(Client::new()) + .wrap(middleware::Logger::default()) + .service(web::resource("/streaming").to_async(streaming)) + .service(web::resource("/").to_async(index)) }) - .workers(1) - .bind("127.0.0.1:8080") - .unwrap() - .start(); - - println!("Started http server: 127.0.0.1:8080"); - let _ = sys.run(); + .bind("127.0.0.1:8080")? + .run() } diff --git a/http-proxy/src/server.rs b/http-proxy/src/server.rs index 11584def..a8e7559e 100644 --- a/http-proxy/src/server.rs +++ b/http-proxy/src/server.rs @@ -1,35 +1,20 @@ -extern crate actix; -extern crate actix_web; -extern crate env_logger; -extern crate futures; +use actix_web::{middleware, web, App, HttpResponse, HttpServer, Responder}; -use actix_web::*; -use futures::Future; - -fn index(req: &HttpRequest) -> FutureResponse { - req.body() - .from_err() - .map(|bytes| HttpResponse::Ok().body(bytes)) - .responder() +fn index(body: web::Bytes) -> impl Responder { + HttpResponse::Ok().body(body) } -fn main() { - ::std::env::set_var("RUST_LOG", "actix_web=error"); - let _ = env_logger::init(); - let sys = actix::System::new("ws-example"); +fn main() -> std::io::Result<()> { + std::env::set_var("RUST_LOG", "actix_server=info,actix_web=trace"); + env_logger::init(); - server::new(|| { + HttpServer::new(|| { App::new() // enable logger - .middleware(middleware::Logger::default()) - .resource("/index.html", |r| r.f(|_| "Hello world!")) - .resource("/", |r| r.f(index)) + .wrap(middleware::Logger::default()) + .service(web::resource("/index.html").to(|| "Hello world!")) + .service(web::resource("/").to(index)) }) - .workers(1) - .bind("127.0.0.1:8081") - .unwrap() - .start(); - - println!("Started http server: 127.0.0.1:8081"); - let _ = sys.run(); + .bind("127.0.0.1:8081")? + .run() }