Skip to content
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions mm2src/mm2_main/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ fn migration_13() -> Vec<(&'static str, Vec<String>)> {
]
}

fn migration_14() -> Vec<(&'static str, Vec<String>)> {
db_common::sqlite::execute_batch(stats_swaps::ADD_SWAP_VERSION_AND_MARKET_MARGIN)
}

async fn statements_for_migration(ctx: &MmArc, current_migration: i64) -> Option<Vec<(&'static str, Vec<String>)>> {
match current_migration {
1 => Some(migration_1(ctx).await),
Expand All @@ -147,6 +151,7 @@ async fn statements_for_migration(ctx: &MmArc, current_migration: i64) -> Option
11 => Some(migration_11()),
12 => Some(migration_12()),
13 => Some(migration_13()),
14 => Some(migration_14()),
_ => None,
}
}
Expand Down
154 changes: 153 additions & 1 deletion mm2src/mm2_main/src/database/stats_swaps.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![allow(deprecated)] // TODO: remove this once rusqlite is >= 0.29

use crate::lp_swap::{MakerSavedSwap, SavedSwap, SavedSwapIo, TakerSavedSwap};
use crate::lp_swap::{MakerSavedSwap, SavedSwap, SavedSwapIo, TPUSwapStatusForStats, TakerSavedSwap};
use common::log::{debug, error};
use db_common::{
owned_named_params,
Expand Down Expand Up @@ -100,6 +100,13 @@ pub const ADD_MAKER_TAKER_GUI_AND_VERSION: &[&str] = &[

pub const SELECT_ID_BY_UUID: &str = "SELECT id FROM stats_swaps WHERE uuid = ?1";

pub const ADD_SWAP_VERSION_AND_MARKET_MARGIN: &[&str] = &[
"ALTER TABLE stats_swaps ADD COLUMN swap_version INTEGER NOT NULL DEFAULT 0;",
"ALTER TABLE stats_swaps ADD COLUMN market_margin DECIMAL;",
// NULL = unknown, 0 = not a bot, 1 = bot
"ALTER TABLE stats_swaps ADD COLUMN is_maker_bot INTEGER;",
Comment on lines +101 to +102

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if making it maker_type with backing it Rust enum would be any good. 🤔 Just for the extensibility.

So it would look something like this:

enum MakerType {
  Unknown,
  WebUser,
  DesktopUser,
  Bot,
  .
  . 
  .
}

and can be extended to anything without changing things on the DB side.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if making it maker_type with backing it Rust enum would be any good.

like the idea. we can have a new issue about that to harvest such data (cc/ @shamardy)

we can ofc do a migration to change the table to the appropriate format later.

];

/// Returns SQL statements to initially fill stats_swaps table using existing DB with JSON files
pub async fn create_and_fill_stats_swaps_from_json_statements(ctx: &MmArc) -> Vec<(&'static str, Vec<String>)> {
let maker_swaps = SavedSwap::load_all_from_maker_stats_db(ctx).await.unwrap_or_default();
Expand Down Expand Up @@ -369,6 +376,151 @@ pub fn add_swap_to_index(conn: &Connection, swap: &SavedSwap) {
execute_query_with_params(conn, sql, params);
}

/// Constructs the update query for optional fields of the TPU swap status struct. That is, any field that is wrapped in an Option<T>.
fn update_optional_info(swap: &TPUSwapStatusForStats) -> (String, OwnedSqlNamedParams) {
let mut extra_args = String::new();
let mut params = OwnedSqlNamedParams::new();

if let Some(ref maker_pubkey) = swap.maker_swap_pubkey {
extra_args.push_str(", maker_pubkey = :maker_pubkey");
params.push((":maker_pubkey", maker_pubkey.clone().into()));
}
if let Some(ref taker_pubkey) = swap.taker_swap_pubkey {
extra_args.push_str(", taker_pubkey = :taker_pubkey");
params.push((":taker_pubkey", taker_pubkey.clone().into()));
}
if let Some(ref maker_coin_usd_price) = swap.maker_coin_usd_price {
extra_args.push_str(", maker_coin_usd_price = :maker_coin_usd_price");
params.push((":maker_coin_usd_price", maker_coin_usd_price.to_string().into()));
}
if let Some(ref taker_coin_usd_price) = swap.taker_coin_usd_price {
extra_args.push_str(", taker_coin_usd_price = :taker_coin_usd_price");
params.push((":taker_coin_usd_price", taker_coin_usd_price.to_string().into()));
}
if let Some(ref market_margin) = swap.market_margin {
extra_args.push_str(", market_margin = :market_margin");
params.push((":market_margin", market_margin.to_string().into()));
}
if let Some(is_maker_bot) = swap.is_maker_bot {
extra_args.push_str(", is_maker_bot = :is_maker_bot");
params.push((":is_maker_bot", (is_maker_bot as u32).into()));
}
if let Some(ref maker_gui) = swap.maker_gui {
extra_args.push_str(", maker_gui = :maker_gui");
params.push((":maker_gui", maker_gui.clone().into()));
}
if let Some(ref maker_version) = swap.maker_version {
extra_args.push_str(", maker_version = :maker_version");
params.push((":maker_version", maker_version.clone().into()));
}
if let Some(ref taker_gui) = swap.taker_gui {
extra_args.push_str(", taker_gui = :taker_gui");
params.push((":taker_gui", taker_gui.clone().into()));
}
if let Some(ref taker_version) = swap.taker_version {
extra_args.push_str(", taker_version = :taker_version");
params.push((":taker_version", taker_version.clone().into()));
}

let update_query = format!("UPDATE stats_swaps set swap_version = :swap_version{extra_args} WHERE uuid = :uuid;",);

// Swap version is actually mandatory and will always be set.
params.push((":uuid", swap.uuid.to_string().into()));
params.push((":swap_version", (swap.swap_version as u32).into()));

(update_query, params)
}

/// Adds a TPU swap to stats_swap table if it's not present and updates the swap with the new data if it already exists.
pub fn add_v2swap_to_index(conn: &Connection, swap: &TPUSwapStatusForStats) -> Result<(), String> {
let params = vec![swap.uuid.to_string()];
let query_row = conn.query_row(SELECT_ID_BY_UUID, params_from_iter(params.iter()), |row| {
row.get::<_, i64>(0)
});
match query_row.optional() {
// swap is not indexed yet, insert it into the DB
Ok(None) => {
let (maker_coin_ticker, maker_coin_platform) = split_coin(&swap.maker_coin);
let (taker_coin_ticker, taker_coin_platform) = split_coin(&swap.taker_coin);

let params = owned_named_params! {
":maker_coin": swap.maker_coin.clone(),
":maker_coin_ticker": maker_coin_ticker,
":maker_coin_platform": maker_coin_platform,
":taker_coin": swap.taker_coin.clone(),
":taker_coin_ticker": taker_coin_ticker,
":taker_coin_platform": taker_coin_platform,
":uuid": swap.uuid.to_string(),
":started_at": swap.started_at.to_string(),
":finished_at": swap.finished_at.to_string(),
":maker_amount": swap.maker_amount.to_string(),
":taker_amount": swap.taker_amount.to_string(),
":is_success": (swap.is_success() as u32).to_string(),
// We will set those in the optional info update. For now setting them to NULL to conform with `INSERT_STATS_SWAP` statement.
":maker_coin_usd_price": None::<String>,
":taker_coin_usd_price": None::<String>,
":maker_pubkey": None::<String>,
":taker_pubkey": None::<String>,
};

execute_query_with_params(conn, INSERT_STATS_SWAP, params);
},
// swap is already indexed. Only update missing fields.
Ok(Some(_)) => (),
Err(e) => {
let err_msg = format!("Error {} on query {} with params {:?}", e, SELECT_ID_BY_UUID, params);
error!("{}", err_msg);
return Err(err_msg);
},
};

let (sql, params) = update_optional_info(swap);

execute_query_with_params(conn, &sql, params);

Ok(())
}

#[derive(Debug, Serialize)]
pub struct SwapStatusFromDB {
pub maker_coin: String,
pub taker_coin: String,
pub started_at: u64,
pub finished_at: u64,
pub maker_amount: f64,
pub taker_amount: f64,
pub is_success: bool,
}

pub fn fetch_swap_status(conn: &Connection, uuid: &str) -> Result<Option<SwapStatusFromDB>, String> {
const SELECT_SWAP_STATUS_BY_UUID: &str = "SELECT maker_coin, taker_coin, started_at, finished_at, maker_amount, taker_amount, is_success FROM stats_swaps WHERE uuid = ?1";

let params = vec![uuid.to_string()];
let query_row = conn.query_row(SELECT_SWAP_STATUS_BY_UUID, params_from_iter(params.iter()), |row| {
Ok(SwapStatusFromDB {
maker_coin: row.get(0)?,
taker_coin: row.get(1)?,
started_at: row.get(2)?,
finished_at: row.get(3)?,
maker_amount: row.get(4)?,
taker_amount: row.get(5)?,
is_success: row.get::<_, u32>(6)? != 0,
})
});
match query_row.optional() {
Ok(Some(swap_status)) => Ok(Some(swap_status)),
Ok(None) => Ok(None),
Err(e) => {
let err_msg = format!(
"Error {} on query {} with params {:?}",
e, SELECT_SWAP_STATUS_BY_UUID, params
);
error!("{}", err_msg);
Err(err_msg)
},
}
}

#[test]
fn test_split_coin() {
let input = "";
Expand Down
100 changes: 85 additions & 15 deletions mm2src/mm2_main/src/lp_swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ use pubkey_banning::BanReason;
pub use pubkey_banning::{ban_pubkey_rpc, is_pubkey_banned, list_banned_pubkeys_rpc, unban_pubkeys_rpc};
pub use recreate_swap_data::recreate_swap_data;
pub use saved_swap::{SavedSwap, SavedSwapError, SavedSwapIo, SavedSwapResult};
pub use swap_v2_common::TPUSwapStatusForStats;
use swap_v2_common::{
get_unfinished_swaps_uuids, swap_kickstart_handler_for_maker, swap_kickstart_handler_for_taker, ActiveSwapV2Info,
};
Expand Down Expand Up @@ -363,12 +364,22 @@ pub async fn process_swap_msg(ctx: MmArc, topic: &str, msg: &[u8]) -> P2PRequest
Err(swap_msg_err) => {
#[cfg(not(target_arch = "wasm32"))]
return match json::from_slice::<SwapStatus>(msg) {
Ok(mut status) => {
status.data.fetch_and_set_usd_prices().await;
if let Err(e) = save_stats_swap(&ctx, &status.data).await {
error!("Error saving the swap {} status: {}", status.data.uuid(), e);
}
Ok(())
Ok(SwapStatus { data, .. }) => match data {
SwapStatusData::Legacy(mut status) => {
status.fetch_and_set_usd_prices().await;
let uuid = *status.uuid();
if let Err(e) = save_stats_swap(&ctx, *status).await {
error!("Error saving the swap {} status: {}", uuid, e);
}
Ok(())
},
SwapStatusData::Tpu(status) => {
let uuid = status.uuid;
if let Err(e) = add_v2swap_to_stats_db_index(&ctx, *status).await {
error!("Error saving the swap {} status: {}", uuid, e);
}
Ok(())
},
},
Err(swap_status_err) => {
let error = format!(
Expand Down Expand Up @@ -1012,16 +1023,33 @@ pub async fn insert_new_swap_to_db(
}

#[cfg(not(target_arch = "wasm32"))]
fn add_swap_to_db_index(ctx: &MmArc, swap: &SavedSwap) {
if let Some(conn) = ctx.sqlite_conn_opt() {
crate::database::stats_swaps::add_swap_to_index(&conn, swap)
}
pub async fn add_v2swap_to_stats_db_index(ctx: &MmArc, swap: TPUSwapStatusForStats) -> Result<(), String> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we have _index in add_v2swap_to_stats_db_index and add_v2swap_to_index fn names?
I think this mattered for the legacy swaps as there is indeed an index over json files with stat data.

For TPU we have only a db, no jsons, so could we name them, like: save_swap_v2_stat and add_swap_v2_stat_to_db?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i was more or less following the names we have for legacy swaps. that being said, i think of index as tables/sqls.

db used to mean a sqldb to me, but it's used in a different way in our codebase. we treat json files as db (file/directory DB basically), see for example load_from_maker_stats_db.

that's why i find the _index distinction helpful in this case as to denote that such db storage is a table format rather than file dump. but i can remove it too if u find it way off.

let ctx = ctx.clone();
common::async_blocking(move || {
if let Some(conn) = ctx.sqlite_conn_opt() {
crate::database::stats_swaps::add_v2swap_to_index(&conn, &swap)
Comment on lines +1028 to +1030

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not making add_v2swap_to_index and spawning the task inside of it? If it's a heavy blocking task, then it should do this in a self-contained way as the caller may not always remember to wrap it with a spawning task.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

crate::database::stats_swaps::add_v2swap_to_index is only used by one method which is lp_swap::add_v2swap_to_stats_db_index (the latter is a simple wrapper around the former)

same for crate::database::stats_swaps::add_swap_to_index and lp_swap::add_swap_to_db_index

so having the async_blocking at either level is the doable. i went for adding the async_blocking to the top level as the db-level functions take &Connection which isn't gonna play nice with the requirements of async_blocking (we can ofc pass the whole MmArc to the db-level functions but i think them taking a &Connection is more intuitive).

also other db methods follow the same sync interface, so that aligns with them (except our &AsyncConnection-based code)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think other db functions should do the same but I don't want to block this PR with other tech debts. It's fine if you don't want to do it that way.

} else {
ERR!("No SQLite connection available")
}
})
.await
}

#[cfg(not(target_arch = "wasm32"))]
async fn add_swap_to_db_index(ctx: &MmArc, swap: SavedSwap) {
let ctx = ctx.clone();
common::async_blocking(move || {
if let Some(conn) = ctx.sqlite_conn_opt() {
crate::database::stats_swaps::add_swap_to_index(&conn, &swap)
Comment on lines +1041 to +1043

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here.

}
})
.await
}

#[cfg(not(target_arch = "wasm32"))]
async fn save_stats_swap(ctx: &MmArc, swap: &SavedSwap) -> Result<(), String> {
async fn save_stats_swap(ctx: &MmArc, swap: SavedSwap) -> Result<(), String> {
try_s!(swap.save_to_stats_db(ctx).await);
add_swap_to_db_index(ctx, swap);
add_swap_to_db_index(ctx, swap).await;
Ok(())
}

Expand Down Expand Up @@ -1168,10 +1196,17 @@ pub async fn stats_swap_status(ctx: MmArc, req: Json) -> Result<Response<Vec<u8>
Ok(try_s!(Response::builder().body(res)))
}

#[derive(Debug, Deserialize, Serialize)]
#[serde(untagged)]
enum SwapStatusData {
Legacy(Box<SavedSwap>),
Tpu(Box<TPUSwapStatusForStats>),
}

#[derive(Debug, Deserialize, Serialize)]
struct SwapStatus {
method: String,
data: SavedSwap,
data: SwapStatusData,
}

/// Broadcasts `my` swap status to P2P network
Expand All @@ -1182,15 +1217,50 @@ async fn broadcast_my_swap_status(ctx: &MmArc, uuid: Uuid) -> Result<(), String>
};
status.hide_secrets();

let status = SwapStatus {
method: "swapstatus".into(),
data: SwapStatusData::Legacy(Box::new(status)),
};
let msg = json::to_vec(&status).expect("Swap status ser should never fail");

#[cfg(not(target_arch = "wasm32"))]
try_s!(save_stats_swap(ctx, &status).await);
{
// Extract status back. We are avoiding cloning it since it's a big object.
let SwapStatusData::Legacy(status) = status.data else {
unreachable!("We are sure that status is Legacy variant");
};

try_s!(save_stats_swap(ctx, *status).await);
}

broadcast_p2p_msg(ctx, swap_topic(&uuid), msg, None);
Ok(())
}

async fn broadcast_my_v2swap_status(ctx: &MmArc, status: TPUSwapStatusForStats) -> Result<(), String> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this function not be called save_and_broadcast_my_v2swap_status?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm save is done only for native, maybe no need to rename then

let uuid = status.uuid;

// Serialize the status to prepare for broadcasting
let status = SwapStatus {
method: "swapstatus".into(),
data: status,
data: SwapStatusData::Tpu(Box::new(status)),
};
let msg = json::to_vec(&status).expect("Swap status ser should never fail");

#[cfg(not(target_arch = "wasm32"))]
{
// Extract status back. We are avoiding cloning it since it's a big object.
let SwapStatusData::Tpu(status) = status.data else {
unreachable!("We are sure that status is Tpu variant");
};

// Add the status to our own stats db index
try_s!(add_v2swap_to_stats_db_index(ctx, *status).await);
}

// Broadcast the status to the P2P network
broadcast_p2p_msg(ctx, swap_topic(&uuid), msg, None);

Ok(())
}

Expand Down
Loading
Loading