Skip to content

Commit

Permalink
fix bug that TiDB Kill mpp query hangs (#1679) (#1680)
Browse files Browse the repository at this point in the history
  • Loading branch information
windtalker authored Mar 29, 2021
1 parent dbebb46 commit cb69993
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 14 deletions.
5 changes: 5 additions & 0 deletions dbms/src/DataStreams/TiRemoteBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,11 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream

String getName() const override { return name; }

void cancel(bool kill) override
{
if (kill)
remote_reader->cancel();
}
Block readImpl() override
{
if (block_queue.empty())
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Coprocessor/CoprocessorReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ class CoprocessorReader

const DAGSchema & getOutputSchema() const { return schema; }

void cancel() {}

CoprocessorReaderResult nextResult()
{
auto && [result, has_next] = resp_iter.next();
Expand Down
10 changes: 9 additions & 1 deletion dbms/src/Flash/Mpp/ExchangeReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ void ExchangeReceiver::setUpConnection()

void ExchangeReceiver::ReadLoop(const String & meta_raw, size_t source_index)
{
bool meet_error = false;
try
{
auto sender_task = new mpp::TaskMeta();
Expand All @@ -66,7 +67,12 @@ void ExchangeReceiver::ReadLoop(const String & meta_raw, size_t source_index)
{
throw Exception("exchange receiver meet error : " + packet.error().msg());
}
decodePacket(packet, source_index, req_info);
if (!decodePacket(packet, source_index, req_info))
{
meet_error = true;
LOG_WARNING(log, "Decode packet meet error, exit from ReadLoop");
break;
}
}
LOG_DEBUG(log, "finish read : " << req->DebugString());
}
Expand All @@ -87,6 +93,8 @@ void ExchangeReceiver::ReadLoop(const String & meta_raw, size_t source_index)
}
std::lock_guard<std::mutex> lock(mu);
live_connections--;
if (meet_error && state == NORMAL)
state = ERROR;
cv.notify_all();
LOG_DEBUG(log, "read thread end!!! live connections: " << std::to_string(live_connections));
}
Expand Down
57 changes: 47 additions & 10 deletions dbms/src/Flash/Mpp/ExchangeReceiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ struct ExchangeReceiverResult
ExchangeReceiverResult() : ExchangeReceiverResult(nullptr, 0) {}
};

enum State
{
NORMAL,
ERROR,
CANCELED,
CLOSED,
};

class ExchangeReceiver
{
public:
Expand All @@ -60,28 +68,38 @@ class ExchangeReceiver
std::condition_variable cv;
std::queue<ExchangeReceiverResult> result_buffer;
Int32 live_connections;
bool meet_error;
State state;
Exception err;
Logger * log;

void setUpConnection();

void ReadLoop(const String & meta_raw, size_t source_index);

void decodePacket(const mpp::MPPDataPacket & p, size_t source_index, const String & req_info)
bool decodePacket(const mpp::MPPDataPacket & p, size_t source_index, const String & req_info)
{
bool ret = true;
std::shared_ptr<tipb::SelectResponse> resp_ptr = std::make_shared<tipb::SelectResponse>();
if (!resp_ptr->ParseFromString(p.data()))
{
resp_ptr = nullptr;
ret = false;
}
std::unique_lock<std::mutex> lock(mu);
cv.wait(lock, [&] { return result_buffer.size() < max_buffer_size || meet_error; });
if (resp_ptr != nullptr)
result_buffer.emplace(resp_ptr, source_index, req_info);
cv.wait(lock, [&] { return result_buffer.size() < max_buffer_size || state != NORMAL; });
if (state == NORMAL)
{
if (resp_ptr != nullptr)
result_buffer.emplace(resp_ptr, source_index, req_info);
else
result_buffer.emplace(resp_ptr, source_index, req_info, true, "Error while decoding MPPDataPacket");
}
else
result_buffer.emplace(resp_ptr, source_index, req_info, true, "Error while decoding MPPDataPacket");
{
ret = false;
}
cv.notify_all();
return ret;
}

public:
Expand All @@ -92,7 +110,7 @@ class ExchangeReceiver
task_meta(meta),
max_buffer_size(max_buffer_size_),
live_connections(0),
meet_error(false),
state(NORMAL),
log(&Logger::get("exchange_receiver"))
{
for (int i = 0; i < exc.field_types_size(); i++)
Expand All @@ -106,22 +124,41 @@ class ExchangeReceiver

~ExchangeReceiver()
{
{
std::unique_lock<std::mutex> lk(mu);
state = CLOSED;
cv.notify_all();
}
for (auto & worker : workers)
{
worker.join();
}
}

void cancel()
{
std::unique_lock<std::mutex> lk(mu);
state = CANCELED;
cv.notify_all();
}

const DAGSchema & getOutputSchema() const { return schema; }

ExchangeReceiverResult nextResult()
{
std::unique_lock<std::mutex> lk(mu);
cv.wait(lk, [&] { return !result_buffer.empty() || live_connections == 0 || meet_error; });
cv.wait(lk, [&] { return !result_buffer.empty() || live_connections == 0 || state != NORMAL; });
ExchangeReceiverResult result;
if (meet_error)
if (state != NORMAL)
{
result = {nullptr, 0, "ExchangeReceiver", true, err.message(), false};
String msg;
if (state == CANCELED)
msg = "query canceled";
else if (state == CLOSED)
msg = "ExchangeReceiver closed";
else
msg = err.message();
result = {nullptr, 0, "ExchangeReceiver", true, msg, false};
}
else if (result_buffer.empty())
{
Expand Down
19 changes: 16 additions & 3 deletions dbms/src/Flash/Mpp/MPPHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -515,10 +515,23 @@ class MPPTaskManager : private boost::noncopyable
if (it == mpp_query_map.end())
return;
it->second.to_be_cancelled = true;
LOG_WARNING(log, "Begin cancel query: " + std::to_string(query_id));
}
for (auto & task_id : it->second.task_map)
task_id.second->cancel(reason);
LOG_WARNING(log, "Begin cancel query: " + std::to_string(query_id));
std::stringstream ss;
ss << "Remaining task in query " + std::to_string(query_id) + " are: ";

std::vector<std::thread> cancel_workers;
for (auto task_it = it->second.task_map.rbegin(); task_it != it->second.task_map.rend(); task_it++)
{
ss << task_it->first.toString() << " ";
std::thread t(&MPPTask::cancel, task_it->second, std::ref(reason));
cancel_workers.push_back(std::move(t));
}
LOG_WARNING(log, ss.str());
for (auto & worker : cancel_workers)
{
worker.join();
}
MPPQueryTaskSet canceled_task_set;
{
std::lock_guard<std::mutex> lock(mu);
Expand Down

0 comments on commit cb69993

Please sign in to comment.