Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dir lock #885

Open
wants to merge 29 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,13 @@ endif
ifdef FUSE_LL_PATH
BIN += bfs_ll_mount
endif
TESTS = namespace_test block_mapping_test location_provider_test logdb_test \
TESTS = namespace_test block_mapping_test \
block_mapping_manager_test location_provider_test logdb_test \
file_lock_manager_test file_lock_test chunkserver_impl_test \
file_cache_test block_manager_test data_block_test
TEST_OBJS = src/nameserver/test/namespace_test.o \
src/nameserver/test/block_mapping_test.o \
src/nameserver/test/block_mapping_manager_test.o \
src/nameserver/test/logdb_test.o \
src/nameserver/test/location_provider_test.o \
src/nameserver/test/kv_client.o \
Expand Down Expand Up @@ -137,6 +139,15 @@ block_mapping_test: src/nameserver/test/block_mapping_test.o src/nameserver/bloc
$(CXX) src/nameserver/block_mapping.o src/nameserver/test/block_mapping_test.o \
src/nameserver/block_mapping_manager.o $(OBJS) -o $@ $(LDFLAGS)

# Unit-test binary for the block mapping manager. It is linked from the test
# object, block_mapping_manager.o, block_mapping.o and the shared $(OBJS),
# using the same $(CXX)/$(LDFLAGS) as the other *_test targets.
block_mapping_manager_test: src/nameserver/test/block_mapping_manager_test.o \
src/nameserver/block_mapping_manager.o \
src/nameserver/block_mapping.o
$(CXX) src/nameserver/block_mapping.o \
src/nameserver/block_mapping_manager.o \
src/nameserver/test/block_mapping_manager_test.o \
$(OBJS) -o $@ $(LDFLAGS)


logdb_test: src/nameserver/test/logdb_test.o src/nameserver/logdb.o
$(CXX) src/nameserver/logdb.o src/nameserver/test/logdb_test.o $(OBJS) -o $@ $(LDFLAGS)

Expand Down
15 changes: 15 additions & 0 deletions src/client/bfs_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

DECLARE_string(flagfile);
DECLARE_string(nameserver_nodes);
DECLARE_int32(sdk_dir_lock_timeout);

void print_usage() {
printf("Use:\nbfs_client <command> path\n");
Expand All @@ -41,6 +42,8 @@ void print_usage() {
printf("\t stat : list current stat of the file system\n");
printf("\t ln <src> <dst>: create symlink\n");
printf("\t chmod <mode> <path> : change file mode bits\n");
printf("\t lockdir <path> : lock the dir\n");
printf("\t unlockdir <path> : unlock the dir\n");
}

int BfsTouchz(baidu::bfs::FS* fs, int argc, char* argv[]) {
Expand Down Expand Up @@ -514,6 +517,14 @@ int BfsShutdownStat(baidu::bfs::FS* fs) {
return 0;
}

/// Handler for the `lockdir` command: locks the directory named by argv[0].
///
/// @param fs    File-system client used to issue the lock request.
/// @param argc  Number of arguments remaining after the command name.
/// @param argv  Remaining arguments; argv[0] is the directory path.
/// @return 1 when the path argument is missing, otherwise whatever
///         FS::LockDirectory returns for the given path and
///         FLAGS_sdk_dir_lock_timeout.
int BfsLockDir(baidu::bfs::FS* fs, int argc, char* argv[]) {
    // main() passes (argc - 2, argv + 2), so with no path given argv[0] is
    // the terminating null entry of the process argv; never dereference it.
    if (argc < 1) {
        fprintf(stderr, "Usage: bfs_client lockdir <path>\n");
        return 1;
    }
    return fs->LockDirectory(argv[0], FLAGS_sdk_dir_lock_timeout);
}

/// Handler for the `unlockdir` command: unlocks the directory named by argv[0].
///
/// @param fs    File-system client used to issue the unlock request.
/// @param argc  Number of arguments remaining after the command name.
/// @param argv  Remaining arguments; argv[0] is the directory path.
/// @return 1 when the path argument is missing, otherwise whatever
///         FS::UnlockDirectory returns.
int BfsUnlockDir(baidu::bfs::FS* fs, int argc, char* argv[]) {
    // main() passes (argc - 2, argv + 2), so with no path given argv[0] is
    // the terminating null entry of the process argv; never dereference it.
    if (argc < 1) {
        fprintf(stderr, "Usage: bfs_client unlockdir <path>\n");
        return 1;
    }
    // NOTE(review): the second argument is presumably the force-unlock flag
    // (the nameserver reads request->force_unlock()) -- confirm against the SDK.
    return fs->UnlockDirectory(argv[0], true);
}

/// bfs client main
int main(int argc, char* argv[]) {
FLAGS_flagfile = "./bfs.flag";
Expand Down Expand Up @@ -569,6 +580,10 @@ int main(int argc, char* argv[]) {
ret = BfsShutdownStat(fs);
} else if (strcmp(argv[1], "ln") == 0) {
ret = BfsLink(fs, argc - 2, argv + 2);
} else if (strcmp(argv[1], "lockdir") == 0) {
ret = BfsLockDir(fs, argc - 2, argv + 2);
} else if (strcmp(argv[1], "unlockdir") == 0) {
ret = BfsUnlockDir(fs, argc - 2, argv + 2);
} else {
fprintf(stderr, "Unknown command: %s\n", argv[1]);
}
Expand Down
1 change: 1 addition & 0 deletions src/flags.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ DEFINE_int32(sdk_thread_num, 10, "Sdk thread num");
DEFINE_int32(sdk_file_reada_len, 1024*1024, "Read ahead buffer len");
DEFINE_int32(sdk_createblock_retry, 5, "Create block retry times before fail");
DEFINE_int32(sdk_write_retry_times, 5, "Write retry times before fail");
// Passed by bfs_client's `lockdir` command as the timeout argument of
// FS::LockDirectory. NOTE(review): the unit is not stated here (presumably
// seconds) -- confirm against the SDK implementation.
DEFINE_int32(sdk_dir_lock_timeout, 30, "Timeout for get dir lock");


/* vim: set expandtab ts=4 sw=4 sts=4 tw=100: */
10 changes: 10 additions & 0 deletions src/nameserver/block_mapping.cc
Original file line number Diff line number Diff line change
Expand Up @@ -969,5 +969,15 @@ void BlockMapping::MarkIncomplete(int64_t block_id) {
}
}

/// Returns the recover_stat recorded for `block_id`.
///
/// Takes mu_ via MutexLock for the whole lookup. When GetBlockPtr() does not
/// find the block, kAny is returned instead.
RecoverStat BlockMapping::GetRecoverStat(int64_t block_id) {
    MutexLock lock(&mu_);
    NSBlock* found = NULL;
    if (GetBlockPtr(block_id, &found)) {
        return found->recover_stat;
    }
    // Unknown block: report kAny.
    return kAny;
}

} // namespace bfs
} // namespace baidu
1 change: 1 addition & 0 deletions src/nameserver/block_mapping.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ class BlockMapping {
void ListRecover(RecoverBlockSet* blocks);
int32_t GetCheckNum();
void MarkIncomplete(int64_t block_id);
RecoverStat GetRecoverStat(int64_t block_id);
private:
void DealWithDeadBlockInternal(int32_t cs_id, int64_t block_id);
typedef std::map<int32_t, std::set<int64_t> > CheckList;
Expand Down
7 changes: 7 additions & 0 deletions src/nameserver/block_mapping_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,13 @@ void BlockMappingManager::MarkIncomplete(int64_t block_id) {
}

bool BlockMappingManager::CheckBlocksClosed(const std::vector<int64_t>& blocks) {
for (auto it = blocks.begin(); it != blocks.end(); ++it) {
int32_t bucket_offset = GetBucketOffset(*it);
RecoverStat stat = block_mapping_[bucket_offset]->GetRecoverStat(*it);
if (stat == kBlockWriting || stat == kIncomplete) {
return false;
}
}
return true;
}

Expand Down
103 changes: 80 additions & 23 deletions src/nameserver/nameserver_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,9 @@ void NameServerImpl::CreateFile(::google::protobuf::RpcController* controller,
NameServerLog log;
std::vector<int64_t> blocks_to_remove;
FileLockGuard file_lock(new WriteLock(path));
StatusCode status = namespace_->CreateFile(path, flags, mode, replica_num, &blocks_to_remove, &log);
StatusCode status =
namespace_->CreateFile(path, flags, mode, replica_num,
&blocks_to_remove, request->uuid(), &log);
for (size_t i = 0; i < blocks_to_remove.size(); i++) {
block_mapping_manager_->RemoveBlock(blocks_to_remove[i]);
}
Expand Down Expand Up @@ -781,7 +783,8 @@ void NameServerImpl::Rename(::google::protobuf::RpcController* controller,
FileInfo remove_file;
NameServerLog log;
FileLockGuard file_lock_guard(new WriteLock(oldpath, newpath));
StatusCode status = namespace_->Rename(oldpath, newpath, &need_unlink, &remove_file, &log);
StatusCode status = namespace_->Rename(oldpath, newpath, &need_unlink,
&remove_file, request->uuid(), &log);
response->set_status(status);
if (status != kOK) {
done->Run();
Expand Down Expand Up @@ -846,7 +849,8 @@ void NameServerImpl::Unlink(::google::protobuf::RpcController* controller,
FileInfo file_info;
NameServerLog log;
FileLockGuard file_lock_guard(new WriteLock(path));
StatusCode status = namespace_->RemoveFile(path, &file_info, &log);
StatusCode status = namespace_->RemoveFile(path, &file_info,
request->uuid(), &log);
sofa::pbrpc::RpcController* ctl = reinterpret_cast<sofa::pbrpc::RpcController*>(controller);
LOG(INFO, "Sdk %s unlink file %s returns %s",
ctl->RemoteAddress().c_str(), path.c_str(), StatusCode_Name(status).c_str());
Expand Down Expand Up @@ -907,7 +911,8 @@ void NameServerImpl::DeleteDirectory(::google::protobuf::RpcController* controll
std::vector<FileInfo>* removed = new std::vector<FileInfo>;
NameServerLog log;
FileLockGuard file_lock_guard(new WriteLock(path));
StatusCode ret_status = namespace_->DeleteDirectory(path, recursive, removed, &log);
StatusCode ret_status = namespace_->DeleteDirectory(path, recursive, removed,
request->uuid(), &log);
sofa::pbrpc::RpcController* ctl = reinterpret_cast<sofa::pbrpc::RpcController*>(controller);
LOG(INFO, "Sdk %s delete directory %s returns %s",
ctl->RemoteAddress().c_str(), path.c_str(), StatusCode_Name(ret_status).c_str());
Expand Down Expand Up @@ -1008,31 +1013,72 @@ void NameServerImpl::LockDir(::google::protobuf::RpcController* controller,
done->Run();
return;
}

std::string path = NameSpace::NormalizePath(request->dir_path());
FileLockGuard lock_guard(new WriteLock(path));
StatusCode status = namespace_->GetDirLockStatus(path);
std::string parent_path(path, 0, path.find_last_of("/"));
FileInfo info;
if (!namespace_->CheckDirLockPermission(parent_path, request->uuid(), &info)) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这info是干啥的啊,而且为啥是在外面剥掉一层才传进去。。

LOG(INFO, "%s has no permission, parent is locked by %s",
request->uuid().c_str(), info.dir_lock_holder_uuid().c_str());
response->set_status(kNoPermission);
done->Run();
}

std::string holder;
StatusCode status = namespace_->GetDirLockStatus(path, &holder);
LOG(INFO, "%s try lock dir %s", request->uuid().c_str(), path.c_str());
bool need_log_remote = false;
NameServerLog log;
if (status != kDirLocked) {
//TODO log remote?
if (status == kDirUnlock) {
namespace_->SetDirLockStatus(path, kDirLocked, request->uuid());
namespace_->SetDirLockStatus(path, kDirLocked, request->uuid(), &log);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个if的嵌套我没太懂啊。。。为啥不能是if(locked)() else if (cleaning) else if (unlock)?

LOG(INFO, "%s lock dir %s", request->uuid().c_str(), path.c_str());
status = kOK;
need_log_remote = true;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log.entries_size()是不是就行了。而且LogRemote检查是不是更好

} else if (status == kDirLockCleaning) {
std::vector<int64_t> blocks;
namespace_->ListAllBlocks(path, &blocks);
if (block_mapping_manager_->CheckBlocksClosed(blocks)) {
//TODO log remote
namespace_->SetDirLockStatus(path, kDirUnlock);
namespace_->SetDirLockStatus(path, kDirUnlock,
request->uuid(), &log);
LOG(INFO, "%s lock dir %s",
request->uuid().c_str(), path.c_str());
status = kOK;
need_log_remote = true;
}
} // else status should be kBadParameter
} else {
//TODO log remote
namespace_->SetDirLockStatus(path, kDirLockCleaning);
status = kDirLockCleaning;
if (holder != request->uuid()) {
// must set dir lock stat to kDirLockCleaning before ListAllBlocks
namespace_->SetDirLockStatus(path, kDirLockCleaning,
request->uuid(), &log);
status = kDirLockCleaning;
need_log_remote = true;
std::vector<int64_t> blocks;
namespace_->ListAllBlocks(path, &blocks);
for (auto it = blocks.begin(); it != blocks.end(); ++it) {
block_mapping_manager_->MarkIncomplete(*it);
}
LOG(INFO, "%s try clean %s dir lock",
request->uuid().c_str(), path.c_str());
} else {
// double lock by the same sdk, ignore it
status = kOK;
}
}

response->set_status(status);
done->Run();
if (need_log_remote) {
LogRemote(log, std::bind(&NameServerImpl::SyncLogCallback, this,
controller, request, response, done,
(std::vector<FileInfo>*)NULL, lock_guard,
std::placeholders::_1));
} else {
done->Run();
}
}

void NameServerImpl::UnlockDir(::google::protobuf::RpcController* controller,
const UnlockDirRequest* request,
UnlockDirResponse* response,
Expand All @@ -1046,19 +1092,29 @@ void NameServerImpl::UnlockDir(::google::protobuf::RpcController* controller,
FileLockGuard lock_guard(new WriteLock(path));
StatusCode status = namespace_->GetDirLockStatus(path);
if (status == kDirLocked) {
//TODO log remote
namespace_->SetDirLockStatus(path, kDirLockCleaning);
std::vector<int64_t> blocks;
//TODO add force lock interface which do not list children directory
namespace_->ListAllBlocks(path, &blocks);;
if (block_mapping_manager_->CheckBlocksClosed(blocks)) {
NameServerLog log;
if (request->force_unlock()) {
namespace_->SetDirLockStatus(path, kDirUnlock, "", &log);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这块加个注释解释下为啥有这么个选项吧

status = kDirUnlock;
} else {
status = kDirLockCleaning;
namespace_->SetDirLockStatus(path, kDirLockCleaning, "", &log);
std::vector<int64_t> blocks;
namespace_->ListAllBlocks(path, &blocks);;
if (block_mapping_manager_->CheckBlocksClosed(blocks)) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个是为啥啊。。发起unlock的用户感觉不需要知道当前目录的状态吧

status = kDirUnlock;
} else {
status = kDirLockCleaning;
}
}
response->set_status(status);
LogRemote(log, std::bind(&NameServerImpl::SyncLogCallback, this,
controller, request, response, done,
(std::vector<FileInfo>*)NULL, lock_guard,
std::placeholders::_1));
} else {
response->set_status(status);
done->Run();
}
response->set_status(status);
done->Run();
}

void NameServerImpl::RebuildBlockMapCallback(const FileInfo& file_info) {
Expand Down Expand Up @@ -1615,7 +1671,8 @@ void NameServerImpl::CallMethod(const ::google::protobuf::MethodDescriptor* meth
std::make_pair("PushBlockReport", work_thread_pool_),
std::make_pair("SysStat", read_thread_pool_),
std::make_pair("Chmod", work_thread_pool_),
std::make_pair("Symlink", work_thread_pool_)
std::make_pair("Symlink", work_thread_pool_),
std::make_pair("LockDir", work_thread_pool_)

};
static int method_num = sizeof(ThreadPoolOfMethod) /
Expand Down
Loading