From f2d42806b0f1cd4b03f10296b7d6bd5d2de070fe Mon Sep 17 00:00:00 2001 From: Omer Yacine Date: Wed, 8 Oct 2025 11:55:09 +0200 Subject: [PATCH 01/12] create the status for stats struct and fill it from the state machine also add a couple of columns for new data in stats table. we will be using the same stats table as before with these extra optional columns. also the events list is sent over but not stored as json like with legacy swaps. will decide later what do to with them (if they are supposed to be stored) --- mm2src/mm2_main/src/database.rs | 5 + mm2src/mm2_main/src/database/stats_swaps.rs | 7 + mm2src/mm2_main/src/lp_swap/maker_swap.rs | 3 + mm2src/mm2_main/src/lp_swap/maker_swap_v2.rs | 32 +- mm2src/mm2_main/src/lp_swap/swap_v2_common.rs | 275 +++++++++++++++++- mm2src/mm2_main/src/lp_swap/taker_swap_v2.rs | 38 ++- 6 files changed, 352 insertions(+), 8 deletions(-) diff --git a/mm2src/mm2_main/src/database.rs b/mm2src/mm2_main/src/database.rs index 960a31dfc9..8fd6b247b6 100644 --- a/mm2src/mm2_main/src/database.rs +++ b/mm2src/mm2_main/src/database.rs @@ -132,6 +132,10 @@ fn migration_13() -> Vec<(&'static str, Vec)> { ] } +fn migration_14() -> Vec<(&'static str, Vec)> { + db_common::sqlite::execute_batch(stats_swaps::ADD_SWAP_VERSION_AND_MARKET_MARGIN) +} + async fn statements_for_migration(ctx: &MmArc, current_migration: i64) -> Option)>> { match current_migration { 1 => Some(migration_1(ctx).await), @@ -147,6 +151,7 @@ async fn statements_for_migration(ctx: &MmArc, current_migration: i64) -> Option 11 => Some(migration_11()), 12 => Some(migration_12()), 13 => Some(migration_13()), + 14 => Some(migration_14()), _ => None, } } diff --git a/mm2src/mm2_main/src/database/stats_swaps.rs b/mm2src/mm2_main/src/database/stats_swaps.rs index ffb8d72b7e..75a9bc2087 100644 --- a/mm2src/mm2_main/src/database/stats_swaps.rs +++ b/mm2src/mm2_main/src/database/stats_swaps.rs @@ -100,6 +100,13 @@ pub const ADD_MAKER_TAKER_GUI_AND_VERSION: &[&str] = &[ pub const SELECT_ID_BY_UUID: &str = "SELECT id FROM stats_swaps WHERE uuid = ?1"; +pub const ADD_SWAP_VERSION_AND_MARKET_MARGIN: &[&str] = &[ + "ALTER TABLE stats_swaps ADD COLUMN swap_version INTEGER NOT NULL DEFAULT 0;", + "ALTER TABLE stats_swaps ADD COLUMN market_margin DECIMAL;", + // NULL = unknown, 0 = not a bot, 1 = bot + "ALTER TABLE stats_swaps ADD COLUMN is_maker_bot INTEGER;", +]; + /// Returns SQL statements to initially fill stats_swaps table using existing DB with JSON files pub async fn create_and_fill_stats_swaps_from_json_statements(ctx: &MmArc) -> Vec<(&'static str, Vec)> { let maker_swaps = SavedSwap::load_all_from_maker_stats_db(ctx).await.unwrap_or_default(); diff --git a/mm2src/mm2_main/src/lp_swap/maker_swap.rs b/mm2src/mm2_main/src/lp_swap/maker_swap.rs index 0752cad77e..759aced62e 100644 --- a/mm2src/mm2_main/src/lp_swap/maker_swap.rs +++ b/mm2src/mm2_main/src/lp_swap/maker_swap.rs @@ -2316,6 +2316,7 @@ pub async fn run_maker_swap(swap: RunMakerSwapInput, ctx: MmArc) { } }) .ok(); + // NOTES(tpu-status): This has nothing to do with the stats, this is only storing the swap to *MY* database. save_my_maker_swap_event(&ctx, &running_swap, to_save) .await .expect("!save_my_maker_swap_event"); @@ -2344,6 +2345,8 @@ pub async fn run_maker_swap(swap: RunMakerSwapInput, ctx: MmArc) { error!("!mark_swap_finished({}): {}", uuid, e); } + // NOTES(tpu-status): We broadcast the swap status at the very end and only once. + // We also save it to our own stats db at this point. if to_broadcast { if let Err(e) = broadcast_my_swap_status(&ctx, uuid).await { error!("!broadcast_my_swap_status({}): {}", uuid, e); diff --git a/mm2src/mm2_main/src/lp_swap/maker_swap_v2.rs b/mm2src/mm2_main/src/lp_swap/maker_swap_v2.rs index 18679d945c..efbad74973 100644 --- a/mm2src/mm2_main/src/lp_swap/maker_swap_v2.rs +++ b/mm2src/mm2_main/src/lp_swap/maker_swap_v2.rs @@ -64,7 +64,7 @@ pub struct StoredNegotiationData { taker_payment_locktime: u64, taker_funding_locktime: u64, maker_coin_htlc_pub_from_taker: BytesJson, - taker_coin_htlc_pub_from_taker: BytesJson, + pub taker_coin_htlc_pub_from_taker: BytesJson, maker_coin_swap_contract: Option, taker_coin_swap_contract: Option, taker_secret_hash: BytesJson, @@ -150,6 +150,33 @@ pub enum MakerSwapEvent { Completed, } +impl MakerSwapEvent { + /// Returns true if the event is terminal, i.e. no more events will be produced after it. + pub fn is_terminal(&self) -> bool { + matches!( + self, + MakerSwapEvent::Aborted { .. } | MakerSwapEvent::Completed | MakerSwapEvent::MakerPaymentRefunded { .. } + ) + } + + /// Returns negotiation data if the event contains it. + pub fn negotiation_data(&self) -> Option<&StoredNegotiationData> { + match self { + MakerSwapEvent::WaitingForTakerFunding { negotiation_data, .. } + | MakerSwapEvent::TakerFundingReceived { negotiation_data, .. } + | MakerSwapEvent::MakerPaymentSentFundingSpendGenerated { negotiation_data, .. } + | MakerSwapEvent::MakerPaymentRefundRequired { negotiation_data, .. } + | MakerSwapEvent::TakerPaymentReceived { negotiation_data, .. } + | MakerSwapEvent::TakerPaymentReceivedAndPreimageValidationSkipped { negotiation_data, .. } + | MakerSwapEvent::TakerPaymentSpent { negotiation_data, .. } => Some(negotiation_data), + MakerSwapEvent::Initialized { .. } + | MakerSwapEvent::MakerPaymentRefunded { .. } + | MakerSwapEvent::Aborted { .. } + | MakerSwapEvent::Completed => None, + } + } +} + /// Storage for maker swaps. #[derive(Clone)] pub struct MakerSwapStorage { @@ -451,7 +478,7 @@ impl Vec { + pub fn unique_data(&self) -> Vec { self.secret_hash() } @@ -2091,6 +2118,7 @@ impl Aborted { } } +// NOTES(tpu-status): These LastState impls should be the ones who send the swap status to the seed nodes (and store stats internally) #[async_trait] impl LastState for Aborted diff --git a/mm2src/mm2_main/src/lp_swap/swap_v2_common.rs b/mm2src/mm2_main/src/lp_swap/swap_v2_common.rs index 1f59532155..dc37d242a7 100644 --- a/mm2src/mm2_main/src/lp_swap/swap_v2_common.rs +++ b/mm2src/mm2_main/src/lp_swap/swap_v2_common.rs @@ -1,18 +1,25 @@ +use std::convert::TryFrom; + use crate::lp_network::{subscribe_to_topic, unsubscribe_from_topic}; -use crate::lp_swap::maker_swap_v2::{MakerSwapDbRepr, MakerSwapStateMachine, MakerSwapStorage}; +use crate::lp_swap::maker_swap_v2::{MakerSwapDbRepr, MakerSwapEvent, MakerSwapStateMachine, MakerSwapStorage}; use crate::lp_swap::swap_lock::{SwapLock, SwapLockError, SwapLockOps}; -use crate::lp_swap::taker_swap_v2::{TakerSwapDbRepr, TakerSwapStateMachine, TakerSwapStorage}; -use crate::lp_swap::{swap_v2_topic, SwapsContext}; +use crate::lp_swap::taker_swap_v2::{TakerSwapDbRepr, TakerSwapEvent, TakerSwapStateMachine, TakerSwapStorage}; +use crate::lp_swap::{p2p_private_and_peer_id_to_broadcast, swap_v2_topic, SwapsContext}; +use coins::lp_price::fetch_swap_coins_price; use coins::{lp_coinfind, MakerCoinSwapOpsV2, MmCoin, MmCoinEnum, TakerCoinSwapOpsV2}; use common::executor::abortable_queue::AbortableQueue; use common::executor::{SpawnFuture, Timer}; use common::log::{error, info, warn}; +use common::now_sec; use derive_more::Display; +use keys::SECP_SIGN; use mm2_core::mm_ctx::MmArc; use mm2_err_handle::prelude::*; +use mm2_libp2p::Secp256k1PubkeySerialize; +use mm2_number::BigDecimal; use mm2_state_machine::storable_state_machine::{StateMachineDbRepr, StateMachineStorage, StorableStateMachine}; use rpc::v1::types::Bytes as BytesJson; -use secp256k1::PublicKey; +use secp256k1::{PublicKey, SecretKey}; use serde::de::DeserializeOwned; use serde::Serialize; use serde_json::Error; @@ -454,3 +461,263 @@ pub(super) async fn swap_kickstart_handler_for_taker( } } } + +#[derive(Debug, Deserialize, Serialize)] +/// The structure represents the swap information to be sent for statistics purposes. +pub struct TPUSwapStatusForStats { + /// The swap unique identifier + uuid: Uuid, + + /// The timestamp when the swap was started + started_at: u64, + /// The timestamp when the swap was finished (either successfully or not) + finished_at: u64, + + /// The coin name of the maker + maker_coin: String, + /// The public key of the maker (to which the taker's coins were paid) + maker_swap_pubkey: Option, + /// The amount of the maker's coin + maker_amount: BigDecimal, + + /// The coin name of the taker + taker_coin: String, + /// The public key of the taker (to which the maker's coins were paid) + taker_swap_pubkey: Option, + /// The amount of the taker's coin + taker_amount: BigDecimal, + + /// The price of the maker's coin in USD at the moment of the swap + maker_coin_usd_price: Option, + /// The price of the taker's coin in USD at the moment of the swap + taker_coin_usd_price: Option, + /// The difference (in +/- percentage) between the market price and the swap price at the moment of the swap (from the maker's pov) + market_margin: Option, + /// Is the maker a bot. Possible values are: Some(true) (yes), Some(false) (no), None (unknown) + is_maker_bot: Option, + + /// The GUI of the maker + maker_gui: Option, + /// The maker's KDF version + maker_version: Option, + /// The GUI of the taker + taker_gui: Option, + /// The taker's KDF version + taker_version: Option, + + /// The version of the swap protocol used in the swap + /// Note that this field should start with 2 because this struct is specific to TPU swaps + swap_version: u8, + + // The next set of fields are extra and currently not part of the swap stats + /// Maker's p2p pubkey + maker_p2p_pubkey: Secp256k1PubkeySerialize, + /// Taker's p2p pubkey + taker_p2p_pubkey: Secp256k1PubkeySerialize, + + /// Premium paid by taker to maker + taker_premium: BigDecimal, + /// The amount of fee paid by the taker to the DEX + dex_fee_amount: BigDecimal, + /// The amount of DEX fee burnt + dex_fee_burn: BigDecimal, + + /// The maker or taker detailed swap events + events: TPUSwapEvents, +} + +#[derive(Debug, Deserialize, Serialize)] +/// Represents either a batch of maker or taker swap events. This could be used to know whether a TPUSwapStatusForStats +/// is maker-originating or taker-originating. +pub enum TPUSwapEvents { + FromMaker(Vec), + FromTaker(Vec), +} + +impl TPUSwapStatusForStats { + async fn try_from_maker_state_machine( + machine: &MakerSwapStateMachine, + ) -> Result { + let repr = machine + .storage + .get_repr(machine.uuid) + .await + .map_err(|_| SwapStatusGenerationError::StorageError)?; + + // Make sure the swap is finished (aborted, completed or refunded) + // FIXME: We might want to not share or store aborted swaps at all + if repr.events.last().map(|e| !e.is_terminal()).unwrap_or(true) { + return Err(SwapStatusGenerationError::SwapNotFinished); + } + + // Make sure the swap is of version 2 or higher (since TPU starts from v2) + if repr.swap_version < 2 { + return Err(SwapStatusGenerationError::InvalidSwapVersion); + } + + // Calculate the usd prices of the coins and the market margin + let mut maker_coin_usd_price = None; + let mut taker_coin_usd_price = None; + let mut market_margin = None; + let rates = fetch_swap_coins_price(Some(repr.maker_coin.clone()), Some(repr.taker_coin.clone())).await; + if let Some(ref rates) = rates { + let fair_market_price = &rates.rel / &rates.base; + let swap_price = repr.taker_volume.to_decimal() / repr.maker_volume.to_decimal(); + market_margin = Some( + (&swap_price - &fair_market_price) / fair_market_price + * BigDecimal::try_from(100.0).expect("100.0 is a valid non-NAN float"), + ); + maker_coin_usd_price = Some(rates.base.clone()); + taker_coin_usd_price = Some(rates.rel.clone()); + } + + // Get the maker's swap pubkey + let maker_swap_pubkey = machine.maker_coin.derive_htlc_pubkey_v2_bytes(&machine.unique_data()); + let maker_swap_pubkey = Some(hex::encode(maker_swap_pubkey)); + + // Get the taker's swap pubkey from the negotiation data in the events if available + let mut taker_swap_pubkey = None; + for event in &repr.events { + if let Some(negotiation_data) = event.negotiation_data() { + taker_swap_pubkey = Some(hex::encode(negotiation_data.taker_coin_htlc_pub_from_taker.0.clone())); + break; + } + } + + // Determine if the maker is a bot + // This is overly simplistic check and only checks whether the simple_market_maker_bot_ctx was initialized. + // TODO: A proper check would be to open the market maker bot and check whether is is running and that the + // swap we just performed is found within its SimpleMakerBotRegistry. The problem at the moment is that + // we can't import TradingBotContext since that's part of lp_ordermatch and that would create a cyclic dependency. + let mut is_maker_bot = Some(false); + if machine.ctx.simple_market_maker_bot_ctx.lock().unwrap().is_some() { + is_maker_bot = Some(true); + } + + // Get the maker's p2p pubkey + let (p2p_private_key, _) = p2p_private_and_peer_id_to_broadcast(&machine.ctx, machine.p2p_keypair.as_ref()); + let secp_secret = SecretKey::from_slice(&p2p_private_key).expect("valid secret key"); + let maker_p2p_pubkey = PublicKey::from_secret_key(&SECP_SIGN, &secp_secret).into(); + + Ok(TPUSwapStatusForStats { + uuid: repr.uuid, + started_at: repr.started_at, + // Assuming that this method gets called right after the swap is finished. + // TODO: Consider storing the finished_at timestamp in the DB and/or state machine and use that. + finished_at: now_sec(), + maker_coin: repr.maker_coin, + maker_swap_pubkey, + maker_amount: repr.maker_volume.to_decimal(), + taker_coin: repr.taker_coin, + taker_swap_pubkey, + taker_amount: repr.taker_volume.to_decimal(), + maker_coin_usd_price, + taker_coin_usd_price, + market_margin, + is_maker_bot, + maker_gui: machine.ctx.gui().map(|g| g.to_owned()), + maker_version: Some(machine.ctx.mm_version().into()), + taker_gui: None, + taker_version: None, + swap_version: repr.swap_version, + maker_p2p_pubkey, + taker_p2p_pubkey: repr.taker_p2p_pub, + taker_premium: repr.taker_premium.to_decimal(), + dex_fee_amount: repr.dex_fee_amount.to_decimal(), + dex_fee_burn: repr.dex_fee_burn.to_decimal(), + events: TPUSwapEvents::FromMaker(repr.events), + }) + } + + async fn try_from_taker_state_machine( + machine: &TakerSwapStateMachine, + ) -> Result { + let repr = machine + .storage + .get_repr(machine.uuid) + .await + .map_err(|_| SwapStatusGenerationError::StorageError)?; + + // Make sure the swap is finished (aborted, completed or refunded) + // FIXME: We might want to not share or store aborted swaps at all + if repr.events.last().map(|e| !e.is_terminal()).unwrap_or(true) { + return Err(SwapStatusGenerationError::SwapNotFinished); + } + + // Make sure the swap is of version 2 or higher (since TPU starts from v2) + if repr.swap_version < 2 { + return Err(SwapStatusGenerationError::InvalidSwapVersion); + } + + // Calculate the usd prices of the coins and the market margin + let mut maker_coin_usd_price = None; + let mut taker_coin_usd_price = None; + let mut market_margin = None; + let rates = fetch_swap_coins_price(Some(repr.maker_coin.clone()), Some(repr.taker_coin.clone())).await; + if let Some(ref rates) = rates { + let fair_market_price = &rates.rel / &rates.base; + let swap_price = repr.taker_volume.to_decimal() / repr.maker_volume.to_decimal(); + market_margin = Some( + (&swap_price - &fair_market_price) / fair_market_price + * BigDecimal::try_from(100.0).expect("100.0 is a valid non-NAN float"), + ); + maker_coin_usd_price = Some(rates.base.clone()); + taker_coin_usd_price = Some(rates.rel.clone()); + } + + // Get the taker's swap pubkey + let taker_swap_pubkey = machine.taker_coin.derive_htlc_pubkey_v2_bytes(&machine.unique_data()); + let taker_swap_pubkey = Some(hex::encode(taker_swap_pubkey)); + + // Get the maker's swap pubkey from the negotiation data in the events if available + let mut maker_swap_pubkey = None; + for event in &repr.events { + if let Some(negotiation_data) = event.negotiation_data() { + maker_swap_pubkey = Some(hex::encode(negotiation_data.maker_coin_htlc_pub_from_maker.0.clone())); + break; + } + } + + // Get the taker's p2p pubkey + let (p2p_private_key, _) = p2p_private_and_peer_id_to_broadcast(&machine.ctx, machine.p2p_keypair.as_ref()); + let secp_secret = SecretKey::from_slice(&p2p_private_key).expect("valid secret key"); + let taker_p2p_pubkey = PublicKey::from_secret_key(&SECP_SIGN, &secp_secret).into(); + + Ok(TPUSwapStatusForStats { + uuid: repr.uuid, + started_at: repr.started_at, + // Assuming that this method gets called right after the swap is finished. + // TODO: Consider storing the finished_at timestamp in the DB and/or state machine and use that. + finished_at: now_sec(), + maker_coin: repr.maker_coin, + maker_swap_pubkey, + maker_amount: repr.maker_volume.to_decimal(), + taker_coin: repr.taker_coin, + taker_swap_pubkey, + taker_amount: repr.taker_volume.to_decimal(), + maker_coin_usd_price, + taker_coin_usd_price, + market_margin, + is_maker_bot: None, + maker_gui: None, + maker_version: None, + taker_gui: machine.ctx.gui().map(|g| g.to_owned()), + taker_version: Some(machine.ctx.mm_version().into()), + swap_version: repr.swap_version, + maker_p2p_pubkey: repr.maker_p2p_pub, + taker_p2p_pubkey, + taker_premium: repr.taker_premium.to_decimal(), + dex_fee_amount: repr.dex_fee_amount.to_decimal(), + dex_fee_burn: repr.dex_fee_burn.to_decimal(), + events: TPUSwapEvents::FromTaker(repr.events), + }) + } +} + +#[derive(Debug, Deserialize, Serialize)] +/// Errors that could be returned when generating the swap status for stats from a swap state machine. +pub enum SwapStatusGenerationError { + StorageError, + SwapNotFinished, + InvalidSwapVersion, +} diff --git a/mm2src/mm2_main/src/lp_swap/taker_swap_v2.rs b/mm2src/mm2_main/src/lp_swap/taker_swap_v2.rs index 334c71b77e..68c1c60977 100644 --- a/mm2src/mm2_main/src/lp_swap/taker_swap_v2.rs +++ b/mm2src/mm2_main/src/lp_swap/taker_swap_v2.rs @@ -63,7 +63,7 @@ pub struct StoredNegotiationData { maker_payment_locktime: u64, maker_secret_hash: BytesJson, taker_coin_maker_address: String, - maker_coin_htlc_pub_from_maker: BytesJson, + pub maker_coin_htlc_pub_from_maker: BytesJson, taker_coin_htlc_pub_from_maker: BytesJson, maker_coin_swap_contract: Option, taker_coin_swap_contract: Option, @@ -180,6 +180,40 @@ pub enum TakerSwapEvent { Completed, } +impl TakerSwapEvent { + /// Returns true if the event is terminal, i.e. no more events will be produced after it. + pub fn is_terminal(&self) -> bool { + matches!( + self, + TakerSwapEvent::Aborted { .. } + | TakerSwapEvent::Completed + | TakerSwapEvent::TakerFundingRefunded { .. } + | TakerSwapEvent::TakerPaymentRefunded { .. } + ) + } + + /// Returns negotiation data if the event contains it. + pub fn negotiation_data(&self) -> Option<&StoredNegotiationData> { + match self { + TakerSwapEvent::Negotiated { negotiation_data, .. } + | TakerSwapEvent::TakerFundingSent { negotiation_data, .. } + | TakerSwapEvent::TakerFundingRefundRequired { negotiation_data, .. } + | TakerSwapEvent::MakerPaymentAndFundingSpendPreimgReceived { negotiation_data, .. } + | TakerSwapEvent::TakerPaymentSent { negotiation_data, .. } + | TakerSwapEvent::TakerPaymentSentAndPreimageSendingSkipped { negotiation_data, .. } + | TakerSwapEvent::MakerPaymentConfirmed { negotiation_data, .. } + | TakerSwapEvent::TakerPaymentSpent { negotiation_data, .. } + | TakerSwapEvent::MakerPaymentSpent { negotiation_data, .. } + | TakerSwapEvent::TakerPaymentRefundRequired { negotiation_data, .. } => Some(negotiation_data), + TakerSwapEvent::Initialized { .. } + | TakerSwapEvent::TakerFundingRefunded { .. } + | TakerSwapEvent::TakerPaymentRefunded { .. } + | TakerSwapEvent::Aborted { .. } + | TakerSwapEvent::Completed => None, + } + } +} + /// Storage for taker swaps. #[derive(Clone)] pub struct TakerSwapStorage { @@ -476,7 +510,7 @@ impl Vec { + pub fn unique_data(&self) -> Vec { self.uuid.as_bytes().to_vec() } From 6c535b90e9ef6349db0ad3095e2a060f663185d9 Mon Sep 17 00:00:00 2001 From: Omer Yacine Date: Thu, 9 Oct 2025 11:41:48 +0200 Subject: [PATCH 02/12] persist stats swap to the db and send it via the network this also wraps the structs found in SwapStatusData inside a box because they are very different in size (i.e. for perf purposes) --- mm2src/mm2_main/src/database/stats_swaps.rs | 109 +++++++++++++++++- mm2src/mm2_main/src/lp_swap.rs | 71 ++++++++++-- mm2src/mm2_main/src/lp_swap/maker_swap.rs | 3 - mm2src/mm2_main/src/lp_swap/maker_swap_v2.rs | 2 +- mm2src/mm2_main/src/lp_swap/swap_v2_common.rs | 63 +++++----- mm2src/mm2_main/src/lp_swap/taker_swap_v2.rs | 1 + 6 files changed, 210 insertions(+), 39 deletions(-) diff --git a/mm2src/mm2_main/src/database/stats_swaps.rs b/mm2src/mm2_main/src/database/stats_swaps.rs index 75a9bc2087..eb9e270e39 100644 --- a/mm2src/mm2_main/src/database/stats_swaps.rs +++ b/mm2src/mm2_main/src/database/stats_swaps.rs @@ -1,6 +1,6 @@ #![allow(deprecated)] // TODO: remove this once rusqlite is >= 0.29 -use crate::lp_swap::{MakerSavedSwap, SavedSwap, SavedSwapIo, TakerSavedSwap}; +use crate::lp_swap::{MakerSavedSwap, SavedSwap, SavedSwapIo, TPUSwapStatusForStats, TakerSavedSwap}; use common::log::{debug, error}; use db_common::{ owned_named_params, @@ -376,6 +376,113 @@ pub fn add_swap_to_index(conn: &Connection, swap: &SavedSwap) { execute_query_with_params(conn, sql, params); } +/// Constructs the update query for optional fields of the TPU swap status struct. That is, any field that is wrapped in an Option. +fn update_optional_info(swap: &TPUSwapStatusForStats) -> (String, OwnedSqlNamedParams) { + let mut extra_args = String::new(); + let mut params = OwnedSqlNamedParams::new(); + + if let Some(ref maker_pubkey) = swap.maker_swap_pubkey { + extra_args.push_str(", maker_pubkey = :maker_pubkey"); + params.push((":maker_pubkey", maker_pubkey.clone().into())); + } + if let Some(ref taker_pubkey) = swap.taker_swap_pubkey { + extra_args.push_str(", taker_pubkey = :taker_pubkey"); + params.push((":taker_pubkey", taker_pubkey.clone().into())); + } + if let Some(ref maker_coin_usd_price) = swap.maker_coin_usd_price { + extra_args.push_str(", maker_coin_usd_price = :maker_coin_usd_price"); + params.push((":maker_coin_usd_price", maker_coin_usd_price.to_string().into())); + } + if let Some(ref taker_coin_usd_price) = swap.taker_coin_usd_price { + extra_args.push_str(", taker_coin_usd_price = :taker_coin_usd_price"); + params.push((":taker_coin_usd_price", taker_coin_usd_price.to_string().into())); + } + if let Some(ref market_margin) = swap.market_margin { + extra_args.push_str(", market_margin = :market_margin"); + params.push((":market_margin", market_margin.to_string().into())); + } + if let Some(is_maker_bot) = swap.is_maker_bot { + extra_args.push_str(", is_maker_bot = :is_maker_bot"); + params.push((":is_maker_bot", (is_maker_bot as u32).into())); + } + if let Some(ref maker_gui) = swap.maker_gui { + extra_args.push_str(", maker_gui = :maker_gui"); + params.push((":maker_gui", maker_gui.clone().into())); + } + if let Some(ref maker_version) = swap.maker_version { + extra_args.push_str(", maker_version = :maker_version"); + params.push((":maker_version", maker_version.clone().into())); + } + if let Some(ref taker_gui) = swap.taker_gui { + extra_args.push_str(", taker_gui = :taker_gui"); + params.push((":taker_gui", taker_gui.clone().into())); + } + if let Some(ref taker_version) = swap.taker_version { + extra_args.push_str(", taker_version = :taker_version"); + params.push((":taker_version", taker_version.clone().into())); + } + + let update_query = format!( + "UPDATE stats_swaps set swap_version = :swap_version{} WHERE uuid = :uuid;", + extra_args + ); + + // Swap version is actually mandatory and will always be set. + params.push((":uuid", swap.uuid.to_string().into())); + params.push((":swap_version", (swap.swap_version as u32).into())); + + (update_query, params) +} + +/// Adds a TPU swap to stats_swap table if it's not present and updates the swap with the new data if it already exists. +pub fn add_v2swap_to_index(conn: &Connection, swap: &TPUSwapStatusForStats) -> Result<(), String> { + let params = vec![swap.uuid.to_string()]; + let query_row = conn.query_row(SELECT_ID_BY_UUID, params_from_iter(params.iter()), |row| { + row.get::<_, i64>(0) + }); + match query_row.optional() { + // swap is not indexed yet, insert it into the DB + Ok(None) => { + let (maker_coin_ticker, maker_coin_platform) = split_coin(&swap.maker_coin); + let (taker_coin_ticker, taker_coin_platform) = split_coin(&swap.taker_coin); + + let params = owned_named_params! { + ":maker_coin": swap.maker_coin.clone(), + ":maker_coin_ticker": maker_coin_ticker, + ":maker_coin_platform": maker_coin_platform, + ":taker_coin": swap.taker_coin.clone(), + ":taker_coin_ticker": taker_coin_ticker, + ":taker_coin_platform": taker_coin_platform, + ":uuid": swap.uuid.to_string(), + ":started_at": swap.started_at.to_string(), + ":finished_at": swap.finished_at.to_string(), + ":maker_amount": swap.maker_amount.to_string(), + ":taker_amount": swap.taker_amount.to_string(), + ":is_success": (swap.is_success() as u32).to_string(), + ":maker_coin_usd_price": None::, + ":taker_coin_usd_price": None::, + ":maker_pubkey": None::, + ":taker_pubkey": None::, + }; + + execute_query_with_params(conn, INSERT_STATS_SWAP, params); + }, + // swap is already indexed. Only update missing fields. + Ok(Some(_)) => (), + Err(e) => { + let err_msg = format!("Error {} on query {} with params {:?}", e, SELECT_ID_BY_UUID, params); + error!("{}", err_msg); + return Err(err_msg); + }, + }; + + let (sql, params) = update_optional_info(swap); + + execute_query_with_params(conn, &sql, params); + + Ok(()) +} + #[test] fn test_split_coin() { let input = ""; diff --git a/mm2src/mm2_main/src/lp_swap.rs b/mm2src/mm2_main/src/lp_swap.rs index b59fe05225..77e7068756 100644 --- a/mm2src/mm2_main/src/lp_swap.rs +++ b/mm2src/mm2_main/src/lp_swap.rs @@ -140,6 +140,7 @@ use pubkey_banning::BanReason; pub use pubkey_banning::{ban_pubkey_rpc, is_pubkey_banned, list_banned_pubkeys_rpc, unban_pubkeys_rpc}; pub use recreate_swap_data::recreate_swap_data; pub use saved_swap::{SavedSwap, SavedSwapError, SavedSwapIo, SavedSwapResult}; +pub use swap_v2_common::TPUSwapStatusForStats; use swap_v2_common::{ get_unfinished_swaps_uuids, swap_kickstart_handler_for_maker, swap_kickstart_handler_for_taker, ActiveSwapV2Info, }; @@ -363,12 +364,21 @@ pub async fn process_swap_msg(ctx: MmArc, topic: &str, msg: &[u8]) -> P2PRequest Err(swap_msg_err) => { #[cfg(not(target_arch = "wasm32"))] return match json::from_slice::(msg) { - Ok(mut status) => { - status.data.fetch_and_set_usd_prices().await; - if let Err(e) = save_stats_swap(&ctx, &status.data).await { - error!("Error saving the swap {} status: {}", status.data.uuid(), e); - } - Ok(()) + Ok(SwapStatus { data, .. }) => match data { + SwapStatusData::Legacy(mut status) => { + status.fetch_and_set_usd_prices().await; + if let Err(e) = save_stats_swap(&ctx, &status).await { + error!("Error saving the swap {} status: {}", status.uuid(), e); + } + Ok(()) + }, + SwapStatusData::Tpu(status) => { + let uuid = status.uuid; + if let Err(e) = add_v2swap_to_stats_db_index(&ctx, *status).await { + error!("Error saving the swap {} status: {}", uuid, e); + } + Ok(()) + }, }, Err(swap_status_err) => { let error = format!( @@ -1011,6 +1021,19 @@ pub async fn insert_new_swap_to_db( .map_err(|e| ERRL!("{}", e)) } +#[cfg(not(target_arch = "wasm32"))] +pub async fn add_v2swap_to_stats_db_index(ctx: &MmArc, swap: TPUSwapStatusForStats) -> Result<(), String> { + let ctx = ctx.clone(); + common::async_blocking(move || { + if let Some(conn) = ctx.sqlite_conn_opt() { + crate::database::stats_swaps::add_v2swap_to_index(&conn, &swap) + } else { + ERR!("No SQLite connection available") + } + }) + .await +} + #[cfg(not(target_arch = "wasm32"))] fn add_swap_to_db_index(ctx: &MmArc, swap: &SavedSwap) { if let Some(conn) = ctx.sqlite_conn_opt() { @@ -1021,6 +1044,7 @@ fn add_swap_to_db_index(ctx: &MmArc, swap: &SavedSwap) { #[cfg(not(target_arch = "wasm32"))] async fn save_stats_swap(ctx: &MmArc, swap: &SavedSwap) -> Result<(), String> { try_s!(swap.save_to_stats_db(ctx).await); + // FIXME: Such a DB persistence query should be async. add_swap_to_db_index(ctx, swap); Ok(()) } @@ -1168,10 +1192,17 @@ pub async fn stats_swap_status(ctx: MmArc, req: Json) -> Result Ok(try_s!(Response::builder().body(res))) } +#[derive(Debug, Deserialize, Serialize)] +#[serde(untagged)] +enum SwapStatusData { + Legacy(Box), + Tpu(Box), +} + #[derive(Debug, Deserialize, Serialize)] struct SwapStatus { method: String, - data: SavedSwap, + data: SwapStatusData, } /// Broadcasts `my` swap status to P2P network @@ -1187,13 +1218,37 @@ async fn broadcast_my_swap_status(ctx: &MmArc, uuid: Uuid) -> Result<(), String> let status = SwapStatus { method: "swapstatus".into(), - data: status, + data: SwapStatusData::Legacy(Box::new(status)), }; let msg = json::to_vec(&status).expect("Swap status ser should never fail"); broadcast_p2p_msg(ctx, swap_topic(&uuid), msg, None); Ok(()) } +async fn broadcast_my_v2swap_status(ctx: &MmArc, status: TPUSwapStatusForStats) -> Result<(), String> { + // Serialize the status to prepare for broadcasting + let status = SwapStatus { + method: "swapstatus".into(), + data: SwapStatusData::Tpu(Box::new(status)), + }; + let msg = json::to_vec(&status).expect("Swap status ser should never fail"); + + // Extract status back. We are avoiding cloning it since it's a big object. + let SwapStatusData::Tpu(status) = status.data else { + unreachable!("We are sure that status is TPU variant"); + }; + let uuid = status.uuid; + + // Add the status to our own stats db index + #[cfg(not(target_arch = "wasm32"))] + try_s!(add_v2swap_to_stats_db_index(ctx, *status).await); + + // Broadcast the status to the P2P network + broadcast_p2p_msg(ctx, swap_topic(&uuid), msg, None); + + Ok(()) +} + #[derive(Debug, Deserialize)] pub struct MySwapsFilter { pub my_coin: Option, diff --git a/mm2src/mm2_main/src/lp_swap/maker_swap.rs b/mm2src/mm2_main/src/lp_swap/maker_swap.rs index 759aced62e..0752cad77e 100644 --- a/mm2src/mm2_main/src/lp_swap/maker_swap.rs +++ b/mm2src/mm2_main/src/lp_swap/maker_swap.rs @@ -2316,7 +2316,6 @@ pub async fn run_maker_swap(swap: RunMakerSwapInput, ctx: MmArc) { } }) .ok(); - // NOTES(tpu-status): This has nothing to do with the stats, this is only storing the swap to *MY* database. save_my_maker_swap_event(&ctx, &running_swap, to_save) .await .expect("!save_my_maker_swap_event"); @@ -2345,8 +2344,6 @@ pub async fn run_maker_swap(swap: RunMakerSwapInput, ctx: MmArc) { error!("!mark_swap_finished({}): {}", uuid, e); } - // NOTES(tpu-status): We broadcast the swap status at the very end and only once. - // We also save it to our own stats db at this point. if to_broadcast { if let Err(e) = broadcast_my_swap_status(&ctx, uuid).await { error!("!broadcast_my_swap_status({}): {}", uuid, e); diff --git a/mm2src/mm2_main/src/lp_swap/maker_swap_v2.rs b/mm2src/mm2_main/src/lp_swap/maker_swap_v2.rs index efbad74973..de996eda7c 100644 --- a/mm2src/mm2_main/src/lp_swap/maker_swap_v2.rs +++ b/mm2src/mm2_main/src/lp_swap/maker_swap_v2.rs @@ -2118,7 +2118,6 @@ impl Aborted { } } -// NOTES(tpu-status): These LastState impls should be the ones who send the swap status to the seed nodes (and store stats internally) #[async_trait] impl LastState for Aborted @@ -2129,6 +2128,7 @@ impl, state_machine: &mut Self::StateMachine, ) -> ::Result { + // FIXME: Do we want to broadcast the swap status here? warn!("Swap {} was aborted with reason {}", state_machine.uuid, self.reason); } } diff --git a/mm2src/mm2_main/src/lp_swap/swap_v2_common.rs b/mm2src/mm2_main/src/lp_swap/swap_v2_common.rs index dc37d242a7..9b4b38f0a3 100644 --- a/mm2src/mm2_main/src/lp_swap/swap_v2_common.rs +++ b/mm2src/mm2_main/src/lp_swap/swap_v2_common.rs @@ -466,64 +466,64 @@ pub(super) async fn swap_kickstart_handler_for_taker( /// The structure represents the swap information to be sent for statistics purposes. pub struct TPUSwapStatusForStats { /// The swap unique identifier - uuid: Uuid, + pub uuid: Uuid, /// The timestamp when the swap was started - started_at: u64, + pub started_at: u64, /// The timestamp when the swap was finished (either successfully or not) - finished_at: u64, + pub finished_at: u64, /// The coin name of the maker - maker_coin: String, + pub maker_coin: String, /// The public key of the maker (to which the taker's coins were paid) - maker_swap_pubkey: Option, + pub maker_swap_pubkey: Option, /// The amount of the maker's coin - maker_amount: BigDecimal, + pub maker_amount: BigDecimal, /// The coin name of the taker - taker_coin: String, + pub taker_coin: String, /// The public key of the taker (to which the maker's coins were paid) - taker_swap_pubkey: Option, + pub taker_swap_pubkey: Option, /// The amount of the taker's coin - taker_amount: BigDecimal, + pub taker_amount: BigDecimal, /// The price of the maker's coin in USD at the moment of the swap - maker_coin_usd_price: Option, + pub maker_coin_usd_price: Option, /// The price of the taker's coin in USD at the moment of the swap - taker_coin_usd_price: Option, + pub taker_coin_usd_price: Option, /// The difference (in +/- percentage) between the market price and the swap price at the moment of the swap (from the maker's pov) - market_margin: Option, + pub market_margin: Option, /// Is the maker a bot. Possible values are: Some(true) (yes), Some(false) (no), None (unknown) - is_maker_bot: Option, + pub is_maker_bot: Option, /// The GUI of the maker - maker_gui: Option, + pub maker_gui: Option, /// The maker's KDF version - maker_version: Option, + pub maker_version: Option, /// The GUI of the taker - taker_gui: Option, + pub taker_gui: Option, /// The taker's KDF version - taker_version: Option, + pub taker_version: Option, /// The version of the swap protocol used in the swap /// Note that this field should start with 2 because this struct is specific to TPU swaps - swap_version: u8, + pub swap_version: u8, // The next set of fields are extra and currently not part of the swap stats /// Maker's p2p pubkey - maker_p2p_pubkey: Secp256k1PubkeySerialize, + pub maker_p2p_pubkey: Secp256k1PubkeySerialize, /// Taker's p2p pubkey - taker_p2p_pubkey: Secp256k1PubkeySerialize, + pub taker_p2p_pubkey: Secp256k1PubkeySerialize, /// Premium paid by taker to maker - taker_premium: BigDecimal, + pub taker_premium: BigDecimal, /// The amount of fee paid by the taker to the DEX - dex_fee_amount: BigDecimal, + pub dex_fee_amount: BigDecimal, /// The amount of DEX fee burnt - dex_fee_burn: BigDecimal, + pub dex_fee_burn: BigDecimal, /// The maker or taker detailed swap events - events: TPUSwapEvents, + pub events: TPUSwapEvents, } #[derive(Debug, Deserialize, Serialize)] @@ -545,7 +545,6 @@ impl TPUSwapStatusForStats { .map_err(|_| SwapStatusGenerationError::StorageError)?; // Make sure the swap is finished (aborted, completed or refunded) - // FIXME: We might want to not share or store aborted swaps at all if repr.events.last().map(|e| !e.is_terminal()).unwrap_or(true) { return Err(SwapStatusGenerationError::SwapNotFinished); } @@ -639,7 +638,6 @@ impl TPUSwapStatusForStats { .map_err(|_| SwapStatusGenerationError::StorageError)?; // Make sure the swap is finished (aborted, completed or refunded) - // FIXME: We might want to not share or store aborted swaps at all if repr.events.last().map(|e| !e.is_terminal()).unwrap_or(true) { return Err(SwapStatusGenerationError::SwapNotFinished); } @@ -712,6 +710,19 @@ impl TPUSwapStatusForStats { events: TPUSwapEvents::FromTaker(repr.events), }) } + + pub fn is_success(&self) -> bool { + match self.events { + TPUSwapEvents::FromMaker(ref events) => events + .last() + .map(|e| matches!(e, MakerSwapEvent::Completed)) + .unwrap_or(false), + TPUSwapEvents::FromTaker(ref events) => events + .last() + .map(|e| matches!(e, TakerSwapEvent::Completed)) + .unwrap_or(false), + } + } } #[derive(Debug, Deserialize, Serialize)] diff --git a/mm2src/mm2_main/src/lp_swap/taker_swap_v2.rs b/mm2src/mm2_main/src/lp_swap/taker_swap_v2.rs index 68c1c60977..6b35bf3ff0 100644 --- a/mm2src/mm2_main/src/lp_swap/taker_swap_v2.rs +++ b/mm2src/mm2_main/src/lp_swap/taker_swap_v2.rs @@ -2549,6 +2549,7 @@ impl, state_machine: &mut Self::StateMachine, ) -> ::Result { + // FIXME: Do we want to broadcast the swap status here? warn!("Swap {} was aborted with reason {}", state_machine.uuid, self.reason); } } From 01f0b72fcc8d6282317007b84df11eb5776db183 Mon Sep 17 00:00:00 2001 From: Omer Yacine Date: Thu, 9 Oct 2025 11:58:02 +0200 Subject: [PATCH 03/12] broadcast (and store) the stats at the last state for tpu swaps we don't do so in aborted states though. we might wanna change that --- mm2src/mm2_main/src/lp_swap/maker_swap_v2.rs | 26 ++++++++++++- mm2src/mm2_main/src/lp_swap/swap_v2_common.rs | 4 +- mm2src/mm2_main/src/lp_swap/taker_swap_v2.rs | 38 ++++++++++++++++++- 3 files changed, 64 insertions(+), 4 deletions(-) diff --git a/mm2src/mm2_main/src/lp_swap/maker_swap_v2.rs b/mm2src/mm2_main/src/lp_swap/maker_swap_v2.rs index de996eda7c..43f33347db 100644 --- a/mm2src/mm2_main/src/lp_swap/maker_swap_v2.rs +++ b/mm2src/mm2_main/src/lp_swap/maker_swap_v2.rs @@ -6,7 +6,7 @@ use super::{ }; use crate::lp_swap::maker_swap::MakerSwapPreparedParams; use crate::lp_swap::swap_lock::SwapLock; -use crate::lp_swap::swap_v2_pb::*; +use crate::lp_swap::{broadcast_my_v2swap_status, swap_v2_pb::*}; use crate::lp_swap::{ broadcast_swap_v2_msg_every, check_balance_for_maker_swap, recv_swap_v2_msg, SwapConfirmationsSettings, TransactionIdentifier, MAKER_SWAP_V2_TYPE, MAX_STARTED_AT_DIFF, @@ -2195,6 +2195,18 @@ impl ::Result { info!("Swap {} has been completed", state_machine.uuid); + + let swap_status = TPUSwapStatusForStats::try_from_maker_state_machine(state_machine) + .await + .map_err(|e| { + error!("Error converting finished state machine to swap status for stats: {e:?}"); + }); + + if let Ok(swap_status) = swap_status { + if let Err(e) = broadcast_my_v2swap_status(&state_machine.ctx, swap_status).await { + error!("Error broadcasting swap status: {e}"); + } + } } } @@ -2244,6 +2256,18 @@ impl, ) -> Result { let repr = machine @@ -628,7 +628,7 @@ impl TPUSwapStatusForStats { }) } - async fn try_from_taker_state_machine( + pub async fn try_from_taker_state_machine( machine: &TakerSwapStateMachine, ) -> Result { let repr = machine diff --git a/mm2src/mm2_main/src/lp_swap/taker_swap_v2.rs b/mm2src/mm2_main/src/lp_swap/taker_swap_v2.rs index 6b35bf3ff0..1f1ebab552 100644 --- a/mm2src/mm2_main/src/lp_swap/taker_swap_v2.rs +++ b/mm2src/mm2_main/src/lp_swap/taker_swap_v2.rs @@ -5,7 +5,7 @@ use super::{ NEGOTIATION_TIMEOUT_SEC, }; use crate::lp_swap::swap_lock::SwapLock; -use crate::lp_swap::swap_v2_pb::*; +use crate::lp_swap::{broadcast_my_v2swap_status, swap_v2_pb::*}; use crate::lp_swap::{ broadcast_swap_v2_msg_every, check_balance_for_taker_swap, recv_swap_v2_msg, swap_v2_topic, SwapConfirmationsSettings, TransactionIdentifier, MAX_STARTED_AT_DIFF, TAKER_SWAP_V2_TYPE, @@ -2620,6 +2620,18 @@ impl ::Result { info!("Swap {} has been completed", state_machine.uuid); + + let swap_status = TPUSwapStatusForStats::try_from_taker_state_machine(state_machine) + .await + .map_err(|e| { + error!("Error converting finished state machine to swap status for stats: {e:?}"); + }); + + if let Ok(swap_status) = swap_status { + if let Err(e) = broadcast_my_v2swap_status(&state_machine.ctx, swap_status).await { + error!("Error broadcasting swap status: {e}"); + } + } } } @@ -2669,6 +2681,18 @@ impl Date: Sat, 11 Oct 2025 13:41:31 +0200 Subject: [PATCH 04/12] add a comment regarding why certain fields set to None --- mm2src/mm2_main/src/database/stats_swaps.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/mm2src/mm2_main/src/database/stats_swaps.rs b/mm2src/mm2_main/src/database/stats_swaps.rs index eb9e270e39..9fce05cbba 100644 --- a/mm2src/mm2_main/src/database/stats_swaps.rs +++ b/mm2src/mm2_main/src/database/stats_swaps.rs @@ -459,6 +459,7 @@ pub fn add_v2swap_to_index(conn: &Connection, swap: &TPUSwapStatusForStats) -> R ":maker_amount": swap.maker_amount.to_string(), ":taker_amount": swap.taker_amount.to_string(), ":is_success": (swap.is_success() as u32).to_string(), + // We will set those in the optional info update. For now setting them to NULL to conform with `INSERT_STATS_SWAP` statement. ":maker_coin_usd_price": None::, ":taker_coin_usd_price": None::, ":maker_pubkey": None::, From 742fd0d0bd708b5d9a365507ec79aa8a12206690 Mon Sep 17 00:00:00 2001 From: Omer Yacine Date: Sat, 11 Oct 2025 15:56:49 +0200 Subject: [PATCH 05/12] add a simple rpc to check stats db data this is prepared for tests rather than real usage, but could ofc be amended later. for now it only exposes a couple of fields just to make sure the swap persisted in the db correctly --- mm2src/mm2_main/src/database/stats_swaps.rs | 40 +++++++++++++++++++ mm2src/mm2_main/src/lp_swap/swap_v2_rpcs.rs | 35 ++++++++++++++++ .../mm2_main/src/rpc/dispatcher/dispatcher.rs | 4 ++ 3 files changed, 79 insertions(+) diff --git a/mm2src/mm2_main/src/database/stats_swaps.rs b/mm2src/mm2_main/src/database/stats_swaps.rs index 9fce05cbba..892b4a4e06 100644 --- a/mm2src/mm2_main/src/database/stats_swaps.rs +++ b/mm2src/mm2_main/src/database/stats_swaps.rs @@ -484,6 +484,46 @@ pub fn add_v2swap_to_index(conn: &Connection, swap: &TPUSwapStatusForStats) -> R Ok(()) } +#[derive(Debug, Serialize)] +pub struct SwapStatusFromDB { + pub maker_coin: String, + pub taker_coin: String, + pub started_at: u64, + pub finished_at: u64, + pub maker_amount: String, + pub taker_amount: String, + pub is_success: bool, +} + +pub fn fetch_swap_status(conn: &Connection, uuid: &str) -> Result, String> { + const SELECT_SWAP_STATUS_BY_UUID: &str = "SELECT maker_coin, taker_coin, started_at, finished_at, maker_amount, taker_amount, is_success FROM stats_swaps WHERE uuid = ?1"; + + let params = vec![uuid.to_string()]; + let query_row = conn.query_row(SELECT_SWAP_STATUS_BY_UUID, params_from_iter(params.iter()), |row| { + Ok(SwapStatusFromDB { + maker_coin: row.get(0)?, + taker_coin: row.get(1)?, + started_at: row.get(2)?, + finished_at: row.get(3)?, + maker_amount: row.get(4)?, + taker_amount: row.get(5)?, + is_success: row.get::<_, u32>(6)? != 0, + }) + }); + match query_row.optional() { + Ok(Some(swap_status)) => Ok(Some(swap_status)), + Ok(None) => Ok(None), + Err(e) => { + let err_msg = format!( + "Error {} on query {} with params {:?}", + e, SELECT_SWAP_STATUS_BY_UUID, params + ); + error!("{}", err_msg); + Err(err_msg) + }, + } +} + #[test] fn test_split_coin() { let input = ""; diff --git a/mm2src/mm2_main/src/lp_swap/swap_v2_rpcs.rs b/mm2src/mm2_main/src/lp_swap/swap_v2_rpcs.rs index 9915978152..ad3f70f5b0 100644 --- a/mm2src/mm2_main/src/lp_swap/swap_v2_rpcs.rs +++ b/mm2src/mm2_main/src/lp_swap/swap_v2_rpcs.rs @@ -21,6 +21,7 @@ use uuid::Uuid; cfg_native!( use crate::database::my_swaps::SELECT_MY_SWAP_V2_FOR_RPC_BY_UUID; + use crate::database::stats_swaps; use common::async_blocking; use db_common::sqlite::query_single_row; use db_common::sqlite::rusqlite::{Result as SqlResult, Row, Error as SqlError}; @@ -534,3 +535,37 @@ pub(crate) async fn active_swaps_rpc( statuses, }) } + +#[cfg(not(target_arch = "wasm32"))] +#[derive(Deserialize)] +pub(crate) struct FetchSwapStatusRequest { + uuid: Uuid, +} + +#[cfg(not(target_arch = "wasm32"))] +#[derive(Display, Serialize, SerializeErrorType)] +#[serde(tag = "error_type", content = "error_data")] +pub(crate) enum FetchSwapStatusError { + NoSwapWithUuid(Uuid), + DBFetchError(String), +} + +#[cfg(not(target_arch = "wasm32"))] +impl HttpStatusCode for FetchSwapStatusError { + fn status_code(&self) -> StatusCode { + match self { + FetchSwapStatusError::NoSwapWithUuid(_) => StatusCode::NOT_FOUND, + FetchSwapStatusError::DBFetchError(_) => StatusCode::INTERNAL_SERVER_ERROR, + } + } +} + +#[cfg(not(target_arch = "wasm32"))] +pub(crate) async fn fetch_swap_status_rpc( + ctx: MmArc, + req: FetchSwapStatusRequest, +) -> MmResult { + stats_swaps::fetch_swap_status(&ctx.sqlite_connection(), &req.uuid.to_string()) + .map_to_mm(|e| FetchSwapStatusError::DBFetchError(e.to_string()))? + .or_mm_err(|| FetchSwapStatusError::NoSwapWithUuid(req.uuid)) +} diff --git a/mm2src/mm2_main/src/rpc/dispatcher/dispatcher.rs b/mm2src/mm2_main/src/rpc/dispatcher/dispatcher.rs index c724175f45..3ad33583d9 100644 --- a/mm2src/mm2_main/src/rpc/dispatcher/dispatcher.rs +++ b/mm2src/mm2_main/src/rpc/dispatcher/dispatcher.rs @@ -12,6 +12,8 @@ use crate::lp_stats::{ add_node_to_version_stat, remove_node_from_version_stat, start_version_stat_collection, stop_version_stat_collection, update_version_stat_collection, }; +#[cfg(not(target_arch = "wasm32"))] +use crate::lp_swap::swap_v2_rpcs::fetch_swap_status_rpc; use crate::lp_swap::swap_v2_rpcs::{active_swaps_rpc, my_recent_swaps_rpc, my_swap_status_rpc}; use crate::lp_swap::{get_locked_amount_rpc, max_maker_vol, recreate_swap_data, trade_preimage_rpc}; use crate::lp_wallet::{change_mnemonic_password, delete_wallet_rpc, get_mnemonic_rpc, get_wallet_names_rpc}; @@ -284,6 +286,8 @@ async fn dispatcher_v2(request: MmRpcRequest, ctx: MmArc) -> DispatcherResult handle_mmrpc(ctx, request, start_version_stat_collection).await, "stop_simple_market_maker_bot" => handle_mmrpc(ctx, request, stop_simple_market_maker_bot).await, "stop_version_stat_collection" => handle_mmrpc(ctx, request, stop_version_stat_collection).await, + #[cfg(not(target_arch = "wasm32"))] + "swap_info_from_stats_db" => handle_mmrpc(ctx, request, fetch_swap_status_rpc).await, "trade_preimage" => handle_mmrpc(ctx, request, trade_preimage_rpc).await, "trezor_connection_status" => handle_mmrpc(ctx, request, trezor_connection_status).await, "update_nft" => handle_mmrpc(ctx, request, update_nft).await, From e17deb1943152a188dba0bd2cfef1598481615a5 Mon Sep 17 00:00:00 2001 From: Omer Yacine Date: Sat, 11 Oct 2025 16:53:39 +0200 Subject: [PATCH 06/12] add a docker test for swap v2 status db persistance --- .../tests/docker_tests/swap_proto_v2_tests.rs | 55 ++++++++++++++++++- mm2src/mm2_test_helpers/src/for_tests.rs | 27 +++++++++ 2 files changed, 81 insertions(+), 1 deletion(-) diff --git a/mm2src/mm2_main/tests/docker_tests/swap_proto_v2_tests.rs b/mm2src/mm2_main/tests/docker_tests/swap_proto_v2_tests.rs index 0ced4f84d4..d8ddb047ba 100644 --- a/mm2src/mm2_main/tests/docker_tests/swap_proto_v2_tests.rs +++ b/mm2src/mm2_main/tests/docker_tests/swap_proto_v2_tests.rs @@ -14,7 +14,7 @@ use mm2_number::MmNumber; use mm2_test_helpers::for_tests::{ active_swaps, check_recent_swaps, coins_needed_for_kickstart, disable_coin, disable_coin_err, enable_native, get_locked_amount, mm_dump, my_swap_status, mycoin1_conf, mycoin_conf, start_swaps, wait_for_swap_finished, - wait_for_swap_status, MarketMakerIt, Mm2TestConf, + wait_for_swap_info_stats_db, wait_for_swap_status, MarketMakerIt, Mm2TestConf, }; use mm2_test_helpers::structs::MmNumberMultiRepr; use script::{Builder, Opcode}; @@ -945,3 +945,56 @@ fn test_v2_swap_utxo_utxo_file_lock() { block_on(mm_alice_dup.wait_for_log(22., |log| log.contains(&expected_log))).unwrap(); } } + +#[test] +fn test_v2_swap_utxo_status_broadcasting() { + let (_ctx, _, bob_priv_key) = generate_utxo_coin_with_random_privkey(MYCOIN, 1000.into()); + let (_ctx, _, alice_priv_key) = generate_utxo_coin_with_random_privkey(MYCOIN1, 1000.into()); + let coins = json!([mycoin_conf(1000), mycoin1_conf(1000)]); + + let bob_conf = Mm2TestConf::seednode_trade_v2(&format!("0x{}", hex::encode(bob_priv_key)), &coins); + let mut mm_bob = MarketMakerIt::start(bob_conf.conf.clone(), bob_conf.rpc_password.clone(), None).unwrap(); + let (_bob_dump_log, _bob_dump_dashboard) = mm_dump(&mm_bob.log_path); + log!("Bob log path: {}", mm_bob.log_path.display()); + + let alice_conf = Mm2TestConf::light_node_trade_v2( + &format!("0x{}", hex::encode(alice_priv_key)), + &coins, + &[&mm_bob.ip.to_string()], + ); + let mut mm_alice = MarketMakerIt::start(alice_conf.conf.clone(), alice_conf.rpc_password.clone(), None).unwrap(); + let (_alice_dump_log, _alice_dump_dashboard) = mm_dump(&mm_alice.log_path); + log!("Alice log path: {}", mm_alice.log_path.display()); + + log!("{:?}", block_on(enable_native(&mm_bob, MYCOIN, &[], None))); + log!("{:?}", block_on(enable_native(&mm_bob, MYCOIN1, &[], None))); + log!("{:?}", block_on(enable_native(&mm_alice, MYCOIN, &[], None))); + log!("{:?}", block_on(enable_native(&mm_alice, MYCOIN1, &[], None))); + + let uuids = block_on(start_swaps( + &mut mm_bob, + &mut mm_alice, + &[(MYCOIN, MYCOIN1)], + 1.0, + 1.0, + 100., + )); + log!("{:?}", uuids); + + for uuid in uuids.iter() { + block_on(wait_for_swap_finished(&mm_bob, uuid, 60)); + block_on(wait_for_swap_finished(&mm_alice, uuid, 30)); + + let maker_swap_status = block_on(my_swap_status(&mm_bob, uuid)); + log!("{:?}", maker_swap_status); + + let taker_swap_status = block_on(my_swap_status(&mm_alice, uuid)); + log!("{:?}", taker_swap_status); + } + + // Make sure that the swap status was persisted in stats DB. + for uuid in uuids.iter() { + block_on(wait_for_swap_info_stats_db(&mm_bob, uuid, 60)); + block_on(wait_for_swap_info_stats_db(&mm_alice, uuid, 30)); + } +} diff --git a/mm2src/mm2_test_helpers/src/for_tests.rs b/mm2src/mm2_test_helpers/src/for_tests.rs index 0b69ad9b55..b731109b31 100644 --- a/mm2src/mm2_test_helpers/src/for_tests.rs +++ b/mm2src/mm2_test_helpers/src/for_tests.rs @@ -2598,6 +2598,33 @@ pub async fn wait_check_stats_swap_status(mm: &MarketMakerIt, uuid: &str, timeou } } +pub async fn wait_for_swap_info_stats_db(mm: &MarketMakerIt, uuid: &str, timeout_sec: i64) { + let wait_until = get_utc_timestamp() + timeout_sec; + loop { + let response = mm + .rpc(&json!({ + "userpass": mm.userpass, + "method": "swap_info_from_stats_db", + "mmrpc": "2.0", + "params": { + "uuid": uuid, + } + })) + .await + .unwrap(); + + if response.0.is_success() { + break; + } + + if get_utc_timestamp() > wait_until { + panic!("Timed out waiting for swap {} info in stats db", uuid); + } + + Timer::sleep(1.).await; + } +} + pub async fn check_recent_swaps(mm: &MarketMakerIt, expected_len: usize) { let response = mm .rpc(&json!({ From f3d9247531e58400407f3cbc2dda7b08fe6f72ff Mon Sep 17 00:00:00 2001 From: Omer Yacine Date: Sat, 11 Oct 2025 17:20:52 +0200 Subject: [PATCH 07/12] change amounts from string to f64 as they fail row.get() db parsing --- mm2src/mm2_main/src/database/stats_swaps.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mm2src/mm2_main/src/database/stats_swaps.rs b/mm2src/mm2_main/src/database/stats_swaps.rs index 892b4a4e06..054fb9e0ca 100644 --- a/mm2src/mm2_main/src/database/stats_swaps.rs +++ b/mm2src/mm2_main/src/database/stats_swaps.rs @@ -490,8 +490,8 @@ pub struct SwapStatusFromDB { pub taker_coin: String, pub started_at: u64, pub finished_at: u64, - pub maker_amount: String, - pub taker_amount: String, + pub maker_amount: f64, + pub taker_amount: f64, pub is_success: bool, } From c1bd112206d6ff275379bc2957686c566478c1ca Mon Sep 17 00:00:00 2001 From: Omer Yacine Date: Sat, 11 Oct 2025 17:38:12 +0200 Subject: [PATCH 08/12] resolve a fixme: let add_swap_to_index use async_blocking --- mm2src/mm2_main/src/lp_swap.rs | 55 +++++++++++++++++++++------------- 1 file changed, 35 insertions(+), 20 deletions(-) diff --git a/mm2src/mm2_main/src/lp_swap.rs b/mm2src/mm2_main/src/lp_swap.rs index 77e7068756..6b81d56fef 100644 --- a/mm2src/mm2_main/src/lp_swap.rs +++ b/mm2src/mm2_main/src/lp_swap.rs @@ -367,8 +367,9 @@ pub async fn process_swap_msg(ctx: MmArc, topic: &str, msg: &[u8]) -> P2PRequest Ok(SwapStatus { data, .. }) => match data { SwapStatusData::Legacy(mut status) => { status.fetch_and_set_usd_prices().await; - if let Err(e) = save_stats_swap(&ctx, &status).await { - error!("Error saving the swap {} status: {}", status.uuid(), e); + let uuid = *status.uuid(); + if let Err(e) = save_stats_swap(&ctx, *status).await { + error!("Error saving the swap {} status: {}", uuid, e); } Ok(()) }, @@ -1035,17 +1036,20 @@ pub async fn add_v2swap_to_stats_db_index(ctx: &MmArc, swap: TPUSwapStatusForSta } #[cfg(not(target_arch = "wasm32"))] -fn add_swap_to_db_index(ctx: &MmArc, swap: &SavedSwap) { - if let Some(conn) = ctx.sqlite_conn_opt() { - crate::database::stats_swaps::add_swap_to_index(&conn, swap) - } +async fn add_swap_to_db_index(ctx: &MmArc, swap: SavedSwap) { + let ctx = ctx.clone(); + common::async_blocking(move || { + if let Some(conn) = ctx.sqlite_conn_opt() { + crate::database::stats_swaps::add_swap_to_index(&conn, &swap) + } + }) + .await } #[cfg(not(target_arch = "wasm32"))] -async fn save_stats_swap(ctx: &MmArc, swap: &SavedSwap) -> Result<(), String> { +async fn save_stats_swap(ctx: &MmArc, swap: SavedSwap) -> Result<(), String> { try_s!(swap.save_to_stats_db(ctx).await); - // FIXME: Such a DB persistence query should be async. - add_swap_to_db_index(ctx, swap); + add_swap_to_db_index(ctx, swap).await; Ok(()) } @@ -1213,19 +1217,29 @@ async fn broadcast_my_swap_status(ctx: &MmArc, uuid: Uuid) -> Result<(), String> }; status.hide_secrets(); - #[cfg(not(target_arch = "wasm32"))] - try_s!(save_stats_swap(ctx, &status).await); - let status = SwapStatus { method: "swapstatus".into(), data: SwapStatusData::Legacy(Box::new(status)), }; let msg = json::to_vec(&status).expect("Swap status ser should never fail"); + + #[cfg(not(target_arch = "wasm32"))] + { + // Extract status back. We are avoiding cloning it since it's a big object. + let SwapStatusData::Legacy(status) = status.data else { + unreachable!("We are sure that status is Legacy variant"); + }; + + try_s!(save_stats_swap(ctx, *status).await); + } + broadcast_p2p_msg(ctx, swap_topic(&uuid), msg, None); Ok(()) } async fn broadcast_my_v2swap_status(ctx: &MmArc, status: TPUSwapStatusForStats) -> Result<(), String> { + let uuid = status.uuid; + // Serialize the status to prepare for broadcasting let status = SwapStatus { method: "swapstatus".into(), @@ -1233,15 +1247,16 @@ async fn broadcast_my_v2swap_status(ctx: &MmArc, status: TPUSwapStatusForStats) }; let msg = json::to_vec(&status).expect("Swap status ser should never fail"); - // Extract status back. We are avoiding cloning it since it's a big object. - let SwapStatusData::Tpu(status) = status.data else { - unreachable!("We are sure that status is TPU variant"); - }; - let uuid = status.uuid; - - // Add the status to our own stats db index #[cfg(not(target_arch = "wasm32"))] - try_s!(add_v2swap_to_stats_db_index(ctx, *status).await); + { + // Extract status back. We are avoiding cloning it since it's a big object. + let SwapStatusData::Tpu(status) = status.data else { + unreachable!("We are sure that status is Tpu variant"); + }; + + // Add the status to our own stats db index + try_s!(add_v2swap_to_stats_db_index(ctx, *status).await); + } // Broadcast the status to the P2P network broadcast_p2p_msg(ctx, swap_topic(&uuid), msg, None); From ccc0a5ab07455c7560d7c7136458099ffdf928a7 Mon Sep 17 00:00:00 2001 From: Omer Yacine Date: Thu, 16 Oct 2025 13:35:39 +0200 Subject: [PATCH 09/12] review(onur): change confusing is_terminal() naming --- mm2src/mm2_main/src/lp_swap/maker_swap_v2.rs | 4 ++-- mm2src/mm2_main/src/lp_swap/swap_v2_common.rs | 4 ++-- mm2src/mm2_main/src/lp_swap/taker_swap_v2.rs | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/mm2src/mm2_main/src/lp_swap/maker_swap_v2.rs b/mm2src/mm2_main/src/lp_swap/maker_swap_v2.rs index 43f33347db..64b38c9db5 100644 --- a/mm2src/mm2_main/src/lp_swap/maker_swap_v2.rs +++ b/mm2src/mm2_main/src/lp_swap/maker_swap_v2.rs @@ -151,8 +151,8 @@ pub enum MakerSwapEvent { } impl MakerSwapEvent { - /// Returns true if the event is terminal, i.e. no more events will be produced after it. - pub fn is_terminal(&self) -> bool { + /// Returns true if the event is an end state of the swap, i.e. no more events will be produced after it. + pub fn is_end_state(&self) -> bool { matches!( self, MakerSwapEvent::Aborted { .. } | MakerSwapEvent::Completed | MakerSwapEvent::MakerPaymentRefunded { .. } diff --git a/mm2src/mm2_main/src/lp_swap/swap_v2_common.rs b/mm2src/mm2_main/src/lp_swap/swap_v2_common.rs index 653442b01a..64678dd95f 100644 --- a/mm2src/mm2_main/src/lp_swap/swap_v2_common.rs +++ b/mm2src/mm2_main/src/lp_swap/swap_v2_common.rs @@ -545,7 +545,7 @@ impl TPUSwapStatusForStats { .map_err(|_| SwapStatusGenerationError::StorageError)?; // Make sure the swap is finished (aborted, completed or refunded) - if repr.events.last().map(|e| !e.is_terminal()).unwrap_or(true) { + if repr.events.last().map(|e| !e.is_end_state()).unwrap_or(true) { return Err(SwapStatusGenerationError::SwapNotFinished); } @@ -638,7 +638,7 @@ impl TPUSwapStatusForStats { .map_err(|_| SwapStatusGenerationError::StorageError)?; // Make sure the swap is finished (aborted, completed or refunded) - if repr.events.last().map(|e| !e.is_terminal()).unwrap_or(true) { + if repr.events.last().map(|e| !e.is_end_state()).unwrap_or(true) { return Err(SwapStatusGenerationError::SwapNotFinished); } diff --git a/mm2src/mm2_main/src/lp_swap/taker_swap_v2.rs b/mm2src/mm2_main/src/lp_swap/taker_swap_v2.rs index 37611a6fad..05ce4dd713 100644 --- a/mm2src/mm2_main/src/lp_swap/taker_swap_v2.rs +++ b/mm2src/mm2_main/src/lp_swap/taker_swap_v2.rs @@ -181,8 +181,8 @@ pub enum TakerSwapEvent { } impl TakerSwapEvent { - /// Returns true if the event is terminal, i.e. no more events will be produced after it. - pub fn is_terminal(&self) -> bool { + /// Returns true if the event is an end state of the swap, i.e. no more events will be produced after it. + pub fn is_end_state(&self) -> bool { matches!( self, TakerSwapEvent::Aborted { .. } From ca87640b5df2a1725c83b2d21a284baebef3c124 Mon Sep 17 00:00:00 2001 From: Omer Yacine Date: Thu, 16 Oct 2025 13:47:55 +0200 Subject: [PATCH 10/12] review(onur): add a warning notes to the heavy structs --- mm2src/mm2_main/src/lp_swap/saved_swap.rs | 1 + mm2src/mm2_main/src/lp_swap/swap_v2_common.rs | 1 + 2 files changed, 2 insertions(+) diff --git a/mm2src/mm2_main/src/lp_swap/saved_swap.rs b/mm2src/mm2_main/src/lp_swap/saved_swap.rs index 08b2b3c12f..02472a5b70 100644 --- a/mm2src/mm2_main/src/lp_swap/saved_swap.rs +++ b/mm2src/mm2_main/src/lp_swap/saved_swap.rs @@ -31,6 +31,7 @@ pub enum SavedSwapError { #[derive(Debug, Serialize, Deserialize)] #[serde(tag = "type")] +// NOTE: This is a big struct. Better not add Clone derive to it unless absolutely necessary. pub enum SavedSwap { Maker(MakerSavedSwap), Taker(TakerSavedSwap), diff --git a/mm2src/mm2_main/src/lp_swap/swap_v2_common.rs b/mm2src/mm2_main/src/lp_swap/swap_v2_common.rs index 64678dd95f..68e547ee93 100644 --- a/mm2src/mm2_main/src/lp_swap/swap_v2_common.rs +++ b/mm2src/mm2_main/src/lp_swap/swap_v2_common.rs @@ -464,6 +464,7 @@ pub(super) async fn swap_kickstart_handler_for_taker( #[derive(Debug, Deserialize, Serialize)] /// The structure represents the swap information to be sent for statistics purposes. +// NOTE: This is a big struct. Better not add Clone derive to it unless absolutely necessary. pub struct TPUSwapStatusForStats { /// The swap unique identifier pub uuid: Uuid, From fcf1c5cfb25ed2a0563a222d417cc8cc29cb38b1 Mon Sep 17 00:00:00 2001 From: Omer Yacine Date: Mon, 20 Oct 2025 20:55:40 +0200 Subject: [PATCH 11/12] review(onur): proper struct ref in doc comments and other simplifications --- mm2src/mm2_main/src/database/stats_swaps.rs | 5 +---- mm2src/mm2_main/src/lp_swap/swap_v2_common.rs | 9 +++------ 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/mm2src/mm2_main/src/database/stats_swaps.rs b/mm2src/mm2_main/src/database/stats_swaps.rs index 054fb9e0ca..95199cc770 100644 --- a/mm2src/mm2_main/src/database/stats_swaps.rs +++ b/mm2src/mm2_main/src/database/stats_swaps.rs @@ -422,10 +422,7 @@ fn update_optional_info(swap: &TPUSwapStatusForStats) -> (String, OwnedSqlNamedP params.push((":taker_version", taker_version.clone().into())); } - let update_query = format!( - "UPDATE stats_swaps set swap_version = :swap_version{} WHERE uuid = :uuid;", - extra_args - ); + let update_query = format!("UPDATE stats_swaps set swap_version = :swap_version{extra_args} WHERE uuid = :uuid;",); // Swap version is actually mandatory and will always be set. params.push((":uuid", swap.uuid.to_string().into())); diff --git a/mm2src/mm2_main/src/lp_swap/swap_v2_common.rs b/mm2src/mm2_main/src/lp_swap/swap_v2_common.rs index 68e547ee93..d188215878 100644 --- a/mm2src/mm2_main/src/lp_swap/swap_v2_common.rs +++ b/mm2src/mm2_main/src/lp_swap/swap_v2_common.rs @@ -462,9 +462,9 @@ pub(super) async fn swap_kickstart_handler_for_taker( } } -#[derive(Debug, Deserialize, Serialize)] /// The structure represents the swap information to be sent for statistics purposes. // NOTE: This is a big struct. Better not add Clone derive to it unless absolutely necessary. +#[derive(Debug, Deserialize, Serialize)] pub struct TPUSwapStatusForStats { /// The swap unique identifier pub uuid: Uuid, @@ -528,7 +528,7 @@ pub struct TPUSwapStatusForStats { } #[derive(Debug, Deserialize, Serialize)] -/// Represents either a batch of maker or taker swap events. This could be used to know whether a TPUSwapStatusForStats +/// Represents either a batch of maker or taker swap events. This could be used to know whether a [`TPUSwapStatusForStats`] /// is maker-originating or taker-originating. pub enum TPUSwapEvents { FromMaker(Vec), @@ -589,10 +589,7 @@ impl TPUSwapStatusForStats { // TODO: A proper check would be to open the market maker bot and check whether is is running and that the // swap we just performed is found within its SimpleMakerBotRegistry. The problem at the moment is that // we can't import TradingBotContext since that's part of lp_ordermatch and that would create a cyclic dependency. - let mut is_maker_bot = Some(false); - if machine.ctx.simple_market_maker_bot_ctx.lock().unwrap().is_some() { - is_maker_bot = Some(true); - } + let is_maker_bot = Some(machine.ctx.simple_market_maker_bot_ctx.lock().unwrap().is_some()); // Get the maker's p2p pubkey let (p2p_private_key, _) = p2p_private_and_peer_id_to_broadcast(&machine.ctx, machine.p2p_keypair.as_ref()); From afa66525ab0f7a0a43466379cc720506c8a84cbf Mon Sep 17 00:00:00 2001 From: Omer Yacine Date: Tue, 9 Dec 2025 16:48:37 +0100 Subject: [PATCH 12/12] handle the fixmes and move the broadcaster methods to swap_v2_common.rs --- mm2src/mm2_main/src/lp_swap/maker_swap_v2.rs | 30 ++----------- mm2src/mm2_main/src/lp_swap/swap_v2_common.rs | 34 ++++++++++++++- mm2src/mm2_main/src/lp_swap/taker_swap_v2.rs | 43 +++---------------- 3 files changed, 42 insertions(+), 65 deletions(-) diff --git a/mm2src/mm2_main/src/lp_swap/maker_swap_v2.rs b/mm2src/mm2_main/src/lp_swap/maker_swap_v2.rs index 5321749027..f33825a5cc 100644 --- a/mm2src/mm2_main/src/lp_swap/maker_swap_v2.rs +++ b/mm2src/mm2_main/src/lp_swap/maker_swap_v2.rs @@ -6,7 +6,7 @@ use super::{ }; use crate::lp_swap::maker_swap::MakerSwapPreparedParams; use crate::lp_swap::swap_lock::SwapLock; -use crate::lp_swap::{broadcast_my_v2swap_status, swap_v2_pb::*}; +use crate::lp_swap::swap_v2_pb::*; use crate::lp_swap::{ broadcast_swap_v2_msg_every, check_balance_for_maker_swap, recv_swap_v2_msg, SwapConfirmationsSettings, TransactionIdentifier, MAKER_SWAP_V2_TYPE, MAX_STARTED_AT_DIFF, @@ -2203,8 +2203,8 @@ impl, state_machine: &mut Self::StateMachine, ) -> ::Result { - // FIXME: Do we want to broadcast the swap status here? warn!("Swap {} was aborted with reason {}", state_machine.uuid, self.reason); + try_broadcast_v2_maker_swap_status(state_machine).await; } } @@ -2270,18 +2270,7 @@ impl ::Result { info!("Swap {} has been completed", state_machine.uuid); - - let swap_status = TPUSwapStatusForStats::try_from_maker_state_machine(state_machine) - .await - .map_err(|e| { - error!("Error converting finished state machine to swap status for stats: {e:?}"); - }); - - if let Ok(swap_status) = swap_status { - if let Err(e) = broadcast_my_v2swap_status(&state_machine.ctx, swap_status).await { - error!("Error broadcasting swap status: {e}"); - } - } + try_broadcast_v2_maker_swap_status(state_machine).await; } } @@ -2331,18 +2320,7 @@ impl, +) { + let swap_status = TPUSwapStatusForStats::try_from_maker_state_machine(state_machine) + .await + .map_err(|e| { + error!("Error converting finished state machine to swap status for stats: {e:?}"); + }); + + if let Ok(swap_status) = swap_status { + if let Err(e) = broadcast_my_v2swap_status(&state_machine.ctx, swap_status).await { + error!("Error broadcasting swap status: {e}"); + } + } +} + +pub async fn try_broadcast_v2_taker_swap_status( + state_machine: &TakerSwapStateMachine, +) { + let swap_status = TPUSwapStatusForStats::try_from_taker_state_machine(state_machine) + .await + .map_err(|e| { + error!("Error converting finished state machine to swap status for stats: {e:?}"); + }); + + if let Ok(swap_status) = swap_status { + if let Err(e) = broadcast_my_v2swap_status(&state_machine.ctx, swap_status).await { + error!("Error broadcasting swap status: {e}"); + } + } +} diff --git a/mm2src/mm2_main/src/lp_swap/taker_swap_v2.rs b/mm2src/mm2_main/src/lp_swap/taker_swap_v2.rs index 2ac45b72f0..9cdc48e98b 100644 --- a/mm2src/mm2_main/src/lp_swap/taker_swap_v2.rs +++ b/mm2src/mm2_main/src/lp_swap/taker_swap_v2.rs @@ -5,7 +5,7 @@ use super::{ NEGOTIATION_TIMEOUT_SEC, }; use crate::lp_swap::swap_lock::SwapLock; -use crate::lp_swap::{broadcast_my_v2swap_status, swap_v2_pb::*}; +use crate::lp_swap::swap_v2_pb::*; use crate::lp_swap::{ broadcast_swap_v2_msg_every, check_balance_for_taker_swap, recv_swap_v2_msg, swap_v2_topic, SwapConfirmationsSettings, TransactionIdentifier, MAX_STARTED_AT_DIFF, TAKER_SWAP_V2_TYPE, @@ -2588,8 +2588,8 @@ impl, state_machine: &mut Self::StateMachine, ) -> ::Result { - // FIXME: Do we want to broadcast the swap status here? warn!("Swap {} was aborted with reason {}", state_machine.uuid, self.reason); + try_broadcast_v2_taker_swap_status(state_machine).await; } } @@ -2659,18 +2659,7 @@ impl ::Result { info!("Swap {} has been completed", state_machine.uuid); - - let swap_status = TPUSwapStatusForStats::try_from_taker_state_machine(state_machine) - .await - .map_err(|e| { - error!("Error converting finished state machine to swap status for stats: {e:?}"); - }); - - if let Ok(swap_status) = swap_status { - if let Err(e) = broadcast_my_v2swap_status(&state_machine.ctx, swap_status).await { - error!("Error broadcasting swap status: {e}"); - } - } + try_broadcast_v2_taker_swap_status(state_machine).await; } } @@ -2720,18 +2709,7 @@ impl