diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h
index 0f837a6d86b..f8db2575c96 100644
--- a/dbms/src/Common/TiFlashMetrics.h
+++ b/dbms/src/Common/TiFlashMetrics.h
@@ -77,8 +77,7 @@ namespace DB
     M(tiflash_coprocessor_request_memory_usage, "Bucketed histogram of request memory usage", Histogram, \
         F(type_cop, {{"type", "cop"}}, ExpBuckets{1024 * 1024, 2, 16}), \
         F(type_batch, {{"type", "batch"}}, ExpBuckets{1024 * 1024, 2, 20}), \
-        F(type_run_mpp_task, {{"type", "run_mpp_task"}}, ExpBuckets{1024 * 1024, 2, 20}), \
-        F(type_run_mpp_query, {{"type", "run_mpp_query"}}, ExpBuckets{1024 * 1024, 2, 20})) \
+        F(type_run_mpp_task, {{"type", "run_mpp_task"}}, ExpBuckets{1024 * 1024, 2, 20})) \
     M(tiflash_coprocessor_request_error, "Total number of request error", Counter, F(reason_meet_lock, {"reason", "meet_lock"}), \
         F(reason_region_not_found, {"reason", "region_not_found"}), F(reason_epoch_not_match, {"reason", "epoch_not_match"}), \
         F(reason_kv_client_error, {"reason", "kv_client_error"}), F(reason_internal_error, {"reason", "internal_error"}), \
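Note: `ExpBuckets{start, factor, count}` describes exponentially growing histogram bucket bounds, so `ExpBuckets{1024 * 1024, 2, 20}` covers 1 MiB up to 512 GiB in powers of two. A minimal standalone sketch of that bucket generation (plain C++, names hypothetical, not the TiFlash implementation):

    #include <cstdint>
    #include <cstdio>
    #include <vector>

    // Hypothetical model of ExpBuckets{start, factor, count}: `count` upper
    // bounds, starting at `start` and multiplying by `factor` each step.
    std::vector<std::uint64_t> expBuckets(std::uint64_t start, std::uint64_t factor, int count)
    {
        std::vector<std::uint64_t> bounds;
        bounds.reserve(count);
        for (int i = 0; i < count; ++i)
        {
            bounds.push_back(start);
            start *= factor;
        }
        return bounds;
    }

    int main()
    {
        // ExpBuckets{1024 * 1024, 2, 20}: 1 MiB, 2 MiB, ..., 512 GiB
        for (std::uint64_t b : expBuckets(1024 * 1024, 2, 20))
            std::printf("%llu\n", static_cast<unsigned long long>(b));
        return 0;
    }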
diff --git a/dbms/src/Debug/dbgQueryExecutor.cpp b/dbms/src/Debug/dbgQueryExecutor.cpp
index 133d730ee18..3d2077682f2 100644
--- a/dbms/src/Debug/dbgQueryExecutor.cpp
+++ b/dbms/src/Debug/dbgQueryExecutor.cpp
@@ -39,7 +39,7 @@ void setTipbRegionInfo(coprocessor::RegionInfo * tipb_region_info, const std::pa
     range->set_end(RecordKVFormat::genRawKey(table_id, handle_range.second.handle_id));
 }
 
-BlockInputStreamPtr constructRootExchangeReceiverStream(Context & context, tipb::ExchangeReceiver & tipb_exchange_receiver, const DAGProperties & properties, DAGSchema & root_task_schema, const String & root_addr)
+BlockInputStreamPtr constructExchangeReceiverStream(Context & context, tipb::ExchangeReceiver & tipb_exchange_receiver, const DAGProperties & properties, DAGSchema & root_task_schema, const String & root_addr, bool enable_local_tunnel)
 {
     for (auto & field : root_task_schema)
     {
@@ -65,7 +65,7 @@ BlockInputStreamPtr constructRootExchangeReceiverStream(Context & context, tipb:
             root_tm,
             context.getTMTContext().getKVCluster(),
             context.getTMTContext().getMPPTaskManager(),
-            false,
+            enable_local_tunnel,
             context.getSettingsRef().enable_async_grpc_client),
         tipb_exchange_receiver.encoded_task_meta_size(),
         10,
@@ -77,7 +77,7 @@ BlockInputStreamPtr constructRootExchangeReceiverStream(Context & context, tipb:
     return ret;
 }
 
-BlockInputStreamPtr prepareRootExchangeReceiver(Context & context, const DAGProperties & properties, std::vector<Int64> & root_task_ids, DAGSchema & root_task_schema)
+BlockInputStreamPtr prepareRootExchangeReceiver(Context & context, const DAGProperties & properties, std::vector<Int64> & root_task_ids, DAGSchema & root_task_schema, bool enable_local_tunnel)
 {
     tipb::ExchangeReceiver tipb_exchange_receiver;
     for (const auto root_task_id : root_task_ids)
@@ -93,7 +93,7 @@ BlockInputStreamPtr prepareRootExchangeReceiver(Context & context, const DAGProp
         auto * tm_string = tipb_exchange_receiver.add_encoded_task_meta();
         tm.AppendToString(tm_string);
     }
-    return constructRootExchangeReceiverStream(context, tipb_exchange_receiver, properties, root_task_schema, Debug::LOCAL_HOST);
+    return constructExchangeReceiverStream(context, tipb_exchange_receiver, properties, root_task_schema, Debug::LOCAL_HOST, enable_local_tunnel);
 }
 
 void prepareExchangeReceiverMetaWithMultipleContext(tipb::ExchangeReceiver & tipb_exchange_receiver, const DAGProperties & properties, Int64 task_id, String & addr)
@@ -116,7 +116,7 @@ BlockInputStreamPtr prepareRootExchangeReceiverWithMultipleContext(Context & con
     prepareExchangeReceiverMetaWithMultipleContext(tipb_exchange_receiver, properties, task_id, addr);
 
-    return constructRootExchangeReceiverStream(context, tipb_exchange_receiver, properties, root_task_schema, root_addr);
+    return constructExchangeReceiverStream(context, tipb_exchange_receiver, properties, root_task_schema, root_addr, true);
 }
 
 void prepareDispatchTaskRequest(QueryTask & task, std::shared_ptr<mpp::DispatchTaskRequest> req, const DAGProperties & properties, std::vector<Int64> & root_task_ids, DAGSchema & root_task_schema, String & addr)
@@ -222,7 +222,7 @@ BlockInputStreamPtr executeMPPQuery(Context & context, const DAGProperties & pro
         if (call.getResp()->has_error())
             throw Exception("Meet error while dispatch mpp task: " + call.getResp()->error().msg());
     }
-    return prepareRootExchangeReceiver(context, properties, root_task_ids, root_task_schema);
+    return prepareRootExchangeReceiver(context, properties, root_task_ids, root_task_schema, context.getSettingsRef().enable_local_tunnel);
 }
 
 BlockInputStreamPtr executeNonMPPQuery(Context & context, RegionID region_id, const DAGProperties & properties, QueryTasks & query_tasks, MakeResOutputStream & func_wrap_output_stream)
diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp
index 0ff40775dd6..9545151a7a7 100644
--- a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp
+++ b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp
@@ -562,6 +562,7 @@ void ExchangeReceiverBase<RPCContext>::setUpLocalConnections(std::vector<Reques
         LoggerPtr local_log = Logger::get(fmt::format("{} {}", exc_log->identifier(), req_info));
 
         LocalRequestHandler local_request_handler(
+            getMemoryTracker(),
             [this, log = local_log](bool meet_error, const String & local_err_msg) {
                 this->connectionDone(meet_error, local_err_msg, log);
             },
diff --git a/dbms/src/Flash/Mpp/LocalRequestHandler.h b/dbms/src/Flash/Mpp/LocalRequestHandler.h
index fc436bc5885..cd885cb370e 100644
--- a/dbms/src/Flash/Mpp/LocalRequestHandler.h
+++ b/dbms/src/Flash/Mpp/LocalRequestHandler.h
@@ -22,11 +22,13 @@ namespace DB
 struct LocalRequestHandler
 {
     LocalRequestHandler(
+        MemoryTracker * recv_mem_tracker_,
         std::function<void(bool, const String &)> && notify_write_done_,
         std::function<void()> && notify_close_,
         std::function<void()> && add_local_conn_num_,
         ReceiverChannelWriter && channel_writer_)
-        : notify_write_done(std::move(notify_write_done_))
+        : recv_mem_tracker(recv_mem_tracker_)
+        , notify_write_done(std::move(notify_write_done_))
         , notify_close(std::move(notify_close_))
         , add_local_conn_num(std::move(add_local_conn_num_))
         , channel_writer(std::move(channel_writer_))
@@ -73,6 +75,7 @@ struct LocalRequestHandler
         return waiting_task_time;
     }
 
+    MemoryTracker * recv_mem_tracker;
    std::function<void(bool, const String &)> notify_write_done;
    std::function<void()> notify_close;
    std::function<void()> add_local_conn_num;
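Note: the constructor change above threads the receiver's `MemoryTracker` through `LocalRequestHandler` as a raw, non-owning pointer; the unit tests further down pass `nullptr`, so consumers must tolerate a null tracker. A standalone sketch of that borrowing contract (hypothetical names, standard C++ only):

    #include <functional>
    #include <string>
    #include <utility>

    struct MemoryTracker { /* accounting elided */ };

    // Model of the pattern: the handler borrows the receiver's tracker and
    // must not outlive it; a null tracker simply disables re-attribution.
    struct LocalRequestHandler
    {
        LocalRequestHandler(MemoryTracker * recv_mem_tracker_,
                            std::function<void(bool, const std::string &)> && notify_write_done_)
            : recv_mem_tracker(recv_mem_tracker_)
            , notify_write_done(std::move(notify_write_done_))
        {}

        MemoryTracker * recv_mem_tracker; // non-owning; owner is the receiver side
        std::function<void(bool, const std::string &)> notify_write_done;
    };

    int main()
    {
        MemoryTracker receiver_tracker;
        LocalRequestHandler with_tracker(&receiver_tracker, [](bool, const std::string &) {});
        LocalRequestHandler without_tracker(nullptr, [](bool, const std::string &) {}); // as in the tests
        return 0;
    }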
diff --git a/dbms/src/Flash/Mpp/MPPTask.cpp b/dbms/src/Flash/Mpp/MPPTask.cpp
index bef23bffdd2..36a9e787bf7 100644
--- a/dbms/src/Flash/Mpp/MPPTask.cpp
+++ b/dbms/src/Flash/Mpp/MPPTask.cpp
@@ -99,9 +99,7 @@ void MPPTaskMonitorHelper::initAndAddself(MPPTaskManager * manager_, const Strin
 MPPTaskMonitorHelper::~MPPTaskMonitorHelper()
 {
     if (initialized)
-    {
         manager->removeMonitoredTask(task_unique_id);
-    }
 }
 
 MPPTask::MPPTask(const mpp::TaskMeta & meta_, const ContextPtr & context_)
@@ -121,9 +119,8 @@ MPPTask::~MPPTask()
 {
     /// MPPTask maybe destructed by different thread, set the query memory_tracker
     /// to current_memory_tracker in the destructor
-    auto * query_memory_tracker = getMemoryTracker();
-    if (query_memory_tracker != nullptr && current_memory_tracker != query_memory_tracker)
-        current_memory_tracker = query_memory_tracker;
+    if (process_list_entry != nullptr && current_memory_tracker != process_list_entry->get().getMemoryTrackerPtr().get())
+        current_memory_tracker = process_list_entry->get().getMemoryTrackerPtr().get();
     abortTunnels("", true);
     LOG_INFO(log, "finish MPPTask: {}", id.toString());
 }
@@ -190,8 +187,7 @@ void MPPTask::registerTunnels(const mpp::DispatchTaskRequest & task_request)
         if (unlikely(!task_meta.ParseFromString(exchange_sender.encoded_task_meta(i))))
             throw TiFlashException("Failed to decode task meta info in ExchangeSender", Errors::Coprocessor::BadRequest);
-        /// when the receiver task is root task, it should never be local tunnel
-        bool is_local = context->getSettingsRef().enable_local_tunnel && task_meta.task_id() != -1 && meta.address() == task_meta.address();
+        bool is_local = context->getSettingsRef().enable_local_tunnel && meta.address() == task_meta.address();
         bool is_async = !is_local && context->getSettingsRef().enable_async_server;
         MPPTunnelPtr tunnel = std::make_shared<MPPTunnel>(
             task_meta,
@@ -297,13 +293,6 @@ void MPPTask::setErrString(const String & message)
     err_string = message;
 }
 
-MemoryTracker * MPPTask::getMemoryTracker() const
-{
-    if (process_list_entry_holder.process_list_entry != nullptr)
-        return process_list_entry_holder.process_list_entry->get().getMemoryTrackerPtr().get();
-    return nullptr;
-}
-
 void MPPTask::unregisterTask()
 {
     auto [result, reason] = manager->unregisterTask(id);
@@ -313,19 +302,6 @@ void MPPTask::unregisterTask()
         LOG_WARNING(log, "task failed to unregister, reason: {}", reason);
 }
 
-void MPPTask::initProcessListEntry(MPPTaskManagerPtr & task_manager)
-{
-    /// all the mpp tasks of the same mpp query shares the same process list entry
-    auto [query_process_list_entry, aborted_reason] = task_manager->getOrCreateQueryProcessListEntry(id.query_id, context);
-    if (!aborted_reason.empty())
-        throw TiFlashException(fmt::format("MPP query is already aborted, aborted reason: {}", aborted_reason), Errors::Coprocessor::Internal);
-    assert(query_process_list_entry != nullptr);
-    process_list_entry_holder.process_list_entry = query_process_list_entry;
-    dag_context->setProcessListEntry(query_process_list_entry);
-    context->setProcessListElement(&query_process_list_entry->get());
-    current_memory_tracker = getMemoryTracker();
-}
-
 void MPPTask::prepare(const mpp::DispatchTaskRequest & task_request)
 {
     dag_req = getDAGRequestFromStringWithRetry(task_request.encoded_plan());
@@ -382,13 +358,13 @@ void MPPTask::prepare(const mpp::DispatchTaskRequest & task_request)
     dag_context->tidb_host = context->getClientInfo().current_address.toString();
 
     context->setDAGContext(dag_context.get());
-
-    auto task_manager = tmt_context.getMPPTaskManager();
-    initProcessListEntry(task_manager);
+    process_list_entry = setProcessListElement(*context, dag_context->dummy_query_string, dag_context->dummy_ast.get());
+    dag_context->setProcessListEntry(process_list_entry);
 
     injectFailPointBeforeRegisterTunnel(dag_context->isRootMPPTask());
     registerTunnels(task_request);
 
+    auto task_manager = tmt_context.getMPPTaskManager();
     LOG_DEBUG(log, "begin to register the task {}", id.toString());
 
     injectFailPointBeforeRegisterMPPTask(dag_context->isRootMPPTask());
@@ -429,7 +405,7 @@ void MPPTask::preprocess()
 void MPPTask::runImpl()
 {
     CPUAffinityManager::getInstance().bindSelfQueryThread();
-    RUNTIME_ASSERT(current_memory_tracker == getMemoryTracker(), log, "The current memory tracker is not set correctly for MPPTask::runImpl");
+    RUNTIME_ASSERT(current_memory_tracker == process_list_entry->get().getMemoryTrackerPtr().get(), log, "The current memory tracker is not set correctly for MPPTask::runImpl");
     if (!switchStatus(INITIALIZING, RUNNING))
     {
         LOG_WARNING(log, "task not in initializing state, skip running");
@@ -532,9 +508,9 @@ void MPPTask::runImpl()
         // todo when error happens, should try to update the metrics if it is available
         if (auto throughput = dag_context->getTableScanThroughput(); throughput.first)
             GET_METRIC(tiflash_storage_logical_throughput_bytes).Observe(throughput.second);
-        /// note that memory_tracker is shared by all the mpp tasks, the peak memory usage is not accurate
-        /// todo log executor level peak memory usage instead
-        auto peak_memory = getMemoryTracker()->getPeak();
+        auto process_info = context->getProcessListElement()->getInfo();
+        auto peak_memory = process_info.peak_memory_usage > 0 ? process_info.peak_memory_usage : 0;
+        GET_METRIC(tiflash_coprocessor_request_memory_usage, type_run_mpp_task).Observe(peak_memory);
         mpp_task_statistics.setMemoryPeak(peak_memory);
     }
 }
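Note: with the per-query shared tracker gone, the peak reported by `runImpl()` is per task again, read from the task's own process list element and clamped at zero. A sketch of that reporting step (hypothetical `Histogram` type, not the actual metrics client API):

    #include <algorithm>
    #include <cstdint>

    struct Histogram
    {
        void Observe(double) { /* record one sample */ }
    };

    // Mirror of the metric update above: a non-positive peak (tracker never
    // exercised, or accounting skew) is clamped to zero before observation.
    void reportTaskPeakMemory(std::int64_t peak_memory_usage, Histogram & run_mpp_task)
    {
        const std::int64_t peak = std::max<std::int64_t>(peak_memory_usage, 0);
        run_mpp_task.Observe(static_cast<double>(peak));
    }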
diff --git a/dbms/src/Flash/Mpp/MPPTask.h b/dbms/src/Flash/Mpp/MPPTask.h
index 2061d674740..c1994930b42 100644
--- a/dbms/src/Flash/Mpp/MPPTask.h
+++ b/dbms/src/Flash/Mpp/MPPTask.h
@@ -37,7 +37,6 @@ namespace DB
 {
 class MPPTaskManager;
-using MPPTaskManagerPtr = std::shared_ptr<MPPTaskManager>;
 class DAGContext;
 class ProcessListEntry;
@@ -124,27 +123,12 @@ class MPPTask : public std::enable_shared_from_this<MPPTask>
     void registerTunnels(const mpp::DispatchTaskRequest & task_request);
 
-    void initProcessListEntry(MPPTaskManagerPtr & task_manager);
-
     void initExchangeReceivers();
 
     String getErrString() const;
     void setErrString(const String & message);
 
-    MemoryTracker * getMemoryTracker() const;
-
 private:
-    struct ProcessListEntryHolder
-    {
-        std::shared_ptr<ProcessListEntry> process_list_entry;
-        ~ProcessListEntryHolder()
-        {
-            /// Because MemoryTracker is now saved in `MPPQueryTaskSet` and shared by all the mpp tasks belongs to the same mpp query,
-            /// it may not be destructed when MPPTask is destructed, so need to manually reset current_memory_tracker to nullptr at the
-            /// end of the destructor of MPPTask, otherwise, current_memory_tracker may point to a invalid memory tracker
-            current_memory_tracker = nullptr;
-        }
-    };
     // We must ensure this member variable is put at this place to be destructed at proper time
     MPPTaskMonitorHelper mpp_task_monitor_helper;
@@ -160,11 +144,12 @@ class MPPTask : public std::enable_shared_from_this<MPPTask>
 
     MPPTaskScheduleEntry schedule_entry;
 
-    ProcessListEntryHolder process_list_entry_holder;
     // `dag_context` holds inputstreams which could hold ref to `context` so it should be destructed
     // before `context`.
     std::unique_ptr<DAGContext> dag_context;
+    std::shared_ptr<ProcessListEntry> process_list_entry;
+
     QueryExecutorHolder query_executor_holder;
 
     std::atomic<TaskStatus> status{INITIALIZING};
diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.cpp b/dbms/src/Flash/Mpp/MPPTaskManager.cpp
index 4608930e11a..29e7d745dc5 100644
--- a/dbms/src/Flash/Mpp/MPPTaskManager.cpp
+++ b/dbms/src/Flash/Mpp/MPPTaskManager.cpp
@@ -15,16 +15,13 @@
 #include
 #include
 #include
-#include
 #include
 #include
-#include
-#include
-#include
 #include
 #include
 #include
+#include
 #include
 #include
 #include
 
@@ -37,15 +34,6 @@ extern const char random_task_manager_find_task_failure_failpoint[];
 extern const char pause_before_register_non_root_mpp_task[];
 } // namespace FailPoints
 
-MPPQueryTaskSet::~MPPQueryTaskSet()
-{
-    if likely (process_list_entry != nullptr)
-    {
-        auto peak_memory = process_list_entry->get().getMemoryTrackerPtr()->getPeak();
-        GET_METRIC(tiflash_coprocessor_request_memory_usage, type_run_mpp_query).Observe(peak_memory);
-    }
-}
-
 MPPTaskManager::MPPTaskManager(MPPTaskSchedulerPtr scheduler_)
     : scheduler(std::move(scheduler_))
     , aborted_query_gather_cache(ABORTED_MPPGATHER_CACHE_SIZE)
@@ -247,13 +235,14 @@ std::pair<bool, String> MPPTaskManager::registerTask(MPPTaskPtr task)
     {
         return {false, fmt::format("query is being aborted, error message = {}", error_msg)};
     }
-    /// query_set must not be nullptr if the current query is not aborted since MPPTask::initProcessListEntry
-    /// will always create the query_set
-    RUNTIME_CHECK_MSG(query_set != nullptr, "query set must not be null when register task");
-    if (query_set->task_map.find(task->id) != query_set->task_map.end())
+    if (query_set != nullptr && query_set->task_map.find(task->id) != query_set->task_map.end())
     {
         return {false, "task has been registered"};
     }
+    if (query_set == nullptr) /// the first one
+    {
+        query_set = addMPPQueryTaskSet(task->id.query_id);
+    }
     query_set->task_map.emplace(task->id, task);
     /// cancel all the alarm waiting on this task
     auto alarm_it = query_set->alarms.find(task->id.task_id);
@@ -304,25 +293,6 @@ String MPPTaskManager::toString()
     return res + ")";
 }
 
-std::pair<std::shared_ptr<ProcessListEntry>, String> MPPTaskManager::getOrCreateQueryProcessListEntry(const MPPQueryId & query_id, const ContextPtr & context)
-{
-    std::lock_guard lock(mu);
-    auto [query_set, abort_reason] = getQueryTaskSetWithoutLock(query_id);
-    if (!abort_reason.empty())
-        return {nullptr, abort_reason};
-    if (query_set == nullptr)
-        query_set = addMPPQueryTaskSet(query_id);
-    if (query_set->process_list_entry == nullptr)
-    {
-        query_set->process_list_entry = setProcessListElement(
-            *context,
-            context->getDAGContext()->dummy_query_string,
-            context->getDAGContext()->dummy_ast.get(),
-            true);
-    }
-    return {query_set->process_list_entry, ""};
-}
-
 std::pair<MPPQueryTaskSetPtr, String> MPPTaskManager::getQueryTaskSetWithoutLock(const MPPQueryId & query_id)
 {
     auto it = mpp_query_map.find(query_id);
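Note: since `MPPTask::initProcessListEntry()` no longer pre-creates the query's task set, `registerTask()` above becomes the creation point: the first task of a query allocates the set under the manager's lock. A standalone sketch of that get-or-create shape (hypothetical names):

    #include <memory>
    #include <mutex>
    #include <string>
    #include <unordered_map>

    struct QueryTaskSet { /* task_map, alarms, ... */ };

    struct TaskManager
    {
        std::mutex mu;
        std::unordered_map<std::string, std::shared_ptr<QueryTaskSet>> queries;

        std::shared_ptr<QueryTaskSet> getOrCreate(const std::string & query_id)
        {
            std::lock_guard<std::mutex> lock(mu);
            auto & slot = queries[query_id]; // default-constructs an empty slot
            if (slot == nullptr)             // the first task creates the set
                slot = std::make_shared<QueryTaskSet>();
            return slot;
        }
    };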
diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.h b/dbms/src/Flash/Mpp/MPPTaskManager.h
index f702aa8f9b0..83046e327fd 100644
--- a/dbms/src/Flash/Mpp/MPPTaskManager.h
+++ b/dbms/src/Flash/Mpp/MPPTaskManager.h
@@ -40,7 +40,6 @@ struct MPPQueryTaskSet
     /// task can only be registered state is Normal
     State state = Normal;
     String error_message;
-    std::shared_ptr<ProcessListEntry> process_list_entry;
     MPPTaskMap task_map;
     std::unordered_map<Int64, std::unordered_map<Int64, grpc::Alarm *>> alarms;
     /// only used in scheduler
@@ -53,7 +52,6 @@ struct MPPQueryTaskSet
     {
         return state == Normal || state == Aborted;
     }
-    ~MPPQueryTaskSet();
 };
 
 /// A simple thread unsafe FIFO cache used to fix the "lost cancel" issues
@@ -195,8 +193,6 @@ class MPPTaskManager : private boost::noncopyable
 
     void abortMPPQuery(const MPPQueryId & query_id, const String & reason, AbortType abort_type);
 
-    std::pair<std::shared_ptr<ProcessListEntry>, String> getOrCreateQueryProcessListEntry(const MPPQueryId & query_id, const ContextPtr & context);
-
     String toString();
 
 private:
diff --git a/dbms/src/Flash/Mpp/MPPTunnel.cpp b/dbms/src/Flash/Mpp/MPPTunnel.cpp
index 56ba3971095..c41f4f14c8c 100644
--- a/dbms/src/Flash/Mpp/MPPTunnel.cpp
+++ b/dbms/src/Flash/Mpp/MPPTunnel.cpp
@@ -459,6 +459,9 @@ std::shared_ptr<TrackedMppDataPacket> LocalTunnelSenderV1::readForLocal()
     if (result == MPMCQueueResult::OK)
     {
         MPPTunnelMetric::subDataSizeMetric(*data_size_in_queue, res->getPacket().ByteSizeLong());
+
+        // switch tunnel's memory tracker into receiver's
+        res->switchMemTracker(current_memory_tracker);
         return res;
     }
     else if (result == MPMCQueueResult::CANCELLED)
diff --git a/dbms/src/Flash/Mpp/MPPTunnel.h b/dbms/src/Flash/Mpp/MPPTunnel.h
index 0e077138e12..86998a4aefe 100644
--- a/dbms/src/Flash/Mpp/MPPTunnel.h
+++ b/dbms/src/Flash/Mpp/MPPTunnel.h
@@ -354,6 +354,11 @@ class LocalTunnelSenderV2 : public TunnelSender
         if (unlikely(checkPacketErr(data)))
             return false;
 
+        // receiver_mem_tracker pointer will always be valid because ExchangeReceiverBase won't be destructed
+        // before all local tunnels are destructed so that the MPPTask which contains ExchangeReceiverBase and
+        // is responsible for deleting receiver_mem_tracker must be destroyed after these local tunnels.
+        data->switchMemTracker(local_request_handler.recv_mem_tracker);
+
         // When ExchangeReceiver receives data from local and remote tiflash, number of local tunnel threads
         // is very large and causes the time of transfering data by grpc threads becomes longer, because
         // grpc thread is hard to get chance to push data into MPMCQueue in ExchangeReceiver.
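Note: the two `switchMemTracker` call sites above implement the same handoff: when a packet crosses a local tunnel, its bytes are released from the sender-side tracker and charged to the receiver-side one, so the accounting follows ownership. A standalone model of that move (hypothetical names; the real wrapper is `MemTrackerWrapper` in TrackedMppDataPacket.h below):

    #include <atomic>
    #include <cstddef>

    struct MemoryTracker
    {
        std::atomic<std::ptrdiff_t> amount{0};
        void alloc(std::ptrdiff_t n) { amount += n; }
        void free(std::ptrdiff_t n) { amount -= n; }
    };

    struct TrackedPacket
    {
        std::size_t bytes = 0;
        MemoryTracker * tracker = nullptr; // may be null: tracking disabled

        // Release from the old tracker, charge the new one, remember the new owner.
        void switchMemTracker(MemoryTracker * new_tracker)
        {
            if (tracker != nullptr)
                tracker->free(static_cast<std::ptrdiff_t>(bytes));
            if (new_tracker != nullptr)
                new_tracker->alloc(static_cast<std::ptrdiff_t>(bytes));
            tracker = new_tracker;
        }
    };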
diff --git a/dbms/src/Flash/Mpp/TrackedMppDataPacket.h b/dbms/src/Flash/Mpp/TrackedMppDataPacket.h
index 76af732c682..f9530cb689b 100644
--- a/dbms/src/Flash/Mpp/TrackedMppDataPacket.h
+++ b/dbms/src/Flash/Mpp/TrackedMppDataPacket.h
@@ -110,7 +110,7 @@ struct MemTrackerWrapper
 
 struct TrackedMppDataPacket
 {
-    TrackedMppDataPacket(const mpp::MPPDataPacket & data, MemoryTracker * memory_tracker)
+    explicit TrackedMppDataPacket(const mpp::MPPDataPacket & data, MemoryTracker * memory_tracker)
         : mem_tracker_wrapper(estimateAllocatedSize(data), memory_tracker)
     {
         packet = data;
diff --git a/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp b/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp
index 3cc6c088fa8..f83a169d71a 100644
--- a/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp
+++ b/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp
@@ -191,6 +191,7 @@ class MockExchangeReceiver
         for (auto & tunnel : tunnels)
         {
             LocalRequestHandler local_request_handler(
+                nullptr,
                 [this](bool meet_error, const String & local_err_msg) {
                     this->connectionDone(meet_error, local_err_msg);
                 },
@@ -654,6 +655,7 @@ try
     ReceivedMessageQueue received_message_queue(mock_ptr, Logger::get(), 1, false, 0);
     LocalRequestHandler local_req_handler(
+        nullptr,
         [](bool, const String &) {},
         []() {},
         []() {},
@@ -674,6 +676,7 @@ try
     AsyncRequestHandlerWaitQueuePtr mock_ptr = std::make_shared<AsyncRequestHandlerWaitQueue>();
     ReceivedMessageQueue queue(mock_ptr, Logger::get(), 1, false, 0);
     LocalRequestHandler local_req_handler(
+        nullptr,
         [](bool, const String &) {},
         []() {},
         []() {},
diff --git a/dbms/src/Flash/executeQuery.cpp b/dbms/src/Flash/executeQuery.cpp
index c3c1988d017..894973e1434 100644
--- a/dbms/src/Flash/executeQuery.cpp
+++ b/dbms/src/Flash/executeQuery.cpp
@@ -60,7 +60,7 @@ ProcessList::EntryPtr getProcessListEntry(Context & context, DAGContext & dag_co
 {
     if (dag_context.is_mpp_task)
     {
-        /// for MPPTask, process list entry is set in MPPTask::initProcessListEntry()
+        /// for MPPTask, process list entry is created in MPPTask::prepare()
         RUNTIME_ASSERT(dag_context.getProcessListEntry() != nullptr, "process list entry for MPP task must not be nullptr");
         return dag_context.getProcessListEntry();
     }
@@ -70,8 +70,7 @@ ProcessList::EntryPtr getProcessListEntry(Context & context, DAGContext & dag_co
         auto process_list_entry = setProcessListElement(
             context,
             dag_context.dummy_query_string,
-            dag_context.dummy_ast.get(),
-            true);
+            dag_context.dummy_ast.get());
         dag_context.setProcessListEntry(process_list_entry);
         return process_list_entry;
     }
diff --git a/dbms/src/Interpreters/ProcessList.cpp b/dbms/src/Interpreters/ProcessList.cpp
index 37bbeb770a7..e19858a82f4 100644
--- a/dbms/src/Interpreters/ProcessList.cpp
+++ b/dbms/src/Interpreters/ProcessList.cpp
@@ -83,8 +83,7 @@ ProcessList::EntryPtr ProcessList::insert(
     const IAST * ast,
     const ClientInfo & client_info,
     const Settings & settings,
-    const UInt64 total_memory,
-    bool is_dag_task)
+    const UInt64 total_memory)
 {
     EntryPtr res;
 
@@ -142,7 +141,7 @@ ProcessList::EntryPtr ProcessList::insert(
 
         ++cur_size;
 
-        res = std::make_shared<ProcessListEntry>(*this, cont.emplace(cont.end(), query_, client_info, settings.max_memory_usage.getActualBytes(total_memory), settings.memory_tracker_fault_probability, priorities.insert(settings.priority), is_dag_task));
+        res = std::make_shared<ProcessListEntry>(*this, cont.emplace(cont.end(), query_, client_info, settings.max_memory_usage.getActualBytes(total_memory), settings.memory_tracker_fault_probability, priorities.insert(settings.priority)));
 
         ProcessListForUser & user_process_list = user_to_queries[client_info.current_user];
         user_process_list.queries.emplace(client_info.current_query_id, &res->get());
@@ -218,11 +217,11 @@ ProcessListEntry::~ProcessListEntry()
     auto range = user_process_list.queries.equal_range(query_id);
     if (range.first != range.second)
     {
-        for (auto current_it = range.first; current_it != range.second; ++current_it)
+        for (auto it = range.first; it != range.second; ++it)
         {
-            if (current_it->second == process_list_element_ptr)
+            if (it->second == process_list_element_ptr)
             {
-                user_process_list.queries.erase(current_it);
+                user_process_list.queries.erase(it);
                 found = true;
                 break;
             }
@@ -255,8 +254,6 @@ ProcessListEntry::~ProcessListEntry()
 
 void ProcessListElement::setQueryStreams(const BlockIO & io)
 {
-    if likely (for_dag_task)
-        return;
     std::lock_guard lock(query_streams_mutex);
 
     query_stream_in = io.in;
@@ -266,8 +263,6 @@ void ProcessListElement::setQueryStreams(const BlockIO & io)
 
 void ProcessListElement::releaseQueryStreams()
 {
-    if likely (for_dag_task)
-        return;
     BlockInputStreamPtr in;
     BlockOutputStreamPtr out;
 
@@ -284,7 +279,6 @@ void ProcessListElement::releaseQueryStreams()
 
 bool ProcessListElement::streamsAreReleased()
 {
-    RUNTIME_CHECK_MSG(!for_dag_task, "Should not reach here for dag task");
     std::lock_guard lock(query_streams_mutex);
 
     return query_streams_status == QueryStreamsStatus::Released;
@@ -292,7 +286,6 @@ bool ProcessListElement::streamsAreReleased()
 
 bool ProcessListElement::tryGetQueryStreams(BlockInputStreamPtr & in, BlockOutputStreamPtr & out) const
 {
-    RUNTIME_CHECK_MSG(!for_dag_task, "Should not reach here for dag task");
     std::lock_guard lock(query_streams_mutex);
 
     if (query_streams_status != QueryStreamsStatus::Initialized)
diff --git a/dbms/src/Interpreters/ProcessList.h b/dbms/src/Interpreters/ProcessList.h
index 39a37263bf0..a5da97c3fcb 100644
--- a/dbms/src/Interpreters/ProcessList.h
+++ b/dbms/src/Interpreters/ProcessList.h
@@ -85,10 +85,6 @@ class ProcessListElement
 
     QueryPriorities::Handle priority_handle;
 
-    /// if for_dag_task is true, it means the request comes from TiDB, processList is only used to
-    /// maintain the memory tracker, all the query streams related field is useless
-    bool for_dag_task;
-
     std::atomic<bool> is_killed{false};
 
     /// Be careful using it. For example, queries field could be modified concurrently.
@@ -117,13 +113,11 @@ class ProcessListElement
         const ClientInfo & client_info_,
         size_t max_memory_usage,
         double memory_tracker_fault_probability,
-        QueryPriorities::Handle && priority_handle_,
-        bool for_dag_task_)
+        QueryPriorities::Handle && priority_handle_)
         : query(query_)
         , client_info(client_info_)
         , memory_tracker(MemoryTracker::create(max_memory_usage))
         , priority_handle(std::move(priority_handle_))
-        , for_dag_task(for_dag_task_)
     {
         memory_tracker->setDescription("(for query)");
         current_memory_tracker = memory_tracker.get();
@@ -316,7 +310,7 @@ class ProcessList
      * If timeout is passed - throw an exception.
      * Don't count KILL QUERY queries.
      */
-    EntryPtr insert(const String & query_, const IAST * ast, const ClientInfo & client_info, const Settings & settings, const UInt64 total_memory, bool is_dag_task);
+    EntryPtr insert(const String & query_, const IAST * ast, const ClientInfo & client_info, const Settings & settings, const UInt64 total_memory);
 
     /// Number of currently executing queries.
     size_t size() const { return cur_size; }
diff --git a/dbms/src/Interpreters/executeQuery.cpp b/dbms/src/Interpreters/executeQuery.cpp
index 3472b91dfe0..2abd5e3a558 100644
--- a/dbms/src/Interpreters/executeQuery.cpp
+++ b/dbms/src/Interpreters/executeQuery.cpp
@@ -147,7 +147,7 @@ void onExceptionBeforeStart(const String & query, Context & context, time_t curr
 }
 
 std::tuple<ASTPtr, BlockIO> executeQueryImpl(
-    SQLQuerySource & query_src,
+    IQuerySource & query_src,
     Context & context,
     bool internal,
     QueryProcessingStage::Enum stage)
@@ -205,7 +205,7 @@ std::tuple<ASTPtr, BlockIO> executeQueryImpl(
     ProcessList::EntryPtr process_list_entry;
     if (!internal && nullptr == typeid_cast<const ASTShowProcesslistQuery *>(&*ast))
     {
-        process_list_entry = setProcessListElement(context, query, ast.get(), false);
+        process_list_entry = setProcessListElement(context, query, ast.get());
     }
 
     FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_interpreter_failpoint);
@@ -401,8 +401,7 @@ void prepareForInputStream(
 std::shared_ptr<ProcessListEntry> setProcessListElement(
     Context & context,
     const String & query,
-    const IAST * ast,
-    bool is_dag_task)
+    const IAST * ast)
 {
     assert(ast);
     auto total_memory = context.getServerInfo().has_value() ? context.getServerInfo()->memory_info.capacity : 0;
@@ -411,8 +410,7 @@ std::shared_ptr<ProcessListEntry> setProcessListElement(
         ast,
         context.getClientInfo(),
         context.getSettingsRef(),
-        total_memory,
-        is_dag_task);
+        total_memory);
     context.setProcessListElement(&process_list_entry->get());
     return process_list_entry;
 }
diff --git a/dbms/src/Interpreters/executeQuery.h b/dbms/src/Interpreters/executeQuery.h
index 9564ec1cf55..f33aed02ea3 100644
--- a/dbms/src/Interpreters/executeQuery.h
+++ b/dbms/src/Interpreters/executeQuery.h
@@ -55,8 +55,7 @@ BlockIO executeQuery(
 std::shared_ptr<ProcessListEntry> setProcessListElement(
     Context & context,
     const String & query,
-    const IAST * ast,
-    bool is_dag_task);
+    const IAST * ast);
 
 void logQueryPipeline(const LoggerPtr & logger, const BlockInputStreamPtr & in);
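Note: taken together, the change moves TiFlash back to one `ProcessListEntry` (and thus one `MemoryTracker`) per MPP task, created in `MPPTask::prepare()` via `setProcessListElement()` and dropped with the task. A minimal RAII sketch of that lifetime (standard C++ only, hypothetical names):

    #include <memory>

    struct MemoryTracker { /* accounting elided */ };

    // Thread-local "current" tracker, as in the TiFlash code above.
    thread_local MemoryTracker * current_memory_tracker = nullptr;

    // One entry per task: constructing it installs the task's own tracker;
    // destroying it detaches the thread so no dangling pointer remains.
    struct ProcessListEntry
    {
        std::shared_ptr<MemoryTracker> memory_tracker = std::make_shared<MemoryTracker>();

        ProcessListEntry() { current_memory_tracker = memory_tracker.get(); }
        ~ProcessListEntry()
        {
            if (current_memory_tracker == memory_tracker.get())
                current_memory_tracker = nullptr;
        }
    };

    int main()
    {
        {
            ProcessListEntry entry; // per-task entry, as restored by this change
            // ... run the task; allocations charge entry.memory_tracker ...
        }
        // current_memory_tracker is nullptr again here
        return 0;
    }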