Skip to content

Commit

Permalink
Dir lock (baidu#527)
Browse files Browse the repository at this point in the history
(baidu#527) Add dir lock support
  • Loading branch information
Yang Ce authored and bluebore committed Mar 21, 2017
1 parent be53377 commit 7e59b83
Show file tree
Hide file tree
Showing 11 changed files with 156 additions and 1 deletion.
4 changes: 4 additions & 0 deletions src/nameserver/block_mapping_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -159,5 +159,9 @@ void BlockMappingManager::MarkIncomplete(int64_t block_id) {
block_mapping_[bucket_offset]->MarkIncomplete(block_id);
}

bool BlockMappingManager::CheckBlocksClosed(const std::vector<int64_t>& blocks) {
return true;
}

} //namespace bfs
} //namespace baidu
1 change: 1 addition & 0 deletions src/nameserver/block_mapping_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public :
void GetRecoverNum(int32_t bucket_id, RecoverBlockNum* recover_num);
void ListRecover(RecoverBlockSet* recover_blocks);
void MarkIncomplete(int64_t block_id);
bool CheckBlocksClosed(const std::vector<int64_t>& blocks);
private:
int32_t GetBucketOffset(int64_t block_id);
private:
Expand Down
64 changes: 63 additions & 1 deletion src/nameserver/nameserver_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -999,12 +999,74 @@ void NameServerImpl::ChangeReplicaNum(::google::protobuf::RpcController* control
done->Run();
}

void NameServerImpl::LockDir(::google::protobuf::RpcController* controller,
const LockDirRequest* request,
LockDirResponse* response,
::google::protobuf::Closure* done) {
if (!is_leader_) {
response->set_status(kIsFollower);
done->Run();
return;
}
std::string path = NameSpace::NormalizePath(request->dir_path());
FileLockGuard lock_guard(new WriteLock(path));
StatusCode status = namespace_->GetDirLockStatus(path);
if (status != kDirLocked) {
//TODO log remote?
if (status == kDirUnlock) {
namespace_->SetDirLockStatus(path, kDirLocked, request->uuid());
status = kOK;
} else if (status == kDirLockCleaning) {
std::vector<int64_t> blocks;
namespace_->ListAllBlocks(path, &blocks);
if (block_mapping_manager_->CheckBlocksClosed(blocks)) {
//TODO log remote
namespace_->SetDirLockStatus(path, kDirUnlock);
status = kOK;
}
} // else status should be kBadParameter
} else {
//TODO log remote
namespace_->SetDirLockStatus(path, kDirLockCleaning);
status = kDirLockCleaning;
}
response->set_status(status);
done->Run();
}
void NameServerImpl::UnlockDir(::google::protobuf::RpcController* controller,
const UnlockDirRequest* request,
UnlockDirResponse* response,
::google::protobuf::Closure* done) {
if (!is_leader_) {
response->set_status(kIsFollower);
done->Run();
return;
}
std::string path = NameSpace::NormalizePath(request->dir_path());
FileLockGuard lock_guard(new WriteLock(path));
StatusCode status = namespace_->GetDirLockStatus(path);
if (status == kDirLocked) {
//TODO log remote
namespace_->SetDirLockStatus(path, kDirLockCleaning);
std::vector<int64_t> blocks;
//TODO add force lock interface which do not list children directory
namespace_->ListAllBlocks(path, &blocks);;
if (block_mapping_manager_->CheckBlocksClosed(blocks)) {
status = kDirUnlock;
} else {
status = kDirLockCleaning;
}
}
response->set_status(status);
done->Run();
}

void NameServerImpl::RebuildBlockMapCallback(const FileInfo& file_info) {
for (int i = 0; i < file_info.blocks_size(); i++) {
int64_t block_id = file_info.blocks(i);
int64_t version = file_info.version();
block_mapping_manager_->RebuildBlock(block_id, file_info.replicas(),
version, file_info.size());
version, file_info.size());
}
}

Expand Down
8 changes: 8 additions & 0 deletions src/nameserver/nameserver_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,14 @@ class NameServerImpl : public NameServer {
const ChmodRequest* request,
ChmodResponse* response,
::google::protobuf::Closure* done);
void LockDir(::google::protobuf::RpcController* controller,
const LockDirRequest* request,
LockDirResponse* response,
::google::protobuf::Closure* done);
void UnlockDir(::google::protobuf::RpcController* controller,
const UnlockDirRequest* request,
UnlockDirResponse* response,
::google::protobuf::Closure* done);
bool WebService(const sofa::pbrpc::HTTPRequest&, sofa::pbrpc::HTTPResponse&);

private:
Expand Down
13 changes: 13 additions & 0 deletions src/nameserver/namespace.cc
Original file line number Diff line number Diff line change
Expand Up @@ -877,6 +877,19 @@ int64_t NameSpace::GetNewBlockId() {
return next_block_id_++;
}

StatusCode NameSpace::GetDirLockStatus(const std::string& path) {
return kDirUnlock;
}

void NameSpace::SetDirLockStatus(const std::string& path, StatusCode status,
const std::string& uuid) {

}

void NameSpace::ListAllBlocks(const std::string& path, std::vector<int64_t>* result) {

}

} // namespace bfs
} // namespace baidu
/* vim: set expandtab ts=4 sw=4 sts=4 tw=100: */
4 changes: 4 additions & 0 deletions src/nameserver/namespace.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ class NameSpace {
void TailSnapshot(int32_t ns_id, std::string* logstr);
void EraseNamespace();
int64_t GetNewBlockId();
StatusCode GetDirLockStatus(const std::string& path);
void SetDirLockStatus(const std::string& path, StatusCode status,
const std::string& uuid = "");
void ListAllBlocks(const std::string& path, std::vector<int64_t>* result);
private:
enum FileType {
kDefault = 0,
Expand Down
24 changes: 24 additions & 0 deletions src/proto/nameserver.proto
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,28 @@ message SymlinkResponse {
optional StatusCode status = 2;
}

message LockDirRequest {
optional int64 sequence_id = 1;
optional string dir_path = 2;
optional string uuid = 3;
};

message LockDirResponse {
optional int64 sequence_id = 1;
optional StatusCode status = 2;
};

message UnlockDirRequest {
optional int64 sequence_id = 1;
optional string dir_path = 2;
optional string uuid = 3;
};

message UnlockDirResponse {
optional int64 sequence_id = 1;
optional StatusCode status = 2;
};

service NameServer {
rpc CreateFile(CreateFileRequest) returns(CreateFileResponse);
rpc AddBlock(AddBlockRequest) returns(AddBlockResponse);
Expand All @@ -363,5 +385,7 @@ service NameServer {
rpc SysStat(SysStatRequest) returns(SysStatResponse);
rpc Chmod(ChmodRequest) returns(ChmodResponse);
rpc Symlink(SymlinkRequest) returns(SymlinkResponse);
rpc LockDir(LockDirRequest) returns(LockDirResponse);
rpc UnlockDir(UnlockDirRequest) returns(UnlockDirResponse);
}

3 changes: 3 additions & 0 deletions src/proto/status_code.proto
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ enum StatusCode {
kSyncMetaFailed = 801;
kSafeMode = 802;
kServiceStop = 803;
kDirLocked = 804;
kDirUnlock = 805;
kDirLockCleaning = 806;
}

enum ChunkServerStatus {
Expand Down
4 changes: 4 additions & 0 deletions src/sdk/bfs.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ class FS {
virtual int32_t ListDirectory(const char* path, BfsFileInfo** filelist, int *num) = 0;
/// Delete Directory
virtual int32_t DeleteDirectory(const char* path, bool recursive) = 0;
/// Lock Directory
virtual int32_t LockDirectory(const char* path) = 0;
/// Unlock Directory
virtual int32_t UnlockDirectory(const char* path) = 0;
/// Du
virtual int32_t DiskUsage(const char* path, int64_t* du_size) = 0;
/// Access
Expand Down
28 changes: 28 additions & 0 deletions src/sdk/fs_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,29 @@ int32_t FSImpl::DeleteFile(const char* path) {
}
return OK;
}
int32_t FSImpl::LockDirectory(const char* path) {
//TODO Support set timeout for LockDirectory
LockDirRequest request;
LockDirResponse response;
request.set_dir_path(path);
request.set_uuid(GetUUID());
while (!nameserver_client_->SendRequest(&NameServer_Stub::LockDir,
&request, &response, 15, 1) || response.status() != kOK) {
sleep(5);
}
assert(response.status() == kOK);
return OK;
}
int32_t FSImpl::UnlockDirectory(const char* path) {
UnlockDirRequest request;
UnlockDirResponse response;
request.set_dir_path(path);
request.set_uuid(GetUUID());
nameserver_client_->SendRequest(&NameServer_Stub::UnlockDir,
&request, &response, 15, 1);
//Don't care return value of rpc
return OK;
}
int32_t FSImpl::Rename(const char* oldpath, const char* newpath) {
RenameRequest request;
RenameResponse response;
Expand Down Expand Up @@ -560,5 +583,10 @@ bool FS::OpenFileSystem(const char* nameserver, FS** fs, const FSOptions&) {
return true;
}

const std::string& FSImpl::GetUUID() {
static std::string uuid;
return uuid;
}

} // namespace bfs
} // namespace baidu
4 changes: 4 additions & 0 deletions src/sdk/fs_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ class FSImpl : public FS {
int32_t CreateDirectory(const char* path);
int32_t ListDirectory(const char* path, BfsFileInfo** filelist, int *num);
int32_t DeleteDirectory(const char* path, bool recursive);
int32_t LockDirectory(const char* path);
int32_t UnlockDirectory(const char* path);
int32_t DiskUsage(const char* path, int64_t* du_size);
int32_t Access(const char* path, int32_t mode);
int32_t Stat(const char* path, BfsFileInfo* fileinfo);
Expand All @@ -53,6 +55,8 @@ class FSImpl : public FS {
int32_t SysStat(const std::string& stat_name, std::string* result);
int32_t ShutdownChunkServer(const std::vector<std::string>& cs_addr);
int32_t ShutdownChunkServerStat();
private:
const std::string& GetUUID();
private:
RpcClient* rpc_client_;
NameServerClient* nameserver_client_;
Expand Down

0 comments on commit 7e59b83

Please sign in to comment.