diff --git a/src/index-writer/index.js b/src/index-writer/index.js index f7a857a09..a48c5fde1 100644 --- a/src/index-writer/index.js +++ b/src/index-writer/index.js @@ -4,7 +4,6 @@ import { getTableConfig } from 'drizzle-orm/sqlite-core' import { getBacklinkTableName } from '../schema/utils.js' import { discoveryKey } from 'hypercore-crypto' import { Logger } from '../logger.js' -import { TypedEmitter } from 'tiny-typed-emitter' /** * @typedef {import('../datatype/index.js').MapeoDocTables} MapeoDocTables @@ -15,12 +14,6 @@ import { TypedEmitter } from 'tiny-typed-emitter' /** * @typedef {ReturnType} MapeoDocInternal */ -/** - * @template {import('@mapeo/schema').MapeoDoc['schemaName']} TSchemaName - * @typedef {{ - * [S in TSchemaName]: (docs: Extract[]) => void - * }} SchemaEmitterEvents - */ /** * @template {MapeoDocTables} [TTables=MapeoDocTables] @@ -29,8 +22,6 @@ export class IndexWriter { /** @type {Map} */ #indexers = new Map() #mapDoc - /** @type {TypedEmitter> | undefined} */ - #schemaEmitter #l /** * @@ -62,20 +53,6 @@ export class IndexWriter { return [...this.#indexers.keys()] } - /** - * @template {keyof SchemaEmitterEvents} TSchemaName - * @param {{ schemaName: TSchemaName }} opts - * @param {SchemaEmitterEvents[TSchemaName]} onDocs - */ - subscribe({ schemaName }, onDocs) { - if (!this.#schemaEmitter) this.#schemaEmitter = new TypedEmitter() - const schemaEmitter = this.#schemaEmitter - schemaEmitter.on(schemaName, onDocs) - return function unsubscribe() { - schemaEmitter.off(schemaName, onDocs) - } - } - /** * * @param {import('multi-core-indexer').Entry[]} entries @@ -83,8 +60,8 @@ export class IndexWriter { async batch(entries) { // sqlite-indexer is _significantly_ faster when batching even <10 at a // time, so best to queue docs here before calling sliteIndexer.batch() - /** @type {Map} */ - const queued = new Map() + /** @type {Record} */ + const queued = {} for (const { block, key, index } of entries) { try { const version = { coreDiscoveryKey: discoveryKey(key), index } @@ -94,24 +71,19 @@ export class IndexWriter { // Unknown or invalid entry - silently ignore continue } - if (!this.#indexers.has(doc.schemaName)) continue - const queue = queued.get(doc.schemaName) - if (queue) { - queue.push(doc) + if (queued[doc.schemaName]) { + queued[doc.schemaName].push(doc) } else { - queued.set(doc.schemaName, [doc]) + queued[doc.schemaName] = [doc] } } - for (const [schemaName, docs] of queued.entries()) { + for (const [schemaName, docs] of Object.entries(queued)) { + // @ts-expect-error const indexer = this.#indexers.get(schemaName) if (!indexer) { // Don't have an indexer for this type - silently ignore continue } - if (this.#schemaEmitter) { - // @ts-ignore - this.#schemaEmitter.emit(schemaName, docs) - } indexer.batch(docs) if (this.#l.enabled) { for (const doc of docs) {