// bidichat-callback-server.cc
#include <condition_variable>
#include <iostream>
#include <list>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>

#include <grpc/grpc.h>
#include <grpcpp/security/server_credentials.h>
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>
#include <grpcpp/server_context.h>

#include "protos/bidichat.grpc.pb.h"
using bidichat::Chat;
using bidichat::Message;
using grpc::CallbackServerContext;
using grpc::Server;
using grpc::ServerBuilder;
using grpc::Status;
class ClientHandler;
// ChatImpl extends some grpc 'CallbackService'. grpc calls into the 'Chat' method when a new client connects
// I have chosen to use this class to maintain a list of all the connected clients and have added methods here
// for communicating between them
class ChatServer final : public Chat::CallbackService
{
public:
void AddClient(ClientHandler *client);
void NewMessage(Message message);
private:
// grpc calls this method when a new client connects
grpc::ServerBidiReactor<Message, Message> *Chat(CallbackServerContext *context);
std::mutex mtx;
std::list<ClientHandler *> client_list;
};
// One instance of this per client. This is what sends and receives messages to that specific client
class ClientHandler : public grpc::ServerBidiReactor<Message, Message>
{
public:
ClientHandler(ChatServer *server);
void OnDone() override { delete this; }
void OnWriteDone(bool ok) override;
void OnReadDone(bool ok) override;
void SendNewMessage(Message new_message);
void WriteThread();
private:
std::thread writer_thread;
Message current_write;
Message current_read;
ChatServer *server;
std::queue<Message> message_queue;
std::condition_variable new_messages_cond;
std::mutex mtx;
bool currently_writing;
};
ClientHandler::ClientHandler(ChatServer *server) : server(server)
{
StartRead(¤t_read);
writer_thread = std::thread(&ClientHandler::WriteThread, this);
}
void ClientHandler::WriteThread()
{
while (true)
{
std::unique_lock<std::mutex> lock(mtx);
new_messages_cond.wait(lock,
[this]
{ return !message_queue.empty() and !currently_writing; });
current_write = message_queue.front();
message_queue.pop();
lock.unlock();
currently_writing = true;
StartWrite(¤t_write);
}
}
void ClientHandler::OnWriteDone(bool ok)
{
if (ok)
{
std::lock_guard<std::mutex> guard(mtx);
currently_writing = false;
new_messages_cond.notify_one();
}
}
void ClientHandler::OnReadDone(bool ok)
{
if (ok)
{
server->NewMessage(current_read);
StartRead(¤t_read);
}
else
{
Finish(Status::OK);
}
}
void ClientHandler::SendNewMessage(Message new_message)
{
std::lock_guard<std::mutex> guard(mtx);
message_queue.push(new_message);
new_messages_cond.notify_one();
}
void ChatServer::AddClient(ClientHandler *client)
{
std::lock_guard<std::mutex> guard(mtx);
client_list.push_back(client);
}
void ChatServer::NewMessage(Message message)
{
std::lock_guard<std::mutex> guard(mtx);
for (auto client : client_list)
{
client->SendNewMessage(message);
}
}
grpc::ServerBidiReactor<Message, Message> *ChatServer::Chat(CallbackServerContext *context)
{
ClientHandler *client = new ClientHandler(this);
AddClient(client);
return client;
}
/*
int main(int argc, char **argv)
{
std::string server_address("0.0.0.0:50051");
ChatServer service;
ServerBuilder builder;
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
builder.RegisterService(&service);
std::unique_ptr<Server> server(builder.BuildAndStart());
std::cout << "Server listening on " << server_address << std::endl;
server->Wait();
return 0;
}
*/