Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Continue writing after one chunkserver dead #843

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
8 changes: 6 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,13 @@ nameserver_test: src/nameserver/test/nameserver_impl_test.o \
src/nameserver/namespace.o src/nameserver/raft_impl.o \
src/nameserver/raft_node.o $(OBJS) -o $@ $(LDFLAGS)

block_mapping_test: src/nameserver/test/block_mapping_test.o src/nameserver/block_mapping.o
block_mapping_test: src/nameserver/test/block_mapping_test.o \
src/nameserver/block_mapping.o src/nameserver/chunkserver_manager.o \
src/nameserver/location_provider.o
$(CXX) src/nameserver/block_mapping.o src/nameserver/test/block_mapping_test.o \
src/nameserver/block_mapping_manager.o $(OBJS) -o $@ $(LDFLAGS)
src/nameserver/block_mapping_manager.o src/nameserver/chunkserver_manager.o \
src/nameserver/location_provider.o \
$(OBJS) -o $@ $(LDFLAGS)

logdb_test: src/nameserver/test/logdb_test.o src/nameserver/logdb.o
$(CXX) src/nameserver/logdb.o src/nameserver/test/logdb_test.o $(OBJS) -o $@ $(LDFLAGS)
Expand Down
23 changes: 23 additions & 0 deletions src/chunkserver/chunkserver_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -858,6 +858,29 @@ void ChunkServerImpl::GetBlockInfo(::google::protobuf::RpcController* controller

}

void ChunkServerImpl::PrepareForWrite(::google::protobuf::RpcController* controller,
const PrepareForWriteRequest* request,
PrepareForWriteResponse* response,
::google::protobuf::Closure* done) {
response->set_sequence_id(request->sequence_id());
int64_t block_id = request->block_id();
int32_t seq = request->sliding_window_start_seq();
int64_t block_size = request->block_size();

LOG(INFO, "Prepare write for block #%ld from seq %d, size %ld",
block_id, seq, block_size);
StatusCode s;
Block* block = block_manager_->CreateBlock(block_id, &s);
if (s != kOK) {
LOG(INFO, "[PrepareForWrite] block #%ld created failed, reason %s",
block_id, StatusCode_Name(s).c_str());
} else {
block->PrepareForWrite(seq, block_size);
}
response->set_status(s);
done->Run();
}

bool ChunkServerImpl::WebService(const sofa::pbrpc::HTTPRequest& request,
sofa::pbrpc::HTTPResponse& response) {
ChunkserverStat c_stat = counter_manager_.GetCounters();
Expand Down
4 changes: 4 additions & 0 deletions src/chunkserver/chunkserver_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ class ChunkServerImpl : public ChunkServer {
const GetBlockInfoRequest* request,
GetBlockInfoResponse* response,
::google::protobuf::Closure* done);
virtual void PrepareForWrite(::google::protobuf::RpcController* controller,
const PrepareForWriteRequest* request,
PrepareForWriteResponse* response,
::google::protobuf::Closure* done);
bool WebService(const sofa::pbrpc::HTTPRequest& request,
sofa::pbrpc::HTTPResponse& response);
private:
Expand Down
4 changes: 4 additions & 0 deletions src/chunkserver/data_block.cc
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,10 @@ int64_t Block::GetExpectedSize() const {
void Block::SetExpectedSize(int64_t expected_size) {
expected_size_ = expected_size;
}
void Block::PrepareForWrite(int32_t seq, int64_t size) {
recv_window_->SeekToOffset(seq);
meta_.set_block_size(size);
}
/// Append to block buffer
StatusCode Block::Append(int32_t seq, const char* buf, int64_t len) {
mu_.AssertHeld();
Expand Down
1 change: 1 addition & 0 deletions src/chunkserver/data_block.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class Block {
void SetExpectedSize(int64_t expected_size);
/// Flush block to disk.
bool Close(bool sync);
void PrepareForWrite(int32_t seq, int64_t size);
void AddRef();
void DecRef();
int GetRef() const;
Expand Down
26 changes: 26 additions & 0 deletions src/chunkserver/test/data_block_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// found in the LICENSE file.
//

#define private public
#include "chunkserver/data_block.h"
#include "chunkserver/file_cache.h"
#include "chunkserver/disk.h"
Expand Down Expand Up @@ -102,6 +103,31 @@ TEST_F(DataBlockTest, WriteAndReadBlock) {
system("rm -rf ./block123");
}

TEST_F(DataBlockTest, PrepareForWrite) {
mkdir("./block123", 0755);
std::string file_path("./block123");
Disk disk(file_path, 1000000);
disk.LoadStorage(std::bind(AddBlock, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3));
BlockMeta meta;
FileCache file_cache(10);
int64_t block_id = 123;
meta.set_block_id(block_id);
meta.set_store_path(file_path);
Block* block = new Block(meta, &disk, &file_cache);
block->AddRef();
ASSERT_TRUE(block != NULL);
ASSERT_EQ(block->recv_window_->GetBaseOffset(), 0);
ASSERT_EQ(block->meta_.block_size(), 0);
block->PrepareForWrite(50, 1000);
ASSERT_EQ(block->recv_window_->GetBaseOffset(), 50);
ASSERT_EQ(block->meta_.block_size(), 1000);
block->PrepareForWrite(300, 4000);
ASSERT_EQ(block->recv_window_->GetBaseOffset(), 300);
ASSERT_EQ(block->meta_.block_size(), 4000);
block->DecRef();
system("rm -rf ./block123");
}

}
}
Expand Down
54 changes: 52 additions & 2 deletions src/nameserver/block_mapping.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// found in the LICENSE file.

#include "block_mapping.h"
#include "chunkserver_manager.h"

#include <vector>
#include <gflags/gflags.h>
Expand Down Expand Up @@ -67,8 +68,12 @@ bool BlockMapping::GetLocatedBlock(int64_t id, std::vector<int32_t>* replica, in
if (replica->empty()) {
LOG(DEBUG, "Block #%ld lost all replica", id);
}
*size = block->block_size;
*status = block->recover_stat;
if (size) {
*size = block->block_size;
}
if (status) {
*status = block->recover_stat;
}
return true;
}

Expand Down Expand Up @@ -969,5 +974,50 @@ void BlockMapping::MarkIncomplete(int64_t block_id) {
}
}

void BlockMapping::AddRecoverBlock(int64_t block_id, int32_t cs_id,
int64_t start_offset, int64_t end_offset) {
RecoverInfo* recover_info = new RecoverInfo;
recover_info->set_block_id(block_id);
recover_info->set_cs_id(cs_id);
recover_info->set_start_offset(start_offset);
recover_info->set_end_offset(end_offset);
MutexLock lock(&mu_);
auto it = recover_writing_blocks_.find(block_id);
if (it != recover_writing_blocks_.end()) {
RecoverInfo* info = it->second;
LOG(WARNING, "Block #%ld recover to C%d from offset %ld to %ld ",
"already in recover map",
info->block_id(), info->cs_id(), info->start_offset(),
info->end_offset());
delete info;
it->second = recover_info;
} else {
recover_writing_blocks_[block_id] = recover_info;
}
}

void BlockMapping::PickRecoverWritingBlocks(Blocks* cs_block_map,
::google::protobuf::RepeatedPtrField<RecoverInfo>* recover_blocks) {
int32_t cs_id = cs_block_map->GetChunkServerId();
MutexLock lock(&mu_);
for (auto it = recover_writing_blocks_.begin();
it != recover_writing_blocks_.end(); ++it) {
int64_t block_id = it->first;
if (!cs_block_map->BlockExists(block_id)) {
continue;
}
RecoverInfo* info = it->second;
if (info->cs_id() == cs_id) {
continue;
}
RecoverInfo* block = recover_blocks->Add();
block->CopyFrom(*info);
LOG(INFO, "Pick writing block #%ld from C%ld recover to C%ld",
"start offset %ld end offset %ld",
info->block_id(), cs_id, info->cs_id(),
info->start_offset(), info->end_offset());
}
}

} // namespace bfs
} // namespace baidu
7 changes: 7 additions & 0 deletions src/nameserver/block_mapping.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ struct RecoverBlockSet {
std::map<int32_t, std::set<int64_t> > incomplete;
};

class Blocks;

class BlockMapping {
public:
BlockMapping(ThreadPool* thread_pool);
Expand All @@ -81,6 +83,10 @@ class BlockMapping {
void ListRecover(RecoverBlockSet* blocks);
int32_t GetCheckNum();
void MarkIncomplete(int64_t block_id);
void AddRecoverBlock(int64_t block_id, int32_t cs_id,
int64_t start_offset, int64_t end_offset);
void PickRecoverWritingBlocks(Blocks* cs_block_map,
::google::protobuf::RepeatedPtrField<RecoverInfo>* recover_blocks);
private:
void DealWithDeadBlockInternal(int32_t cs_id, int64_t block_id);
typedef std::map<int32_t, std::set<int64_t> > CheckList;
Expand Down Expand Up @@ -113,6 +119,7 @@ class BlockMapping {
std::set<int64_t> lo_pri_recover_;
std::set<int64_t> hi_pri_recover_;
std::set<int64_t> lost_blocks_;
std::map<int64_t, RecoverInfo*> recover_writing_blocks_;
};

} // namespace bfs
Expand Down
18 changes: 18 additions & 0 deletions src/nameserver/block_mapping_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -159,5 +159,23 @@ void BlockMappingManager::MarkIncomplete(int64_t block_id) {
block_mapping_[bucket_offset]->MarkIncomplete(block_id);
}

void BlockMappingManager::AddRecoverBlock(int64_t block_id, int32_t cs_id,
int64_t start_offset,
int64_t end_offset) {
int32_t bucket_offset = GetBucketOffset(block_id);
block_mapping_[bucket_offset]->AddRecoverBlock(block_id, cs_id,
start_offset, end_offset);
}

void BlockMappingManager::PickRecoverWritingBlocks(Blocks* cs_block_map,
::google::protobuf::RepeatedPtrField<RecoverInfo>* recover_blocks) {
//TODO use flag
for (int i = 0; i < blockmapping_bucket_num_ &&
recover_blocks->size() < 100; i++) {
block_mapping_[i]->PickRecoverWritingBlocks(cs_block_map,
recover_blocks);
}
}

} //namespace bfs
} //namespace baidu
6 changes: 6 additions & 0 deletions src/nameserver/block_mapping_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
namespace baidu {
namespace bfs {

class Blocks;

class BlockMappingManager {
public :
BlockMappingManager(int32_t bucket_num);
Expand Down Expand Up @@ -40,6 +42,10 @@ public :
void GetRecoverNum(int32_t bucket_id, RecoverBlockNum* recover_num);
void ListRecover(RecoverBlockSet* recover_blocks);
void MarkIncomplete(int64_t block_id);
void AddRecoverBlock(int64_t block_id, int32_t cs_id, int64_t start_offset,
int64_t end_offset);
void PickRecoverWritingBlocks(Blocks* cs_block_map,
::google::protobuf::RepeatedPtrField<RecoverInfo>* recover_blocks);
private:
int32_t GetBucketOffset(int64_t block_id);
private:
Expand Down
49 changes: 45 additions & 4 deletions src/nameserver/chunkserver_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,28 @@ int64_t Blocks::CheckLost(int64_t report_id, const std::set<int64_t>& blocks,
return report_id;
}

bool Blocks::BlockExists(int64_t block_id) {
// blocks in new_blocks_ maybe merged into blocks_,
// so we must look up new_blocks_ first
{
MutexLock lock(&new_blocks_mu_);
if (new_blocks_.find(block_id) != new_blocks_.end()) {
return true;
}
}
{
MutexLock lock(&block_mu_);
if (blocks_.find(block_id) != blocks_.end()) {
return true;
}
}
return false;
}

int32_t Blocks::GetChunkServerId() const {
return cs_id_;
}

ChunkServerManager::ChunkServerManager(ThreadPool* thread_pool, BlockMappingManager* block_mapping_manager)
: thread_pool_(thread_pool),
block_mapping_manager_(block_mapping_manager),
Expand Down Expand Up @@ -428,7 +450,8 @@ bool ChunkServerManager::GetChunkServerChains(int num,
}

bool ChunkServerManager::GetRecoverChains(const std::set<int32_t>& replica,
std::vector<std::string>* chains) {
std::vector<std::string>* chains,
int32_t select_num) {
mu_.AssertHeld();
std::map<int32_t, std::set<ChunkServerInfo*> >::iterator it = heartbeat_list_.begin();
std::vector<std::pair<double, ChunkServerInfo*> > loads;
Expand Down Expand Up @@ -490,8 +513,9 @@ bool ChunkServerManager::GetRecoverChains(const std::set<int32_t>& replica,
return false;
}
}
RandomSelect(&loads, FLAGS_recover_dest_limit);
for (int i = 0; i < static_cast<int>(loads.size()) && i < FLAGS_recover_dest_limit; ++i) {
RandomSelect(&loads, select_num);
for (int i = 0; i < static_cast<int>(loads.size()) &&
i < select_num; ++i) {
ChunkServerInfo* cs = loads[i].second;
chains->push_back(cs->address());
}
Expand Down Expand Up @@ -674,7 +698,9 @@ void ChunkServerManager::PickRecoverBlocks(int cs_id, RecoverVec* recover_blocks
it != blocks.end(); ++it) {
MutexLock lock(&mu_);
recover_blocks->push_back(std::make_pair((*it).first, std::vector<std::string>()));
if (GetRecoverChains((*it).second, &(recover_blocks->back().second))) {
if (GetRecoverChains((*it).second,
&(recover_blocks->back().second),
FLAGS_recover_dest_limit)) {
//
} else {
block_mapping_manager_->ProcessRecoveredBlock(cs_id, (*it).first, kGetChunkServerError);
Expand All @@ -695,6 +721,21 @@ void ChunkServerManager::PickRecoverBlocks(int cs_id, RecoverVec* recover_blocks
}
}

void ChunkServerManager::PickRecoverWritingBlocks(int32_t cs_id,
::google::protobuf::RepeatedPtrField<RecoverInfo>* recover_block_info) {
Blocks* block_map = GetBlockMap(cs_id);
if (!block_map) {
LOG(FATAL, "Get block map for C%d fail", cs_id);
}
block_mapping_manager_->PickRecoverWritingBlocks(block_map,
recover_block_info);
// covert cs_id to cs_addr
for (int i = 0; i < recover_block_info->size(); i++) {
RecoverInfo* info = recover_block_info->Mutable(i);
info->set_cs_addr(GetChunkServerAddr(info->cs_id()));
}
}

void ChunkServerManager::GetStat(int32_t* w_qps, int64_t* w_speed,
int32_t* r_qps, int64_t* r_speed, int64_t* recover_speed) {
if (w_qps) *w_qps = stats_.w_qps;
Expand Down
10 changes: 8 additions & 2 deletions src/nameserver/chunkserver_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ class Blocks {
void MoveNew();
int64_t CheckLost(int64_t report_id, const std::set<int64_t>& blocks,
int64_t start, int64_t end, std::vector<int64_t>* lost);
bool BlockExists(int64_t block_id);
int32_t GetChunkServerId() const;
private:
Mutex block_mu_;
std::set<int64_t> blocks_;
Expand All @@ -55,7 +57,8 @@ class ChunkServerManager {
void ListChunkServers(::google::protobuf::RepeatedPtrField<ChunkServerInfo>* chunkservers);
bool GetChunkServerChains(int num, std::vector<std::pair<int32_t,std::string> >* chains,
const std::string& client_address);
bool GetRecoverChains(const std::set<int32_t>& replica, std::vector<std::string>* chains);
bool GetRecoverChains(const std::set<int32_t>& replica,
std::vector<std::string>* chains, int num);
int32_t AddChunkServer(const std::string& address, const std::string& ip,
const std::string& tag, int64_t quota);
bool KickChunkServer(int cs_id);
Expand All @@ -66,7 +69,10 @@ class ChunkServerManager {
void AddBlock(int32_t id, int64_t block_id);
void RemoveBlock(int32_t id, int64_t block_id);
void CleanChunkServer(ChunkServerInfo* cs, const std::string& reason);
void PickRecoverBlocks(int cs_id, RecoverVec* recover_blocks, int* hi_num, bool hi_only);
void PickRecoverBlocks(int32_t cs_id, RecoverVec* recover_blocks,
int32_t* hi_num, bool hi_only);
void PickRecoverWritingBlocks(int32_t cs_id,
::google::protobuf::RepeatedPtrField<RecoverInfo>* recover_block_info);
void GetStat(int32_t* w_qps, int64_t* w_speed, int32_t* r_qps,
int64_t* r_speed, int64_t* recover_speed);
StatusCode ShutdownChunkServer(const::google::protobuf::RepeatedPtrField<std::string>& chunkserver_address);
Expand Down
Loading