Skip to content

Commit 4235045

Browse files
Support demoting indexers to non-indexers (#179)
* add test for demoting indexer * system updates indexers when demoted * remove unused writer.isIndexer prop * update writer state after making linearizer * make isActiveIndexer private prop * only update isActiveIndexer in makeLinearizer * review * only unset active writers after processing * isActiveIndexer only set in one place
1 parent e34377a commit 4235045

File tree

5 files changed

+107
-55
lines changed

5 files changed

+107
-55
lines changed

Diff for: index.js

+23-28
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ module.exports = class Autobase extends ReadyResource {
195195
return this._primaryBootstrap === null ? this.local.discoveryKey : this._primaryBootstrap.discoveryKey
196196
}
197197

198-
get isActiveIndexer () {
198+
_isActiveIndexer () {
199199
return this.localWriter ? this.localWriter.isActiveIndexer : false
200200
}
201201

@@ -908,6 +908,7 @@ module.exports = class Autobase extends ReadyResource {
908908
}
909909
}
910910

911+
// no guarantees about writer.isActiveIndexer property here
911912
async _getWriterByKey (key, len, seen, allowGC, isAdded, system) {
912913
assert(this._draining === true || (this.opening && !this.opened))
913914

@@ -1012,6 +1013,19 @@ module.exports = class Autobase extends ReadyResource {
10121013
}
10131014

10141015
_updateLinearizer (indexers, heads) {
1016+
// only current active indexers are reset to true below
1017+
const wasActiveIndexer = this._isActiveIndexer()
1018+
1019+
for (const w of this.activeWriters) w.isActiveIndexer = false
1020+
for (const writer of indexers) writer.isActiveIndexer = true
1021+
1022+
if (this._isActiveIndexer() && !wasActiveIndexer) {
1023+
this._setLocalIndexer()
1024+
} else if (!this._isActiveIndexer() && wasActiveIndexer) {
1025+
this._unsetLocalIndexer()
1026+
this._clearLocalIndexer()
1027+
}
1028+
10151029
this.linearizer = new Linearizer(indexers, { heads, writers: this.activeWriters })
10161030
this._addCheckpoints = !!(this.localWriter && (this.localWriter.isActiveIndexer || this._isPending()))
10171031
this._updateAckThreshold()
@@ -1032,8 +1046,6 @@ module.exports = class Autobase extends ReadyResource {
10321046

10331047
this.activeWriters.add(bootstrap)
10341048
this._checkWriters.push(bootstrap)
1035-
if (bootstrap === this.localWriter) this._setLocalIndexer()
1036-
bootstrap.isActiveIndexer = true
10371049
bootstrap.inflateBackground()
10381050
await bootstrap.ready()
10391051
this._resumeWriter(bootstrap)
@@ -1053,26 +1065,20 @@ module.exports = class Autobase extends ReadyResource {
10531065
}
10541066

10551067
const indexers = []
1056-
let localIndexer = false
1057-
const wasActiveIndexer = !!this.isActiveIndexer
10581068

10591069
for (const head of sys.indexers) {
10601070
const writer = await this._getWriterByKey(head.key, head.length, 0, false, false, sys)
1061-
if (writer === this.localWriter) localIndexer = true
1062-
writer.isActiveIndexer = true
10631071
writer.inflateBackground()
10641072
indexers.push(writer)
10651073
}
10661074

1067-
for (const key of sys.pendingIndexers) {
1068-
if (b4a.equals(key, this.local.key)) localIndexer = true
1069-
}
1070-
1071-
if (localIndexer && !wasActiveIndexer) {
1072-
this._setLocalIndexer()
1073-
} else if (!localIndexer && wasActiveIndexer) {
1074-
this._unsetLocalIndexer()
1075-
this._clearLocalIndexer()
1075+
if (!this._isActiveIndexer()) {
1076+
for (const key of sys.pendingIndexers) {
1077+
if (b4a.equals(key, this.local.key)) {
1078+
this._setLocalIndexer()
1079+
break
1080+
}
1081+
}
10761082
}
10771083

10781084
this._updateLinearizer(indexers, sys.heads)
@@ -1087,18 +1093,7 @@ module.exports = class Autobase extends ReadyResource {
10871093

10881094
for (const w of this.activeWriters) {
10891095
const data = await this.system.get(w.core.key)
1090-
1091-
if (data) {
1092-
w.isRemoved = data.isRemoved
1093-
w.isIndexer = data.isIndexer
1094-
} else {
1095-
w.isRemoved = true
1096-
w.isIndexer = false
1097-
}
1098-
1099-
if (!w.isIndexer) {
1100-
w.isActiveIndexer = false
1101-
}
1096+
w.isRemoved = data ? data.isRemoved : false
11021097
}
11031098
}
11041099

Diff for: lib/system.js

+8-6
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ module.exports = class SystemView extends ReadyResource {
230230
if (this.pendingIndexers.length > 0) {
231231
for (let i = 0; i < this.pendingIndexers.length; i++) {
232232
if (!b4a.equals(this.pendingIndexers[i], h.key)) continue
233-
this._updateIndexer(h.key, h.length, false, i)
233+
this._updateIndexer(h.key, h.length, true, i)
234234
return true
235235
}
236236
}
@@ -243,10 +243,10 @@ module.exports = class SystemView extends ReadyResource {
243243
return false
244244
}
245245

246-
_updateIndexer (key, length, isRemoved, i) {
246+
_updateIndexer (key, length, isIndexer, i) {
247247
const hex = b4a.toString(key, 'hex')
248248

249-
if (isRemoved) {
249+
if (!isIndexer) {
250250
const existing = this._indexerMap.get(hex)
251251
if (existing) {
252252
this.indexerUpdate = true
@@ -260,7 +260,7 @@ module.exports = class SystemView extends ReadyResource {
260260
if (b4a.equals(this.pendingIndexers[i], key)) break
261261
}
262262

263-
if (!isRemoved && length === 0) {
263+
if (length === 0) {
264264
if (i >= this.pendingIndexers.length) this.pendingIndexers.push(key)
265265
return
266266
}
@@ -290,6 +290,7 @@ module.exports = class SystemView extends ReadyResource {
290290

291291
async add (key, { isIndexer = false, length = this._seenLength(key) } = {}) {
292292
let wasTracked = false
293+
let wasIndexer = false
293294

294295
if (length === 0) { // a bit hacky atm due to cas limitations...
295296
const node = await this._batch.get(key, { valueEncoding: Member, keyEncoding: MEMBERS })
@@ -310,6 +311,7 @@ module.exports = class SystemView extends ReadyResource {
310311
const n = newer.value
311312

312313
if (!o.isRemoved) wasTracked = true
314+
if (o.isIndexer) wasIndexer = true
313315

314316
if (length === 0 && o.length) length = o.length
315317

@@ -319,7 +321,7 @@ module.exports = class SystemView extends ReadyResource {
319321

320322
if (!wasTracked) this.members++
321323

322-
if (isIndexer) this._updateIndexer(key, length, false, 0)
324+
if (wasIndexer || isIndexer) this._updateIndexer(key, length, isIndexer, 0)
323325
}
324326

325327
async remove (key) {
@@ -330,7 +332,7 @@ module.exports = class SystemView extends ReadyResource {
330332
if (isIndexer) break
331333
}
332334

333-
if (isIndexer) this._updateIndexer(key, null, true, 0)
335+
if (isIndexer) this._updateIndexer(key, null, false, 0)
334336

335337
let wasTracked = false
336338

Diff for: lib/writer.js

-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ module.exports = class Writer extends ReadyResource {
2020
this.range = null
2121
this.nodes = new NodeBuffer(length)
2222
this.node = null
23-
this.isIndexer = false
2423
this.isActiveIndexer = false
2524
this.available = length
2625
this.length = length

Diff for: test/basic.js

+48-2
Original file line numberDiff line numberDiff line change
@@ -1440,7 +1440,7 @@ test('basic - promote writer to indexer', async t => {
14401440
t.is(b.linearizer.indexers.length, 1)
14411441

14421442
t.absent(b.isIndexer)
1443-
t.absent(b.isActiveIndexer)
1443+
t.absent(b.localWriter.isActiveIndexer)
14441444

14451445
await b.append(null)
14461446
await replicateAndSync([a, b])
@@ -1457,7 +1457,53 @@ test('basic - promote writer to indexer', async t => {
14571457
await t.execution(event)
14581458
t.is(b.linearizer.indexers.length, 2)
14591459
t.ok(b.isIndexer)
1460-
t.ok(b.isActiveIndexer)
1460+
t.ok(b.localWriter.isActiveIndexer)
1461+
})
1462+
1463+
test('basic - demote indexer to writer', async t => {
1464+
t.plan(13)
1465+
1466+
const { bases } = await create(2, t)
1467+
1468+
const [a, b] = bases
1469+
1470+
// add writer
1471+
await addWriterAndSync(a, b)
1472+
1473+
t.is(a.linearizer.indexers.length, 2)
1474+
t.is(b.linearizer.indexers.length, 2)
1475+
1476+
t.ok(b.isIndexer)
1477+
t.ok(b.localWriter.isActiveIndexer)
1478+
1479+
await b.append('message')
1480+
await confirm([a, b])
1481+
1482+
t.is(a.view.indexedLength, 1)
1483+
t.is(b.view.indexedLength, 1)
1484+
1485+
const event = new Promise(resolve => b.on('is-non-indexer', resolve))
1486+
1487+
// demote writer
1488+
await addWriter(a, b, false)
1489+
1490+
await confirm([a, b])
1491+
1492+
t.is(a.linearizer.indexers.length, 1)
1493+
t.is(b.linearizer.indexers.length, 1)
1494+
1495+
await replicateAndSync([a, b])
1496+
1497+
await t.execution(event)
1498+
1499+
t.is(b.linearizer.indexers.length, 1)
1500+
t.absent(b.isIndexer)
1501+
t.absent(b.localWriter.isActiveIndexer)
1502+
1503+
// flush active writer set
1504+
await a.append(null)
1505+
1506+
t.is(a.activeWriters.size, 1)
14611507
})
14621508

14631509
test('basic - add new indexer after removing', async t => {

Diff for: test/fast-forward.js

+28-18
Original file line numberDiff line numberDiff line change
@@ -895,51 +895,61 @@ test('fast-forward - writer removed', async t => {
895895
})
896896

897897
test('fast-forward - is indexer set correctly', async t => {
898-
t.plan(9)
898+
t.plan(11)
899899

900-
const { bases } = await create(3, t, {
900+
const { bases } = await create(4, t, {
901901
fastForward: true,
902902
storage: () => tmpDir(t)
903903
})
904904

905-
const [a, b, c] = bases
905+
const [a, b, c, d] = bases
906906

907907
for (let i = 0; i < 200; i++) {
908908
await a.append('a' + i)
909909
}
910910

911911
// add writer
912-
await addWriter(a, b, false)
913-
await replicateAndSync([a, b])
912+
await addWriterAndSync(a, b)
913+
await replicateAndSync([a, b, c, d])
914914

915-
await b.append(null)
916-
await replicateAndSync([a, b])
915+
await addWriter(a, c, false)
916+
await replicateAndSync([a, c])
917+
918+
await c.append(null)
919+
await replicateAndSync([a, b, c, d])
917920

918921
// promote writer
919-
await addWriter(a, b, true)
922+
await addWriter(a, c, true)
923+
await confirm([a, b])
920924

921-
t.is(a.linearizer.indexers.length, 2)
922-
t.is(b.linearizer.indexers.length, 1)
925+
t.is(a.linearizer.indexers.length, 3)
926+
t.is(b.linearizer.indexers.length, 3)
927+
t.is(c.linearizer.indexers.length, 2)
923928

924-
t.absent(b.isIndexer)
925-
t.absent(b.isActiveIndexer)
929+
t.absent(c.isIndexer)
930+
t.absent(c.localWriter.isActiveIndexer)
926931

927932
for (let i = 200; i < 400; i++) {
928933
await a.append('a' + i)
929934
}
930935

931936
// c has ff'd past addWriter
932-
await replicateAndSync([a, c])
937+
await confirm([a, b])
938+
939+
await replicateAndSync([a, d])
940+
941+
t.is(a.view.getBackingCore().session.length, 400)
933942

934943
t.is(c.linearizer.indexers.length, 2)
935944

936-
const event = new Promise(resolve => b.on('is-indexer', resolve))
945+
const event = new Promise(resolve => c.on('is-indexer', resolve))
937946

938-
await replicateAndSync([b, c])
947+
await replicateAndSync([c, d])
948+
949+
t.is(c.linearizer.indexers.length, 3)
950+
t.ok(c.isIndexer)
951+
t.ok(c.localWriter.isActiveIndexer)
939952

940-
t.is(b.linearizer.indexers.length, 2)
941-
t.ok(b.isIndexer)
942-
t.ok(b.isActiveIndexer)
943953
await t.execution(event)
944954
})
945955

0 commit comments

Comments
 (0)