Skip to content

Commit

Permalink
Cherry-pick: fix metrics for establish connection (#6658)
Browse files Browse the repository at this point in the history
close #6197
  • Loading branch information
xzhangxian1008 authored Jan 19, 2023
1 parent 7ea00bf commit 9f45e26
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 44 deletions.
2 changes: 1 addition & 1 deletion dbms/src/Common/TiFlashSecurity.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ struct TiFlashSecurityConfig
}


bool checkGrpcContext(grpc::ServerContext * grpc_context) const
bool checkGrpcContext(const grpc::ServerContext * grpc_context) const
{
if (allowed_common_names.empty() || grpc_context == nullptr)
{
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Flash/EstablishCall.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ EstablishCallData::EstablishCallData(AsyncFlashService * service, grpc::ServerCo
EstablishCallData::~EstablishCallData()
{
GET_METRIC(tiflash_object_count, type_count_of_establish_calldata).Decrement();
if (stopwatch)
{
GET_METRIC(tiflash_coprocessor_handling_request_count, type_mpp_establish_conn).Decrement();
GET_METRIC(tiflash_coprocessor_request_duration_seconds, type_mpp_establish_conn).Observe(stopwatch->elapsedSeconds());
}
}

EstablishCallData * EstablishCallData::spawn(AsyncFlashService * service, grpc::ServerCompletionQueue * cq, grpc::ServerCompletionQueue * notify_cq, const std::shared_ptr<std::atomic<bool>> & is_shutdown)
Expand Down
98 changes: 56 additions & 42 deletions dbms/src/Flash/FlashService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,9 @@ grpc::Status FlashService::Coprocessor(
CPUAffinityManager::getInstance().bindSelfGrpcThread();
LOG_FMT_DEBUG(log, "Handling coprocessor request: {}", request->DebugString());

if (!security_config.checkGrpcContext(grpc_context))
{
return grpc::Status(grpc::PERMISSION_DENIED, tls_err_msg);
}
auto check_result = checkGrpcContext(grpc_context);
if (!check_result.ok())
return check_result;

GET_METRIC(tiflash_coprocessor_request_count, type_cop).Increment();
GET_METRIC(tiflash_coprocessor_handling_request_count, type_cop).Increment();
Expand Down Expand Up @@ -122,10 +121,9 @@ ::grpc::Status FlashService::BatchCoprocessor(::grpc::ServerContext * grpc_conte
CPUAffinityManager::getInstance().bindSelfGrpcThread();
LOG_FMT_DEBUG(log, "Handling coprocessor request: {}", request->DebugString());

if (!security_config.checkGrpcContext(grpc_context))
{
return grpc::Status(grpc::PERMISSION_DENIED, tls_err_msg);
}
auto check_result = checkGrpcContext(grpc_context);
if (!check_result.ok())
return check_result;

GET_METRIC(tiflash_coprocessor_request_count, type_super_batch).Increment();
GET_METRIC(tiflash_coprocessor_handling_request_count, type_super_batch).Increment();
Expand Down Expand Up @@ -158,10 +156,9 @@ ::grpc::Status FlashService::DispatchMPPTask(
{
CPUAffinityManager::getInstance().bindSelfGrpcThread();
LOG_FMT_DEBUG(log, "Handling mpp dispatch request: {}", request->DebugString());
if (!security_config.checkGrpcContext(grpc_context))
{
return grpc::Status(grpc::PERMISSION_DENIED, tls_err_msg);
}
auto check_result = checkGrpcContext(grpc_context);
if (!check_result.ok())
return check_result;
GET_METRIC(tiflash_coprocessor_request_count, type_dispatch_mpp_task).Increment();
GET_METRIC(tiflash_coprocessor_handling_request_count, type_dispatch_mpp_task).Increment();
GET_METRIC(tiflash_thread_count, type_active_threads_of_dispatch_mpp).Increment();
Expand Down Expand Up @@ -196,17 +193,15 @@ ::grpc::Status FlashService::IsAlive(::grpc::ServerContext * grpc_context [[mayb
::mpp::IsAliveResponse * response [[maybe_unused]])
{
CPUAffinityManager::getInstance().bindSelfGrpcThread();
if (!security_config.checkGrpcContext(grpc_context))
{
return grpc::Status(grpc::PERMISSION_DENIED, tls_err_msg);
}
auto check_result = checkGrpcContext(grpc_context);
if (!check_result.ok())
return check_result;

auto [context, status] = createDBContext(grpc_context);
if (!status.ok())
{
return status;
}

auto & tmt_context = context->getTMTContext();
response->set_available(tmt_context.checkRunning());
return ::grpc::Status::OK;
Expand All @@ -231,25 +226,32 @@ ::grpc::Status FlashService::establishMPPConnectionSyncOrAsync(::grpc::ServerCon
// We need to find it out and bind the grpc stream with it.
LOG_FMT_DEBUG(log, "Handling establish mpp connection request: {}", request->DebugString());

if (!security_config.checkGrpcContext(grpc_context))
auto check_result = checkGrpcContext(grpc_context);
if (!check_result.ok())
return check_result;

if (!calldata)
{
return returnStatus(calldata, grpc::Status(grpc::PERMISSION_DENIED, tls_err_msg));
GET_METRIC(tiflash_coprocessor_request_count, type_mpp_establish_conn).Increment();
GET_METRIC(tiflash_coprocessor_handling_request_count, type_mpp_establish_conn).Increment();
GET_METRIC(tiflash_thread_count, type_active_threads_of_establish_mpp).Increment();
GET_METRIC(tiflash_thread_count, type_total_threads_of_raw).Increment();
}
GET_METRIC(tiflash_coprocessor_request_count, type_mpp_establish_conn).Increment();
GET_METRIC(tiflash_coprocessor_handling_request_count, type_mpp_establish_conn).Increment();
GET_METRIC(tiflash_thread_count, type_active_threads_of_establish_mpp).Increment();
GET_METRIC(tiflash_thread_count, type_total_threads_of_raw).Increment();

if (!tryToResetMaxThreadsMetrics())
{
GET_METRIC(tiflash_thread_count, type_max_threads_of_establish_mpp).Set(std::max(GET_METRIC(tiflash_thread_count, type_max_threads_of_establish_mpp).Value(), GET_METRIC(tiflash_thread_count, type_active_threads_of_establish_mpp).Value()));
GET_METRIC(tiflash_thread_count, type_max_threads_of_raw).Set(std::max(GET_METRIC(tiflash_thread_count, type_max_threads_of_raw).Value(), GET_METRIC(tiflash_thread_count, type_total_threads_of_raw).Value()));
}
Stopwatch watch;
SCOPE_EXIT({
GET_METRIC(tiflash_thread_count, type_total_threads_of_raw).Decrement();
GET_METRIC(tiflash_thread_count, type_active_threads_of_establish_mpp).Decrement();
GET_METRIC(tiflash_coprocessor_handling_request_count, type_mpp_establish_conn).Decrement();
GET_METRIC(tiflash_coprocessor_request_duration_seconds, type_mpp_establish_conn).Observe(watch.elapsedSeconds());
if (!calldata)
{
GET_METRIC(tiflash_coprocessor_handling_request_count, type_mpp_establish_conn).Decrement();
GET_METRIC(tiflash_coprocessor_request_duration_seconds, type_mpp_establish_conn).Observe(watch.elapsedSeconds());
GET_METRIC(tiflash_thread_count, type_total_threads_of_raw).Decrement();
GET_METRIC(tiflash_thread_count, type_active_threads_of_establish_mpp).Decrement();
}
// TODO: update the value of metric tiflash_coprocessor_response_bytes.
});

Expand Down Expand Up @@ -299,6 +301,8 @@ ::grpc::Status FlashService::establishMPPConnectionSyncOrAsync(::grpc::ServerCon
if (calldata)
{
calldata->attachTunnel(tunnel);
GET_METRIC(tiflash_coprocessor_request_count, type_mpp_establish_conn).Increment();
GET_METRIC(tiflash_coprocessor_handling_request_count, type_mpp_establish_conn).Increment();
// In async mode, this function won't wait for the request done and the finish event is handled in EstablishCallData.
tunnel->connect(calldata);
LOG_FMT_DEBUG(tunnel->getLogger(), "connect tunnel successfully in async way");
Expand All @@ -311,9 +315,6 @@ ::grpc::Status FlashService::establishMPPConnectionSyncOrAsync(::grpc::ServerCon
tunnel->waitForFinish();
LOG_FMT_INFO(tunnel->getLogger(), "connection for {} cost {} ms.", tunnel->id(), stopwatch.elapsedMilliseconds());
}

// TODO: Check if there are errors in task.

return grpc::Status::OK;
}

Expand All @@ -326,10 +327,9 @@ ::grpc::Status FlashService::CancelMPPTask(
// CancelMPPTask cancels the query of the task.
LOG_FMT_DEBUG(log, "cancel mpp task request: {}", request->DebugString());

if (!security_config.checkGrpcContext(grpc_context))
{
return grpc::Status(grpc::PERMISSION_DENIED, tls_err_msg);
}
auto check_result = checkGrpcContext(grpc_context);
if (!check_result.ok())
return check_result;
GET_METRIC(tiflash_coprocessor_request_count, type_cancel_mpp_task).Increment();
GET_METRIC(tiflash_coprocessor_handling_request_count, type_cancel_mpp_task).Increment();
Stopwatch watch;
Expand Down Expand Up @@ -361,6 +361,25 @@ String getClientMetaVarWithDefault(const grpc::ServerContext * grpc_context, con
return default_val;
}

grpc::Status FlashService::checkGrpcContext(const grpc::ServerContext * grpc_context) const
{
// For coprocessor/mpp test, we don't care about security config.
auto [context, status] = createDBContext(grpc_context);
if (!status.ok())
return status;

if (!security_config.checkGrpcContext(grpc_context))
return grpc::Status(grpc::PERMISSION_DENIED, tls_err_msg);

std::string peer = grpc_context->peer();
Int64 pos = peer.find(':');
if (pos == -1)
{
return grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT, "Invalid peer address: " + peer);
}
return grpc::Status::OK;
}

std::tuple<ContextPtr, grpc::Status> FlashService::createDBContext(const grpc::ServerContext * grpc_context) const
{
try
Expand All @@ -375,10 +394,6 @@ std::tuple<ContextPtr, grpc::Status> FlashService::createDBContext(const grpc::S
std::string quota_key = getClientMetaVarWithDefault(grpc_context, "quota_key", "");
std::string peer = grpc_context->peer();
Int64 pos = peer.find(':');
if (pos == -1)
{
return std::make_tuple(context, ::grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT, "Invalid peer address: " + peer));
}
std::string client_ip = peer.substr(pos + 1);
Poco::Net::SocketAddress client_address(client_ip);

Expand Down Expand Up @@ -430,10 +445,9 @@ std::tuple<ContextPtr, grpc::Status> FlashService::createDBContext(const grpc::S
::grpc::Status FlashService::Compact(::grpc::ServerContext * grpc_context, const ::kvrpcpb::CompactRequest * request, ::kvrpcpb::CompactResponse * response)
{
CPUAffinityManager::getInstance().bindSelfGrpcThread();
if (!security_config.checkGrpcContext(grpc_context))
{
return grpc::Status(grpc::PERMISSION_DENIED, tls_err_msg);
}
auto check_result = checkGrpcContext(grpc_context);
if (!check_result.ok())
return check_result;

return manual_compact_manager->handleRequest(request, response);
}
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Flash/FlashService.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ class FlashService : public tikvpb::Tikv::Service
::grpc::Status Compact(::grpc::ServerContext * context, const ::kvrpcpb::CompactRequest * request, ::kvrpcpb::CompactResponse * response) override;

protected:
std::tuple<ContextPtr, ::grpc::Status> createDBContext(const grpc::ServerContext * grpc_context) const;
std::tuple<ContextPtr, grpc::Status> createDBContext(const grpc::ServerContext * grpc_context) const;
grpc::Status checkGrpcContext(const grpc::ServerContext * grpc_context) const;

IServer & server;
const TiFlashSecurityConfig & security_config;
Expand Down

0 comments on commit 9f45e26

Please sign in to comment.