diff --git a/CHANGELOG.md b/CHANGELOG.md index 594540a..427736f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `--ephemeral` can now be specified to automatically delete data created by Prometheus/Pushgateway after the process exits (#76) - Added new subcommand `discord` which links to the discord server (#80) +- The `/metrics` endpoint now transparently redirects to `/pushgateway/metrics` if + Pushgateway is enabled (#81) ## [0.1.0] diff --git a/src/bin/am/commands/start.rs b/src/bin/am/commands/start.rs index 3064f03..d246509 100644 --- a/src/bin/am/commands/start.rs +++ b/src/bin/am/commands/start.rs @@ -1,18 +1,12 @@ use crate::dir::AutoCleanupDir; use crate::downloader::{download_github_release, unpack, verify_checksum}; use crate::interactive; +use crate::server::start_web_server; use anyhow::{bail, Context, Result}; use autometrics_am::prometheus; -use axum::body::{self, Body}; -use axum::extract::Path as AxumPath; -use axum::response::{IntoResponse, Redirect, Response}; -use axum::routing::{any, get}; -use axum::Router; use clap::Parser; use directories::ProjectDirs; use futures_util::FutureExt; -use http::{StatusCode, Uri}; -use include_dir::{include_dir, Dir}; use indicatif::MultiProgress; use once_cell::sync::Lazy; use std::fs::File; @@ -23,7 +17,7 @@ use std::time::Duration; use std::{env, vec}; use tempfile::NamedTempFile; use tokio::{process, select}; -use tracing::{debug, error, info, trace, warn}; +use tracing::{debug, info, warn}; use url::Url; // Create a reqwest client that will be used to make HTTP requests. This allows @@ -177,9 +171,19 @@ pub async fn handle_command(mut args: Arguments, mp: MultiProgress) -> Result<() async move { anyhow::Ok(()) }.boxed() }; + if !args.metrics_endpoints.is_empty() { + let endpoints = args + .metrics_endpoints + .iter() + .map(|endpoint| endpoint.to_string()) + .collect::>() + .join(", "); + info!("Now sampling the following endpoints for metrics: {endpoints}"); + } + // Start web server for hosting the explorer, am api and proxies to the enabled services. let listen_address = args.listen_address; - let web_server_task = async move { start_web_server(&listen_address, args).await }; + let web_server_task = start_web_server(&listen_address, args.enable_pushgateway); select! { biased; @@ -481,158 +485,6 @@ async fn start_pushgateway(pushgateway_path: &Path, ephemeral: bool) -> Result<( Ok(()) } -async fn start_web_server(listen_address: &SocketAddr, args: Arguments) -> Result<()> { - let mut app = Router::new() - // Any calls to the root should be redirected to the explorer which is most likely what the user wants to use. - .route("/", get(|| async { Redirect::temporary("/explorer/") })) - .route( - "/explorer", - get(|| async { Redirect::permanent("/explorer/") }), - ) - .route("/explorer/", get(explorer_root_handler)) - .route("/explorer/*path", get(explorer_handler)) - .route("/prometheus/*path", any(prometheus_handler)) - .route("/prometheus", any(prometheus_handler)); - - if args.enable_pushgateway { - app = app - .route("/pushgateway/*path", any(pushgateway_handler)) - .route("/pushgateway", any(pushgateway_handler)); - } - - let server = axum::Server::try_bind(listen_address) - .with_context(|| format!("failed to bind to {}", listen_address))? - .serve(app.into_make_service()); - - debug!("Web server listening on {}", server.local_addr()); - - info!("Explorer endpoint: http://{}", server.local_addr()); - info!("Prometheus endpoint: http://127.0.0.1:9090/prometheus"); - if args.enable_pushgateway { - info!("Pushgateway endpoint: http://127.0.0.1:9091/pushgateway"); - } - - if !args.metrics_endpoints.is_empty() { - let endpoints = args - .metrics_endpoints - .iter() - .map(|endpoint| endpoint.to_string()) - .collect::>() - .join(", "); - info!("Now sampling the following endpoints for metrics: {endpoints}"); - } - - // TODO: Add support for graceful shutdown - // server.with_graceful_shutdown(shutdown_signal()).await?; - server.await?; - - Ok(()) -} - -static STATIC_DIR: Dir<'_> = include_dir!("$CARGO_MANIFEST_DIR/files/explorer"); - -/// This will serve the "index.html" file from the explorer directory. -/// -/// This needs to be a separate handler since otherwise the Path extractor will -/// fail since the root does not have a path. -async fn explorer_root_handler() -> impl IntoResponse { - serve_explorer("index.html").await -} - -/// This will look at the path of the request and serve the corresponding file. -async fn explorer_handler(AxumPath(path): AxumPath) -> impl IntoResponse { - serve_explorer(&path).await -} - -/// Server a specific file from the explorer directory. Returns 404 if the file -/// was not found. -async fn serve_explorer(path: &str) -> impl IntoResponse { - trace!(?path, "Serving static file"); - - match STATIC_DIR.get_file(path) { - None => { - warn!(?path, "Request file was not found in the explorer assets"); - StatusCode::NOT_FOUND.into_response() - } - Some(file) => Response::builder() - .status(StatusCode::OK) - .body(body::boxed(body::Full::from(file.contents()))) - .map(|res| res.into_response()) - .unwrap_or_else(|err| { - error!("Failed to build response: {}", err); - StatusCode::INTERNAL_SERVER_ERROR.into_response() - }), - } -} - -async fn prometheus_handler(req: http::Request) -> impl IntoResponse { - let upstream_base = Url::parse("http://localhost:9090").unwrap(); - proxy_handler(req, upstream_base).await -} - -async fn pushgateway_handler(req: http::Request) -> impl IntoResponse { - let upstream_base = Url::parse("http://localhost:9091").unwrap(); - proxy_handler(req, upstream_base).await -} - -async fn proxy_handler(mut req: http::Request, upstream_base: Url) -> impl IntoResponse { - trace!(req_uri=?req.uri(),method=?req.method(),"Proxying request"); - - // NOTE: The username/password is not forwarded - let mut url = upstream_base.join(req.uri().path()).unwrap(); - url.set_query(req.uri().query()); - *req.uri_mut() = Uri::try_from(url.as_str()).unwrap(); - - let res = CLIENT.execute(req.try_into().unwrap()).await; - - match res { - Ok(res) => { - if !res.status().is_success() { - debug!( - "Response from the upstream source returned a non-success status code for {}: {:?}", - res.url(), res.status() - ); - } - - convert_response(res).into_response() - } - Err(err) => { - error!("Error proxying request: {:?}", err); - StatusCode::INTERNAL_SERVER_ERROR.into_response() - } - } -} - -/// Convert a reqwest::Response into a axum_core::Response. -/// -/// If the Response builder is unable to create a Response, then it will log the -/// error and return a http status code 500. -/// -/// We cannot implement this as an Into or From trait since both types are -/// foreign to this code. -fn convert_response(req: reqwest::Response) -> Response { - let mut builder = http::Response::builder().status(req.status()); - - // Calling `headers_mut` is safe here because we're constructing a new - // Response from scratch and it will only return `None` if the builder is in - // a Error state. - let headers = builder.headers_mut().unwrap(); - for (name, value) in req.headers() { - // Insert all the headers that were in the response from the upstream. - headers.insert(name, value.clone()); - } - - // TODO: Do we need to rewrite some headers, such as host? - - match builder.body(body::StreamBody::from(req.bytes_stream())) { - Ok(res) => res.into_response(), - Err(err) => { - error!("Error converting response: {:?}", err); - StatusCode::INTERNAL_SERVER_ERROR.into_response() - } - } -} - /// Parses the input string into a Url. This uses a custom parser to allow for /// some more flexible input. /// diff --git a/src/bin/am/main.rs b/src/bin/am/main.rs index da30836..c29cf0e 100644 --- a/src/bin/am/main.rs +++ b/src/bin/am/main.rs @@ -13,6 +13,7 @@ mod commands; mod dir; mod downloader; mod interactive; +mod server; #[tokio::main] async fn main() { diff --git a/src/bin/am/server.rs b/src/bin/am/server.rs new file mode 100644 index 0000000..633e125 --- /dev/null +++ b/src/bin/am/server.rs @@ -0,0 +1,54 @@ +use anyhow::{Context, Result}; +use axum::response::Redirect; +use axum::routing::{any, get}; +use axum::{Router, Server}; +use std::net::SocketAddr; +use tracing::{debug, info}; + +mod explorer; +mod prometheus; +mod pushgateway; +mod util; + +pub(crate) async fn start_web_server( + listen_address: &SocketAddr, + enable_pushgateway: bool, +) -> Result<()> { + let mut app = Router::new() + // Any calls to the root should be redirected to the explorer which is most likely what the user wants to use. + .route("/", get(|| async { Redirect::temporary("/explorer/") })) + .route( + "/explorer", + get(|| async { Redirect::permanent("/explorer/") }), + ) + .route("/explorer/", get(explorer::handler)) + .route("/explorer/*path", get(explorer::handler)) + .route("/prometheus/*path", any(prometheus::handler)) + .route("/prometheus", any(prometheus::handler)); + + if enable_pushgateway { + app = app + .route("/metrics", any(pushgateway::metrics_proxy_handler)) + .route("/pushgateway/*path", any(pushgateway::handler)) + .route("/pushgateway", any(pushgateway::handler)); + } + + let server = Server::try_bind(listen_address) + .with_context(|| format!("failed to bind to {}", listen_address))? + .serve(app.into_make_service()); + + debug!("Web server listening on {}", server.local_addr()); + + info!("Explorer endpoint: http://{}", server.local_addr()); + info!("Prometheus endpoint: http://127.0.0.1:9090/prometheus"); + + if enable_pushgateway { + info!("Pushgateway endpoint: http://127.0.0.1:9091/pushgateway"); + } + + // TODO: Add support for graceful shutdown + // server.with_graceful_shutdown(shutdown_signal()).await?; + server.await?; + + Ok(()) +} diff --git a/src/bin/am/server/explorer.rs b/src/bin/am/server/explorer.rs new file mode 100644 index 0000000..1d5f309 --- /dev/null +++ b/src/bin/am/server/explorer.rs @@ -0,0 +1,29 @@ +use axum::body; +use axum::extract::Path; +use axum::response::{IntoResponse, Response}; +use http::StatusCode; +use include_dir::{include_dir, Dir}; +use tracing::{error, trace, warn}; + +static STATIC_DIR: Dir<'_> = include_dir!("$CARGO_MANIFEST_DIR/files/explorer"); + +pub(crate) async fn handler(optional_path: Option>) -> impl IntoResponse { + let path = optional_path.map_or_else(|| "index.html".to_string(), |path| path.0); + + trace!(?path, "Serving static file"); + + match STATIC_DIR.get_file(&path) { + None => { + warn!(?path, "Request file was not found in the explorer assets"); + StatusCode::NOT_FOUND.into_response() + } + Some(file) => Response::builder() + .status(StatusCode::OK) + .body(body::boxed(body::Full::from(file.contents()))) + .map(|res| res.into_response()) + .unwrap_or_else(|err| { + error!("Failed to build response: {}", err); + StatusCode::INTERNAL_SERVER_ERROR.into_response() + }), + } +} diff --git a/src/bin/am/server/prometheus.rs b/src/bin/am/server/prometheus.rs new file mode 100644 index 0000000..d919aa0 --- /dev/null +++ b/src/bin/am/server/prometheus.rs @@ -0,0 +1,9 @@ +use crate::server::util::proxy_handler; +use axum::body::Body; +use axum::response::IntoResponse; +use url::Url; + +pub(crate) async fn handler(req: http::Request) -> impl IntoResponse { + let upstream_base = Url::parse("http://localhost:9090").unwrap(); + proxy_handler(req, upstream_base).await +} diff --git a/src/bin/am/server/pushgateway.rs b/src/bin/am/server/pushgateway.rs new file mode 100644 index 0000000..bcc93cb --- /dev/null +++ b/src/bin/am/server/pushgateway.rs @@ -0,0 +1,14 @@ +use crate::server::util::proxy_handler; +use axum::body::Body; +use axum::response::IntoResponse; +use url::Url; + +pub(crate) async fn handler(req: http::Request) -> impl IntoResponse { + let upstream_base = Url::parse("http://localhost:9091").unwrap(); + proxy_handler(req, upstream_base).await +} + +pub(crate) async fn metrics_proxy_handler(req: http::Request) -> impl IntoResponse { + let upstream_base = Url::parse("http://localhost:9091/pushgateway/metrics").unwrap(); + proxy_handler(req, upstream_base).await +} diff --git a/src/bin/am/server/util.rs b/src/bin/am/server/util.rs new file mode 100644 index 0000000..4b16040 --- /dev/null +++ b/src/bin/am/server/util.rs @@ -0,0 +1,68 @@ +use crate::commands::start::CLIENT; +use axum::body; +use axum::body::Body; +use axum::response::{IntoResponse, Response}; +use http::{StatusCode, Uri}; +use tracing::{debug, error, trace}; +use url::Url; + +pub(crate) async fn proxy_handler( + mut req: http::Request, + upstream_base: Url, +) -> impl IntoResponse { + trace!(req_uri=?req.uri(),method=?req.method(),"Proxying request"); + + // NOTE: The username/password is not forwarded + let mut url = upstream_base.join(req.uri().path()).unwrap(); + url.set_query(req.uri().query()); + *req.uri_mut() = Uri::try_from(url.as_str()).unwrap(); + + let res = CLIENT.execute(req.try_into().unwrap()).await; + + match res { + Ok(res) => { + if !res.status().is_success() { + debug!( + "Response from the upstream source returned a non-success status code for {}: {:?}", + res.url(), res.status() + ); + } + + convert_response(res).into_response() + } + Err(err) => { + error!("Error proxying request: {:?}", err); + StatusCode::INTERNAL_SERVER_ERROR.into_response() + } + } +} + +/// Convert a reqwest::Response into a axum_core::Response. +/// +/// If the Response builder is unable to create a Response, then it will log the +/// error and return a http status code 500. +/// +/// We cannot implement this as an Into or From trait since both types are +/// foreign to this code. +pub(crate) fn convert_response(req: reqwest::Response) -> Response { + let mut builder = http::Response::builder().status(req.status()); + + // Calling `headers_mut` is safe here because we're constructing a new + // Response from scratch and it will only return `None` if the builder is in + // a Error state. + let headers = builder.headers_mut().unwrap(); + for (name, value) in req.headers() { + // Insert all the headers that were in the response from the upstream. + headers.insert(name, value.clone()); + } + + // TODO: Do we need to rewrite some headers, such as host? + + match builder.body(body::StreamBody::from(req.bytes_stream())) { + Ok(res) => res.into_response(), + Err(err) => { + error!("Error converting response: {:?}", err); + StatusCode::INTERNAL_SERVER_ERROR.into_response() + } + } +}