Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions apps/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,15 @@ edition = "2024"
anyhow = "1.0.98"
async-trait = "0.1.83"
clap = { version = "4.5.40", features = ["cargo", "derive"] }
crossterm = "0.27"
deadpool = "0.12.2"
ed25519-dalek = "2.1.1"
futures = "0.3"
hex = "0.4.3"
libsql = "0.9.18"
peerup = { path = "../../crates/peerup" }
rand = "0.8"
ratatui = "0.26"
reqwest = { version = "0.12", features = ["json"] }
serde = { version = "1.0.219", features = ["derive"] }
serde_json = "1.0"
Expand All @@ -28,9 +32,6 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] }
url = "2.5"
uuid = { version = "1.17.0", features = ["serde", "v4"] }
zmq = "0.10.0"
peerup = { path = "../../crates/peerup" }
ratatui = "0.26"
crossterm = "0.27"

[dev-dependencies]
tempfile = "3.13"
36 changes: 36 additions & 0 deletions apps/service/config.production.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Production Configuration Example
# Deploy this on internet-facing nodes

[zeromq]
bind = "*"
port = 5555

[preferences]
use_peerup_layer = true
allow_peer_leech = true
minimum_peer_mr = 0
timeout_seconds = 10
degraded_threshold_ms = 1000
location_privacy = "country_only" # Privacy for production
location_update_interval_secs = 3600

[peerup]
# Listen on ports 9000-9010 (ensure firewall allows inbound TCP)
port_range = [9000, 9010]

# Bootstrap peers - replace with your actual bootstrap node addresses
# Format: /ip4/<IP>/tcp/<PORT>/p2p/<PEER_ID>
# or: /dns4/<DOMAIN>/tcp/<PORT>/p2p/<PEER_ID>
bootstrap_peers = [
# Example: "/dns4/bootstrap1.example.com/tcp/9000/p2p/12D3KooWExamplePeerID1",
# Example: "/dns4/bootstrap2.example.com/tcp/9000/p2p/12D3KooWExamplePeerID2",
]

# Disable mDNS for internet-facing nodes (only useful on LAN)
enable_mdns = false

# Enable Kademlia DHT for peer discovery (production standard)
enable_kademlia = true

# Enable relay for NAT traversal (if nodes are behind NAT)
enable_relay = false
47 changes: 47 additions & 0 deletions apps/service/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ pub enum LocationPrivacy {
pub struct Config {
pub zeromq: ZeroMQ,
pub preferences: Preferences,
#[serde(default)]
pub peerup: PeerUPConfig,
}

#[derive(Debug, Serialize, Deserialize)]
Expand All @@ -46,6 +48,50 @@ pub struct Preferences {
pub location_privacy: LocationPrivacy,
}

/// PeerUP P2P network configuration
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct PeerUPConfig {
/// Port range for P2P networking (min, max)
#[serde(default = "default_peerup_port_range")]
pub port_range: (u16, u16),
/// Enable mDNS for local peer discovery
#[serde(default = "default_true")]
pub enable_mdns: bool,
/// Enable Kademlia DHT for wide-area peer discovery
#[serde(default = "default_true")]
pub enable_kademlia: bool,
/// Enable relay for NAT traversal
#[serde(default = "default_false")]
pub enable_relay: bool,
/// Bootstrap peers (multiaddrs as strings)
#[serde(default)]
pub bootstrap_peers: Vec<String>,
}

fn default_peerup_port_range() -> (u16, u16) {
(9000, 9010)
}

fn default_true() -> bool {
true
}

fn default_false() -> bool {
false
}

impl Default for PeerUPConfig {
fn default() -> Self {
Self {
port_range: default_peerup_port_range(),
enable_mdns: true,
enable_kademlia: true,
enable_relay: false,
bootstrap_peers: Vec::new(),
}
}
}

fn default_location_update_interval() -> u64 {
300 // 5 minutes default for mobile devices
}
Expand Down Expand Up @@ -91,6 +137,7 @@ impl Default for Config {
location_update_interval_secs: 300,
location_privacy: LocationPrivacy::Full,
},
peerup: PeerUPConfig::default(),
}
}
}
Expand Down
1 change: 1 addition & 0 deletions apps/service/src/crypto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ pub mod verification;

pub use keys::{KeyPair, load_or_generate_keypair};
pub use signing::sign_result;
pub use verification::verify_result;
69 changes: 69 additions & 0 deletions apps/service/src/database/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,72 @@ pub struct PeerResult {
pub country: Option<String>,
pub region: Option<String>,
}

impl PeerResult {
/// Create a new peer result from a P2P received result
pub fn from_p2p_result(p2p_result: &crate::p2p::PeerResult) -> Option<Self> {
// Extract signature or return None if missing
let signature = p2p_result.signature.clone()?;

Self {
id: None,
monitor_uuid: p2p_result.result.monitor_id,
timestamp: p2p_result.result.timestamp,
status: p2p_result.result.status,
latency_ms: p2p_result.result.latency_ms,
status_code: p2p_result.result.status_code,
error_message: p2p_result.result.error_message.clone(),
peer_id: p2p_result.peer_id.clone(),
signature,
verified: false, // Will be verified later
created_at: p2p_result.received_at,
city: None, // TODO: Add geolocation lookup
country: None,
region: None,
}
.into()
}
Comment on lines +113 to +134
Copy link

Copilot AI Jan 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The from_p2p_result method returns None if signature is missing, but the caller in orchestrator (line 253) only logs a warning when None is returned and doesn't distinguish between missing signature vs other errors. This makes debugging harder. Consider logging a more specific message when signature is missing.

Copilot uses AI. Check for mistakes.
}

/// Peer metadata persisted from P2P discovery/connect events
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Peer {
pub peer_id: String,
pub status: String,
pub last_seen: SystemTime,
pub joined_at: SystemTime,
pub contribution_score: f64,
pub uptime_percentage: f64,
pub checks_per_day: i64,
pub location_city: Option<String>,
pub location_region: Option<String>,
pub location_country: Option<String>,
}

impl Peer {
pub fn new_online(peer_id: String, now: SystemTime) -> Self {
Self {
peer_id,
status: "online".to_string(),
last_seen: now,
joined_at: now,
contribution_score: 1.0,
uptime_percentage: 100.0,
checks_per_day: 0,
location_city: None,
location_region: None,
location_country: None,
}
}
}

/// Snapshot of network metrics stored periodically
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NetworkStats {
pub timestamp: SystemTime,
pub total_peers: i64,
pub online_peers: i64,
pub checks_performed: i64,
pub checks_received: i64,
pub bandwidth_used_mb: i64,
}
105 changes: 104 additions & 1 deletion apps/service/src/database/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use libsql::{Connection, params};
use std::sync::Arc;
use uuid::Uuid;

use super::models::{Monitor, MonitorResult, PeerResult};
use super::models::{Monitor, MonitorResult, NetworkStats, Peer, PeerResult};
use crate::monitoring::types::CheckResult;
use crate::pool::LibsqlPool;

Expand Down Expand Up @@ -39,6 +39,18 @@ pub trait Database: Send + Sync {

/// Get peer results for a monitor
async fn get_peer_results(&self, monitor_uuid: Uuid, limit: usize) -> Result<Vec<PeerResult>>;

/// Upsert peer metadata
async fn upsert_peer(&self, peer: &Peer) -> Result<()>;

/// Mark peer offline
async fn mark_peer_offline(&self, peer_id: &str, now: std::time::SystemTime) -> Result<()>;

/// Insert network stats snapshot
async fn insert_network_stats(&self, stats: &NetworkStats) -> Result<i64>;

/// Get latest network stats
async fn get_latest_network_stats(&self) -> Result<Option<NetworkStats>>;
}

/// LibSQL database implementation
Expand Down Expand Up @@ -355,4 +367,95 @@ impl Database for DatabaseImpl {

Ok(results)
}

async fn upsert_peer(&self, peer: &Peer) -> Result<()> {
let conn = self.get_conn().await?;
let last_seen = Monitor::timestamp_to_i64(peer.last_seen);
let joined_at = Monitor::timestamp_to_i64(peer.joined_at);

conn.execute(
"INSERT INTO peers (peer_id, status, last_seen, joined_at, contribution_score, \
uptime_percentage, checks_per_day, location_city, location_region, location_country)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(peer_id) DO UPDATE SET status=excluded.status, \
last_seen=excluded.last_seen",
params![
peer.peer_id.clone(),
peer.status.clone(),
last_seen,
joined_at,
peer.contribution_score,
peer.uptime_percentage,
peer.checks_per_day,
peer.location_city.clone(),
peer.location_region.clone(),
peer.location_country.clone()
],
)
.await?;

Ok(())
}

async fn mark_peer_offline(&self, peer_id: &str, now: std::time::SystemTime) -> Result<()> {
let conn = self.get_conn().await?;
let ts = Monitor::timestamp_to_i64(now);

conn.execute(
"UPDATE peers SET status = 'offline', last_seen = ? WHERE peer_id = ?",
params![ts, peer_id],
)
.await?;

Ok(())
}

async fn insert_network_stats(&self, stats: &NetworkStats) -> Result<i64> {
let conn = self.get_conn().await?;
let ts = Monitor::timestamp_to_i64(stats.timestamp);

conn.execute(
"INSERT INTO network_stats (timestamp, total_peers, online_peers, checks_performed, \
checks_received, bandwidth_used_mb)
VALUES (?, ?, ?, ?, ?, ?)",
params![
ts,
stats.total_peers,
stats.online_peers,
stats.checks_performed,
stats.checks_received,
stats.bandwidth_used_mb
],
)
.await?;

Ok(conn.last_insert_rowid())
}

async fn get_latest_network_stats(&self) -> Result<Option<NetworkStats>> {
let conn = self.get_conn().await?;
let mut stmt = conn
.prepare(
"SELECT timestamp, total_peers, online_peers, checks_performed, checks_received, \
bandwidth_used_mb
FROM network_stats ORDER BY timestamp DESC LIMIT 1",
)
.await?;

let mut rows = stmt.query(()).await?;

if let Some(row) = rows.next().await? {
let ts: i64 = row.get(0)?;
Ok(Some(NetworkStats {
timestamp: Monitor::i64_to_timestamp(ts),
total_peers: row.get(1)?,
online_peers: row.get(2)?,
checks_performed: row.get(3)?,
checks_received: row.get(4)?,
bandwidth_used_mb: row.get(5)?,
}))
} else {
Ok(None)
}
}
}
14 changes: 12 additions & 2 deletions apps/service/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,12 @@ async fn main() -> anyhow::Result<()> {
Commands::Run => {
tracing::info!("Starting Uppe. service...");
tracing::info!("P2P network enabled: {}", cfg.preferences.use_peerup_layer);
orchestrator::Orchestrator::start(cfg, pool).await?;

// Use LocalSet for P2P network (libp2p Swarm is !Send)
let local = tokio::task::LocalSet::new();
local
.run_until(async move { orchestrator::Orchestrator::start(cfg, pool).await })
.await?;
}
Commands::Migrate => {
tracing::info!("Running database migrations...");
Expand Down Expand Up @@ -252,7 +257,12 @@ async fn main() -> anyhow::Result<()> {
"unknown".to_string()
};
let p2p_enabled = cfg.preferences.use_peerup_layer;
tui::run_tui_with_p2p(pool, peer_id, p2p_enabled).await?;

// Use LocalSet for P2P network (libp2p Swarm is !Send)
let local = tokio::task::LocalSet::new();
local
.run_until(async move { tui::run_tui_with_p2p(pool, peer_id, p2p_enabled).await })
.await?;
}
}

Expand Down
Loading
Loading