Skip to content

Commit

Permalink
cleanup implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
gmaclennan committed Nov 29, 2023
1 parent 9e05beb commit 0b6362a
Show file tree
Hide file tree
Showing 12 changed files with 365 additions and 120 deletions.
8 changes: 7 additions & 1 deletion proto/extensions.proto
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
syntax = "proto3";

message ProjectExtension {
repeated bytes authCoreKeys = 1;
repeated bytes wantCoreKeys = 1;

repeated bytes authCoreKeys = 2;
repeated bytes configCoreKeys = 3;
repeated bytes dataCoreKeys = 4;
repeated bytes blobIndexCoreKeys = 5;
repeated bytes blobCoreKeys = 6;
}

message HaveExtension {
Expand Down
16 changes: 14 additions & 2 deletions src/capabilities.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { currentSchemaVersions } from '@mapeo/schema'
import mapObject from 'map-obj'
import { kCreateWithDocId } from './datatype/index.js'
import { kCreateWithDocId, kDataStore } from './datatype/index.js'
import { TypedEmitter } from 'tiny-typed-emitter'

// Randomly generated 8-byte encoded as hex
export const COORDINATOR_ROLE_ID = 'f7c150f5a3a9a855'
Expand Down Expand Up @@ -140,7 +141,15 @@ export const DEFAULT_CAPABILITIES = {
},
}

export class Capabilities {
/**
* @typedef {object} CapabilitiesEvents
* @property {(docs: import('@mapeo/schema').Role[]) => void} update - Emitted when new role records are indexed
*/

/**
* @extends {TypedEmitter<CapabilitiesEvents>}
*/
export class Capabilities extends TypedEmitter {
#dataType
#coreOwnership
#coreManager
Expand All @@ -165,11 +174,14 @@ export class Capabilities {
* @param {Buffer} opts.deviceKey public key of this device
*/
constructor({ dataType, coreOwnership, coreManager, projectKey, deviceKey }) {
super()
this.#dataType = dataType
this.#coreOwnership = coreOwnership
this.#coreManager = coreManager
this.#projectCreatorAuthCoreId = projectKey.toString('hex')
this.#ownDeviceId = deviceKey.toString('hex')

dataType[kDataStore].on('role', this.emit.bind(this, 'update'))
}

/**
Expand Down
18 changes: 3 additions & 15 deletions src/core-manager/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -349,24 +349,12 @@ export class CoreManager extends TypedEmitter {
*/
#sendCoreKeys(peer, namespaces) {
const message = ProjectExtension.create()
let hasKeys = false
for (const ns of namespaces) {
for (const { core, key } of this.getCores(ns)) {
let peerHasKeyAlready = false
for (const peer of core.peers) {
if (peer.remotePublicKey.equals(peer.remotePublicKey)) {
peerHasKeyAlready = true
break
}
}
if (peerHasKeyAlready) continue
message.authCoreKeys.push(key)
hasKeys = true
for (const { key } of this.getCores(ns)) {
message[`${ns}CoreKeys`].push(key)
}
}
if (hasKeys) {
this.#projectExtension.send(message, peer)
}
this.#projectExtension.send(message, peer)
}

/**
Expand Down
25 changes: 21 additions & 4 deletions src/core-ownership.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,31 @@ import { parseVersionId } from '@mapeo/schema'
import { defaultGetWinner } from '@mapeo/sqlite-indexer'
import assert from 'node:assert'
import sodium from 'sodium-universal'
import { kTable, kSelect, kCreateWithDocId } from './datatype/index.js'
import {
kTable,
kSelect,
kCreateWithDocId,
kDataStore,
} from './datatype/index.js'
import { eq, or } from 'drizzle-orm'
import mapObject from 'map-obj'
import { discoveryKey } from 'hypercore-crypto'
import pDefer from 'p-defer'
import { TypedEmitter } from 'tiny-typed-emitter'

/**
* @typedef {import('./types.js').CoreOwnershipWithSignatures} CoreOwnershipWithSignatures
*/

export class CoreOwnership {
/**
* @typedef {object} CoreOwnershipEvents
* @property {(docs: import('@mapeo/schema').CoreOwnership[]) => void} update - Emitted when new coreOwnership records are indexed
*/

/**
* @extends {TypedEmitter<CoreOwnershipEvents>}
*/
export class CoreOwnership extends TypedEmitter {
#dataType
#ownershipWriteDone
/**
Expand All @@ -31,11 +45,14 @@ export class CoreOwnership {
* @param {import('./types.js').KeyPair} opts.identityKeypair
*/
constructor({ dataType, coreKeypairs, identityKeypair }) {
super()
this.#dataType = dataType
const authWriterCore = dataType.writerCore
const authWriterCore = dataType[kDataStore].writerCore
const deferred = pDefer()
this.#ownershipWriteDone = deferred.promise

dataType[kDataStore].on('coreOwnership', this.emit.bind(this, 'update'))

const writeOwnership = () => {
if (authWriterCore.length > 0) {
deferred.resolve()
Expand All @@ -49,7 +66,7 @@ export class CoreOwnership {
if (authWriterCore.opened) {
writeOwnership()
} else {
authWriterCore.on('ready', writeOwnership)
authWriterCore.once('ready', writeOwnership)
}
}

Expand Down
58 changes: 52 additions & 6 deletions src/datastore/index.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
import { encode, decode, getVersionId, parseVersionId } from '@mapeo/schema'
import {
encode,
decode,
getVersionId,
parseVersionId,
decodeBlockPrefix,
} from '@mapeo/schema'
import MultiCoreIndexer from 'multi-core-indexer'
import pDefer from 'p-defer'
import { discoveryKey } from 'hypercore-crypto'
import { TypedEmitter } from 'tiny-typed-emitter'

/**
* @typedef {import('@mapeo/schema').MapeoDoc} MapeoDoc
Expand All @@ -26,11 +33,19 @@ const NAMESPACE_SCHEMAS = /** @type {const} */ ({
* @typedef {typeof NAMESPACE_SCHEMAS} NamespaceSchemas
*/

/**
* @template {MapeoDoc['schemaName']} TSchemaName
* @typedef {{
* [S in TSchemaName]: (docs: Extract<MapeoDoc, { schemaName: S }>[]) => void
* }} DataStoreEvents
*/

/**
* @template {keyof NamespaceSchemas} [TNamespace=keyof NamespaceSchemas]
* @template {NamespaceSchemas[TNamespace][number]} [TSchemaName=NamespaceSchemas[TNamespace][number]]
* @extends {TypedEmitter<DataStoreEvents<TSchemaName>>}
*/
export class DataStore {
export class DataStore extends TypedEmitter {
#coreManager
#namespace
#batch
Expand All @@ -40,18 +55,22 @@ export class DataStore {
#pendingIndex = new Map()
/** @type {Set<import('p-defer').DeferredPromise<void>['promise']>} */
#pendingAppends = new Set()
#emitUpdates

/**
* @param {object} opts
* @param {import('../core-manager/index.js').CoreManager} opts.coreManager
* @param {TNamespace} opts.namespace
* @param {(entries: MultiCoreIndexer.Entry<'binary'>[]) => Promise<void>} opts.batch
* @param {MultiCoreIndexer.StorageParam} opts.storage
* @param {TSchemaName[]} [opts.emitUpdates] - List of schemas to emit updates for. Emitting an update is expensive because it requires decoding an entry, so should only be done schemas that will not have many documents in the database
*/
constructor({ coreManager, namespace, batch, storage }) {
constructor({ coreManager, namespace, batch, storage, emitUpdates }) {
super()
this.#coreManager = coreManager
this.#namespace = namespace
this.#batch = batch
this.#emitUpdates = emitUpdates && new Set(emitUpdates)
this.#writerCore = coreManager.getWriterCore(namespace).core
const cores = coreManager.getCores(namespace).map((cr) => cr.core)
this.#coreIndexer = new MultiCoreIndexer(cores, {
Expand Down Expand Up @@ -92,18 +111,45 @@ export class DataStore {
async #handleEntries(entries) {
await this.#batch(entries)
await Promise.all(this.#pendingAppends)
const toEmit =
/** @type {{ [S in TSchemaName]: Array<Extract<MapeoDoc, { schemaName: S }>> }} */
({})
// Writes to the writerCore need to wait until the entry is indexed before
// returning, so we check if any incoming entry has a pending promise
for (const entry of entries) {
if (!entry.key.equals(this.#writerCore.key)) continue
const versionId = getVersionId({
const versionObj = {
coreDiscoveryKey: discoveryKey(entry.key),
index: entry.index,
})
}
// We do this here rather than in IndexWriter (which is already decoding
// the entries and could return decoded entries from the batch function or
// emit events itself) because IndexWriter will eventually be in a worker
// thread, and my assumption is that sending a decoded doc over a
// MessageChannel from the worker is more expensive than decoding the
// entry here. This also avoids setting up RPC calls with the worker.
if (this.#emitUpdates) {
try {
const { schemaName } = decodeBlockPrefix(entry.block)
// @ts-ignore
if (!this.#emitUpdates.has(schemaName)) return
// @ts-ignore
toEmit[schemaName] = toEmit[schemaName] || []
// @ts-ignore
toEmit[schemaName].push(decode(entry.block, versionObj))
} catch (e) {
// Ignore docs we can't decode
}
}
if (!entry.key.equals(this.#writerCore.key)) continue
const versionId = getVersionId(versionObj)
const pending = this.#pendingIndex.get(versionId)
if (!pending) continue
pending.resolve()
}
for (const [schemaName, docs] of Object.entries(toEmit)) {
// @ts-ignore
this.emit(schemaName, docs)
}
}

/**
Expand Down
3 changes: 2 additions & 1 deletion src/datatype/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type MapeoDocTablesMap = {
export const kCreateWithDocId: unique symbol
export const kSelect: unique symbol
export const kTable: unique symbol
export const kDataStore: unique symbol

type OmitUnion<T, K extends keyof any> = T extends any ? Omit<T, K> : never
type ExcludeSchema<
Expand Down Expand Up @@ -60,7 +61,7 @@ export class DataType<

get [kTable](): TTable

get writerCore(): Hypercore<'binary', Buffer>
get [kDataStore](): TDataStore

[kCreateWithDocId](
docId: string,
Expand Down
5 changes: 3 additions & 2 deletions src/datatype/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ function generateDate() {
export const kCreateWithDocId = Symbol('kCreateWithDocId')
export const kSelect = Symbol('select')
export const kTable = Symbol('table')
export const kDataStore = Symbol('dataStore')

/**
* @template {import('../datastore/index.js').DataStore} TDataStore
Expand Down Expand Up @@ -91,8 +92,8 @@ export class DataType {
return this.#table
}

get writerCore() {
return this.#dataStore.writerCore
get [kDataStore]() {
return this.#dataStore
}

/**
Expand Down
33 changes: 29 additions & 4 deletions src/generated/extensions.d.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
/// <reference types="node" />
import _m0 from "protobufjs/minimal.js";
export interface ProjectExtension {
wantCoreKeys: Buffer[];
authCoreKeys: Buffer[];
configCoreKeys: Buffer[];
dataCoreKeys: Buffer[];
blobIndexCoreKeys: Buffer[];
blobCoreKeys: Buffer[];
}
export interface HaveExtension {
discoveryKey: Buffer;
Expand All @@ -24,15 +29,35 @@ export declare const ProjectExtension: {
encode(message: ProjectExtension, writer?: _m0.Writer): _m0.Writer;
decode(input: _m0.Reader | Uint8Array, length?: number): ProjectExtension;
create<I extends {
wantCoreKeys?: Buffer[];
authCoreKeys?: Buffer[];
configCoreKeys?: Buffer[];
dataCoreKeys?: Buffer[];
blobIndexCoreKeys?: Buffer[];
blobCoreKeys?: Buffer[];
} & {
authCoreKeys?: Buffer[] & Buffer[] & { [K in Exclude<keyof I["authCoreKeys"], keyof Buffer[]>]: never; };
} & { [K_1 in Exclude<keyof I, "authCoreKeys">]: never; }>(base?: I): ProjectExtension;
wantCoreKeys?: Buffer[] & Buffer[] & { [K in Exclude<keyof I["wantCoreKeys"], keyof Buffer[]>]: never; };
authCoreKeys?: Buffer[] & Buffer[] & { [K_1 in Exclude<keyof I["authCoreKeys"], keyof Buffer[]>]: never; };
configCoreKeys?: Buffer[] & Buffer[] & { [K_2 in Exclude<keyof I["configCoreKeys"], keyof Buffer[]>]: never; };
dataCoreKeys?: Buffer[] & Buffer[] & { [K_3 in Exclude<keyof I["dataCoreKeys"], keyof Buffer[]>]: never; };
blobIndexCoreKeys?: Buffer[] & Buffer[] & { [K_4 in Exclude<keyof I["blobIndexCoreKeys"], keyof Buffer[]>]: never; };
blobCoreKeys?: Buffer[] & Buffer[] & { [K_5 in Exclude<keyof I["blobCoreKeys"], keyof Buffer[]>]: never; };
} & { [K_6 in Exclude<keyof I, keyof ProjectExtension>]: never; }>(base?: I): ProjectExtension;
fromPartial<I_1 extends {
wantCoreKeys?: Buffer[];
authCoreKeys?: Buffer[];
configCoreKeys?: Buffer[];
dataCoreKeys?: Buffer[];
blobIndexCoreKeys?: Buffer[];
blobCoreKeys?: Buffer[];
} & {
authCoreKeys?: Buffer[] & Buffer[] & { [K_2 in Exclude<keyof I_1["authCoreKeys"], keyof Buffer[]>]: never; };
} & { [K_3 in Exclude<keyof I_1, "authCoreKeys">]: never; }>(object: I_1): ProjectExtension;
wantCoreKeys?: Buffer[] & Buffer[] & { [K_7 in Exclude<keyof I_1["wantCoreKeys"], keyof Buffer[]>]: never; };
authCoreKeys?: Buffer[] & Buffer[] & { [K_8 in Exclude<keyof I_1["authCoreKeys"], keyof Buffer[]>]: never; };
configCoreKeys?: Buffer[] & Buffer[] & { [K_9 in Exclude<keyof I_1["configCoreKeys"], keyof Buffer[]>]: never; };
dataCoreKeys?: Buffer[] & Buffer[] & { [K_10 in Exclude<keyof I_1["dataCoreKeys"], keyof Buffer[]>]: never; };
blobIndexCoreKeys?: Buffer[] & Buffer[] & { [K_11 in Exclude<keyof I_1["blobIndexCoreKeys"], keyof Buffer[]>]: never; };
blobCoreKeys?: Buffer[] & Buffer[] & { [K_12 in Exclude<keyof I_1["blobCoreKeys"], keyof Buffer[]>]: never; };
} & { [K_13 in Exclude<keyof I_1, keyof ProjectExtension>]: never; }>(object: I_1): ProjectExtension;
};
export declare const HaveExtension: {
encode(message: HaveExtension, writer?: _m0.Writer): _m0.Writer;
Expand Down
Loading

0 comments on commit 0b6362a

Please sign in to comment.