From 098932e63ff0dd5ab08fafe472f5a1cc7d1fe228 Mon Sep 17 00:00:00 2001 From: Mari Date: Mon, 10 Jul 2023 15:33:16 +0200 Subject: [PATCH 1/5] refactor web server --- src/bin/am/commands/start.rs | 174 +++---------------------------- src/bin/am/main.rs | 1 + src/bin/am/server.rs | 53 ++++++++++ src/bin/am/server/explorer.rs | 42 ++++++++ src/bin/am/server/prometheus.rs | 9 ++ src/bin/am/server/pushgateway.rs | 9 ++ src/bin/am/server/util.rs | 68 ++++++++++++ 7 files changed, 195 insertions(+), 161 deletions(-) create mode 100644 src/bin/am/server.rs create mode 100644 src/bin/am/server/explorer.rs create mode 100644 src/bin/am/server/prometheus.rs create mode 100644 src/bin/am/server/pushgateway.rs create mode 100644 src/bin/am/server/util.rs diff --git a/src/bin/am/commands/start.rs b/src/bin/am/commands/start.rs index 3064f03..d246509 100644 --- a/src/bin/am/commands/start.rs +++ b/src/bin/am/commands/start.rs @@ -1,18 +1,12 @@ use crate::dir::AutoCleanupDir; use crate::downloader::{download_github_release, unpack, verify_checksum}; use crate::interactive; +use crate::server::start_web_server; use anyhow::{bail, Context, Result}; use autometrics_am::prometheus; -use axum::body::{self, Body}; -use axum::extract::Path as AxumPath; -use axum::response::{IntoResponse, Redirect, Response}; -use axum::routing::{any, get}; -use axum::Router; use clap::Parser; use directories::ProjectDirs; use futures_util::FutureExt; -use http::{StatusCode, Uri}; -use include_dir::{include_dir, Dir}; use indicatif::MultiProgress; use once_cell::sync::Lazy; use std::fs::File; @@ -23,7 +17,7 @@ use std::time::Duration; use std::{env, vec}; use tempfile::NamedTempFile; use tokio::{process, select}; -use tracing::{debug, error, info, trace, warn}; +use tracing::{debug, info, warn}; use url::Url; // Create a reqwest client that will be used to make HTTP requests. This allows @@ -177,9 +171,19 @@ pub async fn handle_command(mut args: Arguments, mp: MultiProgress) -> Result<() async move { anyhow::Ok(()) }.boxed() }; + if !args.metrics_endpoints.is_empty() { + let endpoints = args + .metrics_endpoints + .iter() + .map(|endpoint| endpoint.to_string()) + .collect::>() + .join(", "); + info!("Now sampling the following endpoints for metrics: {endpoints}"); + } + // Start web server for hosting the explorer, am api and proxies to the enabled services. let listen_address = args.listen_address; - let web_server_task = async move { start_web_server(&listen_address, args).await }; + let web_server_task = start_web_server(&listen_address, args.enable_pushgateway); select! { biased; @@ -481,158 +485,6 @@ async fn start_pushgateway(pushgateway_path: &Path, ephemeral: bool) -> Result<( Ok(()) } -async fn start_web_server(listen_address: &SocketAddr, args: Arguments) -> Result<()> { - let mut app = Router::new() - // Any calls to the root should be redirected to the explorer which is most likely what the user wants to use. - .route("/", get(|| async { Redirect::temporary("/explorer/") })) - .route( - "/explorer", - get(|| async { Redirect::permanent("/explorer/") }), - ) - .route("/explorer/", get(explorer_root_handler)) - .route("/explorer/*path", get(explorer_handler)) - .route("/prometheus/*path", any(prometheus_handler)) - .route("/prometheus", any(prometheus_handler)); - - if args.enable_pushgateway { - app = app - .route("/pushgateway/*path", any(pushgateway_handler)) - .route("/pushgateway", any(pushgateway_handler)); - } - - let server = axum::Server::try_bind(listen_address) - .with_context(|| format!("failed to bind to {}", listen_address))? - .serve(app.into_make_service()); - - debug!("Web server listening on {}", server.local_addr()); - - info!("Explorer endpoint: http://{}", server.local_addr()); - info!("Prometheus endpoint: http://127.0.0.1:9090/prometheus"); - if args.enable_pushgateway { - info!("Pushgateway endpoint: http://127.0.0.1:9091/pushgateway"); - } - - if !args.metrics_endpoints.is_empty() { - let endpoints = args - .metrics_endpoints - .iter() - .map(|endpoint| endpoint.to_string()) - .collect::>() - .join(", "); - info!("Now sampling the following endpoints for metrics: {endpoints}"); - } - - // TODO: Add support for graceful shutdown - // server.with_graceful_shutdown(shutdown_signal()).await?; - server.await?; - - Ok(()) -} - -static STATIC_DIR: Dir<'_> = include_dir!("$CARGO_MANIFEST_DIR/files/explorer"); - -/// This will serve the "index.html" file from the explorer directory. -/// -/// This needs to be a separate handler since otherwise the Path extractor will -/// fail since the root does not have a path. -async fn explorer_root_handler() -> impl IntoResponse { - serve_explorer("index.html").await -} - -/// This will look at the path of the request and serve the corresponding file. -async fn explorer_handler(AxumPath(path): AxumPath) -> impl IntoResponse { - serve_explorer(&path).await -} - -/// Server a specific file from the explorer directory. Returns 404 if the file -/// was not found. -async fn serve_explorer(path: &str) -> impl IntoResponse { - trace!(?path, "Serving static file"); - - match STATIC_DIR.get_file(path) { - None => { - warn!(?path, "Request file was not found in the explorer assets"); - StatusCode::NOT_FOUND.into_response() - } - Some(file) => Response::builder() - .status(StatusCode::OK) - .body(body::boxed(body::Full::from(file.contents()))) - .map(|res| res.into_response()) - .unwrap_or_else(|err| { - error!("Failed to build response: {}", err); - StatusCode::INTERNAL_SERVER_ERROR.into_response() - }), - } -} - -async fn prometheus_handler(req: http::Request) -> impl IntoResponse { - let upstream_base = Url::parse("http://localhost:9090").unwrap(); - proxy_handler(req, upstream_base).await -} - -async fn pushgateway_handler(req: http::Request) -> impl IntoResponse { - let upstream_base = Url::parse("http://localhost:9091").unwrap(); - proxy_handler(req, upstream_base).await -} - -async fn proxy_handler(mut req: http::Request, upstream_base: Url) -> impl IntoResponse { - trace!(req_uri=?req.uri(),method=?req.method(),"Proxying request"); - - // NOTE: The username/password is not forwarded - let mut url = upstream_base.join(req.uri().path()).unwrap(); - url.set_query(req.uri().query()); - *req.uri_mut() = Uri::try_from(url.as_str()).unwrap(); - - let res = CLIENT.execute(req.try_into().unwrap()).await; - - match res { - Ok(res) => { - if !res.status().is_success() { - debug!( - "Response from the upstream source returned a non-success status code for {}: {:?}", - res.url(), res.status() - ); - } - - convert_response(res).into_response() - } - Err(err) => { - error!("Error proxying request: {:?}", err); - StatusCode::INTERNAL_SERVER_ERROR.into_response() - } - } -} - -/// Convert a reqwest::Response into a axum_core::Response. -/// -/// If the Response builder is unable to create a Response, then it will log the -/// error and return a http status code 500. -/// -/// We cannot implement this as an Into or From trait since both types are -/// foreign to this code. -fn convert_response(req: reqwest::Response) -> Response { - let mut builder = http::Response::builder().status(req.status()); - - // Calling `headers_mut` is safe here because we're constructing a new - // Response from scratch and it will only return `None` if the builder is in - // a Error state. - let headers = builder.headers_mut().unwrap(); - for (name, value) in req.headers() { - // Insert all the headers that were in the response from the upstream. - headers.insert(name, value.clone()); - } - - // TODO: Do we need to rewrite some headers, such as host? - - match builder.body(body::StreamBody::from(req.bytes_stream())) { - Ok(res) => res.into_response(), - Err(err) => { - error!("Error converting response: {:?}", err); - StatusCode::INTERNAL_SERVER_ERROR.into_response() - } - } -} - /// Parses the input string into a Url. This uses a custom parser to allow for /// some more flexible input. /// diff --git a/src/bin/am/main.rs b/src/bin/am/main.rs index da30836..c29cf0e 100644 --- a/src/bin/am/main.rs +++ b/src/bin/am/main.rs @@ -13,6 +13,7 @@ mod commands; mod dir; mod downloader; mod interactive; +mod server; #[tokio::main] async fn main() { diff --git a/src/bin/am/server.rs b/src/bin/am/server.rs new file mode 100644 index 0000000..7b4e3e7 --- /dev/null +++ b/src/bin/am/server.rs @@ -0,0 +1,53 @@ +use anyhow::{Context, Result}; +use axum::response::Redirect; +use axum::routing::{any, get}; +use axum::{Router, Server}; +use std::net::SocketAddr; +use tracing::{debug, info}; + +mod explorer; +mod prometheus; +mod pushgateway; +mod util; + +pub(crate) async fn start_web_server( + listen_address: &SocketAddr, + enable_pushgateway: bool, +) -> Result<()> { + let mut app = Router::new() + // Any calls to the root should be redirected to the explorer which is most likely what the user wants to use. + .route("/", get(|| async { Redirect::temporary("/explorer/") })) + .route( + "/explorer", + get(|| async { Redirect::permanent("/explorer/") }), + ) + .route("/explorer/", get(explorer::root_handler)) + .route("/explorer/*path", get(explorer::handler)) + .route("/prometheus/*path", any(prometheus::handler)) + .route("/prometheus", any(prometheus::handler)); + + if enable_pushgateway { + app = app + .route("/pushgateway/*path", any(pushgateway::handler)) + .route("/pushgateway", any(pushgateway::handler)); + } + + let server = Server::try_bind(listen_address) + .with_context(|| format!("failed to bind to {}", listen_address))? + .serve(app.into_make_service()); + + debug!("Web server listening on {}", server.local_addr()); + + info!("Explorer endpoint: http://{}", server.local_addr()); + info!("Prometheus endpoint: http://127.0.0.1:9090/prometheus"); + + if enable_pushgateway { + info!("Pushgateway endpoint: http://127.0.0.1:9091/pushgateway"); + } + + // TODO: Add support for graceful shutdown + // server.with_graceful_shutdown(shutdown_signal()).await?; + server.await?; + + Ok(()) +} diff --git a/src/bin/am/server/explorer.rs b/src/bin/am/server/explorer.rs new file mode 100644 index 0000000..48a986e --- /dev/null +++ b/src/bin/am/server/explorer.rs @@ -0,0 +1,42 @@ +use axum::body; +use axum::extract::Path; +use axum::response::{IntoResponse, Response}; +use http::StatusCode; +use include_dir::{include_dir, Dir}; +use tracing::{error, trace, warn}; + +static STATIC_DIR: Dir<'_> = include_dir!("$CARGO_MANIFEST_DIR/files/explorer"); + +/// This will serve the "index.html" file from the explorer directory. +/// +/// This needs to be a separate handler since otherwise the Path extractor will +/// fail since the root does not have a path. +pub(crate) async fn root_handler() -> impl IntoResponse { + serve_explorer("index.html").await +} + +/// This will look at the path of the request and serve the corresponding file. +pub(crate) async fn handler(Path(path): Path) -> impl IntoResponse { + serve_explorer(&path).await +} + +/// Server a specific file from the explorer directory. Returns 404 if the file +/// was not found. +async fn serve_explorer(path: &str) -> impl IntoResponse { + trace!(?path, "Serving static file"); + + match STATIC_DIR.get_file(path) { + None => { + warn!(?path, "Request file was not found in the explorer assets"); + StatusCode::NOT_FOUND.into_response() + } + Some(file) => Response::builder() + .status(StatusCode::OK) + .body(body::boxed(body::Full::from(file.contents()))) + .map(|res| res.into_response()) + .unwrap_or_else(|err| { + error!("Failed to build response: {}", err); + StatusCode::INTERNAL_SERVER_ERROR.into_response() + }), + } +} diff --git a/src/bin/am/server/prometheus.rs b/src/bin/am/server/prometheus.rs new file mode 100644 index 0000000..d919aa0 --- /dev/null +++ b/src/bin/am/server/prometheus.rs @@ -0,0 +1,9 @@ +use crate::server::util::proxy_handler; +use axum::body::Body; +use axum::response::IntoResponse; +use url::Url; + +pub(crate) async fn handler(req: http::Request) -> impl IntoResponse { + let upstream_base = Url::parse("http://localhost:9090").unwrap(); + proxy_handler(req, upstream_base).await +} diff --git a/src/bin/am/server/pushgateway.rs b/src/bin/am/server/pushgateway.rs new file mode 100644 index 0000000..9841c91 --- /dev/null +++ b/src/bin/am/server/pushgateway.rs @@ -0,0 +1,9 @@ +use crate::server::util::proxy_handler; +use axum::body::Body; +use axum::response::IntoResponse; +use url::Url; + +pub(crate) async fn handler(req: http::Request) -> impl IntoResponse { + let upstream_base = Url::parse("http://localhost:9091").unwrap(); + proxy_handler(req, upstream_base).await +} diff --git a/src/bin/am/server/util.rs b/src/bin/am/server/util.rs new file mode 100644 index 0000000..4b16040 --- /dev/null +++ b/src/bin/am/server/util.rs @@ -0,0 +1,68 @@ +use crate::commands::start::CLIENT; +use axum::body; +use axum::body::Body; +use axum::response::{IntoResponse, Response}; +use http::{StatusCode, Uri}; +use tracing::{debug, error, trace}; +use url::Url; + +pub(crate) async fn proxy_handler( + mut req: http::Request, + upstream_base: Url, +) -> impl IntoResponse { + trace!(req_uri=?req.uri(),method=?req.method(),"Proxying request"); + + // NOTE: The username/password is not forwarded + let mut url = upstream_base.join(req.uri().path()).unwrap(); + url.set_query(req.uri().query()); + *req.uri_mut() = Uri::try_from(url.as_str()).unwrap(); + + let res = CLIENT.execute(req.try_into().unwrap()).await; + + match res { + Ok(res) => { + if !res.status().is_success() { + debug!( + "Response from the upstream source returned a non-success status code for {}: {:?}", + res.url(), res.status() + ); + } + + convert_response(res).into_response() + } + Err(err) => { + error!("Error proxying request: {:?}", err); + StatusCode::INTERNAL_SERVER_ERROR.into_response() + } + } +} + +/// Convert a reqwest::Response into a axum_core::Response. +/// +/// If the Response builder is unable to create a Response, then it will log the +/// error and return a http status code 500. +/// +/// We cannot implement this as an Into or From trait since both types are +/// foreign to this code. +pub(crate) fn convert_response(req: reqwest::Response) -> Response { + let mut builder = http::Response::builder().status(req.status()); + + // Calling `headers_mut` is safe here because we're constructing a new + // Response from scratch and it will only return `None` if the builder is in + // a Error state. + let headers = builder.headers_mut().unwrap(); + for (name, value) in req.headers() { + // Insert all the headers that were in the response from the upstream. + headers.insert(name, value.clone()); + } + + // TODO: Do we need to rewrite some headers, such as host? + + match builder.body(body::StreamBody::from(req.bytes_stream())) { + Ok(res) => res.into_response(), + Err(err) => { + error!("Error converting response: {:?}", err); + StatusCode::INTERNAL_SERVER_ERROR.into_response() + } + } +} From 35c7129d006b6a59bed5f17abdc4e340ce536840 Mon Sep 17 00:00:00 2001 From: Mari Date: Mon, 10 Jul 2023 15:35:55 +0200 Subject: [PATCH 2/5] replace double handler with single optional handler --- src/bin/am/server.rs | 2 +- src/bin/am/server/explorer.rs | 19 +++---------------- 2 files changed, 4 insertions(+), 17 deletions(-) diff --git a/src/bin/am/server.rs b/src/bin/am/server.rs index 7b4e3e7..b84c09a 100644 --- a/src/bin/am/server.rs +++ b/src/bin/am/server.rs @@ -21,7 +21,7 @@ pub(crate) async fn start_web_server( "/explorer", get(|| async { Redirect::permanent("/explorer/") }), ) - .route("/explorer/", get(explorer::root_handler)) + .route("/explorer/", get(explorer::handler)) .route("/explorer/*path", get(explorer::handler)) .route("/prometheus/*path", any(prometheus::handler)) .route("/prometheus", any(prometheus::handler)); diff --git a/src/bin/am/server/explorer.rs b/src/bin/am/server/explorer.rs index 48a986e..1d5f309 100644 --- a/src/bin/am/server/explorer.rs +++ b/src/bin/am/server/explorer.rs @@ -7,25 +7,12 @@ use tracing::{error, trace, warn}; static STATIC_DIR: Dir<'_> = include_dir!("$CARGO_MANIFEST_DIR/files/explorer"); -/// This will serve the "index.html" file from the explorer directory. -/// -/// This needs to be a separate handler since otherwise the Path extractor will -/// fail since the root does not have a path. -pub(crate) async fn root_handler() -> impl IntoResponse { - serve_explorer("index.html").await -} - -/// This will look at the path of the request and serve the corresponding file. -pub(crate) async fn handler(Path(path): Path) -> impl IntoResponse { - serve_explorer(&path).await -} +pub(crate) async fn handler(optional_path: Option>) -> impl IntoResponse { + let path = optional_path.map_or_else(|| "index.html".to_string(), |path| path.0); -/// Server a specific file from the explorer directory. Returns 404 if the file -/// was not found. -async fn serve_explorer(path: &str) -> impl IntoResponse { trace!(?path, "Serving static file"); - match STATIC_DIR.get_file(path) { + match STATIC_DIR.get_file(&path) { None => { warn!(?path, "Request file was not found in the explorer assets"); StatusCode::NOT_FOUND.into_response() From 3fa5c7db8b3dba112e8fd311a16d3d1a789a0a60 Mon Sep 17 00:00:00 2001 From: Mari Date: Mon, 10 Jul 2023 15:43:19 +0200 Subject: [PATCH 3/5] proxy metrics to pushgateway metrics --- Cargo.lock | 1 + Cargo.toml | 1 + src/bin/am/server.rs | 1 + src/bin/am/server/pushgateway.rs | 5 +++++ 4 files changed, 8 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index d57eedd..4630d94 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -104,6 +104,7 @@ version = "0.1.0" dependencies = [ "anyhow", "axum", + "bytes", "clap", "clap-markdown", "dialoguer", diff --git a/Cargo.toml b/Cargo.toml index 0ec642f..cd534a1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,7 @@ categories = ["development-tools::profiling"] [dependencies] anyhow = { version = "1.0.71" } axum = { version = "0.6.18" } +bytes = "1.4.0" clap = { version = "4.2.7", features = ["derive", "env"] } clap-markdown = "0.1.3" dialoguer = "0.10.4" diff --git a/src/bin/am/server.rs b/src/bin/am/server.rs index b84c09a..633e125 100644 --- a/src/bin/am/server.rs +++ b/src/bin/am/server.rs @@ -28,6 +28,7 @@ pub(crate) async fn start_web_server( if enable_pushgateway { app = app + .route("/metrics", any(pushgateway::metrics_proxy_handler)) .route("/pushgateway/*path", any(pushgateway::handler)) .route("/pushgateway", any(pushgateway::handler)); } diff --git a/src/bin/am/server/pushgateway.rs b/src/bin/am/server/pushgateway.rs index 9841c91..bcc93cb 100644 --- a/src/bin/am/server/pushgateway.rs +++ b/src/bin/am/server/pushgateway.rs @@ -7,3 +7,8 @@ pub(crate) async fn handler(req: http::Request) -> impl IntoResponse { let upstream_base = Url::parse("http://localhost:9091").unwrap(); proxy_handler(req, upstream_base).await } + +pub(crate) async fn metrics_proxy_handler(req: http::Request) -> impl IntoResponse { + let upstream_base = Url::parse("http://localhost:9091/pushgateway/metrics").unwrap(); + proxy_handler(req, upstream_base).await +} From 77f6617229ff51e48d16f9172f165c99cb91b6d2 Mon Sep 17 00:00:00 2001 From: Mari Date: Mon, 10 Jul 2023 15:43:41 +0200 Subject: [PATCH 4/5] dont need bytes --- Cargo.lock | 1 - Cargo.toml | 1 - 2 files changed, 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4630d94..d57eedd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -104,7 +104,6 @@ version = "0.1.0" dependencies = [ "anyhow", "axum", - "bytes", "clap", "clap-markdown", "dialoguer", diff --git a/Cargo.toml b/Cargo.toml index cd534a1..0ec642f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,6 @@ categories = ["development-tools::profiling"] [dependencies] anyhow = { version = "1.0.71" } axum = { version = "0.6.18" } -bytes = "1.4.0" clap = { version = "4.2.7", features = ["derive", "env"] } clap-markdown = "0.1.3" dialoguer = "0.10.4" From c8b1e6e70aadbaf38420af077e476a87949b2b7f Mon Sep 17 00:00:00 2001 From: Mari Date: Tue, 11 Jul 2023 14:24:29 +0200 Subject: [PATCH 5/5] update changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 594540a..427736f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `--ephemeral` can now be specified to automatically delete data created by Prometheus/Pushgateway after the process exits (#76) - Added new subcommand `discord` which links to the discord server (#80) +- The `/metrics` endpoint now transparently redirects to `/pushgateway/metrics` if + Pushgateway is enabled (#81) ## [0.1.0]