Skip to content

Commit

Permalink
database batching (#1858)
Browse files Browse the repository at this point in the history
* concurrent promises

* changeset

* move subscriber notification after tx

* remove await inside indexdb transaction

* satisy linter by ignoring promise with void
  • Loading branch information
TalDerei authored Oct 20, 2024
1 parent 79ee214 commit 8c036f5
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 10 deletions.
6 changes: 6 additions & 0 deletions .changeset/spicy-terms-reflect.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@penumbra-zone/storage': minor
'@penumbra-zone/query': minor
---

batching storage operations with promises
39 changes: 31 additions & 8 deletions packages/query/src/block-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -263,11 +263,11 @@ export class BlockProcessor implements BlockProcessorInterface {

// flushing is slow, avoid it until
// - wasm says
// - every 1000th block
// - every 5000th block
// - every block at tip
const flushReasons = {
scannerWantsFlush,
interval: compactBlock.height % 1000n === 0n,
interval: compactBlock.height % 5000n === 0n,
new: compactBlock.height > latestKnownBlockHeight,
};

Expand Down Expand Up @@ -431,14 +431,18 @@ export class BlockProcessor implements BlockProcessorInterface {
}

private async identifyNewAssets(notes: SpendableNoteRecord[]) {
const saveOperations = [];

for (const note of notes) {
const assetId = note.note?.value?.assetId;
if (!assetId) {
continue;
}

await this.saveAndReturnMetadata(assetId);
saveOperations.push(this.saveAndReturnMetadata(assetId));
}

await Promise.all(saveOperations);
}

// TODO: refactor. there is definitely a better way to do this. batch
Expand Down Expand Up @@ -491,11 +495,25 @@ export class BlockProcessor implements BlockProcessorInterface {
// Nullifier is published in network when a note is spent or swap is claimed.
private async resolveNullifiers(nullifiers: Nullifier[], height: bigint) {
const spentNullifiers = new Set<Nullifier>();
const readOperations = [];
const writeOperations = [];

for (const nullifier of nullifiers) {
const record =
(await this.indexedDb.getSpendableNoteByNullifier(nullifier)) ??
(await this.indexedDb.getSwapByNullifier(nullifier));
const readPromise = (async () => {
const record =
(await this.indexedDb.getSpendableNoteByNullifier(nullifier)) ??
(await this.indexedDb.getSwapByNullifier(nullifier));
return { nullifier, record };
})();

readOperations.push(readPromise);
}

// Await all reads in parallel
const readResults = await Promise.all(readOperations);

// Process the read results and queue up write operations
for (const { nullifier, record } of readResults) {
if (!record) {
continue;
}
Expand All @@ -504,19 +522,24 @@ export class BlockProcessor implements BlockProcessorInterface {

if (record instanceof SpendableNoteRecord) {
record.heightSpent = height;
await this.indexedDb.saveSpendableNote({
const writePromise = this.indexedDb.saveSpendableNote({
...toPlainMessage(record),
noteCommitment: toPlainMessage(getSpendableNoteRecordCommitment(record)),
});
writeOperations.push(writePromise);
} else if (record instanceof SwapRecord) {
record.heightClaimed = height;
await this.indexedDb.saveSwap({
const writePromise = this.indexedDb.saveSwap({
...toPlainMessage(record),
swapCommitment: toPlainMessage(getSwapRecordCommitment(record)),
});
writeOperations.push(writePromise);
}
}

// Await all writes in parallel
await Promise.all(writeOperations);

return spentNullifiers;
}

Expand Down
10 changes: 8 additions & 2 deletions packages/storage/src/indexed-db/updater.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,18 @@ export class IbdUpdater {
const tables = updates.all.map(u => u.table);
const tx = this.db.transaction(tables, 'readwrite');

// Batch all the updates into promises
for (const update of updates.all) {
await tx.objectStore(update.table).put(update.value, update.key);
this.notifySubscribers(update);
void tx.objectStore(update.table).put(update.value, update.key);
}

// Await the atomic transaction to complete
await tx.done;

// Notify subscribers after the transaction has successfully committed
for (const update of updates.all) {
this.notifySubscribers(update);
}
}

async update<DBTypes extends PenumbraDb, StoreName extends StoreNames<DBTypes>>(
Expand Down

0 comments on commit 8c036f5

Please sign in to comment.