Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor web server setup + proxy /metrics to pushgateway #81

Merged
merged 5 commits into from
Jul 11, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 13 additions & 161 deletions src/bin/am/commands/start.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,12 @@
use crate::dir::AutoCleanupDir;
use crate::downloader::{download_github_release, unpack, verify_checksum};
use crate::interactive;
use crate::server::start_web_server;
use anyhow::{bail, Context, Result};
use autometrics_am::prometheus;
use axum::body::{self, Body};
use axum::extract::Path as AxumPath;
use axum::response::{IntoResponse, Redirect, Response};
use axum::routing::{any, get};
use axum::Router;
use clap::Parser;
use directories::ProjectDirs;
use futures_util::FutureExt;
use http::{StatusCode, Uri};
use include_dir::{include_dir, Dir};
use indicatif::MultiProgress;
use once_cell::sync::Lazy;
use std::fs::File;
Expand All @@ -23,7 +17,7 @@ use std::time::Duration;
use std::{env, vec};
use tempfile::NamedTempFile;
use tokio::{process, select};
use tracing::{debug, error, info, trace, warn};
use tracing::{debug, info, warn};
use url::Url;

// Create a reqwest client that will be used to make HTTP requests. This allows
Expand Down Expand Up @@ -177,9 +171,19 @@ pub async fn handle_command(mut args: Arguments, mp: MultiProgress) -> Result<()
async move { anyhow::Ok(()) }.boxed()
};

if !args.metrics_endpoints.is_empty() {
let endpoints = args
.metrics_endpoints
.iter()
.map(|endpoint| endpoint.to_string())
.collect::<Vec<String>>()
.join(", ");
info!("Now sampling the following endpoints for metrics: {endpoints}");
}

// Start web server for hosting the explorer, am api and proxies to the enabled services.
let listen_address = args.listen_address;
let web_server_task = async move { start_web_server(&listen_address, args).await };
let web_server_task = start_web_server(&listen_address, args.enable_pushgateway);

select! {
biased;
Expand Down Expand Up @@ -481,158 +485,6 @@ async fn start_pushgateway(pushgateway_path: &Path, ephemeral: bool) -> Result<(
Ok(())
}

async fn start_web_server(listen_address: &SocketAddr, args: Arguments) -> Result<()> {
let mut app = Router::new()
// Any calls to the root should be redirected to the explorer which is most likely what the user wants to use.
.route("/", get(|| async { Redirect::temporary("/explorer/") }))
.route(
"/explorer",
get(|| async { Redirect::permanent("/explorer/") }),
)
.route("/explorer/", get(explorer_root_handler))
.route("/explorer/*path", get(explorer_handler))
.route("/prometheus/*path", any(prometheus_handler))
.route("/prometheus", any(prometheus_handler));

if args.enable_pushgateway {
app = app
.route("/pushgateway/*path", any(pushgateway_handler))
.route("/pushgateway", any(pushgateway_handler));
}

let server = axum::Server::try_bind(listen_address)
.with_context(|| format!("failed to bind to {}", listen_address))?
.serve(app.into_make_service());

debug!("Web server listening on {}", server.local_addr());

info!("Explorer endpoint: http://{}", server.local_addr());
info!("Prometheus endpoint: http://127.0.0.1:9090/prometheus");
if args.enable_pushgateway {
info!("Pushgateway endpoint: http://127.0.0.1:9091/pushgateway");
}

if !args.metrics_endpoints.is_empty() {
let endpoints = args
.metrics_endpoints
.iter()
.map(|endpoint| endpoint.to_string())
.collect::<Vec<String>>()
.join(", ");
info!("Now sampling the following endpoints for metrics: {endpoints}");
}

// TODO: Add support for graceful shutdown
// server.with_graceful_shutdown(shutdown_signal()).await?;
server.await?;

Ok(())
}

static STATIC_DIR: Dir<'_> = include_dir!("$CARGO_MANIFEST_DIR/files/explorer");

/// This will serve the "index.html" file from the explorer directory.
///
/// This needs to be a separate handler since otherwise the Path extractor will
/// fail since the root does not have a path.
async fn explorer_root_handler() -> impl IntoResponse {
serve_explorer("index.html").await
}

/// This will look at the path of the request and serve the corresponding file.
async fn explorer_handler(AxumPath(path): AxumPath<String>) -> impl IntoResponse {
serve_explorer(&path).await
}

/// Server a specific file from the explorer directory. Returns 404 if the file
/// was not found.
async fn serve_explorer(path: &str) -> impl IntoResponse {
trace!(?path, "Serving static file");

match STATIC_DIR.get_file(path) {
None => {
warn!(?path, "Request file was not found in the explorer assets");
StatusCode::NOT_FOUND.into_response()
}
Some(file) => Response::builder()
.status(StatusCode::OK)
.body(body::boxed(body::Full::from(file.contents())))
.map(|res| res.into_response())
.unwrap_or_else(|err| {
error!("Failed to build response: {}", err);
StatusCode::INTERNAL_SERVER_ERROR.into_response()
}),
}
}

async fn prometheus_handler(req: http::Request<Body>) -> impl IntoResponse {
let upstream_base = Url::parse("http://localhost:9090").unwrap();
proxy_handler(req, upstream_base).await
}

async fn pushgateway_handler(req: http::Request<Body>) -> impl IntoResponse {
let upstream_base = Url::parse("http://localhost:9091").unwrap();
proxy_handler(req, upstream_base).await
}

async fn proxy_handler(mut req: http::Request<Body>, upstream_base: Url) -> impl IntoResponse {
trace!(req_uri=?req.uri(),method=?req.method(),"Proxying request");

// NOTE: The username/password is not forwarded
let mut url = upstream_base.join(req.uri().path()).unwrap();
url.set_query(req.uri().query());
*req.uri_mut() = Uri::try_from(url.as_str()).unwrap();

let res = CLIENT.execute(req.try_into().unwrap()).await;

match res {
Ok(res) => {
if !res.status().is_success() {
debug!(
"Response from the upstream source returned a non-success status code for {}: {:?}",
res.url(), res.status()
);
}

convert_response(res).into_response()
}
Err(err) => {
error!("Error proxying request: {:?}", err);
StatusCode::INTERNAL_SERVER_ERROR.into_response()
}
}
}

/// Convert a reqwest::Response into a axum_core::Response.
///
/// If the Response builder is unable to create a Response, then it will log the
/// error and return a http status code 500.
///
/// We cannot implement this as an Into or From trait since both types are
/// foreign to this code.
fn convert_response(req: reqwest::Response) -> Response {
let mut builder = http::Response::builder().status(req.status());

// Calling `headers_mut` is safe here because we're constructing a new
// Response from scratch and it will only return `None` if the builder is in
// a Error state.
let headers = builder.headers_mut().unwrap();
for (name, value) in req.headers() {
// Insert all the headers that were in the response from the upstream.
headers.insert(name, value.clone());
}

// TODO: Do we need to rewrite some headers, such as host?

match builder.body(body::StreamBody::from(req.bytes_stream())) {
Ok(res) => res.into_response(),
Err(err) => {
error!("Error converting response: {:?}", err);
StatusCode::INTERNAL_SERVER_ERROR.into_response()
}
}
}

/// Parses the input string into a Url. This uses a custom parser to allow for
/// some more flexible input.
///
Expand Down
1 change: 1 addition & 0 deletions src/bin/am/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ mod commands;
mod dir;
mod downloader;
mod interactive;
mod server;

#[tokio::main]
async fn main() {
Expand Down
54 changes: 54 additions & 0 deletions src/bin/am/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use anyhow::{Context, Result};
use axum::response::Redirect;
use axum::routing::{any, get};
use axum::{Router, Server};
use std::net::SocketAddr;
use tracing::{debug, info};

mod explorer;
mod prometheus;
mod pushgateway;
mod util;

pub(crate) async fn start_web_server(
listen_address: &SocketAddr,
enable_pushgateway: bool,
) -> Result<()> {
let mut app = Router::new()
// Any calls to the root should be redirected to the explorer which is most likely what the user wants to use.
.route("/", get(|| async { Redirect::temporary("/explorer/") }))
.route(
"/explorer",
get(|| async { Redirect::permanent("/explorer/") }),
)
.route("/explorer/", get(explorer::handler))
.route("/explorer/*path", get(explorer::handler))
.route("/prometheus/*path", any(prometheus::handler))
.route("/prometheus", any(prometheus::handler));

if enable_pushgateway {
app = app
.route("/metrics", any(pushgateway::metrics_proxy_handler))
.route("/pushgateway/*path", any(pushgateway::handler))
.route("/pushgateway", any(pushgateway::handler));
}

let server = Server::try_bind(listen_address)
.with_context(|| format!("failed to bind to {}", listen_address))?
.serve(app.into_make_service());

debug!("Web server listening on {}", server.local_addr());

info!("Explorer endpoint: http://{}", server.local_addr());
info!("Prometheus endpoint: http://127.0.0.1:9090/prometheus");

if enable_pushgateway {
info!("Pushgateway endpoint: http://127.0.0.1:9091/pushgateway");
}

// TODO: Add support for graceful shutdown
// server.with_graceful_shutdown(shutdown_signal()).await?;
server.await?;

Ok(())
}
29 changes: 29 additions & 0 deletions src/bin/am/server/explorer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use axum::body;
use axum::extract::Path;
use axum::response::{IntoResponse, Response};
use http::StatusCode;
use include_dir::{include_dir, Dir};
use tracing::{error, trace, warn};

static STATIC_DIR: Dir<'_> = include_dir!("$CARGO_MANIFEST_DIR/files/explorer");

pub(crate) async fn handler(optional_path: Option<Path<String>>) -> impl IntoResponse {
let path = optional_path.map_or_else(|| "index.html".to_string(), |path| path.0);

trace!(?path, "Serving static file");

match STATIC_DIR.get_file(&path) {
None => {
warn!(?path, "Request file was not found in the explorer assets");
StatusCode::NOT_FOUND.into_response()
}
Some(file) => Response::builder()
.status(StatusCode::OK)
.body(body::boxed(body::Full::from(file.contents())))
.map(|res| res.into_response())
.unwrap_or_else(|err| {
error!("Failed to build response: {}", err);
StatusCode::INTERNAL_SERVER_ERROR.into_response()
}),
}
}
9 changes: 9 additions & 0 deletions src/bin/am/server/prometheus.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
use crate::server::util::proxy_handler;
use axum::body::Body;
use axum::response::IntoResponse;
use url::Url;

pub(crate) async fn handler(req: http::Request<Body>) -> impl IntoResponse {
let upstream_base = Url::parse("http://localhost:9090").unwrap();
proxy_handler(req, upstream_base).await
}
14 changes: 14 additions & 0 deletions src/bin/am/server/pushgateway.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
use crate::server::util::proxy_handler;
use axum::body::Body;
use axum::response::IntoResponse;
use url::Url;

pub(crate) async fn handler(req: http::Request<Body>) -> impl IntoResponse {
let upstream_base = Url::parse("http://localhost:9091").unwrap();
proxy_handler(req, upstream_base).await
}

pub(crate) async fn metrics_proxy_handler(req: http::Request<Body>) -> impl IntoResponse {
let upstream_base = Url::parse("http://localhost:9091/pushgateway/metrics").unwrap();
proxy_handler(req, upstream_base).await
}
68 changes: 68 additions & 0 deletions src/bin/am/server/util.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use crate::commands::start::CLIENT;
use axum::body;
use axum::body::Body;
use axum::response::{IntoResponse, Response};
use http::{StatusCode, Uri};
use tracing::{debug, error, trace};
use url::Url;

pub(crate) async fn proxy_handler(
mut req: http::Request<Body>,
upstream_base: Url,
) -> impl IntoResponse {
trace!(req_uri=?req.uri(),method=?req.method(),"Proxying request");

// NOTE: The username/password is not forwarded
let mut url = upstream_base.join(req.uri().path()).unwrap();
url.set_query(req.uri().query());
*req.uri_mut() = Uri::try_from(url.as_str()).unwrap();

let res = CLIENT.execute(req.try_into().unwrap()).await;

match res {
Ok(res) => {
if !res.status().is_success() {
debug!(
"Response from the upstream source returned a non-success status code for {}: {:?}",
res.url(), res.status()
);
}

convert_response(res).into_response()
}
Err(err) => {
error!("Error proxying request: {:?}", err);
StatusCode::INTERNAL_SERVER_ERROR.into_response()
}
}
}

/// Convert a reqwest::Response into a axum_core::Response.
///
/// If the Response builder is unable to create a Response, then it will log the
/// error and return a http status code 500.
///
/// We cannot implement this as an Into or From trait since both types are
/// foreign to this code.
pub(crate) fn convert_response(req: reqwest::Response) -> Response {
let mut builder = http::Response::builder().status(req.status());

// Calling `headers_mut` is safe here because we're constructing a new
// Response from scratch and it will only return `None` if the builder is in
// a Error state.
let headers = builder.headers_mut().unwrap();
for (name, value) in req.headers() {
// Insert all the headers that were in the response from the upstream.
headers.insert(name, value.clone());
}

// TODO: Do we need to rewrite some headers, such as host?

match builder.body(body::StreamBody::from(req.bytes_stream())) {
Ok(res) => res.into_response(),
Err(err) => {
error!("Error converting response: {:?}", err);
StatusCode::INTERNAL_SERVER_ERROR.into_response()
}
}
}