diff --git a/mm2src/mm2_main/src/database.rs b/mm2src/mm2_main/src/database.rs index 4385b0b329..62c335d633 100644 --- a/mm2src/mm2_main/src/database.rs +++ b/mm2src/mm2_main/src/database.rs @@ -137,6 +137,10 @@ async fn migration_14(ctx: &MmArc) -> Vec<(&'static str, Vec)> { fix_maker_and_taker_pubkeys_in_stats_db(ctx).await } +fn migration_15() -> Vec<(&'static str, Vec)> { + db_common::sqlite::execute_batch(stats_swaps::ADD_SWAP_VERSION_AND_MARKET_MARGIN) +} + async fn statements_for_migration(ctx: &MmArc, current_migration: i64) -> Option)>> { match current_migration { 1 => Some(migration_1(ctx).await), @@ -153,6 +157,7 @@ async fn statements_for_migration(ctx: &MmArc, current_migration: i64) -> Option 12 => Some(migration_12()), 13 => Some(migration_13()), 14 => Some(migration_14(ctx).await), + 15 => Some(migration_15()), _ => None, } } diff --git a/mm2src/mm2_main/src/database/stats_swaps.rs b/mm2src/mm2_main/src/database/stats_swaps.rs index 4921072678..bd1dfb1eda 100644 --- a/mm2src/mm2_main/src/database/stats_swaps.rs +++ b/mm2src/mm2_main/src/database/stats_swaps.rs @@ -1,6 +1,6 @@ #![allow(deprecated)] // TODO: remove this once rusqlite is >= 0.29 -use crate::lp_swap::{MakerSavedSwap, SavedSwap, SavedSwapIo, TakerSavedSwap}; +use crate::lp_swap::{MakerSavedSwap, SavedSwap, SavedSwapIo, TPUSwapStatusForStats, TakerSavedSwap}; use common::log::{debug, error}; use db_common::{ owned_named_params, @@ -95,6 +95,13 @@ pub const ADD_MAKER_TAKER_GUI_AND_VERSION: &[&str] = &[ pub const SELECT_ID_BY_UUID: &str = "SELECT id FROM stats_swaps WHERE uuid = ?1"; +pub const ADD_SWAP_VERSION_AND_MARKET_MARGIN: &[&str] = &[ + "ALTER TABLE stats_swaps ADD COLUMN swap_version INTEGER NOT NULL DEFAULT 0;", + "ALTER TABLE stats_swaps ADD COLUMN market_margin DECIMAL;", + // NULL = unknown, 0 = not a bot, 1 = bot + "ALTER TABLE stats_swaps ADD COLUMN is_maker_bot INTEGER;", +]; + /// Returns SQL statements to initially fill stats_swaps table using existing DB with JSON files pub async fn create_and_fill_stats_swaps_from_json_statements(ctx: &MmArc) -> Vec<(&'static str, Vec)> { let maker_swaps = SavedSwap::load_all_from_maker_stats_db(ctx).await.unwrap_or_default(); @@ -434,6 +441,151 @@ pub fn add_swap_to_index(conn: &Connection, swap: &SavedSwap) { } } +/// Constructs the update query for optional fields of the TPU swap status struct. That is, any field that is wrapped in an Option. +fn update_optional_info_tpu(swap: &TPUSwapStatusForStats) -> (String, OwnedSqlNamedParams) { + let mut extra_args = String::new(); + let mut params = OwnedSqlNamedParams::new(); + + if let Some(ref maker_pubkey) = swap.maker_swap_pubkey { + extra_args.push_str(", maker_pubkey = :maker_pubkey"); + params.push((":maker_pubkey", maker_pubkey.clone().into())); + } + if let Some(ref taker_pubkey) = swap.taker_swap_pubkey { + extra_args.push_str(", taker_pubkey = :taker_pubkey"); + params.push((":taker_pubkey", taker_pubkey.clone().into())); + } + if let Some(ref maker_coin_usd_price) = swap.maker_coin_usd_price { + extra_args.push_str(", maker_coin_usd_price = :maker_coin_usd_price"); + params.push((":maker_coin_usd_price", maker_coin_usd_price.to_string().into())); + } + if let Some(ref taker_coin_usd_price) = swap.taker_coin_usd_price { + extra_args.push_str(", taker_coin_usd_price = :taker_coin_usd_price"); + params.push((":taker_coin_usd_price", taker_coin_usd_price.to_string().into())); + } + if let Some(ref market_margin) = swap.market_margin { + extra_args.push_str(", market_margin = :market_margin"); + params.push((":market_margin", market_margin.to_string().into())); + } + if let Some(is_maker_bot) = swap.is_maker_bot { + extra_args.push_str(", is_maker_bot = :is_maker_bot"); + params.push((":is_maker_bot", (is_maker_bot as u32).into())); + } + if let Some(ref maker_gui) = swap.maker_gui { + extra_args.push_str(", maker_gui = :maker_gui"); + params.push((":maker_gui", maker_gui.clone().into())); + } + if let Some(ref maker_version) = swap.maker_version { + extra_args.push_str(", maker_version = :maker_version"); + params.push((":maker_version", maker_version.clone().into())); + } + if let Some(ref taker_gui) = swap.taker_gui { + extra_args.push_str(", taker_gui = :taker_gui"); + params.push((":taker_gui", taker_gui.clone().into())); + } + if let Some(ref taker_version) = swap.taker_version { + extra_args.push_str(", taker_version = :taker_version"); + params.push((":taker_version", taker_version.clone().into())); + } + + let update_query = format!("UPDATE stats_swaps set swap_version = :swap_version{extra_args} WHERE uuid = :uuid;",); + + // Swap version is actually mandatory and will always be set. + params.push((":uuid", swap.uuid.to_string().into())); + params.push((":swap_version", (swap.swap_version as u32).into())); + + (update_query, params) +} + +/// Adds a TPU swap to stats_swap table if it's not present and updates the swap with the new data if it already exists. +pub fn add_v2swap_to_index(conn: &Connection, swap: &TPUSwapStatusForStats) -> Result<(), String> { + let params = vec![swap.uuid.to_string()]; + let query_row = conn.query_row(SELECT_ID_BY_UUID, params_from_iter(params.iter()), |row| { + row.get::<_, i64>(0) + }); + match query_row.optional() { + // swap is not indexed yet, insert it into the DB + Ok(None) => { + let (maker_coin_ticker, maker_coin_platform) = split_coin(&swap.maker_coin); + let (taker_coin_ticker, taker_coin_platform) = split_coin(&swap.taker_coin); + + let params = owned_named_params! { + ":maker_coin": swap.maker_coin.clone(), + ":maker_coin_ticker": maker_coin_ticker, + ":maker_coin_platform": maker_coin_platform, + ":taker_coin": swap.taker_coin.clone(), + ":taker_coin_ticker": taker_coin_ticker, + ":taker_coin_platform": taker_coin_platform, + ":uuid": swap.uuid.to_string(), + ":started_at": swap.started_at.to_string(), + ":finished_at": swap.finished_at.to_string(), + ":maker_amount": swap.maker_amount.to_string(), + ":taker_amount": swap.taker_amount.to_string(), + ":is_success": (swap.is_success() as u32).to_string(), + // We will set those in the optional info update. For now setting them to NULL to conform with `INSERT_STATS_SWAP` statement. + ":maker_coin_usd_price": None::, + ":taker_coin_usd_price": None::, + ":maker_pubkey": None::, + ":taker_pubkey": None::, + }; + + execute_query_with_params(conn, INSERT_STATS_SWAP, params); + }, + // swap is already indexed. Only update missing fields. + Ok(Some(_)) => (), + Err(e) => { + let err_msg = format!("Error {} on query {} with params {:?}", e, SELECT_ID_BY_UUID, params); + error!("{}", err_msg); + return Err(err_msg); + }, + }; + + let (sql, params) = update_optional_info_tpu(swap); + + execute_query_with_params(conn, &sql, params); + + Ok(()) +} + +#[derive(Debug, Serialize)] +pub struct SwapStatusFromDB { + pub maker_coin: String, + pub taker_coin: String, + pub started_at: u64, + pub finished_at: u64, + pub maker_amount: f64, + pub taker_amount: f64, + pub is_success: bool, +} + +pub fn fetch_swap_status(conn: &Connection, uuid: &str) -> Result, String> { + const SELECT_SWAP_STATUS_BY_UUID: &str = "SELECT maker_coin, taker_coin, started_at, finished_at, maker_amount, taker_amount, is_success FROM stats_swaps WHERE uuid = ?1"; + + let params = vec![uuid.to_string()]; + let query_row = conn.query_row(SELECT_SWAP_STATUS_BY_UUID, params_from_iter(params.iter()), |row| { + Ok(SwapStatusFromDB { + maker_coin: row.get(0)?, + taker_coin: row.get(1)?, + started_at: row.get(2)?, + finished_at: row.get(3)?, + maker_amount: row.get(4)?, + taker_amount: row.get(5)?, + is_success: row.get::<_, u32>(6)? != 0, + }) + }); + match query_row.optional() { + Ok(Some(swap_status)) => Ok(Some(swap_status)), + Ok(None) => Ok(None), + Err(e) => { + let err_msg = format!( + "Error {} on query {} with params {:?}", + e, SELECT_SWAP_STATUS_BY_UUID, params + ); + error!("{}", err_msg); + Err(err_msg) + }, + } +} + #[test] fn test_split_coin() { let input = ""; diff --git a/mm2src/mm2_main/src/lp_swap.rs b/mm2src/mm2_main/src/lp_swap.rs index 9e8dcee41c..3727a441f9 100644 --- a/mm2src/mm2_main/src/lp_swap.rs +++ b/mm2src/mm2_main/src/lp_swap.rs @@ -140,6 +140,7 @@ use pubkey_banning::BanReason; pub use pubkey_banning::{ban_pubkey_rpc, is_pubkey_banned, list_banned_pubkeys_rpc, unban_pubkeys_rpc}; pub use recreate_swap_data::recreate_swap_data; pub use saved_swap::{SavedSwap, SavedSwapError, SavedSwapIo, SavedSwapResult}; +pub use swap_v2_common::TPUSwapStatusForStats; use swap_v2_common::{ get_unfinished_swaps_uuids, swap_kickstart_handler_for_maker, swap_kickstart_handler_for_taker, ActiveSwapV2Info, }; @@ -363,12 +364,22 @@ pub async fn process_swap_msg(ctx: MmArc, topic: &str, msg: &[u8]) -> P2PRequest Err(swap_msg_err) => { #[cfg(not(target_arch = "wasm32"))] return match json::from_slice::(msg) { - Ok(mut status) => { - status.data.fetch_and_set_usd_prices().await; - if let Err(e) = save_stats_swap(&ctx, &status.data).await { - error!("Error saving the swap {} status: {}", status.data.uuid(), e); - } - Ok(()) + Ok(SwapStatus { data, .. }) => match data { + SwapStatusData::Legacy(mut status) => { + status.fetch_and_set_usd_prices().await; + let uuid = *status.uuid(); + if let Err(e) = save_stats_swap(&ctx, *status).await { + error!("Error saving the swap {} status: {}", uuid, e); + } + Ok(()) + }, + SwapStatusData::Tpu(status) => { + let uuid = status.uuid; + if let Err(e) = add_v2swap_to_stats_db_index(&ctx, *status).await { + error!("Error saving the swap {} status: {}", uuid, e); + } + Ok(()) + }, }, Err(swap_status_err) => { let error = format!( @@ -1012,16 +1023,33 @@ pub async fn insert_new_swap_to_db( } #[cfg(not(target_arch = "wasm32"))] -fn add_swap_to_db_index(ctx: &MmArc, swap: &SavedSwap) { - if let Some(conn) = ctx.sqlite_conn_opt() { - crate::database::stats_swaps::add_swap_to_index(&conn, swap) - } +pub async fn add_v2swap_to_stats_db_index(ctx: &MmArc, swap: TPUSwapStatusForStats) -> Result<(), String> { + let ctx = ctx.clone(); + common::async_blocking(move || { + if let Some(conn) = ctx.sqlite_conn_opt() { + crate::database::stats_swaps::add_v2swap_to_index(&conn, &swap) + } else { + ERR!("No SQLite connection available") + } + }) + .await +} + +#[cfg(not(target_arch = "wasm32"))] +async fn add_swap_to_db_index(ctx: &MmArc, swap: SavedSwap) { + let ctx = ctx.clone(); + common::async_blocking(move || { + if let Some(conn) = ctx.sqlite_conn_opt() { + crate::database::stats_swaps::add_swap_to_index(&conn, &swap) + } + }) + .await } #[cfg(not(target_arch = "wasm32"))] -async fn save_stats_swap(ctx: &MmArc, swap: &SavedSwap) -> Result<(), String> { +async fn save_stats_swap(ctx: &MmArc, swap: SavedSwap) -> Result<(), String> { try_s!(swap.save_to_stats_db(ctx).await); - add_swap_to_db_index(ctx, swap); + add_swap_to_db_index(ctx, swap).await; Ok(()) } @@ -1168,10 +1196,17 @@ pub async fn stats_swap_status(ctx: MmArc, req: Json) -> Result Ok(try_s!(Response::builder().body(res))) } +#[derive(Debug, Deserialize, Serialize)] +#[serde(untagged)] +enum SwapStatusData { + Legacy(Box), + Tpu(Box), +} + #[derive(Debug, Deserialize, Serialize)] struct SwapStatus { method: String, - data: SavedSwap, + data: SwapStatusData, } /// Broadcasts `my` swap status to P2P network @@ -1182,15 +1217,50 @@ async fn broadcast_my_swap_status(ctx: &MmArc, uuid: Uuid) -> Result<(), String> }; status.hide_secrets(); + let status = SwapStatus { + method: "swapstatus".into(), + data: SwapStatusData::Legacy(Box::new(status)), + }; + let msg = json::to_vec(&status).expect("Swap status ser should never fail"); + #[cfg(not(target_arch = "wasm32"))] - try_s!(save_stats_swap(ctx, &status).await); + { + // Extract status back. We are avoiding cloning it since it's a big object. + let SwapStatusData::Legacy(status) = status.data else { + unreachable!("We are sure that status is Legacy variant"); + }; + + try_s!(save_stats_swap(ctx, *status).await); + } + + broadcast_p2p_msg(ctx, swap_topic(&uuid), msg, None); + Ok(()) +} +async fn broadcast_my_v2swap_status(ctx: &MmArc, status: TPUSwapStatusForStats) -> Result<(), String> { + let uuid = status.uuid; + + // Serialize the status to prepare for broadcasting let status = SwapStatus { method: "swapstatus".into(), - data: status, + data: SwapStatusData::Tpu(Box::new(status)), }; let msg = json::to_vec(&status).expect("Swap status ser should never fail"); + + #[cfg(not(target_arch = "wasm32"))] + { + // Extract status back. We are avoiding cloning it since it's a big object. + let SwapStatusData::Tpu(status) = status.data else { + unreachable!("We are sure that status is Tpu variant"); + }; + + // Add the status to our own stats db index + try_s!(add_v2swap_to_stats_db_index(ctx, *status).await); + } + + // Broadcast the status to the P2P network broadcast_p2p_msg(ctx, swap_topic(&uuid), msg, None); + Ok(()) } diff --git a/mm2src/mm2_main/src/lp_swap/maker_swap_v2.rs b/mm2src/mm2_main/src/lp_swap/maker_swap_v2.rs index ec6819727d..f33825a5cc 100644 --- a/mm2src/mm2_main/src/lp_swap/maker_swap_v2.rs +++ b/mm2src/mm2_main/src/lp_swap/maker_swap_v2.rs @@ -65,7 +65,7 @@ pub struct StoredNegotiationData { taker_payment_locktime: u64, taker_funding_locktime: u64, maker_coin_htlc_pub_from_taker: BytesJson, - taker_coin_htlc_pub_from_taker: BytesJson, + pub taker_coin_htlc_pub_from_taker: BytesJson, maker_coin_swap_contract: Option, taker_coin_swap_contract: Option, taker_secret_hash: BytesJson, @@ -151,6 +151,33 @@ pub enum MakerSwapEvent { Completed, } +impl MakerSwapEvent { + /// Returns true if the event is an end state of the swap, i.e. no more events will be produced after it. + pub fn is_end_state(&self) -> bool { + matches!( + self, + MakerSwapEvent::Aborted { .. } | MakerSwapEvent::Completed | MakerSwapEvent::MakerPaymentRefunded { .. } + ) + } + + /// Returns negotiation data if the event contains it. + pub fn negotiation_data(&self) -> Option<&StoredNegotiationData> { + match self { + MakerSwapEvent::WaitingForTakerFunding { negotiation_data, .. } + | MakerSwapEvent::TakerFundingReceived { negotiation_data, .. } + | MakerSwapEvent::MakerPaymentSentFundingSpendGenerated { negotiation_data, .. } + | MakerSwapEvent::MakerPaymentRefundRequired { negotiation_data, .. } + | MakerSwapEvent::TakerPaymentReceived { negotiation_data, .. } + | MakerSwapEvent::TakerPaymentReceivedAndPreimageValidationSkipped { negotiation_data, .. } + | MakerSwapEvent::TakerPaymentSpent { negotiation_data, .. } => Some(negotiation_data), + MakerSwapEvent::Initialized { .. } + | MakerSwapEvent::MakerPaymentRefunded { .. } + | MakerSwapEvent::Aborted { .. } + | MakerSwapEvent::Completed => None, + } + } +} + /// Storage for maker swaps. #[derive(Clone)] pub struct MakerSwapStorage { @@ -468,7 +495,7 @@ impl Vec { + pub fn unique_data(&self) -> Vec { self.secret_hash() } @@ -2177,6 +2204,7 @@ impl ::Result { warn!("Swap {} was aborted with reason {}", state_machine.uuid, self.reason); + try_broadcast_v2_maker_swap_status(state_machine).await; } } @@ -2242,6 +2270,7 @@ impl ::Result { info!("Swap {} has been completed", state_machine.uuid); + try_broadcast_v2_maker_swap_status(state_machine).await; } } @@ -2291,6 +2320,7 @@ impl, + /// The amount of the maker's coin + pub maker_amount: BigDecimal, + + /// The coin name of the taker + pub taker_coin: String, + /// The public key of the taker (to which the maker's coins were paid) + pub taker_swap_pubkey: Option, + /// The amount of the taker's coin + pub taker_amount: BigDecimal, + + /// The price of the maker's coin in USD at the moment of the swap + pub maker_coin_usd_price: Option, + /// The price of the taker's coin in USD at the moment of the swap + pub taker_coin_usd_price: Option, + /// The difference (in +/- percentage) between the market price and the swap price at the moment of the swap (from the maker's pov) + pub market_margin: Option, + /// Is the maker a bot. Possible values are: Some(true) (yes), Some(false) (no), None (unknown) + pub is_maker_bot: Option, + + /// The GUI of the maker + pub maker_gui: Option, + /// The maker's KDF version + pub maker_version: Option, + /// The GUI of the taker + pub taker_gui: Option, + /// The taker's KDF version + pub taker_version: Option, + + /// The version of the swap protocol used in the swap + /// Note that this field should start with 2 because this struct is specific to TPU swaps + pub swap_version: u8, + + // The next set of fields are extra and currently not part of the swap stats + /// Maker's p2p pubkey + pub maker_p2p_pubkey: Secp256k1PubkeySerialize, + /// Taker's p2p pubkey + pub taker_p2p_pubkey: Secp256k1PubkeySerialize, + + /// Premium paid by taker to maker + pub taker_premium: BigDecimal, + /// The amount of fee paid by the taker to the DEX + pub dex_fee_amount: BigDecimal, + /// The amount of DEX fee burnt + pub dex_fee_burn: BigDecimal, + + /// The maker or taker detailed swap events + pub events: TPUSwapEvents, +} + +#[derive(Debug, Deserialize, Serialize)] +/// Represents either a batch of maker or taker swap events. This could be used to know whether a [`TPUSwapStatusForStats`] +/// is maker-originating or taker-originating. +pub enum TPUSwapEvents { + FromMaker(Vec), + FromTaker(Vec), +} + +impl TPUSwapStatusForStats { + pub async fn try_from_maker_state_machine( + machine: &MakerSwapStateMachine, + ) -> Result { + let repr = machine + .storage + .get_repr(machine.uuid) + .await + .map_err(|_| SwapStatusGenerationError::StorageError)?; + + // Make sure the swap is finished (aborted, completed or refunded) + if repr.events.last().map(|e| !e.is_end_state()).unwrap_or(true) { + return Err(SwapStatusGenerationError::SwapNotFinished); + } + + // Make sure the swap is of version 2 or higher (since TPU starts from v2) + if repr.swap_version < 2 { + return Err(SwapStatusGenerationError::InvalidSwapVersion); + } + + // Calculate the usd prices of the coins and the market margin + let mut maker_coin_usd_price = None; + let mut taker_coin_usd_price = None; + let mut market_margin = None; + let rates = fetch_swap_coins_price(Some(repr.maker_coin.clone()), Some(repr.taker_coin.clone())).await; + if let Some(ref rates) = rates { + let fair_market_price = &rates.rel / &rates.base; + let swap_price = repr.taker_volume.to_decimal() / repr.maker_volume.to_decimal(); + market_margin = Some( + (&swap_price - &fair_market_price) / fair_market_price + * BigDecimal::try_from(100.0).expect("100.0 is a valid non-NAN float"), + ); + maker_coin_usd_price = Some(rates.base.clone()); + taker_coin_usd_price = Some(rates.rel.clone()); + } + + // Get the maker's swap pubkey + let maker_swap_pubkey = machine.maker_coin.derive_htlc_pubkey_v2_bytes(&machine.unique_data()); + let maker_swap_pubkey = Some(hex::encode(maker_swap_pubkey)); + + // Get the taker's swap pubkey from the negotiation data in the events if available + let mut taker_swap_pubkey = None; + for event in &repr.events { + if let Some(negotiation_data) = event.negotiation_data() { + taker_swap_pubkey = Some(hex::encode(negotiation_data.taker_coin_htlc_pub_from_taker.0.clone())); + break; + } + } + + // Determine if the maker is a bot + // This is overly simplistic check and only checks whether the simple_market_maker_bot_ctx was initialized. + // TODO: A proper check would be to open the market maker bot and check whether is is running and that the + // swap we just performed is found within its SimpleMakerBotRegistry. The problem at the moment is that + // we can't import TradingBotContext since that's part of lp_ordermatch and that would create a cyclic dependency. + let is_maker_bot = Some(machine.ctx.simple_market_maker_bot_ctx.lock().unwrap().is_some()); + + // Get the maker's p2p pubkey + let (p2p_private_key, _) = p2p_private_and_peer_id_to_broadcast(&machine.ctx, machine.p2p_keypair.as_ref()); + let secp_secret = SecretKey::from_slice(&p2p_private_key).expect("valid secret key"); + let maker_p2p_pubkey = PublicKey::from_secret_key(&SECP_SIGN, &secp_secret).into(); + + Ok(TPUSwapStatusForStats { + uuid: repr.uuid, + started_at: repr.started_at, + // Assuming that this method gets called right after the swap is finished. + // TODO: Consider storing the finished_at timestamp in the DB and/or state machine and use that. + finished_at: now_sec(), + maker_coin: repr.maker_coin, + maker_swap_pubkey, + maker_amount: repr.maker_volume.to_decimal(), + taker_coin: repr.taker_coin, + taker_swap_pubkey, + taker_amount: repr.taker_volume.to_decimal(), + maker_coin_usd_price, + taker_coin_usd_price, + market_margin, + is_maker_bot, + maker_gui: machine.ctx.gui().map(|g| g.to_owned()), + maker_version: Some(machine.ctx.mm_version().into()), + taker_gui: None, + taker_version: None, + swap_version: repr.swap_version, + maker_p2p_pubkey, + taker_p2p_pubkey: repr.taker_p2p_pub, + taker_premium: repr.taker_premium.to_decimal(), + dex_fee_amount: repr.dex_fee_amount.to_decimal(), + dex_fee_burn: repr.dex_fee_burn.to_decimal(), + events: TPUSwapEvents::FromMaker(repr.events), + }) + } + + pub async fn try_from_taker_state_machine( + machine: &TakerSwapStateMachine, + ) -> Result { + let repr = machine + .storage + .get_repr(machine.uuid) + .await + .map_err(|_| SwapStatusGenerationError::StorageError)?; + + // Make sure the swap is finished (aborted, completed or refunded) + if repr.events.last().map(|e| !e.is_end_state()).unwrap_or(true) { + return Err(SwapStatusGenerationError::SwapNotFinished); + } + + // Make sure the swap is of version 2 or higher (since TPU starts from v2) + if repr.swap_version < 2 { + return Err(SwapStatusGenerationError::InvalidSwapVersion); + } + + // Calculate the usd prices of the coins and the market margin + let mut maker_coin_usd_price = None; + let mut taker_coin_usd_price = None; + let mut market_margin = None; + let rates = fetch_swap_coins_price(Some(repr.maker_coin.clone()), Some(repr.taker_coin.clone())).await; + if let Some(ref rates) = rates { + let fair_market_price = &rates.rel / &rates.base; + let swap_price = repr.taker_volume.to_decimal() / repr.maker_volume.to_decimal(); + market_margin = Some( + (&swap_price - &fair_market_price) / fair_market_price + * BigDecimal::try_from(100.0).expect("100.0 is a valid non-NAN float"), + ); + maker_coin_usd_price = Some(rates.base.clone()); + taker_coin_usd_price = Some(rates.rel.clone()); + } + + // Get the taker's swap pubkey + let taker_swap_pubkey = machine.taker_coin.derive_htlc_pubkey_v2_bytes(&machine.unique_data()); + let taker_swap_pubkey = Some(hex::encode(taker_swap_pubkey)); + + // Get the maker's swap pubkey from the negotiation data in the events if available + let mut maker_swap_pubkey = None; + for event in &repr.events { + if let Some(negotiation_data) = event.negotiation_data() { + maker_swap_pubkey = Some(hex::encode(negotiation_data.maker_coin_htlc_pub_from_maker.0.clone())); + break; + } + } + + // Get the taker's p2p pubkey + let (p2p_private_key, _) = p2p_private_and_peer_id_to_broadcast(&machine.ctx, machine.p2p_keypair.as_ref()); + let secp_secret = SecretKey::from_slice(&p2p_private_key).expect("valid secret key"); + let taker_p2p_pubkey = PublicKey::from_secret_key(&SECP_SIGN, &secp_secret).into(); + + Ok(TPUSwapStatusForStats { + uuid: repr.uuid, + started_at: repr.started_at, + // Assuming that this method gets called right after the swap is finished. + // TODO: Consider storing the finished_at timestamp in the DB and/or state machine and use that. + finished_at: now_sec(), + maker_coin: repr.maker_coin, + maker_swap_pubkey, + maker_amount: repr.maker_volume.to_decimal(), + taker_coin: repr.taker_coin, + taker_swap_pubkey, + taker_amount: repr.taker_volume.to_decimal(), + maker_coin_usd_price, + taker_coin_usd_price, + market_margin, + is_maker_bot: None, + maker_gui: None, + maker_version: None, + taker_gui: machine.ctx.gui().map(|g| g.to_owned()), + taker_version: Some(machine.ctx.mm_version().into()), + swap_version: repr.swap_version, + maker_p2p_pubkey: repr.maker_p2p_pub, + taker_p2p_pubkey, + taker_premium: repr.taker_premium.to_decimal(), + dex_fee_amount: repr.dex_fee_amount.to_decimal(), + dex_fee_burn: repr.dex_fee_burn.to_decimal(), + events: TPUSwapEvents::FromTaker(repr.events), + }) + } + + pub fn is_success(&self) -> bool { + match self.events { + TPUSwapEvents::FromMaker(ref events) => events + .last() + .map(|e| matches!(e, MakerSwapEvent::Completed)) + .unwrap_or(false), + TPUSwapEvents::FromTaker(ref events) => events + .last() + .map(|e| matches!(e, TakerSwapEvent::Completed)) + .unwrap_or(false), + } + } +} + +#[derive(Debug, Deserialize, Serialize)] +/// Errors that could be returned when generating the swap status for stats from a swap state machine. +pub enum SwapStatusGenerationError { + StorageError, + SwapNotFinished, + InvalidSwapVersion, +} + +pub async fn try_broadcast_v2_maker_swap_status( + state_machine: &MakerSwapStateMachine, +) { + let swap_status = TPUSwapStatusForStats::try_from_maker_state_machine(state_machine) + .await + .map_err(|e| { + error!("Error converting finished state machine to swap status for stats: {e:?}"); + }); + + if let Ok(swap_status) = swap_status { + if let Err(e) = broadcast_my_v2swap_status(&state_machine.ctx, swap_status).await { + error!("Error broadcasting swap status: {e}"); + } + } +} + +pub async fn try_broadcast_v2_taker_swap_status( + state_machine: &TakerSwapStateMachine, +) { + let swap_status = TPUSwapStatusForStats::try_from_taker_state_machine(state_machine) + .await + .map_err(|e| { + error!("Error converting finished state machine to swap status for stats: {e:?}"); + }); + + if let Ok(swap_status) = swap_status { + if let Err(e) = broadcast_my_v2swap_status(&state_machine.ctx, swap_status).await { + error!("Error broadcasting swap status: {e}"); + } + } +} diff --git a/mm2src/mm2_main/src/lp_swap/swap_v2_rpcs.rs b/mm2src/mm2_main/src/lp_swap/swap_v2_rpcs.rs index 9915978152..ad3f70f5b0 100644 --- a/mm2src/mm2_main/src/lp_swap/swap_v2_rpcs.rs +++ b/mm2src/mm2_main/src/lp_swap/swap_v2_rpcs.rs @@ -21,6 +21,7 @@ use uuid::Uuid; cfg_native!( use crate::database::my_swaps::SELECT_MY_SWAP_V2_FOR_RPC_BY_UUID; + use crate::database::stats_swaps; use common::async_blocking; use db_common::sqlite::query_single_row; use db_common::sqlite::rusqlite::{Result as SqlResult, Row, Error as SqlError}; @@ -534,3 +535,37 @@ pub(crate) async fn active_swaps_rpc( statuses, }) } + +#[cfg(not(target_arch = "wasm32"))] +#[derive(Deserialize)] +pub(crate) struct FetchSwapStatusRequest { + uuid: Uuid, +} + +#[cfg(not(target_arch = "wasm32"))] +#[derive(Display, Serialize, SerializeErrorType)] +#[serde(tag = "error_type", content = "error_data")] +pub(crate) enum FetchSwapStatusError { + NoSwapWithUuid(Uuid), + DBFetchError(String), +} + +#[cfg(not(target_arch = "wasm32"))] +impl HttpStatusCode for FetchSwapStatusError { + fn status_code(&self) -> StatusCode { + match self { + FetchSwapStatusError::NoSwapWithUuid(_) => StatusCode::NOT_FOUND, + FetchSwapStatusError::DBFetchError(_) => StatusCode::INTERNAL_SERVER_ERROR, + } + } +} + +#[cfg(not(target_arch = "wasm32"))] +pub(crate) async fn fetch_swap_status_rpc( + ctx: MmArc, + req: FetchSwapStatusRequest, +) -> MmResult { + stats_swaps::fetch_swap_status(&ctx.sqlite_connection(), &req.uuid.to_string()) + .map_to_mm(|e| FetchSwapStatusError::DBFetchError(e.to_string()))? + .or_mm_err(|| FetchSwapStatusError::NoSwapWithUuid(req.uuid)) +} diff --git a/mm2src/mm2_main/src/lp_swap/taker_swap_v2.rs b/mm2src/mm2_main/src/lp_swap/taker_swap_v2.rs index 0298f3ac1a..9cdc48e98b 100644 --- a/mm2src/mm2_main/src/lp_swap/taker_swap_v2.rs +++ b/mm2src/mm2_main/src/lp_swap/taker_swap_v2.rs @@ -63,7 +63,7 @@ pub struct StoredNegotiationData { maker_payment_locktime: u64, maker_secret_hash: BytesJson, taker_coin_maker_address: String, - maker_coin_htlc_pub_from_maker: BytesJson, + pub maker_coin_htlc_pub_from_maker: BytesJson, taker_coin_htlc_pub_from_maker: BytesJson, maker_coin_swap_contract: Option, taker_coin_swap_contract: Option, @@ -180,6 +180,40 @@ pub enum TakerSwapEvent { Completed, } +impl TakerSwapEvent { + /// Returns true if the event is an end state of the swap, i.e. no more events will be produced after it. + pub fn is_end_state(&self) -> bool { + matches!( + self, + TakerSwapEvent::Aborted { .. } + | TakerSwapEvent::Completed + | TakerSwapEvent::TakerFundingRefunded { .. } + | TakerSwapEvent::TakerPaymentRefunded { .. } + ) + } + + /// Returns negotiation data if the event contains it. + pub fn negotiation_data(&self) -> Option<&StoredNegotiationData> { + match self { + TakerSwapEvent::Negotiated { negotiation_data, .. } + | TakerSwapEvent::TakerFundingSent { negotiation_data, .. } + | TakerSwapEvent::TakerFundingRefundRequired { negotiation_data, .. } + | TakerSwapEvent::MakerPaymentAndFundingSpendPreimgReceived { negotiation_data, .. } + | TakerSwapEvent::TakerPaymentSent { negotiation_data, .. } + | TakerSwapEvent::TakerPaymentSentAndPreimageSendingSkipped { negotiation_data, .. } + | TakerSwapEvent::MakerPaymentConfirmed { negotiation_data, .. } + | TakerSwapEvent::TakerPaymentSpent { negotiation_data, .. } + | TakerSwapEvent::MakerPaymentSpent { negotiation_data, .. } + | TakerSwapEvent::TakerPaymentRefundRequired { negotiation_data, .. } => Some(negotiation_data), + TakerSwapEvent::Initialized { .. } + | TakerSwapEvent::TakerFundingRefunded { .. } + | TakerSwapEvent::TakerPaymentRefunded { .. } + | TakerSwapEvent::Aborted { .. } + | TakerSwapEvent::Completed => None, + } + } +} + /// Storage for taker swaps. #[derive(Clone)] pub struct TakerSwapStorage { @@ -477,7 +511,7 @@ impl Vec { + pub fn unique_data(&self) -> Vec { self.uuid.as_bytes().to_vec() } @@ -2555,6 +2589,7 @@ impl ::Result { warn!("Swap {} was aborted with reason {}", state_machine.uuid, self.reason); + try_broadcast_v2_taker_swap_status(state_machine).await; } } @@ -2624,6 +2659,7 @@ impl ::Result { info!("Swap {} has been completed", state_machine.uuid); + try_broadcast_v2_taker_swap_status(state_machine).await; } } @@ -2673,6 +2709,7 @@ impl DispatcherResult handle_mmrpc(ctx, request, start_version_stat_collection).await, "stop_simple_market_maker_bot" => handle_mmrpc(ctx, request, stop_simple_market_maker_bot).await, "stop_version_stat_collection" => handle_mmrpc(ctx, request, stop_version_stat_collection).await, + #[cfg(not(target_arch = "wasm32"))] + "swap_info_from_stats_db" => handle_mmrpc(ctx, request, fetch_swap_status_rpc).await, "trade_preimage" => handle_mmrpc(ctx, request, trade_preimage_rpc).await, "trezor_connection_status" => handle_mmrpc(ctx, request, trezor_connection_status).await, "update_nft" => handle_mmrpc(ctx, request, update_nft).await, diff --git a/mm2src/mm2_main/tests/docker_tests/swap_proto_v2_tests.rs b/mm2src/mm2_main/tests/docker_tests/swap_proto_v2_tests.rs index 0069ce9c72..0ce0159b9e 100644 --- a/mm2src/mm2_main/tests/docker_tests/swap_proto_v2_tests.rs +++ b/mm2src/mm2_main/tests/docker_tests/swap_proto_v2_tests.rs @@ -15,7 +15,7 @@ use mm2_number::MmNumber; use mm2_test_helpers::for_tests::{ active_swaps, check_recent_swaps, coins_needed_for_kickstart, disable_coin, disable_coin_err, enable_native, get_locked_amount, mm_dump, my_swap_status, mycoin1_conf, mycoin_conf, start_swaps, wait_for_swap_finished, - wait_for_swap_status, MarketMakerIt, Mm2TestConf, + wait_for_swap_info_stats_db, wait_for_swap_status, MarketMakerIt, Mm2TestConf, }; use mm2_test_helpers::structs::MmNumberMultiRepr; use script::{Builder, Opcode}; @@ -947,3 +947,56 @@ fn test_v2_swap_utxo_utxo_file_lock() { block_on(mm_alice_dup.wait_for_log(22., |log| log.contains(&expected_log))).unwrap(); } } + +#[test] +fn test_v2_swap_utxo_status_broadcasting() { + let (_ctx, _, bob_priv_key) = generate_utxo_coin_with_random_privkey(MYCOIN, 1000.into()); + let (_ctx, _, alice_priv_key) = generate_utxo_coin_with_random_privkey(MYCOIN1, 1000.into()); + let coins = json!([mycoin_conf(1000), mycoin1_conf(1000)]); + + let bob_conf = Mm2TestConf::seednode_trade_v2(&format!("0x{}", hex::encode(bob_priv_key)), &coins); + let mut mm_bob = MarketMakerIt::start(bob_conf.conf.clone(), bob_conf.rpc_password.clone(), None).unwrap(); + let (_bob_dump_log, _bob_dump_dashboard) = mm_dump(&mm_bob.log_path); + log!("Bob log path: {}", mm_bob.log_path.display()); + + let alice_conf = Mm2TestConf::light_node_trade_v2( + &format!("0x{}", hex::encode(alice_priv_key)), + &coins, + &[&mm_bob.ip.to_string()], + ); + let mut mm_alice = MarketMakerIt::start(alice_conf.conf.clone(), alice_conf.rpc_password.clone(), None).unwrap(); + let (_alice_dump_log, _alice_dump_dashboard) = mm_dump(&mm_alice.log_path); + log!("Alice log path: {}", mm_alice.log_path.display()); + + log!("{:?}", block_on(enable_native(&mm_bob, MYCOIN, &[], None))); + log!("{:?}", block_on(enable_native(&mm_bob, MYCOIN1, &[], None))); + log!("{:?}", block_on(enable_native(&mm_alice, MYCOIN, &[], None))); + log!("{:?}", block_on(enable_native(&mm_alice, MYCOIN1, &[], None))); + + let uuids = block_on(start_swaps( + &mut mm_bob, + &mut mm_alice, + &[(MYCOIN, MYCOIN1)], + 1.0, + 1.0, + 100., + )); + log!("{:?}", uuids); + + for uuid in uuids.iter() { + block_on(wait_for_swap_finished(&mm_bob, uuid, 60)); + block_on(wait_for_swap_finished(&mm_alice, uuid, 30)); + + let maker_swap_status = block_on(my_swap_status(&mm_bob, uuid)); + log!("{:?}", maker_swap_status); + + let taker_swap_status = block_on(my_swap_status(&mm_alice, uuid)); + log!("{:?}", taker_swap_status); + } + + // Make sure that the swap status was persisted in stats DB. + for uuid in uuids.iter() { + block_on(wait_for_swap_info_stats_db(&mm_bob, uuid, 60)); + block_on(wait_for_swap_info_stats_db(&mm_alice, uuid, 30)); + } +} diff --git a/mm2src/mm2_test_helpers/src/for_tests.rs b/mm2src/mm2_test_helpers/src/for_tests.rs index d103e9c6ad..0d039a821b 100644 --- a/mm2src/mm2_test_helpers/src/for_tests.rs +++ b/mm2src/mm2_test_helpers/src/for_tests.rs @@ -2753,6 +2753,33 @@ pub async fn wait_check_stats_swap_status(mm: &MarketMakerIt, uuid: &str, timeou } } +pub async fn wait_for_swap_info_stats_db(mm: &MarketMakerIt, uuid: &str, timeout_sec: i64) { + let wait_until = get_utc_timestamp() + timeout_sec; + loop { + let response = mm + .rpc(&json!({ + "userpass": mm.userpass, + "method": "swap_info_from_stats_db", + "mmrpc": "2.0", + "params": { + "uuid": uuid, + } + })) + .await + .unwrap(); + + if response.0.is_success() { + break; + } + + if get_utc_timestamp() > wait_until { + panic!("Timed out waiting for swap {} info in stats db", uuid); + } + + Timer::sleep(1.).await; + } +} + pub async fn check_recent_swaps(mm: &MarketMakerIt, expected_len: usize) { let response = mm .rpc(&json!({