Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: don't send core keys or haves to peers without capabilities #390

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions src/capabilities.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { currentSchemaVersions } from '@mapeo/schema'
import mapObject from 'map-obj'
import { kCreateWithDocId } from './datatype/index.js'
import { kCreateWithDocId, kDataStore } from './datatype/index.js'
import { TypedEmitter } from 'tiny-typed-emitter'

// Randomly generated 8-byte encoded as hex
export const COORDINATOR_ROLE_ID = 'f7c150f5a3a9a855'
Expand Down Expand Up @@ -140,7 +141,15 @@ export const DEFAULT_CAPABILITIES = {
},
}

export class Capabilities {
/**
* @typedef {object} CapabilitiesEvents
* @property {(docs: import('@mapeo/schema').Role[]) => void} update - Emitted when new role records are indexed
*/

/**
* @extends {TypedEmitter<CapabilitiesEvents>}
*/
export class Capabilities extends TypedEmitter {
#dataType
#coreOwnership
#coreManager
Expand All @@ -165,11 +174,14 @@ export class Capabilities {
* @param {Buffer} opts.deviceKey public key of this device
*/
constructor({ dataType, coreOwnership, coreManager, projectKey, deviceKey }) {
super()
this.#dataType = dataType
this.#coreOwnership = coreOwnership
this.#coreManager = coreManager
this.#projectCreatorAuthCoreId = projectKey.toString('hex')
this.#ownDeviceId = deviceKey.toString('hex')

dataType[kDataStore].on('role', this.emit.bind(this, 'update'))
}

/**
Expand Down
222 changes: 37 additions & 185 deletions src/core-manager/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@ import { HaveExtension, ProjectExtension } from '../generated/extensions.js'
import { CoreIndex } from './core-index.js'
import * as rle from './bitfield-rle.js'
import { Logger } from '../logger.js'
import { keyToId } from '../utils.js'
import { discoveryKey } from 'hypercore-crypto'
import Hypercore from 'hypercore'

export const kCoreManagerReplicate = Symbol('replicate core manager')
// WARNING: Changing these will break things for existing apps, since namespaces
Expand Down Expand Up @@ -55,12 +52,6 @@ export class CoreManager extends TypedEmitter {
#haveExtension
#deviceId
#l
/**
* We use this to reduce network traffic caused by requesting the same key
* from multiple clients.
* TODO: Remove items from this set after a max age
*/
#keyRequests = new TrackedKeyRequests()
#autoDownload

static get namespaces() {
Expand Down Expand Up @@ -155,8 +146,8 @@ export class CoreManager extends TypedEmitter {
'mapeo/project',
{
encoding: ProjectExtensionCodec,
onmessage: (msg, peer) => {
this.#handleProjectMessage(msg, peer)
onmessage: (msg) => {
this.#handleProjectMessage(msg)
},
}
)
Expand All @@ -168,16 +159,6 @@ export class CoreManager extends TypedEmitter {
},
})

this.#creatorCore.on('peer-add', (peer) => {
this.#sendHaves(peer)
})
this.#creatorCore.on('peer-remove', (peer) => {
// When a peer is removed we clean up any unanswered key requests, so that
// we will request from a different peer, and to avoid the tracking of key
// requests growing without bounds.
this.#keyRequests.deleteByPeerKey(peer.remotePublicKey)
})

this.#ready = Promise.all(
[...this.#coreIndex].map(({ core }) => core.ready())
)
Expand Down Expand Up @@ -253,7 +234,6 @@ export class CoreManager extends TypedEmitter {
*/
async close() {
this.#state = 'closing'
this.#keyRequests.clear()
const promises = []
for (const { core } of this.#coreIndex) {
promises.push(core.close())
Expand Down Expand Up @@ -342,69 +322,39 @@ export class CoreManager extends TypedEmitter {
}

/**
* Send an extension message over the project creator core replication stream
* requesting a core key for the given discovery key.
*
* @param {Buffer} peerKey
* @param {Buffer} discoveryKey
* @param {ProjectExtension} msg
*/
requestCoreKey(peerKey, discoveryKey) {
// No-op if we already have this core
if (this.getCoreByDiscoveryKey(discoveryKey)) return
const peer = this.#creatorCore.peers.find((peer) => {
return peer.remotePublicKey.equals(peerKey)
})
if (!peer) {
// This should not happen because this is only called from SyncApi, which
// checks the peer exists before calling this method.
this.#l.log(
'Attempted to request core key for %h, but no connected peer %h',
discoveryKey,
peerKey
)
return
#handleProjectMessage({ authCoreKeys }) {
for (const coreKey of authCoreKeys) {
// Use public method - these must be persisted (private method defaults to persisted=false)
this.addCore(coreKey, 'auth')
}
// Only request a key once, e.g. from the peer we first receive it from (we
// can assume that a peer must have the key if we see the discovery key in
// the protomux). This is necessary to reduce network traffic for many newly
// connected peers - otherwise duplicate requests will be sent to every peer
if (this.#keyRequests.has(discoveryKey)) return
this.#keyRequests.set(discoveryKey, peerKey)
}

this.#l.log(
'Requesting core key for discovery key %h from peer %h',
discoveryKey,
peerKey
)
const message = ProjectExtension.fromPartial({
wantCoreKeys: [discoveryKey],
})
this.#projectExtension.send(message, peer)
/**
* Sends auth core keys to the given peer, skipping any keys that we know the
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this skips any keys yet, which is fine, probably a perf optimization for later.

* peer has already (depends on the peer having already replicated the auth
* cores it has)
*
* @param {any} peer
*/
sendAuthCoreKeys(peer) {
this.#sendCoreKeys(peer, ['auth'])
}

/**
* @param {ProjectExtension} msg
* We only send non-auth core keys to a peer for unit tests
* @param {any} peer
* @param {Readonly<Namespace[]>} namespaces
*/
#handleProjectMessage({ wantCoreKeys, ...coreKeys }, peer) {
#sendCoreKeys(peer, namespaces) {
const message = ProjectExtension.create()
let hasKeys = false
for (const discoveryKey of wantCoreKeys) {
const coreRecord = this.getCoreByDiscoveryKey(discoveryKey)
if (!coreRecord) continue
message[`${coreRecord.namespace}CoreKeys`].push(coreRecord.key)
hasKeys = true
}
if (hasKeys) {
this.#projectExtension.send(message, peer)
}
for (const namespace of NAMESPACES) {
for (const coreKey of coreKeys[`${namespace}CoreKeys`]) {
// Use public method - these must be persisted (private method defaults to persisted=false)
this.addCore(coreKey, namespace)
this.#keyRequests.deleteByDiscoveryKey(discoveryKey(coreKey))
for (const ns of namespaces) {
for (const { key } of this.getCores(ns)) {
message[`${ns}CoreKeys`].push(key)
}
}
this.#projectExtension.send(message, peer)
}

/**
Expand All @@ -426,21 +376,17 @@ export class CoreManager extends TypedEmitter {
}

/**
*
* @param {any} peer
* @param {Exclude<Namespace, 'auth'>} namespace
*/
async #sendHaves(peer) {
if (!peer) {
console.warn('sendHaves no peer', peer.remotePublicKey)
// TODO: How to handle this and when does it happen?
return
}
async sendHaves(peer, namespace) {
// We want ready() rather than update() because we are only interested in
// local data. This waits for all cores to be ready.
await this.ready()

peer.protomux.cork()

for (const { core, namespace } of this.#coreIndex) {
// We want ready() rather than update() because we are only interested in local data
await core.ready()
for (const { core } of this.getCores(namespace)) {
if (core.length === 0) continue
const { discoveryKey } = core
// This will always be defined after ready(), but need to let TS know
Expand All @@ -465,18 +411,14 @@ export class CoreManager extends TypedEmitter {
* @returns
*/
[kCoreManagerReplicate](stream) {
const protocolStream = Hypercore.createProtocolStream(stream, {
ondiscoverykey: async (discoveryKey) => {
const peer = await findPeer(
this.creatorCore,
// @ts-ignore
protocolStream.noiseStream.remotePublicKey
)
if (!peer) return
this.requestCoreKey(peer.remotePublicKey, discoveryKey)
},
const protocolStream = this.#corestore.replicate(stream)
this.#creatorCore.on('peer-add', (peer) => {
// Normally only auth core keys are sent, but for unit tests we need to
// send all of them, because unit tests don't include the Sync API which
// adds cores from core ownership records.
this.#sendCoreKeys(peer, NAMESPACES)
})
return this.#corestore.replicate(stream)
return protocolStream
}
}

Expand Down Expand Up @@ -523,93 +465,3 @@ const HaveExtensionCodec = {
}
},
}

class TrackedKeyRequests {
/** @type {Map<string, string>} */
#byDiscoveryId = new Map()
/** @type {Map<string, Set<string>>} */
#byPeerId = new Map()

/**
* @param {Buffer} discoveryKey
* @param {Buffer} peerKey
*/
set(discoveryKey, peerKey) {
const discoveryId = keyToId(discoveryKey)
const peerId = keyToId(peerKey)
const existingForPeer = this.#byPeerId.get(peerId) || new Set()
this.#byDiscoveryId.set(discoveryId, peerId)
existingForPeer.add(discoveryId)
this.#byPeerId.set(peerId, existingForPeer)
return this
}
/**
* @param {Buffer} discoveryKey
*/
has(discoveryKey) {
const discoveryId = keyToId(discoveryKey)
return this.#byDiscoveryId.has(discoveryId)
}
/**
* @param {Buffer} discoveryKey
*/
deleteByDiscoveryKey(discoveryKey) {
const discoveryId = keyToId(discoveryKey)
const peerId = this.#byDiscoveryId.get(discoveryId)
if (!peerId) return false
this.#byDiscoveryId.delete(discoveryId)
const existingForPeer = this.#byPeerId.get(peerId)
if (existingForPeer) {
existingForPeer.delete(discoveryId)
}
return true
}
/**
* @param {Buffer} peerKey
*/
deleteByPeerKey(peerKey) {
const peerId = keyToId(peerKey)
const existingForPeer = this.#byPeerId.get(peerId)
if (!existingForPeer) return
for (const discoveryId of existingForPeer) {
this.#byDiscoveryId.delete(discoveryId)
}
this.#byPeerId.delete(peerId)
}
clear() {
this.#byDiscoveryId.clear()
this.#byPeerId.clear()
}
}

/**
* @param {Hypercore<"binary", Buffer>} core
* @param {Buffer} publicKey
* @param {{ timeout?: number }} [opts]
*/
function findPeer(core, publicKey, { timeout = 200 } = {}) {
const peer = core.peers.find((peer) => {
return peer.remotePublicKey.equals(publicKey)
})
if (peer) return peer
// This is called from the from the handleDiscoveryId event, which can
// happen before the peer connection is fully established, so we wait for
// the `peer-add` event, with a timeout in case the peer never gets added
return new Promise(function (res) {
const timeoutId = setTimeout(function () {
core.off('peer-add', onPeer)
res(null)
}, timeout)

core.on('peer-add', onPeer)

/** @param {any} peer */
function onPeer(peer) {
if (peer.remotePublicKey.equals(publicKey)) {
clearTimeout(timeoutId)
core.off('peer-add', onPeer)
res(peer)
}
}
})
}
Loading
Loading