Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 10 additions & 5 deletions nym-node-status-api/nym-node-status-agent/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@ set -a
source "${monorepo_root}/envs/${ENVIRONMENT}.env"
set +a

if [ -z "$NYM_NODE_MNEMONICS" ]; then
echo "NYM_NODE_MNEMONICS is required to run an agent"
exit 1
fi

export RUST_LOG="info"
export NODE_STATUS_AGENT_SERVER_ADDRESS="http://127.0.0.1"
export NODE_STATUS_AGENT_SERVER_PORT="8000"
export NODE_STATUS_AGENT_PROBE_PATH="$crate_root/nym-gateway-probe"
NODE_STATUS_AGENT_SERVER_ADDRESS="http://127.0.0.1"
NODE_STATUS_AGENT_SERVER_PORT="8000"
SERVER="${NODE_STATUS_AGENT_SERVER_ADDRESS}|${NODE_STATUS_AGENT_SERVER_PORT}"
export NODE_STATUS_AGENT_AUTH_KEY="BjyC9SsHAZUzPRkQR4sPTvVrp4GgaquTh5YfSJksvvWT"
export NODE_STATUS_AGENT_PROBE_MNEMONIC="$MNEMONIC"
export NODE_STATUS_AGENT_PROBE_PATH="$crate_root/nym-gateway-probe"
export NODE_STATUS_AGENT_PROBE_EXTRA_ARGS="netstack-download-timeout-sec=30,netstack-num-ping=2,netstack-send-timeout-sec=1,netstack-recv-timeout-sec=1"

workers=${1:-1}
Expand All @@ -48,7 +53,7 @@ function swarm() {
local workers=$1

for ((i = 1; i <= workers; i++)); do
${monorepo_root}/target/release/nym-node-status-agent run-probe &
${monorepo_root}/target/release/nym-node-status-agent run-probe --server ${SERVER} &
done

wait
Expand Down
2 changes: 1 addition & 1 deletion nym-node-status-api/nym-node-status-agent/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub(crate) struct Args {
#[derive(Subcommand, Debug)]
pub(crate) enum Command {
RunProbe {
/// Server configurations in format "address:port:auth_key"
/// Server configurations in format "address|port"
/// Can be specified multiple times for multiple servers
#[arg(short, long, required = true)]
server: Vec<String>,
Expand Down
3 changes: 2 additions & 1 deletion nym-node-status-api/nym-node-status-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

[package]
name = "nym-node-status-api"
version = "3.2.2"
version = "3.3.0"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
Expand All @@ -21,6 +21,7 @@ celes = { workspace = true }
clap = { workspace = true, features = ["cargo", "derive", "env", "string"] }
cosmwasm-std = { workspace = true }
futures-util = { workspace = true }
humantime = { workspace = true }
itertools = { workspace = true }
moka = { workspace = true, features = ["future"] }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ function run_bare() {
echo "RUST_LOG=${RUST_LOG}"

# --conection-url is provided in build.rs
cargo run --package nym-node-status-api
cargo run --package nym-node-status-api --no-default-features --features sqlite
}

function run_docker() {
Expand Down
49 changes: 43 additions & 6 deletions nym-node-status-api/nym-node-status-api/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,41 +38,45 @@ pub(crate) struct Cli {

/// Nym api client timeout.
#[clap(long, default_value = "15", env = "NYM_API_CLIENT_TIMEOUT")]
#[arg(value_parser = parse_duration)]
#[arg(value_parser = parse_duration_std)]
pub(crate) nym_api_client_timeout: Duration,

/// Connection url for the database.
#[clap(long, env = "DATABASE_URL")]
pub(crate) database_url: String,

#[clap(long, default_value = "5", env = "SQLX_BUSY_TIMEOUT_S")]
#[arg(value_parser = parse_duration)]
#[arg(value_parser = parse_duration_std)]
pub(crate) sqlx_busy_timeout_s: Duration,

#[clap(
long,
default_value = "300",
env = "NODE_STATUS_API_MONITOR_REFRESH_INTERVAL"
)]
#[arg(value_parser = parse_duration)]
#[arg(value_parser = parse_duration_std)]
pub(crate) monitor_refresh_interval: Duration,

#[clap(
long,
default_value = "300",
env = "NODE_STATUS_API_TESTRUN_REFRESH_INTERVAL"
)]
#[arg(value_parser = parse_duration)]
#[arg(value_parser = parse_duration_std)]
pub(crate) testruns_refresh_interval: Duration,

#[clap(long, default_value = "86400", env = "NODE_STATUS_API_GEODATA_TTL")]
#[arg(value_parser = parse_duration)]
#[arg(value_parser = parse_duration_std)]
pub(crate) geodata_ttl: Duration,

#[clap(env = "NODE_STATUS_API_AGENT_KEY_LIST")]
#[arg(value_delimiter = ',')]
pub(crate) agent_key_list: Vec<String>,

#[clap(long, default_value = "120s", env = "AGENT_REQUEST_FRESHNESS")]
#[arg(value_parser = parse_duration_humantime)]
pub(crate) agent_request_freshness: time::Duration,

#[clap(
long,
default_value_t = 10,
Expand All @@ -92,7 +96,40 @@ pub(crate) struct Cli {
pub(crate) max_agent_count: i64,
}

fn parse_duration(arg: &str) -> Result<std::time::Duration, std::num::ParseIntError> {
fn parse_duration_humantime(arg: &str) -> Result<time::Duration, anyhow::Error> {
let std_duration = match humantime::parse_duration(arg) {
Ok(duration) => duration,
// assume old format (seconds) as a fallback
Err(_) => parse_duration_std(arg)?,
};

Ok(time::Duration::seconds(std_duration.as_secs() as i64))
}

fn parse_duration_std(arg: &str) -> Result<std::time::Duration, std::num::ParseIntError> {
let seconds = arg.parse()?;
Ok(std::time::Duration::from_secs(seconds))
}

#[cfg(test)]
mod test {
use super::*;

#[test]
fn humantime_should_work() {
let should_parse = [("120s", 120), ("120", 120), ("0s", 0), ("0", 0)];

for (raw, expected) in should_parse {
if let Ok(parsed) = parse_duration_humantime(raw) {
assert_eq!(parsed.whole_seconds(), expected);
} else {
panic!("Failed to parse {raw}")
}
}

let should_not_parse = ["0.1s", "-15s"];
for raw in should_not_parse {
assert!(parse_duration_humantime(raw).is_err());
}
}
}
53 changes: 5 additions & 48 deletions nym-node-status-api/nym-node-status-api/src/http/api/testruns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,8 @@ use axum::{
extract::{Path, State},
Router,
};
use nym_node_status_client::{
auth::VerifiableRequest,
models::{get_testrun, submit_results, submit_results_v2},
};
use nym_node_status_client::models::{get_testrun, submit_results, submit_results_v2};
use reqwest::StatusCode;
use tracing::warn;

// TODO dz consider adding endpoint to trigger testrun scan for a given gateway_id
// like in H< src/http/testruns.rs
Expand All @@ -40,8 +36,8 @@ async fn request_testrun(
Json(request): Json<get_testrun::GetTestrunRequest>,
) -> HttpResult<Json<TestrunAssignment>> {
// TODO dz log agent's network probe version
authenticate(&request, &state)?;
is_fresh(&request.payload.timestamp)?;
state.authenticate_agent_submission(&request)?;
state.is_fresh(&request.payload.timestamp)?;

tracing::debug!("Agent requested testrun");

Expand Down Expand Up @@ -87,7 +83,7 @@ async fn submit_testrun(
State(state): State<AppState>,
Json(submitted_result): Json<submit_results::SubmitResults>,
) -> HttpResult<StatusCode> {
authenticate(&submitted_result, &state)?;
state.authenticate_agent_submission(&submitted_result)?;

let db = state.db_pool();
let mut conn = db
Expand Down Expand Up @@ -189,8 +185,7 @@ async fn submit_testrun_v2(
State(state): State<AppState>,
Json(submission): Json<submit_results_v2::SubmitResultsV2>,
) -> HttpResult<StatusCode> {
authenticate(&submission, &state)?;
is_fresh(&submission.payload.assigned_at_utc)?;
state.authenticate_agent_submission(&submission)?;

let db = state.db_pool();
let mut conn = db
Expand Down Expand Up @@ -249,44 +244,6 @@ async fn submit_testrun_v2(
}
}

// TODO dz this should be middleware
#[tracing::instrument(level = "debug", skip_all)]
fn authenticate(request: &impl VerifiableRequest, state: &AppState) -> HttpResult<()> {
if !state.is_registered(request.public_key()) {
tracing::warn!("Public key not registered with NS API, rejecting");
return Err(HttpError::unauthorized());
};

request.verify_signature().map_err(|_| {
tracing::warn!("Signature verification failed, rejecting");
HttpError::unauthorized()
})?;

Ok(())
}

static FRESHNESS_CUTOFF: time::Duration = time::Duration::minutes(2);

fn is_fresh(request_time: &i64) -> HttpResult<()> {
// if a request took longer than N minutes to reach NS API, something is very wrong
let request_time = time::UtcDateTime::from_unix_timestamp(*request_time).map_err(|e| {
warn!("Failed to parse request time: {e}");
HttpError::unauthorized()
})?;

let cutoff_timestamp = now_utc() - FRESHNESS_CUTOFF;
if request_time < cutoff_timestamp {
warn!(
"Request time {} is older than cutoff {} ({}s ago), rejecting",
request_time,
cutoff_timestamp,
FRESHNESS_CUTOFF.whole_seconds()
);
return Err(HttpError::unauthorized());
}
Ok(())
}

fn get_result_from_log(log: &str) -> String {
static RE: std::sync::LazyLock<regex::Regex> =
std::sync::LazyLock::new(|| regex::Regex::new(r"\n\{\s").expect("Invalid regex pattern"));
Expand Down
3 changes: 3 additions & 0 deletions nym-node-status-api/nym-node-status-api/src/http/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ use crate::{

/// Return handles that allow for graceful shutdown of server + awaiting its
/// background tokio task
#[allow(clippy::too_many_arguments)]
pub(crate) async fn start_http_api(
db_pool: DbPool,
http_port: u16,
nym_http_cache_ttl: u64,
agent_key_list: Vec<PublicKey>,
agent_max_count: i64,
agent_request_freshness_requirement: time::Duration,
node_geocache: NodeGeoCache,
node_delegations: Arc<RwLock<DelegationsCache>>,
) -> anyhow::Result<ShutdownHandles> {
Expand All @@ -29,6 +31,7 @@ pub(crate) async fn start_http_api(
nym_http_cache_ttl,
agent_key_list,
agent_max_count,
agent_request_freshness_requirement,
node_geocache,
node_delegations,
)
Expand Down
49 changes: 47 additions & 2 deletions nym-node-status-api/nym-node-status-api/src/http/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use nym_bin_common::bin_info_owned;
use nym_contracts_common::NaiveFloat;
use nym_crypto::asymmetric::ed25519::PublicKey;
use nym_mixnet_contract_common::NodeId;
use nym_node_status_client::auth::VerifiableRequest;
use nym_validator_client::nym_api::SkimmedNode;
use semver::Version;
use serde::{Deserialize, Serialize};
Expand All @@ -17,8 +18,11 @@ use utoipa::ToSchema;
use super::models::SessionStats;
use crate::{
db::{queries, DbPool},
http::models::{
DVpnGateway, DailyStats, ExtendedNymNode, Gateway, Mixnode, NodeGeoData, SummaryHistory,
http::{
error::{HttpError, HttpResult},
models::{
DVpnGateway, DailyStats, ExtendedNymNode, Gateway, Mixnode, NodeGeoData, SummaryHistory,
},
},
monitor::{DelegationsCache, NodeGeoCache},
};
Expand All @@ -31,6 +35,7 @@ pub(crate) struct AppState {
cache: HttpCache,
agent_key_list: Vec<PublicKey>,
agent_max_count: i64,
agent_request_freshness_requirement: time::Duration,
node_geocache: NodeGeoCache,
node_delegations: Arc<RwLock<DelegationsCache>>,
bin_info: BinaryInfo,
Expand All @@ -42,6 +47,7 @@ impl AppState {
cache_ttl: u64,
agent_key_list: Vec<PublicKey>,
agent_max_count: i64,
agent_request_freshness_requirement: time::Duration,
node_geocache: NodeGeoCache,
node_delegations: Arc<RwLock<DelegationsCache>>,
) -> Self {
Expand All @@ -50,6 +56,7 @@ impl AppState {
cache: HttpCache::new(cache_ttl).await,
agent_key_list,
agent_max_count,
agent_request_freshness_requirement,
node_geocache,
node_delegations,
bin_info: BinaryInfo::new(),
Expand Down Expand Up @@ -94,6 +101,44 @@ impl AppState {
pub(crate) fn build_information(&self) -> &BinaryBuildInformationOwned {
&self.bin_info.build_info
}

#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn authenticate_agent_submission(
&self,
request: &impl VerifiableRequest,
) -> HttpResult<()> {
if !self.is_registered(request.public_key()) {
tracing::warn!("Public key not registered with NS API, rejecting");
return Err(HttpError::unauthorized());
};

request.verify_signature().map_err(|_| {
tracing::warn!("Signature verification failed, rejecting");
HttpError::unauthorized()
})?;

Ok(())
}

pub(crate) fn is_fresh(&self, request_time: &i64) -> HttpResult<()> {
// if a request took longer than N minutes to reach NS API, something is very wrong
let request_time = time::UtcDateTime::from_unix_timestamp(*request_time).map_err(|e| {
warn!("Failed to parse request time: {e}");
HttpError::unauthorized()
})?;

let cutoff_timestamp = crate::utils::now_utc() - self.agent_request_freshness_requirement;
if request_time < cutoff_timestamp {
warn!(
"Request time {} is older than cutoff {} ({}s ago), rejecting",
request_time,
cutoff_timestamp,
self.agent_request_freshness_requirement.whole_seconds()
);
return Err(HttpError::unauthorized());
}
Ok(())
}
}

static GATEWAYS_LIST_KEY: &str = "gateways";
Expand Down
1 change: 1 addition & 0 deletions nym-node-status-api/nym-node-status-api/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ async fn main() -> anyhow::Result<()> {
args.nym_http_cache_ttl,
agent_key_list.to_owned(),
args.max_agent_count,
args.agent_request_freshness,
geocache,
delegations_cache,
)
Expand Down
Loading
Loading