Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concurrent load disk when cs starts #880 #882

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 35 additions & 5 deletions src/chunkserver/block_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -163,17 +163,47 @@ int64_t BlockManager::DiskQuota() const {
return disk_quota_;
}

// TODO: concurrent load
bool BlockManager::LoadStorage() {
bool ret = true;
// return value from Disk::LoadStorage:
// 0: initial state, 1: done, -1: error occured
std::vector<int> ret_vals;
ret_vals.resize(disks_.size(), 0);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

可以直接写成
std::vector ret_vals(disks_.size(), 0);

ThreadPool tp(disks_.size());
int disk_index = 0;
for (auto it = disks_.begin(); it != disks_.end(); ++it) {
Disk* disk = it->second;
ret = ret && disk->LoadStorage(std::bind(&BlockManager::AddBlock,
std::function<bool (int64_t, Disk*, BlockMeta)> callback = std::bind(&BlockManager::AddBlock,
this, std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3));
disk_quota_ += disk->GetQuota();
std::placeholders::_3);
tp.AddTask(std::bind(&Disk::LoadStorage, disk, callback, &(ret_vals[disk_index])));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

用不着这么多层括号吧, &ret_vals[disk_index]应该就是可以的

++disk_index;
}
bool ret = true;
while (true) {
sleep(1);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

感觉没必要上来就sleep一秒, 说不定已经完成了呢
sleep可以放循环的最后

bool done = true;
for (auto it = ret_vals.begin(); it != ret_vals.end(); ++it) {
if (*it < 0) {
ret = false;
break;
} else if (*it == 0) {
done = false;
break;
}
}
if (done || !ret) {
break;
}
}
if (ret) {
for (auto it = disks_.begin(); it != disks_.end(); ++it) {
Disk* disk = it->second;
disk_quota_ += disk->GetQuota();
}
}
tp.Stop(false);
LOG(INFO, "LoadStorage done. Quota = %ld", disk_quota_);
return ret;
}

Expand Down
16 changes: 10 additions & 6 deletions src/chunkserver/disk.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,16 @@ Disk::~Disk() {
metadb_ = NULL;
}

bool Disk::LoadStorage(std::function<void (int64_t, Disk*, BlockMeta)> callback) {
void Disk::LoadStorage(std::function<void (int64_t, Disk*, BlockMeta)> callback, int* ret_val) {
MutexLock lock(&mu_);
int64_t start_load_time = common::timer::get_micros();
leveldb::Options options;
options.create_if_missing = true;
leveldb::Status s = leveldb::DB::Open(options, path_ + "meta/", &metadb_);
if (!s.ok()) {
LOG(WARNING, "Load blocks fail: %s", s.ToString().c_str());
return false;
*ret_val = -1;
return;
}

std::string version_key(8, '\0');
Expand All @@ -61,7 +62,8 @@ bool Disk::LoadStorage(std::function<void (int64_t, Disk*, BlockMeta)> callback)
if (1 != sscanf(it->key().data(), "%ld", &block_id)) {
LOG(WARNING, "Unknown key: %s\n", it->key().ToString().c_str());
delete it;
return false;
*ret_val = -1;
return;
}
BlockMeta meta;
if (!meta.ParseFromArray(it->value().data(), it->value().size())) {
Expand Down Expand Up @@ -97,13 +99,15 @@ bool Disk::LoadStorage(std::function<void (int64_t, Disk*, BlockMeta)> callback)
}
delete it;
int64_t end_load_time = common::timer::get_micros();
LOG(INFO, "Disk %s Load %ld blocks, use %ld ms, namespace version: %ld",
path_.c_str(), block_num, (end_load_time - start_load_time) / 1000, namespace_version_);
LOG(INFO, "Disk %s Load %ld blocks, use %ld ms, namespace version: %ld, size %s",
path_.c_str(), block_num, (end_load_time - start_load_time) / 1000,
namespace_version_, common::HumanReadableString(counters_.data_size.Get()).c_str());
if (namespace_version_ == 0 && block_num > 0) {
LOG(WARNING, "Namespace version lost!");
}
quota_ += counters_.data_size.Get();
return true;
*ret_val = 1;
return;
}

std::string Disk::Path() const {
Expand Down
2 changes: 1 addition & 1 deletion src/chunkserver/disk.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class Disk {
Disk(const std::string& path, int64_t quota);
~Disk();
std::string Path() const;
bool LoadStorage(std::function<void (int64_t, Disk*, BlockMeta)> callback);
void LoadStorage(std::function<void (int64_t, Disk*, BlockMeta)> callback, int* ret_val);
int64_t NamespaceVersion() const;
bool SetNamespaceVersion(int64_t version);
void Seek(int64_t block_id, std::vector<leveldb::Iterator*>* iters);
Expand Down
16 changes: 12 additions & 4 deletions src/chunkserver/test/data_block_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,12 @@ TEST_F(DataBlockTest, CreateBlock) {
mkdir("./block123", 0755);
std::string file_path("./block123");
Disk disk(file_path, 1000000);
disk.LoadStorage(std::bind(AddBlock, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3));
int ret_val;
std::function<void (int64_t, Disk*, BlockMeta)> callback = std::bind(AddBlock,
std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3);
disk.LoadStorage(callback, &ret_val);
BlockMeta meta;
FileCache file_cache(10);
int64_t block_id = 123;
Expand All @@ -51,8 +55,12 @@ TEST_F(DataBlockTest, WriteAndReadBlock) {
mkdir("./block123", 0755);
std::string file_path("./block123");
Disk disk(file_path, 1000000);
disk.LoadStorage(std::bind(AddBlock, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3));
int ret_val;
std::function<void (int64_t, Disk*, BlockMeta)> callback = std::bind(AddBlock,
std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3);
disk.LoadStorage(callback, &ret_val);
FileCache file_cache(10);
int64_t block_id = 123;
meta.set_block_id(block_id);
Expand Down