Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gc something at destory "NameSlaveImpl" instance and format logs #803

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/nameserver/logdb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ LogDB::~LogDB() {
if (thread_pool_) {
thread_pool_->Stop(true);
}
delete thread_pool_;
if (write_log_) fclose(write_log_);
for (FileCache::iterator it = read_log_.begin(); it != read_log_.end(); ++it) {
fclose((it->second).first);
Expand Down
27 changes: 19 additions & 8 deletions src/nameserver/master_slave.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,17 @@ MasterSlaveImpl::MasterSlaveImpl() : slave_stub_(NULL), exiting_(false), master_
}
}


MasterSlaveImpl::~MasterSlaveImpl() {
// Wait for threads done task.
if (thread_pool_) {
thread_pool_->Stop(true);
}
delete thread_pool_;
delete rpc_client_;
delete logdb_;
}

void MasterSlaveImpl::Init(std::function<void (const std::string& log)> callback) {
log_callback_ = callback;
if (logdb_->GetLargestIdx(&current_idx_) == kReadError) {
Expand Down Expand Up @@ -167,7 +178,7 @@ void MasterSlaveImpl::Log(const std::string& entry, std::function<void (bool)> c
thread_pool_->AddTask(std::bind(&MasterSlaveImpl::PorcessCallbck,this,
current_idx_, true));
} else {
LOG(DEBUG, "%s insert callback index = %d", kLogPrefix.c_str(), current_idx_);
LOG(DEBUG, "%s insert callback index = %ld", kLogPrefix.c_str(), current_idx_);
thread_pool_->DelayTask(10000, std::bind(&MasterSlaveImpl::PorcessCallbck,
this, current_idx_, true));
cond_.Signal();
Expand Down Expand Up @@ -253,7 +264,7 @@ void MasterSlaveImpl::ReplicateLog() {
mu_.Unlock();
break;
}
LOG(DEBUG, "%s ReplicateLog sync_idx_ = %d, current_idx_ = %d",
LOG(DEBUG, "%s ReplicateLog sync_idx_ = %ld, current_idx_ = %ld",
kLogPrefix.c_str(), sync_idx_, current_idx_);
mu_.Unlock();
std::string entry;
Expand All @@ -268,21 +279,21 @@ void MasterSlaveImpl::ReplicateLog() {
while (!rpc_client_->SendRequest(slave_stub_,
&master_slave::MasterSlave_Stub::AppendLog,
&request, &response, 15, 1)) {
LOG(WARNING, "%s Replicate log failed index = %d, current_idx_ = %d",
LOG(WARNING, "%s Replicate log failed index = %ld, current_idx_ = %ld",
kLogPrefix.c_str(), sync_idx_ + 1, current_idx_);
sleep(5);
}
if (!response.success()) { // log mismatch
MutexLock lock(&mu_);
sync_idx_ = response.index() - 1;
LOG(INFO, "[Sync] set sync_idx_ to %d", kLogPrefix.c_str(), sync_idx_);
LOG(INFO, "%s set sync_idx_ to %ld", kLogPrefix.c_str(), sync_idx_);
continue;
}
thread_pool_->AddTask(std::bind(&MasterSlaveImpl::PorcessCallbck,
this, sync_idx_ + 1, false));
mu_.Lock();
sync_idx_++;
LOG(DEBUG, "%s Replicate log done. sync_idx_ = %d, current_idx_ = %d",
LOG(DEBUG, "%s Replicate log done. sync_idx_ = %ld, current_idx_ = %ld",
kLogPrefix.c_str(), sync_idx_ , current_idx_);
mu_.Unlock();
}
Expand All @@ -296,7 +307,7 @@ void MasterSlaveImpl::PorcessCallbck(int64_t index, bool timeout_check) {
std::map<int64_t, std::function<void (bool)> >::iterator it = callbacks_.find(index);
if (it != callbacks_.end()) {
callback = it->second;
LOG(DEBUG, "%s calling callback %d", kLogPrefix.c_str(), it->first);
LOG(DEBUG, "%s calling callback %ld", kLogPrefix.c_str(), it->first);
callbacks_.erase(it);
mu_.Unlock();
callback(true);
Expand All @@ -306,7 +317,7 @@ void MasterSlaveImpl::PorcessCallbck(int64_t index, bool timeout_check) {
}
if (timeout_check) {
if (!master_only_) {
LOG(WARNING, "%s ReplicateLog sync_idx_ = %d timeout, enter master-only mode",
LOG(WARNING, "%s ReplicateLog sync_idx_ = %ld timeout, enter master-only mode",
kLogPrefix.c_str(), index);
}
master_only_ = true;
Expand All @@ -320,7 +331,7 @@ void MasterSlaveImpl::PorcessCallbck(int64_t index, bool timeout_check) {
}

void MasterSlaveImpl::LogStatus() {
LOG(INFO, "%s sync_idx_ = %d, current_idx_ = %d, applied_idx_ = %d, callbacks_ size = %d",
LOG(INFO, "%s sync_idx_ = %ld, current_idx_ = %ld, applied_idx_ = %ld, callbacks_ size = %d",
kLogPrefix.c_str(), sync_idx_, current_idx_, applied_idx_, callbacks_.size());
StatusCode ret_a = logdb_->WriteMarker("applied_idx", applied_idx_);
StatusCode ret_s = logdb_->WriteMarker("sync_idx", sync_idx_);
Expand Down
2 changes: 1 addition & 1 deletion src/nameserver/master_slave.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class RpcClient;
class MasterSlaveImpl : public Sync, public master_slave::MasterSlave {
public:
MasterSlaveImpl();
virtual ~MasterSlaveImpl() {};
virtual ~MasterSlaveImpl();
virtual void Init(std::function<void (const std::string& log)> callback);
virtual bool IsLeader(std::string* leader_addr = NULL);
virtual bool Log(const std::string& entry, int timeout_ms = 10000);
Expand Down
1 change: 1 addition & 0 deletions src/nameserver/namespace.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ NameSpace::NameSpace(bool standalone): version_(0), last_entry_id_(1),
options.block_cache = leveldb::NewLRUCache(FLAGS_namedb_cache_size * 1024L * 1024L);
leveldb::Status s = leveldb::DB::Open(options, FLAGS_namedb_path, &db_);
if (!s.ok()) {
delete db_;
db_ = NULL;
LOG(ERROR, "Open leveldb fail: %s", s.ToString().c_str());
exit(EXIT_FAILURE);
Expand Down
11 changes: 9 additions & 2 deletions src/utils/meta_converter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ void CheckChunkserverMeta(const std::vector<std::string>& store_path_list) {
options.create_if_missing = true;
leveldb::DB* metadb;
leveldb::Status s = leveldb::DB::Open(options, path + "/meta/", &metadb);
meta_dbs[path] = metadb;
if (!s.ok()) {
LOG(ERROR, "[MetaCheck] Open meta on %s failed: %s", path.c_str(), s.ToString().c_str());
CloseMetaStore(meta_dbs);
exit(EXIT_FAILURE);
return;
}
Expand All @@ -56,19 +58,20 @@ void CheckChunkserverMeta(const std::vector<std::string>& store_path_list) {
LOG(INFO, "[MetaCheck] %s Load meta version %d", path.c_str(), cur_version);
if (meta_version != EMPTY_META && cur_version != meta_version) {
LOG(ERROR, "Cannot handle this situation!!!");
CloseMetaStore(meta_dbs);
exit(EXIT_FAILURE);
}
meta_version = cur_version;
} else if (s.IsNotFound()) {
if (meta_version != EMPTY_META && meta_version != 0) {
LOG(ERROR, "Cannot handle this situation!!!");
CloseMetaStore(meta_dbs);
exit(EXIT_FAILURE);
}
meta_version = 0;
LOG(INFO, "No meta version %s", path.c_str());
}
}
meta_dbs[path] = metadb;
}
if (meta_version == CHUNKSERVER_META_VERSION) {
LOG(INFO, "[MetaCheck] Chunkserver meta check pass");
Expand All @@ -78,6 +81,7 @@ void CheckChunkserverMeta(const std::vector<std::string>& store_path_list) {
ChunkserverMetaV02V1(meta_dbs);
} else {
LOG(ERROR, "[MetaCheck] Cannot handle this situation!!!");
CloseMetaStore(meta_dbs);
exit(EXIT_FAILURE);
}
SetChunkserverMetaVersion(meta_dbs);
Expand Down Expand Up @@ -106,6 +110,7 @@ void ChunkserverMetaV02V1(const std::map<std::string, leveldb::DB*>& meta_dbs) {
}
if (!src_meta) {
LOG(ERROR, "[MetaCheck] Cannot find a valid meta store");
CloseMetaStore(meta_dbs);
exit(EXIT_FAILURE);
}

Expand All @@ -114,6 +119,7 @@ void ChunkserverMetaV02V1(const std::map<std::string, leveldb::DB*>& meta_dbs) {
BlockMeta meta;
if (!meta.ParseFromArray(it->value().data(), it->value().size())) {
LOG(ERROR, "[MetaCheck] Parse BlockMeta failed: key = %s", it->key().ToString().c_str());
CloseMetaStore(meta_dbs);
exit(EXIT_FAILURE);
}
const std::string& path = meta.store_path();
Expand Down Expand Up @@ -146,6 +152,7 @@ void SetChunkserverMetaVersion(const std::map<std::string, leveldb::DB*>& meta_d
leveldb::Status s = ldb->Put(leveldb::WriteOptions(), meta_key, meta_str);
if (!s.ok()) {
LOG(ERROR, "[MetaCheck] Put meta failed %s", it->first.c_str());
CloseMetaStore(meta_dbs);
exit(EXIT_FAILURE);
}
LOG(INFO, "[MetaCheck] Set meta version %s = %d", it->first.c_str(), CHUNKSERVER_META_VERSION);
Expand All @@ -159,4 +166,4 @@ void CloseMetaStore(const std::map<std::string, leveldb::DB*>& meta_dbs) {
}

} // namespace bfs
} // namespace baidu
} // namespace baidu