From 9123cf95e787ef1d8dc3ee1d91a7a917bc064c61 Mon Sep 17 00:00:00 2001 From: cjcchen Date: Fri, 3 May 2024 11:55:28 +0800 Subject: [PATCH] Fix Bug (#142) * fix viewchange bug * format code * rm unused log * rm unused log * fix ut --- WORKSPACE | 4 +- api/BUILD | 7 +- api/pybind_kv_service.cpp | 47 ++-- benchmark/protocols/poe/BUILD | 1 - .../protocols/poe/kv_server_performance.cpp | 2 +- common/BUILD | 6 +- interface/common/resdb_txn_accessor.cpp | 18 +- interface/common/resdb_txn_accessor_test.cpp | 6 +- .../consensus/ordering/common/algorithm/BUILD | 1 - .../common/algorithm/protocol_base.cpp | 45 ++- .../ordering/common/algorithm/protocol_base.h | 64 +++-- .../consensus/ordering/common/framework/BUILD | 1 - .../ordering/common/framework/consensus.cpp | 53 ++-- .../ordering/common/framework/consensus.h | 13 +- .../common/framework/performance_manager.cpp | 47 ++-- .../common/framework/performance_manager.h | 2 +- .../common/framework/response_manager.cpp | 2 +- .../common/framework/response_manager.h | 4 +- .../ordering/pbft/checkpoint_manager.cpp | 6 +- .../consensus/ordering/pbft/commitment.cpp | 27 +- .../ordering/pbft/consensus_manager_pbft.cpp | 2 +- .../ordering/pbft/message_manager.cpp | 5 +- .../ordering/pbft/performance_manager.cpp | 11 +- platform/consensus/ordering/pbft/query.cpp | 61 +++- .../consensus/ordering/pbft/query_test.cpp | 10 +- .../ordering/pbft/response_manager.cpp | 47 ++-- .../ordering/pbft/response_manager.h | 2 +- .../ordering/pbft/viewchange_manager.cpp | 8 +- .../consensus/ordering/poe/algorithm/BUILD | 6 +- .../consensus/ordering/poe/algorithm/poe.cpp | 13 +- .../consensus/ordering/poe/algorithm/poe.h | 6 +- .../consensus/ordering/poe/framework/BUILD | 1 - .../ordering/poe/framework/consensus.cpp | 9 +- .../ordering/poe/framework/consensus.h | 2 +- platform/consensus/recovery/recovery.cpp | 5 +- platform/networkstrate/async_acceptor.cpp | 2 +- platform/networkstrate/consensus_manager.cpp | 137 +++++---- platform/networkstrate/consensus_manager.h | 4 + .../networkstrate/replica_communicator.cpp | 2 +- platform/proto/resdb.proto | 5 + platform/statistic/BUILD | 12 +- platform/statistic/stats.cpp | 261 ++++++++++-------- platform/statistic/stats.h | 77 +++--- scripts/deploy/config/kv_server.conf | 8 +- scripts/deploy/script/deploy.sh | 23 +- service/kv/kv_service.cpp | 1 + .../kv/api_tools/kv_client_txn_tools.cpp | 2 - third_party/BUILD | 2 +- 48 files changed, 613 insertions(+), 467 deletions(-) diff --git a/WORKSPACE b/WORKSPACE index 6a7883f72..ea67f3e5e 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -199,7 +199,7 @@ http_archive( http_archive( name = "pybind11_bazel", strip_prefix = "pybind11_bazel-2.11.1.bzl.1", - urls = ["https://github.com/pybind/pybind11_bazel/archive/refs/tags/v2.11.1.bzl.1.zip"] + urls = ["https://github.com/pybind/pybind11_bazel/archive/refs/tags/v2.11.1.bzl.1.zip"], ) http_archive( @@ -244,4 +244,4 @@ http_archive( sha256 = "babcdfd2c744905a73d20de211b51367bda0d5200f11d654c4314b909d8c963c", strip_prefix = "asio-asio-1-26-0", url = "https://github.com/chriskohlhoff/asio/archive/refs/tags/asio-1-26-0.zip", -) \ No newline at end of file +) diff --git a/api/BUILD b/api/BUILD index d69120c17..93b83cdf1 100644 --- a/api/BUILD +++ b/api/BUILD @@ -1,19 +1,20 @@ package(default_visibility = ["//visibility:public"]) + cc_binary( name = "pybind_kv.so", srcs = ["pybind_kv_service.cpp"], - linkshared =1, + linkshared = 1, linkstatic = 1, deps = [ "@//common/proto:signature_info_cc_proto", "@//interface/kv:kv_client", "@//platform/config:resdb_config_utils", - "@pybind11//:pybind11", + "@pybind11", ], ) + py_library( name = "pybind_kv_so", data = [":pybind_kv.so"], imports = ["."], ) - diff --git a/api/pybind_kv_service.cpp b/api/pybind_kv_service.cpp index bad50dc7f..40073d66f 100644 --- a/api/pybind_kv_service.cpp +++ b/api/pybind_kv_service.cpp @@ -1,11 +1,12 @@ #include #include +#include #include #include #include -#include #include + #include "common/proto/signature_info.pb.h" #include "interface/kv/kv_client.h" #include "platform/config/resdb_config_utils.h" @@ -16,35 +17,31 @@ using resdb::KVClient; using resdb::ReplicaInfo; using resdb::ResDBConfig; - - - std::string get(std::string key, std::string config_path) { - ResDBConfig config = GenerateResDBConfig(config_path); - config.SetClientTimeoutMs(100000); - KVClient client(config); - auto result_ptr = client.Get(key); - if (result_ptr) { - return *result_ptr; - } else { - return ""; - } + ResDBConfig config = GenerateResDBConfig(config_path); + config.SetClientTimeoutMs(100000); + KVClient client(config); + auto result_ptr = client.Get(key); + if (result_ptr) { + return *result_ptr; + } else { + return ""; + } } bool set(std::string key, std::string value, std::string config_path) { - ResDBConfig config = GenerateResDBConfig(config_path); - config.SetClientTimeoutMs(100000); - KVClient client(config); - int result = client.Set(key, value); - if (result == 0) { - return true; - } else { - return false; - } + ResDBConfig config = GenerateResDBConfig(config_path); + config.SetClientTimeoutMs(100000); + KVClient client(config); + int result = client.Set(key, value); + if (result == 0) { + return true; + } else { + return false; + } } PYBIND11_MODULE(pybind_kv, m) { - m.def("get", &get, "A function that gets a value from the key-value store"); - m.def("set", &set, "A function that sets a value in the key-value store"); + m.def("get", &get, "A function that gets a value from the key-value store"); + m.def("set", &set, "A function that sets a value in the key-value store"); } - diff --git a/benchmark/protocols/poe/BUILD b/benchmark/protocols/poe/BUILD index e7bbde3b1..f63f6fb8d 100644 --- a/benchmark/protocols/poe/BUILD +++ b/benchmark/protocols/poe/BUILD @@ -13,4 +13,3 @@ cc_binary( "//service/utils:server_factory", ], ) - diff --git a/benchmark/protocols/poe/kv_server_performance.cpp b/benchmark/protocols/poe/kv_server_performance.cpp index fa13e9a94..b2eb62dd9 100644 --- a/benchmark/protocols/poe/kv_server_performance.cpp +++ b/benchmark/protocols/poe/kv_server_performance.cpp @@ -82,7 +82,7 @@ int main(int argc, char** argv) { request.SerializeToString(&request_data); return request_data; }); - + auto server = std::make_unique(*config, std::move(performance_consens)); server->Run(); diff --git a/common/BUILD b/common/BUILD index 84acee118..4e6a18aa7 100644 --- a/common/BUILD +++ b/common/BUILD @@ -16,10 +16,10 @@ cc_library( ) cc_library( - name= "beast", + name = "beast", deps = [ - "@boost//:beast" - ] + "@boost//:beast", + ], ) cc_library( diff --git a/interface/common/resdb_txn_accessor.cpp b/interface/common/resdb_txn_accessor.cpp index 532d432a8..3b7751916 100644 --- a/interface/common/resdb_txn_accessor.cpp +++ b/interface/common/resdb_txn_accessor.cpp @@ -59,15 +59,12 @@ ResDBTxnAccessor::GetTxn(uint64_t min_seq, uint64_t max_seq) { ths.push_back(std::thread( [&](NetChannel* client) { std::string response_str; - int ret = -1; - for (int i = 0; i < 3 && ret < 0; ++i) { - ret = client->SendRequest(request, Request::TYPE_QUERY); - if (ret) { - return; - } - client->SetRecvTimeout(1000); - ret = client->RecvRawMessageStr(&response_str); + int ret = client->SendRequest(request, Request::TYPE_QUERY); + if (ret) { + return; } + client->SetRecvTimeout(1000); + ret = client->RecvRawMessageStr(&response_str); if (ret == 0) { std::unique_lock lck(mtx); recv_count[response_str]++; @@ -160,11 +157,12 @@ absl::StatusOr ResDBTxnAccessor::GetBlockNumbers() { std::string final_str; std::mutex mtx; std::condition_variable resp_cv; + bool success = false; std::unique_ptr client = GetNetChannel(replicas_[0].ip(), replicas_[0].port()); - LOG(ERROR) << "ip:" << replicas_[0].ip() << " port:" << replicas_[0].port(); + LOG(INFO) << "ip:" << replicas_[0].ip() << " port:" << replicas_[0].port(); std::string response_str; int ret = 0; @@ -175,7 +173,7 @@ absl::StatusOr ResDBTxnAccessor::GetBlockNumbers() { } client->SetRecvTimeout(100000); ret = client->RecvRawMessageStr(&response_str); - LOG(ERROR) << "receive str:" << ret << " len:" << response_str.size(); + LOG(INFO) << "receive str:" << ret << " len:" << response_str.size(); if (ret != 0) { continue; } diff --git a/interface/common/resdb_txn_accessor_test.cpp b/interface/common/resdb_txn_accessor_test.cpp index e0b4925ec..39c407bc2 100644 --- a/interface/common/resdb_txn_accessor_test.cpp +++ b/interface/common/resdb_txn_accessor_test.cpp @@ -62,9 +62,11 @@ TEST(ResDBTxnAccessorTest, GetTransactionsFail) { auto client = std::make_unique(ip, port); EXPECT_CALL(*client, SendRequest(EqualsProto(request), Request::TYPE_QUERY, _)) - .Times(AtLeast(1)).WillRepeatedly(Return(0)); + .Times(AtLeast(1)) + .WillRepeatedly(Return(0)); EXPECT_CALL(*client, RecvRawMessageStr) - .Times(AtLeast(1)).WillRepeatedly(Invoke([&](std::string* resp) { return -1; })); + .Times(AtLeast(1)) + .WillRepeatedly(Invoke([&](std::string* resp) { return -1; })); return client; })); absl::StatusOr>> resp = diff --git a/platform/consensus/ordering/common/algorithm/BUILD b/platform/consensus/ordering/common/algorithm/BUILD index 9abd8715e..1ba4e6fae 100644 --- a/platform/consensus/ordering/common/algorithm/BUILD +++ b/platform/consensus/ordering/common/algorithm/BUILD @@ -9,4 +9,3 @@ cc_library( "//common/crypto:signature_verifier", ], ) - diff --git a/platform/consensus/ordering/common/algorithm/protocol_base.cpp b/platform/consensus/ordering/common/algorithm/protocol_base.cpp index 3c6c2fc3b..541941363 100644 --- a/platform/consensus/ordering/common/algorithm/protocol_base.cpp +++ b/platform/consensus/ordering/common/algorithm/protocol_base.cpp @@ -5,43 +5,36 @@ namespace resdb { namespace common { -ProtocolBase::ProtocolBase( - int id, - int f, - int total_num, - SingleCallFuncType single_call, - BroadcastCallFuncType broadcast_call, - CommitFuncType commit) : - id_(id), +ProtocolBase::ProtocolBase(int id, int f, int total_num, + SingleCallFuncType single_call, + BroadcastCallFuncType broadcast_call, + CommitFuncType commit) + : id_(id), f_(f), total_num_(total_num), - single_call_(single_call), - broadcast_call_(broadcast_call), + single_call_(single_call), + broadcast_call_(broadcast_call), commit_(commit) { - stop_ = false; + stop_ = false; } -ProtocolBase::ProtocolBase( int id, int f, int total_num) : ProtocolBase(id, f, total_num, nullptr, nullptr, nullptr){ +ProtocolBase::ProtocolBase(int id, int f, int total_num) + : ProtocolBase(id, f, total_num, nullptr, nullptr, nullptr) {} -} +ProtocolBase::~ProtocolBase() { Stop(); } -ProtocolBase::~ProtocolBase() { - Stop(); -} +void ProtocolBase::Stop() { stop_ = true; } -void ProtocolBase::Stop(){ - stop_ = true; -} +bool ProtocolBase::IsStop() { return stop_; } -bool ProtocolBase::IsStop(){ - return stop_; -} - -int ProtocolBase::SendMessage(int msg_type, const google::protobuf::Message& msg, int node_id) { +int ProtocolBase::SendMessage(int msg_type, + const google::protobuf::Message& msg, + int node_id) { return single_call_(msg_type, msg, node_id); } -int ProtocolBase::Broadcast(int msg_type, const google::protobuf::Message& msg) { +int ProtocolBase::Broadcast(int msg_type, + const google::protobuf::Message& msg) { return broadcast_call_(msg_type, msg); } @@ -49,5 +42,5 @@ int ProtocolBase::Commit(const google::protobuf::Message& msg) { return commit_(msg); } -} // namespace protocol +} // namespace common } // namespace resdb diff --git a/platform/consensus/ordering/common/algorithm/protocol_base.h b/platform/consensus/ordering/common/algorithm/protocol_base.h index a93be8223..621ecd1c3 100644 --- a/platform/consensus/ordering/common/algorithm/protocol_base.h +++ b/platform/consensus/ordering/common/algorithm/protocol_base.h @@ -1,7 +1,9 @@ #pragma once -#include #include + +#include + #include "common/crypto/signature_verifier.h" namespace resdb { @@ -9,50 +11,52 @@ namespace common { class ProtocolBase { public: - typedef std::function SingleCallFuncType; - typedef std::function BroadcastCallFuncType; - typedef std::function CommitFuncType; + typedef std::function + SingleCallFuncType; + typedef std::function + BroadcastCallFuncType; + typedef std::function + CommitFuncType; - ProtocolBase( - int id, - int f, - int total_num, - SingleCallFuncType single_call, - BroadcastCallFuncType broadcast_call, - CommitFuncType commit - ); - - ProtocolBase( int id, int f, int total_num); + ProtocolBase(int id, int f, int total_num, SingleCallFuncType single_call, + BroadcastCallFuncType broadcast_call, CommitFuncType commit); + ProtocolBase(int id, int f, int total_num); virtual ~ProtocolBase(); void Stop(); - inline - void SetSingleCallFunc(SingleCallFuncType single_call) { single_call_ = single_call; } - - inline - void SetBroadcastCallFunc(BroadcastCallFuncType broadcast_call) { broadcast_call_ = broadcast_call; } + inline void SetSingleCallFunc(SingleCallFuncType single_call) { + single_call_ = single_call; + } + + inline void SetBroadcastCallFunc(BroadcastCallFuncType broadcast_call) { + broadcast_call_ = broadcast_call; + } - inline - void SetCommitFunc(CommitFuncType commit_func) { commit_ = commit_func; } + inline void SetCommitFunc(CommitFuncType commit_func) { + commit_ = commit_func; + } - inline - void SetSignatureVerifier(SignatureVerifier* verifier) { verifier_ = verifier;} + inline void SetSignatureVerifier(SignatureVerifier* verifier) { + verifier_ = verifier; + } - protected: - int SendMessage(int msg_type, const google::protobuf::Message& msg, int node_id); - int Broadcast(int msg_type, const google::protobuf::Message& msg); - int Commit(const google::protobuf::Message& msg); + protected: + int SendMessage(int msg_type, const google::protobuf::Message& msg, + int node_id); + int Broadcast(int msg_type, const google::protobuf::Message& msg); + int Commit(const google::protobuf::Message& msg); - bool IsStop(); + bool IsStop(); protected: int id_; int f_; int total_num_; - std::function single_call_; + std::function + single_call_; std::function broadcast_call_; std::function commit_; std::atomic stop_; @@ -60,5 +64,5 @@ class ProtocolBase { SignatureVerifier* verifier_; }; -} // namespace protocol +} // namespace common } // namespace resdb diff --git a/platform/consensus/ordering/common/framework/BUILD b/platform/consensus/ordering/common/framework/BUILD index 82e03a0fb..e4a5382d7 100644 --- a/platform/consensus/ordering/common/framework/BUILD +++ b/platform/consensus/ordering/common/framework/BUILD @@ -26,7 +26,6 @@ cc_library( ], ) - cc_library( name = "response_manager", srcs = ["response_manager.cpp"], diff --git a/platform/consensus/ordering/common/framework/consensus.cpp b/platform/consensus/ordering/common/framework/consensus.cpp index 683bce29e..caea1a463 100644 --- a/platform/consensus/ordering/common/framework/consensus.cpp +++ b/platform/consensus/ordering/common/framework/consensus.cpp @@ -46,56 +46,51 @@ Consensus::Consensus(const ResDBConfig& config, nullptr, std::move(executor))) { LOG(INFO) << "is running is performance mode:" << config_.IsPerformanceRunning(); - is_stop_ = false; + is_stop_ = false; global_stats_ = Stats::GetGlobalStats(); } -void Consensus::Init(){ - if(performance_manager_ == nullptr){ - performance_manager_ = - config_.IsPerformanceRunning() - ? std::make_unique( - config_, GetBroadCastClient(), GetSignatureVerifier()) - : nullptr; +void Consensus::Init() { + if (performance_manager_ == nullptr) { + performance_manager_ = + config_.IsPerformanceRunning() + ? std::make_unique( + config_, GetBroadCastClient(), GetSignatureVerifier()) + : nullptr; } - if(response_manager_ == nullptr){ - response_manager_ = - !config_.IsPerformanceRunning() - ? std::make_unique(config_, GetBroadCastClient(), - GetSignatureVerifier()) - : nullptr; + if (response_manager_ == nullptr) { + response_manager_ = + !config_.IsPerformanceRunning() + ? std::make_unique(config_, GetBroadCastClient(), + GetSignatureVerifier()) + : nullptr; } } -void Consensus::InitProtocol(ProtocolBase * protocol){ +void Consensus::InitProtocol(ProtocolBase* protocol) { protocol->SetSingleCallFunc( [&](int type, const google::protobuf::Message& msg, int node_id) { - return SendMsg(type, msg, node_id); + return SendMsg(type, msg, node_id); }); protocol->SetBroadcastCallFunc( [&](int type, const google::protobuf::Message& msg) { - return Broadcast(type, msg); + return Broadcast(type, msg); }); protocol->SetCommitFunc( - [&](const google::protobuf::Message& msg) { - return CommitMsg(msg); - }); + [&](const google::protobuf::Message& msg) { return CommitMsg(msg); }); } -Consensus::~Consensus(){ - is_stop_ = true; -} +Consensus::~Consensus() { is_stop_ = true; } -void Consensus::SetPerformanceManager(std::unique_ptr performance_manager){ +void Consensus::SetPerformanceManager( + std::unique_ptr performance_manager) { performance_manager_ = std::move(performance_manager); } -bool Consensus::IsStop(){ - return is_stop_; -} +bool Consensus::IsStop() { return is_stop_; } void Consensus::SetupPerformanceDataFunc(std::function func) { performance_manager_->SetDataFunc(func); @@ -131,9 +126,7 @@ std::vector Consensus::GetReplicas() { return config_.GetReplicaInfos(); } -int Consensus::CommitMsg(const google::protobuf::Message &txn) { - return 0; -} +int Consensus::CommitMsg(const google::protobuf::Message& txn) { return 0; } // The implementation of PBFT. int Consensus::ConsensusCommit(std::unique_ptr context, diff --git a/platform/consensus/ordering/common/framework/consensus.h b/platform/consensus/ordering/common/framework/consensus.h index 881dc72bb..bb0659237 100644 --- a/platform/consensus/ordering/common/framework/consensus.h +++ b/platform/consensus/ordering/common/framework/consensus.h @@ -49,12 +49,12 @@ class Consensus : public ConsensusManager { void SetCommunicator(ReplicaCommunicator* replica_communicator); - void InitProtocol(ProtocolBase * protocol); + void InitProtocol(ProtocolBase* protocol); - protected: - virtual int ProcessCustomConsensus(std::unique_ptr request); - virtual int ProcessNewTransaction(std::unique_ptr request); - virtual int CommitMsg(const google::protobuf::Message& msg); + protected: + virtual int ProcessCustomConsensus(std::unique_ptr request); + virtual int ProcessNewTransaction(std::unique_ptr request); + virtual int CommitMsg(const google::protobuf::Message& msg); protected: int SendMsg(int type, const google::protobuf::Message& msg, int node_id); @@ -65,7 +65,8 @@ class Consensus : public ConsensusManager { protected: void Init(); - void SetPerformanceManager(std::unique_ptr performance_manger); + void SetPerformanceManager( + std::unique_ptr performance_manger); protected: ReplicaCommunicator* replica_communicator_; diff --git a/platform/consensus/ordering/common/framework/performance_manager.cpp b/platform/consensus/ordering/common/framework/performance_manager.cpp index 089f3bd5e..b3861718f 100644 --- a/platform/consensus/ordering/common/framework/performance_manager.cpp +++ b/platform/consensus/ordering/common/framework/performance_manager.cpp @@ -37,7 +37,7 @@ using comm::CollectorResultCode; PerformanceManager::PerformanceManager( const ResDBConfig& config, ReplicaCommunicator* replica_communicator, SignatureVerifier* verifier) - : config_(config), + : config_(config), replica_communicator_(replica_communicator), batch_queue_("user request"), verifier_(verifier) { @@ -91,7 +91,6 @@ int PerformanceManager::StartEval() { } eval_started_ = true; for (int i = 0; i < 100000000; ++i) { - // for (int i = 0; i < 60000000000; ++i) { std::unique_ptr queue_item = std::make_unique(); queue_item->context = nullptr; queue_item->user_request = GenerateUserRequest(); @@ -119,51 +118,49 @@ int PerformanceManager::ProcessResponseMsg(std::unique_ptr context, return 0; } - //LOG(INFO) << "get response:" << request->seq() << " sender:"<sender_id(); + // LOG(INFO) << "get response:" << request->seq() << " + // sender:"<sender_id(); std::unique_ptr batch_response = nullptr; - CollectorResultCode ret = - AddResponseMsg(std::move(request), [&](std::unique_ptr request) { + CollectorResultCode ret = AddResponseMsg( + std::move(request), [&](std::unique_ptr request) { batch_response = std::move(request); return; }); if (ret == CollectorResultCode::STATE_CHANGED) { assert(batch_response); - SendResponseToClient(*batch_response); + SendResponseToClient(*batch_response); } return ret == CollectorResultCode::INVALID ? -2 : 0; } CollectorResultCode PerformanceManager::AddResponseMsg( std::unique_ptr request, - std::function)> response_call_back) { + std::function)> + response_call_back) { if (request == nullptr) { return CollectorResultCode::INVALID; } - //uint64_t seq = request->seq(); - - std::unique_ptr batch_response = std::make_unique(); + std::unique_ptr batch_response = + std::make_unique(); if (!batch_response->ParseFromString(request->data())) { - LOG(ERROR) << "parse response fail:"<data().size() - <<" seq:"<seq(); return CollectorResultCode::INVALID; + LOG(ERROR) << "parse response fail:" << request->data().size() + << " seq:" << request->seq(); + return CollectorResultCode::INVALID; } uint64_t seq = batch_response->local_id(); - //LOG(ERROR)<<"receive seq:"< lk(response_lock_[idx]); if (response_[idx].find(seq) == response_[idx].end()) { - //LOG(ERROR)<<"has done local seq:"<seq(); return CollectorResultCode::OK; } response_[idx][seq]++; - //LOG(ERROR)<<"get seq :"<seq()<<" local id:"<= config_.GetMinClientReceiveNum()) { - //LOG(ERROR)<<"get seq :"<seq()<<" local id:"< 0) { uint64_t run_time = GetCurrentTime() - create_time; - LOG(ERROR)<<"receive current:"<AddLatency(run_time); - } else { } - //send_num_-=10; send_num_--; } @@ -198,7 +195,6 @@ int PerformanceManager::BatchProposeMsg() { bool start = false; while (!stop_) { if (send_num_ > config_.GetMaxProcessTxn()) { - // LOG(ERROR)<<"wait send num:"< item = batch_queue_.Pop(config_.ClientBatchWaitTimeMS()); if (item == nullptr) { - if(start){ - LOG(ERROR)<<"no data"; + if (start) { + LOG(ERROR) << "no data"; } continue; } @@ -217,9 +213,7 @@ int PerformanceManager::BatchProposeMsg() { } } start = true; - for(int i = 0; i < 1;++i){ - int ret = DoBatch(batch_req); - } + DoBatch(batch_req); batch_req.clear(); } return 0; @@ -269,7 +263,6 @@ int PerformanceManager::DoBatch( global_stats_->BroadCastMsg(); send_num_++; sum_ += batch_req.size(); - //LOG(ERROR)<<"send num:"<SendMessage(request, GetPrimary()); } diff --git a/platform/consensus/ordering/common/framework/performance_manager.h b/platform/consensus/ordering/common/framework/performance_manager.h index 5a874baa9..7cacc847a 100644 --- a/platform/consensus/ordering/common/framework/performance_manager.h +++ b/platform/consensus/ordering/common/framework/performance_manager.h @@ -50,7 +50,7 @@ class PerformanceManager { std::unique_ptr request); void SetDataFunc(std::function func); - protected: + protected: virtual void SendMessage(const Request& request); private: diff --git a/platform/consensus/ordering/common/framework/response_manager.cpp b/platform/consensus/ordering/common/framework/response_manager.cpp index ae6b55cc0..983851413 100644 --- a/platform/consensus/ordering/common/framework/response_manager.cpp +++ b/platform/consensus/ordering/common/framework/response_manager.cpp @@ -238,5 +238,5 @@ int ResponseManager::DoBatch( return 0; } -} // namespace common +} // namespace common } // namespace resdb diff --git a/platform/consensus/ordering/common/framework/response_manager.h b/platform/consensus/ordering/common/framework/response_manager.h index 1c7043964..93f7ed9c0 100644 --- a/platform/consensus/ordering/common/framework/response_manager.h +++ b/platform/consensus/ordering/common/framework/response_manager.h @@ -32,7 +32,7 @@ #include "platform/statistic/stats.h" namespace resdb { -namespace common { +namespace common { class ResponseManager { public: @@ -81,5 +81,5 @@ class ResponseManager { std::mutex response_lock_[response_set_size_]; }; -} // common +} // namespace common } // namespace resdb diff --git a/platform/consensus/ordering/pbft/checkpoint_manager.cpp b/platform/consensus/ordering/pbft/checkpoint_manager.cpp index ef7483083..a5a24ca82 100644 --- a/platform/consensus/ordering/pbft/checkpoint_manager.cpp +++ b/platform/consensus/ordering/pbft/checkpoint_manager.cpp @@ -312,11 +312,15 @@ void CheckPointManager::UpdateCheckPointStatus() { last_hash_ = GetHash(last_hash_, request->hash()); last_seq_++; } + bool is_recovery = request->is_recovery(); txn_db_->Put(std::move(request)); if (current_seq == last_ckpt_seq + water_mark) { last_ckpt_seq = current_seq; - BroadcastCheckPoint(last_ckpt_seq, last_hash_, stable_hashs, stable_seqs); + if (!is_recovery) { + BroadcastCheckPoint(last_ckpt_seq, last_hash_, stable_hashs, + stable_seqs); + } } } return; diff --git a/platform/consensus/ordering/pbft/commitment.cpp b/platform/consensus/ordering/pbft/commitment.cpp index 081c11d26..4a1aacc09 100644 --- a/platform/consensus/ordering/pbft/commitment.cpp +++ b/platform/consensus/ordering/pbft/commitment.cpp @@ -41,7 +41,10 @@ Commitment::Commitment(const ResDBConfig& config, duplicate_manager_ = std::make_unique(config); message_manager_->SetDuplicateManager(duplicate_manager_.get()); - global_stats_->SetProps(config_.GetSelfInfo().id(), config_.GetSelfInfo().ip(), config_.GetSelfInfo().port(), config_.GetConfigData().enable_resview(), config_.GetConfigData().enable_faulty_switch()); + global_stats_->SetProps( + config_.GetSelfInfo().id(), config_.GetSelfInfo().ip(), + config_.GetSelfInfo().port(), config_.GetConfigData().enable_resview(), + config_.GetConfigData().enable_faulty_switch()); global_stats_->SetPrimaryId(message_manager_->GetCurrentPrimary()); } @@ -81,7 +84,8 @@ int Commitment::ProcessNewRequest(std::unique_ptr context, // << message_manager_->GetCurrentPrimary() // << " seq:" << user_request->seq() // << " hash:" << user_request->hash(); - LOG(ERROR)<<"NOT PRIMARY, Primary is "<GetCurrentPrimary(); + LOG(INFO) << "NOT PRIMARY, Primary is " + << message_manager_->GetCurrentPrimary(); replica_communicator_->SendMessage(*user_request, message_manager_->GetCurrentPrimary()); { @@ -117,7 +121,6 @@ int Commitment::ProcessNewRequest(std::unique_ptr context, global_stats_->IncClientRequest(); if (duplicate_manager_->CheckAndAddProposed(user_request->hash())) { - LOG(ERROR) << "duplicate check fail:"; return -2; } auto seq = message_manager_->AssignNextSeq(); @@ -155,14 +158,20 @@ int Commitment::ProcessNewRequest(std::unique_ptr context, // TODO check whether the sender is the primary. int Commitment::ProcessProposeMsg(std::unique_ptr context, std::unique_ptr request) { - if (global_stats_->IsFaulty() || context == nullptr || context->signature.signature().empty()) { + if (global_stats_->IsFaulty() || context == nullptr || + context->signature.signature().empty()) { LOG(ERROR) << "user request doesn't contain signature, reject"; return -2; } if (request->is_recovery()) { - if (static_cast(request->seq()) >= - message_manager_->GetNextSeq()) { + if (message_manager_->GetNextSeq() == 0 || + request->seq() == message_manager_->GetNextSeq()) { message_manager_->SetNextSeq(request->seq() + 1); + } else { + LOG(ERROR) << " recovery request not valid:" + << " current seq:" << message_manager_->GetNextSeq() + << " data seq:" << request->seq(); + return 0; } return message_manager_->AddConsensusMsg(context->signature, std::move(request)); @@ -187,7 +196,7 @@ int Commitment::ProcessProposeMsg(std::unique_ptr context, LOG(ERROR) << " check by the user func fail"; return -2; } - //global_stats_->GetTransactionDetails(std::move(request)); + // global_stats_->GetTransactionDetails(std::move(request)); BatchUserRequest batch_request; batch_request.ParseFromString(request->data()); batch_request.clear_createtime(); @@ -286,8 +295,8 @@ int Commitment::ProcessCommitMsg(std::unique_ptr context, CollectorResultCode ret = message_manager_->AddConsensusMsg(context->signature, std::move(request)); if (ret == CollectorResultCode::STATE_CHANGED) { - //LOG(ERROR)<data().size(); - //global_stats_->GetTransactionDetails(request->data()); + // LOG(ERROR)<data().size(); + // global_stats_->GetTransactionDetails(request->data()); global_stats_->RecordStateTime("commit"); } return ret == CollectorResultCode::INVALID ? -2 : 0; diff --git a/platform/consensus/ordering/pbft/consensus_manager_pbft.cpp b/platform/consensus/ordering/pbft/consensus_manager_pbft.cpp index 09feae406..72103eab5 100644 --- a/platform/consensus/ordering/pbft/consensus_manager_pbft.cpp +++ b/platform/consensus/ordering/pbft/consensus_manager_pbft.cpp @@ -208,7 +208,7 @@ int ConsensusManagerPBFT::InternalConsensusCommit( int ret = commitment_->ProcessNewRequest(std::move(context), std::move(request)); if (ret == -3) { - LOG(ERROR)<<"BAD RETURN"; + LOG(ERROR) << "BAD RETURN"; std::pair, std::unique_ptr> request_complained; { diff --git a/platform/consensus/ordering/pbft/message_manager.cpp b/platform/consensus/ordering/pbft/message_manager.cpp index 37d3b5de8..cc5e187c6 100644 --- a/platform/consensus/ordering/pbft/message_manager.cpp +++ b/platform/consensus/ordering/pbft/message_manager.cpp @@ -39,6 +39,9 @@ MessageManager::MessageManager( [&](std::unique_ptr request, std::unique_ptr resp_msg) { if (request->is_recovery()) { + if (checkpoint_manager_) { + checkpoint_manager_->AddCommitData(std::move(request)); + } return; } resp_msg->set_proxy_id(request->proxy_id()); @@ -231,8 +234,6 @@ TransactionStatue MessageManager::GetTransactionState(uint64_t seq) { } int MessageManager::GetReplicaState(ReplicaState* state) { - state->set_view(GetCurrentView()); - *state->mutable_replica_info() = config_.GetSelfInfo(); *state->mutable_replica_config() = config_.GetConfigData(); return 0; } diff --git a/platform/consensus/ordering/pbft/performance_manager.cpp b/platform/consensus/ordering/pbft/performance_manager.cpp index a5ba979ff..26d0ebcaa 100644 --- a/platform/consensus/ordering/pbft/performance_manager.cpp +++ b/platform/consensus/ordering/pbft/performance_manager.cpp @@ -191,8 +191,17 @@ CollectorResultCode PerformanceManager::AddResponseMsg( return CollectorResultCode::INVALID; } + std::unique_ptr batch_response = + std::make_unique(); + if (!batch_response->ParseFromString(request->data())) { + LOG(ERROR) << "parse response fail:" << request->data().size() + << " seq:" << request->seq(); + return CollectorResultCode::INVALID; + } + + uint64_t seq = batch_response->local_id(); + int type = request->type(); - uint64_t seq = request->seq(); int resp_received_count = 0; int ret = collector_pool_->GetCollector(seq)->AddRequest( std::move(request), signature, false, diff --git a/platform/consensus/ordering/pbft/query.cpp b/platform/consensus/ordering/pbft/query.cpp index 72bb4ed5b..c2d60276b 100644 --- a/platform/consensus/ordering/pbft/query.cpp +++ b/platform/consensus/ordering/pbft/query.cpp @@ -50,24 +50,65 @@ int Query::ProcessGetReplicaState(std::unique_ptr context, int Query::ProcessQuery(std::unique_ptr context, std::unique_ptr request) { + if (config_.GetPublicKeyCertificateInfo() + .public_key() + .public_key_info() + .type() == CertificateKeyInfo::CLIENT) { + auto find_primary = [&]() { + auto config_data = config_.GetConfigData(); + for (const auto& r : config_data.region()) { + for (const auto& replica : r.replica_info()) { + if (replica.id() == 1) { + return replica; + } + } + } + }; + ReplicaInfo primary = find_primary(); + std::string ip = primary.ip(); + int port = primary.port(); + + LOG(ERROR) << "redirect to primary:" << ip << " port:" << port; + auto client = std::make_unique(ip, port); + if (client->SendRawMessage(*request) == 0) { + QueryResponse resp; + if (client->RecvRawMessage(&resp) == 0) { + if (context != nullptr && context->client != nullptr) { + LOG(ERROR) << "send response from primary:" + << resp.transactions_size(); + int ret = context->client->SendRawMessage(resp); + if (ret) { + LOG(ERROR) << "send resp fail ret:" << ret; + } + } + } + } + return 0; + } + QueryRequest query; if (!query.ParseFromString(request->data())) { LOG(ERROR) << "parse data fail"; return -2; } - // LOG(ERROR) << "request:" << query.DebugString(); QueryResponse response; - for (uint64_t i = query.min_seq(); i <= query.max_seq(); ++i) { - Request* ret_request = message_manager_->GetRequest(i); - if (ret_request == nullptr) { - break; + if (query.max_seq() == 0 && query.min_seq() == 0) { + uint64_t mseq = message_manager_->GetNextSeq(); + response.set_max_seq(mseq - 1); + LOG(ERROR) << "get max seq:" << mseq; + } else { + for (uint64_t i = query.min_seq(); i <= query.max_seq(); ++i) { + Request* ret_request = message_manager_->GetRequest(i); + if (ret_request == nullptr) { + break; + } + Request* txn = response.add_transactions(); + txn->set_data(ret_request->data()); + txn->set_hash(ret_request->hash()); + txn->set_seq(ret_request->seq()); + txn->set_proxy_id(ret_request->proxy_id()); } - Request* txn = response.add_transactions(); - txn->set_data(ret_request->data()); - txn->set_hash(ret_request->hash()); - txn->set_seq(ret_request->seq()); - txn->set_proxy_id(ret_request->proxy_id()); } if (context != nullptr && context->client != nullptr) { diff --git a/platform/consensus/ordering/pbft/query_test.cpp b/platform/consensus/ordering/pbft/query_test.cpp index 4cc1a1175..871dfe523 100644 --- a/platform/consensus/ordering/pbft/query_test.cpp +++ b/platform/consensus/ordering/pbft/query_test.cpp @@ -152,10 +152,12 @@ MATCHER_P(EqualsProtoNoConfigData, replica, "") { TEST_F(QueryTest, QueryState) { ReplicaState replica_state; - replica_state.set_view(1); - replica_state.mutable_replica_info()->set_id(1); - replica_state.mutable_replica_info()->set_ip("127.0.0.1"); - replica_state.mutable_replica_info()->set_port(1234); + replica_state.mutable_replica_config()->set_view_change_timeout_ms(100); + replica_state.mutable_replica_config()->set_client_batch_num(100); + replica_state.mutable_replica_config()->set_worker_num(64); + replica_state.mutable_replica_config()->set_input_worker_num(1); + replica_state.mutable_replica_config()->set_output_worker_num(1); + replica_state.mutable_replica_config()->set_tcp_batch_num(100); std::unique_ptr channel = std::make_unique("127.0.0.1", 0); diff --git a/platform/consensus/ordering/pbft/response_manager.cpp b/platform/consensus/ordering/pbft/response_manager.cpp index 6a7162d8e..f490c0039 100644 --- a/platform/consensus/ordering/pbft/response_manager.cpp +++ b/platform/consensus/ordering/pbft/response_manager.cpp @@ -26,7 +26,7 @@ namespace resdb { ResponseClientTimeout::ResponseClientTimeout(std::string hash_, - uint64_t time_) { + uint64_t time_) { this->hash = hash_; this->timeout_time = time_; } @@ -57,6 +57,7 @@ ResponseManager::ResponseManager(const ResDBConfig& config, verifier_(verifier) { stop_ = false; local_id_ = 1; + timeout_length_ = 5000000; if (config_.GetPublicKeyCertificateInfo() .public_key() @@ -65,9 +66,9 @@ ResponseManager::ResponseManager(const ResDBConfig& config, config_.IsTestMode()) { user_req_thread_ = std::thread(&ResponseManager::BatchProposeMsg, this); } - if(config_.GetConfigData().enable_viewchange()){ + if (config_.GetConfigData().enable_viewchange()) { checking_timeout_thread_ = - std::thread(&ResponseManager::MonitoringClientTimeOut, this); + std::thread(&ResponseManager::MonitoringClientTimeOut, this); } global_stats_ = Stats::GetGlobalStats(); send_num_ = 0; @@ -78,7 +79,7 @@ ResponseManager::~ResponseManager() { if (user_req_thread_.joinable()) { user_req_thread_.join(); } - if(checking_timeout_thread_.joinable()){ + if (checking_timeout_thread_.joinable()) { checking_timeout_thread_.join(); } } @@ -172,8 +173,21 @@ CollectorResultCode ResponseManager::AddResponseMsg( return CollectorResultCode::INVALID; } + std::string hash = request->hash(); + + std::unique_ptr batch_response = + std::make_unique(); + if (!batch_response->ParseFromString(request->data())) { + LOG(ERROR) << "parse response fail:" << request->data().size() + << " seq:" << request->seq(); + RemoveWaitingResponseRequest(hash); + return CollectorResultCode::INVALID; + } + + uint64_t seq = batch_response->local_id(); + request->set_seq(seq); + int type = request->type(); - uint64_t seq = request->seq(); int resp_received_count = 0; int ret = collector_pool_->GetCollector(seq)->AddRequest( std::move(request), signature, false, @@ -190,6 +204,7 @@ CollectorResultCode ResponseManager::AddResponseMsg( } if (resp_received_count > 0) { collector_pool_->Update(seq); + RemoveWaitingResponseRequest(hash); return CollectorResultCode::STATE_CHANGED; } return CollectorResultCode::OK; @@ -295,7 +310,8 @@ int ResponseManager::DoBatch( if (!config_.IsPerformanceRunning()) { LOG(ERROR) << "add context list:" << new_request->seq() - << " list size:" << context_list.size(); + << " list size:" << context_list.size() + << " local_id:" << local_id_; batch_request.set_local_id(local_id_); int ret = AddContextList(std::move(context_list), local_id_++); if (ret != 0) { @@ -318,13 +334,10 @@ int ResponseManager::DoBatch( batch_request.SerializeToString(new_request->mutable_data()); new_request->set_hash(SignatureVerifier::CalculateHash(new_request->data())); new_request->set_proxy_id(config_.GetSelfInfo().id()); - /*for(int i=1; i<=4; i++){ - replica_communicator_->SendMessage(*new_request, i); - }*/ replica_communicator_->SendMessage(*new_request, GetPrimary()); send_num_++; - LOG(INFO) << "send msg to primary:" << GetPrimary() - << " batch size:" << batch_req.size(); + // LOG(INFO) << "send msg to primary:" << GetPrimary() + // << " batch size:" << batch_req.size(); AddWaitingResponseRequest(std::move(new_request)); return 0; } @@ -335,16 +348,16 @@ void ResponseManager::AddWaitingResponseRequest( return; } pm_lock_.lock(); - uint64_t time = GetCurrentTime() + this->timeout_length_; - client_timeout_min_heap_.push( - ResponseClientTimeout(request->hash(), time)); + assert(timeout_length_ > 0); + uint64_t time = GetCurrentTime() + timeout_length_; + client_timeout_min_heap_.push(ResponseClientTimeout(request->hash(), time)); waiting_response_batches_.insert( make_pair(request->hash(), std::move(request))); pm_lock_.unlock(); sem_post(&request_sent_signal_); } -void ResponseManager::RemoveWaitingResponseRequest(std::string hash) { +void ResponseManager::RemoveWaitingResponseRequest(const std::string& hash) { if (!config_.GetConfigData().enable_viewchange()) { return; } @@ -363,8 +376,7 @@ bool ResponseManager::CheckTimeOut(std::string hash) { return value; } -std::unique_ptr ResponseManager::GetTimeOutRequest( - std::string hash) { +std::unique_ptr ResponseManager::GetTimeOutRequest(std::string hash) { pm_lock_.lock(); auto value = std::move(waiting_response_batches_.find(hash)->second); pm_lock_.unlock(); @@ -390,7 +402,6 @@ void ResponseManager::MonitoringClientTimeOut() { if (CheckTimeOut(client_timeout.hash)) { auto request = GetTimeOutRequest(client_timeout.hash); if (request) { - LOG(ERROR) << "Client Request Timeout " << client_timeout.hash; replica_communicator_->BroadCast(*request); } } diff --git a/platform/consensus/ordering/pbft/response_manager.h b/platform/consensus/ordering/pbft/response_manager.h index b238e0df6..36412eaff 100644 --- a/platform/consensus/ordering/pbft/response_manager.h +++ b/platform/consensus/ordering/pbft/response_manager.h @@ -77,7 +77,7 @@ class ResponseManager { int GetPrimary(); void AddWaitingResponseRequest(std::unique_ptr request); - void RemoveWaitingResponseRequest(std::string hash); + void RemoveWaitingResponseRequest(const std::string& hash); bool CheckTimeOut(std::string hash); void ResponseTimer(std::string hash); void MonitoringClientTimeOut(); diff --git a/platform/consensus/ordering/pbft/viewchange_manager.cpp b/platform/consensus/ordering/pbft/viewchange_manager.cpp index 53060c20e..3084c7a1a 100644 --- a/platform/consensus/ordering/pbft/viewchange_manager.cpp +++ b/platform/consensus/ordering/pbft/viewchange_manager.cpp @@ -109,7 +109,7 @@ void ViewChangeManager::MayStart() { return; } started_ = true; - LOG(ERROR)<<"MAYSTART"; + LOG(ERROR) << "MAYSTART"; if (config_.GetPublicKeyCertificateInfo() .public_key() @@ -149,7 +149,7 @@ void ViewChangeManager::MayStart() { bool ViewChangeManager::ChangeStatue(ViewChangeStatus status) { if (status == ViewChangeStatus::READY_VIEW_CHANGE) { if (status_ != ViewChangeStatus::READY_VIEW_CHANGE) { - LOG(ERROR)<<"CHANGE STATUS"; + LOG(ERROR) << "CHANGE STATUS"; status_ = status; } } else { @@ -228,7 +228,7 @@ void ViewChangeManager::SetCurrentViewAndNewPrimary(uint64_t view_number) { config_.GetReplicaInfos()[(view_number - 1) % replicas.size()].id(); system_info_->SetPrimary(id); global_stats_->ChangePrimary(id); - LOG(ERROR)<<"View Change Happened"; + LOG(ERROR) << "View Change Happened"; } std::vector> ViewChangeManager::GetPrepareMsg( @@ -509,7 +509,7 @@ void ViewChangeManager::SendViewChangeMsg() { } void ViewChangeManager::AddComplaintTimer(uint64_t proxy_id, std::string hash) { - LOG(ERROR)<<"ADDING COMPLAINT"; + LOG(ERROR) << "ADDING COMPLAINT"; std::lock_guard lk(vc_mutex_); if (complaining_clients_.count(proxy_id) == 0) { complaining_clients_[proxy_id].set_proxy_id(proxy_id); diff --git a/platform/consensus/ordering/poe/algorithm/BUILD b/platform/consensus/ordering/poe/algorithm/BUILD index 357f56d8b..335cbca0f 100644 --- a/platform/consensus/ordering/poe/algorithm/BUILD +++ b/platform/consensus/ordering/poe/algorithm/BUILD @@ -5,11 +5,11 @@ cc_library( srcs = ["poe.cpp"], hdrs = ["poe.h"], deps = [ - "//platform/statistic:stats", "//common:comm", - "//platform/consensus/ordering/poe/proto:proposal_cc_proto", "//common/crypto:signature_verifier", - "//platform/consensus/ordering/common/algorithm:protocol_base", "//platform/common/queue:lock_free_queue", + "//platform/consensus/ordering/common/algorithm:protocol_base", + "//platform/consensus/ordering/poe/proto:proposal_cc_proto", + "//platform/statistic:stats", ], ) diff --git a/platform/consensus/ordering/poe/algorithm/poe.cpp b/platform/consensus/ordering/poe/algorithm/poe.cpp index 8a9b3a317..6aa9d6941 100644 --- a/platform/consensus/ordering/poe/algorithm/poe.cpp +++ b/platform/consensus/ordering/poe/algorithm/poe.cpp @@ -10,7 +10,6 @@ namespace poe { PoE::PoE(int id, int f, int total_num, SignatureVerifier* verifier) : ProtocolBase(id, f, total_num), verifier_(verifier) { - LOG(ERROR) << "get proposal graph"; id_ = id; total_num_ = total_num; @@ -19,9 +18,7 @@ PoE::PoE(int id, int f, int total_num, SignatureVerifier* verifier) seq_ = 0; } -PoE::~PoE() { - is_stop_ = true; -} +PoE::~PoE() { is_stop_ = true; } bool PoE::IsStop() { return is_stop_; } @@ -41,7 +38,7 @@ bool PoE::ReceivePropose(std::unique_ptr txn) { int proposer = txn->proposer(); { std::unique_lock lk(mutex_); - data_[txn->hash()]=std::move(txn); + data_[txn->hash()] = std::move(txn); } Proposal proposal; @@ -58,14 +55,14 @@ bool PoE::ReceivePrepare(std::unique_ptr proposal) { std::unique_lock lk(mutex_); received_[proposal->hash()].insert(proposal->proposer()); auto it = data_.find(proposal->hash()); - if(it != data_.end()){ - if(received_[proposal->hash()].size()>=2*f_+1){ + if (it != data_.end()) { + if (received_[proposal->hash()].size() >= 2 * f_ + 1) { txn = std::move(it->second); data_.erase(it); } } } - if(txn != nullptr){ + if (txn != nullptr) { commit_(*txn); } return true; diff --git a/platform/consensus/ordering/poe/algorithm/poe.h b/platform/consensus/ordering/poe/algorithm/poe.h index 20cc71a93..65f7d018c 100644 --- a/platform/consensus/ordering/poe/algorithm/poe.h +++ b/platform/consensus/ordering/poe/algorithm/poe.h @@ -13,7 +13,7 @@ namespace resdb { namespace poe { -class PoE: public common::ProtocolBase { +class PoE : public common::ProtocolBase { public: PoE(int id, int f, int total_num, SignatureVerifier* verifier); ~PoE(); @@ -31,10 +31,10 @@ class PoE: public common::ProtocolBase { std::map > data_; int64_t seq_; - bool is_stop_; + bool is_stop_; SignatureVerifier* verifier_; Stats* global_stats_; }; -} // namespace cassandra +} // namespace poe } // namespace resdb diff --git a/platform/consensus/ordering/poe/framework/BUILD b/platform/consensus/ordering/poe/framework/BUILD index 7030d2a0d..833a121b8 100644 --- a/platform/consensus/ordering/poe/framework/BUILD +++ b/platform/consensus/ordering/poe/framework/BUILD @@ -13,4 +13,3 @@ cc_library( "//platform/consensus/ordering/poe/algorithm:poe", ], ) - diff --git a/platform/consensus/ordering/poe/framework/consensus.cpp b/platform/consensus/ordering/poe/framework/consensus.cpp index b401adaf6..21763d464 100644 --- a/platform/consensus/ordering/poe/framework/consensus.cpp +++ b/platform/consensus/ordering/poe/framework/consensus.cpp @@ -35,7 +35,7 @@ namespace poe { Consensus::Consensus(const ResDBConfig& config, std::unique_ptr executor) - : common::Consensus(config, std::move(executor)){ + : common::Consensus(config, std::move(executor)) { int total_replicas = config_.GetReplicaNum(); int f = (total_replicas - 1) / 3; @@ -47,9 +47,8 @@ Consensus::Consensus(const ResDBConfig& config, .public_key() .public_key_info() .type() != CertificateKeyInfo::CLIENT) { - poe_ = std::make_unique( - config_.GetSelfInfo().id(), f, - total_replicas, GetSignatureVerifier()); + poe_ = std::make_unique(config_.GetSelfInfo().id(), f, total_replicas, + GetSignatureVerifier()); InitProtocol(poe_.get()); } } @@ -73,7 +72,7 @@ int Consensus::ProcessCustomConsensus(std::unique_ptr request) { } poe_->ReceivePrepare(std::move(proposal)); return 0; - } + } return 0; } diff --git a/platform/consensus/ordering/poe/framework/consensus.h b/platform/consensus/ordering/poe/framework/consensus.h index 72e56e181..df51be379 100644 --- a/platform/consensus/ordering/poe/framework/consensus.h +++ b/platform/consensus/ordering/poe/framework/consensus.h @@ -55,5 +55,5 @@ class Consensus : public common::Consensus { int send_num_[200]; }; -} // namespace cassandra +} // namespace poe } // namespace resdb diff --git a/platform/consensus/recovery/recovery.cpp b/platform/consensus/recovery/recovery.cpp index fb1f6d50a..51faad5ce 100644 --- a/platform/consensus/recovery/recovery.cpp +++ b/platform/consensus/recovery/recovery.cpp @@ -511,15 +511,18 @@ void Recovery::ReadLogsFromFiles( if (request_list.size() == 0) { ftruncate(fd, 0); } + uint64_t max_seq = 0; for (std::unique_ptr& recovery_data : request_list) { if (ckpt < recovery_data->request->seq()) { recovery_data->request->set_is_recovery(true); + max_seq = recovery_data->request->seq(); call_back(std::move(recovery_data->context), std::move(recovery_data->request)); } } - LOG(INFO) << "read log from files:" << path << " done"; + LOG(ERROR) << "read log from files:" << path << " done" + << " recovery max seq:" << max_seq; close(fd); } diff --git a/platform/networkstrate/async_acceptor.cpp b/platform/networkstrate/async_acceptor.cpp index 9fd85a069..b61a510b9 100644 --- a/platform/networkstrate/async_acceptor.cpp +++ b/platform/networkstrate/async_acceptor.cpp @@ -77,7 +77,7 @@ void AsyncAcceptor::Session::ReadDone() { delete recv_buffer_; } else { data_size_ = *reinterpret_cast(recv_buffer_); - if (data_size_ > 1e6) { + if (data_size_ > 1e10) { LOG(ERROR) << "read data size:" << data_size_ << " data size:" << sizeof(data_size_) << " close socket"; Close(); diff --git a/platform/networkstrate/consensus_manager.cpp b/platform/networkstrate/consensus_manager.cpp index 00b708109..b3fb10625 100644 --- a/platform/networkstrate/consensus_manager.cpp +++ b/platform/networkstrate/consensus_manager.cpp @@ -90,46 +90,9 @@ void ConsensusManager::HeartBeat() { std::mutex mutex; std::condition_variable cv; while (IsRunning()) { - auto keys = verifier_->GetAllPublicKeys(); - - std::vector replicas = GetAllReplicas(); - LOG(ERROR) << "all replicas:" << replicas.size(); - std::vector client_replicas = GetClientReplicas(); - HeartBeatInfo hb_info; - for (const auto& key : keys) { - *hb_info.add_public_keys() = key; - } - for (const auto& client : client_replicas) { - replicas.push_back(client); - } - auto client = GetReplicaClient(replicas, false); - if (client == nullptr) { - continue; - } - - // If it is not a client node, broadcost the current primary to the client. - if (config_.GetPublicKeyCertificateInfo() - .public_key() - .public_key_info() - .type() == CertificateKeyInfo::REPLICA) { - hb_info.set_primary(GetPrimary()); - hb_info.set_version(GetVersion()); - } - LOG(ERROR) << " server:" << config_.GetSelfInfo().id() << " sends HB" - << " is ready:" << is_ready_ - << " client size:" << client_replicas.size() - << " svr size:" << replicas.size(); - - Request request; - request.set_type(Request::TYPE_HEART_BEAT); - request.mutable_region_info()->set_region_id( - config_.GetConfigData().self_region_id()); - hb_info.SerializeToString(request.mutable_data()); - - int ret = client->SendHeartBeat(request); - if (ret <= 0) { - LOG(ERROR) << " server:" << config_.GetSelfInfo().id() - << " sends HB fail:" << ret; + { + std::unique_lock lk(hb_mutex_); + SendHeartBeat(); } std::unique_lock lk(mutex); cv.wait_for(lk, std::chrono::microseconds(sleep_time * 1000000), @@ -138,12 +101,61 @@ void ConsensusManager::HeartBeat() { if (config_.IsTestMode()) { sleep_time = 1; } else { - sleep_time = 60 * 2; + sleep_time = 60; } } } } +void ConsensusManager::SendHeartBeat() { + auto keys = verifier_->GetAllPublicKeys(); + + std::vector replicas = GetAllReplicas(); + LOG(ERROR) << "all replicas:" << replicas.size(); + std::vector client_replicas = GetClientReplicas(); + HeartBeatInfo hb_info; + hb_info.set_sender(config_.GetSelfInfo().id()); + hb_info.set_ip(config_.GetSelfInfo().ip()); + hb_info.set_port(config_.GetSelfInfo().port()); + hb_info.set_hb_version(version_); + for (const auto& key : keys) { + *hb_info.add_public_keys() = key; + hb_info.add_node_version(hb_[key.public_key_info().node_id()]); + } + for (const auto& client : client_replicas) { + replicas.push_back(client); + } + auto client = GetReplicaClient(replicas, false); + if (client == nullptr) { + return; + } + + // If it is not a client node, broadcost the current primary to the client. + if (config_.GetPublicKeyCertificateInfo() + .public_key() + .public_key_info() + .type() == CertificateKeyInfo::REPLICA) { + hb_info.set_primary(GetPrimary()); + hb_info.set_version(GetVersion()); + } + LOG(ERROR) << " server:" << config_.GetSelfInfo().id() << " sends HB" + << " is ready:" << is_ready_ + << " client size:" << client_replicas.size() + << " svr size:" << replicas.size(); + + Request request; + request.set_type(Request::TYPE_HEART_BEAT); + request.mutable_region_info()->set_region_id( + config_.GetConfigData().self_region_id()); + hb_info.SerializeToString(request.mutable_data()); + + int ret = client->SendHeartBeat(request); + if (ret <= 0) { + LOG(ERROR) << " server:" << config_.GetSelfInfo().id() + << " sends HB fail:" << ret; + } +} + // Porcess the packages received from the network. // context contains the client socket which can be used for sending response to // the client, the signature for the request will be filled inside the context @@ -158,35 +170,36 @@ int ConsensusManager::Process(std::unique_ptr context, return -1; } + std::unique_ptr request = std::make_unique(); + if (!request->ParseFromString(message.data())) { + LOG(ERROR) << "parse data info fail"; + return -1; + } + + if (request->type() == Request::TYPE_HEART_BEAT) { + return Dispatch(std::move(context), std::move(request)); + } + // Check if the certificate is valid. if (message.has_signature() && verifier_) { bool valid = verifier_->VerifyMessage(message.data(), message.signature()); if (!valid) { LOG(ERROR) << "request is not valid:" << message.signature().DebugString(); - LOG(ERROR) << " msg:" << message.data().size(); + LOG(ERROR) << " msg:" << message.data().size() + << " is recovery:" << request->is_recovery(); return -2; } } else { } - std::unique_ptr request = std::make_unique(); - if (!request->ParseFromString(message.data())) { - LOG(ERROR) << "parse data info fail"; - return -1; - } - - std::string tmp; - if (!request->SerializeToString(&tmp)) { - return -1; - } - // forward the signature to the request so that it can be included in the // request/response set if needed. context->signature = message.signature(); // LOG(ERROR) << "======= server:" << config_.GetSelfInfo().id() // << " get request type:" << request->type() // << " from:" << request->sender_id(); + return Dispatch(std::move(context), std::move(request)); } @@ -202,6 +215,7 @@ int ConsensusManager::Dispatch(std::unique_ptr context, int ConsensusManager::ProcessHeartBeat(std::unique_ptr context, std::unique_ptr request) { + std::unique_lock lk(hb_mutex_); std::vector replicas = GetReplicas(); HeartBeatInfo hb_info; if (!hb_info.ParseFromString(request->data())) { @@ -212,7 +226,10 @@ int ConsensusManager::ProcessHeartBeat(std::unique_ptr context, LOG(ERROR) << "receive public size:" << hb_info.public_keys().size() << " primary:" << hb_info.primary() << " version:" << hb_info.version() - << " from region:" << request->region_info().region_id(); + << " from region:" << request->region_info().region_id() + << " sender:" << hb_info.sender() + << " last send:" << hb_info.hb_version() + << " current v:" << hb_[hb_info.sender()]; if (request->region_info().region_id() == config_.GetConfigData().self_region_id()) { @@ -261,6 +278,18 @@ int ConsensusManager::ProcessHeartBeat(std::unique_ptr context, } } } + + if (!hb_info.ip().empty() && hb_info.hb_version() > 0 && + hb_[hb_info.sender()] != hb_info.hb_version()) { + ReplicaInfo info; + info.set_ip(hb_info.ip()); + info.set_port(hb_info.port()); + info.set_id(hb_info.sender()); + // bc_client_->Flush(info); + hb_[hb_info.sender()] = hb_info.hb_version(); + SendHeartBeat(); + } + if (!is_ready_ && replica_num >= config_.GetMinDataReceiveNum()) { LOG(ERROR) << "============ Server " << config_.GetSelfInfo().id() << " is ready " diff --git a/platform/networkstrate/consensus_manager.h b/platform/networkstrate/consensus_manager.h index fffc9fc7b..57ecc836f 100644 --- a/platform/networkstrate/consensus_manager.h +++ b/platform/networkstrate/consensus_manager.h @@ -89,6 +89,7 @@ class ConsensusManager : public ServiceInterface { private: void HeartBeat(); + void SendHeartBeat(); void BroadCastThread(); protected: @@ -105,6 +106,9 @@ class ConsensusManager : public ServiceInterface { std::unique_ptr bc_client_; std::vector clients_; Stats* global_stats_; + uint64_t version_; + std::map hb_; + std::mutex hb_mutex_; }; } // namespace resdb diff --git a/platform/networkstrate/replica_communicator.cpp b/platform/networkstrate/replica_communicator.cpp index 7083fe6ec..de0961252 100644 --- a/platform/networkstrate/replica_communicator.cpp +++ b/platform/networkstrate/replica_communicator.cpp @@ -257,7 +257,7 @@ void ReplicaCommunicator::SendMessage(const google::protobuf::Message& message, } if (target_replica.ip().empty()) { - LOG(ERROR) << "no replica info node:"< + #include + #include "common/utils/utils.h" #include "proto/kv/kv.pb.h" @@ -67,16 +69,19 @@ Stats::Stats(int sleep_time) { global_thread_ = std::thread(&Stats::MonitorGlobal, this); // pass by reference - transaction_summary_.port=-1; + transaction_summary_.port = -1; - //Setup websocket here + // Setup websocket here make_faulty_.store(false); - transaction_summary_.request_pre_prepare_state_time=std::chrono::system_clock::time_point::min(); - transaction_summary_.prepare_state_time=std::chrono::system_clock::time_point::min(); - transaction_summary_.commit_state_time=std::chrono::system_clock::time_point::min(); - transaction_summary_.execution_time=std::chrono::system_clock::time_point::min(); - transaction_summary_.txn_number=0; - + transaction_summary_.request_pre_prepare_state_time = + std::chrono::system_clock::time_point::min(); + transaction_summary_.prepare_state_time = + std::chrono::system_clock::time_point::min(); + transaction_summary_.commit_state_time = + std::chrono::system_clock::time_point::min(); + transaction_summary_.execution_time = + std::chrono::system_clock::time_point::min(); + transaction_summary_.txn_number = 0; } void Stats::Stop() { stop_ = true; } @@ -86,107 +91,122 @@ Stats::~Stats() { if (global_thread_.joinable()) { global_thread_.join(); } - if(enable_resview && crow_thread_.joinable()){ + if (enable_resview && crow_thread_.joinable()) { crow_thread_.join(); } } -void Stats::CrowRoute(){ +void Stats::CrowRoute() { crow::SimpleApp app; - while(!stop_){ - try{ - CROW_ROUTE(app, "/consensus_data").methods("GET"_method)([this](const crow::request& req, crow::response& res){ - LOG(ERROR)<<"API 1"; - res.set_header("Access-Control-Allow-Origin", "*"); // Allow requests from any origin - res.set_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS"); // Specify allowed methods - res.set_header("Access-Control-Allow-Headers", "Content-Type, Authorization"); // Specify allowed headers - - // Send your response - res.body=consensus_history_.dump(); - res.end(); - }); - CROW_ROUTE(app, "/get_status").methods("GET"_method)([this](const crow::request& req, crow::response& res){ - LOG(ERROR)<<"API 2"; - res.set_header("Access-Control-Allow-Origin", "*"); // Allow requests from any origin - res.set_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS"); // Specify allowed methods - res.set_header("Access-Control-Allow-Headers", "Content-Type, Authorization"); // Specify allowed headers - - // Send your response - res.body= IsFaulty() ? "Faulty" : "Not Faulty"; - res.end(); - }); - CROW_ROUTE(app, "/make_faulty").methods("GET"_method)([this](const crow::request& req, crow::response& res){ - LOG(ERROR)<<"API 3"; - res.set_header("Access-Control-Allow-Origin", "*"); // Allow requests from any origin - res.set_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS"); // Specify allowed methods - res.set_header("Access-Control-Allow-Headers", "Content-Type, Authorization"); // Specify allowed headers - - // Send your response - if(enable_faulty_switch_){ - make_faulty_.store(!make_faulty_.load()); - } - res.body= "Success"; - res.end(); - }); - app.port(8500+transaction_summary_.port).multithreaded().run(); + while (!stop_) { + try { + CROW_ROUTE(app, "/consensus_data") + .methods("GET"_method)([this](const crow::request& req, + crow::response& res) { + LOG(ERROR) << "API 1"; + res.set_header("Access-Control-Allow-Origin", + "*"); // Allow requests from any origin + res.set_header("Access-Control-Allow-Methods", + "GET, POST, OPTIONS"); // Specify allowed methods + res.set_header( + "Access-Control-Allow-Headers", + "Content-Type, Authorization"); // Specify allowed headers + + // Send your response + res.body = consensus_history_.dump(); + res.end(); + }); + CROW_ROUTE(app, "/get_status") + .methods("GET"_method)([this](const crow::request& req, + crow::response& res) { + LOG(ERROR) << "API 2"; + res.set_header("Access-Control-Allow-Origin", + "*"); // Allow requests from any origin + res.set_header("Access-Control-Allow-Methods", + "GET, POST, OPTIONS"); // Specify allowed methods + res.set_header( + "Access-Control-Allow-Headers", + "Content-Type, Authorization"); // Specify allowed headers + + // Send your response + res.body = IsFaulty() ? "Faulty" : "Not Faulty"; + res.end(); + }); + CROW_ROUTE(app, "/make_faulty") + .methods("GET"_method)([this](const crow::request& req, + crow::response& res) { + LOG(ERROR) << "API 3"; + res.set_header("Access-Control-Allow-Origin", + "*"); // Allow requests from any origin + res.set_header("Access-Control-Allow-Methods", + "GET, POST, OPTIONS"); // Specify allowed methods + res.set_header( + "Access-Control-Allow-Headers", + "Content-Type, Authorization"); // Specify allowed headers + + // Send your response + if (enable_faulty_switch_) { + make_faulty_.store(!make_faulty_.load()); + } + res.body = "Success"; + res.end(); + }); + app.port(8500 + transaction_summary_.port).multithreaded().run(); sleep(1); - } - catch( const std::exception& e){ + } catch (const std::exception& e) { } } app.stop(); } -bool Stats::IsFaulty(){ - return make_faulty_.load(); -} +bool Stats::IsFaulty() { return make_faulty_.load(); } -void Stats::ChangePrimary(int primary_id){ - transaction_summary_.primary_id=primary_id; +void Stats::ChangePrimary(int primary_id) { + transaction_summary_.primary_id = primary_id; make_faulty_.store(false); } -void Stats::SetProps(int replica_id, std::string ip, int port, bool resview_flag, bool faulty_flag){ - transaction_summary_.replica_id=replica_id; - transaction_summary_.ip=ip; - transaction_summary_.port=port; - enable_resview=resview_flag; - enable_faulty_switch_=faulty_flag; - if(resview_flag){ +void Stats::SetProps(int replica_id, std::string ip, int port, + bool resview_flag, bool faulty_flag) { + transaction_summary_.replica_id = replica_id; + transaction_summary_.ip = ip; + transaction_summary_.port = port; + enable_resview = resview_flag; + enable_faulty_switch_ = faulty_flag; + if (resview_flag) { crow_thread_ = std::thread(&Stats::CrowRoute, this); } } -void Stats::SetPrimaryId(int primary_id){ - transaction_summary_.primary_id=primary_id; +void Stats::SetPrimaryId(int primary_id) { + transaction_summary_.primary_id = primary_id; } -void Stats::RecordStateTime(std::string state){ - if(!enable_resview){ +void Stats::RecordStateTime(std::string state) { + if (!enable_resview) { return; } - if(state=="request" || state=="pre-prepare"){ - transaction_summary_.request_pre_prepare_state_time=std::chrono::system_clock::now(); - } - else if(state=="prepare"){ - transaction_summary_.prepare_state_time=std::chrono::system_clock::now(); - } - else if(state=="commit"){ - transaction_summary_.commit_state_time=std::chrono::system_clock::now(); + if (state == "request" || state == "pre-prepare") { + transaction_summary_.request_pre_prepare_state_time = + std::chrono::system_clock::now(); + } else if (state == "prepare") { + transaction_summary_.prepare_state_time = std::chrono::system_clock::now(); + } else if (state == "commit") { + transaction_summary_.commit_state_time = std::chrono::system_clock::now(); } } -void Stats::GetTransactionDetails(BatchUserRequest batch_request){ - if(!enable_resview){ +void Stats::GetTransactionDetails(BatchUserRequest batch_request) { + if (!enable_resview) { return; } - transaction_summary_.txn_number=batch_request.seq(); + transaction_summary_.txn_number = batch_request.seq(); transaction_summary_.txn_command.clear(); transaction_summary_.txn_key.clear(); transaction_summary_.txn_value.clear(); for (auto& sub_request : batch_request.user_requests()) { KVRequest kv_request; - if(!kv_request.ParseFromString(sub_request.request().data())){ + if (!kv_request.ParseFromString(sub_request.request().data())) { break; } if (kv_request.cmd() == KVRequest::SET) { @@ -209,53 +229,70 @@ void Stats::GetTransactionDetails(BatchUserRequest batch_request){ } } -void Stats::SendSummary(){ - if(!enable_resview){ +void Stats::SendSummary() { + if (!enable_resview) { return; } - transaction_summary_.execution_time=std::chrono::system_clock::now(); - - //Convert Transaction Summary to JSON - summary_json_["replica_id"]=transaction_summary_.replica_id; - summary_json_["ip"]=transaction_summary_.ip; - summary_json_["port"]=transaction_summary_.port; - summary_json_["primary_id"]=transaction_summary_.primary_id; - summary_json_["propose_pre_prepare_time"]=transaction_summary_.request_pre_prepare_state_time.time_since_epoch().count(); - summary_json_["prepare_time"]=transaction_summary_.prepare_state_time.time_since_epoch().count(); - summary_json_["commit_time"]=transaction_summary_.commit_state_time.time_since_epoch().count(); - summary_json_["execution_time"]=transaction_summary_.execution_time.time_since_epoch().count(); - for(size_t i=0; iInc(PREPARE, 1); } num_prepare_++; - transaction_summary_.prepare_message_count_times_list.push_back(std::chrono::system_clock::now()); + transaction_summary_.prepare_message_count_times_list.push_back( + std::chrono::system_clock::now()); } void Stats::IncCommit() { @@ -431,12 +469,11 @@ void Stats::IncCommit() { prometheus_->Inc(COMMIT, 1); } num_commit_++; - transaction_summary_.commit_message_count_times_list.push_back(std::chrono::system_clock::now()); + transaction_summary_.commit_message_count_times_list.push_back( + std::chrono::system_clock::now()); } -void Stats::IncPendingExecute() { - pending_execute_++; -} +void Stats::IncPendingExecute() { pending_execute_++; } void Stats::IncExecute() { execute_++; } diff --git a/platform/statistic/stats.h b/platform/statistic/stats.h index 621aaf84f..0ca8dd1e2 100644 --- a/platform/statistic/stats.h +++ b/platform/statistic/stats.h @@ -19,17 +19,18 @@ #pragma once +#include + #include #include - -#include "platform/statistic/prometheus_handler.h" -#include "platform/proto/resdb.pb.h" -#include "proto/kv/kv.pb.h" -#include "platform/common/network/tcp_socket.h" #include + #include "boost/asio.hpp" #include "boost/beast.hpp" -#include +#include "platform/common/network/tcp_socket.h" +#include "platform/proto/resdb.pb.h" +#include "platform/statistic/prometheus_handler.h" +#include "proto/kv/kv.pb.h" namespace asio = boost::asio; namespace beast = boost::beast; @@ -37,44 +38,46 @@ using tcp = asio::ip::tcp; namespace resdb { -struct VisualData{ - //Set when initializing - int replica_id; - int primary_id; - std::string ip; - int port; - - //Set when new txn is received - int txn_number; - std::vector txn_command; - std::vector txn_key; - std::vector txn_value; - - //Request state if primary_id==replica_id, pre_prepare state otherwise - std::chrono::system_clock::time_point request_pre_prepare_state_time; - std::chrono::system_clock::time_point prepare_state_time; - std::vector prepare_message_count_times_list; - std::chrono::system_clock::time_point commit_state_time; - std::vector commit_message_count_times_list; - std::chrono::system_clock::time_point execution_time; +struct VisualData { + // Set when initializing + int replica_id; + int primary_id; + std::string ip; + int port; + + // Set when new txn is received + int txn_number; + std::vector txn_command; + std::vector txn_key; + std::vector txn_value; + + // Request state if primary_id==replica_id, pre_prepare state otherwise + std::chrono::system_clock::time_point request_pre_prepare_state_time; + std::chrono::system_clock::time_point prepare_state_time; + std::vector + prepare_message_count_times_list; + std::chrono::system_clock::time_point commit_state_time; + std::vector + commit_message_count_times_list; + std::chrono::system_clock::time_point execution_time; }; -class Stats{ +class Stats { public: static Stats* GetGlobalStats(int sleep_seconds = 5); void Stop(); - void RetrieveProgress(); - void SetProps(int replica_id, std::string ip, int port, bool resview_flag, bool faulty_flag); - void SetPrimaryId(int primary_id); - void RecordStateTime(std::string state); - void GetTransactionDetails(BatchUserRequest batch_request); - void SendSummary(); - void CrowRoute(); - bool IsFaulty(); - void ChangePrimary(int primary_id); - + void RetrieveProgress(); + void SetProps(int replica_id, std::string ip, int port, bool resview_flag, + bool faulty_flag); + void SetPrimaryId(int primary_id); + void RecordStateTime(std::string state); + void GetTransactionDetails(BatchUserRequest batch_request); + void SendSummary(); + void CrowRoute(); + bool IsFaulty(); + void ChangePrimary(int primary_id); void AddLatency(uint64_t run_time); diff --git a/scripts/deploy/config/kv_server.conf b/scripts/deploy/config/kv_server.conf index a811f7681..500e34d6c 100644 --- a/scripts/deploy/config/kv_server.conf +++ b/scripts/deploy/config/kv_server.conf @@ -1,8 +1,8 @@ iplist=( -172.31.52.247 -172.31.54.193 -172.31.55.48 -172.31.53.140 +172.31.57.186 +172.31.57.186 +172.31.57.186 +172.31.57.186 172.31.57.186 ) diff --git a/scripts/deploy/script/deploy.sh b/scripts/deploy/script/deploy.sh index 87ab7dc92..6da8deb8d 100755 --- a/scripts/deploy/script/deploy.sh +++ b/scripts/deploy/script/deploy.sh @@ -14,6 +14,7 @@ server=//service/kv:kv_service fi # obtain the src path +main_folder=resilientdb_app server_path=`echo "$server" | sed 's/:/\//g'` server_path=${server_path:1} server_name=`echo "$server" | awk -F':' '{print $NF}'` @@ -60,10 +61,12 @@ fi # commands functions function run_cmd(){ count=1 + idx=1 for ip in ${deploy_iplist[@]}; do - ssh -i ${key} -n -o BatchMode=yes -o StrictHostKeyChecking=no ubuntu@${ip} "$1" & + ssh -i ${key} -n -o BatchMode=yes -o StrictHostKeyChecking=no ubuntu@${ip} "cd ${main_folder}/$idx; $1" & ((count++)) + ((idx++)) done while [ $count -gt 0 ]; do @@ -76,6 +79,15 @@ function run_one_cmd(){ ssh -i ${key} -n -o BatchMode=yes -o StrictHostKeyChecking=no ubuntu@${ip} "$1" } +idx=1 +for ip in ${deploy_iplist[@]}; +do + run_one_cmd "mkdir -p ${main_folder}/$idx" & + ((count++)) + ((idx++)) +done + + run_cmd "killall -9 ${server_bin}" run_cmd "rm -rf ${server_bin}; rm ${server_bin}*.log; rm -rf server.config; rm -rf cert;" @@ -83,11 +95,13 @@ sleep 1 # upload config files and binary echo "upload configs" +idx=1 count=0 for ip in ${deploy_iplist[@]}; do - scp -i ${key} -r ${bin_path} ${output_path}/server.config ${output_path}/cert ubuntu@${ip}:/home/ubuntu/ & + scp -i ${key} -r ${bin_path} ${output_path}/server.config ${output_path}/cert ubuntu@${ip}:/home/ubuntu/${main_folder}/$idx & ((count++)) + ((idx++)) done while [ $count -gt 0 ]; do @@ -103,9 +117,10 @@ for ip in ${deploy_iplist[@]}; do private_key="cert/node_"${idx}".key.pri" cert="cert/cert_"${idx}".cert" - run_one_cmd "nohup ./${server_bin} server.config ${private_key} ${cert} ${grafna_port} > ${server_bin}.log 2>&1 &" & + run_one_cmd "cd ${main_folder}/$idx; nohup ./${server_bin} server.config ${private_key} ${cert} ${grafna_port} > ${server_bin}.log 2>&1 &" & ((count++)) ((idx++)) + ((grafna_port++)) done while [ $count -gt 0 ]; do @@ -120,7 +135,7 @@ do resp="" while [ "$resp" = "" ] do - resp=`ssh -i ${key} -n -o BatchMode=yes -o StrictHostKeyChecking=no ubuntu@${ip} "grep \"receive public size:${#iplist[@]}\" ${server_bin}.log"` + resp=`ssh -i ${key} -n -o BatchMode=yes -o StrictHostKeyChecking=no ubuntu@${ip} "cd ${main_folder}/$idx; grep \"receive public size:${#iplist[@]}\" ${server_bin}.log"` if [ "$resp" = "" ]; then sleep 1 fi diff --git a/service/kv/kv_service.cpp b/service/kv/kv_service.cpp index 11272aa48..6cd20e1ef 100644 --- a/service/kv/kv_service.cpp +++ b/service/kv/kv_service.cpp @@ -60,6 +60,7 @@ int main(int argc, char** argv) { exit(0); } google::InitGoogleLogging(argv[0]); + FLAGS_minloglevel = 1; char* config_file = argv[1]; char* private_key_file = argv[2]; diff --git a/service/tools/kv/api_tools/kv_client_txn_tools.cpp b/service/tools/kv/api_tools/kv_client_txn_tools.cpp index 4ebff4de3..55f0f6c49 100644 --- a/service/tools/kv/api_tools/kv_client_txn_tools.cpp +++ b/service/tools/kv/api_tools/kv_client_txn_tools.cpp @@ -49,8 +49,6 @@ int main(int argc, char** argv) { ResDBTxnAccessor client(config); auto resp = client.GetTxn(min_seq, max_seq); - absl::StatusOr>> GetTxn( - uint64_t min_seq, uint64_t max_seq); if (!resp.ok()) { LOG(ERROR) << "get replica state fail"; exit(1); diff --git a/third_party/BUILD b/third_party/BUILD index b43e9230e..0ec7614b8 100644 --- a/third_party/BUILD +++ b/third_party/BUILD @@ -51,4 +51,4 @@ cc_library( deps = [ "@com_crowcpp_crow//:crow", ], -) \ No newline at end of file +)