1
+ #include < disruptorplus/single_threaded_claim_strategy.hpp>
2
+ #include < disruptorplus/multi_threaded_claim_strategy.hpp>
3
+ #include < disruptorplus/blocking_wait_strategy.hpp>
4
+ #include < disruptorplus/spin_wait_strategy.hpp>
5
+ #include < disruptorplus/sequence_barrier_group.hpp>
6
+ #include < disruptorplus/ring_buffer.hpp>
7
+
8
+ #include < thread>
9
+ #include < chrono>
10
+ #include < cstdint>
11
+ #include < iostream>
12
+
13
+ namespace
14
+ {
15
+ template <typename WaitStrategy, template <typename T> class ClaimStrategy >
16
+ uint64_t CalculateOpsPerSecond (size_t bufferSize, uint64_t iterationCount, int consumerCount)
17
+ {
18
+ WaitStrategy waitStrategy;
19
+ std::vector<std::unique_ptr<disruptorplus::sequence_barrier<WaitStrategy>>> consumedBarriers (consumerCount);
20
+ ClaimStrategy<WaitStrategy> claimStrategy (bufferSize, waitStrategy);
21
+ disruptorplus::sequence_barrier_group<WaitStrategy> parallelConsumers (waitStrategy);
22
+ disruptorplus::ring_buffer<uint64_t > buffer (bufferSize);
23
+
24
+ for (int i = 0 ; i < consumerCount; ++i)
25
+ {
26
+ consumedBarriers[i].reset (new disruptorplus::sequence_barrier<WaitStrategy>(waitStrategy));
27
+ if (i + 1 != consumerCount)
28
+ {
29
+ parallelConsumers.add (*consumedBarriers[i]);
30
+ }
31
+ }
32
+ claimStrategy.add_claim_barrier (*consumedBarriers.back ());
33
+
34
+ const uint64_t expectedResult = (iterationCount * (iterationCount - 1 )) / 2 ;
35
+
36
+ std::vector<uint64_t > results (consumerCount);
37
+
38
+ std::vector<std::thread> consumers;
39
+ consumers.reserve (consumerCount);
40
+
41
+ // Consumers
42
+ for (int consumerIndex = 0 ; consumerIndex < consumerCount; ++consumerIndex)
43
+ {
44
+ if (consumerIndex + 1 != consumerCount)
45
+ {
46
+ consumers.emplace_back ([&, consumerIndex]()
47
+ {
48
+ uint64_t sum = 0 ;
49
+ disruptorplus::sequence_t nextToRead = 0 ;
50
+ uint64_t itemsRemaining = iterationCount;
51
+ auto & barrier = *consumedBarriers[consumerIndex];
52
+ while (itemsRemaining > 0 )
53
+ {
54
+ const auto available = claimStrategy.wait_until_published (nextToRead, nextToRead - 1 );
55
+ do
56
+ {
57
+ sum += buffer[nextToRead];
58
+ --itemsRemaining;
59
+ } while (nextToRead++ != available);
60
+ barrier.publish (available);
61
+ }
62
+
63
+ results[consumerIndex] = sum;
64
+ });
65
+ }
66
+ else
67
+ {
68
+ consumers.emplace_back ([&, consumerIndex]()
69
+ {
70
+ uint64_t sum = 0 ;
71
+ disruptorplus::sequence_t nextToRead = 0 ;
72
+ uint64_t itemsRemaining = iterationCount;
73
+ auto & barrier = *consumedBarriers[consumerIndex];
74
+ while (itemsRemaining > 0 )
75
+ {
76
+ const auto available = parallelConsumers.wait_until_published (nextToRead);
77
+ do
78
+ {
79
+ sum += buffer[nextToRead];
80
+ --itemsRemaining;
81
+ } while (nextToRead++ != available);
82
+ barrier.publish (available);
83
+ }
84
+
85
+ results[consumerIndex] = sum;
86
+ });
87
+ }
88
+ }
89
+
90
+ const auto start = std::chrono::high_resolution_clock::now ();
91
+
92
+ // Publisher
93
+ for (uint64_t i = 0 ; i < iterationCount; ++i)
94
+ {
95
+ const auto seq = claimStrategy.claim_one ();
96
+ buffer[seq] = i;
97
+ claimStrategy.publish (seq);
98
+ }
99
+
100
+ bool resultsOk = true ;
101
+ for (int i = 0 ; i < consumerCount; ++i)
102
+ {
103
+ consumers[i].join ();
104
+ if (results[i] != expectedResult)
105
+ {
106
+ resultsOk = false ;
107
+ }
108
+ }
109
+
110
+ if (!resultsOk)
111
+ {
112
+ throw std::domain_error (" Unexpected test result." );
113
+ }
114
+
115
+ const auto timeTaken = std::chrono::high_resolution_clock::now () - start;
116
+ const auto timeTakenUS = std::chrono::duration_cast<std::chrono::microseconds>(timeTaken).count ();
117
+
118
+ return (iterationCount * 1000 * 1000 ) / timeTakenUS;
119
+ }
120
+ }
121
+
122
+ int main ()
123
+ {
124
+ const int consumerCount = 3 ;
125
+ const size_t bufferSize = 64 * 1024 ;
126
+ const uint64_t iterationCount = 10 * 1000 * 1000 ;
127
+ const uint32_t runCount = 5 ;
128
+
129
+ std::cout << " Diamond Throughput Benchmark" << std::endl
130
+ << " Consumer count: " << consumerCount << std::endl
131
+ << " Buffer size: " << bufferSize << std::endl
132
+ << " Iteration count: " << iterationCount << std::endl
133
+ << " Run count: " << runCount << std::endl;
134
+
135
+ try
136
+ {
137
+ #define BENCHMARK (CS,WS ) \
138
+ do { \
139
+ std::cout << #CS " /" #WS << std::endl; \
140
+ for (uint32_t run = 1 ; run <= runCount; ++run) \
141
+ { \
142
+ const auto opsPerSecond = CalculateOpsPerSecond<disruptorplus::WS, disruptorplus::CS>(bufferSize, iterationCount, consumerCount); \
143
+ std::cout << " run " << run << " " << opsPerSecond << " ops/sec" << std::endl; \
144
+ } \
145
+ } while (false )
146
+
147
+ BENCHMARK (single_threaded_claim_strategy, spin_wait_strategy);
148
+ BENCHMARK (single_threaded_claim_strategy, blocking_wait_strategy);
149
+ BENCHMARK (multi_threaded_claim_strategy, spin_wait_strategy);
150
+ BENCHMARK (multi_threaded_claim_strategy, blocking_wait_strategy);
151
+ }
152
+ catch (std::exception & e)
153
+ {
154
+ std::cout << " error: " << e.what () << std::endl;
155
+ return 1 ;
156
+ }
157
+
158
+ return 0 ;
159
+ }
0 commit comments