Skip to content

Commit

Permalink
thrift支持
Browse files Browse the repository at this point in the history
  • Loading branch information
wubenqi committed Sep 23, 2017
1 parent fd751f7 commit 059f899
Show file tree
Hide file tree
Showing 97 changed files with 8,558 additions and 1,717 deletions.
5 changes: 1 addition & 4 deletions client/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,8 @@ set (SRC_LIST
client_command_handler.h
../../nebula/nebula/base/readline_gets.cc
../../nebula/nebula/base/readline_gets.h
../seqsvr/proto/cc/seqsvr.pb.cc
../seqsvr/proto/cc/seqsvr.pb.h
../seqsvr/base/message_handler_util.h
)

add_executable (client ${SRC_LIST})
target_link_libraries (client net base protobuf readline)
target_link_libraries (client seqsvr_base base seqsvr_gen-cpp2 boost_system boost_filesystem pthread glog gflags folly thrift thriftcpp2 thriftprotocol zstd ${OPENSSL_SSL_LIBRARY} ${OPENSSL_CRYPTO_LIBRARY} readline dl)

64 changes: 33 additions & 31 deletions client/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,37 +18,45 @@
#include "client/client.h"

#include <iostream>

#include "proto/cc/seqsvr.pb.h"
#include "nebula/net/zproto/api_message_box.h"

#include "nebula/net/rpc/zrpc_service_util.h"
#include "nebula/net/net_engine_manager.h"
#include <thrift/lib/cpp/async/TAsyncSocket.h>
#include <thrift/lib/cpp2/async/HeaderClientChannel.h>
#include "nebula/base/readline_gets.h"

#include "proto/gen-cpp2/seqsvr_types.h"
#include "proto/gen-cpp2/AllocService.h"

#include "client/client_command_handler.h"

using namespace apache::thrift;
//
//#include "proto/cc/seqsvr.pb.h"
//#include "nebula/net/zproto/api_message_box.h"
//
//#include "nebula/net/rpc/zrpc_service_util.h"
//#include "nebula/net/net_engine_manager.h"


bool Client::Initialize() {
// 注册服务
// RegisterService("seq_client", "rpc_client", "zrpc");
// 注册服务
RegisterService("alloc_client", "rpc_client", "zrpc");
// RegisterService("alloc_client", "rpc_client", "zrpc");

return BaseServer::Initialize();
return BaseDaemon::Initialize();
}

bool Client::Run() {
// auto net_engine_manager = nebula::NetEngineManager::GetInstance();
// 启动成功
try {
nebula::NetEngineManager::GetInstance()->Start();
} catch (std::exception& e) {
LOG(ERROR) << "Run - catch exception: " << e.what();
return false;
} catch (...) {
LOG(ERROR) << "Run - catch a invalid exception";
return false;
}
// try {
// nebula::NetEngineManager::GetInstance()->Start();
// } catch (std::exception& e) {
// LOG(ERROR) << "Run - catch exception: " << e.what();
// return false;
// } catch (...) {
// LOG(ERROR) << "Run - catch a invalid exception";
// return false;
// }

// GPerftoolsProfiler profiler;
// profiler.ProfilerStart();
Expand All @@ -58,14 +66,20 @@ bool Client::Run() {

// profiler.ProfilerStop();

nebula::NetEngineManager::GetInstance()->Stop();
// nebula::NetEngineManager::GetInstance()->Stop();

return true;
// return BaseServer::Run();
}

void Client::DoCommandLineLoop() {

// EventBase eb;
auto client = std::make_unique<seqsvr::AllocServiceAsyncClient>(
HeaderClientChannel::newChannel(
async::TAsyncSocket::newSocket(
&main_eb_, {"127.0.0.1", 10000})));

try {
while (true) {
auto line = ReadlineGets("nebula-im> ");
Expand All @@ -76,7 +90,7 @@ void Client::DoCommandLineLoop() {
std::vector<folly::StringPiece> cmds;
folly::split(" ", line, cmds);

if (-2 == DoClientCommand(cmds)) {
if (-2 == DoClientCommand(client.get(), cmds)) {
break;
}
}
Expand All @@ -90,15 +104,3 @@ int main(int argc, char* argv[]) {
return nebula::DoMain<Client>(argc, argv);
}

void DebugTest() {
zproto::GetRouteTableReq get_route_table_req;
std::string o;
get_route_table_req.SerializeToString(&o);
std::cout << o.length() << ", " << get_route_table_req.Utf8DebugString() << std::endl;

get_route_table_req.Clear();
bool rv = get_route_table_req.ParseFromArray(o.c_str(), o.length());
if (!rv) std::cout << "error!!!" << std::endl;
std::cout << o.length() << ", " << get_route_table_req.Utf8DebugString() << std::endl;
}

11 changes: 8 additions & 3 deletions client/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,20 @@

#include <folly/io/async/EventBase.h>

#include "nebula/net/base_server.h"
#include "nebula/base/base_daemon.h"

class Client : public nebula::BaseServer {
class Client : public nebula::BaseDaemon {
public:
Client() = default;
~Client() override = default;

protected:
// From BaseServer
// From BaseDaemon
// 不使用自动配置框架
bool LoadConfig(const std::string& config_file) override {
return true;
}

bool Initialize() override;
bool Run() override;

Expand Down
114 changes: 15 additions & 99 deletions client/client_command_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
#include "nebula/base/time_util.h"
// #include "nebula/base/id_util.h"
// #include "nebula/net/rpc/zrpc_service_util.h"
#include "base/message_handler_util.h"
// #include "base/message_handler_util.h"

typedef int (*ClientCommandHandlerFunc)(const std::vector<folly::StringPiece>&);
typedef int (*ClientCommandHandlerFunc)(seqsvr::AllocServiceAsyncClient*, const std::vector<folly::StringPiece>&);

//
struct CmdEntry {
Expand All @@ -35,7 +35,7 @@ struct CmdEntry {
ClientCommandHandlerFunc handler; // 命令处理函数
};

int DoFetchNextSeq(const std::vector<folly::StringPiece>& command_lines) {
int DoFetchNextSeq(seqsvr::AllocServiceAsyncClient* client, const std::vector<folly::StringPiece>& command_lines) {
uint32_t id = 0;
try {
id = folly::to<uint32_t>(command_lines[1]);
Expand All @@ -44,25 +44,15 @@ int DoFetchNextSeq(const std::vector<folly::StringPiece>& command_lines) {
return 0;
}

zproto::FetchNextSequenceReq fetch_next_sequence_req;
fetch_next_sequence_req.set_id(id);
fetch_next_sequence_req.set_version(0);

ZRpcClientCall<zproto::SequenceRsp>("alloc_client",
MakeRpcRequest(fetch_next_sequence_req),
[] (std::shared_ptr<ApiRpcOk<zproto::SequenceRsp>> seq_rsp,
ProtoRpcResponsePtr rpc_error) -> int {
if (rpc_error) {
LOG(ERROR) << "DoFetchNextSeq - rpc_error: " << rpc_error->ToString();
} else {
LOG(INFO) << "DoFetchNextSeq - seq_rsp: " << seq_rsp->ToString();
}
return 0;
});
seqsvr::Sequence _return;
client->sync_FetchNextSequence(_return, id, 0);

LOG(INFO) << "DoFetchNextSeq - seq_rsp: " << _return.sequence;

return 0;
}

int DoGetCurrentSeq(const std::vector<folly::StringPiece>& command_lines) {
int DoGetCurrentSeq(seqsvr::AllocServiceAsyncClient* client, const std::vector<folly::StringPiece>& command_lines) {
uint32_t id = 0;
try {
id = folly::to<uint32_t>(command_lines[1]);
Expand All @@ -71,87 +61,15 @@ int DoGetCurrentSeq(const std::vector<folly::StringPiece>& command_lines) {
return 0;
}

zproto::GetCurrentSequenceReq get_current_sequence_req;
get_current_sequence_req.set_id(id);
get_current_sequence_req.set_version(0);

ZRpcClientCall<zproto::SequenceRsp>("alloc_client",
MakeRpcRequest(get_current_sequence_req),
[] (std::shared_ptr<ApiRpcOk<zproto::SequenceRsp>> seq_rsp,
ProtoRpcResponsePtr rpc_error) -> int {
if (rpc_error) {
LOG(ERROR) << "DoGetCurrentSeq - rpc_error: " << rpc_error->ToString();
} else {
LOG(INFO) << "DoGetCurrentSeq - seq_rsp: " << seq_rsp->ToString();
}
return 0;
});
return 0;
}

int DoFetchNextSeqList(const std::vector<folly::StringPiece>& command_lines) {
zproto::FetchNextSequenceListReq fetch_next_sequence_list_req;
for (size_t i=1; i<command_lines.size(); ++i) {
uint32_t id = 0;
try {
id = folly::to<uint32_t>(command_lines[i]);
} catch (...) {
LOG(ERROR) << "DoFetchNextSeqList - user_id invalid, not a number: " << command_lines[1];
continue;
}
fetch_next_sequence_list_req.add_id_list(id);
}
if (fetch_next_sequence_list_req.id_list_size() == 0) {
LOG(ERROR) << "DoFetchNextSeqList - invalid fetch_next_sequence_list_req's id_list!!!";
return 0;
}
seqsvr::Sequence _return;
client->sync_GetCurrentSequence(_return, id, 0);

ZRpcClientCall<zproto::SequenceListRsp>("alloc_client",
MakeRpcRequest(fetch_next_sequence_list_req),
[] (std::shared_ptr<ApiRpcOk<zproto::SequenceListRsp>> seqs_rsp,
ProtoRpcResponsePtr rpc_error) -> int {
if (rpc_error) {
LOG(ERROR) << "DoFetchNextSeqList - rpc_error: " << rpc_error->ToString();
} else {
LOG(INFO) << "DoFetchNextSeqList - seq_rsp: " << seqs_rsp->ToString();
}
return 0;
});
return 0;
}
LOG(INFO) << "sync_GetCurrentSequence - seq_rsp: " << _return.sequence;

int DoGetCurrentSeqList(const std::vector<folly::StringPiece>& command_lines) {
zproto::GetCurrentSequenceListReq get_current_sequence_list_req;
for (size_t i=1; i<command_lines.size(); ++i) {
uint32_t id = 0;
try {
id = folly::to<uint32_t>(command_lines[i]);
} catch (...) {
LOG(ERROR) << "DoGetCurrentSeqList - user_id invalid, not a number: " << command_lines[1];
continue;
}
get_current_sequence_list_req.add_id_list(id);
}
if (get_current_sequence_list_req.id_list_size() == 0) {
LOG(ERROR) << "DoGetCurrentSeqList - invalid fetch_next_sequence_list_req's id_list!!!";
return 0;
}

ZRpcClientCall<zproto::SequenceListRsp>("alloc_client",
MakeRpcRequest(get_current_sequence_list_req),
[] (std::shared_ptr<ApiRpcOk<zproto::SequenceListRsp>> seqs_rsp,
ProtoRpcResponsePtr rpc_error) -> int {
if (rpc_error) {
LOG(ERROR) << "DoGetCurrentSeqList - rpc_error: " << rpc_error->ToString();
} else {
LOG(INFO) << "DoGetCurrentSeqList - seq_rsp: " << seqs_rsp->ToString();
}
return 0;
});
return 0;
}

int DoQuit(const std::vector<folly::StringPiece>& command_lines) {
int DoQuit(seqsvr::AllocServiceAsyncClient* client, const std::vector<folly::StringPiece>& command_lines) {
// exit(0);
return -2;
}
Expand All @@ -160,8 +78,6 @@ CmdEntry g_cmds[] = {
// login/logout
{"fetch_next_seq", "fetch_next_seq id", 2, 2, DoFetchNextSeq},
{"get_current_seq", "get_current_seq id", 2, 2, DoGetCurrentSeq},
{"fetch_next_seq_list", "fetch_next_seq_list id...", 2, 10, DoFetchNextSeqList},
{"get_current_seq_list", "get_current_seq_list id...", 2, 10, DoGetCurrentSeqList},
// quit
{"quit", "quit", 1, 0, DoQuit}
};
Expand All @@ -175,7 +91,7 @@ void PrintHelp() {
}


int DoClientCommand(const std::vector<folly::StringPiece>& command_lines) {
int DoClientCommand(seqsvr::AllocServiceAsyncClient* client, const std::vector<folly::StringPiece>& command_lines) {
if (command_lines.empty()) {
return 0;
}
Expand Down Expand Up @@ -211,7 +127,7 @@ int DoClientCommand(const std::vector<folly::StringPiece>& command_lines) {
}
}

return cmd->handler(command_lines);
return cmd->handler(client, command_lines);
}


Expand Down
4 changes: 3 additions & 1 deletion client/client_command_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
#include <vector>
#include <folly/Range.h>

int DoClientCommand(const std::vector<folly::StringPiece>& command_lines);
#include "proto/gen-cpp2/AllocService.h"

int DoClientCommand(seqsvr::AllocServiceAsyncClient* client, const std::vector<folly::StringPiece>& command_lines);

#endif

2 changes: 2 additions & 0 deletions seqsvr/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
add_subdirectory(proto)
add_subdirectory(base)
add_subdirectory(seqsvr)
add_subdirectory(allocsvr)
add_subdirectory(storesvr)
Expand Down
24 changes: 7 additions & 17 deletions seqsvr/allocsvr/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,19 @@
include_directories(. .. ../../nebula)

set (SRC_LIST
../base/section.h
../base/set.h
../base/router_table.cc
../base/router_table.h
lease_clerk.cc
lease_clerk.h
client_manager.cc
client_manager.h
allocsvr_manager.cc
allocsvr_manager.h
alloc_server.cc
alloc_server.h
zrpc_alloc_dispatcher.cc
zrpc_alloc_dispatcher.h
zrpc_alloc_service.cc
zrpc_alloc_service.h
alloc_service_impl.cc
alloc_service_impl.h

../proto/cc/seqsvr.pb.cc
../proto/cc/seqsvr.pb.h
../proto/proto/seqsvr.proto
../base/message_handler_util.h
)
alloc_service_handler.cc
alloc_service_handler.h
)

add_executable (allocsvr ${SRC_LIST})
target_link_libraries (allocsvr net base protobuf)
#target_link_libraries (allocsvr net base protobuf)
target_link_libraries (allocsvr seqsvr_base base seqsvr_gen-cpp2 boost_system boost_filesystem pthread glog gflags folly thrift thriftcpp2 thriftprotocol zstd ${OPENSSL_SSL_LIBRARY} ${OPENSSL_CRYPTO_LIBRARY} dl)

Loading

0 comments on commit 059f899

Please sign in to comment.