Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(rpc_server): Extend metrics (#272) #273

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions database/src/base/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,23 +90,26 @@ pub trait ReaderDbManager {
) -> anyhow::Result<readnode_primitives::ReceiptRecord>;

/// Returns the readnode_primitives::TransactionDetails at the given transaction hash
/// TODO: rewrite this method to return a Result like a struct with block_height and transaction_details instead of a tuple
async fn get_transaction_by_hash(
&self,
transaction_hash: &str,
method_name: &str,
) -> anyhow::Result<readnode_primitives::TransactionDetails>;
) -> anyhow::Result<(u64, readnode_primitives::TransactionDetails)>;
/// Returns the readnode_primitives::TransactionDetails at the given transaction hash
/// TODO: rewrite this method to return a Result like a struct with block_height and transaction_details instead of a tuple
async fn get_indexed_transaction_by_hash(
&self,
transaction_hash: &str,
method_name: &str,
) -> anyhow::Result<readnode_primitives::TransactionDetails>;
) -> anyhow::Result<(u64, readnode_primitives::TransactionDetails)>;
/// Returns the readnode_primitives::TransactionDetails at the given transaction hash
/// TODO: rewrite this method to return a Result like a struct with block_height and transaction_details instead of a tuple
async fn get_indexing_transaction_by_hash(
&self,
transaction_hash: &str,
method_name: &str,
) -> anyhow::Result<readnode_primitives::TransactionDetails>;
) -> anyhow::Result<(u64, readnode_primitives::TransactionDetails)>;

/// Returns the block height and shard id by the given block height
async fn get_block_by_height_and_shard_id(
Expand Down
4 changes: 2 additions & 2 deletions database/src/postgres/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,14 +433,14 @@ impl TransactionDetail {
pub async fn get_transaction_by_hash(
mut conn: crate::postgres::PgAsyncConn,
transaction_hash: &str,
) -> anyhow::Result<Vec<u8>> {
) -> anyhow::Result<(bigdecimal::BigDecimal, Vec<u8>)> {
let response = transaction_detail::table
.filter(transaction_detail::transaction_hash.eq(transaction_hash))
.select(Self::as_select())
.first(&mut conn)
.await?;

Ok(response.transaction_details)
Ok((response.block_height, response.transaction_details))
}
}

Expand Down
26 changes: 16 additions & 10 deletions database/src/postgres/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ impl crate::ReaderDbManager for PostgresDBManager {
&self,
transaction_hash: &str,
method_name: &str,
) -> anyhow::Result<readnode_primitives::TransactionDetails> {
) -> anyhow::Result<(u64, readnode_primitives::TransactionDetails)> {
if let Ok(transaction) = self
.get_indexed_transaction_by_hash(transaction_hash, method_name)
.await
Expand All @@ -302,20 +302,26 @@ impl crate::ReaderDbManager for PostgresDBManager {
&self,
transaction_hash: &str,
_method_name: &str,
) -> anyhow::Result<readnode_primitives::TransactionDetails> {
let transaction_data = crate::models::TransactionDetail::get_transaction_by_hash(
Self::get_connection(&self.pg_pool).await?,
transaction_hash,
)
.await?;
Ok(borsh::from_slice::<readnode_primitives::TransactionDetails>(&transaction_data)?)
) -> anyhow::Result<(u64, readnode_primitives::TransactionDetails)> {
let (block_height, transaction_data) =
crate::models::TransactionDetail::get_transaction_by_hash(
Self::get_connection(&self.pg_pool).await?,
transaction_hash,
)
.await?;
Ok((
block_height
.to_u64()
.expect("Failed to parse `block_height` to u64"),
borsh::from_slice::<readnode_primitives::TransactionDetails>(&transaction_data)?,
))
}

async fn get_indexing_transaction_by_hash(
&self,
transaction_hash: &str,
_method_name: &str,
) -> anyhow::Result<readnode_primitives::TransactionDetails> {
) -> anyhow::Result<(u64, readnode_primitives::TransactionDetails)> {
let data_value = crate::models::TransactionCache::get_transaction_by_hash(
Self::get_connection(&self.pg_pool).await?,
transaction_hash,
Expand All @@ -342,7 +348,7 @@ impl crate::ReaderDbManager for PostgresDBManager {
.push(execution_outcome)
}

Ok(transaction_details.into())
Ok((transaction_details.block_height, transaction_details.into()))
}

async fn get_block_by_height_and_shard_id(
Expand Down
16 changes: 8 additions & 8 deletions database/src/scylladb/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ impl ScyllaStorageManager for ScyllaDBManager {
// ref: https://github.com/near/near-indexer-for-explorer/issues/84
get_transaction_by_hash: Self::prepare_read_query(
&scylla_db_session,
"SELECT transaction_details FROM tx_indexer.transactions_details WHERE transaction_hash = ? LIMIT 1",
"SELECT block_height, transaction_details FROM tx_indexer.transactions_details WHERE transaction_hash = ? LIMIT 1",
).await?,
get_indexing_transaction_by_hash: Self::prepare_read_query(
&scylla_db_session,
Expand Down Expand Up @@ -485,7 +485,7 @@ impl crate::ReaderDbManager for ScyllaDBManager {
&self,
transaction_hash: &str,
method_name: &str,
) -> anyhow::Result<readnode_primitives::TransactionDetails> {
) -> anyhow::Result<(u64, readnode_primitives::TransactionDetails)> {
if let Ok(transaction) = self
.get_indexed_transaction_by_hash(transaction_hash, method_name)
.await
Expand All @@ -503,20 +503,20 @@ impl crate::ReaderDbManager for ScyllaDBManager {
&self,
transaction_hash: &str,
method_name: &str,
) -> anyhow::Result<readnode_primitives::TransactionDetails> {
) -> anyhow::Result<(u64, readnode_primitives::TransactionDetails)> {
crate::metrics::DATABASE_READ_QUERIES
.with_label_values(&[method_name, "tx_indexer.transactions_details"])
.inc();
let (data_value,) = Self::execute_prepared_query(
let (block_height, data_value) = Self::execute_prepared_query(
&self.scylla_session,
&self.get_transaction_by_hash,
(transaction_hash.to_string(),),
)
.await?
.single_row()?
.into_typed::<(Vec<u8>,)>()?;
.into_typed::<(num_bigint::BigInt, Vec<u8>)>()?;
match borsh::from_slice::<readnode_primitives::TransactionDetails>(&data_value) {
Ok(tx) => Ok(tx),
Ok(tx) => Ok((u64::try_from(block_height)?, tx)),
Err(e) => {
tracing::warn!(
"Failed tx_details borsh deserialize: TX_HASH - {}, ERROR: {:?}",
Expand All @@ -534,7 +534,7 @@ impl crate::ReaderDbManager for ScyllaDBManager {
&self,
transaction_hash: &str,
method_name: &str,
) -> anyhow::Result<readnode_primitives::TransactionDetails> {
) -> anyhow::Result<(u64, readnode_primitives::TransactionDetails)> {
crate::metrics::DATABASE_READ_QUERIES
.with_label_values(&[method_name, "tx_indexer_cache.transactions"])
.inc();
Expand Down Expand Up @@ -574,7 +574,7 @@ impl crate::ReaderDbManager for ScyllaDBManager {
.push(execution_outcome);
}

Ok(transaction_details.into())
Ok((transaction_details.block_height, transaction_details.into()))
}

/// Returns the block height and shard id by the given block height
Expand Down
1 change: 1 addition & 0 deletions rpc-server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ impl GenesisInfo {
let genesis_config = near_rpc_client
.call(
near_jsonrpc_client::methods::EXPERIMENTAL_genesis_config::RpcGenesisConfigRequest,
None,
)
.await
.expect("Error to get genesis config");
Expand Down
50 changes: 31 additions & 19 deletions rpc-server/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,10 @@ lazy_static! {
&["method_name"] // This declares a label named `method name`
).unwrap();

pub(crate) static ref REQUESTS_COUNTER: IntCounterVec = register_int_counter_vec(
"requests_counter",
"Total number of requests",
&["request_type"] // This declares a label named `request_type`
pub(crate) static ref TOTAL_REQUESTS_COUNTER: IntCounterVec = register_int_counter_vec(
"total_requests_counter",
"Total number of method requests by type",
&["method_name", "request_type"] // This declares a label named `method_name` and `request_type`
).unwrap();

pub(crate) static ref OPTIMISTIC_STATUS: IntGauge = try_create_int_gauge(
Expand Down Expand Up @@ -128,45 +128,57 @@ lazy_static! {
pub async fn increase_request_category_metrics(
data: &jsonrpc_v2::Data<crate::config::ServerContext>,
block_reference: &near_primitives::types::BlockReference,
method_name: &str,
block_height: Option<u64>,
) {
match block_reference {
near_primitives::types::BlockReference::BlockId(_) => {
let final_block = data.blocks_info_by_finality.final_cache_block().await;
let expected_earliest_available_block =
final_block.block_height - 5 * data.genesis_info.genesis_config.epoch_length;
if block_height.unwrap_or_default() > expected_earliest_available_block {
// By default, all requests should be historical, therefore
// if block_height is None we use `genesis.block_height` by default
if block_height.unwrap_or(data.genesis_info.genesis_block_cache.block_height)
> expected_earliest_available_block
{
// This is request to regular nodes which includes 5 last epochs
REQUESTS_COUNTER.with_label_values(&["regular"]).inc();
TOTAL_REQUESTS_COUNTER
.with_label_values(&[method_name, "regular"])
.inc();
} else {
// This is a request to archival nodes which include blocks from genesis (later than 5 epochs ago)
REQUESTS_COUNTER.with_label_values(&["historical"]).inc();
TOTAL_REQUESTS_COUNTER
.with_label_values(&[method_name, "historical"])
.inc();
}
}
near_primitives::types::BlockReference::Finality(finality) => {
// All Finality is requests to regular nodes which includes 5 last epochs
TOTAL_REQUESTS_COUNTER
.with_label_values(&[method_name, "regular"])
.inc();
match finality {
// Increase the REQUESTS_COUNTER `final` metric
// Increase the TOTAL_REQUESTS_COUNTER `final` metric
// if the request has final finality
near_primitives::types::Finality::DoomSlug
| near_primitives::types::Finality::Final => {
REQUESTS_COUNTER.with_label_values(&["final"]).inc();
TOTAL_REQUESTS_COUNTER
.with_label_values(&[method_name, "final"])
.inc();
}
// Increase the REQUESTS_COUNTER `optimistic` metric
// Increase the TOTAL_REQUESTS_COUNTER `optimistic` metric
// if the request has optimistic finality
near_primitives::types::Finality::None => {
REQUESTS_COUNTER.with_label_values(&["optimistic"]).inc();
// Increase the REQUESTS_COUNTER `proxy_optimistic` metric
// if the optimistic is not updating and proxy to native JSON-RPC
if crate::metrics::OPTIMISTIC_UPDATING.is_not_working() {
REQUESTS_COUNTER
.with_label_values(&["proxy_optimistic"])
.inc();
}
TOTAL_REQUESTS_COUNTER
.with_label_values(&[method_name, "optimistic"])
.inc();
}
}
}
near_primitives::types::BlockReference::SyncCheckpoint(_) => {
REQUESTS_COUNTER.with_label_values(&["historical"]).inc();
TOTAL_REQUESTS_COUNTER
.with_label_values(&[method_name, "historical"])
.inc();
}
}
}
Expand Down
4 changes: 1 addition & 3 deletions rpc-server/src/middlewares.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::metrics::{METHOD_CALLS_COUNTER, REQUESTS_COUNTER};
use crate::metrics::METHOD_CALLS_COUNTER;
use actix_web::dev::{forward_ready, Service, ServiceRequest, ServiceResponse, Transform};
use futures::future::LocalBoxFuture;
use futures::StreamExt;
Expand Down Expand Up @@ -53,8 +53,6 @@ where
return service_clone.call(request).await;
}

REQUESTS_COUNTER.with_label_values(&["total"]).inc();

let (req, mut payload) = request.into_parts();
let mut body = actix_web::web::BytesMut::new();
while let Some(chunk) = &payload.next().await {
Expand Down
63 changes: 23 additions & 40 deletions rpc-server/src/modules/blocks/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,11 @@ pub async fn block(
) = &block_request.block_reference
{
if crate::metrics::OPTIMISTIC_UPDATING.is_not_working() {
// increase metrics before proxy request
crate::metrics::increase_request_category_metrics(
&data,
&block_request.block_reference,
None,
)
.await;
// Proxy if the optimistic updating is not working
let block_view = data.near_rpc_client.call(block_request).await?;
let block_view = data
.near_rpc_client
.call(block_request, Some("optimistic"))
.await?;
return Ok(near_jsonrpc::primitives::types::blocks::RpcBlockResponse { block_view });
}
};
Expand Down Expand Up @@ -81,15 +77,11 @@ pub async fn changes_in_block_by_type(
) = &changes_in_block_request.block_reference
{
if crate::metrics::OPTIMISTIC_UPDATING.is_not_working() {
// increase metrics before proxy request
crate::metrics::increase_request_category_metrics(
&data,
&changes_in_block_request.block_reference,
None,
)
.await;
// Proxy if the optimistic updating is not working
return Ok(data.near_rpc_client.call(changes_in_block_request).await?);
return Ok(data
.near_rpc_client
.call(changes_in_block_request, Some("optimistic"))
.await?);
}
};

Expand All @@ -114,15 +106,11 @@ pub async fn changes_in_block(
) = &changes_in_block_request.block_reference
{
if crate::metrics::OPTIMISTIC_UPDATING.is_not_working() {
// increase metrics before proxy request
crate::metrics::increase_request_category_metrics(
&data,
&changes_in_block_request.block_reference,
None,
)
.await;
// Proxy if the optimistic updating is not working
return Ok(data.near_rpc_client.call(changes_in_block_request).await?);
return Ok(data
.near_rpc_client
.call(changes_in_block_request, Some("optimistic"))
.await?);
}
};

Expand All @@ -143,6 +131,7 @@ async fn block_call(
crate::metrics::increase_request_category_metrics(
&data,
&block_request.block_reference,
"block",
Some(block.block_view.header.height),
)
.await;
Expand Down Expand Up @@ -192,14 +181,6 @@ async fn changes_in_block_call(
.await
.map_err(near_jsonrpc::primitives::errors::RpcError::from)?;

// increase block category metrics
crate::metrics::increase_request_category_metrics(
&data,
&params.block_reference,
Some(cache_block.block_height),
)
.await;

let result = fetch_changes_in_block(&data, cache_block, &params.block_reference).await;
#[cfg(feature = "shadow_data_consistency")]
{
Expand Down Expand Up @@ -233,14 +214,6 @@ async fn changes_in_block_by_type_call(
.await
.map_err(near_jsonrpc::primitives::errors::RpcError::from)?;

// increase block category metrics
crate::metrics::increase_request_category_metrics(
&data,
&params.block_reference,
Some(cache_block.block_height),
)
.await;

let result = fetch_changes_in_block_by_type(
&data,
cache_block,
Expand Down Expand Up @@ -402,6 +375,16 @@ pub async fn fetch_chunk(
shard_id,
)
.await?;
// increase block category metrics
crate::metrics::increase_request_category_metrics(
data,
&near_primitives::types::BlockReference::BlockId(near_primitives::types::BlockId::Height(
block_height,
)),
"chunk",
Some(block_height),
)
.await;

Ok(near_jsonrpc::primitives::types::chunks::RpcChunkResponse { chunk_view })
}
Expand Down
Loading
Loading