Update futures, actix and request to next major version, refactor to use async/await

This commit is contained in:
Valentin Brandl 2020-02-12 19:45:18 +01:00
parent c7cde6222a
commit dfc49c6b64
No known key found for this signature in database
GPG Key ID: 30D341DD34118D7D
6 changed files with 279 additions and 595 deletions

700
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -16,11 +16,12 @@ log = "0.4.8"
log4rs = "0.8.3" log4rs = "0.8.3"
number_prefix = "0.3.0" number_prefix = "0.3.0"
openssl-probe = "0.1.2" openssl-probe = "0.1.2"
reqwest = "0.9.22" reqwest = "0.10.1"
serde = "1.0.103" serde = "1.0.103"
serde_derive = "1.0.103" serde_derive = "1.0.103"
serde_json = "1.0.42" serde_json = "1.0.42"
structopt = "0.3.5" structopt = "0.3.5"
actix-rt = "1.0.0"
[build-dependencies] [build-dependencies]
ructe = "0.8.0" ructe = "0.8.0"

View File

@ -48,7 +48,7 @@ pub(crate) struct Opt {
pub(crate) logfile: PathBuf, pub(crate) logfile: PathBuf,
} }
pub(crate) fn init() -> Result<()> { pub(crate) async fn init() -> Result<()> {
std::env::set_var("RUST_LOG", "actix_web=info,hoc=info"); std::env::set_var("RUST_LOG", "actix_web=info,hoc=info");
// pretty_env_logger::init(); // pretty_env_logger::init();
openssl_probe::init_ssl_cert_env_vars(); openssl_probe::init_ssl_cert_env_vars();

View File

@ -44,10 +44,6 @@ impl ResponseError for Error {
.content_type("text/html") .content_type("text/html")
.body(buf) .body(buf)
} }
fn render_response(&self) -> HttpResponse {
self.error_response()
}
} }
impl std::error::Error for Error {} impl std::error::Error for Error {}

View File

@ -25,13 +25,11 @@ use crate::{
template::RepoInfo, template::RepoInfo,
}; };
use actix_web::{ use actix_web::{
error::ErrorBadRequest,
http::header::{CacheControl, CacheDirective, Expires}, http::header::{CacheControl, CacheDirective, Expires},
middleware, web, App, HttpResponse, HttpServer, middleware, web, App, HttpResponse, HttpServer,
}; };
use badge::{Badge, BadgeOptions}; use badge::{Badge, BadgeOptions};
use bytes::Bytes; use futures::future::Future;
use futures::{unsync::mpsc, Future, Stream};
use git2::Repository; use git2::Repository;
use number_prefix::{NumberPrefix, Prefixed, Standalone}; use number_prefix::{NumberPrefix, Prefixed, Standalone};
use std::{ use std::{
@ -139,12 +137,12 @@ fn hoc(repo: &str, repo_dir: &str, cache_dir: &str) -> Result<(u64, String, u64)
Ok((cache.count, head, commits)) Ok((cache.count, head, commits))
} }
fn remote_exists(url: &str) -> impl Future<Item = bool, Error = Error> { async fn remote_exists(url: &str) -> Result<bool> {
CLIENT let resp = CLIENT.head(url).send().await?;
.head(url) Ok(resp.status() == reqwest::StatusCode::OK)
.send()
.map(|resp| resp.status() == reqwest::StatusCode::OK) // .map(|resp| resp.status() == reqwest::StatusCode::OK)
.from_err() // .from_err()
} }
enum HocResult { enum HocResult {
@ -160,11 +158,11 @@ enum HocResult {
NotFound, NotFound,
} }
fn handle_hoc_request<T, F>( async fn handle_hoc_request<T, F>(
state: web::Data<Arc<State>>, state: web::Data<Arc<State>>,
data: web::Path<(String, String)>, data: web::Path<(String, String)>,
mapper: F, mapper: F,
) -> impl Future<Item = HttpResponse, Error = Error> ) -> Result<HttpResponse>
where where
T: Service, T: Service,
F: Fn(HocResult) -> Result<HttpResponse>, F: Fn(HocResult) -> Result<HttpResponse>,
@ -173,44 +171,43 @@ where
let service_path = format!("{}/{}", T::domain(), repo); let service_path = format!("{}/{}", T::domain(), repo);
let path = format!("{}/{}", state.repos, service_path); let path = format!("{}/{}", state.repos, service_path);
let url = format!("https://{}", service_path); let url = format!("https://{}", service_path);
remote_exists(&url) let remote_exists = remote_exists(&url).await?;
.and_then(move |remote_exists| { let file = Path::new(&path);
let file = Path::new(&path); if !file.exists() {
if !file.exists() { if !remote_exists {
if !remote_exists { warn!("Repository does not exist: {}", url);
warn!("Repository does not exist: {}", url); return mapper(HocResult::NotFound);
return Ok(HocResult::NotFound); }
} info!("Cloning {} for the first time", url);
info!("Cloning {} for the first time", url); create_dir_all(file)?;
create_dir_all(file)?; let repo = Repository::init_bare(file)?;
let repo = Repository::init_bare(file)?; repo.remote_add_fetch("origin", "refs/heads/*:refs/heads/*")?;
repo.remote_add_fetch("origin", "refs/heads/*:refs/heads/*")?; repo.remote_set_url("origin", &url)?;
repo.remote_set_url("origin", &url)?; REPO_COUNT.fetch_add(1, Ordering::Relaxed);
REPO_COUNT.fetch_add(1, Ordering::Relaxed); }
} pull(&path)?;
pull(&path)?; let (hoc, head, commits) = hoc(&service_path, &state.repos, &state.cache)?;
let (hoc, head, commits) = hoc(&service_path, &state.repos, &state.cache)?; let hoc_pretty = match NumberPrefix::decimal(hoc as f64) {
let hoc_pretty = match NumberPrefix::decimal(hoc as f64) { Standalone(hoc) => hoc.to_string(),
Standalone(hoc) => hoc.to_string(), Prefixed(prefix, hoc) => format!("{:.1}{}", hoc, prefix),
Prefixed(prefix, hoc) => format!("{:.1}{}", hoc, prefix), };
}; let res = HocResult::Hoc {
Ok(HocResult::Hoc { hoc,
hoc, commits,
commits, hoc_pretty,
hoc_pretty, head: head.to_string(),
head: head.to_string(), url,
url, repo,
repo, service_path,
service_path, };
}) mapper(res)
}) // .and_then(mapper)
.and_then(mapper)
} }
fn json_hoc<T: Service>( fn json_hoc<T: Service>(
state: web::Data<Arc<State>>, state: web::Data<Arc<State>>,
data: web::Path<(String, String)>, data: web::Path<(String, String)>,
) -> impl Future<Item = HttpResponse, Error = Error> { ) -> impl Future<Output = Result<HttpResponse>> {
let mapper = |r| match r { let mapper = |r| match r {
HocResult::NotFound => p404(), HocResult::NotFound => p404(),
HocResult::Hoc { HocResult::Hoc {
@ -227,8 +224,8 @@ fn json_hoc<T: Service>(
fn calculate_hoc<T: Service>( fn calculate_hoc<T: Service>(
state: web::Data<Arc<State>>, state: web::Data<Arc<State>>,
data: web::Path<(String, String)>, data: web::Path<(String, String)>,
) -> impl Future<Item = HttpResponse, Error = Error> { ) -> impl Future<Output = Result<HttpResponse>> {
let mapper = |r| match r { let mapper = move |r| match r {
HocResult::NotFound => p404(), HocResult::NotFound => p404(),
HocResult::Hoc { hoc_pretty, .. } => { HocResult::Hoc { hoc_pretty, .. } => {
let badge_opt = BadgeOptions { let badge_opt = BadgeOptions {
@ -237,9 +234,8 @@ fn calculate_hoc<T: Service>(
status: hoc_pretty, status: hoc_pretty,
}; };
let badge = Badge::new(badge_opt)?; let badge = Badge::new(badge_opt)?;
// TODO: remove clone
let (tx, rx_body) = mpsc::unbounded(); let body = badge.to_svg().as_bytes().to_vec();
let _ = tx.unbounded_send(Bytes::from(badge.to_svg().as_bytes()));
let expiration = SystemTime::now() + Duration::from_secs(30); let expiration = SystemTime::now() + Duration::from_secs(30);
Ok(HttpResponse::Ok() Ok(HttpResponse::Ok()
@ -251,7 +247,7 @@ fn calculate_hoc<T: Service>(
CacheDirective::NoCache, CacheDirective::NoCache,
CacheDirective::NoStore, CacheDirective::NoStore,
])) ]))
.streaming(rx_body.map_err(|_| ErrorBadRequest("bad request")))) .body(body))
} }
}; };
handle_hoc_request::<T, _>(state, data, mapper) handle_hoc_request::<T, _>(state, data, mapper)
@ -260,7 +256,7 @@ fn calculate_hoc<T: Service>(
fn overview<T: Service>( fn overview<T: Service>(
state: web::Data<Arc<State>>, state: web::Data<Arc<State>>,
data: web::Path<(String, String)>, data: web::Path<(String, String)>,
) -> impl Future<Item = HttpResponse, Error = Error> { ) -> impl Future<Output = Result<HttpResponse>> {
let mapper = |r| match r { let mapper = |r| match r {
HocResult::NotFound => p404(), HocResult::NotFound => p404(),
HocResult::Hoc { HocResult::Hoc {
@ -287,30 +283,17 @@ fn overview<T: Service>(
&mut buf, &mut buf,
VERSION_INFO, VERSION_INFO,
REPO_COUNT.load(Ordering::Relaxed), REPO_COUNT.load(Ordering::Relaxed),
repo_info repo_info,
// &OPT.domain,
// &service_path,
// &url,
// hoc,
// &hoc_pretty,
// &head,
// &T::commit_url(&repo, &head),
// commits,
)?; )?;
let (tx, rx_body) = mpsc::unbounded(); Ok(HttpResponse::Ok().content_type("text/html").body(buf))
let _ = tx.unbounded_send(Bytes::from(buf));
Ok(HttpResponse::Ok()
.content_type("text/html")
.streaming(rx_body.map_err(|_| ErrorBadRequest("bad request"))))
} }
}; };
handle_hoc_request::<T, _>(state, data, mapper) handle_hoc_request::<T, _>(state, data, mapper)
} }
#[get("/")] #[get("/")]
fn index() -> Result<HttpResponse> { async fn index() -> Result<HttpResponse> {
let mut buf = Vec::new(); let mut buf = Vec::new();
templates::index( templates::index(
&mut buf, &mut buf,
@ -322,7 +305,7 @@ fn index() -> Result<HttpResponse> {
} }
#[post("/generate")] #[post("/generate")]
fn generate(params: web::Form<GeneratorForm>) -> Result<HttpResponse> { async fn generate(params: web::Form<GeneratorForm<'_>>) -> Result<HttpResponse> {
let repo = format!("{}/{}", params.user, params.repo); let repo = format!("{}/{}", params.user, params.repo);
let mut buf = Vec::new(); let mut buf = Vec::new();
templates::generate( templates::generate(
@ -334,12 +317,8 @@ fn generate(params: web::Form<GeneratorForm>) -> Result<HttpResponse> {
params.service.service(), params.service.service(),
&repo, &repo,
)?; )?;
let (tx, rx_body) = mpsc::unbounded();
let _ = tx.unbounded_send(Bytes::from(buf));
Ok(HttpResponse::Ok() Ok(HttpResponse::Ok().content_type("text/html").body(buf))
.content_type("text/html")
.streaming(rx_body.map_err(|_| ErrorBadRequest("bad request"))))
} }
fn p404() -> Result<HttpResponse> { fn p404() -> Result<HttpResponse> {
@ -348,6 +327,10 @@ fn p404() -> Result<HttpResponse> {
Ok(HttpResponse::NotFound().content_type("text/html").body(buf)) Ok(HttpResponse::NotFound().content_type("text/html").body(buf))
} }
async fn async_p404() -> Result<HttpResponse> {
p404()
}
#[get("/tacit-css.min.css")] #[get("/tacit-css.min.css")]
fn css() -> HttpResponse { fn css() -> HttpResponse {
HttpResponse::Ok().content_type("text/css").body(CSS) HttpResponse::Ok().content_type("text/css").body(CSS)
@ -358,13 +341,13 @@ fn favicon32() -> HttpResponse {
HttpResponse::Ok().content_type("image/png").body(FAVICON) HttpResponse::Ok().content_type("image/png").body(FAVICON)
} }
fn start_server() -> Result<()> { async fn start_server() -> std::io::Result<()> {
let interface = format!("{}:{}", OPT.host, OPT.port); let interface = format!("{}:{}", OPT.host, OPT.port);
let state = Arc::new(State { let state = Arc::new(State {
repos: OPT.outdir.display().to_string(), repos: OPT.outdir.display().to_string(),
cache: OPT.cachedir.display().to_string(), cache: OPT.cachedir.display().to_string(),
}); });
Ok(HttpServer::new(move || { HttpServer::new(move || {
App::new() App::new()
.data(state.clone()) .data(state.clone())
.wrap(middleware::Logger::default()) .wrap(middleware::Logger::default())
@ -373,23 +356,25 @@ fn start_server() -> Result<()> {
.service(css) .service(css)
.service(favicon32) .service(favicon32)
.service(generate) .service(generate)
.service(web::resource("/github/{user}/{repo}").to_async(calculate_hoc::<GitHub>)) .service(web::resource("/github/{user}/{repo}").to(calculate_hoc::<GitHub>))
.service(web::resource("/gitlab/{user}/{repo}").to_async(calculate_hoc::<Gitlab>)) .service(web::resource("/gitlab/{user}/{repo}").to(calculate_hoc::<Gitlab>))
.service(web::resource("/bitbucket/{user}/{repo}").to_async(calculate_hoc::<Bitbucket>)) .service(web::resource("/bitbucket/{user}/{repo}").to(calculate_hoc::<Bitbucket>))
.service(web::resource("/github/{user}/{repo}/json").to_async(json_hoc::<GitHub>)) .service(web::resource("/github/{user}/{repo}/json").to(json_hoc::<GitHub>))
.service(web::resource("/gitlab/{user}/{repo}/json").to_async(json_hoc::<Gitlab>)) .service(web::resource("/gitlab/{user}/{repo}/json").to(json_hoc::<Gitlab>))
.service(web::resource("/bitbucket/{user}/{repo}/json").to_async(json_hoc::<Bitbucket>)) .service(web::resource("/bitbucket/{user}/{repo}/json").to(json_hoc::<Bitbucket>))
.service(web::resource("/view/github/{user}/{repo}").to_async(overview::<GitHub>)) .service(web::resource("/view/github/{user}/{repo}").to(overview::<GitHub>))
.service(web::resource("/view/gitlab/{user}/{repo}").to_async(overview::<Gitlab>)) .service(web::resource("/view/gitlab/{user}/{repo}").to(overview::<Gitlab>))
.service(web::resource("/view/bitbucket/{user}/{repo}").to_async(overview::<Bitbucket>)) .service(web::resource("/view/bitbucket/{user}/{repo}").to(overview::<Bitbucket>))
.default_service(web::resource("").route(web::get().to_async(p404))) .default_service(web::resource("").route(web::get().to(async_p404)))
}) })
.workers(OPT.workers) .workers(OPT.workers)
.bind(interface)? .bind(interface)?
.run()?) .run()
.await
} }
fn main() -> Result<()> { #[actix_rt::main]
config::init()?; async fn main() -> std::io::Result<()> {
start_server() config::init().await.unwrap();
start_server().await
} }

View File

@ -15,7 +15,7 @@ pub(crate) const CSS: &str = include_str!("../static/tacit-css.min.css");
pub(crate) const FAVICON: &[u8] = include_bytes!("../static/favicon32.png"); pub(crate) const FAVICON: &[u8] = include_bytes!("../static/favicon32.png");
lazy_static! { lazy_static! {
pub(crate) static ref CLIENT: reqwest::r#async::Client = reqwest::r#async::Client::new(); pub(crate) static ref CLIENT: reqwest::Client = reqwest::Client::new();
pub(crate) static ref OPT: Opt = Opt::from_args(); pub(crate) static ref OPT: Opt = Opt::from_args();
pub(crate) static ref REPO_COUNT: AtomicUsize = pub(crate) static ref REPO_COUNT: AtomicUsize =
AtomicUsize::new(count_repositories(&OPT.outdir).unwrap()); AtomicUsize::new(count_repositories(&OPT.outdir).unwrap());