Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
gmaclennan committed Nov 30, 2023
1 parent faecad2 commit 0f4a0d6
Show file tree
Hide file tree
Showing 13 changed files with 738 additions and 625 deletions.
16 changes: 14 additions & 2 deletions src/capabilities.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { currentSchemaVersions } from '@mapeo/schema'
import mapObject from 'map-obj'
import { kCreateWithDocId } from './datatype/index.js'
import { kCreateWithDocId, kDataStore } from './datatype/index.js'
import { TypedEmitter } from 'tiny-typed-emitter'

// Randomly generated 8-byte encoded as hex
export const COORDINATOR_ROLE_ID = 'f7c150f5a3a9a855'
Expand Down Expand Up @@ -140,7 +141,15 @@ export const DEFAULT_CAPABILITIES = {
},
}

export class Capabilities {
/**
* @typedef {object} CapabilitiesEvents
* @property {(docs: import('@mapeo/schema').Role[]) => void} update - Emitted when new role records are indexed
*/

/**
* @extends {TypedEmitter<CapabilitiesEvents>}
*/
export class Capabilities extends TypedEmitter {
#dataType
#coreOwnership
#coreManager
Expand All @@ -165,11 +174,14 @@ export class Capabilities {
* @param {Buffer} opts.deviceKey public key of this device
*/
constructor({ dataType, coreOwnership, coreManager, projectKey, deviceKey }) {
super()
this.#dataType = dataType
this.#coreOwnership = coreOwnership
this.#coreManager = coreManager
this.#projectCreatorAuthCoreId = projectKey.toString('hex')
this.#ownDeviceId = deviceKey.toString('hex')

dataType[kDataStore].on('role', this.emit.bind(this, 'update'))
}

/**
Expand Down
222 changes: 37 additions & 185 deletions src/core-manager/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@ import { HaveExtension, ProjectExtension } from '../generated/extensions.js'
import { CoreIndex } from './core-index.js'
import * as rle from './bitfield-rle.js'
import { Logger } from '../logger.js'
import { keyToId } from '../utils.js'
import { discoveryKey } from 'hypercore-crypto'
import Hypercore from 'hypercore'

export const kCoreManagerReplicate = Symbol('replicate core manager')
// WARNING: Changing these will break things for existing apps, since namespaces
Expand Down Expand Up @@ -55,12 +52,6 @@ export class CoreManager extends TypedEmitter {
#haveExtension
#deviceId
#l
/**
* We use this to reduce network traffic caused by requesting the same key
* from multiple clients.
* TODO: Remove items from this set after a max age
*/
#keyRequests = new TrackedKeyRequests()
#autoDownload

static get namespaces() {
Expand Down Expand Up @@ -155,8 +146,8 @@ export class CoreManager extends TypedEmitter {
'mapeo/project',
{
encoding: ProjectExtensionCodec,
onmessage: (msg, peer) => {
this.#handleProjectMessage(msg, peer)
onmessage: (msg) => {
this.#handleProjectMessage(msg)
},
}
)
Expand All @@ -168,16 +159,6 @@ export class CoreManager extends TypedEmitter {
},
})

this.#creatorCore.on('peer-add', (peer) => {
this.#sendHaves(peer)
})
this.#creatorCore.on('peer-remove', (peer) => {
// When a peer is removed we clean up any unanswered key requests, so that
// we will request from a different peer, and to avoid the tracking of key
// requests growing without bounds.
this.#keyRequests.deleteByPeerKey(peer.remotePublicKey)
})

this.#ready = Promise.all(
[...this.#coreIndex].map(({ core }) => core.ready())
)
Expand Down Expand Up @@ -253,7 +234,6 @@ export class CoreManager extends TypedEmitter {
*/
async close() {
this.#state = 'closing'
this.#keyRequests.clear()
const promises = []
for (const { core } of this.#coreIndex) {
promises.push(core.close())
Expand Down Expand Up @@ -342,69 +322,39 @@ export class CoreManager extends TypedEmitter {
}

/**
* Send an extension message over the project creator core replication stream
* requesting a core key for the given discovery key.
*
* @param {Buffer} peerKey
* @param {Buffer} discoveryKey
* @param {ProjectExtension} msg
*/
requestCoreKey(peerKey, discoveryKey) {
// No-op if we already have this core
if (this.getCoreByDiscoveryKey(discoveryKey)) return
const peer = this.#creatorCore.peers.find((peer) => {
return peer.remotePublicKey.equals(peerKey)
})
if (!peer) {
// This should not happen because this is only called from SyncApi, which
// checks the peer exists before calling this method.
this.#l.log(
'Attempted to request core key for %h, but no connected peer %h',
discoveryKey,
peerKey
)
return
#handleProjectMessage({ authCoreKeys }) {
for (const coreKey of authCoreKeys) {
// Use public method - these must be persisted (private method defaults to persisted=false)
this.addCore(coreKey, 'auth')
}
// Only request a key once, e.g. from the peer we first receive it from (we
// can assume that a peer must have the key if we see the discovery key in
// the protomux). This is necessary to reduce network traffic for many newly
// connected peers - otherwise duplicate requests will be sent to every peer
if (this.#keyRequests.has(discoveryKey)) return
this.#keyRequests.set(discoveryKey, peerKey)
}

this.#l.log(
'Requesting core key for discovery key %h from peer %h',
discoveryKey,
peerKey
)
const message = ProjectExtension.fromPartial({
wantCoreKeys: [discoveryKey],
})
this.#projectExtension.send(message, peer)
/**
* Sends auth core keys to the given peer, skipping any keys that we know the
* peer has already (depends on the peer having already replicated the auth
* cores it has)
*
* @param {any} peer
*/
sendAuthCoreKeys(peer) {
this.#sendCoreKeys(peer, ['auth'])
}

/**
* @param {ProjectExtension} msg
* We only send non-auth core keys to a peer for unit tests
* @param {any} peer
* @param {Readonly<Namespace[]>} namespaces
*/
#handleProjectMessage({ wantCoreKeys, ...coreKeys }, peer) {
#sendCoreKeys(peer, namespaces) {
const message = ProjectExtension.create()
let hasKeys = false
for (const discoveryKey of wantCoreKeys) {
const coreRecord = this.getCoreByDiscoveryKey(discoveryKey)
if (!coreRecord) continue
message[`${coreRecord.namespace}CoreKeys`].push(coreRecord.key)
hasKeys = true
}
if (hasKeys) {
this.#projectExtension.send(message, peer)
}
for (const namespace of NAMESPACES) {
for (const coreKey of coreKeys[`${namespace}CoreKeys`]) {
// Use public method - these must be persisted (private method defaults to persisted=false)
this.addCore(coreKey, namespace)
this.#keyRequests.deleteByDiscoveryKey(discoveryKey(coreKey))
for (const ns of namespaces) {
for (const { key } of this.getCores(ns)) {
message[`${ns}CoreKeys`].push(key)
}
}
this.#projectExtension.send(message, peer)
}

/**
Expand All @@ -426,21 +376,17 @@ export class CoreManager extends TypedEmitter {
}

/**
*
* @param {any} peer
* @param {Exclude<Namespace, 'auth'>} namespace
*/
async #sendHaves(peer) {
if (!peer) {
console.warn('sendHaves no peer', peer.remotePublicKey)
// TODO: How to handle this and when does it happen?
return
}
async sendHaves(peer, namespace) {
// We want ready() rather than update() because we are only interested in
// local data. This waits for all cores to be ready.
await this.ready()

peer.protomux.cork()

for (const { core, namespace } of this.#coreIndex) {
// We want ready() rather than update() because we are only interested in local data
await core.ready()
for (const { core } of this.getCores(namespace)) {
if (core.length === 0) continue
const { discoveryKey } = core
// This will always be defined after ready(), but need to let TS know
Expand All @@ -465,18 +411,14 @@ export class CoreManager extends TypedEmitter {
* @returns
*/
[kCoreManagerReplicate](stream) {
const protocolStream = Hypercore.createProtocolStream(stream, {
ondiscoverykey: async (discoveryKey) => {
const peer = await findPeer(
this.creatorCore,
// @ts-ignore
protocolStream.noiseStream.remotePublicKey
)
if (!peer) return
this.requestCoreKey(peer.remotePublicKey, discoveryKey)
},
const protocolStream = this.#corestore.replicate(stream)
this.#creatorCore.on('peer-add', (peer) => {
// Normally only auth core keys are sent, but for unit tests we need to
// send all of them, because unit tests don't include the Sync API which
// adds cores from core ownership records.
this.#sendCoreKeys(peer, NAMESPACES)
})
return this.#corestore.replicate(stream)
return protocolStream
}
}

Expand Down Expand Up @@ -523,93 +465,3 @@ const HaveExtensionCodec = {
}
},
}

class TrackedKeyRequests {
/** @type {Map<string, string>} */
#byDiscoveryId = new Map()
/** @type {Map<string, Set<string>>} */
#byPeerId = new Map()

/**
* @param {Buffer} discoveryKey
* @param {Buffer} peerKey
*/
set(discoveryKey, peerKey) {
const discoveryId = keyToId(discoveryKey)
const peerId = keyToId(peerKey)
const existingForPeer = this.#byPeerId.get(peerId) || new Set()
this.#byDiscoveryId.set(discoveryId, peerId)
existingForPeer.add(discoveryId)
this.#byPeerId.set(peerId, existingForPeer)
return this
}
/**
* @param {Buffer} discoveryKey
*/
has(discoveryKey) {
const discoveryId = keyToId(discoveryKey)
return this.#byDiscoveryId.has(discoveryId)
}
/**
* @param {Buffer} discoveryKey
*/
deleteByDiscoveryKey(discoveryKey) {
const discoveryId = keyToId(discoveryKey)
const peerId = this.#byDiscoveryId.get(discoveryId)
if (!peerId) return false
this.#byDiscoveryId.delete(discoveryId)
const existingForPeer = this.#byPeerId.get(peerId)
if (existingForPeer) {
existingForPeer.delete(discoveryId)
}
return true
}
/**
* @param {Buffer} peerKey
*/
deleteByPeerKey(peerKey) {
const peerId = keyToId(peerKey)
const existingForPeer = this.#byPeerId.get(peerId)
if (!existingForPeer) return
for (const discoveryId of existingForPeer) {
this.#byDiscoveryId.delete(discoveryId)
}
this.#byPeerId.delete(peerId)
}
clear() {
this.#byDiscoveryId.clear()
this.#byPeerId.clear()
}
}

/**
* @param {Hypercore<"binary", Buffer>} core
* @param {Buffer} publicKey
* @param {{ timeout?: number }} [opts]
*/
function findPeer(core, publicKey, { timeout = 200 } = {}) {
const peer = core.peers.find((peer) => {
return peer.remotePublicKey.equals(publicKey)
})
if (peer) return peer
// This is called from the from the handleDiscoveryId event, which can
// happen before the peer connection is fully established, so we wait for
// the `peer-add` event, with a timeout in case the peer never gets added
return new Promise(function (res) {
const timeoutId = setTimeout(function () {
core.off('peer-add', onPeer)
res(null)
}, timeout)

core.on('peer-add', onPeer)

/** @param {any} peer */
function onPeer(peer) {
if (peer.remotePublicKey.equals(publicKey)) {
clearTimeout(timeoutId)
core.off('peer-add', onPeer)
res(peer)
}
}
})
}
Loading

0 comments on commit 0f4a0d6

Please sign in to comment.