-
Notifications
You must be signed in to change notification settings - Fork 20
/
sample_async_server.cc
102 lines (91 loc) · 2.89 KB
/
sample_async_server.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
#include <sample.grpc.pb.h>
#include <grpc++/grpc++.h>
#include <memory>
#include <iostream>
using grpc::Server;
using grpc::ServerAsyncResponseWriter;
using grpc::ServerBuilder;
using grpc::ServerCompletionQueue;
using grpc::ServerContext;
using grpc::Status;
using sample::SampleRequest;
using sample::SampleResponse;
using sample::SampleService;
class ServerImpl final {
public:
~ServerImpl() {
_server->Shutdown();
_queue->Shutdown();
}
void Run() {
std::string server_address{"localhost:2511"};
// Build server
ServerBuilder builder;
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
builder.RegisterService(&_service);
_queue = builder.AddCompletionQueue();
_server = builder.BuildAndStart();
// Run server
std::cout << "Server listening on " << server_address << std::endl;
HandleRPCs();
}
private:
class CallData {
public:
CallData(SampleService::AsyncService* service, ServerCompletionQueue* queue)
: _service{service}, _queue{queue}, _responder{&_context}, _status{CallStatus::CREATE} {
Proceed();
}
void Proceed() {
switch (_status) {
case CallStatus::CREATE: {
_status = CallStatus::PROCESS;
_service->RequestSampleMethod(&_context, &_request, &_responder, _queue, _queue, this);
break;
}
case CallStatus::PROCESS: {
new CallData{_service, _queue};
_response.set_response_sample_field("Hello " + _request.request_sample_field());
_status = CallStatus::FINISH;
_responder.Finish(_response, Status::OK, this);
break;
}
default: {
delete this;
}
}
}
private:
SampleService::AsyncService* _service;
ServerCompletionQueue* _queue;
ServerContext _context;
SampleRequest _request;
SampleResponse _response;
ServerAsyncResponseWriter<SampleResponse> _responder;
enum class CallStatus {
CREATE, PROCESS, FINISH
};
CallStatus _status;
};
void HandleRPCs() {
new CallData{&_service, _queue.get()};
void* tag;
bool ok;
while (true) {
if (_queue->Next(&tag, &ok) && ok) {
static_cast<CallData*>(tag)->Proceed();
} else {
std::cerr << "Something went wrong" << std::endl;
abort();
}
}
}
SampleService::AsyncService _service;
std::unique_ptr<ServerCompletionQueue> _queue;
std::unique_ptr<Server> _server;
};
int main(int argc, char** argv) {
ServerImpl server;
server.Run();
return 0;
}