Skip to content

Commit

Permalink
ResView Branch (#138)
Browse files Browse the repository at this point in the history
* "Added basic data collection, pointed out spots for web socket"

* "Added data gathering points and prints for testing"

* "Fixed bugs with compiling stats"

* "Changed message collection to be per message rather than queried periodically"

* "Added conversion of stats values to JSON, need to fix replica id setting"

* "Added more precision to timestamps in JSON"

* "Added new data to the summary view"

* "Got transaction detail collection working"

* "Changed to GETALLVALUES based on main repo"

* Changed Produced JSON to include txn_number

* "Added websocket to send to front end, slightly inconsistent"

* Add files via upload

* "Added ability to receive messages from front end"

* "Viewchange Update"

* "Removed possible infinite loop in sending summary"

* "Fixing file inclusion issue"

* "Added 2 new apis in same thread to save resources"

* "Adjusted make faulty endpoint"

* "Removed vestigial variables, turned off faulty switch for PR"

* "Fixed failing response manager test"
  • Loading branch information
Saipranav-Kotamreddy authored Mar 31, 2024
1 parent ca0ac0f commit 69ef583
Show file tree
Hide file tree
Showing 20 changed files with 552 additions and 14 deletions.
2 changes: 1 addition & 1 deletion .bazelrc
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
build --cxxopt='-std=c++17' --copt=-O3 --jobs=40
#build --action_env=PYTHON_BIN_PATH="/usr/bin/python3.10"
build --action_env=PYTHON_BIN_PATH="/usr/bin/python3.10"
#build --action_env=PYTHON_LIB_PATH="/usr/include/python3.10"

21 changes: 21 additions & 0 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -224,3 +224,24 @@ http_archive(
strip_prefix = "json-3.9.1",
urls = ["https://github.com/nlohmann/json/archive/v3.9.1.tar.gz"],
)

# Crow release archive (tag v1.0+5), pinned by sha256 and built with the
# project-supplied //third_party:crow.BUILD file.
http_archive(
name = "com_crowcpp_crow",
build_file = "//third_party:crow.BUILD",
sha256 = "f95128a8976fae6f2922823e07da59edae277a460776572a556a4b663ff5ee4b",
strip_prefix = "Crow-1.0-5",
url = "https://github.com/CrowCpp/Crow/archive/refs/tags/v1.0+5.zip",
)

# Aliases the asio target from the com_chriskohlhoff_asio archive under the
# name "asio".
# NOTE(review): presumably consumed by crow.BUILD -- confirm against that file.
bind(
name = "asio",
actual = "@com_chriskohlhoff_asio//:asio",
)

# Asio release archive (tag asio-1-26-0), pinned by sha256 and built with the
# project-supplied //third_party:asio.BUILD file.
http_archive(
name = "com_chriskohlhoff_asio",
build_file = "//third_party:asio.BUILD",
sha256 = "babcdfd2c744905a73d20de211b51367bda0d5200f11d654c4314b909d8c963c",
strip_prefix = "asio-asio-1-26-0",
url = "https://github.com/chriskohlhoff/asio/archive/refs/tags/asio-1-26-0.zip",
)
7 changes: 7 additions & 0 deletions common/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ cc_library(
],
)

# Thin wrapper target that re-exports @boost//:beast so other packages can
# depend on it through //common.
cc_library(
name= "beast",
deps = [
"@boost//:beast"
]
)

cc_library(
name = "boost_lockfree",
deps = [
Expand Down
10 changes: 5 additions & 5 deletions monitoring/prometheus/prometheus.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,19 @@ scrape_configs:
- targets: ["localhost:9090"]
- job_name: "node_exporter1"
static_configs:
- targets: ["172.31.52.247:9100"]
- targets: ["localhost:9100"]
- job_name: "node_exporter2"
static_configs:
- targets: ["172.31.54.193:9100"]
- targets: ["localhost:9100"]
- job_name: "node_exporter3"
static_configs:
- targets: ["172.31.55.48:9100"]
- targets: ["localhost:9100"]
- job_name: "node_exporter4"
static_configs:
- targets: ["172.31.53.140:9100"]
- targets: ["localhost:9100"]
- job_name: "node_exporter5"
static_configs:
- targets: ["172.31.57.186:9100"]
- targets: ["localhost:9100"]
- job_name: "cpp_client1"
static_configs:
- targets: ["172.31.52.247:8090"]
Expand Down
3 changes: 2 additions & 1 deletion platform/consensus/execution/transaction_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,8 @@ void TransactionExecutor::OnlyExecute(std::unique_ptr<Request> request) {
// id:"<<request->proxy_id();
// std::unique_ptr<BatchUserResponse> batch_response =
// std::make_unique<BatchUserResponse>();

std::unique_ptr<BatchUserResponse> response;
global_stats_->GetTransactionDetails(batch_request);
if (transaction_manager_) {
response = transaction_manager_->ExecuteBatch(batch_request);
}
Expand Down Expand Up @@ -217,6 +217,7 @@ void TransactionExecutor::Execute(std::unique_ptr<Request> request,
// std::make_unique<BatchUserResponse>();

std::unique_ptr<BatchUserResponse> response;
global_stats_->GetTransactionDetails(batch_request);
if (transaction_manager_ && need_execute) {
response = transaction_manager_->ExecuteBatch(batch_request);
}
Expand Down
1 change: 1 addition & 0 deletions platform/consensus/ordering/pbft/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ cc_library(
"//platform/consensus/execution:system_info",
"//platform/networkstrate:replica_communicator",
"//platform/proto:viewchange_message_cc_proto",
"//platform/statistic:stats",
],
)

Expand Down
17 changes: 16 additions & 1 deletion platform/consensus/ordering/pbft/commitment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ Commitment::Commitment(const ResDBConfig& config,
global_stats_ = Stats::GetGlobalStats();
duplicate_manager_ = std::make_unique<DuplicateManager>(config);
message_manager_->SetDuplicateManager(duplicate_manager_.get());

global_stats_->SetProps(config_.GetSelfInfo().id(), config_.GetSelfInfo().ip(), config_.GetSelfInfo().port(), config_.GetConfigData().enable_resview(), config_.GetConfigData().enable_faulty_switch());
global_stats_->SetPrimaryId(message_manager_->GetCurrentPrimary());
}

Commitment::~Commitment() {
Expand Down Expand Up @@ -78,6 +81,7 @@ int Commitment::ProcessNewRequest(std::unique_ptr<Context> context,
// << message_manager_->GetCurrentPrimary()
// << " seq:" << user_request->seq()
// << " hash:" << user_request->hash();
LOG(ERROR)<<"NOT PRIMARY, Primary is "<<message_manager_->GetCurrentPrimary();
replica_communicator_->SendMessage(*user_request,
message_manager_->GetCurrentPrimary());
{
Expand Down Expand Up @@ -134,6 +138,8 @@ int Commitment::ProcessNewRequest(std::unique_ptr<Context> context,
return -2;
}

global_stats_->RecordStateTime("request");

user_request->set_type(Request::TYPE_PRE_PREPARE);
user_request->set_current_view(message_manager_->GetCurrentView());
user_request->set_seq(*seq);
Expand All @@ -149,7 +155,7 @@ int Commitment::ProcessNewRequest(std::unique_ptr<Context> context,
// TODO check whether the sender is the primary.
int Commitment::ProcessProposeMsg(std::unique_ptr<Context> context,
std::unique_ptr<Request> request) {
if (context == nullptr || context->signature.signature().empty()) {
if (global_stats_->IsFaulty() || context == nullptr || context->signature.signature().empty()) {
LOG(ERROR) << "user request doesn't contain signature, reject";
return -2;
}
Expand Down Expand Up @@ -181,6 +187,7 @@ int Commitment::ProcessProposeMsg(std::unique_ptr<Context> context,
LOG(ERROR) << " check by the user func fail";
return -2;
}
//global_stats_->GetTransactionDetails(std::move(request));
BatchUserRequest batch_request;
batch_request.ParseFromString(request->data());
batch_request.clear_createtime();
Expand All @@ -202,6 +209,7 @@ int Commitment::ProcessProposeMsg(std::unique_ptr<Context> context,
}

global_stats_->IncPropose();
global_stats_->RecordStateTime("pre-prepare");
std::unique_ptr<Request> prepare_request = resdb::NewRequest(
Request::TYPE_PREPARE, *request, config_.GetSelfInfo().id());
prepare_request->clear_data();
Expand Down Expand Up @@ -253,6 +261,7 @@ int Commitment::ProcessPrepareMsg(std::unique_ptr<Context> context,
// LOG(ERROR) << "sign hash"
// << commit_request->data_signature().DebugString();
}
global_stats_->RecordStateTime("prepare");
replica_communicator_->BroadCast(*commit_request);
}
return ret == CollectorResultCode::INVALID ? -2 : 0;
Expand All @@ -276,6 +285,11 @@ int Commitment::ProcessCommitMsg(std::unique_ptr<Context> context,
// commit the request.
CollectorResultCode ret =
message_manager_->AddConsensusMsg(context->signature, std::move(request));
if (ret == CollectorResultCode::STATE_CHANGED) {
//LOG(ERROR)<<request->data().size();
//global_stats_->GetTransactionDetails(request->data());
global_stats_->RecordStateTime("commit");
}
return ret == CollectorResultCode::INVALID ? -2 : 0;
}

Expand All @@ -287,6 +301,7 @@ int Commitment::PostProcessExecutedMsg() {
if (batch_resp == nullptr) {
continue;
}
global_stats_->SendSummary();
Request request;
request.set_hash(batch_resp->hash());
request.set_seq(batch_resp->seq());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ int ConsensusManagerPBFT::InternalConsensusCommit(
int ret = commitment_->ProcessNewRequest(std::move(context),
std::move(request));
if (ret == -3) {
LOG(ERROR)<<"BAD RETURN";
std::pair<std::unique_ptr<Context>, std::unique_ptr<Request>>
request_complained;
{
Expand Down
96 changes: 96 additions & 0 deletions platform/consensus/ordering/pbft/response_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,24 @@
#include "common/utils/utils.h"

namespace resdb {

// Builds a timeout record.
//   hash_ : hash identifying the request this record tracks.
//   time_ : deadline stored in timeout_time.
// Members are initialised directly (in declaration order: hash, timeout_time)
// and the by-value string is moved rather than default-constructed and then
// copy-assigned.
ResponseClientTimeout::ResponseClientTimeout(std::string hash_,
                                             uint64_t time_)
    : hash(std::move(hash_)), timeout_time(time_) {}

// Copy constructor: duplicates the request hash and its deadline.
ResponseClientTimeout::ResponseClientTimeout(
    const ResponseClientTimeout& other)
    : hash(other.hash), timeout_time(other.timeout_time) {}

// Reversed ordering on purpose: a record with a LATER deadline compares as
// "less". std::priority_queue keeps the greatest element on top, so this makes
// the record with the earliest deadline surface first.
bool ResponseClientTimeout::operator<(
    const ResponseClientTimeout& other) const {
  const bool expires_after_other = other.timeout_time < timeout_time;
  return expires_after_other;
}

ResponseManager::ResponseManager(const ResDBConfig& config,
ReplicaCommunicator* replica_communicator,
SystemInfo* system_info,
Expand All @@ -47,6 +65,10 @@ ResponseManager::ResponseManager(const ResDBConfig& config,
config_.IsTestMode()) {
user_req_thread_ = std::thread(&ResponseManager::BatchProposeMsg, this);
}
if(config_.GetConfigData().enable_viewchange()){
checking_timeout_thread_ =
std::thread(&ResponseManager::MonitoringClientTimeOut, this);
}
global_stats_ = Stats::GetGlobalStats();
send_num_ = 0;
}
Expand All @@ -56,6 +78,9 @@ ResponseManager::~ResponseManager() {
if (user_req_thread_.joinable()) {
user_req_thread_.join();
}
if(checking_timeout_thread_.joinable()){
checking_timeout_thread_.join();
}
}

// use system info
Expand Down Expand Up @@ -293,11 +318,82 @@ int ResponseManager::DoBatch(
batch_request.SerializeToString(new_request->mutable_data());
new_request->set_hash(SignatureVerifier::CalculateHash(new_request->data()));
new_request->set_proxy_id(config_.GetSelfInfo().id());
/*for(int i=1; i<=4; i++){
replica_communicator_->SendMessage(*new_request, i);
}*/
replica_communicator_->SendMessage(*new_request, GetPrimary());
send_num_++;
LOG(INFO) << "send msg to primary:" << GetPrimary()
<< " batch size:" << batch_req.size();
AddWaitingResponseRequest(std::move(new_request));
return 0;
}

void ResponseManager::AddWaitingResponseRequest(
std::unique_ptr<Request> request) {
if (!config_.GetConfigData().enable_viewchange()) {
return;
}
pm_lock_.lock();
uint64_t time = GetCurrentTime() + this->timeout_length_;
client_timeout_min_heap_.push(
ResponseClientTimeout(request->hash(), time));
waiting_response_batches_.insert(
make_pair(request->hash(), std::move(request)));
pm_lock_.unlock();
sem_post(&request_sent_signal_);
}

// Forgets the pending request stored under `hash`, if any.
// Does nothing unless view change is enabled in the configuration.
void ResponseManager::RemoveWaitingResponseRequest(std::string hash) {
  if (!config_.GetConfigData().enable_viewchange()) {
    return;
  }
  std::lock_guard<std::mutex> guard(pm_lock_);
  // Erase-by-key is already a no-op for an unknown hash, so one lookup
  // replaces the previous find()/find()/erase() sequence.
  waiting_response_batches_.erase(hash);
}

// Returns true when `hash` still has an entry in waiting_response_batches_.
// The lookup is performed while holding pm_lock_.
bool ResponseManager::CheckTimeOut(std::string hash) {
  std::lock_guard<std::mutex> guard(pm_lock_);
  return waiting_response_batches_.count(hash) != 0;
}

std::unique_ptr<Request> ResponseManager::GetTimeOutRequest(
std::string hash) {
pm_lock_.lock();
auto value = std::move(waiting_response_batches_.find(hash)->second);
pm_lock_.unlock();
return value;
}

void ResponseManager::MonitoringClientTimeOut() {
while (!stop_) {
sem_wait(&request_sent_signal_);
pm_lock_.lock();
if (client_timeout_min_heap_.empty()) {
pm_lock_.unlock();
continue;
}
auto client_timeout = client_timeout_min_heap_.top();
client_timeout_min_heap_.pop();
pm_lock_.unlock();

if (client_timeout.timeout_time > GetCurrentTime()) {
usleep(client_timeout.timeout_time - GetCurrentTime());
}

if (CheckTimeOut(client_timeout.hash)) {
auto request = GetTimeOutRequest(client_timeout.hash);
if (request) {
LOG(ERROR) << "Client Request Timeout " << client_timeout.hash;
replica_communicator_->BroadCast(*request);
}
}
}
}
} // namespace resdb
27 changes: 27 additions & 0 deletions platform/consensus/ordering/pbft/response_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/

#pragma once
#include <semaphore.h>

#include "platform/config/resdb_config.h"
#include "platform/consensus/ordering/pbft/lock_free_collector_pool.h"
Expand All @@ -27,6 +28,16 @@

namespace resdb {

// A (request hash, deadline) record. ResponseManager keeps these in a
// std::priority_queue (client_timeout_min_heap_) to track when a sent request
// should be treated as timed out.
class ResponseClientTimeout {
public:
// Creates a record for the request identified by hash_ with deadline time_.
ResponseClientTimeout(std::string hash_, uint64_t time_);
// Copies both fields from other.
ResponseClientTimeout(const ResponseClientTimeout& other);
// Reversed comparison: returns true when this record's timeout_time is
// GREATER than other's, so a std::priority_queue yields the smallest
// timeout_time first.
bool operator<(const ResponseClientTimeout& other) const;

// Hash of the request this record refers to.
std::string hash;
// Deadline; compared against GetCurrentTime() by the monitoring thread.
uint64_t timeout_time;
};

class ResponseManager {
public:
ResponseManager(const ResDBConfig& config,
Expand Down Expand Up @@ -65,6 +76,13 @@ class ResponseManager {
int BatchProposeMsg();
int GetPrimary();

void AddWaitingResponseRequest(std::unique_ptr<Request> request);
void RemoveWaitingResponseRequest(std::string hash);
bool CheckTimeOut(std::string hash);
void ResponseTimer(std::string hash);
void MonitoringClientTimeOut();
std::unique_ptr<Request> GetTimeOutRequest(std::string hash);

private:
ResDBConfig config_;
ReplicaCommunicator* replica_communicator_;
Expand All @@ -77,6 +95,15 @@ class ResponseManager {
SystemInfo* system_info_;
std::atomic<int> send_num_;
SignatureVerifier* verifier_;

std::thread checking_timeout_thread_;
std::map<std::string, std::unique_ptr<Request>> waiting_response_batches_;
std::priority_queue<ResponseClientTimeout> client_timeout_min_heap_;
std::mutex pm_lock_;
uint64_t timeout_length_;
sem_t request_sent_signal_;
uint64_t highest_seq_;
uint64_t highest_seq_primary_id_;
};

} // namespace resdb
Loading

0 comments on commit 69ef583

Please sign in to comment.