From b7c3d219d31179ebedf855d12ccafde8b4e6034e Mon Sep 17 00:00:00 2001 From: Max Kalashnikoff Date: Thu, 26 Oct 2023 00:23:35 +0200 Subject: [PATCH] chore: publisher_service and metrics bootstrap --- src/bin/publisher.rs | 5 ++- src/lib.rs | 1 + src/publisher_service/metrics.rs | 43 +++++++++++++++++++++++ src/publisher_service/mod.rs | 60 ++++++++++++++++++++++++++++++++ src/publisher_service/worker.rs | 3 ++ 5 files changed, 109 insertions(+), 3 deletions(-) create mode 100644 src/publisher_service/metrics.rs create mode 100644 src/publisher_service/mod.rs create mode 100644 src/publisher_service/worker.rs diff --git a/src/bin/publisher.rs b/src/bin/publisher.rs index 43ade483..5845899b 100644 --- a/src/bin/publisher.rs +++ b/src/bin/publisher.rs @@ -10,10 +10,9 @@ async fn main() -> Result<()> { dotenv().ok(); let config = Configuration::new().expect("Failed to load config!"); tracing_subscriber::fmt() - .with_env_filter(config.log_level) + .with_env_filter(config.clone().log_level) .with_span_events(FmtSpan::CLOSE) .with_ansi(std::env::var("ANSI_LOGS").is_ok()) .init(); - Ok(()) - // TODO: Implement publisher service runner + notify_server::publisher_service::bootstrap(config).await } diff --git a/src/lib.rs b/src/lib.rs index edee4b10..31c6d744 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -43,6 +43,7 @@ pub mod migrate; pub mod model; mod networking; mod notify_keys; +pub mod publisher_service; pub mod registry; pub mod spec; pub mod state; diff --git a/src/publisher_service/metrics.rs b/src/publisher_service/metrics.rs new file mode 100644 index 00000000..849ac685 --- /dev/null +++ b/src/publisher_service/metrics.rs @@ -0,0 +1,43 @@ +use { + axum::response::IntoResponse, + hyper::StatusCode, + tracing::error, + wc::metrics::{otel::metrics::Counter, ServiceMetrics}, +}; + +#[derive(Clone)] +pub struct Metrics { + pub processed_notifications: Counter, +} + +impl Metrics { + pub fn new() -> Self { + let meter = wc::metrics::ServiceMetrics::meter(); + let processed_notifications = meter + .u64_counter("processed_notifications") + .with_description("The number of processed notifications") + .init(); + Metrics { + processed_notifications, + } + } +} + +impl Default for Metrics { + fn default() -> Self { + Self::new() + } +} + +pub async fn handler() -> impl IntoResponse { + match ServiceMetrics::export() { + Ok(content) => (StatusCode::OK, content), + Err(e) => { + error!(?e, "Failed to parse metrics"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + "Failed to parse metrics".to_string(), + ) + } + } +} diff --git a/src/publisher_service/mod.rs b/src/publisher_service/mod.rs new file mode 100644 index 00000000..32859173 --- /dev/null +++ b/src/publisher_service/mod.rs @@ -0,0 +1,60 @@ +use { + super::{config::Configuration, error::Error, Result}, + axum::{routing::get, Router}, + rand::prelude::*, + relay_rpc::auth::ed25519_dalek::Keypair, + sqlx::postgres::PgPoolOptions, + std::{net::SocketAddr, sync::Arc}, + tokio::select, + tracing::{info, warn}, +}; + +pub mod metrics; +pub mod worker; + +fn create_http_client( + key: &Keypair, + http_relay_url: &str, + notify_url: &str, + project_id: &str, +) -> relay_client::http::Client { + let rpc_address = format!("{http_relay_url}/rpc"); + let aud_address = http_relay_url.to_string(); + let auth = relay_rpc::auth::AuthToken::new(notify_url) + .aud(aud_address) + .as_jwt(key) + .unwrap(); + let conn_opts = + relay_client::ConnectionOptions::new(project_id, auth).with_address(rpc_address); + relay_client::http::Client::new(&conn_opts).unwrap() +} + +pub async fn bootstrap(config: Configuration) -> Result<()> { + wc::metrics::ServiceMetrics::init_with_name("notify-publisher-service"); + + let postgres = PgPoolOptions::new().connect(&config.postgres_url).await?; + sqlx::migrate!("./migrations").run(&postgres).await?; + + let seed = sha256::digest(config.keypair_seed.as_bytes()).as_bytes()[..32] + .try_into() + .map_err(|_| Error::InvalidKeypairSeed)?; + let keypair = Keypair::generate(&mut StdRng::from_seed(seed)); + let _http_relay_client = Arc::new(create_http_client( + &keypair, + &config.relay_url.replace("ws", "http"), + &config.notify_url, + &config.project_id, + )); + let telemetry_port = config.telemetry_prometheus_port.unwrap_or(3001); + let telemetry_addr = SocketAddr::from(([0, 0, 0, 0], telemetry_port)); + + info!("Starting metrics server on {}", telemetry_addr); + let telemetry_app = Router::new().route("/metrics", get(metrics::handler)); + + select! { + e = axum::Server::bind(&telemetry_addr).serve(telemetry_app.into_make_service()) => warn!("Metrics server terminated {:?}", e), + e = worker::run() => warn!("Worker process terminated {:?}", e), + } + + Ok(()) +} diff --git a/src/publisher_service/worker.rs b/src/publisher_service/worker.rs new file mode 100644 index 00000000..35aa8f27 --- /dev/null +++ b/src/publisher_service/worker.rs @@ -0,0 +1,3 @@ +pub async fn run() { + // TODO: worker implementation +}