diff --git a/dbms/src/Common/TiFlashSecurity.h b/dbms/src/Common/TiFlashSecurity.h index 8dde3fe5a98..acd76d85c24 100644 --- a/dbms/src/Common/TiFlashSecurity.h +++ b/dbms/src/Common/TiFlashSecurity.h @@ -129,7 +129,7 @@ struct TiFlashSecurityConfig } - bool checkGrpcContext(grpc::ServerContext * grpc_context) const + bool checkGrpcContext(const grpc::ServerContext * grpc_context) const { if (allowed_common_names.empty() || grpc_context == nullptr) { diff --git a/dbms/src/Flash/EstablishCall.cpp b/dbms/src/Flash/EstablishCall.cpp index 8af81e30962..6544c5e6446 100644 --- a/dbms/src/Flash/EstablishCall.cpp +++ b/dbms/src/Flash/EstablishCall.cpp @@ -37,6 +37,11 @@ EstablishCallData::EstablishCallData(AsyncFlashService * service, grpc::ServerCo EstablishCallData::~EstablishCallData() { GET_METRIC(tiflash_object_count, type_count_of_establish_calldata).Decrement(); + if (stopwatch) + { + GET_METRIC(tiflash_coprocessor_handling_request_count, type_mpp_establish_conn).Decrement(); + GET_METRIC(tiflash_coprocessor_request_duration_seconds, type_mpp_establish_conn).Observe(stopwatch->elapsedSeconds()); + } } EstablishCallData * EstablishCallData::spawn(AsyncFlashService * service, grpc::ServerCompletionQueue * cq, grpc::ServerCompletionQueue * notify_cq, const std::shared_ptr> & is_shutdown) diff --git a/dbms/src/Flash/FlashService.cpp b/dbms/src/Flash/FlashService.cpp index 63c75e126fc..e60420e1d54 100644 --- a/dbms/src/Flash/FlashService.cpp +++ b/dbms/src/Flash/FlashService.cpp @@ -88,10 +88,9 @@ grpc::Status FlashService::Coprocessor( CPUAffinityManager::getInstance().bindSelfGrpcThread(); LOG_FMT_DEBUG(log, "Handling coprocessor request: {}", request->DebugString()); - if (!security_config.checkGrpcContext(grpc_context)) - { - return grpc::Status(grpc::PERMISSION_DENIED, tls_err_msg); - } + auto check_result = checkGrpcContext(grpc_context); + if (!check_result.ok()) + return check_result; GET_METRIC(tiflash_coprocessor_request_count, type_cop).Increment(); GET_METRIC(tiflash_coprocessor_handling_request_count, type_cop).Increment(); @@ -122,10 +121,9 @@ ::grpc::Status FlashService::BatchCoprocessor(::grpc::ServerContext * grpc_conte CPUAffinityManager::getInstance().bindSelfGrpcThread(); LOG_FMT_DEBUG(log, "Handling coprocessor request: {}", request->DebugString()); - if (!security_config.checkGrpcContext(grpc_context)) - { - return grpc::Status(grpc::PERMISSION_DENIED, tls_err_msg); - } + auto check_result = checkGrpcContext(grpc_context); + if (!check_result.ok()) + return check_result; GET_METRIC(tiflash_coprocessor_request_count, type_super_batch).Increment(); GET_METRIC(tiflash_coprocessor_handling_request_count, type_super_batch).Increment(); @@ -158,10 +156,9 @@ ::grpc::Status FlashService::DispatchMPPTask( { CPUAffinityManager::getInstance().bindSelfGrpcThread(); LOG_FMT_DEBUG(log, "Handling mpp dispatch request: {}", request->DebugString()); - if (!security_config.checkGrpcContext(grpc_context)) - { - return grpc::Status(grpc::PERMISSION_DENIED, tls_err_msg); - } + auto check_result = checkGrpcContext(grpc_context); + if (!check_result.ok()) + return check_result; GET_METRIC(tiflash_coprocessor_request_count, type_dispatch_mpp_task).Increment(); GET_METRIC(tiflash_coprocessor_handling_request_count, type_dispatch_mpp_task).Increment(); GET_METRIC(tiflash_thread_count, type_active_threads_of_dispatch_mpp).Increment(); @@ -196,17 +193,15 @@ ::grpc::Status FlashService::IsAlive(::grpc::ServerContext * grpc_context [[mayb ::mpp::IsAliveResponse * response [[maybe_unused]]) { CPUAffinityManager::getInstance().bindSelfGrpcThread(); - if (!security_config.checkGrpcContext(grpc_context)) - { - return grpc::Status(grpc::PERMISSION_DENIED, tls_err_msg); - } + auto check_result = checkGrpcContext(grpc_context); + if (!check_result.ok()) + return check_result; auto [context, status] = createDBContext(grpc_context); if (!status.ok()) { return status; } - auto & tmt_context = context->getTMTContext(); response->set_available(tmt_context.checkRunning()); return ::grpc::Status::OK; @@ -231,14 +226,18 @@ ::grpc::Status FlashService::establishMPPConnectionSyncOrAsync(::grpc::ServerCon // We need to find it out and bind the grpc stream with it. LOG_FMT_DEBUG(log, "Handling establish mpp connection request: {}", request->DebugString()); - if (!security_config.checkGrpcContext(grpc_context)) + auto check_result = checkGrpcContext(grpc_context); + if (!check_result.ok()) + return check_result; + + if (!calldata) { - return returnStatus(calldata, grpc::Status(grpc::PERMISSION_DENIED, tls_err_msg)); + GET_METRIC(tiflash_coprocessor_request_count, type_mpp_establish_conn).Increment(); + GET_METRIC(tiflash_coprocessor_handling_request_count, type_mpp_establish_conn).Increment(); + GET_METRIC(tiflash_thread_count, type_active_threads_of_establish_mpp).Increment(); + GET_METRIC(tiflash_thread_count, type_total_threads_of_raw).Increment(); } - GET_METRIC(tiflash_coprocessor_request_count, type_mpp_establish_conn).Increment(); - GET_METRIC(tiflash_coprocessor_handling_request_count, type_mpp_establish_conn).Increment(); - GET_METRIC(tiflash_thread_count, type_active_threads_of_establish_mpp).Increment(); - GET_METRIC(tiflash_thread_count, type_total_threads_of_raw).Increment(); + if (!tryToResetMaxThreadsMetrics()) { GET_METRIC(tiflash_thread_count, type_max_threads_of_establish_mpp).Set(std::max(GET_METRIC(tiflash_thread_count, type_max_threads_of_establish_mpp).Value(), GET_METRIC(tiflash_thread_count, type_active_threads_of_establish_mpp).Value())); @@ -246,10 +245,13 @@ ::grpc::Status FlashService::establishMPPConnectionSyncOrAsync(::grpc::ServerCon } Stopwatch watch; SCOPE_EXIT({ - GET_METRIC(tiflash_thread_count, type_total_threads_of_raw).Decrement(); - GET_METRIC(tiflash_thread_count, type_active_threads_of_establish_mpp).Decrement(); - GET_METRIC(tiflash_coprocessor_handling_request_count, type_mpp_establish_conn).Decrement(); - GET_METRIC(tiflash_coprocessor_request_duration_seconds, type_mpp_establish_conn).Observe(watch.elapsedSeconds()); + if (!calldata) + { + GET_METRIC(tiflash_coprocessor_handling_request_count, type_mpp_establish_conn).Decrement(); + GET_METRIC(tiflash_coprocessor_request_duration_seconds, type_mpp_establish_conn).Observe(watch.elapsedSeconds()); + GET_METRIC(tiflash_thread_count, type_total_threads_of_raw).Decrement(); + GET_METRIC(tiflash_thread_count, type_active_threads_of_establish_mpp).Decrement(); + } // TODO: update the value of metric tiflash_coprocessor_response_bytes. }); @@ -299,6 +301,8 @@ ::grpc::Status FlashService::establishMPPConnectionSyncOrAsync(::grpc::ServerCon if (calldata) { calldata->attachTunnel(tunnel); + GET_METRIC(tiflash_coprocessor_request_count, type_mpp_establish_conn).Increment(); + GET_METRIC(tiflash_coprocessor_handling_request_count, type_mpp_establish_conn).Increment(); // In async mode, this function won't wait for the request done and the finish event is handled in EstablishCallData. tunnel->connect(calldata); LOG_FMT_DEBUG(tunnel->getLogger(), "connect tunnel successfully in async way"); @@ -311,9 +315,6 @@ ::grpc::Status FlashService::establishMPPConnectionSyncOrAsync(::grpc::ServerCon tunnel->waitForFinish(); LOG_FMT_INFO(tunnel->getLogger(), "connection for {} cost {} ms.", tunnel->id(), stopwatch.elapsedMilliseconds()); } - - // TODO: Check if there are errors in task. - return grpc::Status::OK; } @@ -326,10 +327,9 @@ ::grpc::Status FlashService::CancelMPPTask( // CancelMPPTask cancels the query of the task. LOG_FMT_DEBUG(log, "cancel mpp task request: {}", request->DebugString()); - if (!security_config.checkGrpcContext(grpc_context)) - { - return grpc::Status(grpc::PERMISSION_DENIED, tls_err_msg); - } + auto check_result = checkGrpcContext(grpc_context); + if (!check_result.ok()) + return check_result; GET_METRIC(tiflash_coprocessor_request_count, type_cancel_mpp_task).Increment(); GET_METRIC(tiflash_coprocessor_handling_request_count, type_cancel_mpp_task).Increment(); Stopwatch watch; @@ -361,6 +361,25 @@ String getClientMetaVarWithDefault(const grpc::ServerContext * grpc_context, con return default_val; } +grpc::Status FlashService::checkGrpcContext(const grpc::ServerContext * grpc_context) const +{ + // For coprocessor/mpp test, we don't care about security config. + auto [context, status] = createDBContext(grpc_context); + if (!status.ok()) + return status; + + if (!security_config.checkGrpcContext(grpc_context)) + return grpc::Status(grpc::PERMISSION_DENIED, tls_err_msg); + + std::string peer = grpc_context->peer(); + Int64 pos = peer.find(':'); + if (pos == -1) + { + return grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT, "Invalid peer address: " + peer); + } + return grpc::Status::OK; +} + std::tuple FlashService::createDBContext(const grpc::ServerContext * grpc_context) const { try @@ -375,10 +394,6 @@ std::tuple FlashService::createDBContext(const grpc::S std::string quota_key = getClientMetaVarWithDefault(grpc_context, "quota_key", ""); std::string peer = grpc_context->peer(); Int64 pos = peer.find(':'); - if (pos == -1) - { - return std::make_tuple(context, ::grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT, "Invalid peer address: " + peer)); - } std::string client_ip = peer.substr(pos + 1); Poco::Net::SocketAddress client_address(client_ip); @@ -430,10 +445,9 @@ std::tuple FlashService::createDBContext(const grpc::S ::grpc::Status FlashService::Compact(::grpc::ServerContext * grpc_context, const ::kvrpcpb::CompactRequest * request, ::kvrpcpb::CompactResponse * response) { CPUAffinityManager::getInstance().bindSelfGrpcThread(); - if (!security_config.checkGrpcContext(grpc_context)) - { - return grpc::Status(grpc::PERMISSION_DENIED, tls_err_msg); - } + auto check_result = checkGrpcContext(grpc_context); + if (!check_result.ok()) + return check_result; return manual_compact_manager->handleRequest(request, response); } diff --git a/dbms/src/Flash/FlashService.h b/dbms/src/Flash/FlashService.h index 2b39479ac49..10e390b41ea 100644 --- a/dbms/src/Flash/FlashService.h +++ b/dbms/src/Flash/FlashService.h @@ -81,7 +81,8 @@ class FlashService : public tikvpb::Tikv::Service ::grpc::Status Compact(::grpc::ServerContext * context, const ::kvrpcpb::CompactRequest * request, ::kvrpcpb::CompactResponse * response) override; protected: - std::tuple createDBContext(const grpc::ServerContext * grpc_context) const; + std::tuple createDBContext(const grpc::ServerContext * grpc_context) const; + grpc::Status checkGrpcContext(const grpc::ServerContext * grpc_context) const; IServer & server; const TiFlashSecurityConfig & security_config;