Skip to content

Commit

Permalink
Use ip address for internal communication #907 (#908)
Browse files Browse the repository at this point in the history
  • Loading branch information
bluebore authored and Yang Ce committed Jun 20, 2017
1 parent d540d18 commit 43ce0d8
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 19 deletions.
30 changes: 16 additions & 14 deletions src/nameserver/chunkserver_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ void ChunkServerManager::CleanChunkServer(ChunkServerInfo* cs, const std::string
std::set<int64_t> blocks;
it->second->CleanUp(&blocks);
LOG(INFO, "Remove ChunkServer C%d %s %s, cs_num=%d",
cs->id(), cs->address().c_str(), reason.c_str(), chunkserver_num_);
cs->id(), cs->ipaddress().c_str(), reason.c_str(), chunkserver_num_);
cs->set_status(kCsCleaning);
mu_.Unlock();
block_mapping_manager_->DealWithDeadNode(id, blocks);
Expand All @@ -143,7 +143,7 @@ void ChunkServerManager::CleanChunkServer(ChunkServerInfo* cs, const std::string
cs->set_recover_speed(0);
if (std::find(chunkservers_to_offline_.begin(),
chunkservers_to_offline_.end(),
cs->address()) == chunkservers_to_offline_.end()) {
cs->ipaddress()) == chunkservers_to_offline_.end()) {
if (cs->is_dead()) {
cs->set_status(kCsOffLine);
} else {
Expand Down Expand Up @@ -195,7 +195,7 @@ void ChunkServerManager::DeadCheck() {
ChunkServerInfo* cs = *node;
it->second.erase(node++);
LOG(INFO, "[DeadCheck] ChunkServer dead C%d %s, cs_num=%d",
cs->id(), cs->address().c_str(), chunkserver_num_);
cs->id(), cs->ipaddress().c_str(), chunkserver_num_);
cs->set_is_dead(true);
if (cs->status() == kCsActive || cs->status() == kCsReadonly) {
cs->set_status(kCsWaitClean);
Expand All @@ -205,7 +205,7 @@ void ChunkServerManager::DeadCheck() {
thread_pool_->AddTask(task);
} else {
LOG(INFO, "[DeadCheck] ChunkServer C%d %s is being clean",
cs->id(), cs->address().c_str());
cs->id(), cs->ipaddress().c_str());
}
}
assert(it->second.empty());
Expand Down Expand Up @@ -392,7 +392,7 @@ bool ChunkServerManager::GetChunkServerChains(int num,
sit != set.end(); ++sit) {
ChunkServerInfo* cs = *sit;
if (cs->status() == kCsReadonly) {
LOG(DEBUG, "Alloc ignore Chunkserver %s: is in offline progress", cs->address().c_str());
LOG(DEBUG, "Alloc ignore Chunkserver %s: is in offline progress", cs->ipaddress().c_str());
continue;
}
double load = cs->load();
Expand All @@ -402,7 +402,7 @@ bool ChunkServerManager::GetChunkServerChains(int num,
loads.push_back(std::make_pair(load - local_factor, cs));
} else {
LOG(DEBUG, "Alloc ignore: ChunkServer %s data %ld/%ld buffer %d",
cs->address().c_str(), cs->data_size(),
cs->ipaddress().c_str(), cs->data_size(),
cs->disk_quota(), cs->buffers());
}
}
Expand All @@ -423,7 +423,7 @@ bool ChunkServerManager::GetChunkServerChains(int num,
} else {
for (int i = 0; i < num; ++i) {
ChunkServerInfo* cs = loads[i].second;
chains->push_back(std::make_pair(cs->id(), cs->address()));
chains->push_back(std::make_pair(cs->id(), cs->ipaddress()));
}
}
return true;
Expand Down Expand Up @@ -474,7 +474,7 @@ bool ChunkServerManager::GetRecoverChains(const std::set<int32_t>& replica,
loads.push_back(std::make_pair(load, cs));
} else {
LOG(DEBUG, "Recover alloc ignore: ChunkServer %s data %ld/%ld buffer %d",
cs->address().c_str(), cs->data_size(),
cs->ipaddress().c_str(), cs->data_size(),
cs->disk_quota(), cs->buffers());
}
}
Expand All @@ -495,7 +495,7 @@ bool ChunkServerManager::GetRecoverChains(const std::set<int32_t>& replica,
RandomSelect(&loads, FLAGS_recover_dest_limit);
for (int i = 0; i < static_cast<int>(loads.size()) && i < FLAGS_recover_dest_limit; ++i) {
ChunkServerInfo* cs = loads[i].second;
chains->push_back(cs->address());
chains->push_back(cs->ipaddress());
}
return true;
}
Expand All @@ -513,20 +513,20 @@ int ChunkServerManager::SelectChunkServerByZone(int num,
if (FLAGS_select_chunkserver_by_tag && !tag.empty()) {
if (!tag_set.insert(tag).second) {
LOG(DEBUG, "Ignore by tag: %s %s",
tag.c_str(), cs->address().c_str());
tag.c_str(), cs->ipaddress().c_str());
continue;
}
}
LOG(DEBUG, "Local zone %s tag %s C%d ",
cs->zone().c_str(), cs->tag().empty() ? "null" : cs->tag().c_str(), cs->id());
chains->push_back(std::make_pair(cs->id(), cs->address()));
chains->push_back(std::make_pair(cs->id(), cs->ipaddress()));
if (static_cast<int>(chains->size()) + (remote_server ? 1 : 0) >= num) {
break;
}
}
}
if (remote_server) {
chains->push_back(std::make_pair(remote_server->id(), remote_server->address()));
chains->push_back(std::make_pair(remote_server->id(), remote_server->ipaddress()));
LOG(INFO, "Remote zone %s C%d ",
remote_server->zone().c_str(), remote_server->id());
}
Expand Down Expand Up @@ -559,7 +559,7 @@ bool ChunkServerManager::UpdateChunkServer(int cs_id, const std::string& tag, in
}

int32_t ChunkServerManager::AddChunkServer(const std::string& address,
const std::string& ip,
const std::string& ipaddress,
const std::string& tag,
int64_t quota) {
mu_.AssertHeld();
Expand All @@ -570,6 +570,7 @@ int32_t ChunkServerManager::AddChunkServer(const std::string& address,
info->set_start_time(std::string(buf));
info->set_id(id);
info->set_address(address);
info->set_ipaddress(ipaddress);
info->set_tag(tag);
info->set_disk_quota(quota);
if (std::find(chunkservers_to_offline_.begin(), chunkservers_to_offline_.end(),
Expand All @@ -580,6 +581,7 @@ int32_t ChunkServerManager::AddChunkServer(const std::string& address,
}
info->set_kick(false);
std::string host = address.substr(0, address.find(':'));
std::string ip = ipaddress.substr(0, ipaddress.find(':'));
LocationProvider loc(host, ip);
info->set_zone(loc.GetZone());
info->set_datacenter(loc.GetDataCenter());
Expand All @@ -602,7 +604,7 @@ std::string ChunkServerManager::GetChunkServerAddr(int32_t id) {
MutexLock lock(&mu_, "GetChunkServerAddr", 10);
ChunkServerInfo* cs = NULL;
if (GetChunkServerPtr(id, &cs) && !cs->is_dead()) {
return cs->address();
return cs->ipaddress();
}
return "";
}
Expand Down
11 changes: 6 additions & 5 deletions src/nameserver/nameserver_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,9 @@ void NameServerImpl::Register(::google::protobuf::RpcController* controller,
sofa::pbrpc::RpcController* sofa_cntl =
reinterpret_cast<sofa::pbrpc::RpcController*>(controller);
const std::string& address = request->chunkserver_addr();
const std::string& ip_address = sofa_cntl->RemoteAddress();
const std::string cs_ip = ip_address.substr(ip_address.find(':'));
std::string port = address.substr(address.find(':'));
std::string ip_address = sofa_cntl->RemoteAddress();
ip_address = ip_address.substr(0, ip_address.find(':')) + port;
LOG(INFO, "Register ip: %s", ip_address.c_str());
int64_t version = request->namespace_version();
if (version != namespace_->Version()) {
Expand All @@ -189,7 +190,7 @@ void NameServerImpl::Register(::google::protobuf::RpcController* controller,
chunkserver_manager_->RemoveChunkServer(address);
} else {
LOG(INFO, "Register from %s, version= %ld", address.c_str(), version);
if (chunkserver_manager_->HandleRegister(cs_ip, request, response)) {
if (chunkserver_manager_->HandleRegister(ip_address, request, response)) {
LeaveReadOnly();
}
}
Expand Down Expand Up @@ -1387,8 +1388,8 @@ bool NameServerImpl::WebService(const sofa::pbrpc::HTTPRequest& request,
table_str += "</td><td>";
table_str += common::NumToString(chunkserver.id());
table_str += "</td><td>";
table_str += "<a href=\"http://" + chunkserver.address() + "/dfs\">"
+ chunkserver.address() + "</a>";
table_str += "<a href=\"http://" + chunkserver.ipaddress() + "/dfs\">"
+ chunkserver.ipaddress() + "</a>";
table_str += "</td><td>";
table_str += common::NumToString(chunkserver.block_num());
table_str += "</td><td>";
Expand Down
1 change: 1 addition & 0 deletions src/proto/nameserver.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ option cc_generic_services = true;
message ChunkServerInfo {
optional int32 id = 1;
optional string address = 2;
optional string ipaddress = 18;
optional string start_time = 17;
optional int32 last_heartbeat = 3;
optional int64 data_size = 4;
Expand Down

0 comments on commit 43ce0d8

Please sign in to comment.