Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat]: Implement rolling withdrawal window in Emily #1528

Draft
wants to merge 12 commits into
base: main
Choose a base branch
from
84 changes: 83 additions & 1 deletion emily/handler/src/api/handlers/chainstate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use crate::{
api::{
handlers::internal::{execute_reorg_handler, ExecuteReorgRequest},
models::chainstate::Chainstate,
models::chainstate::{Chainstate, HeightsMapping, UpdateBitcoinChaintip},
},
common::error::{Error, Inconsistency},
context::EmilyContext,
Expand Down Expand Up @@ -169,6 +169,88 @@ pub async fn update_chainstate(
.map_or_else(Reply::into_response, Reply::into_response)
}

/// Update bitcoin chainstate handler.
#[utoipa::path(
put,
operation_id = "updateBitcoinChainstate",
path = "/bitcoinChainstate",
tag = "bitcoinChainstate",
request_body = UpdateBitcoinChaintip,
responses(
(status = 201, description = "Bitcoin chainstate updated successfully", body = ()),
(status = 400, description = "Invalid request body", body = ErrorResponse),
(status = 404, description = "Address not found", body = ErrorResponse),
(status = 405, description = "Method not allowed", body = ErrorResponse),
(status = 500, description = "Internal server error", body = ErrorResponse)
),
security(("ApiGatewayKey" = []))
)]
#[instrument(skip(context, api_key))]
pub async fn update_bitcoin_chain_tip(
context: EmilyContext,
api_key: String,
request: UpdateBitcoinChaintip,
) -> impl warp::reply::Reply {
debug!("Attempting to update bitcoin chainstate: {request:?}");
// Internal handler so `?` can be used correctly while still returning a reply.
async fn handler(
context: EmilyContext,
_api_key: String,
body: UpdateBitcoinChaintip,
) -> Result<impl warp::reply::Reply, Error> {
// Convert body to the correct type.
let new_tip: UpdateBitcoinChaintip = body;
accessors::set_new_bitcoin_chain_tip(&context, new_tip).await?;
// Respond.
Ok(with_status(json(&()), StatusCode::CREATED))
}
// Handle and respond.
handler(context, api_key, request)
.await
.map_or_else(Reply::into_response, Reply::into_response)
}

/// Update bitcoin chainstate handler.
#[utoipa::path(
put,
operation_id = "updateHeightsMapping",
path = "/heightsMapping",
tag = "heightsMapping",
request_body = HeightsMapping,
responses(
(status = 201, description = "Heights mapping updated successfully", body = ()),
(status = 400, description = "Invalid request body", body = ErrorResponse),
(status = 404, description = "Address not found", body = ErrorResponse),
(status = 405, description = "Method not allowed", body = ErrorResponse),
(status = 500, description = "Internal server error", body = ErrorResponse)
),
security(("ApiGatewayKey" = []))
)]
#[instrument(skip(context, api_key))]
pub async fn update_heights_mapping(
context: EmilyContext,
api_key: String,
request: HeightsMapping,
) -> impl warp::reply::Reply {
debug!("Attempting to update heights mapping: {request:?}");
// Internal handler so `?` can be used correctly while still returning a reply.
async fn handler(
context: EmilyContext,
_api_key: String,
body: HeightsMapping,
) -> Result<impl warp::reply::Reply, Error> {
// Convert body to the correct type.
let new_mapping: HeightsMapping = body;
accessors::update_heights_mapping(&context, new_mapping).await?;
// Respond.
Ok(with_status(json(&()), StatusCode::CREATED))
}
// Handle and respond.
handler(context, api_key, request)
.await
.map_or_else(Reply::into_response, Reply::into_response)
}

/// Adds the chainstate to the table, and reorganizes the API if there's a
/// conflict that suggests it needs a reorg in order for this entry to be
/// consistent.
Expand Down
25 changes: 25 additions & 0 deletions emily/handler/src/api/handlers/limits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,31 @@ pub async fn get_limits(context: EmilyContext) -> impl warp::reply::Reply {
.map_or_else(Reply::into_response, Reply::into_response)
}

/// Get the global limits.
#[utoipa::path(
get,
operation_id = "getTotalWithdrawed",
path = "/withdrawed",
tag = "limits",
responses(
(status = 200, description = "TotalWithdrawed retrieved successfully", body = TotalWithdrawedAmount),
(status = 500, description = "Internal server error", body = ErrorResponse)
),
)]
#[instrument(skip(context))]
pub async fn get_withdrawed(context: EmilyContext) -> impl warp::reply::Reply {
// Internal handler so `?` can be used correctly while still returning a reply.
async fn handler(context: EmilyContext) -> Result<impl warp::reply::Reply, Error> {
let total_withdrawed =
accessors::get_total_withdrawed_amount_in_rolling_window(&context).await?;
Ok(with_status(json(&total_withdrawed), StatusCode::OK))
}
// Handle and respond.
handler(context)
.await
.map_or_else(Reply::into_response, Reply::into_response)
}

/// Get limits handler.
#[utoipa::path(
post,
Expand Down
19 changes: 19 additions & 0 deletions emily/handler/src/api/models/chainstate.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Request structures for chainstate api calls.

use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use utoipa::{ToResponse, ToSchema};

/// Chainstate.
Expand All @@ -25,3 +26,21 @@ pub struct Chainstate {
/// Stacks block hash at the height.
pub stacks_block_hash: String,
}

/// Update heights mapping request body
#[derive(Clone, Default, Debug, Eq, PartialEq, Serialize, Deserialize, ToSchema, ToResponse)]
#[serde(rename_all = "camelCase")]
pub struct HeightsMapping {
/// An update mapping, update all entries or add new ones
pub mapping: HashMap<u64, u64>,
}

/// Update bitcoin chaintip request body
#[derive(Clone, Default, Debug, Eq, PartialEq, Serialize, Deserialize, ToSchema, ToResponse)]
#[serde(rename_all = "camelCase")]
pub struct UpdateBitcoinChaintip {
/// A new bitcoin chaintip height
pub height: u64,
/// A new bitcoin chaintip hash
pub hash: String,
}
12 changes: 12 additions & 0 deletions emily/handler/src/api/models/limits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,15 @@ pub struct AccountLimits {
/// Maximum total sBTC that can be withdrawn within the rolling withdrawal window.
pub rolling_withdrawal_cap: Option<u64>,
}

/// Contains info about total withdrawed amount in rolling window
#[derive(Clone, Default, Debug, Eq, PartialEq, Serialize, Deserialize, ToSchema, ToResponse)]
#[serde(rename_all = "camelCase")]
pub struct TotalWithdrawedAmount {
/// Total withdrawed amount in rolling window
pub total_withdrawed_amount: Option<u64>,
/// Stacks chain tip at the moment of calculation (begin of rolling window)
pub stacks_chain_tip: Option<u64>,
/// Last block in rolling window.
pub last_stacks_block_in_rolling_window: Option<u64>,
}
14 changes: 14 additions & 0 deletions emily/handler/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ use crate::common::error::Error;
pub struct Settings {
/// Whether the Emily lambda is running locally.
pub is_local: bool,
/// HeightsMapping table name.
pub heights_mapping_table_name: String,
/// Bitcoin chainstate table name.
pub bitcoin_chainstate_table_name: String,
/// Deposit table name.
pub deposit_table_name: String,
/// Withdrawal table name.
Expand Down Expand Up @@ -93,6 +97,8 @@ impl Settings {
Ok(Settings {
is_local: env::var("IS_LOCAL")?.to_lowercase() == "true",
deposit_table_name: env::var("DEPOSIT_TABLE_NAME")?,
heights_mapping_table_name: env::var("HEIGHTS_MAPPING_TABLE_NAME")?,
bitcoin_chainstate_table_name: env::var("BITCOIN_CHAINSTATE_TABLE_NAME")?,
withdrawal_table_name: env::var("WITHDRAWAL_TABLE_NAME")?,
chainstate_table_name: env::var("CHAINSTATE_TABLE_NAME")?,
limit_table_name: env::var("LIMIT_TABLE_NAME")?,
Expand Down Expand Up @@ -194,6 +200,14 @@ impl EmilyContext {
Ok(EmilyContext {
settings: Settings {
is_local: true,
bitcoin_chainstate_table_name: table_name_map
.get("BitcoinChainstate")
.expect("Failed to find valid bitcoin chainstate table in existing table list.")
.to_string(),
heights_mapping_table_name: table_name_map
.get("HeightsMapping")
.expect("Failed to find valid heights mapping table in existing table list.")
.to_string(),
deposit_table_name: table_name_map
.get("Deposit")
.expect("Couldn't find valid deposit table in existing table list.")
Expand Down
113 changes: 111 additions & 2 deletions emily/handler/src/database/accessors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use serde_dynamo::Item;

use tracing::{debug, warn};

use crate::api::models::limits::{AccountLimits, Limits};
use crate::api::models::chainstate::{HeightsMapping, UpdateBitcoinChaintip};
use crate::api::models::limits::{AccountLimits, Limits, TotalWithdrawedAmount};
use crate::common::error::{Error, Inconsistency};

use crate::{api::models::common::Status, context::EmilyContext};
Expand All @@ -26,7 +27,9 @@ use super::entries::withdrawal::{
};
use super::entries::{
chainstate::{
ApiStateEntry, ApiStatus, ChainstateEntry, ChainstateTablePrimaryIndex,
ApiStateEntry, ApiStatus, BitcoinChainstateEntry, BitcoinChainstateEntryKey,
BitcoinChainstateTablePrimaryIndex, ChainstateEntry, ChainstateTablePrimaryIndex,
HeightsMappingEntry, HeightsMappingEntryKey, HeightsMappingTablePrimaryIndex,
SpecialApiStateIndex,
},
deposit::{
Expand Down Expand Up @@ -765,6 +768,112 @@ pub async fn get_limits(context: &EmilyContext) -> Result<Limits, Error> {
})
}

async fn get_bitcoin_chain_tip_height(context: &EmilyContext) -> Result<u64, Error> {
let query_resp = query_with_partition_key::<BitcoinChainstateTablePrimaryIndex>(
context,
&"dummy".to_string(),
None,
None,
)
.await?
.0;
if query_resp.len() != 1 {
return Err(Error::NotFound);
}
let res = query_resp.first().unwrap().height;
Ok(res)
}

async fn get_first_stacks_block_for_ancor(
context: &EmilyContext,
ancor_height: u64,
) -> Result<u64, Error> {
let query_resp = query_with_partition_key::<HeightsMappingTablePrimaryIndex>(
context,
&ancor_height,
None,
None,
)
.await?
.0;
if query_resp.len() != 1 {
return Err(Error::NotFound);
}
let result = query_resp.first().unwrap();
Ok(result.first_ancored_stacks_height)
}

/// Returns total amount of withdrawn sBTC in rolling window. All withdrawals except failed are
/// counted here.
pub async fn get_total_withdrawed_amount_in_rolling_window(
context: &EmilyContext,
) -> Result<TotalWithdrawedAmount, Error> {
let rolling_window_size = context
.settings
.default_limits
.rolling_withdrawal_blocks
.unwrap_or(0);

let stacks_chain_tip = get_api_state(context).await?.chaintip().key.height;
let bitcoin_tip_height = get_bitcoin_chain_tip_height(context).await?;
let minimum_height = bitcoin_tip_height - rolling_window_size;
let minimum_stacks_height = get_first_stacks_block_for_ancor(context, minimum_height).await?;

let all_statuses_except_failed: Vec<_> = ALL_STATUSES
.iter()
.filter(|status| **status != Status::Failed)
.collect();

let mut withdrawals = vec![];
for status in all_statuses_except_failed {
let mut withdrawals_for_status =
get_all_withdrawal_entries_modified_from_height_with_status(
context,
status,
minimum_stacks_height,
None,
)
.await?;
withdrawals.append(&mut withdrawals_for_status);
}
let total_amounts: u64 = withdrawals.iter().map(|withdrawal| withdrawal.amount).sum();

Ok(TotalWithdrawedAmount {
total_withdrawed_amount: Some(total_amounts),
stacks_chain_tip: Some(stacks_chain_tip),
last_stacks_block_in_rolling_window: Some(minimum_stacks_height),
})
}

/// Updates bitcoin chain tip info
pub async fn set_new_bitcoin_chain_tip(
context: &EmilyContext,
new_tip: UpdateBitcoinChaintip,
) -> Result<(), Error> {
let entry = BitcoinChainstateEntry {
key: BitcoinChainstateEntryKey { dummy: "dummy".to_string() },
height: new_tip.height,
};
put_entry::<BitcoinChainstateTablePrimaryIndex>(context, &entry).await
}

/// Updates heights mapping
pub async fn update_heights_mapping(
context: &EmilyContext,
update: HeightsMapping,
) -> Result<(), Error> {
// TODO: remove mappings for old enough blocks
for (bitcoin_height, stacks_height) in update.mapping {
let entry = HeightsMappingEntry {
key: HeightsMappingEntryKey { bitcoin_height },
version: 1,
first_ancored_stacks_height: stacks_height,
};
put_entry::<HeightsMappingTablePrimaryIndex>(context, &entry).await?;
}
Ok(())
}

/// Get the limit for a specific account.
pub async fn get_limit_for_account(
context: &EmilyContext,
Expand Down
Loading
Loading