Skip to content

Commit

Permalink
chore: publisher_service and metrics bootstrap
Browse files Browse the repository at this point in the history
  • Loading branch information
geekbrother committed Oct 25, 2023
1 parent 42307a3 commit b7c3d21
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 3 deletions.
5 changes: 2 additions & 3 deletions src/bin/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@ async fn main() -> Result<()> {
dotenv().ok();
let config = Configuration::new().expect("Failed to load config!");
tracing_subscriber::fmt()
.with_env_filter(config.log_level)
.with_env_filter(config.clone().log_level)
.with_span_events(FmtSpan::CLOSE)
.with_ansi(std::env::var("ANSI_LOGS").is_ok())
.init();
Ok(())
// TODO: Implement publisher service runner
notify_server::publisher_service::bootstrap(config).await
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ pub mod migrate;
pub mod model;
mod networking;
mod notify_keys;
pub mod publisher_service;
pub mod registry;
pub mod spec;
pub mod state;
Expand Down
43 changes: 43 additions & 0 deletions src/publisher_service/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use {
axum::response::IntoResponse,
hyper::StatusCode,
tracing::error,
wc::metrics::{otel::metrics::Counter, ServiceMetrics},
};

#[derive(Clone)]
pub struct Metrics {
pub processed_notifications: Counter<u64>,
}

impl Metrics {
pub fn new() -> Self {
let meter = wc::metrics::ServiceMetrics::meter();
let processed_notifications = meter
.u64_counter("processed_notifications")
.with_description("The number of processed notifications")
.init();
Metrics {
processed_notifications,
}
}
}

impl Default for Metrics {
fn default() -> Self {
Self::new()
}
}

pub async fn handler() -> impl IntoResponse {
match ServiceMetrics::export() {
Ok(content) => (StatusCode::OK, content),
Err(e) => {
error!(?e, "Failed to parse metrics");
(
StatusCode::INTERNAL_SERVER_ERROR,
"Failed to parse metrics".to_string(),
)
}
}
}
60 changes: 60 additions & 0 deletions src/publisher_service/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use {
super::{config::Configuration, error::Error, Result},
axum::{routing::get, Router},
rand::prelude::*,
relay_rpc::auth::ed25519_dalek::Keypair,
sqlx::postgres::PgPoolOptions,
std::{net::SocketAddr, sync::Arc},
tokio::select,
tracing::{info, warn},
};

pub mod metrics;
pub mod worker;

fn create_http_client(
key: &Keypair,
http_relay_url: &str,
notify_url: &str,
project_id: &str,
) -> relay_client::http::Client {
let rpc_address = format!("{http_relay_url}/rpc");
let aud_address = http_relay_url.to_string();
let auth = relay_rpc::auth::AuthToken::new(notify_url)
.aud(aud_address)
.as_jwt(key)
.unwrap();
let conn_opts =
relay_client::ConnectionOptions::new(project_id, auth).with_address(rpc_address);
relay_client::http::Client::new(&conn_opts).unwrap()
}

pub async fn bootstrap(config: Configuration) -> Result<()> {
wc::metrics::ServiceMetrics::init_with_name("notify-publisher-service");

let postgres = PgPoolOptions::new().connect(&config.postgres_url).await?;
sqlx::migrate!("./migrations").run(&postgres).await?;

let seed = sha256::digest(config.keypair_seed.as_bytes()).as_bytes()[..32]
.try_into()
.map_err(|_| Error::InvalidKeypairSeed)?;
let keypair = Keypair::generate(&mut StdRng::from_seed(seed));
let _http_relay_client = Arc::new(create_http_client(
&keypair,
&config.relay_url.replace("ws", "http"),
&config.notify_url,
&config.project_id,
));
let telemetry_port = config.telemetry_prometheus_port.unwrap_or(3001);
let telemetry_addr = SocketAddr::from(([0, 0, 0, 0], telemetry_port));

info!("Starting metrics server on {}", telemetry_addr);
let telemetry_app = Router::new().route("/metrics", get(metrics::handler));

select! {
e = axum::Server::bind(&telemetry_addr).serve(telemetry_app.into_make_service()) => warn!("Metrics server terminated {:?}", e),
e = worker::run() => warn!("Worker process terminated {:?}", e),
}

Ok(())
}
3 changes: 3 additions & 0 deletions src/publisher_service/worker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub async fn run() {
// TODO: worker implementation
}

0 comments on commit b7c3d21

Please sign in to comment.