-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy paththroughput.patch
133 lines (127 loc) · 4.16 KB
/
throughput.patch
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
diff --git a/src/broker-pipe.cc b/src/broker-pipe.cc
index f948685..9c0cb48 100644
--- a/src/broker-pipe.cc
+++ b/src/broker-pipe.cc
@@ -4,12 +4,14 @@
#include <sys/select.h>
#include <utility>
#include <algorithm>
+#include <chrono>
#include <exception>
#include <iterator>
#include <limits>
#include <stdexcept>
#include <string>
#include <vector>
+#include <thread>
#include <mutex>
#include <cassert>
#include <iostream>
@@ -55,6 +57,9 @@ std::mutex cout_mtx;
using guard_type = std::unique_lock<std::mutex>;
+bool rate = false;
+size_t msg_count = 0;
+
void print_line(std::ostream& out, const std::string& line) {
guard_type guard{cout_mtx};
out << line << std::endl;
@@ -70,6 +75,9 @@ public:
size_t message_cap = std::numeric_limits<size_t>::max();
config() {
opt_group{custom_options_, "global"}
+ .add<bool>(rate, "rate,r",
+ "print the rate of messages once per second instead of the "
+ "message content")
.add(peers, "peers,p",
"list of peers we connect to on startup (host:port notation)")
.add(local_port, "local-port,l",
@@ -90,8 +98,10 @@ void publish_mode_blocking(broker::endpoint& ep, const std::string& topic_str,
auto out = ep.make_publisher(topic_str);
std::string line;
size_t i = 0;
- while (std::getline(std::cin, line) && i++ < cap)
+ while (std::getline(std::cin, line) && i++ < cap) {
out.publish(std::move(line));
+ ++msg_count;
+ }
}
void publish_mode_select(broker::endpoint& ep, const std::string& topic_str,
@@ -116,6 +126,7 @@ void publish_mode_select(broker::endpoint& ep, const std::string& topic_str,
else
out.publish(std::move(line));
i += num;
+ msg_count += num;
}
}
@@ -137,6 +148,7 @@ void publish_mode_stream(broker::endpoint& ep, const std::string& topic_str,
out.push(std::make_pair(topic_str, std::move(line)));
}
msgs += num;
+ msg_count += num;
},
[=](const size_t& msgs) {
return msgs == cap;
@@ -150,8 +162,12 @@ void subscribe_mode_blocking(broker::endpoint& ep, const std::string& topic_str,
size_t cap) {
auto in = ep.make_subscriber({topic_str});
std::string line;
- for (size_t i = 0; i < cap; ++i)
- print_line(std::cout, deep_to_string(in.get()));
+ for (size_t i = 0; i < cap; ++i) {
+ auto msg = in.get();
+ if (!rate)
+ print_line(std::cout, deep_to_string(std::move(msg)));
+ ++msg_count;
+ }
}
void subscribe_mode_select(broker::endpoint& ep, const std::string& topic_str,
@@ -168,9 +184,13 @@ void subscribe_mode_select(broker::endpoint& ep, const std::string& topic_str,
return;
}
auto num = std::min(cap - i, in.available());
- for (size_t j = 0; j < num; ++j)
- print_line(std::cout, deep_to_string(in.get()));
+ for (size_t j = 0; j < num; ++j) {
+ auto msg = in.get();
+ if (!rate)
+ print_line(std::cout, deep_to_string(std::move(msg)));
+ }
i += num;
+ msg_count += num;
}
}
@@ -182,7 +202,9 @@ void subscribe_mode_stream(broker::endpoint& ep, const std::string& topic_str,
msgs = 0;
},
[=](size_t& msgs, std::pair<topic, data> x) {
- print_line(std::cout, deep_to_string(x));
+ ++msg_count;
+ if (!rate)
+ print_line(std::cout, deep_to_string(x));
if (++msgs >= cap)
throw std::runtime_error("Reached cap");
},
@@ -248,6 +270,18 @@ int main(int argc, char** argv) {
guard_type guard{cout_mtx};
std::cerr << "*** invalid mode or implementation setting\n";
};
+ if (rate) {
+ auto rate_printer = std::thread{[]{
+ size_t msg_count_prev = msg_count;
+ while (true) {
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ size_t current = msg_count;
+ std::cout << current - msg_count_prev << std::endl;
+ msg_count_prev = current;
+ }
+ }};
+ rate_printer.detach();
+ }
using mode_fun = void (*)(broker::endpoint&, const std::string&, size_t);
mode_fun fs[] = {
publish_mode_blocking,
@@ -272,4 +306,3 @@ int main(int argc, char** argv) {
f(ep, cfg.topic, cfg.message_cap);
anon_send_exit(el, exit_reason::user_shutdown);
}
-