Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
229 changes: 170 additions & 59 deletions src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use chrono::{TimeDelta, Utc};
use config::*;
use mostro_core::prelude::*;
use nostr_sdk::EventBuilder;
use nostr_sdk::{Kind as NostrKind, Tag, Timestamp};
use nostr_sdk::{Kind as NostrKind, Tag};
use sqlx_crud::Crud;
use std::sync::Arc;
use tokio::sync::RwLock;
Expand Down Expand Up @@ -517,6 +517,32 @@ async fn job_update_bitcoin_prices() {
/// Processes unpaid development fees for completed orders
///
/// Runs every 60 seconds and attempts to pay dev fees for orders that have:
/// Parse the unix timestamp from a PENDING marker.
///
/// Marker format: `PENDING-{uuid}-{unix_timestamp}`
/// Legacy format: `PENDING-{uuid}` (no timestamp) → returns `None`
///
/// Returns `Some(timestamp)` if a valid unix timestamp is found at the end,
/// `None` otherwise.
fn parse_pending_timestamp(marker: &str) -> Option<u64> {
let stripped = marker.strip_prefix("PENDING-")?;

// Expected format: {uuid}-{unix_timestamp}
// UUID is exactly 36 chars (8-4-4-4-12 hex digits with dashes).
// The timestamp follows after the UUID and a separating dash.
if stripped.len() <= 37 {
return None;
}

// Verify the 37th char (index 36) is a dash separating UUID from timestamp
if stripped.as_bytes().get(36) != Some(&b'-') {
return None;
}

let ts_str = &stripped[37..];
ts_str.parse::<u64>().ok().filter(|&ts| ts > 1_000_000_000)
}

/// - status = 'settled-hold-invoice' OR 'success'
/// - dev_fee > 0
/// - dev_fee_paid = false
Expand All @@ -535,54 +561,86 @@ async fn job_process_dev_fee_payment() {
info!("Checking for unpaid development fees");

// Cleanup stale PENDING entries (crash recovery)
let cleanup_ttl = 300; // 5 minutes in seconds
let cutoff_time = Timestamp::now() - cleanup_ttl;

if let Ok(stale_orders) = sqlx::query_as::<_, Order>(
// Uses the timestamp embedded in the PENDING marker (format: PENDING-{uuid}-{unix_ts})
// to determine staleness, rather than taken_at which reflects order creation time.
// Legacy markers without timestamps are treated as stale for backward compatibility.
let cleanup_ttl_secs: u64 = 300; // 5 minutes
let now_unix = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();

if let Ok(pending_orders) = sqlx::query_as::<_, Order>(
"SELECT * FROM orders
WHERE dev_fee_payment_hash LIKE 'PENDING-%'
AND taken_at > 0
AND taken_at < ?1",
WHERE dev_fee_payment_hash LIKE 'PENDING-%'",
)
.bind(cutoff_time.as_u64() as i64)
.fetch_all(&*pool)
.await
{
if !stale_orders.is_empty() {
warn!(
"Found {} stale PENDING dev fee orders (older than {}s), resetting",
stale_orders.len(),
cleanup_ttl
);
let mut stale_count = 0u32;

for mut pending_order in pending_orders {
let order_id = pending_order.id;
let marker = pending_order
.dev_fee_payment_hash
.as_deref()
.unwrap_or_default();

// Parse timestamp from marker: PENDING-{uuid}-{unix_ts}
// Legacy format (PENDING-{uuid}) has no timestamp → treat as stale
let pending_ts = parse_pending_timestamp(marker);

let is_stale = match pending_ts {
Some(ts) => now_unix.saturating_sub(ts) >= cleanup_ttl_secs,
None => {
// Legacy marker without timestamp — treat as stale
warn!(
"Order {} has legacy PENDING marker without timestamp, treating as stale",
order_id
);
true
}
};

for mut stale_order in stale_orders {
let order_id = stale_order.id;
let age_seconds = Timestamp::now().as_u64() - stale_order.taken_at as u64;
if !is_stale {
continue;
}

warn!(
"Resetting stale PENDING order {} (age: {}s)",
order_id, age_seconds
);
stale_count += 1;
let age_display = pending_ts
.map(|ts| format!("{}s", now_unix.saturating_sub(ts)))
.unwrap_or_else(|| "unknown (legacy)".to_string());

stale_order.dev_fee_paid = false;
stale_order.dev_fee_payment_hash = None;
warn!(
"Resetting stale PENDING order {} (age: {})",
order_id, age_display
);

match stale_order.update(&pool).await {
Ok(_) => {
info!(
"✅ Reset stale PENDING for order {}, will retry payment",
order_id
);
}
Err(e) => {
error!(
"Failed to reset stale PENDING for order {}: {:?}",
order_id, e
);
}
pending_order.dev_fee_paid = false;
pending_order.dev_fee_payment_hash = None;

match pending_order.update(&pool).await {
Ok(_) => {
info!(
"✅ Reset stale PENDING for order {}, will retry payment",
order_id
);
}
Err(e) => {
error!(
"Failed to reset stale PENDING for order {}: {:?}",
order_id, e
);
}
}
}

if stale_count > 0 {
warn!(
"Reset {} stale PENDING dev fee orders (TTL: {}s)",
stale_count, cleanup_ttl_secs
);
}
}

// Query unpaid orders
Expand Down Expand Up @@ -631,7 +689,12 @@ async fn job_process_dev_fee_payment() {
let order_id = order.id;
info!("Pre-marking order {} as payment pending", order_id);
order.dev_fee_paid = true;
order.dev_fee_payment_hash = Some(format!("PENDING-{}", order_id));
let pending_ts = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
order.dev_fee_payment_hash =
Some(format!("PENDING-{}-{}", order_id, pending_ts));

let mut order = match order.update(&pool).await {
Err(e) => {
Expand Down Expand Up @@ -740,31 +803,26 @@ async fn job_process_dev_fee_payment() {
}
}
Err(_) => {
// STEP 5: Timeout, reset to unpaid for retry
// STEP 5: Timeout — DO NOT reset to unpaid.
//
// A timeout does NOT mean the payment failed. The Lightning
// payment may still be in-flight and could succeed after the
// timeout window. Resetting dev_fee_paid=false here would
// cause a duplicate payment on the next scheduler cycle.
//
// Instead, leave the order in PENDING state (dev_fee_paid=true,
// dev_fee_payment_hash="PENDING-{uuid}-{ts}"). The stale
// PENDING detection (with timestamp-based TTL) will clean it
// up after the configured TTL if the payment never resolves.
//
// See: https://github.com/MostroP2P/mostro/issues/568
let order_id = order.id;
let dev_fee = order.dev_fee;
error!(
"Dev fee payment timeout (50s) for order {} ({} sats)",
warn!(
"Dev fee payment timeout (50s) for order {} ({} sats). \
Leaving in PENDING state — stale cleanup will handle if payment never resolves.",
order_id, dev_fee
);

order.dev_fee_paid = false;
order.dev_fee_payment_hash = None;

match order.update(&pool).await {
Err(e) => {
error!(
"Failed to reset after timeout for order {}: {:?}",
order_id, e
);
}
Ok(_) => {
info!(
"Reset order {} to unpaid after timeout, will retry",
order_id
);
}
}
}
}
}
Expand All @@ -774,3 +832,56 @@ async fn job_process_dev_fee_payment() {
}
});
}

#[cfg(test)]
mod pending_marker_tests {
use super::parse_pending_timestamp;

#[test]
fn test_parse_new_format_with_uuid() {
let marker = "PENDING-550e8400-e29b-41d4-a716-446655440000-1707700000";
assert_eq!(parse_pending_timestamp(marker), Some(1707700000));
}

#[test]
fn test_parse_legacy_format_uuid() {
// Legacy format without timestamp → None
let marker = "PENDING-550e8400-e29b-41d4-a716-446655440000";
assert_eq!(parse_pending_timestamp(marker), None);
}

#[test]
fn test_parse_not_pending() {
assert_eq!(parse_pending_timestamp("some-random-hash"), None);
assert_eq!(parse_pending_timestamp(""), None);
}

#[test]
fn test_parse_plain_pending() {
assert_eq!(parse_pending_timestamp("PENDING"), None);
assert_eq!(parse_pending_timestamp("PENDING-"), None);
}

#[test]
fn test_parse_invalid_timestamp() {
let marker = "PENDING-550e8400-e29b-41d4-a716-446655440000-notanumber";
assert_eq!(parse_pending_timestamp(marker), None);
}

#[test]
fn test_parse_too_small_timestamp() {
// Timestamps < 1_000_000_000 (before ~2001) are rejected
let marker = "PENDING-550e8400-e29b-41d4-a716-446655440000-12345";
assert_eq!(parse_pending_timestamp(marker), None);
}

#[test]
fn test_parse_current_timestamp() {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
let marker = format!("PENDING-550e8400-e29b-41d4-a716-446655440000-{}", now);
assert_eq!(parse_pending_timestamp(&marker), Some(now));
}
}