Revert "Introduce mpp query level processListEntry (#7644)" (#7688)
ref #7687
windtalker authored Jun 21, 2023
1 parent 7aafea8 commit a57ec6d
Showing 17 changed files with 56 additions and 132 deletions.
3 changes: 1 addition & 2 deletions dbms/src/Common/TiFlashMetrics.h
@@ -77,8 +77,7 @@ namespace DB
M(tiflash_coprocessor_request_memory_usage, "Bucketed histogram of request memory usage", Histogram, \
F(type_cop, {{"type", "cop"}}, ExpBuckets{1024 * 1024, 2, 16}), \
F(type_batch, {{"type", "batch"}}, ExpBuckets{1024 * 1024, 2, 20}), \
F(type_run_mpp_task, {{"type", "run_mpp_task"}}, ExpBuckets{1024 * 1024, 2, 20}), \
F(type_run_mpp_query, {{"type", "run_mpp_query"}}, ExpBuckets{1024 * 1024, 2, 20})) \
F(type_run_mpp_task, {{"type", "run_mpp_task"}}, ExpBuckets{1024 * 1024, 2, 20})) \
M(tiflash_coprocessor_request_error, "Total number of request error", Counter, F(reason_meet_lock, {"reason", "meet_lock"}), \
F(reason_region_not_found, {"reason", "region_not_found"}), F(reason_epoch_not_match, {"reason", "epoch_not_match"}), \
F(reason_kv_client_error, {"reason", "kv_client_error"}), F(reason_internal_error, {"reason", "internal_error"}), \
12 changes: 6 additions & 6 deletions dbms/src/Debug/dbgQueryExecutor.cpp
@@ -39,7 +39,7 @@ void setTipbRegionInfo(coprocessor::RegionInfo * tipb_region_info, const std::pa
range->set_end(RecordKVFormat::genRawKey(table_id, handle_range.second.handle_id));
}

BlockInputStreamPtr constructRootExchangeReceiverStream(Context & context, tipb::ExchangeReceiver & tipb_exchange_receiver, const DAGProperties & properties, DAGSchema & root_task_schema, const String & root_addr)
BlockInputStreamPtr constructExchangeReceiverStream(Context & context, tipb::ExchangeReceiver & tipb_exchange_receiver, const DAGProperties & properties, DAGSchema & root_task_schema, const String & root_addr, bool enable_local_tunnel)
{
for (auto & field : root_task_schema)
{
@@ -65,7 +65,7 @@ BlockInputStreamPtr constructRootExchangeReceiverStream(Context & context, tipb:
root_tm,
context.getTMTContext().getKVCluster(),
context.getTMTContext().getMPPTaskManager(),
false,
enable_local_tunnel,
context.getSettingsRef().enable_async_grpc_client),
tipb_exchange_receiver.encoded_task_meta_size(),
10,
@@ -77,7 +77,7 @@ BlockInputStreamPtr constructRootExchangeReceiverStream(Context & context, tipb:
return ret;
}

BlockInputStreamPtr prepareRootExchangeReceiver(Context & context, const DAGProperties & properties, std::vector<Int64> & root_task_ids, DAGSchema & root_task_schema)
BlockInputStreamPtr prepareRootExchangeReceiver(Context & context, const DAGProperties & properties, std::vector<Int64> & root_task_ids, DAGSchema & root_task_schema, bool enable_local_tunnel)
{
tipb::ExchangeReceiver tipb_exchange_receiver;
for (const auto root_task_id : root_task_ids)
@@ -93,7 +93,7 @@ BlockInputStreamPtr prepareRootExchangeReceiver(Context & context, const DAGProp
auto * tm_string = tipb_exchange_receiver.add_encoded_task_meta();
tm.AppendToString(tm_string);
}
return constructRootExchangeReceiverStream(context, tipb_exchange_receiver, properties, root_task_schema, Debug::LOCAL_HOST);
return constructExchangeReceiverStream(context, tipb_exchange_receiver, properties, root_task_schema, Debug::LOCAL_HOST, enable_local_tunnel);
}

void prepareExchangeReceiverMetaWithMultipleContext(tipb::ExchangeReceiver & tipb_exchange_receiver, const DAGProperties & properties, Int64 task_id, String & addr)
@@ -116,7 +116,7 @@ BlockInputStreamPtr prepareRootExchangeReceiverWithMultipleContext(Context & con

prepareExchangeReceiverMetaWithMultipleContext(tipb_exchange_receiver, properties, task_id, addr);

return constructRootExchangeReceiverStream(context, tipb_exchange_receiver, properties, root_task_schema, root_addr);
return constructExchangeReceiverStream(context, tipb_exchange_receiver, properties, root_task_schema, root_addr, true);
}

void prepareDispatchTaskRequest(QueryTask & task, std::shared_ptr<mpp::DispatchTaskRequest> req, const DAGProperties & properties, std::vector<Int64> & root_task_ids, DAGSchema & root_task_schema, String & addr)
@@ -222,7 +222,7 @@ BlockInputStreamPtr executeMPPQuery(Context & context, const DAGProperties & pro
if (call.getResp()->has_error())
throw Exception("Meet error while dispatch mpp task: " + call.getResp()->error().msg());
}
return prepareRootExchangeReceiver(context, properties, root_task_ids, root_task_schema);
return prepareRootExchangeReceiver(context, properties, root_task_ids, root_task_schema, context.getSettingsRef().enable_local_tunnel);
}

BlockInputStreamPtr executeNonMPPQuery(Context & context, RegionID region_id, const DAGProperties & properties, QueryTasks & query_tasks, MakeResOutputStream & func_wrap_output_stream)
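A minimal sketch of the signature change this file restores: the root exchange receiver used to hard-code local tunnels off, and now takes an enable_local_tunnel flag filled from the session settings. The types and names below (Settings, ExchangeReceiverConfig, makeReceiverConfig) are illustrative stand-ins, not the TiFlash API.

#include <iostream>

struct Settings
{
    bool enable_local_tunnel = true;
    bool enable_async_grpc_client = true;
};

struct ExchangeReceiverConfig
{
    bool enable_local_tunnel;
    bool enable_async_grpc;
};

// Before the revert the debug executor always passed `false` here; after it,
// the flag is an explicit parameter supplied by the caller from the settings.
ExchangeReceiverConfig makeReceiverConfig(const Settings & settings, bool enable_local_tunnel)
{
    return ExchangeReceiverConfig{enable_local_tunnel, settings.enable_async_grpc_client};
}

int main()
{
    Settings settings;
    auto cfg = makeReceiverConfig(settings, settings.enable_local_tunnel);
    std::cout << cfg.enable_local_tunnel << "\n"; // prints 1
    return 0;
}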
1 change: 1 addition & 0 deletions dbms/src/Flash/Mpp/ExchangeReceiver.cpp
@@ -562,6 +562,7 @@ void ExchangeReceiverBase<RPCContext>::setUpLocalConnections(std::vector<Request
LoggerPtr local_log = Logger::get(fmt::format("{} {}", exc_log->identifier(), req_info));

LocalRequestHandler local_request_handler(
getMemoryTracker(),
[this, log = local_log](bool meet_error, const String & local_err_msg) {
this->connectionDone(meet_error, local_err_msg, log);
},
5 changes: 4 additions & 1 deletion dbms/src/Flash/Mpp/LocalRequestHandler.h
@@ -22,11 +22,13 @@ namespace DB
struct LocalRequestHandler
{
LocalRequestHandler(
MemoryTracker * recv_mem_tracker_,
std::function<void(bool, const String &)> && notify_write_done_,
std::function<void()> && notify_close_,
std::function<void()> && add_local_conn_num_,
ReceiverChannelWriter && channel_writer_)
: notify_write_done(std::move(notify_write_done_))
: recv_mem_tracker(recv_mem_tracker_)
, notify_write_done(std::move(notify_write_done_))
, notify_close(std::move(notify_close_))
, add_local_conn_num(std::move(add_local_conn_num_))
, channel_writer(std::move(channel_writer_))
@@ -73,6 +75,7 @@ struct LocalRequestHandler
return waiting_task_time;
}

MemoryTracker * recv_mem_tracker;
std::function<void(bool, const String &)> notify_write_done;
std::function<void()> notify_close;
std::function<void()> add_local_conn_num;
44 changes: 10 additions & 34 deletions dbms/src/Flash/Mpp/MPPTask.cpp
@@ -99,9 +99,7 @@ void MPPTaskMonitorHelper::initAndAddself(MPPTaskManager * manager_, const Strin
MPPTaskMonitorHelper::~MPPTaskMonitorHelper()
{
if (initialized)
{
manager->removeMonitoredTask(task_unique_id);
}
}

MPPTask::MPPTask(const mpp::TaskMeta & meta_, const ContextPtr & context_)
@@ -121,9 +119,8 @@ MPPTask::~MPPTask()
{
/// MPPTask maybe destructed by different thread, set the query memory_tracker
/// to current_memory_tracker in the destructor
auto * query_memory_tracker = getMemoryTracker();
if (query_memory_tracker != nullptr && current_memory_tracker != query_memory_tracker)
current_memory_tracker = query_memory_tracker;
if (process_list_entry != nullptr && current_memory_tracker != process_list_entry->get().getMemoryTrackerPtr().get())
current_memory_tracker = process_list_entry->get().getMemoryTrackerPtr().get();
abortTunnels("", true);
LOG_INFO(log, "finish MPPTask: {}", id.toString());
}
@@ -190,8 +187,7 @@ void MPPTask::registerTunnels(const mpp::DispatchTaskRequest & task_request)
if (unlikely(!task_meta.ParseFromString(exchange_sender.encoded_task_meta(i))))
throw TiFlashException("Failed to decode task meta info in ExchangeSender", Errors::Coprocessor::BadRequest);

/// when the receiver task is root task, it should never be local tunnel
bool is_local = context->getSettingsRef().enable_local_tunnel && task_meta.task_id() != -1 && meta.address() == task_meta.address();
bool is_local = context->getSettingsRef().enable_local_tunnel && meta.address() == task_meta.address();
bool is_async = !is_local && context->getSettingsRef().enable_async_server;
MPPTunnelPtr tunnel = std::make_shared<MPPTunnel>(
task_meta,
@@ -297,13 +293,6 @@ void MPPTask::setErrString(const String & message)
err_string = message;
}

MemoryTracker * MPPTask::getMemoryTracker() const
{
if (process_list_entry_holder.process_list_entry != nullptr)
return process_list_entry_holder.process_list_entry->get().getMemoryTrackerPtr().get();
return nullptr;
}

void MPPTask::unregisterTask()
{
auto [result, reason] = manager->unregisterTask(id);
@@ -313,19 +302,6 @@ void MPPTask::unregisterTask()
LOG_WARNING(log, "task failed to unregister, reason: {}", reason);
}

void MPPTask::initProcessListEntry(MPPTaskManagerPtr & task_manager)
{
/// all the mpp tasks of the same mpp query shares the same process list entry
auto [query_process_list_entry, aborted_reason] = task_manager->getOrCreateQueryProcessListEntry(id.query_id, context);
if (!aborted_reason.empty())
throw TiFlashException(fmt::format("MPP query is already aborted, aborted reason: {}", aborted_reason), Errors::Coprocessor::Internal);
assert(query_process_list_entry != nullptr);
process_list_entry_holder.process_list_entry = query_process_list_entry;
dag_context->setProcessListEntry(query_process_list_entry);
context->setProcessListElement(&query_process_list_entry->get());
current_memory_tracker = getMemoryTracker();
}

void MPPTask::prepare(const mpp::DispatchTaskRequest & task_request)
{
dag_req = getDAGRequestFromStringWithRetry(task_request.encoded_plan());
@@ -382,13 +358,13 @@ void MPPTask::prepare(const mpp::DispatchTaskRequest & task_request)
dag_context->tidb_host = context->getClientInfo().current_address.toString();

context->setDAGContext(dag_context.get());

auto task_manager = tmt_context.getMPPTaskManager();
initProcessListEntry(task_manager);
process_list_entry = setProcessListElement(*context, dag_context->dummy_query_string, dag_context->dummy_ast.get());
dag_context->setProcessListEntry(process_list_entry);

injectFailPointBeforeRegisterTunnel(dag_context->isRootMPPTask());
registerTunnels(task_request);

auto task_manager = tmt_context.getMPPTaskManager();
LOG_DEBUG(log, "begin to register the task {}", id.toString());

injectFailPointBeforeRegisterMPPTask(dag_context->isRootMPPTask());
@@ -429,7 +405,7 @@ void MPPTask::preprocess()
void MPPTask::runImpl()
{
CPUAffinityManager::getInstance().bindSelfQueryThread();
RUNTIME_ASSERT(current_memory_tracker == getMemoryTracker(), log, "The current memory tracker is not set correctly for MPPTask::runImpl");
RUNTIME_ASSERT(current_memory_tracker == process_list_entry->get().getMemoryTrackerPtr().get(), log, "The current memory tracker is not set correctly for MPPTask::runImpl");
if (!switchStatus(INITIALIZING, RUNNING))
{
LOG_WARNING(log, "task not in initializing state, skip running");
@@ -532,9 +508,9 @@ void MPPTask::runImpl()
// todo when error happens, should try to update the metrics if it is available
if (auto throughput = dag_context->getTableScanThroughput(); throughput.first)
GET_METRIC(tiflash_storage_logical_throughput_bytes).Observe(throughput.second);
/// note that memory_tracker is shared by all the mpp tasks, the peak memory usage is not accurate
/// todo log executor level peak memory usage instead
auto peak_memory = getMemoryTracker()->getPeak();
auto process_info = context->getProcessListElement()->getInfo();
auto peak_memory = process_info.peak_memory_usage > 0 ? process_info.peak_memory_usage : 0;
GET_METRIC(tiflash_coprocessor_request_memory_usage, type_run_mpp_task).Observe(peak_memory);
mpp_task_statistics.setMemoryPeak(peak_memory);
}
}
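The destructor and runImpl hunks above go back to the per-task process list entry: the destructor repoints the thread-local current_memory_tracker at the task's own tracker (the destructor may run on a different thread than the one that executed the task), and runImpl reads the peak memory from that same entry. A minimal, self-contained sketch of the destructor pattern follows; the types are toy stand-ins, not the real MPPTask / ProcessListEntry classes.

#include <iostream>
#include <memory>

struct MemoryTracker
{
    long peak = 0;
};

// stand-in for the thread-local current_memory_tracker used across TiFlash
thread_local MemoryTracker * current_memory_tracker = nullptr;

struct ProcessListElement
{
    std::shared_ptr<MemoryTracker> tracker = std::make_shared<MemoryTracker>();
    std::shared_ptr<MemoryTracker> getMemoryTrackerPtr() const { return tracker; }
};

struct Task
{
    std::shared_ptr<ProcessListElement> process_list_entry = std::make_shared<ProcessListElement>();

    ~Task()
    {
        // The task may be destructed by a different thread, so point the thread-local
        // tracker at the task's own tracker before tearing down state that still
        // accounts memory against it.
        if (process_list_entry != nullptr
            && current_memory_tracker != process_list_entry->getMemoryTrackerPtr().get())
            current_memory_tracker = process_list_entry->getMemoryTrackerPtr().get();
        // ... abort tunnels, release streams, etc.
    }
};

int main()
{
    {
        Task task;
    } // the destructor fixed up current_memory_tracker before teardown
    std::cout << "done\n";
    return 0;
}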
19 changes: 2 additions & 17 deletions dbms/src/Flash/Mpp/MPPTask.h
@@ -37,7 +37,6 @@
namespace DB
{
class MPPTaskManager;
using MPPTaskManagerPtr = std::shared_ptr<MPPTaskManager>;
class DAGContext;
class ProcessListEntry;

@@ -124,27 +123,12 @@ class MPPTask : public std::enable_shared_from_this<MPPTask>

void registerTunnels(const mpp::DispatchTaskRequest & task_request);

void initProcessListEntry(MPPTaskManagerPtr & task_manager);

void initExchangeReceivers();

String getErrString() const;
void setErrString(const String & message);

MemoryTracker * getMemoryTracker() const;

private:
struct ProcessListEntryHolder
{
std::shared_ptr<ProcessListEntry> process_list_entry;
~ProcessListEntryHolder()
{
/// Because MemoryTracker is now saved in `MPPQueryTaskSet` and shared by all the mpp tasks belongs to the same mpp query,
/// it may not be destructed when MPPTask is destructed, so need to manually reset current_memory_tracker to nullptr at the
/// end of the destructor of MPPTask, otherwise, current_memory_tracker may point to a invalid memory tracker
current_memory_tracker = nullptr;
}
};
// We must ensure this member variable is put at this place to be destructed at proper time
MPPTaskMonitorHelper mpp_task_monitor_helper;

@@ -160,11 +144,12 @@ class MPPTask : public std::enable_shared_from_this<MPPTask>

MPPTaskScheduleEntry schedule_entry;

ProcessListEntryHolder process_list_entry_holder;
// `dag_context` holds inputstreams which could hold ref to `context` so it should be destructed
// before `context`.
std::unique_ptr<DAGContext> dag_context;

std::shared_ptr<ProcessListEntry> process_list_entry;

QueryExecutorHolder query_executor_holder;

std::atomic<TaskStatus> status{INITIALIZING};
42 changes: 6 additions & 36 deletions dbms/src/Flash/Mpp/MPPTaskManager.cpp
@@ -15,16 +15,13 @@
#include <Common/FailPoint.h>
#include <Common/FmtUtils.h>
#include <Common/TiFlashMetrics.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Mpp/MPPTask.h>
#include <Flash/Mpp/MPPTaskManager.h>
#include <Interpreters/Context.h>
#include <Interpreters/ProcessList.h>
#include <Interpreters/executeQuery.h>
#include <fmt/core.h>

#include <magic_enum.hpp>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>
@@ -37,15 +34,6 @@ extern const char random_task_manager_find_task_failure_failpoint[];
extern const char pause_before_register_non_root_mpp_task[];
} // namespace FailPoints

MPPQueryTaskSet::~MPPQueryTaskSet()
{
if likely (process_list_entry != nullptr)
{
auto peak_memory = process_list_entry->get().getMemoryTrackerPtr()->getPeak();
GET_METRIC(tiflash_coprocessor_request_memory_usage, type_run_mpp_query).Observe(peak_memory);
}
}

MPPTaskManager::MPPTaskManager(MPPTaskSchedulerPtr scheduler_)
: scheduler(std::move(scheduler_))
, aborted_query_gather_cache(ABORTED_MPPGATHER_CACHE_SIZE)
@@ -247,13 +235,14 @@ std::pair<bool, String> MPPTaskManager::registerTask(MPPTaskPtr task)
{
return {false, fmt::format("query is being aborted, error message = {}", error_msg)};
}
/// query_set must not be nullptr if the current query is not aborted since MPPTask::initProcessListEntry
/// will always create the query_set
RUNTIME_CHECK_MSG(query_set != nullptr, "query set must not be null when register task");
if (query_set->task_map.find(task->id) != query_set->task_map.end())
if (query_set != nullptr && query_set->task_map.find(task->id) != query_set->task_map.end())
{
return {false, "task has been registered"};
}
if (query_set == nullptr) /// the first one
{
query_set = addMPPQueryTaskSet(task->id.query_id);
}
query_set->task_map.emplace(task->id, task);
/// cancel all the alarm waiting on this task
auto alarm_it = query_set->alarms.find(task->id.task_id);
@@ -304,25 +293,6 @@ String MPPTaskManager::toString()
return res + ")";
}

std::pair<std::shared_ptr<ProcessListEntry>, String> MPPTaskManager::getOrCreateQueryProcessListEntry(const MPPQueryId & query_id, const ContextPtr & context)
{
std::lock_guard lock(mu);
auto [query_set, abort_reason] = getQueryTaskSetWithoutLock(query_id);
if (!abort_reason.empty())
return {nullptr, abort_reason};
if (query_set == nullptr)
query_set = addMPPQueryTaskSet(query_id);
if (query_set->process_list_entry == nullptr)
{
query_set->process_list_entry = setProcessListElement(
*context,
context->getDAGContext()->dummy_query_string,
context->getDAGContext()->dummy_ast.get(),
true);
}
return {query_set->process_list_entry, ""};
}

std::pair<MPPQueryTaskSetPtr, String> MPPTaskManager::getQueryTaskSetWithoutLock(const MPPQueryId & query_id)
{
auto it = mpp_query_map.find(query_id);
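With the query-level process list entry removed, registerTask above once again creates the query's task set lazily when the first task of a query arrives, instead of relying on MPPTask::initProcessListEntry to have created it. A compact sketch of that get-or-create flow, with stand-in types rather than the real MPPTaskManager API:

#include <iostream>
#include <map>
#include <memory>
#include <string>
#include <utility>

struct Task
{
    std::string query_id;
    int task_id;
};

struct QueryTaskSet
{
    std::map<int, std::shared_ptr<Task>> task_map;
};

class TaskManager
{
public:
    std::pair<bool, std::string> registerTask(const std::shared_ptr<Task> & task)
    {
        auto & query_set = queries[task->query_id]; // creates an empty (null) slot if absent
        if (query_set != nullptr && query_set->task_map.count(task->task_id))
            return {false, "task has been registered"};
        if (query_set == nullptr) // the first task of this query
            query_set = std::make_shared<QueryTaskSet>();
        query_set->task_map.emplace(task->task_id, task);
        return {true, ""};
    }

private:
    std::map<std::string, std::shared_ptr<QueryTaskSet>> queries;
};

int main()
{
    TaskManager manager;
    auto t1 = std::make_shared<Task>(Task{"query-1", 1});
    auto [ok1, msg1] = manager.registerTask(t1);
    auto [ok2, msg2] = manager.registerTask(t1); // duplicate registration
    std::cout << ok1 << " '" << msg1 << "' | " << ok2 << " '" << msg2 << "'\n";
    return 0;
}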
4 changes: 0 additions & 4 deletions dbms/src/Flash/Mpp/MPPTaskManager.h
@@ -40,7 +40,6 @@ struct MPPQueryTaskSet
/// task can only be registered state is Normal
State state = Normal;
String error_message;
std::shared_ptr<ProcessListEntry> process_list_entry;
MPPTaskMap task_map;
std::unordered_map<Int64, std::unordered_map<Int64, grpc::Alarm>> alarms;
/// only used in scheduler
@@ -53,7 +52,6 @@
{
return state == Normal || state == Aborted;
}
~MPPQueryTaskSet();
};

/// A simple thread unsafe FIFO cache used to fix the "lost cancel" issues
@@ -195,8 +193,6 @@

void abortMPPQuery(const MPPQueryId & query_id, const String & reason, AbortType abort_type);

std::pair<std::shared_ptr<ProcessListEntry>, String> getOrCreateQueryProcessListEntry(const MPPQueryId & query_id, const ContextPtr & context);

String toString();

private:
3 changes: 3 additions & 0 deletions dbms/src/Flash/Mpp/MPPTunnel.cpp
@@ -459,6 +459,9 @@ std::shared_ptr<DB::TrackedMppDataPacket> LocalTunnelSenderV1::readForLocal()
if (result == MPMCQueueResult::OK)
{
MPPTunnelMetric::subDataSizeMetric(*data_size_in_queue, res->getPacket().ByteSizeLong());

// switch tunnel's memory tracker into receiver's
res->switchMemTracker(current_memory_tracker);
return res;
}
else if (result == MPMCQueueResult::CANCELLED)
5 changes: 5 additions & 0 deletions dbms/src/Flash/Mpp/MPPTunnel.h
@@ -354,6 +354,11 @@ class LocalTunnelSenderV2 : public TunnelSender
if (unlikely(checkPacketErr(data)))
return false;

// receiver_mem_tracker pointer will always be valid because ExchangeReceiverBase won't be destructed
// before all local tunnels are destructed so that the MPPTask which contains ExchangeReceiverBase and
// is responsible for deleting receiver_mem_tracker must be destroyed after these local tunnels.
data->switchMemTracker(local_request_handler.recv_mem_tracker);

// When ExchangeReceiver receives data from local and remote tiflash, number of local tunnel threads
// is very large and causes the time of transfering data by grpc threads becomes longer, because
// grpc thread is hard to get chance to push data into MPMCQueue in ExchangeReceiver.
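The two tunnel hunks move a packet's memory accounting from the sender side to the receiver side when it crosses a local tunnel (LocalTunnelSenderV1 switches to the thread-local tracker on read, LocalTunnelSenderV2 switches to the handler's recv_mem_tracker on push). A toy illustration of that hand-off; MemoryTracker and TrackedPacket below are simplified stand-ins, not the real TrackedMppDataPacket API.

#include <cassert>
#include <cstddef>
#include <iostream>

struct MemoryTracker
{
    std::ptrdiff_t amount = 0;
    void alloc(std::ptrdiff_t n) { amount += n; }
    void free(std::ptrdiff_t n) { amount -= n; }
};

struct TrackedPacket
{
    std::size_t size;
    MemoryTracker * tracker;

    TrackedPacket(std::size_t size_, MemoryTracker * tracker_)
        : size(size_)
        , tracker(tracker_)
    {
        if (tracker)
            tracker->alloc(static_cast<std::ptrdiff_t>(size));
    }

    // analogous to switchMemTracker: re-account the packet's bytes to another tracker
    void switchMemTracker(MemoryTracker * new_tracker)
    {
        if (tracker)
            tracker->free(static_cast<std::ptrdiff_t>(size));
        tracker = new_tracker;
        if (tracker)
            tracker->alloc(static_cast<std::ptrdiff_t>(size));
    }

    ~TrackedPacket()
    {
        if (tracker)
            tracker->free(static_cast<std::ptrdiff_t>(size));
    }
};

int main()
{
    MemoryTracker sender_tracker;
    MemoryTracker receiver_tracker;
    TrackedPacket packet(1024, &sender_tracker);
    packet.switchMemTracker(&receiver_tracker); // the local tunnel hands the packet over
    assert(sender_tracker.amount == 0);
    assert(receiver_tracker.amount == 1024);
    std::cout << "receiver now accounts " << receiver_tracker.amount << " bytes\n";
    return 0;
}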
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/TrackedMppDataPacket.h
@@ -110,7 +110,7 @@ struct MemTrackerWrapper

struct TrackedMppDataPacket
{
TrackedMppDataPacket(const mpp::MPPDataPacket & data, MemoryTracker * memory_tracker)
explicit TrackedMppDataPacket(const mpp::MPPDataPacket & data, MemoryTracker * memory_tracker)
: mem_tracker_wrapper(estimateAllocatedSize(data), memory_tracker)
{
packet = data;
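The only change in this file marks the two-argument constructor explicit, which rules out implicit conversion from a braced initializer list. A tiny example of what that forbids, using a hypothetical Packet type:

struct Packet
{
    explicit Packet(int /*size*/, int /*owner*/) {}
};

void consume(Packet) {}

int main()
{
    Packet ok(16, 1);        // direct-initialization still works
    consume(Packet(16, 1));  // explicit construction still works
    // consume({16, 1});     // no longer compiles: explicit blocks the braced-list conversion
    (void) ok;
    return 0;
}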