Skip to content

Commit

Permalink
Add executor count metrics (#1091) (#1092)
Browse files Browse the repository at this point in the history
* add executor count metrics

* fix

Co-authored-by: xufei <[email protected]>
  • Loading branch information
ti-srebot and windtalker authored Sep 11, 2020
1 parent c45159e commit 2e2ca33
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 13 deletions.
2 changes: 1 addition & 1 deletion dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ namespace DB
F(type_super_batch, {"type", "super_batch"}), F(type_super_batch_cop_dag, {"type", "super_batch_cop_dag"})) \
M(tiflash_coprocessor_executor_count, "Total number of each executor", Counter, F(type_ts, {"type", "table_scan"}), \
F(type_sel, {"type", "selection"}), F(type_agg, {"type", "aggregation"}), F(type_topn, {"type", "top_n"}), \
F(type_limit, {"type", "limit"})) \
F(type_limit, {"type", "limit"}), F(type_join, {"type", "join"})) \
M(tiflash_coprocessor_request_duration_seconds, "Bucketed histogram of request duration", Histogram, \
F(type_batch, {{"type", "batch"}}, ExpBuckets{0.0005, 2, 20}), F(type_cop, {{"type", "cop"}}, ExpBuckets{0.0005, 2, 20}), \
F(type_super_batch, {{"type", "super_batch"}}, ExpBuckets{0.0005, 2, 20})) \
Expand Down
26 changes: 21 additions & 5 deletions dbms/src/Flash/Coprocessor/DAGQueryBlock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#pragma GCC diagnostic pop

#include <Common/TiFlashException.h>
#include <Common/TiFlashMetrics.h>
#include <Flash/Coprocessor/DAGQueryBlock.h>
#include <Flash/Coprocessor/DAGUtils.h>

Expand Down Expand Up @@ -63,7 +64,7 @@ void collectOutPutFieldTypesFromAgg(std::vector<tipb::FieldType> & field_type, c

/// construct DAGQueryBlock from a tree struct based executors, which is the
/// format after supporting join in dag request
DAGQueryBlock::DAGQueryBlock(UInt32 id_, const tipb::Executor & root_)
DAGQueryBlock::DAGQueryBlock(UInt32 id_, const tipb::Executor & root_, TiFlashMetricsPtr metrics)
: id(id_), root(&root_), qb_column_prefix("__QB_" + std::to_string(id_) + "_"), qb_join_subquery_alias(qb_column_prefix + "join")
{
const tipb::Executor * current = root;
Expand All @@ -72,23 +73,27 @@ DAGQueryBlock::DAGQueryBlock(UInt32 id_, const tipb::Executor & root_)
switch (current->tp())
{
case tipb::ExecType::TypeSelection:
GET_METRIC(metrics, tiflash_coprocessor_executor_count, type_sel).Increment();
assignOrThrowException(&selection, current, SEL_NAME);
selection_name = current->executor_id();
current = &current->selection().child();
break;
case tipb::ExecType::TypeAggregation:
case tipb::ExecType::TypeStreamAgg:
GET_METRIC(metrics, tiflash_coprocessor_executor_count, type_agg).Increment();
assignOrThrowException(&aggregation, current, AGG_NAME);
aggregation_name = current->executor_id();
collectOutPutFieldTypesFromAgg(output_field_types, current->aggregation());
current = &current->aggregation().child();
break;
case tipb::ExecType::TypeLimit:
GET_METRIC(metrics, tiflash_coprocessor_executor_count, type_limit).Increment();
assignOrThrowException(&limitOrTopN, current, LIMIT_NAME);
limitOrTopN_name = current->executor_id();
current = &current->limit().child();
break;
case tipb::ExecType::TypeTopN:
GET_METRIC(metrics, tiflash_coprocessor_executor_count, type_topn).Increment();
assignOrThrowException(&limitOrTopN, current, TOPN_NAME);
limitOrTopN_name = current->executor_id();
current = &current->topn().child();
Expand All @@ -109,22 +114,28 @@ DAGQueryBlock::DAGQueryBlock(UInt32 id_, const tipb::Executor & root_)
{
if (source->join().children_size() != 2)
throw TiFlashException("Join executor children size not equal to 2", Errors::Coprocessor::BadRequest);
children.push_back(std::make_shared<DAGQueryBlock>(id * 2, source->join().children(0)));
children.push_back(std::make_shared<DAGQueryBlock>(id * 2 + 1, source->join().children(1)));
GET_METRIC(metrics, tiflash_coprocessor_executor_count, type_join).Increment();
children.push_back(std::make_shared<DAGQueryBlock>(id * 2, source->join().children(0), metrics));
children.push_back(std::make_shared<DAGQueryBlock>(id * 2 + 1, source->join().children(1), metrics));
}
else
{
GET_METRIC(metrics, tiflash_coprocessor_executor_count, type_ts).Increment();
}
fillOutputFieldTypes();
}

/// construct DAGQueryBlock from a list struct based executors, which is the
/// format before supporting join in dag request
DAGQueryBlock::DAGQueryBlock(UInt32 id_, const ::google::protobuf::RepeatedPtrField<tipb::Executor> & executors)
DAGQueryBlock::DAGQueryBlock(UInt32 id_, const ::google::protobuf::RepeatedPtrField<tipb::Executor> & executors, TiFlashMetricsPtr metrics)
: id(id_), root(nullptr), qb_column_prefix("__QB_" + std::to_string(id_) + "_"), qb_join_subquery_alias(qb_column_prefix + "join")
{
for (int i = (int)executors.size() - 1; i >= 0; i--)
{
switch (executors[i].tp())
{
case tipb::ExecType::TypeTableScan:
GET_METRIC(metrics, tiflash_coprocessor_executor_count, type_ts).Increment();
assignOrThrowException(&source, &executors[i], SOURCE_NAME);
/// use index as the prefix for executor name so when we sort by
/// the executor name, it will result in the same order as it is
Expand All @@ -133,25 +144,30 @@ DAGQueryBlock::DAGQueryBlock(UInt32 id_, const ::google::protobuf::RepeatedPtrFi
source_name = std::to_string(i) + "_tablescan";
break;
case tipb::ExecType::TypeSelection:
GET_METRIC(metrics, tiflash_coprocessor_executor_count, type_sel).Increment();
assignOrThrowException(&selection, &executors[i], SEL_NAME);
selection_name = std::to_string(i) + "_selection";
break;
case tipb::ExecType::TypeStreamAgg:
case tipb::ExecType::TypeAggregation:
GET_METRIC(metrics, tiflash_coprocessor_executor_count, type_agg).Increment();
assignOrThrowException(&aggregation, &executors[i], AGG_NAME);
aggregation_name = std::to_string(i) + "_aggregation";
collectOutPutFieldTypesFromAgg(output_field_types, executors[i].aggregation());
break;
case tipb::ExecType::TypeTopN:
GET_METRIC(metrics, tiflash_coprocessor_executor_count, type_topn).Increment();
assignOrThrowException(&limitOrTopN, &executors[i], TOPN_NAME);
limitOrTopN_name = std::to_string(i) + "_limitOrTopN";
break;
case tipb::ExecType::TypeLimit:
GET_METRIC(metrics, tiflash_coprocessor_executor_count, type_limit).Increment();
assignOrThrowException(&limitOrTopN, &executors[i], LIMIT_NAME);
limitOrTopN_name = std::to_string(i) + "_limitOrTopN";
break;
default:
throw TiFlashException("Unsupported executor in DAG request: " + executors[i].DebugString(), Errors::Coprocessor::Unimplemented);
throw TiFlashException(
"Unsupported executor in DAG request: " + executors[i].DebugString(), Errors::Coprocessor::Unimplemented);
}
}
fillOutputFieldTypes();
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGQueryBlock.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ using TiFlashMetricsPtr = std::shared_ptr<TiFlashMetrics>;
class DAGQueryBlock
{
public:
DAGQueryBlock(UInt32 id, const tipb::Executor & root);
DAGQueryBlock(UInt32 id, const ::google::protobuf::RepeatedPtrField<tipb::Executor> & executors);
DAGQueryBlock(UInt32 id, const tipb::Executor & root, TiFlashMetricsPtr metrics);
DAGQueryBlock(UInt32 id, const ::google::protobuf::RepeatedPtrField<tipb::Executor> & executors, TiFlashMetricsPtr metrics);
/// the xxx_name is added for compatibility issues: before join is supported, executor does not
/// has executor name, after join is supported in dag request, every executor has an unique
/// name(executor->executor_id()). Since We can not always get the executor name from executor
Expand Down
5 changes: 2 additions & 3 deletions dbms/src/Flash/Coprocessor/DAGQuerySource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,15 @@ DAGQuerySource::DAGQuerySource(Context & context_, DAGContext & dag_context_, co
dag_context(dag_context_),
regions(regions_),
dag_request(dag_request_),
metrics(context.getTiFlashMetrics()),
is_batch_cop(is_batch_cop_)
{
if (dag_request.has_root_executor())
{
root_query_block = std::make_shared<DAGQueryBlock>(1, dag_request.root_executor());
root_query_block = std::make_shared<DAGQueryBlock>(1, dag_request.root_executor(), context.getTiFlashMetrics());
}
else
{
root_query_block = std::make_shared<DAGQueryBlock>(1, dag_request.executors());
root_query_block = std::make_shared<DAGQueryBlock>(1, dag_request.executors(), context.getTiFlashMetrics());
}
root_query_block->collectAllPossibleChildrenJoinSubqueryAlias(dag_context.qb_id_to_join_alias_map);
for (Int32 i : dag_request.output_offsets())
Expand Down
2 changes: 0 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGQuerySource.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,6 @@ class DAGQuerySource : public IQuerySource

const tipb::DAGRequest & dag_request;

TiFlashMetricsPtr metrics;

std::vector<tipb::FieldType> result_field_types;
tipb::EncodeType encode_type;
std::shared_ptr<DAGQueryBlock> root_query_block;
Expand Down

0 comments on commit 2e2ca33

Please sign in to comment.