diff --git a/src/scheduler.rs b/src/scheduler.rs index 67399de9..e8c35492 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -12,7 +12,7 @@ use chrono::{TimeDelta, Utc}; use config::*; use mostro_core::prelude::*; use nostr_sdk::EventBuilder; -use nostr_sdk::{Kind as NostrKind, Tag, Timestamp}; +use nostr_sdk::{Kind as NostrKind, Tag}; use sqlx_crud::Crud; use std::sync::Arc; use tokio::sync::RwLock; @@ -514,6 +514,32 @@ async fn job_update_bitcoin_prices() { }); } +/// Parse the unix timestamp from a PENDING marker. +/// +/// Marker format: `PENDING-{uuid}-{unix_timestamp}` +/// Legacy format: `PENDING-{uuid}` (no timestamp) → returns `None` +/// +/// Returns `Some(timestamp)` if a valid unix timestamp is found at the end, +/// `None` otherwise. +fn parse_pending_timestamp(marker: &str) -> Option { + let stripped = marker.strip_prefix("PENDING-")?; + + // Expected format: {uuid}-{unix_timestamp} + // UUID is exactly 36 chars (8-4-4-4-12 hex digits with dashes). + // The timestamp follows after the UUID and a separating dash. + if stripped.len() <= 37 { + return None; + } + + // Verify the 37th char (index 36) is a dash separating UUID from timestamp + if stripped.as_bytes().get(36) != Some(&b'-') { + return None; + } + + let ts_str = &stripped[37..]; + ts_str.parse::().ok().filter(|&ts| ts > 1_000_000_000) +} + /// Processes unpaid development fees for completed orders /// /// Runs every 60 seconds and attempts to pay dev fees for orders that have: @@ -535,54 +561,83 @@ async fn job_process_dev_fee_payment() { info!("Checking for unpaid development fees"); // Cleanup stale PENDING entries (crash recovery) - let cleanup_ttl = 300; // 5 minutes in seconds - let cutoff_time = Timestamp::now() - cleanup_ttl; + // Uses the timestamp embedded in the PENDING marker (format: PENDING-{uuid}-{unix_ts}) + // to determine staleness, rather than taken_at which reflects order creation time. + // Legacy markers without timestamps are treated as stale for backward compatibility. + let cleanup_ttl_secs: u64 = 300; // 5 minutes + let now_unix = Utc::now().timestamp() as u64; - if let Ok(stale_orders) = sqlx::query_as::<_, Order>( + if let Ok(pending_orders) = sqlx::query_as::<_, Order>( "SELECT * FROM orders - WHERE dev_fee_payment_hash LIKE 'PENDING-%' - AND taken_at > 0 - AND taken_at < ?1", + WHERE dev_fee_payment_hash LIKE 'PENDING-%'", ) - .bind(cutoff_time.as_u64() as i64) .fetch_all(&*pool) .await { - if !stale_orders.is_empty() { - warn!( - "Found {} stale PENDING dev fee orders (older than {}s), resetting", - stale_orders.len(), - cleanup_ttl - ); + let mut stale_count = 0u32; + + for mut pending_order in pending_orders { + let order_id = pending_order.id; + let marker = pending_order + .dev_fee_payment_hash + .as_deref() + .unwrap_or_default(); + + // Parse timestamp from marker: PENDING-{uuid}-{unix_ts} + // Legacy format (PENDING-{uuid}) has no timestamp → treat as stale + let pending_ts = parse_pending_timestamp(marker); + + let is_stale = match pending_ts { + Some(ts) => now_unix.saturating_sub(ts) >= cleanup_ttl_secs, + None => { + // Legacy marker without timestamp — treat as stale + warn!( + "Order {} has legacy PENDING marker without timestamp, treating as stale", + order_id + ); + true + } + }; - for mut stale_order in stale_orders { - let order_id = stale_order.id; - let age_seconds = Timestamp::now().as_u64() - stale_order.taken_at as u64; + if !is_stale { + continue; + } - warn!( - "Resetting stale PENDING order {} (age: {}s)", - order_id, age_seconds - ); + stale_count += 1; + let age_display = pending_ts + .map(|ts| format!("{}s", now_unix.saturating_sub(ts))) + .unwrap_or_else(|| "unknown (legacy)".to_string()); + + warn!( + "Resetting stale PENDING order {} (age: {})", + order_id, age_display + ); - stale_order.dev_fee_paid = false; - stale_order.dev_fee_payment_hash = None; + pending_order.dev_fee_paid = false; + pending_order.dev_fee_payment_hash = None; - match stale_order.update(&pool).await { - Ok(_) => { - info!( - "✅ Reset stale PENDING for order {}, will retry payment", - order_id - ); - } - Err(e) => { - error!( - "Failed to reset stale PENDING for order {}: {:?}", - order_id, e - ); - } + match pending_order.update(&pool).await { + Ok(_) => { + info!( + "✅ Reset stale PENDING for order {}, will retry payment", + order_id + ); + } + Err(e) => { + error!( + "Failed to reset stale PENDING for order {}: {:?}", + order_id, e + ); } } } + + if stale_count > 0 { + warn!( + "Reset {} stale PENDING dev fee orders (TTL: {}s)", + stale_count, cleanup_ttl_secs + ); + } } // Query unpaid orders @@ -631,7 +686,9 @@ async fn job_process_dev_fee_payment() { let order_id = order.id; info!("Pre-marking order {} as payment pending", order_id); order.dev_fee_paid = true; - order.dev_fee_payment_hash = Some(format!("PENDING-{}", order_id)); + let pending_ts = Utc::now().timestamp() as u64; + order.dev_fee_payment_hash = + Some(format!("PENDING-{}-{}", order_id, pending_ts)); let mut order = match order.update(&pool).await { Err(e) => { @@ -740,31 +797,26 @@ async fn job_process_dev_fee_payment() { } } Err(_) => { - // STEP 5: Timeout, reset to unpaid for retry + // STEP 5: Timeout — DO NOT reset to unpaid. + // + // A timeout does NOT mean the payment failed. The Lightning + // payment may still be in-flight and could succeed after the + // timeout window. Resetting dev_fee_paid=false here would + // cause a duplicate payment on the next scheduler cycle. + // + // Instead, leave the order in PENDING state (dev_fee_paid=true, + // dev_fee_payment_hash="PENDING-{uuid}-{ts}"). The stale + // PENDING detection (with timestamp-based TTL) will clean it + // up after the configured TTL if the payment never resolves. + // + // See: https://github.com/MostroP2P/mostro/issues/568 let order_id = order.id; let dev_fee = order.dev_fee; - error!( - "Dev fee payment timeout (50s) for order {} ({} sats)", + warn!( + "Dev fee payment timeout (50s) for order {} ({} sats). \ + Leaving in PENDING state — stale cleanup will handle if payment never resolves.", order_id, dev_fee ); - - order.dev_fee_paid = false; - order.dev_fee_payment_hash = None; - - match order.update(&pool).await { - Err(e) => { - error!( - "Failed to reset after timeout for order {}: {:?}", - order_id, e - ); - } - Ok(_) => { - info!( - "Reset order {} to unpaid after timeout, will retry", - order_id - ); - } - } } } } @@ -774,3 +826,56 @@ async fn job_process_dev_fee_payment() { } }); } + +#[cfg(test)] +mod tests { + use super::parse_pending_timestamp; + + #[test] + fn test_parse_new_format_with_uuid() { + let marker = "PENDING-550e8400-e29b-41d4-a716-446655440000-1707700000"; + assert_eq!(parse_pending_timestamp(marker), Some(1707700000)); + } + + #[test] + fn test_parse_legacy_format_uuid() { + // Legacy format without timestamp → None + let marker = "PENDING-550e8400-e29b-41d4-a716-446655440000"; + assert_eq!(parse_pending_timestamp(marker), None); + } + + #[test] + fn test_parse_not_pending() { + assert_eq!(parse_pending_timestamp("some-random-hash"), None); + assert_eq!(parse_pending_timestamp(""), None); + } + + #[test] + fn test_parse_plain_pending() { + assert_eq!(parse_pending_timestamp("PENDING"), None); + assert_eq!(parse_pending_timestamp("PENDING-"), None); + } + + #[test] + fn test_parse_invalid_timestamp() { + let marker = "PENDING-550e8400-e29b-41d4-a716-446655440000-notanumber"; + assert_eq!(parse_pending_timestamp(marker), None); + } + + #[test] + fn test_parse_too_small_timestamp() { + // Timestamps < 1_000_000_000 (before ~2001) are rejected + let marker = "PENDING-550e8400-e29b-41d4-a716-446655440000-12345"; + assert_eq!(parse_pending_timestamp(marker), None); + } + + #[test] + fn test_parse_current_timestamp() { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs(); + let marker = format!("PENDING-550e8400-e29b-41d4-a716-446655440000-{}", now); + assert_eq!(parse_pending_timestamp(&marker), Some(now)); + } +}