Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGDriver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
#include <Interpreters/ProcessList.h>
#include <Storages/KVStore/Read/LockException.h>
#include <Storages/KVStore/Read/RegionException.h>
#include <Storages/KVStore/TMTContext.h>
#include <pingcap/Exception.h>

namespace DB
Expand Down
2 changes: 0 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -891,8 +891,6 @@ LearnerReadSnapshot DAGStorageInterpreter::doBatchCopLearnerRead()
if (context.getDAGContext()->is_disaggregated_task)
throw;

if (tmt.checkShuttingDown())
throw TiFlashException("TiFlash server is terminating", Errors::Coprocessor::Internal);
// By now, the RegionException contains all region ids of the MvccQueryInfo, which is needed by CHSpark.
// When we meet a RegionException, we can let MakeRegionQueryInfos check them in the next loop.
force_retry.insert(e.unavailable_region.begin(), e.unavailable_region.end());
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/FlashService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ grpc::Status FlashService::IsAlive(
return check_result;

auto & tmt_context = context->getTMTContext();
response->set_available(tmt_context.checkRunning());
response->set_available(tmt_context.checkRunning() && tmt_context.getMPPTaskManager()->isAvailable());
response->set_mpp_version(DB::GetMppVersion());
return grpc::Status::OK;
}
Expand Down
6 changes: 5 additions & 1 deletion dbms/src/Flash/Mpp/MPPTaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,11 @@ void MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr<Context> & glob
auto start = std::chrono::steady_clock::now();
// The maximum number of seconds TiFlash will wait for all current MPP tasks to finish before shutting down
static constexpr const char * GRACEFUL_WIAT_BEFORE_SHUTDOWN = "flash.graceful_wait_before_shutdown";
auto graceful_wait_before_shutdown = global_context->getUsersConfig()->getUInt64(GRACEFUL_WIAT_BEFORE_SHUTDOWN, 60);
// The default value of flash.graceful_wait_before_shutdown
static constexpr UInt64 DEFAULT_GRACEFUL_WAIT_BEFORE_SHUTDOWN = 600;
auto graceful_wait_before_shutdown = global_context->getUsersConfig()->getUInt64(
GRACEFUL_WIAT_BEFORE_SHUTDOWN,
DEFAULT_GRACEFUL_WAIT_BEFORE_SHUTDOWN);
auto max_wait_time = start + std::chrono::seconds(graceful_wait_before_shutdown);
while (true)
{
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Flash/Mpp/MPPTaskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,8 @@ class MPPTaskManager : private boost::noncopyable

std::shared_ptr<MPPTaskMonitor> monitor;

std::atomic<bool> is_available{true};

public:
explicit MPPTaskManager(MPPTaskSchedulerPtr scheduler);

Expand Down Expand Up @@ -275,6 +277,9 @@ class MPPTaskManager : private boost::noncopyable

bool isTaskExists(const MPPTaskId & id);

/// Clear the availability flag of this manager.
/// Nothing in this class fragment sets the flag back to true afterwards.
void setUnavailable()
{
    is_available.store(false);
}
/// Return the current value of the availability flag.
bool isAvailable()
{
    return is_available.load();
}

private:
MPPQueryPtr addMPPQuery(
const MPPQueryId & query_id,
Expand Down
23 changes: 7 additions & 16 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1166,6 +1166,8 @@ try
}
}

SCOPE_EXIT({ proxy_machine.stopProxy(tmt_context); });

{
// Report the unix timestamp, git hash, release version
Poco::Timestamp ts;
Expand Down Expand Up @@ -1223,8 +1225,7 @@ try
}

/// startup grpc server to serve raft and/or flash services.
auto flash_grpc_server_holder
= std::make_unique<FlashGrpcServerHolder>(this->context(), this->config(), raft_config, log);
FlashGrpcServerHolder flash_grpc_server_holder(this->context(), this->config(), raft_config, log);

SCOPE_EXIT({
// Stop LAC for AutoScaler managed CN before FlashGrpcServerHolder is destructed.
Expand All @@ -1236,6 +1237,8 @@ try
LocalAdmissionController::global_instance->safeStop();
});

proxy_machine.runKVStore(tmt_context);

try
{
// Bind CPU affinity after all threads started.
Expand All @@ -1246,24 +1249,12 @@ try
LOG_ERROR(log, "CPUAffinityManager::bindThreadCPUAffinity throws exception.");
}

// Ready to provide service
tmt_context.setStatusRunning();

LOG_INFO(log, "Start to wait for terminal signal");
waitForTerminationRequest();

LOG_INFO(log, "Set store context status Stopping");
tmt_context.setStatusStopping();

LOG_INFO(log, "Set unavailable for MPPTask");
tmt_context.getMPPTaskManager()->setUnavailable();
tmt_context.getMPPTaskManager()->getMPPTaskMonitor()->waitAllMPPTasksFinish(global_context);
proxy_machine.waitAllReadIndexTasksFinish(tmt_context);

LOG_INFO(log, "Set store context status Terminated");
tmt_context.setStatusTerminated();

// Stop grpc server before proxy_machine because it depends on a DiagnosticsService which will call proxy
flash_grpc_server_holder.reset();
proxy_machine.stopProxy(tmt_context);

{
// Set limiters stopping and wake up threads in the waiting queue.
Expand Down
33 changes: 19 additions & 14 deletions dbms/src/Storages/KVStore/ProxyStateMachine.h
Original file line number Diff line number Diff line change
Expand Up @@ -367,28 +367,33 @@ struct ProxyStateMachine
}
}

/// Block the calling thread until KVStore no longer reports a read-index event.
///
/// Does nothing when the proxy is not runnable. When the proxy is runnable but
/// the engine-store status is not `Running`, an error is logged and the process
/// exits with code -1.
void waitAllReadIndexTasksFinish(TMTContext & tmt_context)
{
    if (proxy_conf.isProxyRunnable())
    {
        // The engine-store is expected to still be Running at this point.
        if (tiflash_instance_wrap.status != EngineStoreServerStatus::Running)
        {
            LOG_ERROR(log, "Current status of engine-store is NOT Running, should not happen");
            exit(-1);
        }

        // Poll at a fixed interval until there is no read-index task left.
        constexpr auto poll_interval = std::chrono::milliseconds(200);
        while (tmt_context.getKVStore()->getReadIndexEvent())
        {
            std::this_thread::sleep_for(poll_interval);
        }
    }
}
/// Switch the TMTContext status to running, so that KVStore can handle
/// read index requests.
void runKVStore(TMTContext & tmt_context) const
{
    tmt_context.setStatusRunning();
}

/// Stop all services in TMTContext and ReadIndexWorkers.
/// Then, inform proxy to stop by setting `tiflash_instance_wrap.status`.
void stopProxy(TMTContext & tmt_context)
{
if (!proxy_conf.isProxyRunnable())
{
tmt_context.setStatusTerminated();
return;
}
if (proxy_conf.isProxyRunnable() && tiflash_instance_wrap.status != EngineStoreServerStatus::Running)
{
LOG_ERROR(log, "Current status of engine-store is NOT Running, should not happen");
exit(-1);
}
LOG_INFO(log, "Set store context status Stopping");
tmt_context.setStatusStopping();
{
// Wait until there is no read-index task.
while (tmt_context.getKVStore()->getReadIndexEvent())
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
tmt_context.setStatusTerminated();
tmt_context.getKVStore()->stopReadIndexWorkers();
LOG_INFO(log, "Set store context status Terminated");
{
// update status and let proxy stop all services except encryption.
tiflash_instance_wrap.status = EngineStoreServerStatus::Stopping;
Expand Down