Skip to content

Commit

Permalink
Heads and indexers are only active cores (#180)
Browse files Browse the repository at this point in the history
* only keep heads in indexers as active cores

* also wakeup writer deps when encountered

* forgot to remove this line
  • Loading branch information
mafintosh authored Sep 20, 2024
1 parent 1f65718 commit 7316d77
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 18 deletions.
64 changes: 46 additions & 18 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ module.exports = class Autobase extends ReadyResource {
this.fastForwardTo = handlers.fastForward
}

this._bootstrapWriters = [] // might contain dups, but thats ok
this._bootstrapWritersChanged = false

this._checkWriters = []
this._appending = null
this._wakeup = new AutoWakeup(this)
Expand Down Expand Up @@ -375,6 +378,24 @@ module.exports = class Autobase extends ReadyResource {
this._coupler = new CoreCoupler(core.session, this._wakeupPeerBound)
}

_updateBootstrapWriters () {
const writers = this.linearizer.getBootstrapWriters()

// first clear all, but without applying it for churn reasons
for (const writer of this._bootstrapWriters) writer.isBootstrap = false

// all passed are bootstraps
for (const writer of writers) writer.setBootstrap(true)

// reset activity on old ones, all should be in sync now
for (const writer of this._bootstrapWriters) {
if (writer.isBootstrap === false) writer.setBootstrap(false)
}

this._bootstrapWriters = writers
this._bootstrapWritersChanged = false
}

async _openPreBump () {
this._presystem = this._openPreSystem()

Expand Down Expand Up @@ -503,6 +524,7 @@ module.exports = class Autobase extends ReadyResource {
if (this.reindexing) this._setReindexed()

this.queueFastForward()
this._updateBootstrapWriters()
}

async _catchup (nodes) {
Expand Down Expand Up @@ -654,23 +676,13 @@ module.exports = class Autobase extends ReadyResource {
while (this._checkWriters.length > 0) {
const w = this._checkWriters.pop()

if (!w.flushed()) {
w.core.setActive(true)
continue
}
if (!w.flushed()) continue

const unqueued = this._wakeup.unqueue(w.core.key, w.core.length)
this._coupler.remove(w.core)

if (!unqueued || w.isActiveIndexer) {
w.core.setActive(true)
continue
}

if (this.localWriter === w) {
this.localWriter.core.setActive(false)
continue
}
if (!unqueued || w.isActiveIndexer) continue
if (this.localWriter === w) continue

await this._closeWriter(w, false)
}
Expand Down Expand Up @@ -991,7 +1003,7 @@ module.exports = class Autobase extends ReadyResource {

const core = local
? this.local.session({ valueEncoding: messages.OplogMessage, encryptionKey: this.encryptionKey, active: false })
: this.store.get({ key, compat: false, writable: false, valueEncoding: messages.OplogMessage, encryptionKey: this.encryptionKey })
: this.store.get({ key, compat: false, writable: false, valueEncoding: messages.OplogMessage, encryptionKey: this.encryptionKey, active: false })

return core
}
Expand Down Expand Up @@ -1345,19 +1357,31 @@ module.exports = class Autobase extends ReadyResource {
this.system.sendWakeup(peer.remotePublicKey)
}

async _wakeupWriter (key) {
this._ensureWakeup(await this._getWriterByKey(key, -1, 0, true, false, null))
}

// ensure wakeup on an existing writer (the writer calls this in addition to above)
_ensureWakeup (w) {
if (w === null || w.isBootstrap === true) return
w.setBootstrap(true) // even if turn false at end of drain, hypercore makes them linger a bit so no churn
this._bootstrapWriters.push(w)
this._bootstrapWritersChanged = true
}

async _drainWakeup () { // TODO: parallel load the writers here later
if (this._needsWakeup === true) {
this._needsWakeup = false

for (const { key } of this._wakeup) {
await this._getWriterByKey(key, -1, 0, true, false, null)
await this._wakeupWriter(key)
}

if (this._needsWakeupHeads === true) {
this._needsWakeupHeads = false

for (const { key } of await this._getLocallyStoredHeads()) {
await this._getWriterByKey(key, -1, 0, true, false, null)
await this._wakeupWriter(key)
}
}
}
Expand All @@ -1369,7 +1393,7 @@ module.exports = class Autobase extends ReadyResource {
if (info && length < info.length) continue // stale hint
}

await this._getWriterByKey(key, -1, 0, true, false, null)
await this._wakeupWriter(key)
}

this._wakeupHints.clear()
Expand Down Expand Up @@ -1404,6 +1428,11 @@ module.exports = class Autobase extends ReadyResource {
else this.ack()
}

// keep bootstraps in sync with linearizer
if (this.updating === true || this._bootstrapWritersChanged === true) {
this._updateBootstrapWriters()
}

if (this.updating === true) {
this.updating = false
this.emit('update')
Expand Down Expand Up @@ -2335,7 +2364,6 @@ module.exports = class Autobase extends ReadyResource {
if (this._addCheckpoints) this._localDigest.pointer++
}

this.localWriter.core.setActive(true)
await this.local.append(blocks)

if (this._addCheckpoints) {
Expand Down
11 changes: 11 additions & 0 deletions lib/linearizer.js
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,17 @@ module.exports = class Linearizer {
return heads
}

// TODO: might contain dups atm, nbd for how we use it, returns an array of writers you can "pull"
// to get the full dag view at any time
getBootstrapWriters () {
const writers = []

for (const head of this.heads) writers.push(head.writer)
for (let i = 0; i < this.consensus.indexers.length; i++) writers.push(this.consensus.indexers[i])

return writers
}

addHead (node) {
node.active()

Expand Down
7 changes: 7 additions & 0 deletions lib/writer.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ module.exports = class Writer extends ReadyResource {
this.range = null
this.nodes = new NodeBuffer(length)
this.node = null
this.isBootstrap = false // maintained by updateBootstrapWriters
this.isActiveIndexer = false
this.available = length
this.length = length
Expand All @@ -39,6 +40,11 @@ module.exports = class Writer extends ReadyResource {
this.range = null
}

setBootstrap (bool) {
this.isBootstrap = bool
this.core.setActive(bool)
}

async isInSystem () {
const bootstrapping = this.base.system.core.length === 0 && b4a.equals(this.core.key, this.base.key)

Expand Down Expand Up @@ -358,6 +364,7 @@ module.exports = class Writer extends ReadyResource {
const headWriter = await this.base._getWriterByKey(rawHead.key, -1, rawHead.length, true, false, null)

if (headWriter !== this && (headWriter === null || headWriter.length < rawHead.length)) {
this.base._ensureWakeup(headWriter)
return false
}

Expand Down

0 comments on commit 7316d77

Please sign in to comment.