1
0
mirror of https://github.com/actix/examples synced 2025-01-22 14:05:55 +01:00

migrate http proxy examples

This commit is contained in:
Nikolay Kim 2019-03-26 23:33:13 -07:00
parent 53fc2221ef
commit 48b8e7c335
9 changed files with 125 additions and 177 deletions

View File

@ -15,7 +15,8 @@ failure = "0.1.1"
futures = "0.1"
num_cpus = "1.10.0"
r2d2 = "0.8.2"
r2d2_sqlite = "0.5.0"
r2d2_sqlite = "0.8.0"
rusqlite = "0.16"
serde = "1.0"
serde_json = "1.0"
serde_derive = "1.0"

View File

@ -3,6 +3,7 @@ use failure::Error;
use futures::Future;
use r2d2;
use r2d2_sqlite;
use rusqlite::NO_PARAMS;
use serde_derive::{Deserialize, Serialize};
use std::{thread::sleep, time::Duration};
@ -47,7 +48,7 @@ fn get_hottest_years(conn: Connection) -> Result<Vec<WeatherAgg>, Error> {
let mut prep_stmt = conn.prepare(stmt)?;
let annuals = prep_stmt
.query_map(&[], |row| WeatherAgg::AnnualAgg {
.query_map(NO_PARAMS, |row| WeatherAgg::AnnualAgg {
year: row.get(0),
total: row.get(1),
})
@ -73,7 +74,7 @@ fn get_coldest_years(conn: Connection) -> Result<Vec<WeatherAgg>, Error> {
let mut prep_stmt = conn.prepare(stmt)?;
let annuals = prep_stmt
.query_map(&[], |row| WeatherAgg::AnnualAgg {
.query_map(NO_PARAMS, |row| WeatherAgg::AnnualAgg {
year: row.get(0),
total: row.get(1),
})
@ -99,7 +100,7 @@ fn get_hottest_months(conn: Connection) -> Result<Vec<WeatherAgg>, Error> {
let mut prep_stmt = conn.prepare(stmt)?;
let annuals = prep_stmt
.query_map(&[], |row| WeatherAgg::MonthAgg {
.query_map(NO_PARAMS, |row| WeatherAgg::MonthAgg {
year: row.get(0),
month: row.get(1),
total: row.get(2),
@ -125,7 +126,7 @@ fn get_coldest_months(conn: Connection) -> Result<Vec<WeatherAgg>, Error> {
let mut prep_stmt = conn.prepare(stmt)?;
let annuals = prep_stmt
.query_map(&[], |row| WeatherAgg::MonthAgg {
.query_map(NO_PARAMS, |row| WeatherAgg::MonthAgg {
year: row.get(0),
month: row.get(1),
total: row.get(2),

View File

@ -7,7 +7,6 @@ workspace = ".."
[dependencies]
actix-rt = "0.2"
actix-http = { git="https://github.com/actix/actix-http.git", features=["ssl"] }
actix-web = { git="https://github.com/actix/actix-web.git", features=["ssl"] }
futures = "0.1"

View File

@ -23,7 +23,7 @@ extern crate serde_derive;
use std::collections::HashMap;
use std::io;
use actix_http::client;
use actix_web::client::Client;
use actix_web::web::BytesMut;
use actix_web::{web, App, Error, HttpResponse, HttpServer};
use futures::{Future, Stream};
@ -54,14 +54,14 @@ struct HttpBinResponse {
// -----------------------------------------------------------------------
/// post json to httpbin, get it back in the response body, return deserialized
fn step_x_v1(data: SomeData) -> Box<Future<Item = SomeData, Error = Error>> {
let mut connector = client::Connector::new().service();
fn step_x_v1(
data: SomeData,
client: &Client,
) -> Box<Future<Item = SomeData, Error = Error>> {
Box::new(
client::ClientRequest::post("https://httpbin.org/post")
.json(data)
.unwrap()
.send(&mut connector)
client
.post("https://httpbin.org/post")
.send_json(data)
.map_err(Error::from) // <- convert SendRequestError to an Error
.and_then(|resp| {
resp // <- this is MessageBody type, resolves to complete body
@ -81,17 +81,20 @@ fn step_x_v1(data: SomeData) -> Box<Future<Item = SomeData, Error = Error>> {
fn create_something_v1(
some_data: web::Json<SomeData>,
client: web::Data<Client>,
) -> Box<Future<Item = HttpResponse, Error = Error>> {
Box::new(step_x_v1(some_data.into_inner()).and_then(|some_data_2| {
step_x_v1(some_data_2).and_then(|some_data_3| {
step_x_v1(some_data_3).and_then(|d| {
Ok(HttpResponse::Ok()
.content_type("application/json")
.body(serde_json::to_string(&d).unwrap())
.into())
Box::new(
step_x_v1(some_data.into_inner(), &client).and_then(move |some_data_2| {
step_x_v1(some_data_2, &client).and_then(move |some_data_3| {
step_x_v1(some_data_3, &client).and_then(|d| {
Ok(HttpResponse::Ok()
.content_type("application/json")
.body(serde_json::to_string(&d).unwrap())
.into())
})
})
})
}))
}),
)
}
// ---------------------------------------------------------------
@ -99,13 +102,13 @@ fn create_something_v1(
// ---------------------------------------------------------------
/// post json to httpbin, get it back in the response body, return deserialized
fn step_x_v2(data: SomeData) -> impl Future<Item = SomeData, Error = Error> {
let mut connector = client::Connector::new().service();
client::ClientRequest::post("https://httpbin.org/post")
.json(data)
.unwrap()
.send(&mut connector)
fn step_x_v2(
data: SomeData,
client: &Client,
) -> impl Future<Item = SomeData, Error = Error> {
client
.post("https://httpbin.org/post")
.send_json(data)
.map_err(Error::from) // <- convert SendRequestError to an Error
.and_then(|resp| {
resp.from_err()
@ -122,10 +125,11 @@ fn step_x_v2(data: SomeData) -> impl Future<Item = SomeData, Error = Error> {
fn create_something_v2(
some_data: web::Json<SomeData>,
client: web::Data<Client>,
) -> impl Future<Item = HttpResponse, Error = Error> {
step_x_v2(some_data.into_inner()).and_then(|some_data_2| {
step_x_v2(some_data_2).and_then(|some_data_3| {
step_x_v2(some_data_3).and_then(|d| {
step_x_v2(some_data.into_inner(), &client).and_then(move |some_data_2| {
step_x_v2(some_data_2, &client).and_then(move |some_data_3| {
step_x_v2(some_data_3, &client).and_then(|d| {
Ok(HttpResponse::Ok()
.content_type("application/json")
.body(serde_json::to_string(&d).unwrap())
@ -141,6 +145,7 @@ fn main() -> io::Result<()> {
HttpServer::new(|| {
App::new()
.data(Client::default())
.service(
web::resource("/something_v1")
.route(web::post().to(create_something_v1)),

View File

@ -2,10 +2,13 @@
name = "http-full-proxy"
version = "0.1.0"
authors = ["Rotem Yaari"]
workspace = ".."
edition = "2018"
[dependencies]
actix = "0.7.5"
actix-web = "0.7.13"
actix-rt = "0.2"
actix-web = { git="https://github.com/actix/actix-web.git" }
clap = "2.32.0"
futures = "0.1.25"
failure = "0.1.3"

View File

@ -1,85 +1,53 @@
#![deny(warnings)]
extern crate actix;
extern crate actix_web;
extern crate clap;
extern crate failure;
extern crate futures;
extern crate url;
use actix_web::{
client, http, server, App, AsyncResponder, Error, HttpMessage, HttpRequest,
HttpResponse,
};
use actix_web::client::Client;
use actix_web::{middleware, web, App, Error, HttpRequest, HttpResponse, HttpServer};
use clap::{value_t, Arg};
use futures::{future, Future};
use futures::Future;
use std::net::ToSocketAddrs;
use url::Url;
struct AppState {
forward_url: Url,
}
impl AppState {
pub fn init(forward_url: Url) -> AppState {
AppState { forward_url }
}
}
fn forward(
req: &HttpRequest<AppState>,
) -> Box<Future<Item = HttpResponse, Error = Error>> {
let mut new_url = req.state().forward_url.clone();
req: HttpRequest,
payload: web::Payload,
url: web::Data<Url>,
client: web::Data<Client>,
) -> impl Future<Item = HttpResponse, Error = Error> {
let mut new_url = url.get_ref().clone();
new_url.set_path(req.uri().path());
new_url.set_query(req.uri().query());
let mut forwarded_req = client::ClientRequest::build_from(req)
.no_default_headers()
.uri(new_url)
.streaming(req.payload())
.unwrap();
let forwarded_req = client
.request_from(new_url.as_str(), &req)
.no_default_headers();
if let Some(addr) = req.peer_addr() {
match forwarded_req.headers_mut().entry("x-forwarded-for") {
Ok(http::header::Entry::Vacant(entry)) => {
let addr = format!("{}", addr.ip());
entry.insert(addr.parse().unwrap());
}
Ok(http::header::Entry::Occupied(mut entry)) => {
let addr = format!("{}, {}", entry.get().to_str().unwrap(), addr.ip());
entry.insert(addr.parse().unwrap());
}
_ => unreachable!(),
}
}
// if let Some(addr) = req.peer_addr() {
// match forwarded_req.headers_mut().entry("x-forwarded-for") {
// Ok(http::header::Entry::Vacant(entry)) => {
// let addr = format!("{}", addr.ip());
// entry.insert(addr.parse().unwrap());
// }
// Ok(http::header::Entry::Occupied(mut entry)) => {
// let addr = format!("{}, {}", entry.get().to_str().unwrap(), addr.ip());
// entry.insert(addr.parse().unwrap());
// }
// _ => unreachable!(),
// }
// }
forwarded_req
.send()
.send_stream(payload)
.map_err(Error::from)
.and_then(construct_response)
.responder()
.map(|res| {
let mut client_resp = HttpResponse::build(res.status());
for (header_name, header_value) in
res.headers().iter().filter(|(h, _)| *h != "connection")
{
client_resp.header(header_name.clone(), header_value.clone());
}
client_resp.streaming(res)
})
}
fn construct_response(
resp: client::ClientResponse,
) -> Box<dyn Future<Item = HttpResponse, Error = Error>> {
let mut client_resp = HttpResponse::build(resp.status());
for (header_name, header_value) in
resp.headers().iter().filter(|(h, _)| *h != "connection")
{
client_resp.header(header_name.clone(), header_value.clone());
}
if resp.chunked().unwrap_or(false) {
Box::new(future::ok(client_resp.streaming(resp.payload())))
} else {
Box::new(
resp.body()
.from_err()
.and_then(move |body| Ok(client_resp.body(body))),
)
}
}
fn main() {
fn main() -> std::io::Result<()> {
let matches = clap::App::new("HTTP Proxy")
.arg(
Arg::with_name("listen_addr")
@ -128,14 +96,13 @@ fn main() {
))
.unwrap();
server::new(move || {
App::with_state(AppState::init(forward_url.clone())).default_resource(|r| {
r.f(forward);
})
HttpServer::new(move || {
App::new()
.data(forward_url.clone())
.wrap(middleware::Logger::default())
.default_resource(|r| r.to_async(forward))
})
.workers(32)
.bind((listen_addr, listen_port))
.expect("Cannot bind listening port")
.bind((listen_addr, listen_port))?
.system_exit()
.run();
.run()
}

View File

@ -2,7 +2,8 @@
name = "http-proxy"
version = "0.1.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
workspace = "../"
edition = "2018"
workspace = ".."
[[bin]]
name = "proxy"
@ -13,8 +14,8 @@ name = "proxy-example-server"
path = "src/server.rs"
[dependencies]
actix-rt = "0.2"
actix-web = { git="https://github.com/actix/actix-web.git" }
env_logger = "0.5"
futures = "0.1"
actix = "0.7"
actix-web = "^0.7"

View File

@ -1,19 +1,11 @@
extern crate actix;
extern crate actix_web;
extern crate env_logger;
extern crate futures;
use actix_web::{
client, middleware, server, App, AsyncResponder, Body, Error, HttpMessage,
HttpRequest, HttpResponse,
};
use futures::{Future, Stream};
use actix_web::client::Client;
use actix_web::{middleware, web, App, Error, HttpResponse, HttpServer};
use futures::Future;
/// Stream client request response and then send body to a server response
fn index(_req: &HttpRequest) -> Box<Future<Item = HttpResponse, Error = Error>> {
client::ClientRequest::get("http://127.0.0.1:8081/")
.finish()
.unwrap()
fn index(client: web::Data<Client>) -> impl Future<Item = HttpResponse, Error = Error> {
client
.get("http://127.0.0.1:8081/")
.send()
.map_err(Error::from) // <- convert SendRequestError to an Error
.and_then(|resp| {
@ -24,15 +16,15 @@ fn index(_req: &HttpRequest) -> Box<Future<Item = HttpResponse, Error = Error>>
Ok(HttpResponse::Ok().body(body))
})
})
.responder()
}
/// streaming client request to a streaming server response
fn streaming(_req: &HttpRequest) -> Box<Future<Item = HttpResponse, Error = Error>> {
fn streaming(
client: web::Data<Client>,
) -> impl Future<Item = HttpResponse, Error = impl Into<Error>> {
// send client request
client::ClientRequest::get("https://www.rust-lang.org/en-US/")
.finish()
.unwrap()
client
.get("https://www.rust-lang.org/en-US/")
.send() // <- connect to host and send request
.map_err(Error::from) // <- convert SendRequestError to an Error
.and_then(|resp| {
@ -40,27 +32,21 @@ fn streaming(_req: &HttpRequest) -> Box<Future<Item = HttpResponse, Error = Erro
Ok(HttpResponse::Ok()
// read one chunk from client response and send this chunk to a server response
// .from_err() converts PayloadError to an Error
.body(Body::Streaming(Box::new(resp.payload().from_err()))))
.streaming(resp))
})
.responder()
}
fn main() {
::std::env::set_var("RUST_LOG", "actix_web=info");
fn main() -> std::io::Result<()> {
std::env::set_var("RUST_LOG", "actix_server=info,actix_web=trace");
env_logger::init();
let sys = actix::System::new("http-proxy");
server::new(|| {
HttpServer::new(|| {
App::new()
.middleware(middleware::Logger::default())
.resource("/streaming", |r| r.f(streaming))
.resource("/", |r| r.f(index))
.data(Client::new())
.wrap(middleware::Logger::default())
.service(web::resource("/streaming").to_async(streaming))
.service(web::resource("/").to_async(index))
})
.workers(1)
.bind("127.0.0.1:8080")
.unwrap()
.start();
println!("Started http server: 127.0.0.1:8080");
let _ = sys.run();
.bind("127.0.0.1:8080")?
.run()
}

View File

@ -1,35 +1,20 @@
extern crate actix;
extern crate actix_web;
extern crate env_logger;
extern crate futures;
use actix_web::{middleware, web, App, HttpResponse, HttpServer, Responder};
use actix_web::*;
use futures::Future;
fn index(req: &HttpRequest) -> FutureResponse<HttpResponse> {
req.body()
.from_err()
.map(|bytes| HttpResponse::Ok().body(bytes))
.responder()
fn index(body: web::Bytes) -> impl Responder {
HttpResponse::Ok().body(body)
}
fn main() {
::std::env::set_var("RUST_LOG", "actix_web=error");
let _ = env_logger::init();
let sys = actix::System::new("ws-example");
fn main() -> std::io::Result<()> {
std::env::set_var("RUST_LOG", "actix_server=info,actix_web=trace");
env_logger::init();
server::new(|| {
HttpServer::new(|| {
App::new()
// enable logger
.middleware(middleware::Logger::default())
.resource("/index.html", |r| r.f(|_| "Hello world!"))
.resource("/", |r| r.f(index))
.wrap(middleware::Logger::default())
.service(web::resource("/index.html").to(|| "Hello world!"))
.service(web::resource("/").to(index))
})
.workers(1)
.bind("127.0.0.1:8081")
.unwrap()
.start();
println!("Started http server: 127.0.0.1:8081");
let _ = sys.run();
.bind("127.0.0.1:8081")?
.run()
}