Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dbms/src/Flash/FlashService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ grpc::Status FlashService::IsAlive(
return check_result;

auto & tmt_context = context->getTMTContext();
response->set_available(tmt_context.checkRunning());
response->set_available(tmt_context.checkRunning() && tmt_context.getMPPTaskManager()->isAvailable());
response->set_mpp_version(DB::GetMppVersion());
return grpc::Status::OK;
}
Expand Down
30 changes: 30 additions & 0 deletions dbms/src/Flash/Mpp/MPPTaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,36 @@ MPPGatherTaskSetPtr MPPQuery::addMPPGatherTaskSet(const MPPGatherId & gather_id)
return ptr;
}

void MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr<Context> & global_context)
{
auto start = std::chrono::steady_clock::now();
// The maximum seconds TiFlash will wait for all current MPP tasks to finish before shutting down
static constexpr const char * GRACEFUL_WIAT_BEFORE_SHUTDOWN = "flash.graceful_wait_before_shutdown";
// The default value of flash.graceful_wait_before_shutdown
static constexpr UInt64 DEFAULT_GRACEFUL_WAIT_BEFORE_SHUTDOWN = 600;
auto graceful_wait_before_shutdown = global_context->getUsersConfig()->getUInt64(
GRACEFUL_WIAT_BEFORE_SHUTDOWN,
DEFAULT_GRACEFUL_WAIT_BEFORE_SHUTDOWN);
LOG_INFO(log, "Start to wait all MPPTasks to finish, timeout={}s", graceful_wait_before_shutdown);
auto max_wait_time = start + std::chrono::seconds(graceful_wait_before_shutdown);
while (true)
{
// The first sleep before checking to reduce the chance of missing MPP tasks that are still in the process of being dispatched
std::this_thread::sleep_for(std::chrono::milliseconds(200));
{
std::unique_lock lock(mu);
if (monitored_tasks.empty())
break;
}
auto current_time = std::chrono::steady_clock::now();
if (current_time >= max_wait_time)
{
LOG_WARNING(log, "Timed out waiting for MPP tasks to finish after {}s", graceful_wait_before_shutdown);
break;
}
}
}

MPPTaskManager::MPPTaskManager(MPPTaskSchedulerPtr scheduler_)
: scheduler(std::move(scheduler_))
, aborted_query_gather_cache(ABORTED_MPPGATHER_CACHE_SIZE)
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Flash/Mpp/MPPTaskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,8 @@ class MPPTaskManager : private boost::noncopyable

std::shared_ptr<MPPTaskMonitor> monitor;

std::atomic<bool> is_available{true};

public:
explicit MPPTaskManager(MPPTaskSchedulerPtr scheduler);

Expand Down Expand Up @@ -273,6 +275,9 @@ class MPPTaskManager : private boost::noncopyable

bool isTaskExists(const MPPTaskId & id);

void setUnavailable() { is_available = false; }
bool isAvailable() { return is_available; }

private:
MPPQueryPtr addMPPQuery(
const MPPQueryId & query_id,
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1252,6 +1252,10 @@ try
LOG_INFO(log, "Start to wait for terminal signal");
waitForTerminationRequest();

LOG_INFO(log, "Set unavailable for MPPTask");
tmt_context.getMPPTaskManager()->setUnavailable();
tmt_context.getMPPTaskManager()->getMPPTaskMonitor()->waitAllMPPTasksFinish(global_context);

{
// Set limiters stopping and wakeup threads in waitting queue.
global_context->getIORateLimiter().setStop();
Expand Down