Skip to content

Commit

Permalink
revert IndexWriter changes
Browse files Browse the repository at this point in the history
  • Loading branch information
gmaclennan committed Nov 29, 2023
1 parent 77aeeff commit 9e05beb
Showing 1 changed file with 7 additions and 35 deletions.
42 changes: 7 additions & 35 deletions src/index-writer/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import { getTableConfig } from 'drizzle-orm/sqlite-core'
import { getBacklinkTableName } from '../schema/utils.js'
import { discoveryKey } from 'hypercore-crypto'
import { Logger } from '../logger.js'
import { TypedEmitter } from 'tiny-typed-emitter'

/**
* @typedef {import('../datatype/index.js').MapeoDocTables} MapeoDocTables
Expand All @@ -15,12 +14,6 @@ import { TypedEmitter } from 'tiny-typed-emitter'
/**
* @typedef {ReturnType<import('@mapeo/schema').decode>} MapeoDocInternal
*/
/**
* @template {import('@mapeo/schema').MapeoDoc['schemaName']} TSchemaName
* @typedef {{
* [S in TSchemaName]: (docs: Extract<MapeoDoc, { schemaName: S }>[]) => void
* }} SchemaEmitterEvents
*/

/**
* @template {MapeoDocTables} [TTables=MapeoDocTables]
Expand All @@ -29,8 +22,6 @@ export class IndexWriter {
/** @type {Map<TTables['_']['name'], SqliteIndexer>} */
#indexers = new Map()
#mapDoc
/** @type {TypedEmitter<SchemaEmitterEvents<TTables['_']['name']>> | undefined} */
#schemaEmitter
#l
/**
*
Expand Down Expand Up @@ -62,29 +53,15 @@ export class IndexWriter {
return [...this.#indexers.keys()]
}

/**
* @template {keyof SchemaEmitterEvents<TTables['_']['name']>} TSchemaName
* @param {{ schemaName: TSchemaName }} opts
* @param {SchemaEmitterEvents<TTables['_']['name']>[TSchemaName]} onDocs
*/
subscribe({ schemaName }, onDocs) {
if (!this.#schemaEmitter) this.#schemaEmitter = new TypedEmitter()
const schemaEmitter = this.#schemaEmitter
schemaEmitter.on(schemaName, onDocs)
return function unsubscribe() {
schemaEmitter.off(schemaName, onDocs)
}
}

/**
*
* @param {import('multi-core-indexer').Entry[]} entries
*/
async batch(entries) {
// sqlite-indexer is _significantly_ faster when batching even <10 at a
// time, so best to queue docs here before calling sliteIndexer.batch()
/** @type {Map<TTables['_']['name'], MapeoDoc[]>} */
const queued = new Map()
/** @type {Record<string, MapeoDoc[]>} */
const queued = {}
for (const { block, key, index } of entries) {
try {
const version = { coreDiscoveryKey: discoveryKey(key), index }
Expand All @@ -94,24 +71,19 @@ export class IndexWriter {
// Unknown or invalid entry - silently ignore
continue
}
if (!this.#indexers.has(doc.schemaName)) continue
const queue = queued.get(doc.schemaName)
if (queue) {
queue.push(doc)
if (queued[doc.schemaName]) {
queued[doc.schemaName].push(doc)
} else {
queued.set(doc.schemaName, [doc])
queued[doc.schemaName] = [doc]
}
}
for (const [schemaName, docs] of queued.entries()) {
for (const [schemaName, docs] of Object.entries(queued)) {
// @ts-expect-error
const indexer = this.#indexers.get(schemaName)
if (!indexer) {
// Don't have an indexer for this type - silently ignore
continue
}
if (this.#schemaEmitter) {
// @ts-ignore
this.#schemaEmitter.emit(schemaName, docs)
}
indexer.batch(docs)
if (this.#l.enabled) {
for (const doc of docs) {
Expand Down

0 comments on commit 9e05beb

Please sign in to comment.