From 45a8da6c0f85dabb48000696c8849ef1ab326fd2 Mon Sep 17 00:00:00 2001
From: qihongcheng
Date: Tue, 22 Oct 2024 10:32:45 +0800
Subject: [PATCH] feat: add TRANSFER LEADER command

---
 src/clients/meta/MetaClient.cpp               |  20 ++
 src/clients/meta/MetaClient.h                 |   4 +
 src/common/base/ObjectPool.h                  |  14 +-
 src/graph/executor/CMakeLists.txt             |   1 +
 src/graph/executor/Executor.cpp               |   4 +
 .../executor/admin/TransferLeaderExecutor.cpp |  47 ++++
 .../executor/admin/TransferLeaderExecutor.h   |  27 +++
 src/graph/planner/plan/Admin.h                |  36 ++++
 src/graph/planner/plan/PlanNode.cpp           |   2 +
 src/graph/planner/plan/PlanNode.h             |   1 +
 src/graph/service/PermissionCheck.cpp         |   3 +-
 src/graph/validator/AdminValidator.cpp        |  15 ++
 src/graph/validator/AdminValidator.h          |  14 ++
 src/graph/validator/Validator.cpp             |   2 +
 src/interface/meta.thrift                     |   9 +
 src/meta/CMakeLists.txt                       |   2 +
 src/meta/MetaServiceHandler.cpp               |   9 +
 src/meta/MetaServiceHandler.h                 |   2 +
 src/meta/processors/job/LeaderTransfer.cpp    | 202 ++++++++++++++++++
 .../job/LeaderTransferProcessor.cpp           |  15 ++
 .../processors/job/LeaderTransferProcessor.h  |  53 +++++
 src/parser/AdminSentences.cpp                 |   7 +
 src/parser/AdminSentences.h                   |  23 ++
 src/parser/Sentence.h                         |   1 +
 src/parser/parser.yy                          |  21 +-
 src/parser/scanner.lex                        |   1 +
 26 files changed, 526 insertions(+), 9 deletions(-)
 create mode 100644 src/graph/executor/admin/TransferLeaderExecutor.cpp
 create mode 100644 src/graph/executor/admin/TransferLeaderExecutor.h
 create mode 100644 src/meta/processors/job/LeaderTransfer.cpp
 create mode 100644 src/meta/processors/job/LeaderTransferProcessor.cpp
 create mode 100644 src/meta/processors/job/LeaderTransferProcessor.h

diff --git a/src/clients/meta/MetaClient.cpp b/src/clients/meta/MetaClient.cpp
index df4e3dc91f..11f9969598 100644
--- a/src/clients/meta/MetaClient.cpp
+++ b/src/clients/meta/MetaClient.cpp
@@ -2840,6 +2840,26 @@ folly::Future<StatusOr<std::vector<cpp2::RoleItem>>> MetaClient::getUserRoles(st
   return future;
 }
 
+folly::Future<StatusOr<bool>> MetaClient::transferLeader(const HostAddr& host,
+                                                         GraphSpaceID spaceId,
+                                                         int32_t concurrency) {
+  cpp2::LeaderTransferReq req;
+  req.space_id_ref() = spaceId;
+  req.host_ref() = host;
+  req.concurrency_ref() = concurrency;
+  folly::Promise<StatusOr<bool>> promise;
+  auto future = promise.getFuture();
+  getResponse(
+      std::move(req),
+      [](auto client, auto request) { return client->future_leaderTransfer(request); },
+      [](cpp2::ExecResp&& resp) -> bool {
+        return resp.get_code() == nebula::cpp2::ErrorCode::SUCCEEDED;
+      },
+      std::move(promise));
+  return future;
+}
+
 folly::Future<StatusOr<bool>> MetaClient::regConfig(const std::vector<cpp2::ConfigItem>& items) {
   memory::MemoryCheckOffGuard g;
   cpp2::RegConfigReq req;
diff --git a/src/clients/meta/MetaClient.h b/src/clients/meta/MetaClient.h
index 471804df3a..a79a9c3a5d 100644
--- a/src/clients/meta/MetaClient.h
+++ b/src/clients/meta/MetaClient.h
@@ -411,6 +411,10 @@ class MetaClient : public BaseMetaClient {
   folly::Future<StatusOr<std::vector<cpp2::RoleItem>>> getUserRoles(std::string account);
 
+  folly::Future<StatusOr<bool>> transferLeader(const HostAddr& host,
+                                               GraphSpaceID spaceId,
+                                               int32_t concurrency);
+
   // Operations for config
   folly::Future<StatusOr<bool>> regConfig(const std::vector<cpp2::ConfigItem>& items);
 
diff --git a/src/common/base/ObjectPool.h b/src/common/base/ObjectPool.h
index ce0f420c36..3fd78cceaa 100644
--- a/src/common/base/ObjectPool.h
+++ b/src/common/base/ObjectPool.h
@@ -51,6 +51,13 @@ class ObjectPool final : private boost::noncopyable, private cpp::NonMovable {
     return objects_.empty();
   }
 
+  template <typename T>
+  T *add(T *obj) {
+    SLGuard g(lock_);
+    objects_.emplace_back(obj);
+    return obj;
+  }
+
  private:
   // Holder the ownership of the any object
   class OwnershipHolder {
@@ -68,13 +75,6 @@ class ObjectPool final : private boost::noncopyable, private cpp::NonMovable {
     std::function<void(void *)> deleteFn_;
   };
 
-  template <typename T>
-  T *add(T *obj) {
-    SLGuard g(lock_);
-    objects_.emplace_back(obj);
-    return obj;
-  }
-
   std::list<OwnershipHolder> objects_;
 
   Arena arena_;
diff --git a/src/graph/executor/CMakeLists.txt b/src/graph/executor/CMakeLists.txt
index d99fbc27ed..aa263691a8 100644
--- a/src/graph/executor/CMakeLists.txt
+++ b/src/graph/executor/CMakeLists.txt
@@ -84,6 +84,7 @@ nebula_add_library(
     admin/SessionExecutor.cpp
     admin/ShowQueriesExecutor.cpp
     admin/KillQueryExecutor.cpp
+    admin/TransferLeaderExecutor.cpp
     maintain/TagExecutor.cpp
     maintain/TagIndexExecutor.cpp
     maintain/EdgeExecutor.cpp
diff --git a/src/graph/executor/Executor.cpp b/src/graph/executor/Executor.cpp
index b6ae9f242d..5acccd2ffa 100644
--- a/src/graph/executor/Executor.cpp
+++ b/src/graph/executor/Executor.cpp
@@ -45,6 +45,7 @@
 #include "graph/executor/admin/SwitchSpaceExecutor.h"
+#include "graph/executor/admin/TransferLeaderExecutor.h"
 #include "graph/executor/admin/UpdateUserExecutor.h"
 #include "graph/executor/admin/ZoneExecutor.h"
 #include "graph/executor/algo/AllPathsExecutor.h"
 #include "graph/executor/algo/BFSShortestPathExecutor.h"
 #include "graph/executor/algo/CartesianProductExecutor.h"
@@ -452,6 +453,9 @@ Executor *Executor::makeExecutor(QueryContext *qctx, const PlanNode *node) {
     case PlanNode::Kind::kShowHosts: {
       return pool->makeAndAdd<ShowHostsExecutor>(node, qctx);
     }
+    case PlanNode::Kind::kTransferLeader: {
+      return pool->add(new TransferLeaderExecutor(node, qctx));
+    }
     case PlanNode::Kind::kShowMetaLeader: {
       return pool->makeAndAdd<ShowMetaLeaderExecutor>(node, qctx);
     }
diff --git a/src/graph/executor/admin/TransferLeaderExecutor.cpp b/src/graph/executor/admin/TransferLeaderExecutor.cpp
new file mode 100644
index 0000000000..679f79ef8f
--- /dev/null
+++ b/src/graph/executor/admin/TransferLeaderExecutor.cpp
@@ -0,0 +1,47 @@
+//
+// Created by fujie on 24-10-21.
+//
+
+#include "graph/executor/admin/TransferLeaderExecutor.h"
+
+#include "clients/meta/MetaClient.h"
+#include "common/time/ScopedTimer.h"
+#include "graph/context/QueryContext.h"
+#include "graph/planner/plan/Admin.h"
+#include "graph/planner/plan/Query.h"
+#include "graph/service/PermissionManager.h"
+#include "interface/gen-cpp2/meta_types.h"
+
+namespace nebula {
+namespace graph {
+
+folly::Future<Status> TransferLeaderExecutor::execute() {
+  SCOPED_TIMER(&execTime_);
+  return transferLeader();
+}
+
+folly::Future<Status> TransferLeaderExecutor::transferLeader() {
+  auto *transferLeaderNode = asNode<TransferLeader>(node());
+  auto spaceName = transferLeaderNode->spaceName();
+  auto spaceIdResult = qctx()->getMetaClient()->getSpaceIdByNameFromCache(spaceName);
+  NG_RETURN_IF_ERROR(spaceIdResult);
+  auto spaceId = spaceIdResult.value();
+  auto *session = qctx_->rctx()->session();
+  NG_RETURN_IF_ERROR(PermissionManager::canWriteSpace(session));
+
+  return qctx()
+      ->getMetaClient()
+      ->transferLeader(transferLeaderNode->address(), spaceId, transferLeaderNode->concurrency())
+      .via(runner())
+      .thenValue([this](StatusOr<bool> resp) {
+        SCOPED_TIMER(&execTime_);
+        NG_RETURN_IF_ERROR(resp);
+        if (!resp.value()) {
+          return Status::Error("Transfer leader failed!");
+        }
+        return Status::OK();
+      });
+}
+
+}  // namespace graph
+}  // namespace nebula
diff --git a/src/graph/executor/admin/TransferLeaderExecutor.h b/src/graph/executor/admin/TransferLeaderExecutor.h
new file mode 100644
index 0000000000..44f8299b93
--- /dev/null
+++ b/src/graph/executor/admin/TransferLeaderExecutor.h
@@ -0,0 +1,27 @@
+//
+// Created by fujie on 24-10-21.
+//
+
+#ifndef GRAPH_EXECUTOR_ADMIN_TRANSFERLEADEREXECUTOR_H_
+#define GRAPH_EXECUTOR_ADMIN_TRANSFERLEADEREXECUTOR_H_
+
+#include "graph/executor/Executor.h"
+
+namespace nebula {
+namespace graph {
+
+class TransferLeaderExecutor final : public Executor {
+ public:
+  TransferLeaderExecutor(const PlanNode *node, QueryContext *qctx)
+      : Executor("TransferLeaderExecutor", node, qctx) {}
+
+  folly::Future<Status> execute() override;
+
+ private:
+  folly::Future<Status> transferLeader();
+};
+
+}  // namespace graph
+}  // namespace nebula
+
+#endif  // GRAPH_EXECUTOR_ADMIN_TRANSFERLEADEREXECUTOR_H_
diff --git a/src/graph/planner/plan/Admin.h b/src/graph/planner/plan/Admin.h
index dc9a2407f5..41f9d0416c 100644
--- a/src/graph/planner/plan/Admin.h
+++ b/src/graph/planner/plan/Admin.h
@@ -939,6 +939,42 @@ class SubmitJob final : public SingleDependencyNode {
   const std::vector<std::string> params_;
 };
 
+class TransferLeader final : public SingleDependencyNode {
+ public:
+  static TransferLeader* make(QueryContext* qctx,
+                              PlanNode* input,
+                              HostAddr address,
+                              std::string spaceName,
+                              int32_t concurrency) {
+    return qctx->objPool()->add(
+        new TransferLeader(qctx, input, std::move(address), std::move(spaceName), concurrency));
+  }
+
+  const std::string& spaceName() const { return spaceName_; }
+
+  const HostAddr& address() const { return address_; }
+
+  int32_t concurrency() const { return concurrency_; }
+
+ private:
+  TransferLeader(QueryContext* qctx,
+                 PlanNode* input,
+                 HostAddr address,
+                 std::string spaceName,
+                 int32_t concurrency)
+      : SingleDependencyNode(qctx, Kind::kTransferLeader, input),
+        spaceName_(std::move(spaceName)),
+        address_(std::move(address)),
+        concurrency_(concurrency) {}
+
+ private:
+  std::string spaceName_;
+  HostAddr address_;
+  int32_t concurrency_;
+};
+
 class ShowCharset final : public SingleDependencyNode {
  public:
   static ShowCharset* make(QueryContext* qctx, PlanNode* input) {
diff --git a/src/graph/planner/plan/PlanNode.cpp b/src/graph/planner/plan/PlanNode.cpp
index f459649815..45a21b3108 100644
--- a/src/graph/planner/plan/PlanNode.cpp
+++ b/src/graph/planner/plan/PlanNode.cpp
@@ -249,6 +249,8 @@ const char* PlanNode::toString(PlanNode::Kind kind) {
       return "AddHosts";
     case Kind::kDropHosts:
       return "DropHosts";
+    case Kind::kTransferLeader:
+      return "TransferLeader";
     // Zone
     case Kind::kMergeZone:
       return "MergeZone";
diff --git a/src/graph/planner/plan/PlanNode.h b/src/graph/planner/plan/PlanNode.h
index f327b69b9d..509d63ace2 100644
--- a/src/graph/planner/plan/PlanNode.h
+++ b/src/graph/planner/plan/PlanNode.h
@@ -168,6 +168,7 @@ class PlanNode {
     kDivideZone,
     kAddHosts,
     kDropHosts,
+    kTransferLeader,
     kDescribeZone,
     kAddHostsIntoZone,
 
diff --git a/src/graph/service/PermissionCheck.cpp b/src/graph/service/PermissionCheck.cpp
index db5b9fd834..ff98d61b69 100644
--- a/src/graph/service/PermissionCheck.cpp
+++ b/src/graph/service/PermissionCheck.cpp
@@ -18,7 +18,7 @@ namespace graph {
  * kAlterEdge, kDropTag, kDropEdge,
  * kCreateTagIndex, kCreateEdgeIndex, kDropTagIndex,
  * kDropEdgeIndex, Read user : Write user : kCreateUser, kDropUser, kAlterUser,
- * Write role : kGrant, kRevoke,
+ * Write role : kGrant, kRevoke, kTransferLeader,
  * Read data : kGo , kSet, kPipe, kMatch, kAssignment, kLookup,
  *             kYield, kOrderBy, kFetchVertices, kFind
  *             kFetchEdges, kFindPath, kLimit, KGroupBy, kReturn
@@ -65,6 +65,7 @@ namespace graph {
     case Sentence::Kind::kDescribeZone:
     case Sentence::Kind::kListZones:
     case Sentence::Kind::kAddHostsIntoZone:
+    case Sentence::Kind::kTransferLeader:
     case Sentence::Kind::kShowConfigs:
     case Sentence::Kind::kSetConfig:
     case Sentence::Kind::kGetConfig:
diff --git a/src/graph/validator/AdminValidator.cpp b/src/graph/validator/AdminValidator.cpp
index 4af794f31e..e534a0d022 100644
--- a/src/graph/validator/AdminValidator.cpp
+++ b/src/graph/validator/AdminValidator.cpp
@@ -712,5 +712,20 @@ Status KillQueryValidator::toPlan() {
   tail_ = root_;
   return Status::OK();
 }
+
+Status TransferLeaderValidator::validateImpl() {
+  return Status::OK();
+}
+
+Status TransferLeaderValidator::toPlan() {
+  auto sentence = static_cast<TransferLeaderSentence*>(sentence_);
+  auto *node = TransferLeader::make(
+      qctx_, nullptr, *sentence->address(), *sentence->spaceName(), sentence->concurrency());
+  root_ = node;
+  tail_ = root_;
+  return Status::OK();
+}
+
 }  // namespace graph
 }  // namespace nebula
diff --git a/src/graph/validator/AdminValidator.h b/src/graph/validator/AdminValidator.h
index 9aca1d728a..f94db2c055 100644
--- a/src/graph/validator/AdminValidator.h
+++ b/src/graph/validator/AdminValidator.h
@@ -429,6 +429,20 @@ class KillQueryValidator final : public Validator {
 
   Status toPlan() override;
 };
+
+class TransferLeaderValidator final : public Validator {
+ public:
+  TransferLeaderValidator(Sentence* sentence, QueryContext* context)
+      : Validator(sentence, context) {
+    setNoSpaceRequired();
+  }
+
+ private:
+  Status validateImpl() override;
+
+  Status toPlan() override;
+};
+
 }  // namespace graph
 }  // namespace nebula
 #endif  // GRAPH_VALIDATOR_ADMINVALIDATOR_H_
diff --git a/src/graph/validator/Validator.cpp b/src/graph/validator/Validator.cpp
index 3ba99753cf..1cf46e4335 100644
--- a/src/graph/validator/Validator.cpp
+++ b/src/graph/validator/Validator.cpp
@@ -139,6 +139,8 @@ std::unique_ptr<Validator> Validator::makeValidator(Sentence* sentence, QueryCon
       return std::make_unique(sentence, context);
     case Sentence::Kind::kDescribeUser:
       return std::make_unique<DescribeUserValidator>(sentence, context);
+    case Sentence::Kind::kTransferLeader:
+      return std::make_unique<TransferLeaderValidator>(sentence, context);
     case Sentence::Kind::kAdminJob:
     case Sentence::Kind::kAdminShowJobs:
       return std::make_unique<AdminJobValidator>(sentence, context);
diff --git a/src/interface/meta.thrift b/src/interface/meta.thrift
index c452c68705..931f6cf6db 100644
--- a/src/interface/meta.thrift
+++ b/src/interface/meta.thrift
@@ -760,6 +760,13 @@ struct BalanceTask {
     5: i64 stop_time,
 }
 
+struct LeaderTransferReq {
+    1: common.HostAddr      host,
+    2: common.GraphSpaceID  space_id,
+    3: i32                  concurrency,
+}
+
 enum ConfigModule {
     UNKNOWN = 0x00,
     ALL     = 0x01,
@@ -1262,6 +1269,8 @@ service MetaService {
     HBResp           heartBeat(1: HBReq req);
     AgentHBResp      agentHeartbeat(1: AgentHBReq req);
 
+    ExecResp leaderTransfer(1: LeaderTransferReq req);
+
     ExecResp regConfig(1: RegConfigReq req);
     GetConfigResp getConfig(1: GetConfigReq req);
     ExecResp setConfig(1: SetConfigReq req);
diff --git a/src/meta/CMakeLists.txt b/src/meta/CMakeLists.txt
index cbba10e2b6..952a1b8d1a 100644
--- a/src/meta/CMakeLists.txt
+++ b/src/meta/CMakeLists.txt
@@ -53,6 +53,8 @@ nebula_add_library(
     processors/admin/GetMetaDirInfoProcessor.cpp
     processors/admin/VerifyClientVersionProcessor.cpp
     processors/admin/SaveGraphVersionProcessor.cpp
+    processors/job/LeaderTransferProcessor.cpp
+    processors/job/LeaderTransfer.cpp
    processors/config/RegConfigProcessor.cpp
     processors/config/GetConfigProcessor.cpp
     processors/config/ListConfigsProcessor.cpp
diff --git a/src/meta/MetaServiceHandler.cpp b/src/meta/MetaServiceHandler.cpp
index 4a43bd086d..c46178757f 100644
--- a/src/meta/MetaServiceHandler.cpp
+++ b/src/meta/MetaServiceHandler.cpp
@@ -70,6 +70,7 @@
+#include "meta/processors/job/LeaderTransferProcessor.h"
 #include "meta/processors/zone/ListZonesProcessor.h"
 #include "meta/processors/zone/MergeZoneProcessor.h"
 #include "meta/processors/zone/RenameZoneProcessor.h"
 
 #define RETURN_FUTURE(processor)   \
   auto f = processor->getFuture(); \
@@ -561,5 +562,13 @@ folly::Future<cpp2::GetSegmentIdResp> MetaServiceHandler::future_getSegmentId(
   auto* processor = GetSegmentIdProcessor::instance(kvstore_);
   RETURN_FUTURE(processor);
 }
+
+folly::Future<cpp2::ExecResp> MetaServiceHandler::future_leaderTransfer(
+    const cpp2::LeaderTransferReq& req) {
+  auto* processor = LeaderTransferProcessor::instance(kvstore_);
+  RETURN_FUTURE(processor);
+}
+
 }  // namespace meta
 }  // namespace nebula
diff --git a/src/meta/MetaServiceHandler.h b/src/meta/MetaServiceHandler.h
index a00033ee0b..d646c20255 100644
--- a/src/meta/MetaServiceHandler.h
+++ b/src/meta/MetaServiceHandler.h
@@ -237,6 +237,8 @@ class MetaServiceHandler final : public cpp2::MetaServiceSvIf {
   folly::Future<cpp2::GetSegmentIdResp> future_getSegmentId(
       const cpp2::GetSegmentIdReq& req) override;
 
+  folly::Future<cpp2::ExecResp> future_leaderTransfer(const cpp2::LeaderTransferReq& req) override;
+
  private:
   kvstore::KVStore* kvstore_ = nullptr;
   ClusterID clusterId_{0};
diff --git a/src/meta/processors/job/LeaderTransfer.cpp b/src/meta/processors/job/LeaderTransfer.cpp
new file mode 100644
index 0000000000..7073e90713
--- /dev/null
+++ b/src/meta/processors/job/LeaderTransfer.cpp
@@ -0,0 +1,202 @@
+//
+// Created by fujie on 24-10-21.
+//
+
+#include "meta/processors/job/LeaderTransferProcessor.h"
+
+#include <thrift/lib/cpp/util/EnumUtils.h>
+
+#include <algorithm>
+#include <climits>
+#include <memory>
+
+#include "kvstore/NebulaStore.h"
+#include "meta/ActiveHostsMan.h"
+#include "meta/MetaServiceUtils.h"
+#include "meta/processors/Common.h"
+#include "meta/processors/job/LeaderBalanceJobExecutor.h"
+
+namespace nebula::meta {
+
+nebula::cpp2::ErrorCode LeaderTransferProcessor::buildLeaderTransferPlan(
+    const HostAddr &source,
+    HostLeaderMap *hostLeaderMap,
+    GraphSpaceID spaceId,
+    std::vector<std::pair<PartitionID, HostAddr>> &plan) const {
+  auto &leaderPartIds = (*hostLeaderMap)[source][spaceId];
+  PartAllocation peersMap;
+  // Store the peers of all partitions in peersMap.
+  {
+    folly::SharedMutex::ReadHolder rHolder(LockUtils::lock());
+    const auto &prefix = MetaKeyUtils::partPrefix(spaceId);
+    std::unique_ptr<kvstore::KVIterator> iter;
+    auto retCode = kv_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
+    if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
+      LOG(ERROR) << "Access kvstore failed, spaceId: " << spaceId
+                 << ", error: " << apache::thrift::util::enumNameSafe(retCode);
+      return retCode;
+    }
+
+    while (iter->valid()) {
+      PartitionID partId = MetaKeyUtils::parsePartKeyPartId(iter->key());
+      auto peers = MetaKeyUtils::parsePartVal(iter->val());
+      peersMap[partId] = std::move(peers);
+      iter->next();
+    }
+  }
+
+  auto activeHostsRet = ActiveHostsMan::getActiveHosts(kv_);
+  if (!nebula::ok(activeHostsRet)) {
+    auto retCode = nebula::error(activeHostsRet);
+    LOG(ERROR) << "Get active hosts failed, error: "
+               << apache::thrift::util::enumNameSafe(retCode);
+    return retCode;
+  }
+  auto activeHosts = nebula::value(activeHostsRet);
+
+  std::unordered_map<HostAddr, int32_t> leadersMap;
+  // Calculate the current leader count of each storage node in this space.
+  for (auto &spaceLeaders : *hostLeaderMap) {
+    auto &host = spaceLeaders.first;
+    if (spaceLeaders.second.find(spaceId) == spaceLeaders.second.end()) {
+      leadersMap[host] = 0;
+    } else {
+      leadersMap[host] = spaceLeaders.second[spaceId].size();
+    }
+  }
+
+  for (auto partId : leaderPartIds) {
+    HostAddr candidate;
+    int32_t minNum = INT_MAX;
+    // Transfer the leader to the active partition peer that currently holds the fewest leaders.
+    for (auto &host : peersMap[partId]) {
+      if (host == source) {
+        continue;
+      }
+      auto found = std::find(activeHosts.begin(), activeHosts.end(), host);
+      if (found == activeHosts.end()) {
+        continue;
+      }
+      if (leadersMap[host] < minNum) {
+        minNum = leadersMap[host];
+        candidate = host;
+      }
+    }
+    if (minNum != INT_MAX) {
+      plan.emplace_back(partId, candidate);
+      ++leadersMap[candidate];
+    } else {
+      LOG(ERROR) << "Can't find a leader candidate for part " << partId;
+      continue;
+    }
+  }
+
+  return nebula::cpp2::ErrorCode::SUCCEEDED;
+}
+
+nebula::cpp2::ErrorCode LeaderTransferProcessor::leaderTransfer(
+    const meta::cpp2::LeaderTransferReq &req) {
+  if (running_) {
+    LOG(INFO) << "Balance process still running";
+    return nebula::cpp2::ErrorCode::E_BALANCER_RUNNING;
+  }
+
+  const auto &source = req.get_host();
+  auto activeHostRet = ActiveHostsMan::isLived(kv_, source);
+  if (!nebula::ok(activeHostRet)) {
+    auto code = nebula::error(activeHostRet);
+    LOG(ERROR) << "Check host live failed, error: " << apache::thrift::util::enumNameSafe(code);
+    return code;
+  } else {
+    auto isLive = nebula::value(activeHostRet);
+    if (!isLive) {
+      LOG(ERROR) << "The host is not lived: " << source;
+      // No need to transfer leaders away from an inactive host.
+      return nebula::cpp2::ErrorCode::E_INVALID_HOST;
+    }
+  }
+
+  auto spaceId = req.get_space_id();
+  auto spaceKey = MetaKeyUtils::spaceKey(spaceId);
+  std::string val;
+  // The get only verifies that the space exists; the space properties are not needed here.
+  auto retCode = kv_->get(kDefaultSpaceId, kDefaultPartId, spaceKey, &val);
+  if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
+    LOG(ERROR) << "Get space (" << spaceId << ") failed: "
+               << apache::thrift::util::enumNameSafe(retCode);
+    return retCode;
+  }
+
+  bool expected = false;
+  // Treat a leader transfer the same way as a leader balance: only one may run at a time.
+  if (inLeaderBalance_.compare_exchange_strong(expected, true)) {
+    hostLeaderMap_ = std::make_unique<HostLeaderMap>();
+    auto leaderDistRet = client_->getLeaderDist(hostLeaderMap_.get()).get();
+    if (!leaderDistRet.ok() || hostLeaderMap_->empty()) {
+      LOG(ERROR) << "Get leader distribution failed";
+      inLeaderBalance_ = false;
+      return nebula::cpp2::ErrorCode::E_RPC_FAILURE;
+    }
+    if (hostLeaderMap_->find(source) == hostLeaderMap_->end() ||
+        (*hostLeaderMap_)[source].empty() ||
+        (*hostLeaderMap_)[source].find(spaceId) == (*hostLeaderMap_)[source].end() ||
+        (*hostLeaderMap_)[source][spaceId].empty()) {
+      LOG(INFO) << "No leader found, no need to transfer leader";
+      inLeaderBalance_ = false;
+      return nebula::cpp2::ErrorCode::SUCCEEDED;
+    }
+
+    std::vector<std::pair<PartitionID, HostAddr>> plans;
+    retCode = buildLeaderTransferPlan(source, hostLeaderMap_.get(), spaceId, plans);
+    if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
+      LOG(ERROR) << "Building leader transfer plan failed. Space: " << spaceId;
+      inLeaderBalance_ = false;
+      return retCode;
+    }
+    if (plans.empty()) {
+      LOG(INFO) << "No leader transfer plan is generated.";
+      inLeaderBalance_ = false;
+      return nebula::cpp2::ErrorCode::SUCCEEDED;
+    }
+
+    int32_t num = 0;
+    auto concurrency = req.get_concurrency();
+    std::vector<folly::Future<Status>> futures;
+    for (const auto &plan : plans) {
+      futures.emplace_back(client_->transLeader(spaceId,        // spaceId
+                                                plan.first,     // partId
+                                                source,         // source
+                                                plan.second));  // destination
+      ++num;
+      if (concurrency != 0 && num >= concurrency) {
+        break;
+      }
+    }
+
+    int32_t failed = 0;
+    folly::collectAll(futures)
+        .via(executor_.get())
+        .thenTry([&](const auto &result) {
+          auto tries = result.value();
+          for (const auto &t : tries) {
+            if (!t.value().ok()) {
+              ++failed;
+            }
+          }
+        })
+        .wait();
+
+    inLeaderBalance_ = false;
+    if (failed != 0) {
+      LOG(ERROR) << failed << " partitions failed to transfer leader";
+    }
+    return nebula::cpp2::ErrorCode::SUCCEEDED;
+  }
+  return nebula::cpp2::ErrorCode::E_BALANCER_RUNNING;
+}
+
+}  // namespace nebula::meta
diff --git a/src/meta/processors/job/LeaderTransferProcessor.cpp b/src/meta/processors/job/LeaderTransferProcessor.cpp
new file mode 100644
index 0000000000..42a362d72a
--- /dev/null
+++ b/src/meta/processors/job/LeaderTransferProcessor.cpp
@@ -0,0 +1,15 @@
+//
+// Created by fujie on 24-10-21.
+//
+
+#include "meta/processors/job/LeaderTransferProcessor.h"
+
+namespace nebula {
+namespace meta {
+
+void LeaderTransferProcessor::process(const cpp2::LeaderTransferReq &req) {
+  auto ret = leaderTransfer(req);
+  handleErrorCode(ret);
+  onFinished();
+}
+
+}  // namespace meta
+}  // namespace nebula
diff --git a/src/meta/processors/job/LeaderTransferProcessor.h b/src/meta/processors/job/LeaderTransferProcessor.h
new file mode 100644
index 0000000000..d0d2c48e49
--- /dev/null
+++ b/src/meta/processors/job/LeaderTransferProcessor.h
@@ -0,0 +1,53 @@
+//
+// Created by fujie on 24-10-21.
+//
+
+#ifndef META_PROCESSORS_JOB_LEADERTRANSFERPROCESSOR_H_
+#define META_PROCESSORS_JOB_LEADERTRANSFERPROCESSOR_H_
+
+#include <folly/executors/CPUThreadPoolExecutor.h>
+
+#include <atomic>
+#include <memory>
+
+#include "meta/processors/BaseProcessor.h"
+#include "meta/processors/admin/AdminClient.h"
+
+namespace nebula {
+namespace meta {
+
+/**
+ * @brief Transfer the leaders of the specified space away from the specified host.
+ */
+class LeaderTransferProcessor : public BaseProcessor<cpp2::ExecResp> {
+ public:
+  static LeaderTransferProcessor *instance(kvstore::KVStore *kvstore) {
+    static std::unique_ptr<AdminClient> client(new AdminClient(kvstore));
+    // A new self-deleting processor is created per request; only the AdminClient is shared.
+    return new LeaderTransferProcessor(kvstore, client.get());
+  }
+
+  void process(const cpp2::LeaderTransferReq &req);
+
+  nebula::cpp2::ErrorCode leaderTransfer(const meta::cpp2::LeaderTransferReq &req);
+
+  nebula::cpp2::ErrorCode buildLeaderTransferPlan(
+      const HostAddr &source,
+      HostLeaderMap *hostLeaderMap,
+      GraphSpaceID spaceId,
+      std::vector<std::pair<PartitionID, HostAddr>> &plan) const;
+
+ private:
+  LeaderTransferProcessor(kvstore::KVStore *kvstore, AdminClient *client)
+      : BaseProcessor<cpp2::ExecResp>(kvstore), kv_(kvstore), client_(client) {
+    executor_ = std::make_unique<folly::CPUThreadPoolExecutor>(1);
+  }
+
+  std::atomic_bool running_{false};
+  kvstore::KVStore *kv_{nullptr};
+  AdminClient *client_{nullptr};
+  std::unique_ptr<HostLeaderMap> hostLeaderMap_;
+  std::atomic_bool inLeaderBalance_{false};
+  std::unique_ptr<folly::CPUThreadPoolExecutor> executor_;
+};
+
+}  // namespace meta
+}  // namespace nebula
+
+#endif  // META_PROCESSORS_JOB_LEADERTRANSFERPROCESSOR_H_
diff --git a/src/parser/AdminSentences.cpp b/src/parser/AdminSentences.cpp
index f8581e9873..dd20cf3f3a 100644
--- a/src/parser/AdminSentences.cpp
+++ b/src/parser/AdminSentences.cpp
@@ -442,4 +442,11 @@ std::string KillQuerySentence::toString() const {
   buf += ")";
   return buf;
 }
+
+std::string TransferLeaderSentence::toString() const {
+  return folly::stringPrintf("TRANSFER LEADER FROM %s ON %s %d",
+                             address_->toString().c_str(),
+                             spaceName_->c_str(),
+                             concurrency_);
+}
 }  // namespace nebula
diff --git a/src/parser/AdminSentences.h b/src/parser/AdminSentences.h
index eac6c4893b..0adf045d77 100644
--- a/src/parser/AdminSentences.h
+++ b/src/parser/AdminSentences.h
@@ -883,6 +883,29 @@ class KillSessionSentence final : public Sentence {
   Expression* sessionId_{nullptr};
 };
 
+class TransferLeaderSentence final : public Sentence {
+ public:
+  TransferLeaderSentence(HostAddr *address, std::string *spaceName, int32_t concurrency) {
+    address_.reset(address);
+    spaceName_.reset(spaceName);
+    concurrency_ = concurrency;
+    kind_ = Kind::kTransferLeader;
+  }
+
+  const std::string *spaceName() const { return spaceName_.get(); }
+
+  const HostAddr *address() const { return address_.get(); }
+
+  int32_t concurrency() const { return concurrency_; }
+
+  std::string toString() const override;
+
+ private:
+  std::unique_ptr<std::string> spaceName_;
+  std::unique_ptr<HostAddr> address_;
+  int32_t concurrency_{0};
+};
+
 }  // namespace nebula
 #endif  // PARSER_ADMINSENTENCES_H_
diff --git a/src/parser/Sentence.h b/src/parser/Sentence.h
index ce36cabb72..780870f183 100644
--- a/src/parser/Sentence.h
+++ b/src/parser/Sentence.h
@@ -134,6 +134,7 @@ class Sentence {
     kAlterSpace,
     kClearSpace,
     kUnwind,
+    kTransferLeader,
   };
 
   Kind kind() const {
diff --git a/src/parser/parser.yy b/src/parser/parser.yy
index 5a90260609..fae6c53ae3 100644
--- a/src/parser/parser.yy
+++ b/src/parser/parser.yy
@@ -215,6 +215,7 @@ using namespace nebula;
 %token KW_LIST KW_MAP
 %token KW_MERGE KW_DIVIDE KW_RENAME
 %token KW_JOIN KW_LEFT KW_RIGHT KW_OUTER KW_INNER KW_SEMI KW_ANTI
+%token KW_TRANSFER
 
 /* symbols */
 %token L_PAREN R_PAREN L_BRACKET R_BRACKET L_BRACE R_BRACE COMMA
@@ -345,7 +346,7 @@ using namespace nebula;
 %type service_client_item
 %type service_client_list
 
-%type <intval> legal_integer unary_integer rank port
+%type <intval> legal_integer unary_integer rank port job_concurrency
 
 %type comment_prop_assignment comment_prop opt_comment_prop
 %type column_property
@@ -384,6 +385,7 @@ using namespace nebula;
 %type <sentence> create_user_sentence alter_user_sentence drop_user_sentence change_password_sentence describe_user_sentence
 %type <sentence> show_queries_sentence kill_query_sentence kill_session_sentence
 %type <sentence> show_sentence
+%type <sentence> transfer_leader_sentence
 
 %type <sentence> mutate_sentence
 %type <sentence> insert_vertex_sentence insert_edge_sentence
@@ -3312,6 +3314,13 @@ delete_edge_sentence
     }
     ;
 
+transfer_leader_sentence
+    : KW_TRANSFER KW_LEADER KW_FROM host_item KW_ON name_label job_concurrency {
+        $$ = new TransferLeaderSentence($4, $6, $7);
+    }
+    ;
+
 admin_job_sentence
     : KW_SUBMIT KW_JOB KW_COMPACT {
         auto sentence = new AdminJobSentence(meta::cpp2::JobOp::ADD,
@@ -3390,6 +3399,15 @@ admin_job_sentence
     }
     ;
 
+job_concurrency
+    : %empty {
+        $$ = 0;
+    }
+    | legal_integer {
+        $$ = $1;
+    }
+    ;
+
 show_queries_sentence
     : KW_SHOW KW_LOCAL KW_QUERIES {
         $$ = new ShowQueriesSentence();
@@ -3966,6 +3984,7 @@ maintain_sentence
     | drop_snapshot_sentence { $$ = $1; }
     | sign_in_service_sentence { $$ = $1; }
     | sign_out_service_sentence { $$ = $1; }
+    | transfer_leader_sentence { $$ = $1; }
     ;
 
 sentence
diff --git a/src/parser/scanner.lex b/src/parser/scanner.lex
index ba4e539c89..410b5459a9 100644
--- a/src/parser/scanner.lex
+++ b/src/parser/scanner.lex
@@ -196,6 +196,7 @@ LABEL_FULL_WIDTH {CN_EN_FULL_WIDTH}{CN_EN_NUM_FULL_WIDTH}*
 "OUTER"                     { return TokenType::KW_OUTER; }
 "SEMI"                      { return TokenType::KW_SEMI; }
 "ANTI"                      { return TokenType::KW_ANTI; }
+"TRANSFER"                  { return TokenType::KW_TRANSFER; }
 
 /**
  * TODO(dutor) Manage the dynamic allocated objects with an object pool,
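
Usage sketch (illustrative only, not part of the patch; the host, port, and space name below are made-up examples). With the grammar rule above, the statement names the source host, the target space, and an optional concurrency cap; when the trailing integer is omitted, job_concurrency defaults to 0, which the meta processor treats as "no limit":

    TRANSFER LEADER FROM "192.168.8.111":9779 ON test_space 10

The graph service resolves test_space to a space id, checks write permission on that space, and calls MetaClient::transferLeader(). The meta service then fetches the current leader distribution, builds a per-partition plan in buildLeaderTransferPlan() (each leader moves to the active peer that currently holds the fewest leaders in the space), and issues at most 10 AdminClient::transLeader() calls for this request.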