Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 23 additions & 28 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ module.exports = class Autobase extends ReadyResource {
return this._primaryBootstrap === null ? this.local.discoveryKey : this._primaryBootstrap.discoveryKey
}

get isActiveIndexer () {
_isActiveIndexer () {
return this.localWriter ? this.localWriter.isActiveIndexer : false
}

Expand Down Expand Up @@ -908,6 +908,7 @@ module.exports = class Autobase extends ReadyResource {
}
}

// no guarantees about writer.isActiveIndexer property here
async _getWriterByKey (key, len, seen, allowGC, isAdded, system) {
assert(this._draining === true || (this.opening && !this.opened))

Expand Down Expand Up @@ -1012,6 +1013,19 @@ module.exports = class Autobase extends ReadyResource {
}

_updateLinearizer (indexers, heads) {
// only current active indexers are reset to true below
const wasActiveIndexer = this._isActiveIndexer()

for (const w of this.activeWriters) w.isActiveIndexer = false
for (const writer of indexers) writer.isActiveIndexer = true

if (this._isActiveIndexer() && !wasActiveIndexer) {
this._setLocalIndexer()
} else if (!this._isActiveIndexer() && wasActiveIndexer) {
this._unsetLocalIndexer()
this._clearLocalIndexer()
}

this.linearizer = new Linearizer(indexers, { heads, writers: this.activeWriters })
this._addCheckpoints = !!(this.localWriter && (this.localWriter.isActiveIndexer || this._isPending()))
this._updateAckThreshold()
Expand All @@ -1032,8 +1046,6 @@ module.exports = class Autobase extends ReadyResource {

this.activeWriters.add(bootstrap)
this._checkWriters.push(bootstrap)
if (bootstrap === this.localWriter) this._setLocalIndexer()
bootstrap.isActiveIndexer = true
bootstrap.inflateBackground()
await bootstrap.ready()
this._resumeWriter(bootstrap)
Expand All @@ -1053,26 +1065,20 @@ module.exports = class Autobase extends ReadyResource {
}

const indexers = []
let localIndexer = false
const wasActiveIndexer = !!this.isActiveIndexer

for (const head of sys.indexers) {
const writer = await this._getWriterByKey(head.key, head.length, 0, false, false, sys)
if (writer === this.localWriter) localIndexer = true
writer.isActiveIndexer = true
writer.inflateBackground()
indexers.push(writer)
}

for (const key of sys.pendingIndexers) {
if (b4a.equals(key, this.local.key)) localIndexer = true
}

if (localIndexer && !wasActiveIndexer) {
this._setLocalIndexer()
} else if (!localIndexer && wasActiveIndexer) {
this._unsetLocalIndexer()
this._clearLocalIndexer()
if (!this._isActiveIndexer()) {
for (const key of sys.pendingIndexers) {
if (b4a.equals(key, this.local.key)) {
this._setLocalIndexer()
break
}
}
}

this._updateLinearizer(indexers, sys.heads)
Expand All @@ -1087,18 +1093,7 @@ module.exports = class Autobase extends ReadyResource {

for (const w of this.activeWriters) {
const data = await this.system.get(w.core.key)

if (data) {
w.isRemoved = data.isRemoved
w.isIndexer = data.isIndexer
} else {
w.isRemoved = true
w.isIndexer = false
}

if (!w.isIndexer) {
w.isActiveIndexer = false
}
w.isRemoved = data ? data.isRemoved : false
}
}

Expand Down
14 changes: 8 additions & 6 deletions lib/system.js
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ module.exports = class SystemView extends ReadyResource {
if (this.pendingIndexers.length > 0) {
for (let i = 0; i < this.pendingIndexers.length; i++) {
if (!b4a.equals(this.pendingIndexers[i], h.key)) continue
this._updateIndexer(h.key, h.length, false, i)
this._updateIndexer(h.key, h.length, true, i)
return true
}
}
Expand All @@ -243,10 +243,10 @@ module.exports = class SystemView extends ReadyResource {
return false
}

_updateIndexer (key, length, isRemoved, i) {
_updateIndexer (key, length, isIndexer, i) {
const hex = b4a.toString(key, 'hex')

if (isRemoved) {
if (!isIndexer) {
const existing = this._indexerMap.get(hex)
if (existing) {
this.indexerUpdate = true
Expand All @@ -260,7 +260,7 @@ module.exports = class SystemView extends ReadyResource {
if (b4a.equals(this.pendingIndexers[i], key)) break
}

if (!isRemoved && length === 0) {
if (length === 0) {
if (i >= this.pendingIndexers.length) this.pendingIndexers.push(key)
return
}
Expand Down Expand Up @@ -290,6 +290,7 @@ module.exports = class SystemView extends ReadyResource {

async add (key, { isIndexer = false, length = this._seenLength(key) } = {}) {
let wasTracked = false
let wasIndexer = false

if (length === 0) { // a bit hacky atm due to cas limitations...
const node = await this._batch.get(key, { valueEncoding: Member, keyEncoding: MEMBERS })
Expand All @@ -310,6 +311,7 @@ module.exports = class SystemView extends ReadyResource {
const n = newer.value

if (!o.isRemoved) wasTracked = true
if (o.isIndexer) wasIndexer = true

if (length === 0 && o.length) length = o.length

Expand All @@ -319,7 +321,7 @@ module.exports = class SystemView extends ReadyResource {

if (!wasTracked) this.members++

if (isIndexer) this._updateIndexer(key, length, false, 0)
if (wasIndexer || isIndexer) this._updateIndexer(key, length, isIndexer, 0)
}

async remove (key) {
Expand All @@ -330,7 +332,7 @@ module.exports = class SystemView extends ReadyResource {
if (isIndexer) break
}

if (isIndexer) this._updateIndexer(key, null, true, 0)
if (isIndexer) this._updateIndexer(key, null, false, 0)

let wasTracked = false

Expand Down
1 change: 0 additions & 1 deletion lib/writer.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ module.exports = class Writer extends ReadyResource {
this.range = null
this.nodes = new NodeBuffer(length)
this.node = null
this.isIndexer = false
this.isActiveIndexer = false
this.available = length
this.length = length
Expand Down
50 changes: 48 additions & 2 deletions test/basic.js
Original file line number Diff line number Diff line change
Expand Up @@ -1440,7 +1440,7 @@ test('basic - promote writer to indexer', async t => {
t.is(b.linearizer.indexers.length, 1)

t.absent(b.isIndexer)
t.absent(b.isActiveIndexer)
t.absent(b.localWriter.isActiveIndexer)

await b.append(null)
await replicateAndSync([a, b])
Expand All @@ -1457,7 +1457,53 @@ test('basic - promote writer to indexer', async t => {
await t.execution(event)
t.is(b.linearizer.indexers.length, 2)
t.ok(b.isIndexer)
t.ok(b.isActiveIndexer)
t.ok(b.localWriter.isActiveIndexer)
})

test('basic - demote indexer to writer', async t => {
t.plan(13)

const { bases } = await create(2, t)

const [a, b] = bases

// add writer
await addWriterAndSync(a, b)

t.is(a.linearizer.indexers.length, 2)
t.is(b.linearizer.indexers.length, 2)

t.ok(b.isIndexer)
t.ok(b.localWriter.isActiveIndexer)

await b.append('message')
await confirm([a, b])

t.is(a.view.indexedLength, 1)
t.is(b.view.indexedLength, 1)

const event = new Promise(resolve => b.on('is-non-indexer', resolve))

// demote writer
await addWriter(a, b, false)

await confirm([a, b])

t.is(a.linearizer.indexers.length, 1)
t.is(b.linearizer.indexers.length, 1)

await replicateAndSync([a, b])

await t.execution(event)

t.is(b.linearizer.indexers.length, 1)
t.absent(b.isIndexer)
t.absent(b.localWriter.isActiveIndexer)

// flush active writer set
await a.append(null)

t.is(a.activeWriters.size, 1)
})

test('basic - add new indexer after removing', async t => {
Expand Down
46 changes: 28 additions & 18 deletions test/fast-forward.js
Original file line number Diff line number Diff line change
Expand Up @@ -895,51 +895,61 @@ test('fast-forward - writer removed', async t => {
})

test('fast-forward - is indexer set correctly', async t => {
t.plan(9)
t.plan(11)

const { bases } = await create(3, t, {
const { bases } = await create(4, t, {
fastForward: true,
storage: () => tmpDir(t)
})

const [a, b, c] = bases
const [a, b, c, d] = bases

for (let i = 0; i < 200; i++) {
await a.append('a' + i)
}

// add writer
await addWriter(a, b, false)
await replicateAndSync([a, b])
await addWriterAndSync(a, b)
await replicateAndSync([a, b, c, d])

await b.append(null)
await replicateAndSync([a, b])
await addWriter(a, c, false)
await replicateAndSync([a, c])

await c.append(null)
await replicateAndSync([a, b, c, d])

// promote writer
await addWriter(a, b, true)
await addWriter(a, c, true)
await confirm([a, b])

t.is(a.linearizer.indexers.length, 2)
t.is(b.linearizer.indexers.length, 1)
t.is(a.linearizer.indexers.length, 3)
t.is(b.linearizer.indexers.length, 3)
t.is(c.linearizer.indexers.length, 2)

t.absent(b.isIndexer)
t.absent(b.isActiveIndexer)
t.absent(c.isIndexer)
t.absent(c.localWriter.isActiveIndexer)

for (let i = 200; i < 400; i++) {
await a.append('a' + i)
}

// c has ff'd past addWriter
await replicateAndSync([a, c])
await confirm([a, b])

await replicateAndSync([a, d])

t.is(a.view.getBackingCore().session.length, 400)

t.is(c.linearizer.indexers.length, 2)

const event = new Promise(resolve => b.on('is-indexer', resolve))
const event = new Promise(resolve => c.on('is-indexer', resolve))

await replicateAndSync([b, c])
await replicateAndSync([c, d])

t.is(c.linearizer.indexers.length, 3)
t.ok(c.isIndexer)
t.ok(c.localWriter.isActiveIndexer)

t.is(b.linearizer.indexers.length, 2)
t.ok(b.isIndexer)
t.ok(b.isActiveIndexer)
await t.execution(event)
})

Expand Down