diff --git a/src/chunkserver/block_manager.cc b/src/chunkserver/block_manager.cc index ca04604b..9dd58b68 100644 --- a/src/chunkserver/block_manager.cc +++ b/src/chunkserver/block_manager.cc @@ -163,17 +163,47 @@ int64_t BlockManager::DiskQuota() const { return disk_quota_; } -// TODO: concurrent load bool BlockManager::LoadStorage() { - bool ret = true; + // return value from Disk::LoadStorage: + // 0: initial state, 1: done, -1: error occured + std::vector ret_vals; + ret_vals.resize(disks_.size(), 0); + ThreadPool tp(disks_.size()); + int disk_index = 0; for (auto it = disks_.begin(); it != disks_.end(); ++it) { Disk* disk = it->second; - ret = ret && disk->LoadStorage(std::bind(&BlockManager::AddBlock, + std::function callback = std::bind(&BlockManager::AddBlock, this, std::placeholders::_1, std::placeholders::_2, - std::placeholders::_3)); - disk_quota_ += disk->GetQuota(); + std::placeholders::_3); + tp.AddTask(std::bind(&Disk::LoadStorage, disk, callback, &(ret_vals[disk_index]))); + ++disk_index; + } + bool ret = true; + while (true) { + sleep(1); + bool done = true; + for (auto it = ret_vals.begin(); it != ret_vals.end(); ++it) { + if (*it < 0) { + ret = false; + break; + } else if (*it == 0) { + done = false; + break; + } + } + if (done || !ret) { + break; + } + } + if (ret) { + for (auto it = disks_.begin(); it != disks_.end(); ++it) { + Disk* disk = it->second; + disk_quota_ += disk->GetQuota(); + } } + tp.Stop(false); + LOG(INFO, "LoadStorage done. Quota = %ld", disk_quota_); return ret; } diff --git a/src/chunkserver/disk.cc b/src/chunkserver/disk.cc index ad01b04a..cd04e3a5 100644 --- a/src/chunkserver/disk.cc +++ b/src/chunkserver/disk.cc @@ -35,7 +35,7 @@ Disk::~Disk() { metadb_ = NULL; } -bool Disk::LoadStorage(std::function callback) { +void Disk::LoadStorage(std::function callback, int* ret_val) { MutexLock lock(&mu_); int64_t start_load_time = common::timer::get_micros(); leveldb::Options options; @@ -43,7 +43,8 @@ bool Disk::LoadStorage(std::function callback) leveldb::Status s = leveldb::DB::Open(options, path_ + "meta/", &metadb_); if (!s.ok()) { LOG(WARNING, "Load blocks fail: %s", s.ToString().c_str()); - return false; + *ret_val = -1; + return; } std::string version_key(8, '\0'); @@ -61,7 +62,8 @@ bool Disk::LoadStorage(std::function callback) if (1 != sscanf(it->key().data(), "%ld", &block_id)) { LOG(WARNING, "Unknown key: %s\n", it->key().ToString().c_str()); delete it; - return false; + *ret_val = -1; + return; } BlockMeta meta; if (!meta.ParseFromArray(it->value().data(), it->value().size())) { @@ -97,13 +99,15 @@ bool Disk::LoadStorage(std::function callback) } delete it; int64_t end_load_time = common::timer::get_micros(); - LOG(INFO, "Disk %s Load %ld blocks, use %ld ms, namespace version: %ld", - path_.c_str(), block_num, (end_load_time - start_load_time) / 1000, namespace_version_); + LOG(INFO, "Disk %s Load %ld blocks, use %ld ms, namespace version: %ld, size %s", + path_.c_str(), block_num, (end_load_time - start_load_time) / 1000, + namespace_version_, common::HumanReadableString(counters_.data_size.Get()).c_str()); if (namespace_version_ == 0 && block_num > 0) { LOG(WARNING, "Namespace version lost!"); } quota_ += counters_.data_size.Get(); - return true; + *ret_val = 1; + return; } std::string Disk::Path() const { diff --git a/src/chunkserver/disk.h b/src/chunkserver/disk.h index bfe68530..37249441 100644 --- a/src/chunkserver/disk.h +++ b/src/chunkserver/disk.h @@ -36,7 +36,7 @@ class Disk { Disk(const std::string& path, int64_t quota); ~Disk(); std::string Path() const; - bool LoadStorage(std::function callback); + void LoadStorage(std::function callback, int* ret_val); int64_t NamespaceVersion() const; bool SetNamespaceVersion(int64_t version); void Seek(int64_t block_id, std::vector* iters); diff --git a/src/chunkserver/test/data_block_test.cc b/src/chunkserver/test/data_block_test.cc index e84b4bd9..38e65824 100644 --- a/src/chunkserver/test/data_block_test.cc +++ b/src/chunkserver/test/data_block_test.cc @@ -28,8 +28,12 @@ TEST_F(DataBlockTest, CreateBlock) { mkdir("./block123", 0755); std::string file_path("./block123"); Disk disk(file_path, 1000000); - disk.LoadStorage(std::bind(AddBlock, std::placeholders::_1, - std::placeholders::_2, std::placeholders::_3)); + int ret_val; + std::function callback = std::bind(AddBlock, + std::placeholders::_1, + std::placeholders::_2, + std::placeholders::_3); + disk.LoadStorage(callback, &ret_val); BlockMeta meta; FileCache file_cache(10); int64_t block_id = 123; @@ -51,8 +55,12 @@ TEST_F(DataBlockTest, WriteAndReadBlock) { mkdir("./block123", 0755); std::string file_path("./block123"); Disk disk(file_path, 1000000); - disk.LoadStorage(std::bind(AddBlock, std::placeholders::_1, - std::placeholders::_2, std::placeholders::_3)); + int ret_val; + std::function callback = std::bind(AddBlock, + std::placeholders::_1, + std::placeholders::_2, + std::placeholders::_3); + disk.LoadStorage(callback, &ret_val); FileCache file_cache(10); int64_t block_id = 123; meta.set_block_id(block_id);