Skip to content

Commit

Permalink
Add more information to the error message in MPPTunnel (#6787)
Browse files Browse the repository at this point in the history
ref #6233
  • Loading branch information
windtalker authored Feb 9, 2023
1 parent 636398d commit 2480a1d
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 30 deletions.
31 changes: 21 additions & 10 deletions dbms/src/Flash/Mpp/ExchangeReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -834,6 +834,7 @@ void ExchangeReceiverBase<RPCContext>::connectionDone(
const LoggerPtr & log)
{
Int32 copy_live_connections;
String first_err_msg = local_err_msg;
{
std::lock_guard lock(mu);

Expand All @@ -843,29 +844,39 @@ void ExchangeReceiverBase<RPCContext>::connectionDone(
state = ExchangeReceiverState::ERROR;
if (err_msg.empty())
err_msg = local_err_msg;
else
first_err_msg = err_msg;
}

copy_live_connections = --live_connections;
}

LOG_DEBUG(
log,
"connection end. meet error: {}, err msg: {}, current alive connections: {}",
meet_error,
local_err_msg,
copy_live_connections);

if (meet_error)
{
LOG_WARNING(
log,
"connection end. meet error: {}, err msg: {}, current alive connections: {}",
meet_error,
local_err_msg,
copy_live_connections);
}
else
{
LOG_DEBUG(
log,
"connection end. Current alive connections: {}",
copy_live_connections);
}
assert(copy_live_connections >= 0);
if (copy_live_connections == 0)
{
LOG_DEBUG(log, "All threads end in ExchangeReceiver");
cv.notify_all();
}
else if (copy_live_connections < 0)
throw Exception("alive_connection_num should not be less than 0!");

if (meet_error || copy_live_connections == 0)
{
LOG_INFO(exc_log, "receiver channels finished");
LOG_INFO(exc_log, "receiver channels finished, meet error: {}, error message: {}", meet_error, first_err_msg);
finishAllMsgChannels();
}
}
Expand Down
18 changes: 9 additions & 9 deletions dbms/src/Flash/Mpp/MPPTunnel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ void MPPTunnel::write(TrackedMppDataPacketPtr && data)
{
std::unique_lock lk(mu);
waitUntilConnectedOrFinished(lk);
RUNTIME_CHECK_MSG(tunnel_sender != nullptr, "write to tunnel which is already closed.");
RUNTIME_CHECK_MSG(tunnel_sender != nullptr, "write to tunnel {} which is already closed.", tunnel_id);
}

auto pushed_data_size = data->getPacket().ByteSizeLong();
Expand All @@ -167,7 +167,7 @@ void MPPTunnel::write(TrackedMppDataPacketPtr && data)
updateConnProfileInfo(pushed_data_size);
return;
}
throw Exception(fmt::format("write to tunnel which is already closed,{}", tunnel_sender->isConsumerFinished() ? tunnel_sender->getConsumerFinishMsg() : ""));
throw Exception(fmt::format("write to tunnel {} which is already closed, {}", tunnel_id, tunnel_sender->isConsumerFinished() ? tunnel_sender->getConsumerFinishMsg() : ""));
}

/// done normally and being called exactly once after writing all packets
Expand All @@ -179,7 +179,7 @@ void MPPTunnel::writeDone()
/// make sure to finish the tunnel after it is connected
waitUntilConnectedOrFinished(lk);
if (tunnel_sender == nullptr)
throw Exception(fmt::format("write to tunnel which is already closed."));
throw Exception(fmt::format("write to tunnel {} which is already closed.", tunnel_id));
}
tunnel_sender->finish();
waitForSenderFinish(/*allow_throw=*/true);
Expand Down Expand Up @@ -208,8 +208,8 @@ void MPPTunnel::connectLocalV2(size_t source_index, LocalRequestHandler & local_
{
{
std::unique_lock lk(mu);
RUNTIME_CHECK_MSG(status == TunnelStatus::Unconnected, fmt::format("MPPTunnel has connected or finished: {}", statusToString()));
RUNTIME_CHECK_MSG(mode == TunnelSenderMode::LOCAL, "This should be a local tunnel");
RUNTIME_CHECK_MSG(status == TunnelStatus::Unconnected, fmt::format("MPPTunnel {} has connected or finished: {}", tunnel_id, statusToString()));
RUNTIME_CHECK_MSG(mode == TunnelSenderMode::LOCAL, "{} should be a local tunnel", tunnel_id);

LOG_TRACE(log, "ready to connect local tunnel version 2");
if (is_fine_grained)
Expand All @@ -233,7 +233,7 @@ void MPPTunnel::connectAsync(IAsyncCallData * call_data)
{
{
std::unique_lock lk(mu);
RUNTIME_CHECK_MSG(status == TunnelStatus::Unconnected, "MPPTunnel has connected or finished: {}", statusToString());
RUNTIME_CHECK_MSG(status == TunnelStatus::Unconnected, "MPPTunnel {} has connected or finished: {}", tunnel_id, statusToString());

LOG_TRACE(log, "ready to connect async");
RUNTIME_ASSERT(mode == TunnelSenderMode::ASYNC_GRPC, log, "mode {} is not async grpc in connectAsync", magic_enum::enum_name(mode));
Expand Down Expand Up @@ -283,7 +283,7 @@ void MPPTunnel::waitForSenderFinish(bool allow_throw)
status = TunnelStatus::Finished;
}
if (allow_throw && !err_msg.empty())
throw Exception("Consumer exits unexpected, " + err_msg);
throw Exception(fmt::format("{}: consumer exits unexpected, error message: {} ", tunnel_id, err_msg));
LOG_TRACE(log, "end wait for consumer finish!");
}

Expand Down Expand Up @@ -311,7 +311,7 @@ void MPPTunnel::waitUntilConnectedOrFinished(std::unique_lock<std::mutex> & lk)
LOG_TRACE(log, "end waitUntilConnectedOrFinished");
}
if (status == TunnelStatus::Unconnected)
throw Exception("MPPTunnel can not be connected because MPPTask is cancelled");
throw Exception(fmt::format("MPPTunnel {} can not be connected because MPPTask is cancelled", tunnel_id));
}

StringRef MPPTunnel::statusToString()
Expand Down Expand Up @@ -399,7 +399,7 @@ void MPPTunnel::connectLocalV1(PacketWriter * writer)
{
std::unique_lock lk(mu);
if (status != TunnelStatus::Unconnected)
throw Exception(fmt::format("MPPTunnel has connected or finished: {}", statusToString()));
throw Exception(fmt::format("MPPTunnel {} has connected or finished: {}", tunnel_id, statusToString()));

LOG_TRACE(log, "ready to connect local tunnel version 1");

Expand Down
22 changes: 11 additions & 11 deletions dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ TEST_F(TestMPPTunnel, SyncWriteAfterUnconnectFinished)
}
catch (Exception & e)
{
GTEST_ASSERT_EQ(e.message(), "Check tunnel_sender != nullptr failed: write to tunnel which is already closed.");
GTEST_ASSERT_EQ(e.message(), "Check tunnel_sender != nullptr failed: write to tunnel 0000_0001 which is already closed.");
}
}

Expand All @@ -392,7 +392,7 @@ TEST_F(TestMPPTunnel, SyncWriteDoneAfterUnconnectFinished)
}
catch (Exception & e)
{
GTEST_ASSERT_EQ(e.message(), "write to tunnel which is already closed.");
GTEST_ASSERT_EQ(e.message(), "write to tunnel 0000_0001 which is already closed.");
}
}

Expand Down Expand Up @@ -460,7 +460,7 @@ TEST_F(TestMPPTunnel, SyncWriteError)
}
catch (Exception & e)
{
GTEST_ASSERT_EQ(e.message(), "Consumer exits unexpected, 0000_0001 meet error: grpc writes failed.");
GTEST_ASSERT_EQ(e.message(), "0000_0001: consumer exits unexpected, error message: 0000_0001 meet error: grpc writes failed. ");
}
}

Expand All @@ -481,7 +481,7 @@ TEST_F(TestMPPTunnel, SyncWriteAfterFinished)
}
catch (Exception & e)
{
GTEST_ASSERT_EQ(e.message(), "write to tunnel which is already closed,");
GTEST_ASSERT_EQ(e.message(), "write to tunnel 0000_0001 which is already closed, ");
}
if (mpp_tunnel_ptr != nullptr)
mpp_tunnel_ptr->waitForFinish();
Expand Down Expand Up @@ -573,7 +573,7 @@ TEST_F(TestMPPTunnel, AsyncWriteError)
}
catch (Exception & e)
{
GTEST_ASSERT_EQ(e.message(), "Consumer exits unexpected, 0000_0001 meet error: grpc writes failed.");
GTEST_ASSERT_EQ(e.message(), "0000_0001: consumer exits unexpected, error message: 0000_0001 meet error: grpc writes failed. ");
}
}

Expand Down Expand Up @@ -660,7 +660,7 @@ try
}
catch (Exception & e)
{
GTEST_ASSERT_EQ(e.message(), "Check status == TunnelStatus::Unconnected failed: MPPTunnel has connected or finished: Finished");
GTEST_ASSERT_EQ(e.message(), "Check status == TunnelStatus::Unconnected failed: MPPTunnel 0000_0001 has connected or finished: Finished");
}

TEST_F(TestMPPTunnel, LocalConnectWhenConnected)
Expand All @@ -679,7 +679,7 @@ try
}
catch (Exception & e)
{
GTEST_ASSERT_EQ(e.message(), "Check status == TunnelStatus::Unconnected failed: MPPTunnel has connected or finished: Connected");
GTEST_ASSERT_EQ(e.message(), "Check status == TunnelStatus::Unconnected failed: MPPTunnel 0000_0001 has connected or finished: Connected");
}

TEST_F(TestMPPTunnel, LocalCloseBeforeConnect)
Expand Down Expand Up @@ -713,7 +713,7 @@ try
}
catch (Exception & e)
{
GTEST_ASSERT_EQ(e.message(), "Check tunnel_sender != nullptr failed: write to tunnel which is already closed.");
GTEST_ASSERT_EQ(e.message(), "Check tunnel_sender != nullptr failed: write to tunnel 0000_0001 which is already closed.");
}

TEST_F(TestMPPTunnel, LocalWriteDoneAfterUnconnectFinished)
Expand All @@ -726,7 +726,7 @@ try
}
catch (Exception & e)
{
GTEST_ASSERT_EQ(e.message(), "write to tunnel which is already closed.");
GTEST_ASSERT_EQ(e.message(), "write to tunnel 0000_0001 which is already closed.");
}

TEST_F(TestMPPTunnel, LocalWriteError)
Expand Down Expand Up @@ -756,7 +756,7 @@ try
}
catch (Exception & e)
{
GTEST_ASSERT_EQ(e.message(), "Consumer exits unexpected, err");
GTEST_ASSERT_EQ(e.message(), "0000_0001: consumer exits unexpected, error message: err ");
}

TEST_F(TestMPPTunnel, LocalWriteAfterFinished)
Expand All @@ -775,7 +775,7 @@ TEST_F(TestMPPTunnel, LocalWriteAfterFinished)
}
catch (Exception & e)
{
GTEST_ASSERT_EQ(e.message(), "write to tunnel which is already closed,");
GTEST_ASSERT_EQ(e.message(), "write to tunnel 0000_0001 which is already closed, ");
}
if (tunnel != nullptr)
tunnel->waitForFinish();
Expand Down

0 comments on commit 2480a1d

Please sign in to comment.