Skip to content

Commit

Permalink
(rpc_server): Extend metrics (#272)
Browse files Browse the repository at this point in the history
* extend and add additional metrics

* improvment accordinc pr comments
  • Loading branch information
kobayurii committed Jun 7, 2024
1 parent 60d0262 commit b8f87be
Show file tree
Hide file tree
Showing 15 changed files with 274 additions and 167 deletions.
9 changes: 6 additions & 3 deletions database/src/base/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,23 +90,26 @@ pub trait ReaderDbManager {
) -> anyhow::Result<readnode_primitives::ReceiptRecord>;

/// Returns the readnode_primitives::TransactionDetails at the given transaction hash
/// TODO: rewrite this method to return a Result like a struct with block_height and transaction_details instead of a tuple
async fn get_transaction_by_hash(
&self,
transaction_hash: &str,
method_name: &str,
) -> anyhow::Result<readnode_primitives::TransactionDetails>;
) -> anyhow::Result<(u64, readnode_primitives::TransactionDetails)>;
/// Returns the readnode_primitives::TransactionDetails at the given transaction hash
/// TODO: rewrite this method to return a Result like a struct with block_height and transaction_details instead of a tuple
async fn get_indexed_transaction_by_hash(
&self,
transaction_hash: &str,
method_name: &str,
) -> anyhow::Result<readnode_primitives::TransactionDetails>;
) -> anyhow::Result<(u64, readnode_primitives::TransactionDetails)>;
/// Returns the readnode_primitives::TransactionDetails at the given transaction hash
/// TODO: rewrite this method to return a Result like a struct with block_height and transaction_details instead of a tuple
async fn get_indexing_transaction_by_hash(
&self,
transaction_hash: &str,
method_name: &str,
) -> anyhow::Result<readnode_primitives::TransactionDetails>;
) -> anyhow::Result<(u64, readnode_primitives::TransactionDetails)>;

/// Returns the block height and shard id by the given block height
async fn get_block_by_height_and_shard_id(
Expand Down
4 changes: 2 additions & 2 deletions database/src/postgres/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,14 +433,14 @@ impl TransactionDetail {
pub async fn get_transaction_by_hash(
mut conn: crate::postgres::PgAsyncConn,
transaction_hash: &str,
) -> anyhow::Result<Vec<u8>> {
) -> anyhow::Result<(bigdecimal::BigDecimal, Vec<u8>)> {
let response = transaction_detail::table
.filter(transaction_detail::transaction_hash.eq(transaction_hash))
.select(Self::as_select())
.first(&mut conn)
.await?;

Ok(response.transaction_details)
Ok((response.block_height, response.transaction_details))
}
}

Expand Down
26 changes: 16 additions & 10 deletions database/src/postgres/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ impl crate::ReaderDbManager for PostgresDBManager {
&self,
transaction_hash: &str,
method_name: &str,
) -> anyhow::Result<readnode_primitives::TransactionDetails> {
) -> anyhow::Result<(u64, readnode_primitives::TransactionDetails)> {
if let Ok(transaction) = self
.get_indexed_transaction_by_hash(transaction_hash, method_name)
.await
Expand All @@ -302,20 +302,26 @@ impl crate::ReaderDbManager for PostgresDBManager {
&self,
transaction_hash: &str,
_method_name: &str,
) -> anyhow::Result<readnode_primitives::TransactionDetails> {
let transaction_data = crate::models::TransactionDetail::get_transaction_by_hash(
Self::get_connection(&self.pg_pool).await?,
transaction_hash,
)
.await?;
Ok(borsh::from_slice::<readnode_primitives::TransactionDetails>(&transaction_data)?)
) -> anyhow::Result<(u64, readnode_primitives::TransactionDetails)> {
let (block_height, transaction_data) =
crate::models::TransactionDetail::get_transaction_by_hash(
Self::get_connection(&self.pg_pool).await?,
transaction_hash,
)
.await?;
Ok((
block_height
.to_u64()
.expect("Failed to parse `block_height` to u64"),
borsh::from_slice::<readnode_primitives::TransactionDetails>(&transaction_data)?,
))
}

async fn get_indexing_transaction_by_hash(
&self,
transaction_hash: &str,
_method_name: &str,
) -> anyhow::Result<readnode_primitives::TransactionDetails> {
) -> anyhow::Result<(u64, readnode_primitives::TransactionDetails)> {
let data_value = crate::models::TransactionCache::get_transaction_by_hash(
Self::get_connection(&self.pg_pool).await?,
transaction_hash,
Expand All @@ -342,7 +348,7 @@ impl crate::ReaderDbManager for PostgresDBManager {
.push(execution_outcome)
}

Ok(transaction_details.into())
Ok((transaction_details.block_height, transaction_details.into()))
}

async fn get_block_by_height_and_shard_id(
Expand Down
16 changes: 8 additions & 8 deletions database/src/scylladb/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ impl ScyllaStorageManager for ScyllaDBManager {
// ref: https://github.com/near/near-indexer-for-explorer/issues/84
get_transaction_by_hash: Self::prepare_read_query(
&scylla_db_session,
"SELECT transaction_details FROM tx_indexer.transactions_details WHERE transaction_hash = ? LIMIT 1",
"SELECT block_height, transaction_details FROM tx_indexer.transactions_details WHERE transaction_hash = ? LIMIT 1",
).await?,
get_indexing_transaction_by_hash: Self::prepare_read_query(
&scylla_db_session,
Expand Down Expand Up @@ -485,7 +485,7 @@ impl crate::ReaderDbManager for ScyllaDBManager {
&self,
transaction_hash: &str,
method_name: &str,
) -> anyhow::Result<readnode_primitives::TransactionDetails> {
) -> anyhow::Result<(u64, readnode_primitives::TransactionDetails)> {
if let Ok(transaction) = self
.get_indexed_transaction_by_hash(transaction_hash, method_name)
.await
Expand All @@ -503,20 +503,20 @@ impl crate::ReaderDbManager for ScyllaDBManager {
&self,
transaction_hash: &str,
method_name: &str,
) -> anyhow::Result<readnode_primitives::TransactionDetails> {
) -> anyhow::Result<(u64, readnode_primitives::TransactionDetails)> {
crate::metrics::DATABASE_READ_QUERIES
.with_label_values(&[method_name, "tx_indexer.transactions_details"])
.inc();
let (data_value,) = Self::execute_prepared_query(
let (block_height, data_value) = Self::execute_prepared_query(
&self.scylla_session,
&self.get_transaction_by_hash,
(transaction_hash.to_string(),),
)
.await?
.single_row()?
.into_typed::<(Vec<u8>,)>()?;
.into_typed::<(num_bigint::BigInt, Vec<u8>)>()?;
match borsh::from_slice::<readnode_primitives::TransactionDetails>(&data_value) {
Ok(tx) => Ok(tx),
Ok(tx) => Ok((u64::try_from(block_height)?, tx)),
Err(e) => {
tracing::warn!(
"Failed tx_details borsh deserialize: TX_HASH - {}, ERROR: {:?}",
Expand All @@ -534,7 +534,7 @@ impl crate::ReaderDbManager for ScyllaDBManager {
&self,
transaction_hash: &str,
method_name: &str,
) -> anyhow::Result<readnode_primitives::TransactionDetails> {
) -> anyhow::Result<(u64, readnode_primitives::TransactionDetails)> {
crate::metrics::DATABASE_READ_QUERIES
.with_label_values(&[method_name, "tx_indexer_cache.transactions"])
.inc();
Expand Down Expand Up @@ -574,7 +574,7 @@ impl crate::ReaderDbManager for ScyllaDBManager {
.push(execution_outcome);
}

Ok(transaction_details.into())
Ok((transaction_details.block_height, transaction_details.into()))
}

/// Returns the block height and shard id by the given block height
Expand Down
1 change: 1 addition & 0 deletions rpc-server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ impl GenesisInfo {
let genesis_config = near_rpc_client
.call(
near_jsonrpc_client::methods::EXPERIMENTAL_genesis_config::RpcGenesisConfigRequest,
None,
)
.await
.expect("Error to get genesis config");
Expand Down
50 changes: 31 additions & 19 deletions rpc-server/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,10 @@ lazy_static! {
&["method_name"] // This declares a label named `method name`
).unwrap();

pub(crate) static ref REQUESTS_COUNTER: IntCounterVec = register_int_counter_vec(
"requests_counter",
"Total number of requests",
&["request_type"] // This declares a label named `request_type`
pub(crate) static ref TOTAL_REQUESTS_COUNTER: IntCounterVec = register_int_counter_vec(
"total_requests_counter",
"Total number of method requests by type",
&["method_name", "request_type"] // This declares a label named `method_name` and `request_type`
).unwrap();

pub(crate) static ref OPTIMISTIC_STATUS: IntGauge = try_create_int_gauge(
Expand Down Expand Up @@ -128,45 +128,57 @@ lazy_static! {
pub async fn increase_request_category_metrics(
data: &jsonrpc_v2::Data<crate::config::ServerContext>,
block_reference: &near_primitives::types::BlockReference,
method_name: &str,
block_height: Option<u64>,
) {
match block_reference {
near_primitives::types::BlockReference::BlockId(_) => {
let final_block = data.blocks_info_by_finality.final_cache_block().await;
let expected_earliest_available_block =
final_block.block_height - 5 * data.genesis_info.genesis_config.epoch_length;
if block_height.unwrap_or_default() > expected_earliest_available_block {
// By default, all requests should be historical, therefore
// if block_height is None we use `genesis.block_height` by default
if block_height.unwrap_or(data.genesis_info.genesis_block_cache.block_height)
> expected_earliest_available_block
{
// This is request to regular nodes which includes 5 last epochs
REQUESTS_COUNTER.with_label_values(&["regular"]).inc();
TOTAL_REQUESTS_COUNTER
.with_label_values(&[method_name, "regular"])
.inc();
} else {
// This is a request to archival nodes which include blocks from genesis (later than 5 epochs ago)
REQUESTS_COUNTER.with_label_values(&["historical"]).inc();
TOTAL_REQUESTS_COUNTER
.with_label_values(&[method_name, "historical"])
.inc();
}
}
near_primitives::types::BlockReference::Finality(finality) => {
// All Finality is requests to regular nodes which includes 5 last epochs
TOTAL_REQUESTS_COUNTER
.with_label_values(&[method_name, "regular"])
.inc();
match finality {
// Increase the REQUESTS_COUNTER `final` metric
// Increase the TOTAL_REQUESTS_COUNTER `final` metric
// if the request has final finality
near_primitives::types::Finality::DoomSlug
| near_primitives::types::Finality::Final => {
REQUESTS_COUNTER.with_label_values(&["final"]).inc();
TOTAL_REQUESTS_COUNTER
.with_label_values(&[method_name, "final"])
.inc();
}
// Increase the REQUESTS_COUNTER `optimistic` metric
// Increase the TOTAL_REQUESTS_COUNTER `optimistic` metric
// if the request has optimistic finality
near_primitives::types::Finality::None => {
REQUESTS_COUNTER.with_label_values(&["optimistic"]).inc();
// Increase the REQUESTS_COUNTER `proxy_optimistic` metric
// if the optimistic is not updating and proxy to native JSON-RPC
if crate::metrics::OPTIMISTIC_UPDATING.is_not_working() {
REQUESTS_COUNTER
.with_label_values(&["proxy_optimistic"])
.inc();
}
TOTAL_REQUESTS_COUNTER
.with_label_values(&[method_name, "optimistic"])
.inc();
}
}
}
near_primitives::types::BlockReference::SyncCheckpoint(_) => {
REQUESTS_COUNTER.with_label_values(&["historical"]).inc();
TOTAL_REQUESTS_COUNTER
.with_label_values(&[method_name, "historical"])
.inc();
}
}
}
Expand Down
4 changes: 1 addition & 3 deletions rpc-server/src/middlewares.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::metrics::{METHOD_CALLS_COUNTER, REQUESTS_COUNTER};
use crate::metrics::METHOD_CALLS_COUNTER;
use actix_web::dev::{forward_ready, Service, ServiceRequest, ServiceResponse, Transform};
use futures::future::LocalBoxFuture;
use futures::StreamExt;
Expand Down Expand Up @@ -53,8 +53,6 @@ where
return service_clone.call(request).await;
}

REQUESTS_COUNTER.with_label_values(&["total"]).inc();

let (req, mut payload) = request.into_parts();
let mut body = actix_web::web::BytesMut::new();
while let Some(chunk) = &payload.next().await {
Expand Down
63 changes: 23 additions & 40 deletions rpc-server/src/modules/blocks/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,11 @@ pub async fn block(
) = &block_request.block_reference
{
if crate::metrics::OPTIMISTIC_UPDATING.is_not_working() {
// increase metrics before proxy request
crate::metrics::increase_request_category_metrics(
&data,
&block_request.block_reference,
None,
)
.await;
// Proxy if the optimistic updating is not working
let block_view = data.near_rpc_client.call(block_request).await?;
let block_view = data
.near_rpc_client
.call(block_request, Some("optimistic"))
.await?;
return Ok(near_jsonrpc::primitives::types::blocks::RpcBlockResponse { block_view });
}
};
Expand Down Expand Up @@ -81,15 +77,11 @@ pub async fn changes_in_block_by_type(
) = &changes_in_block_request.block_reference
{
if crate::metrics::OPTIMISTIC_UPDATING.is_not_working() {
// increase metrics before proxy request
crate::metrics::increase_request_category_metrics(
&data,
&changes_in_block_request.block_reference,
None,
)
.await;
// Proxy if the optimistic updating is not working
return Ok(data.near_rpc_client.call(changes_in_block_request).await?);
return Ok(data
.near_rpc_client
.call(changes_in_block_request, Some("optimistic"))
.await?);
}
};

Expand All @@ -114,15 +106,11 @@ pub async fn changes_in_block(
) = &changes_in_block_request.block_reference
{
if crate::metrics::OPTIMISTIC_UPDATING.is_not_working() {
// increase metrics before proxy request
crate::metrics::increase_request_category_metrics(
&data,
&changes_in_block_request.block_reference,
None,
)
.await;
// Proxy if the optimistic updating is not working
return Ok(data.near_rpc_client.call(changes_in_block_request).await?);
return Ok(data
.near_rpc_client
.call(changes_in_block_request, Some("optimistic"))
.await?);
}
};

Expand All @@ -143,6 +131,7 @@ async fn block_call(
crate::metrics::increase_request_category_metrics(
&data,
&block_request.block_reference,
"block",
Some(block.block_view.header.height),
)
.await;
Expand Down Expand Up @@ -192,14 +181,6 @@ async fn changes_in_block_call(
.await
.map_err(near_jsonrpc::primitives::errors::RpcError::from)?;

// increase block category metrics
crate::metrics::increase_request_category_metrics(
&data,
&params.block_reference,
Some(cache_block.block_height),
)
.await;

let result = fetch_changes_in_block(&data, cache_block, &params.block_reference).await;
#[cfg(feature = "shadow_data_consistency")]
{
Expand Down Expand Up @@ -233,14 +214,6 @@ async fn changes_in_block_by_type_call(
.await
.map_err(near_jsonrpc::primitives::errors::RpcError::from)?;

// increase block category metrics
crate::metrics::increase_request_category_metrics(
&data,
&params.block_reference,
Some(cache_block.block_height),
)
.await;

let result = fetch_changes_in_block_by_type(
&data,
cache_block,
Expand Down Expand Up @@ -402,6 +375,16 @@ pub async fn fetch_chunk(
shard_id,
)
.await?;
// increase block category metrics
crate::metrics::increase_request_category_metrics(
data,
&near_primitives::types::BlockReference::BlockId(near_primitives::types::BlockId::Height(
block_height,
)),
"chunk",
Some(block_height),
)
.await;

Ok(near_jsonrpc::primitives::types::chunks::RpcChunkResponse { chunk_view })
}
Expand Down
Loading

0 comments on commit b8f87be

Please sign in to comment.