Skip to content
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions dbms/src/Flash/Mpp/MPPTaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ MPPGatherTaskSetPtr MPPQuery::addMPPGatherTaskSet(const MPPGatherId & gather_id)
return ptr;
}

void MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr<Context> & global_context)
UInt64 MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr<Context> & global_context)
{
// The maximum seconds TiFlash will wait for all current MPP tasks to finish before shutting down
static constexpr const char * GRACEFUL_WIAT_BEFORE_SHUTDOWN = "flash.graceful_wait_before_shutdown";
Expand All @@ -94,6 +94,7 @@ void MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr<Context> & glob
GRACEFUL_WIAT_BEFORE_SHUTDOWN,
DEFAULT_GRACEFUL_WAIT_BEFORE_SHUTDOWN);
LOG_INFO(log, "Start to wait all MPPTasks to finish, timeout={}s", graceful_wait_before_shutdown);
UInt64 graceful_wait_before_shutdown_ms = graceful_wait_before_shutdown * 1000;
Stopwatch watch;
// The first sleep before checking to reduce the chance of missing MPP tasks that are still in the process of being dispatched
std::this_thread::sleep_for(std::chrono::seconds(1));
Expand All @@ -105,13 +106,14 @@ void MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr<Context> & glob
if (monitored_tasks.empty())
{
LOG_INFO(log, "All MPPTasks have finished after {}ms", elapsed_ms);
break;
return graceful_wait_before_shutdown_ms > elapsed_ms ? graceful_wait_before_shutdown_ms - elapsed_ms
: 0;
}
}
if (elapsed_ms >= graceful_wait_before_shutdown * 1000)
if (elapsed_ms >= graceful_wait_before_shutdown_ms)
{
LOG_WARNING(log, "Timed out waiting for MPP tasks to finish after {}ms", elapsed_ms);
break;
LOG_WARNING(log, "Timed out waiting for all MPP tasks to finish after {}ms", elapsed_ms);
return 0;
}
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/MPPTaskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ struct MPPTaskMonitor
return monitored_tasks.find(task_unique_id) != monitored_tasks.end();
}

void waitAllMPPTasksFinish(const std::unique_ptr<Context> & global_context);
UInt64 waitAllMPPTasksFinish(const std::unique_ptr<Context> & global_context);

std::mutex mu;
std::condition_variable cv;
Expand Down
39 changes: 23 additions & 16 deletions dbms/src/Server/FlashGrpcServerHolder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
#include <Interpreters/Context.h>
#include <Server/FlashGrpcServerHolder.h>

#include <chrono>

// This header is included to get grpc::SecureServerCredentials, which is used in
// sslServerCredentialsWithFetcher().
// We implement sslServerCredentialsWithFetcher() to set a config fetcher
// that automatically reloads sslServerCredentials.
#include "../../contrib/grpc/src/cpp/server/secure_server_credentials.h"
#include "common/logger_useful.h"

namespace DB
{
Expand Down Expand Up @@ -52,7 +55,7 @@ void handleRpcs(grpc::ServerCompletionQueue * curcq, const LoggerPtr & log)
// tells us whether there is any kind of event or cq is shutting down.
if (!curcq->Next(&tag, &ok))
{
LOG_INFO(log, "CQ is fully drained and shut down");
LOG_DEBUG(log, "CQ is fully drained and shut down");
break;
}
GET_METRIC(tiflash_thread_count, type_active_rpc_async_worker).Increment();
Expand Down Expand Up @@ -220,21 +223,25 @@ FlashGrpcServerHolder::~FlashGrpcServerHolder()
{
/// Shut down grpc server.
LOG_INFO(log, "Begin to shut down flash grpc server");
flash_grpc_server->Shutdown();
Stopwatch watch;
while (true)
{
auto elapsed_ms = watch.elapsedMilliseconds();
if (GET_METRIC(tiflash_coprocessor_handling_request_count, type_mpp_establish_conn).Value() == 0)
{
LOG_INFO(log, "All grpc connections have finished after {}ms", elapsed_ms);
break;
}
if (elapsed_ms >= grpc_shutdown_max_wait_ms)
{
LOG_WARNING(log, "Timed out waiting for grpc connections to finish after {}ms", elapsed_ms);
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}

*is_shutdown = true;
// Wait for all existing MPPTunnels to finish, to prevent a crash.
// If all existing MPPTunnels are done, in almost all cases it means all existing MPPTasks and ExchangeReceivers are also done.
const int max_wait_cnt = 300;
int wait_cnt = 0;
while (GET_METRIC(tiflash_object_count, type_count_of_mpptunnel).Value() >= 1 && (wait_cnt++ < max_wait_cnt))
std::this_thread::sleep_for(std::chrono::seconds(1));
if (GET_METRIC(tiflash_object_count, type_count_of_mpptunnel).Value() >= 1)
LOG_WARNING(
log,
"Wait {} seconds for mpp tunnels shutdown, still some mpp tunnels are alive, potential resource leak",
wait_cnt);
else
LOG_INFO(log, "Wait {} seconds for mpp tunnels shutdown, all finished", wait_cnt);
flash_grpc_server->Shutdown();

for (auto & cq : cqs)
cq->Shutdown();
Expand All @@ -252,7 +259,7 @@ FlashGrpcServerHolder::~FlashGrpcServerHolder()
GRPCCompletionQueuePool::global_instance->markShutdown();

GRPCCompletionQueuePool::global_instance = nullptr;
LOG_INFO(log, "Shut down flash grpc server");
LOG_INFO(log, "Shut down flash grpc server after {}ms", watch.elapsedMilliseconds());

/// Close flash service.
LOG_INFO(log, "Begin to shut down flash service");
Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Server/FlashGrpcServerHolder.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ class FlashGrpcServerHolder

std::unique_ptr<FlashService> & flashService();

/// Sets `grpc_shutdown_max_wait_ms`, the limit (in milliseconds) that the
/// shutdown wait loop compares its elapsed time against.
///
/// @param max_wait_ms Requested limit in milliseconds. Values below
///                    10 seconds are raised to 10 seconds.
void setMaxWaitMsDuringGRPCShutdown(UInt64 max_wait_ms)
{
    // At least 10 seconds.
    // The template argument is spelled out because `10 * 1000` is an `int`:
    // std::max(UInt64, int) would fail template argument deduction.
    grpc_shutdown_max_wait_ms = std::max<UInt64>(max_wait_ms, 10 * 1000);
}

private:
const LoggerPtr & log;
std::shared_ptr<std::atomic<bool>> is_shutdown;
Expand All @@ -56,6 +62,7 @@ class FlashGrpcServerHolder
std::vector<std::thread> cq_workers;
std::vector<std::thread> notify_cq_workers;
CollectProcInfoBackgroundTask background_task;
UInt64 grpc_shutdown_max_wait_ms = 10 * 1000;
};

} // namespace DB
4 changes: 3 additions & 1 deletion dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1256,7 +1256,9 @@ try
// Otherwise, read index requests may fail, which can prevent TiFlash from shutting down gracefully.
LOG_INFO(log, "Set unavailable for MPPTask");
tmt_context.getMPPTaskManager()->setUnavailable();
tmt_context.getMPPTaskManager()->getMPPTaskMonitor()->waitAllMPPTasksFinish(global_context);
UInt64 remaining_wait_time
= tmt_context.getMPPTaskManager()->getMPPTaskMonitor()->waitAllMPPTasksFinish(global_context);
flash_grpc_server_holder.setMaxWaitMsDuringGRPCShutdown(remaining_wait_time);

{
// Set limiters stopping and wake up threads in the waiting queue.
Expand Down