-
Notifications
You must be signed in to change notification settings - Fork 40
/
Copy pathdisseminator.js
118 lines (94 loc) · 3.52 KB
/
disseminator.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
'use strict';
var events = require('events');
var test = require('tape');
var Codec = require('../lib/codec');
var Disseminator = require('../lib/disseminator');
var Membership = require('../lib/membership');
var MessageType = require('../lib/message-type');
var Net = require('../lib/net');
// Codec instance shared by the tests below: it is handed to each Disseminator
// through the `swim` option and is also used to decode the buffers returned by
// `getUpdatesUpTo` when a test inspects their contents.
var codec = new Codec();
/**
 * Emits a random number of membership updates into a started disseminator and
 * then drains it with randomly sized byte budgets until it returns no buffers.
 *
 * Checked along the way:
 *   - a freshly started disseminator has nothing to send;
 *   - every returned buffer is tagged as an Update message;
 *   - the update being returned has never been returned more often than any
 *     host already seen (priority);
 *   - the bytes returned by one call stay below the budget given to that call;
 *   - the total number of buffers equals the number of updates multiplied by
 *     the default dissemination limit for the stubbed member count.
 */
test('Disseminator honors bytesAvailable, dissemination limit and priority', function t(assert) {
    var membership = new events.EventEmitter();
    var disseminator = new Disseminator({
        swim: {
            codec: codec,
            membership: membership
        }
    });
    var minDgramSize = Math.pow(2, 6);
    var maxDgramSize = Math.pow(2, 9);
    var numberOfUpdates = Math.ceil(Math.random() * 5);
    var numberOfMembers = Math.ceil(Math.random() * 3);
    var sentPerHost = Object.create(null);
    var totalBuffers = 0;
    var budget;
    var batch;
    var batchBytes;
    var expectedBuffers;
    var port;

    // Random byte budget in the range (minDgramSize, maxDgramSize].
    function drawBudget() {
        return Math.ceil(Math.random() * (maxDgramSize - minDgramSize)) + minDgramSize;
    }

    // Draws a new budget (remembered for the size assertion) and asks the
    // disseminator for the buffers that fit into it.
    function nextBatch() {
        budget = drawBudget();
        return disseminator.getUpdatesUpTo(budget);
    }

    // Reducer: verifies one buffer and adds its size to the running byte total.
    function verifyAndCount(bytesSoFar, buffer) {
        var update;
        var seenHosts;
        var k;

        assert.strictEqual(Net.ReadMessageType.call(buffer, 0), MessageType.Update);

        // The payload follows the message type header.
        update = codec.decode(buffer.slice(Net.MessageTypeSize));

        // For a host not seen before the left side is `undefined`, so the
        // comparison is false and the assertion holds.
        seenHosts = Object.keys(sentPerHost);
        for (k = 0; k < seenHosts.length; k += 1) {
            assert.strictEqual(sentPerHost[update.host] > sentPerHost[seenHosts[k]], false);
        }

        sentPerHost[update.host] = (sentPerHost[update.host] || 0) + 1;

        return bytesSoFar + buffer.length;
    }

    // The plain EventEmitter stands in for the real membership; the
    // disseminator only needs its events and a `size()` method here.
    membership.size = function size() {
        return numberOfMembers;
    };

    disseminator.start();

    // Nothing has been emitted yet, so there is nothing to disseminate.
    assert.strictEqual(disseminator.getUpdatesUpTo(Infinity).length, 0);

    for (port = 0; port < numberOfUpdates; port += 1) {
        membership.emit(Membership.EventType.Update, {
            host: 'localhost:' + port
        });
    }

    for (batch = nextBatch(); batch.length > 0; batch = nextBatch()) {
        batchBytes = batch.reduce(verifyAndCount, 0);

        // NOTE(review): strict `<` — confirm whether a batch that exactly fills
        // the budget is possible; if so this should be `<=`.
        assert.strictEqual(batchBytes < budget, true);

        totalBuffers += batch.length;
    }

    expectedBuffers = numberOfUpdates *
        Disseminator.Default.disseminationFormula(Disseminator.Default.disseminationFactor, numberOfMembers);
    assert.strictEqual(totalBuffers, expectedBuffers);

    disseminator.stop();
    assert.end();
});
/**
 * With a dissemination limit of `factor * size`, one update is emitted while
 * the stubbed membership reports 10 members and is fetched once. The member
 * count is then dropped to 1 and a Change event is emitted; the next fetch is
 * expected to return nothing.
 */
test('Disseminator removes over disseminated updates on decrease of dissemination limit', function t(assert) {
    var membership = new events.EventEmitter();
    var disseminator = new Disseminator({
        swim: {
            codec: codec,
            membership: membership
        },
        disseminationFactor: 1,
        disseminationFormula: disseminationFormula
    });
    var memberCount = 10;

    // Linear limit so the test can steer it directly through the member count.
    function disseminationFormula(factor, size) {
        return factor * size;
    }

    // Stubbed `size()` reads the mutable local, so reassigning `memberCount`
    // changes what the disseminator sees.
    membership.size = function size() {
        return memberCount;
    };

    disseminator.start();

    membership.emit(Membership.EventType.Update, {
        host: 'localhost:22'
    });

    // First fetch: the update is returned once.
    assert.strictEqual(disseminator.getUpdatesUpTo(Infinity).length, 1);

    // Shrink the reported membership and signal the change.
    memberCount = 1;
    membership.emit(Membership.EventType.Change);

    // The update has already been returned once, so nothing is left to send.
    assert.strictEqual(disseminator.getUpdatesUpTo(Infinity).length, 0);

    disseminator.stop();
    assert.end();
});