Skip to content

Commit 2d2fcbe

Browse files
committed
Impl LoadBalance
1 parent 8d33691 commit 2d2fcbe

6 files changed

+68
-5
lines changed

event/hloop.h

+9
Original file line numberDiff line numberDiff line change
@@ -627,6 +627,15 @@ HV_INLINE uint32_t reconn_setting_calc_delay(reconn_setting_t* reconn) {
627627
return reconn->cur_delay;
628628
}
629629

630+
//-----------------LoadBalance-------------------------------------
631+
typedef enum {
632+
LB_RoundRobin,
633+
LB_Random,
634+
LB_LeastConnections,
635+
LB_IpHash,
636+
LB_UrlHash,
637+
} load_balance_e;
638+
630639
//-----------------rudp---------------------------------------------
631640
#if WITH_KCP
632641
#define WITH_RUDP 1

evpp/EventLoop.h

+3
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ class EventLoop : public Status {
3232
loop_ = hloop_new(HLOOP_FLAG_AUTO_FREE);
3333
is_loop_owner = true;
3434
}
35+
connectionNum = 0;
3536
setStatus(kInitialized);
3637
}
3738

@@ -212,6 +213,8 @@ class EventLoop : public Status {
212213
if (ev && ev->cb) ev->cb(ev.get());
213214
}
214215

216+
public:
217+
std::atomic<uint32_t> connectionNum; // for LB_LeastConnections
215218
private:
216219
hloop_t* loop_;
217220
bool is_loop_owner;

evpp/EventLoopThreadPool.h

+20-3
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#define HV_EVENT_LOOP_THREAD_POOL_HPP_
33

44
#include "EventLoopThread.h"
5+
#include "hbase.h"
56

67
namespace hv {
78

@@ -27,9 +28,25 @@ class EventLoopThreadPool : public Status {
2728
thread_num_ = num;
2829
}
2930

30-
EventLoopPtr nextLoop() {
31-
if (loop_threads_.empty()) return NULL;
32-
return loop_threads_[++next_loop_idx_ % loop_threads_.size()]->loop();
31+
EventLoopPtr nextLoop(load_balance_e lb = LB_RoundRobin) {
32+
int numLoops = loop_threads_.size();
33+
if (numLoops == 0) return NULL;
34+
int idx = 0;
35+
if (lb == LB_RoundRobin) {
36+
if (++next_loop_idx_ >= numLoops) next_loop_idx_ = 0;
37+
idx = next_loop_idx_;
38+
} else if (lb == LB_Random) {
39+
idx = hv_rand(0, numLoops - 1);
40+
} else if (lb == LB_LeastConnections) {
41+
for (int i = 1; i < numLoops; ++i) {
42+
if (loop_threads_[i]->loop()->connectionNum < loop_threads_[idx]->loop()->connectionNum) {
43+
idx = i;
44+
}
45+
}
46+
} else {
47+
// Not Implemented
48+
}
49+
return loop_threads_[idx]->loop();
3350
}
3451

3552
EventLoopPtr loop(int idx = -1) {

evpp/TcpClient_test.cpp

+11
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@
1010
#include "TcpClient.h"
1111
#include "htime.h"
1212

13+
#define TEST_RECONNECT 1
14+
#define TEST_TLS 0
15+
1316
using namespace hv;
1417

1518
int main(int argc, char* argv[]) {
@@ -52,13 +55,21 @@ int main(int argc, char* argv[]) {
5255
cli.onMessage = [](const SocketChannelPtr& channel, Buffer* buf) {
5356
printf("< %.*s\n", (int)buf->size(), (char*)buf->data());
5457
};
58+
59+
#if TEST_RECONNECT
5560
// reconnect: 1,2,4,8,10,10,10...
5661
reconn_setting_t reconn;
5762
reconn_setting_init(&reconn);
5863
reconn.min_delay = 1000;
5964
reconn.max_delay = 10000;
6065
reconn.delay_policy = 2;
6166
cli.setReconnect(&reconn);
67+
#endif
68+
69+
#if TEST_TLS
70+
cli.withTLS();
71+
#endif
72+
6273
cli.start();
6374

6475
// press Enter to stop

evpp/TcpServer.h

+12-2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ class TcpServerTmpl {
2020
tls = false;
2121
unpack_setting.mode = UNPACK_MODE_NONE;
2222
max_connections = 0xFFFFFFFF;
23+
load_balance = LB_RoundRobin;
2324
}
2425

2526
virtual ~TcpServerTmpl() {
@@ -46,6 +47,10 @@ class TcpServerTmpl {
4647
max_connections = num;
4748
}
4849

50+
void setLoadBalance(load_balance_e lb) {
51+
load_balance = lb;
52+
}
53+
4954
// NOTE: totalThreadNum = 1 acceptor_thread + N worker_threads (N can be 0)
5055
void setThreadNum(int num) {
5156
worker_threads.setThreadNum(num);
@@ -168,6 +173,10 @@ class TcpServerTmpl {
168173
}
169174
};
170175
channel->onclose = [server, &channel]() {
176+
EventLoop* worker_loop = currentThreadEventLoop;
177+
assert(worker_loop != NULL);
178+
--worker_loop->connectionNum;
179+
171180
channel->status = SocketChannel::CLOSED;
172181
if (server->onConnection) {
173182
server->onConnection(channel);
@@ -190,11 +199,11 @@ class TcpServerTmpl {
190199
TcpServerTmpl* server = (TcpServerTmpl*)hevent_userdata(connio);
191200
// NOTE: detach from acceptor loop
192201
hio_detach(connio);
193-
// Load Banlance: Round-Robin
194-
EventLoopPtr worker_loop = server->worker_threads.nextLoop();
202+
EventLoopPtr worker_loop = server->worker_threads.nextLoop(server->load_balance);
195203
if (worker_loop == NULL) {
196204
worker_loop = server->acceptor_thread.loop();
197205
}
206+
++worker_loop->connectionNum;
198207
worker_loop->runInLoop(std::bind(&TcpServerTmpl::newConnEvent, connio));
199208
}
200209

@@ -209,6 +218,7 @@ class TcpServerTmpl {
209218
std::function<void(const TSocketChannelPtr&, Buffer*)> onWriteComplete;
210219

211220
uint32_t max_connections;
221+
load_balance_e load_balance;
212222

213223
private:
214224
// id => TSocketChannelPtr

evpp/TcpServer_test.cpp

+13
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111

1212
using namespace hv;
1313

14+
#define TEST_TLS 0
15+
1416
int main(int argc, char* argv[]) {
1517
if (argc < 2) {
1618
printf("Usage: %s port\n", argv[0]);
@@ -40,6 +42,17 @@ int main(int argc, char* argv[]) {
4042
channel->write(buf);
4143
};
4244
srv.setThreadNum(4);
45+
srv.setLoadBalance(LB_LeastConnections);
46+
47+
#if TEST_TLS
48+
hssl_ctx_opt_t ssl_opt;
49+
memset(&ssl_opt, 0, sizeof(hssl_ctx_opt_t));
50+
ssl_opt.crt_file = "cert/server.crt";
51+
ssl_opt.key_file = "cert/server.key";
52+
ssl_opt.verify_peer = 0;
53+
srv.withTLS(&ssl_opt);
54+
#endif
55+
4356
srv.start();
4457

4558
// press Enter to stop

0 commit comments

Comments
 (0)