diff --git a/proto/extensions.proto b/proto/extensions.proto
index 7bab5f61e..344b334b4 100644
--- a/proto/extensions.proto
+++ b/proto/extensions.proto
@@ -1,7 +1,13 @@
 syntax = "proto3";
 
 message ProjectExtension {
-  repeated bytes authCoreKeys = 1;
+  repeated bytes wantCoreKeys = 1;
+
+  repeated bytes authCoreKeys = 2;
+  repeated bytes configCoreKeys = 3;
+  repeated bytes dataCoreKeys = 4;
+  repeated bytes blobIndexCoreKeys = 5;
+  repeated bytes blobCoreKeys = 6;
 }
 
 message HaveExtension {
diff --git a/src/capabilities.js b/src/capabilities.js
index 8b683c95c..ec6027a74 100644
--- a/src/capabilities.js
+++ b/src/capabilities.js
@@ -1,6 +1,7 @@
 import { currentSchemaVersions } from '@mapeo/schema'
 import mapObject from 'map-obj'
-import { kCreateWithDocId } from './datatype/index.js'
+import { kCreateWithDocId, kDataStore } from './datatype/index.js'
+import { TypedEmitter } from 'tiny-typed-emitter'
 
 // Randomly generated 8-byte encoded as hex
 export const COORDINATOR_ROLE_ID = 'f7c150f5a3a9a855'
@@ -140,7 +141,15 @@ export const DEFAULT_CAPABILITIES = {
   },
 }
 
-export class Capabilities {
+/**
+ * @typedef {object} CapabilitiesEvents
+ * @property {(docs: import('@mapeo/schema').Role[]) => void} update - Emitted when new role records are indexed
+ */
+
+/**
+ * @extends {TypedEmitter<CapabilitiesEvents>}
+ */
+export class Capabilities extends TypedEmitter {
   #dataType
   #coreOwnership
   #coreManager
@@ -165,11 +174,14 @@ export class Capabilities {
    * @param {Buffer} opts.deviceKey public key of this device
    */
   constructor({ dataType, coreOwnership, coreManager, projectKey, deviceKey }) {
+    super()
     this.#dataType = dataType
     this.#coreOwnership = coreOwnership
     this.#coreManager = coreManager
     this.#projectCreatorAuthCoreId = projectKey.toString('hex')
     this.#ownDeviceId = deviceKey.toString('hex')
+
+    dataType[kDataStore].on('role', this.emit.bind(this, 'update'))
   }
 
   /**
diff --git a/src/core-manager/index.js b/src/core-manager/index.js
index 431ec882c..788c6fccb 100644
--- a/src/core-manager/index.js
+++ b/src/core-manager/index.js
@@ -349,24 +349,12 @@ export class CoreManager extends TypedEmitter {
    */
   #sendCoreKeys(peer, namespaces) {
     const message = ProjectExtension.create()
-    let hasKeys = false
     for (const ns of namespaces) {
-      for (const { core, key } of this.getCores(ns)) {
-        let peerHasKeyAlready = false
-        for (const peer of core.peers) {
-          if (peer.remotePublicKey.equals(peer.remotePublicKey)) {
-            peerHasKeyAlready = true
-            break
-          }
-        }
-        if (peerHasKeyAlready) continue
-        message.authCoreKeys.push(key)
-        hasKeys = true
+      for (const { key } of this.getCores(ns)) {
+        message[`${ns}CoreKeys`].push(key)
       }
     }
-    if (hasKeys) {
-      this.#projectExtension.send(message, peer)
-    }
+    this.#projectExtension.send(message, peer)
   }
 
   /**
diff --git a/src/core-ownership.js b/src/core-ownership.js
index 49f708820..415308919 100644
--- a/src/core-ownership.js
+++ b/src/core-ownership.js
@@ -4,17 +4,31 @@ import { parseVersionId } from '@mapeo/schema'
 import { defaultGetWinner } from '@mapeo/sqlite-indexer'
 import assert from 'node:assert'
 import sodium from 'sodium-universal'
-import { kTable, kSelect, kCreateWithDocId } from './datatype/index.js'
+import {
+  kTable,
+  kSelect,
+  kCreateWithDocId,
+  kDataStore,
+} from './datatype/index.js'
 import { eq, or } from 'drizzle-orm'
 import mapObject from 'map-obj'
 import { discoveryKey } from 'hypercore-crypto'
 import pDefer from 'p-defer'
+import { TypedEmitter } from 'tiny-typed-emitter'
 
 /**
  * @typedef {import('./types.js').CoreOwnershipWithSignatures} CoreOwnershipWithSignatures
  */
 
-export class CoreOwnership {
+/**
+ * @typedef {object} CoreOwnershipEvents
+ * @property {(docs: import('@mapeo/schema').CoreOwnership[]) => void} update - Emitted when new coreOwnership records are indexed
+ */
+
+/**
+ * @extends {TypedEmitter<CoreOwnershipEvents>}
+ */
+export class CoreOwnership extends TypedEmitter {
   #dataType
   #ownershipWriteDone
   /**
@@ -31,11 +45,14 @@ export class CoreOwnership {
    * @param {import('./types.js').KeyPair} opts.identityKeypair
    */
   constructor({ dataType, coreKeypairs, identityKeypair }) {
+    super()
     this.#dataType = dataType
-    const authWriterCore = dataType.writerCore
+    const authWriterCore = dataType[kDataStore].writerCore
     const deferred = pDefer()
     this.#ownershipWriteDone = deferred.promise
+
+    dataType[kDataStore].on('coreOwnership', this.emit.bind(this, 'update'))
+
     const writeOwnership = () => {
       if (authWriterCore.length > 0) {
         deferred.resolve()
@@ -49,7 +66,7 @@ export class CoreOwnership {
     if (authWriterCore.opened) {
       writeOwnership()
     } else {
-      authWriterCore.on('ready', writeOwnership)
+      authWriterCore.once('ready', writeOwnership)
     }
   }
diff --git a/src/datastore/index.js b/src/datastore/index.js
index 68e96dc36..3f5dbc1e8 100644
--- a/src/datastore/index.js
+++ b/src/datastore/index.js
@@ -1,7 +1,14 @@
-import { encode, decode, getVersionId, parseVersionId } from '@mapeo/schema'
+import {
+  encode,
+  decode,
+  getVersionId,
+  parseVersionId,
+  decodeBlockPrefix,
+} from '@mapeo/schema'
 import MultiCoreIndexer from 'multi-core-indexer'
 import pDefer from 'p-defer'
 import { discoveryKey } from 'hypercore-crypto'
+import { TypedEmitter } from 'tiny-typed-emitter'
 
 /**
  * @typedef {import('@mapeo/schema').MapeoDoc} MapeoDoc
@@ -26,11 +33,19 @@ const NAMESPACE_SCHEMAS = /** @type {const} */ ({
 
 /**
  * @typedef {typeof NAMESPACE_SCHEMAS} NamespaceSchemas
  */
 
+/**
+ * @template {MapeoDoc['schemaName']} TSchemaName
+ * @typedef {{
+ *   [S in TSchemaName]: (docs: Extract<MapeoDoc, { schemaName: S }>[]) => void
+ * }} DataStoreEvents
+ */
+
 /**
  * @template {keyof NamespaceSchemas} [TNamespace=keyof NamespaceSchemas]
  * @template {NamespaceSchemas[TNamespace][number]} [TSchemaName=NamespaceSchemas[TNamespace][number]]
+ * @extends {TypedEmitter<DataStoreEvents<TSchemaName>>}
  */
-export class DataStore {
+export class DataStore extends TypedEmitter {
   #coreManager
   #namespace
   #batch
@@ -40,6 +55,7 @@ export class DataStore {
   #pendingIndex = new Map()
   /** @type {Set<import('p-defer').DeferredPromise<void>['promise']>} */
   #pendingAppends = new Set()
+  #emitUpdates
 
   /**
    * @param {object} opts
    * @param {import('../core-manager/index.js').CoreManager} opts.coreManager
    * @param {TNamespace} opts.namespace
    * @param {(entries: MultiCoreIndexer.Entry<'binary'>[]) => Promise<void>} opts.batch
    * @param {MultiCoreIndexer.StorageParam} opts.storage
+   * @param {TSchemaName[]} [opts.emitUpdates] - List of schemas to emit updates for.
+   *   Emitting an update is expensive because it requires decoding an entry, so
+   *   it should only be done for schemas that will not have many documents in
+   *   the database
    */
-  constructor({ coreManager, namespace, batch, storage }) {
+  constructor({ coreManager, namespace, batch, storage, emitUpdates }) {
+    super()
     this.#coreManager = coreManager
     this.#namespace = namespace
     this.#batch = batch
+    this.#emitUpdates = emitUpdates && new Set(emitUpdates)
     this.#writerCore = coreManager.getWriterCore(namespace).core
     const cores = coreManager.getCores(namespace).map((cr) => cr.core)
     this.#coreIndexer = new MultiCoreIndexer(cores, {
@@ -92,18 +111,45 @@ export class DataStore {
   async #handleEntries(entries) {
     await this.#batch(entries)
     await Promise.all(this.#pendingAppends)
+    const toEmit =
+      /** @type {{ [S in TSchemaName]: Array<Extract<MapeoDoc, { schemaName: S }>> }} */
+      ({})
     // Writes to the writerCore need to wait until the entry is indexed before
     // returning, so we check if any incoming entry has a pending promise
     for (const entry of entries) {
-      if (!entry.key.equals(this.#writerCore.key)) continue
-      const versionId = getVersionId({
+      const versionObj = {
         coreDiscoveryKey: discoveryKey(entry.key),
         index: entry.index,
-      })
+      }
+      // We do this here rather than in IndexWriter (which is already decoding
+      // the entries and could return decoded entries from the batch function or
+      // emit events itself) because IndexWriter will eventually be in a worker
+      // thread, and my assumption is that sending a decoded doc over a
+      // MessageChannel from the worker is more expensive than decoding the
+      // entry here. This also avoids setting up RPC calls with the worker.
+      if (this.#emitUpdates) {
+        try {
+          const { schemaName } = decodeBlockPrefix(entry.block)
+          // @ts-ignore
+          if (this.#emitUpdates.has(schemaName)) {
+            // @ts-ignore
+            toEmit[schemaName] = toEmit[schemaName] || []
+            // @ts-ignore
+            toEmit[schemaName].push(decode(entry.block, versionObj))
+          }
+        } catch (e) {
+          // Ignore docs we can't decode
+        }
+      }
+      if (!entry.key.equals(this.#writerCore.key)) continue
+      const versionId = getVersionId(versionObj)
       const pending = this.#pendingIndex.get(versionId)
      if (!pending) continue
       pending.resolve()
     }
+    for (const [schemaName, docs] of Object.entries(toEmit)) {
+      // @ts-ignore
+      this.emit(schemaName, docs)
+    }
   }
 
   /**
diff --git a/src/datatype/index.d.ts b/src/datatype/index.d.ts
index 9397857b8..cf01a07b7 100644
--- a/src/datatype/index.d.ts
+++ b/src/datatype/index.d.ts
@@ -29,6 +29,7 @@ type MapeoDocTablesMap = {
 export const kCreateWithDocId: unique symbol
 export const kSelect: unique symbol
 export const kTable: unique symbol
+export const kDataStore: unique symbol
 
 type OmitUnion<T, K extends keyof any> = T extends any ? Omit<T, K> : never
 type ExcludeSchema<
@@ -60,7 +61,7 @@ export class DataType<
   get [kTable](): TTable
 
-  get writerCore(): Hypercore<'binary', Buffer>
+  get [kDataStore](): TDataStore
 
   [kCreateWithDocId](
     docId: string,
diff --git a/src/datatype/index.js b/src/datatype/index.js
index c072a1f41..80b898eb5 100644
--- a/src/datatype/index.js
+++ b/src/datatype/index.js
@@ -47,6 +47,7 @@ function generateDate() {
 export const kCreateWithDocId = Symbol('kCreateWithDocId')
 export const kSelect = Symbol('select')
 export const kTable = Symbol('table')
+export const kDataStore = Symbol('dataStore')
 
 /**
  * @template {import('../datastore/index.js').DataStore} TDataStore
@@ -91,8 +92,8 @@ export class DataType {
     return this.#table
   }
 
-  get writerCore() {
-    return this.#dataStore.writerCore
+  get [kDataStore]() {
+    return this.#dataStore
   }
 
   /**
diff --git a/src/generated/extensions.d.ts b/src/generated/extensions.d.ts
index da0ed8aec..43d4b4c46 100644
--- a/src/generated/extensions.d.ts
+++ b/src/generated/extensions.d.ts
@@ -1,7 +1,12 @@
 /// <reference types="node" />
 import _m0 from "protobufjs/minimal.js";
 export interface ProjectExtension {
+    wantCoreKeys: Buffer[];
     authCoreKeys: Buffer[];
+    configCoreKeys: Buffer[];
+    dataCoreKeys: Buffer[];
+    blobIndexCoreKeys: Buffer[];
+    blobCoreKeys: Buffer[];
 }
 export interface HaveExtension {
     discoveryKey: Buffer;
@@ -24,15 +29,35 @@ export declare const ProjectExtension: {
     encode(message: ProjectExtension, writer?: _m0.Writer): _m0.Writer;
     decode(input: _m0.Reader | Uint8Array, length?: number): ProjectExtension;
     create<I extends {
-        authCoreKeys?: Buffer[] & Buffer[] & { [K in Exclude<keyof I["authCoreKeys"], keyof Buffer[]>]: never; };
-    } & { [K_1 in Exclude<keyof I, keyof ProjectExtension>]: never; }>(base?: I): ProjectExtension;
+        wantCoreKeys?: Buffer[] & Buffer[] & { [K in Exclude<keyof I["wantCoreKeys"], keyof Buffer[]>]: never; };
+        authCoreKeys?: Buffer[] & Buffer[] & { [K_1 in Exclude<keyof I["authCoreKeys"], keyof Buffer[]>]: never; };
+        configCoreKeys?: Buffer[] & Buffer[] & { [K_2 in Exclude<keyof I["configCoreKeys"], keyof Buffer[]>]: never; };
+        dataCoreKeys?: Buffer[] & Buffer[] & { [K_3 in Exclude<keyof I["dataCoreKeys"], keyof Buffer[]>]: never; };
+        blobIndexCoreKeys?: Buffer[] & Buffer[] & { [K_4 in Exclude<keyof I["blobIndexCoreKeys"], keyof Buffer[]>]: never; };
+        blobCoreKeys?: Buffer[] & Buffer[] & { [K_5 in Exclude<keyof I["blobCoreKeys"], keyof Buffer[]>]: never; };
+    } & { [K_6 in Exclude<keyof I, keyof ProjectExtension>]: never; }>(base?: I): ProjectExtension;
     fromPartial<I_1 extends {
-        authCoreKeys?: Buffer[] & Buffer[] & { [K_2 in Exclude<keyof I_1["authCoreKeys"], keyof Buffer[]>]: never; };
-    } & { [K_3 in Exclude<keyof I_1, keyof ProjectExtension>]: never; }>(object: I_1): ProjectExtension;
+        wantCoreKeys?: Buffer[] & Buffer[] & { [K_7 in Exclude<keyof I_1["wantCoreKeys"], keyof Buffer[]>]: never; };
+        authCoreKeys?: Buffer[] & Buffer[] & { [K_8 in Exclude<keyof I_1["authCoreKeys"], keyof Buffer[]>]: never; };
+        configCoreKeys?: Buffer[] & Buffer[] & { [K_9 in Exclude<keyof I_1["configCoreKeys"], keyof Buffer[]>]: never; };
+        dataCoreKeys?: Buffer[] & Buffer[] & { [K_10 in Exclude<keyof I_1["dataCoreKeys"], keyof Buffer[]>]: never; };
+        blobIndexCoreKeys?: Buffer[] & Buffer[] & { [K_11 in Exclude<keyof I_1["blobIndexCoreKeys"], keyof Buffer[]>]: never; };
+        blobCoreKeys?: Buffer[] & Buffer[] & { [K_12 in Exclude<keyof I_1["blobCoreKeys"], keyof Buffer[]>]: never; };
+    } & { [K_13 in Exclude<keyof I_1, keyof ProjectExtension>]: never; }>(object: I_1): ProjectExtension;
 };
 export declare const HaveExtension: {
     encode(message: HaveExtension, writer?: _m0.Writer): _m0.Writer;
diff --git a/src/generated/extensions.js b/src/generated/extensions.js
index 7fe53088e..741d330dd 100644
--- a/src/generated/extensions.js
+++ b/src/generated/extensions.js
@@ -50,17 +50,44 @@ export function haveExtension_NamespaceToNumber(object) {
   }
 }
 function createBaseProjectExtension() {
-  return { authCoreKeys: [] }
+  return {
+    wantCoreKeys: [],
+    authCoreKeys: [],
+    configCoreKeys: [],
+    dataCoreKeys: [],
+    blobIndexCoreKeys: [],
+    blobCoreKeys: [],
+  }
 }
 export var ProjectExtension = {
   encode: function (message, writer) {
     if (writer === void 0) {
       writer = _m0.Writer.create()
     }
-    for (var _i = 0, _a = message.authCoreKeys; _i < _a.length; _i++) {
+    for (var _i = 0, _a = message.wantCoreKeys; _i < _a.length; _i++) {
       var v = _a[_i]
       writer.uint32(10).bytes(v)
     }
+    for (var _b = 0, _c = message.authCoreKeys; _b < _c.length; _b++) {
+      var v = _c[_b]
+      writer.uint32(18).bytes(v)
+    }
+    for (var _d = 0, _e = message.configCoreKeys; _d < _e.length; _d++) {
+      var v = _e[_d]
+      writer.uint32(26).bytes(v)
+    }
+    for (var _f = 0, _g = message.dataCoreKeys; _f < _g.length; _f++) {
+      var v = _g[_f]
+      writer.uint32(34).bytes(v)
+    }
+    for (var _h = 0, _j = message.blobIndexCoreKeys; _h < _j.length; _h++) {
+      var v = _j[_h]
+      writer.uint32(42).bytes(v)
+    }
+    for (var _k = 0, _l = message.blobCoreKeys; _k < _l.length; _k++) {
+      var v = _l[_k]
+      writer.uint32(50).bytes(v)
+    }
     return writer
   },
   decode: function (input, length) {
@@ -74,8 +101,38 @@ export var ProjectExtension = {
           if (tag !== 10) {
             break
           }
+          message.wantCoreKeys.push(reader.bytes())
+          continue
+        case 2:
+          if (tag !== 18) {
+            break
+          }
           message.authCoreKeys.push(reader.bytes())
           continue
+        case 3:
+          if (tag !== 26) {
+            break
+          }
+          message.configCoreKeys.push(reader.bytes())
+          continue
+        case 4:
+          if (tag !== 34) {
+            break
+          }
+          message.dataCoreKeys.push(reader.bytes())
+          continue
+        case 5:
+          if (tag !== 42) {
+            break
+          }
+          message.blobIndexCoreKeys.push(reader.bytes())
+          continue
+        case 6:
+          if (tag !== 50) {
+            break
+          }
+          message.blobCoreKeys.push(reader.bytes())
+          continue
       }
       if ((tag & 7) === 4 || tag === 0) {
         break
@@ -90,14 +147,44 @@ export var ProjectExtension = {
     )
   },
   fromPartial: function (object) {
-    var _a
+    var _a, _b, _c, _d, _e, _f
     var message = createBaseProjectExtension()
-    message.authCoreKeys =
-      ((_a = object.authCoreKeys) === null || _a === void 0
+    message.wantCoreKeys =
+      ((_a = object.wantCoreKeys) === null || _a === void 0
         ? void 0
         : _a.map(function (e) {
             return e
           })) || []
+    message.authCoreKeys =
+      ((_b = object.authCoreKeys) === null || _b === void 0
+        ? void 0
+        : _b.map(function (e) {
+            return e
+          })) || []
+    message.configCoreKeys =
+      ((_c = object.configCoreKeys) === null || _c === void 0
+        ? void 0
+        : _c.map(function (e) {
+            return e
+          })) || []
+    message.dataCoreKeys =
+      ((_d = object.dataCoreKeys) === null || _d === void 0
+        ? void 0
+        : _d.map(function (e) {
+            return e
+          })) || []
+    message.blobIndexCoreKeys =
+      ((_e = object.blobIndexCoreKeys) === null || _e === void 0
+        ? void 0
+        : _e.map(function (e) {
+            return e
+          })) || []
+    message.blobCoreKeys =
+      ((_f = object.blobCoreKeys) === null || _f === void 0
+        ? void 0
+        : _f.map(function (e) {
+            return e
+          })) || []
     return message
   },
 }
diff --git a/src/generated/extensions.ts b/src/generated/extensions.ts
index eddb7efeb..6c3d45a00 100644
--- a/src/generated/extensions.ts
+++ b/src/generated/extensions.ts
@@ -3,7 +3,12 @@ import Long from "long";
 import _m0 from "protobufjs/minimal.js";
 
 export interface ProjectExtension {
+  wantCoreKeys: Buffer[];
   authCoreKeys: Buffer[];
+  configCoreKeys: Buffer[];
+  dataCoreKeys: Buffer[];
+  blobIndexCoreKeys: Buffer[];
+  blobCoreKeys: Buffer[];
 }
 
 export interface HaveExtension {
@@ -67,14 +72,36 @@ export function haveExtension_NamespaceToNumber(object: HaveExtension_Namespace) {
 }
 
 function createBaseProjectExtension(): ProjectExtension {
-  return { authCoreKeys: [] };
+  return {
+    wantCoreKeys: [],
+    authCoreKeys: [],
+    configCoreKeys: [],
+    dataCoreKeys: [],
+    blobIndexCoreKeys: [],
+    blobCoreKeys: [],
+  };
 }
 
 export const ProjectExtension = {
   encode(message: ProjectExtension, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
-    for (const v of message.authCoreKeys) {
+    for (const v of message.wantCoreKeys) {
       writer.uint32(10).bytes(v!);
     }
+    for (const v of message.authCoreKeys) {
+      writer.uint32(18).bytes(v!);
+    }
+    for (const v of message.configCoreKeys) {
+      writer.uint32(26).bytes(v!);
+    }
+    for (const v of message.dataCoreKeys) {
+      writer.uint32(34).bytes(v!);
+    }
+    for (const v of message.blobIndexCoreKeys) {
+      writer.uint32(42).bytes(v!);
+    }
+    for (const v of message.blobCoreKeys) {
+      writer.uint32(50).bytes(v!);
+    }
     return writer;
   },
 
@@ -90,8 +117,43 @@ export const ProjectExtension = {
             break;
           }
 
+          message.wantCoreKeys.push(reader.bytes() as Buffer);
+          continue;
+        case 2:
+          if (tag !== 18) {
+            break;
+          }
+
           message.authCoreKeys.push(reader.bytes() as Buffer);
           continue;
+        case 3:
+          if (tag !== 26) {
+            break;
+          }
+
+          message.configCoreKeys.push(reader.bytes() as Buffer);
+          continue;
+        case 4:
+          if (tag !== 34) {
+            break;
+          }
+
+          message.dataCoreKeys.push(reader.bytes() as Buffer);
+          continue;
+        case 5:
+          if (tag !== 42) {
+            break;
+          }
+
+          message.blobIndexCoreKeys.push(reader.bytes() as Buffer);
+          continue;
+        case 6:
+          if (tag !== 50) {
+            break;
+          }
+
+          message.blobCoreKeys.push(reader.bytes() as Buffer);
+          continue;
       }
       if ((tag & 7) === 4 || tag === 0) {
         break;
@@ -106,7 +168,12 @@ export const ProjectExtension = {
   },
   fromPartial<I extends Exact<DeepPartial<ProjectExtension>, I>>(object: I): ProjectExtension {
     const message = createBaseProjectExtension();
+    message.wantCoreKeys = object.wantCoreKeys?.map((e) => e) || [];
     message.authCoreKeys = object.authCoreKeys?.map((e) => e) || [];
+    message.configCoreKeys = object.configCoreKeys?.map((e) => e) || [];
+    message.dataCoreKeys = object.dataCoreKeys?.map((e) => e) || [];
+    message.blobIndexCoreKeys = object.blobIndexCoreKeys?.map((e) => e) || [];
+    message.blobCoreKeys = object.blobCoreKeys?.map((e) => e) || [];
     return message;
   },
 };
diff --git a/src/sync/peer-sync-controller.js b/src/sync/peer-sync-controller.js
index 87078d9dc..6b82e8326 100644
--- a/src/sync/peer-sync-controller.js
+++ b/src/sync/peer-sync-controller.js
@@ -1,4 +1,3 @@
-import mapObject from 'map-obj'
 import { NAMESPACES } from '../core-manager/index.js'
 import { Logger } from '../logger.js'
 
@@ -12,10 +11,8 @@ import { Logger } from '../logger.js'
 /** @type {Namespace[]} */
 export const PRESYNC_NAMESPACES = ['auth', 'config', 'blobIndex']
 
-// After at least one auth core is replicating, wait this long before sending haves to allow other auth cores to see if
-const SEND_KEYS_WAIT_MS = 1000
-
 export class PeerSyncController {
+  /** @type {Set<Hypercore<'binary', Buffer>>} */
   #replicatingCores = new Set()
   /** @type {Set<Namespace>} */
   #enabledNamespaces = new Set()
@@ -25,17 +22,10 @@ export class PeerSyncController {
   /** @type {Record<Namespace, SyncCapability>} */
   #syncCapability = createNamespaceMap('unknown')
   #isDataSyncEnabled = false
-  /** @type {Record<Namespace, unknown>} */
-  #prevLocalState = createNamespaceMap(null)
-  /** @type {SyncStatus} */
-  #syncStatus = createNamespaceMap('unknown')
-  /** @type {SyncStatus} */
-  #prevSyncStatus = createNamespaceMap('unknown')
-  /** @type {NodeJS.Timeout | undefined } */
-  #sentKeysTimeoutId
   #hasSentHaves = createNamespaceMap(false)
   #log
   #syncState
+  #presyncDone = false
 
   /**
    * @param {object} opts
@@ -66,11 +56,24 @@ export class PeerSyncController {
     this.#capabilities = capabilities
     this.#syncState = syncState
 
-    // Always need to replicate the project creator core
-    this.#replicateCore(coreManager.creatorCore)
+    // The creator core is replicating before this instance is created
+    this.#replicatingCores = new Set([coreManager.creatorCore])
+
+    // A PeerSyncController instance is only created once the creator core is
+    // replicating, which means that the peer has the project key, so now we
+    // can send all the auth core keys.
+    //
+    // We could reduce network traffic by delaying sending this until we see
+    // which keys the peer already has, so that we only send the keys they are
+    // missing. However the network traffic cost of sending keys is low (it's 8
+    // bytes * number of devices in a project) vs. the delay in sync: if the
+    // delay is longer than the time it takes to share the keys, it's not worth
+    // it.
+    coreManager.sendAuthCoreKeys(creatorCorePeer)
 
     coreManager.on('add-core', this.#handleAddCore)
     syncState.on('state', this.#handleStateChange)
+    capabilities.on('update', this.#handleCapabilitiesUpdate)
 
     this.#updateEnabledNamespaces()
   }
@@ -126,11 +129,9 @@ export class PeerSyncController {
   destroy() {
-    if (this.#sentKeysTimeoutId) {
-      clearTimeout(this.#sentKeysTimeoutId)
-    }
     this.#coreManager.off('add-core', this.#handleAddCore)
     this.#syncState.off('state', this.#handleStateChange)
+    this.#capabilities.off('update', this.#handleCapabilitiesUpdate)
   }
 
  /**
@@ -151,54 +152,44 @@ export class PeerSyncController {
    * @param {import("./sync-state.js").State} state
    */
   #handleStateChange = async (state) => {
-    // Once we are replicating with at least one auth core, wait a bit for other
-    // auth cores to replicate, then send the keys for any missing auth cores to
-    // the peer.
-    if (
-      !this.#sentKeysTimeoutId &&
-      Object.keys(state.auth.remoteStates).length > 0
-    ) {
-      this.#sentKeysTimeoutId = setTimeout(() => {
-        this.#coreManager.sendAuthCoreKeys(this.#creatorCorePeer)
-      }, SEND_KEYS_WAIT_MS)
-    }
-    this.#syncStatus = getSyncStatus(this.peerId, state)
-    const localState = mapObject(state, (ns, nsState) => {
-      return [ns, nsState.localState]
+    if (this.#presyncDone) return
+    const syncStatus = getSyncStatus(this.peerId, state)
+    this.#presyncDone = PRESYNC_NAMESPACES.every((ns) => {
+      return syncStatus[ns] === 'synced'
     })
-    this.#log('state %X', state)
-
-    // Map of which namespaces have received new data since last sync change
-    const didUpdate = mapObject(state, (ns) => {
-      const nsDidSync =
-        this.#prevSyncStatus[ns] !== 'synced' &&
-        this.#syncStatus[ns] === 'synced'
-      const prevNsState = this.#prevLocalState[ns]
-      const nsDidUpdate =
-        nsDidSync &&
-        (prevNsState === null || prevNsState.have !== localState[ns].have)
-      if (nsDidUpdate) {
-        this.#prevLocalState[ns] = localState[ns]
-      }
-      return [ns, nsDidUpdate]
-    })
-    this.#prevSyncStatus = this.#syncStatus
+    if (!this.#presyncDone) return
+    this.#log('Pre-sync done')
+    // Once pre-sync is done, if data sync is enabled and the peer has the
+    // correct capabilities, then we will enable sync of data namespaces
+    this.#updateEnabledNamespaces()
+  }
 
-    if (didUpdate.auth) {
-      try {
-        const cap = await this.#capabilities.getCapabilities(this.peerId)
-        this.#syncCapability = cap.sync
-      } catch (e) {
-        this.#log('Error reading capability', e)
-        // Any error, consider sync unknown
-        this.#syncCapability = createNamespaceMap('unknown')
-      }
+  /**
+   * Handler for capabilities being updated. If they have changed for this peer
+   * then we update enabled namespaces and send pre-haves for any namespaces
+   * authorized for sync
+   *
+   * @param {import('@mapeo/schema').Role[]} docs
+   */
+  #handleCapabilitiesUpdate = async (docs) => {
+    const peerRoleUpdated = docs.some((doc) => doc.docId === this.peerId)
+    if (!peerRoleUpdated) return
+    const prevSyncCapability = this.#syncCapability
+    try {
+      const cap = await this.#capabilities.getCapabilities(this.peerId)
+      this.#syncCapability = cap.sync
+    } catch (e) {
+      this.#log('Error reading capability', e)
+      // Any error, consider sync unknown
+      this.#syncCapability = createNamespaceMap('unknown')
     }
-    this.#log('capability %o', this.#syncCapability)
-
-    // Stop here if no updates
-    if (Object.values(didUpdate).indexOf(true) === -1) return
+    const syncCapabilityChanged = !shallowEqual(
+      prevSyncCapability,
+      this.#syncCapability
+    )
+    if (!syncCapabilityChanged) return
+    this.#log('Sync capability changed %o', this.#syncCapability)
     this.#updateEnabledNamespaces()
 
     // Send pre-haves for any namespaces that the peer is allowed to sync
@@ -207,6 +198,7 @@ export class PeerSyncController {
       if (this.#hasSentHaves[ns]) continue
       if (this.#syncCapability[ns] !== 'allowed') continue
       this.#coreManager.sendHaves(this.#creatorCorePeer, ns)
+      this.#log('Sent pre-haves for %s', ns)
       this.#hasSentHaves[ns] = true
     }
   }
@@ -229,14 +221,9 @@ export class PeerSyncController {
       } else if (cap === 'allowed') {
         if (PRESYNC_NAMESPACES.includes(ns)) {
           this.#enableNamespace(ns)
-        } else if (this.#isDataSyncEnabled) {
-          const arePresyncNamespacesSynced = PRESYNC_NAMESPACES.every(
-            (ns) => this.#syncStatus[ns] === 'synced'
-          )
+        } else if (this.#isDataSyncEnabled && this.#presyncDone) {
           // Only enable data namespaces once the pre-sync namespaces have synced
-          if (arePresyncNamespacesSynced) {
-            this.#enableNamespace(ns)
-          }
+          this.#enableNamespace(ns)
         } else {
           this.#disableNamespace(ns)
         }
@@ -341,3 +328,17 @@ function createNamespaceMap(value) {
   }
   return map
 }
+
+/**
+ * Very naive shallow equal, but all we need for comparing sync capabilities
+ *
+ * @param {Record<string, unknown>} a
+ * @param {Record<string, unknown>} b
+ * @returns
+ */
+function shallowEqual(a, b) {
+  for (const key of Object.keys(a)) {
+    if (a[key] !== b[key]) return false
+  }
+  return true
+}
diff --git a/src/sync/sync-api.js b/src/sync/sync-api.js
index 977be7234..045f65713 100644
--- a/src/sync/sync-api.js
+++ b/src/sync/sync-api.js
@@ -6,7 +6,6 @@ import {
 } from './peer-sync-controller.js'
 import { Logger } from '../logger.js'
 import { NAMESPACES } from '../core-manager/index.js'
-import { keyToId } from '../utils.js'
 
 export const kHandleDiscoveryKey = Symbol('handle discovery key')
 
@@ -46,7 +45,6 @@ export class SyncApi extends TypedEmitter {
    * @param {import('../core-manager/index.js').CoreManager} opts.coreManager
    * @param {import("../capabilities.js").Capabilities} opts.capabilities
    * @param {import('../core-ownership.js').CoreOwnership} opts.coreOwnership
-   * @param {import('../index-writer/index.js').IndexWriter} opts.indexWriter
    * @param {number} [opts.throttleMs]
    * @param {Logger} [opts.logger]
    */
@@ -54,7 +52,6 @@ constructor({
     coreManager,
     capabilities,
     coreOwnership,
-    indexWriter,
     throttleMs = 200,
     logger,
   }) {
@@ -69,11 +66,8 @@ export class SyncApi extends TypedEmitter {
     this.#coreManager.creatorCore.on('peer-add', this.#handlePeerAdd)
     this.#coreManager.creatorCore.on('peer-remove', this.#handlePeerRemove)
 
-    indexWriter.subscribe({ schemaName: 'role' }, this.#handleRoleUpdate)
-    indexWriter.subscribe(
-      { schemaName: 'coreOwnership' },
-      this.#handleCoreOwnershipUpdate
-    )
+    capabilities.on('update', this.#handleRoleUpdate)
+    coreOwnership.on('update', this.#handleCoreOwnershipUpdate)
   }
 
   /** @type {import('../local-peers.js').LocalPeersEvents['discovery-key']} */
@@ -175,7 +169,7 @@ export class SyncApi extends TypedEmitter {
       logger: this.#l,
     })
     this.#peerSyncControllers.set(protomux, peerSyncController)
-    if (peerSyncController.peerId) this.#peerIds.add(peerSyncController.peerId)
+    this.#peerIds.add(peerSyncController.peerId)
 
     if (this.#dataSyncEnabled.has('local')) {
       peerSyncController.enableDataSync()
@@ -210,7 +204,7 @@ export class SyncApi extends TypedEmitter {
     }
     psc.destroy()
     this.#peerSyncControllers.delete(protomux)
-    this.#peerIds.delete(keyToId(peer.remotePublicKey))
+    this.#peerIds.delete(psc.peerId)
     this.#pendingDiscoveryKeys.delete(protomux)
   }
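
The consumer-facing effect of the changes above: role and core-ownership updates are now push-based. `DataStore` decodes and re-emits docs per schema name, `Capabilities` and `CoreOwnership` forward them as 'update' events, and `SyncApi`/`PeerSyncController` subscribe directly instead of going through `IndexWriter.subscribe`. A minimal sketch of the new subscription pattern, assuming an already-constructed `Capabilities` instance; the helper function itself is hypothetical, but the 'update' payload (`Role[]`) and the `doc.docId` filtering mirror the diff's own `#handleCapabilitiesUpdate`:

/**
 * Sketch only: log role changes for one device as role records are indexed.
 *
 * @param {import('./src/capabilities.js').Capabilities} capabilities
 * @param {string} deviceId
 * @returns {() => void} unsubscribe function
 */
function logRoleChanges(capabilities, deviceId) {
  /** @param {import('@mapeo/schema').Role[]} docs */
  const onUpdate = (docs) => {
    // Each 'update' event carries the batch of role docs just indexed
    for (const doc of docs) {
      if (doc.docId !== deviceId) continue
      console.log('device %s now has role %s', doc.docId, doc.roleId)
    }
  }
  capabilities.on('update', onUpdate)
  // TypedEmitter removes listeners like a normal EventEmitter
  return () => capabilities.off('update', onUpdate)
}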