Skip to content

Commit

Permalink
Concurrent load disk when cs starts #880
Browse files Browse the repository at this point in the history
  • Loading branch information
lylei committed Mar 28, 2017
1 parent 09abd6d commit ee8fb06
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 15 deletions.
39 changes: 35 additions & 4 deletions src/chunkserver/block_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -165,15 +165,46 @@ int64_t BlockManager::DiskQuota() const {

// TODO: concurrent load
bool BlockManager::LoadStorage() {
bool ret = true;
// return value from Disk::LoadStorage:
// 0: initial state, 1: done, -1: error occured
std::vector<int> ret_vals;
ret_vals.resize(disks_.size(), 0);
ThreadPool tp(disks_.size());
int disk_index = 0;
for (auto it = disks_.begin(); it != disks_.end(); ++it) {
Disk* disk = it->second;
ret = ret && disk->LoadStorage(std::bind(&BlockManager::AddBlock,
std::function<bool (int64_t, Disk*, BlockMeta)> callback = std::bind(&BlockManager::AddBlock,
this, std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3));
disk_quota_ += disk->GetQuota();
std::placeholders::_3);
tp.AddTask(std::bind(&Disk::LoadStorage, disk, callback, &(ret_vals[disk_index])));
++disk_index;
}
bool ret = true;
while (true) {
sleep(1);
bool done = true;
for (auto it = ret_vals.begin(); it != ret_vals.end(); ++it) {
if (*it < 0) {
ret = false;
break;
} else if (*it == 0) {
done = false;
break;
}
}
if (done || !ret) {
break;
}
}
if (ret) {
for (auto it = disks_.begin(); it != disks_.end(); ++it) {
Disk* disk = it->second;
disk_quota_ += disk->GetQuota();
}
}
tp.Stop(false);
LOG(INFO, "LoadStorage done. Quota = %ld", disk_quota_);
return ret;
}

Expand Down
16 changes: 10 additions & 6 deletions src/chunkserver/disk.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,16 @@ Disk::~Disk() {
metadb_ = NULL;
}

bool Disk::LoadStorage(std::function<void (int64_t, Disk*, BlockMeta)> callback) {
void Disk::LoadStorage(std::function<void (int64_t, Disk*, BlockMeta)> callback, int* ret_val) {
MutexLock lock(&mu_);
int64_t start_load_time = common::timer::get_micros();
leveldb::Options options;
options.create_if_missing = true;
leveldb::Status s = leveldb::DB::Open(options, path_ + "meta/", &metadb_);
if (!s.ok()) {
LOG(WARNING, "Load blocks fail: %s", s.ToString().c_str());
return false;
*ret_val = -1;
return;
}

std::string version_key(8, '\0');
Expand All @@ -61,7 +62,8 @@ bool Disk::LoadStorage(std::function<void (int64_t, Disk*, BlockMeta)> callback)
if (1 != sscanf(it->key().data(), "%ld", &block_id)) {
LOG(WARNING, "Unknown key: %s\n", it->key().ToString().c_str());
delete it;
return false;
*ret_val = -1;
return;
}
BlockMeta meta;
if (!meta.ParseFromArray(it->value().data(), it->value().size())) {
Expand Down Expand Up @@ -97,13 +99,15 @@ bool Disk::LoadStorage(std::function<void (int64_t, Disk*, BlockMeta)> callback)
}
delete it;
int64_t end_load_time = common::timer::get_micros();
LOG(INFO, "Disk %s Load %ld blocks, use %ld ms, namespace version: %ld",
path_.c_str(), block_num, (end_load_time - start_load_time) / 1000, namespace_version_);
LOG(INFO, "Disk %s Load %ld blocks, use %ld ms, namespace version: %ld, size %s",
path_.c_str(), block_num, (end_load_time - start_load_time) / 1000,
namespace_version_, common::HumanReadableString(counters_.data_size.Get()).c_str());
if (namespace_version_ == 0 && block_num > 0) {
LOG(WARNING, "Namespace version lost!");
}
quota_ += counters_.data_size.Get();
return true;
*ret_val = 1;
return;
}

std::string Disk::Path() const {
Expand Down
2 changes: 1 addition & 1 deletion src/chunkserver/disk.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class Disk {
Disk(const std::string& path, int64_t quota);
~Disk();
std::string Path() const;
bool LoadStorage(std::function<void (int64_t, Disk*, BlockMeta)> callback);
void LoadStorage(std::function<void (int64_t, Disk*, BlockMeta)> callback, int* ret_val);
int64_t NamespaceVersion() const;
bool SetNamespaceVersion(int64_t version);
void Seek(int64_t block_id, std::vector<leveldb::Iterator*>* iters);
Expand Down
16 changes: 12 additions & 4 deletions src/chunkserver/test/data_block_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,12 @@ TEST_F(DataBlockTest, CreateBlock) {
mkdir("./block123", 0755);
std::string file_path("./block123");
Disk disk(file_path, 1000000);
disk.LoadStorage(std::bind(AddBlock, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3));
int ret_val;
std::function<void (int64_t, Disk*, BlockMeta)> callback = std::bind(AddBlock,
std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3);
disk.LoadStorage(callback, &ret_val);
BlockMeta meta;
FileCache file_cache(10);
int64_t block_id = 123;
Expand All @@ -51,8 +55,12 @@ TEST_F(DataBlockTest, WriteAndReadBlock) {
mkdir("./block123", 0755);
std::string file_path("./block123");
Disk disk(file_path, 1000000);
disk.LoadStorage(std::bind(AddBlock, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3));
int ret_val;
std::function<void (int64_t, Disk*, BlockMeta)> callback = std::bind(AddBlock,
std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3);
disk.LoadStorage(callback, &ret_val);
FileCache file_cache(10);
int64_t block_id = 123;
meta.set_block_id(block_id);
Expand Down

0 comments on commit ee8fb06

Please sign in to comment.