1
0
mirror of https://github.com/actix/examples synced 2024-11-27 16:02:57 +01:00

add reqwest alternative to http proxy

This commit is contained in:
Rob Ede 2023-01-01 20:22:50 +00:00
parent d6b3d2d57e
commit 7ca1024742
No known key found for this signature in database
GPG Key ID: 97C636207D3EF933
3 changed files with 85 additions and 6 deletions

6
Cargo.lock generated
View File

@ -3419,7 +3419,11 @@ dependencies = [
"awc", "awc",
"clap", "clap",
"env_logger", "env_logger",
"futures-util",
"log", "log",
"reqwest 0.11.13",
"tokio 1.23.0",
"tokio-stream",
"url", "url",
] ]
@ -5648,6 +5652,7 @@ dependencies = [
"serde_urlencoded", "serde_urlencoded",
"tokio 1.23.0", "tokio 1.23.0",
"tokio-native-tls", "tokio-native-tls",
"tokio-util 0.7.4",
"tower-service", "tower-service",
"url", "url",
"wasm-bindgen", "wasm-bindgen",
@ -7290,6 +7295,7 @@ dependencies = [
"futures-core", "futures-core",
"pin-project-lite 0.2.9", "pin-project-lite 0.2.9",
"tokio 1.23.0", "tokio 1.23.0",
"tokio-util 0.7.4",
] ]
[[package]] [[package]]

View File

@ -9,5 +9,9 @@ awc = "3"
clap = { version = "4", features = ["derive"] } clap = { version = "4", features = ["derive"] }
env_logger.workspace = true env_logger.workspace = true
futures-util = { version = "0.3.17", default-features = false, features = ["std"] }
log.workspace = true log.workspace = true
reqwest = { version = "0.11", features = ["stream"] }
tokio = { version = "1.13.1", features = ["sync"] }
tokio-stream = { version = "0.1.3", features = ["sync"] }
url = "2.2" url = "2.2"

View File

@ -1,27 +1,39 @@
use std::net::ToSocketAddrs; use std::net::ToSocketAddrs;
use actix_web::{error, middleware, web, App, Error, HttpRequest, HttpResponse, HttpServer}; use actix_web::{
dev::PeerAddr, error, http::Method, middleware, web, App, Error, HttpRequest, HttpResponse,
HttpServer,
};
use awc::Client; use awc::Client;
use clap::Parser; use clap::Parser;
use futures_util::StreamExt as _;
use tokio_stream::wrappers::UnboundedReceiverStream;
use url::Url; use url::Url;
const REQWEST_PREFIX: &str = "/using-reqwest";
/// Forwards the incoming HTTP request using `awc`.
async fn forward( async fn forward(
req: HttpRequest, req: HttpRequest,
payload: web::Payload, payload: web::Payload,
peer_addr: Option<PeerAddr>,
url: web::Data<Url>, url: web::Data<Url>,
client: web::Data<Client>, client: web::Data<Client>,
) -> Result<HttpResponse, Error> { ) -> Result<HttpResponse, Error> {
let mut new_url = url.get_ref().clone(); let mut new_url = (**url).clone();
new_url.set_path(req.uri().path()); new_url.set_path(req.uri().path());
new_url.set_query(req.uri().query()); new_url.set_query(req.uri().query());
// TODO: This forwarded implementation is incomplete as it only handles the unofficial
// X-Forwarded-For header but not the official Forwarded one.
let forwarded_req = client let forwarded_req = client
.request_from(new_url.as_str(), req.head()) .request_from(new_url.as_str(), req.head())
.no_decompress(); .no_decompress();
let forwarded_req = match req.head().peer_addr {
Some(addr) => forwarded_req.insert_header(("x-forwarded-for", format!("{}", addr.ip()))), // TODO: This forwarded implementation is incomplete as it only handles the unofficial
// X-Forwarded-For header but not the official Forwarded one.
let forwarded_req = match peer_addr {
Some(PeerAddr(addr)) => {
forwarded_req.insert_header(("x-forwarded-for", addr.ip().to_string()))
}
None => forwarded_req, None => forwarded_req,
}; };
@ -40,6 +52,59 @@ async fn forward(
Ok(client_resp.streaming(res)) Ok(client_resp.streaming(res))
} }
/// Same as `forward` but uses `reqwest` as the client used to forward the request.
async fn forward_reqwest(
req: HttpRequest,
mut payload: web::Payload,
method: Method,
peer_addr: Option<PeerAddr>,
url: web::Data<Url>,
client: web::Data<reqwest::Client>,
) -> Result<HttpResponse, Error> {
let path = req
.uri()
.path()
.strip_prefix(REQWEST_PREFIX)
.unwrap_or(req.uri().path());
let mut new_url = (**url).clone();
new_url.set_path(path);
new_url.set_query(req.uri().query());
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
actix_web::rt::spawn(async move {
while let Some(chunk) = payload.next().await {
tx.send(chunk).unwrap();
}
});
let forwarded_req = client
.request(method, new_url)
.body(reqwest::Body::wrap_stream(UnboundedReceiverStream::new(rx)));
// TODO: This forwarded implementation is incomplete as it only handles the unofficial
// X-Forwarded-For header but not the official Forwarded one.
let forwarded_req = match peer_addr {
Some(PeerAddr(addr)) => forwarded_req.header("x-forwarded-for", addr.ip().to_string()),
None => forwarded_req,
};
let res = forwarded_req
.send()
.await
.map_err(error::ErrorInternalServerError)?;
let mut client_resp = HttpResponse::build(res.status());
// Remove `Connection` as per
// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Connection#Directives
for (header_name, header_value) in res.headers().iter().filter(|(h, _)| *h != "connection") {
client_resp.insert_header((header_name.clone(), header_value.clone()));
}
Ok(client_resp.streaming(res.bytes_stream()))
}
#[derive(clap::Parser, Debug)] #[derive(clap::Parser, Debug)]
struct CliArguments { struct CliArguments {
listen_addr: String, listen_addr: String,
@ -70,11 +135,15 @@ async fn main() -> std::io::Result<()> {
log::info!("forwarding to {forward_url}"); log::info!("forwarding to {forward_url}");
let reqwest_client = reqwest::Client::default();
HttpServer::new(move || { HttpServer::new(move || {
App::new() App::new()
.app_data(web::Data::new(Client::default())) .app_data(web::Data::new(Client::default()))
.app_data(web::Data::new(reqwest_client.clone()))
.app_data(web::Data::new(forward_url.clone())) .app_data(web::Data::new(forward_url.clone()))
.wrap(middleware::Logger::default()) .wrap(middleware::Logger::default())
.service(web::scope(REQWEST_PREFIX).default_service(web::to(forward_reqwest)))
.default_service(web::to(forward)) .default_service(web::to(forward))
}) })
.bind((args.listen_addr, args.listen_port))? .bind((args.listen_addr, args.listen_port))?