diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3043db0..b9028a8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,9 @@
 # Changelog
 
+## 0.2.2-alpha.0
+
+- Enables buffering writes for performance when using triggers
+
 ## 0.2.1
 
 - Allow passing { bounds: { eq: key }}, supporting non-array keys for counts
diff --git a/README.md b/README.md
index f07ffec..26a9ae5 100644
--- a/README.md
+++ b/README.md
@@ -489,6 +489,74 @@ export const mutation = customMutation(rawMutation, customCtx(triggers.wrapDB));
 
 The [`example/convex/photos.ts`](example/convex/photos.ts) example uses a trigger.
 
+### Optimizing Triggers with Batching
+
+**Recommended:** When using triggers, combine them with the batching API for
+optimal performance.
+
+Without batching, each table write triggers a separate call to the aggregate
+component. If you insert 100 rows in a mutation, that's 100 individual calls to
+the aggregate component, each fetching and updating the B-tree separately.
+
+With batching, all triggered aggregate operations are queued and sent as a
+single batch at the end of the mutation. This provides:
+
+- **Single component call** instead of N separate calls
+- **Single tree fetch** instead of N fetches
+- **Better write contention handling** - one atomic update instead of many
+- **Significant performance improvement** - especially for bulk operations
+
+Here's how to set it up:
+
+```ts
+import { TableAggregate } from "@convex-dev/aggregate";
+import { Triggers } from "convex-helpers/server/triggers";
+import { customMutation } from "convex-helpers/server/customFunctions";
+import { v } from "convex/values";
+import { mutation as rawMutation } from "./_generated/server";
+import { components } from "./_generated/api";
+import type { DataModel } from "./_generated/dataModel";
+
+const aggregate = new TableAggregate<{
+  Key: number;
+  DataModel: DataModel;
+  TableName: "leaderboard";
+}>(components.aggregate, {
+  sortKey: (doc) => -doc.score,
+});
+
+// Set up triggers
+const triggers = new Triggers<DataModel>();
+triggers.register("leaderboard", aggregate.trigger());
+
+// Create a custom mutation that enables buffering and flushes on success
+const mutation = customMutation(rawMutation, {
+  args: {},
+  input: async (ctx) => {
+    aggregate.startBuffering();
+    return {
+      ctx: triggers.wrapDB(ctx),
+      args: {},
+      onSuccess: async ({ ctx }) => {
+        await aggregate.finishBuffering(ctx);
+      },
+    };
+  },
+});
+
+// Now use this mutation in your functions
+export const addScores = mutation({
+  args: { scores: v.array(v.object({ name: v.string(), score: v.number() })) },
+  handler: async (ctx, { scores }) => {
+    // Each insert triggers an aggregate operation, but they're all batched!
+    for (const { name, score } of scores) {
+      await ctx.db.insert("leaderboard", { name, score });
+    }
+    // The flush happens automatically in the onSuccess callback
+  },
+});
+```
+
+See [`example/convex/batchedWrites.ts`](example/convex/batchedWrites.ts) for a
+complete working example with performance comparisons.
+
 ### Repair incorrect aggregates
 
 If some mutation or direct write in the Dashboard updated the source of truth
@@ -507,10 +575,10 @@
 aggregates based on the diff of these two paginated data streams.
 
 ## Performance Optimizations
 
-### Batch Operations
+### Batch Read Operations
 
 For improved performance when making multiple similar queries, the Aggregate
-component provides batch versions of common operations:
+component provides batch versions of common read operations:
 
 - `countBatch()` - Count items for multiple bounds in a single call
 - `sumBatch()` - Sum items for multiple bounds in a single call
@@ -547,6 +615,46 @@ The batch functions accept arrays of query parameters and return arrays of
 results in the same order, making them drop-in replacements for multiple
 individual calls while providing better performance characteristics.
 
+### Batch Write Operations
+
+When making multiple write operations (inserts, deletes, or updates), you can
+use the batching API to queue operations and send them in a single call to the
+aggregate component. This is especially valuable when using triggers.
+
+**When to use batching:**
+
+- Making multiple aggregate writes in a single mutation
+- Using triggers that automatically update aggregates on table changes
+- Bulk operations like importing data or backfilling
+
+The batching API works by enabling buffering mode, queueing operations in
+memory, and flushing them all at once:
+
+```ts
+// Enable buffering
+aggregate.startBuffering();
+
+// Queue operations (not sent yet)
+await aggregate.insert(ctx, { key: 1, id: "a" });
+await aggregate.insert(ctx, { key: 2, id: "b" });
+await aggregate.insert(ctx, { key: 3, id: "c" });
+
+// Flush all operations in a single batch and stop buffering
+await aggregate.finishBuffering(ctx);
+```
+
+**Benefits of batching:**
+
+- **Single component call** - One mutation instead of N separate calls
+- **Single tree fetch** - The B-tree is fetched once for all operations
+- **Reduced write contention** - All operations are applied atomically together
+- **Reduced overhead** - Less network and serialization overhead
+
+See the [Optimizing Triggers with Batching](#optimizing-triggers-with-batching)
+section for the recommended pattern when using triggers, and see
+[`example/convex/batchedWrites.ts`](example/convex/batchedWrites.ts) for
+complete examples.
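+
+One subtlety worth knowing: if you read from the aggregate while buffering is
+active, any pending operations are flushed first, so the read reflects them.
+A minimal sketch, reusing the `aggregate` instance and mutation `ctx` from the
+snippet above:
+
+```ts
+aggregate.startBuffering();
+await aggregate.insert(ctx, { key: 4, id: "d" });
+
+// Reads flush buffered writes first, so this count includes "d".
+const count = await aggregate.count(ctx);
+
+await aggregate.finishBuffering(ctx);
+```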
+
 ## Reactivity and Atomicity
 
 Like all Convex queries, aggregates are
diff --git a/example/convex/_generated/api.d.ts b/example/convex/_generated/api.d.ts
index 3753873..51b26e9 100644
--- a/example/convex/_generated/api.d.ts
+++ b/example/convex/_generated/api.d.ts
@@ -8,6 +8,7 @@
  * @module
  */
 
+import type * as batchedWrites from "../batchedWrites.js";
 import type * as btree from "../btree.js";
 import type * as crons from "../crons.js";
 import type * as leaderboard from "../leaderboard.js";
@@ -23,6 +24,7 @@ import type {
 } from "convex/server";
 
 declare const fullApi: ApiFromModules<{
+  batchedWrites: typeof batchedWrites;
   btree: typeof btree;
   crons: typeof crons;
   leaderboard: typeof leaderboard;
@@ -65,5 +67,6 @@ export declare const components: {
   photos: import("@convex-dev/aggregate/_generated/component.js").ComponentApi<"photos">;
   stats: import("@convex-dev/aggregate/_generated/component.js").ComponentApi<"stats">;
   btreeAggregate: import("@convex-dev/aggregate/_generated/component.js").ComponentApi<"btreeAggregate">;
+  batchedWrites: import("@convex-dev/aggregate/_generated/component.js").ComponentApi<"batchedWrites">;
   migrations: import("@convex-dev/migrations/_generated/component.js").ComponentApi<"migrations">;
 };
diff --git a/example/convex/batchedWrites.ts b/example/convex/batchedWrites.ts
new file mode 100644
index 0000000..7e7041f
--- /dev/null
+++ b/example/convex/batchedWrites.ts
@@ -0,0 +1,444 @@
+/**
+ * Example of using the batch API for efficient writes.
+ *
+ * This demonstrates how to use `startBuffering` and `finishBuffering` to queue
+ * write operations and flush them in a single batch, reducing the number of
+ * mutation calls and improving performance.
+ *
+ * This is especially useful when using triggers, as it batches all triggered
+ * aggregate writes into a single component call instead of one per write.
+ */
+
+import { DirectAggregate, TableAggregate } from "@convex-dev/aggregate";
+import { internalMutation, mutation } from "./_generated/server";
+import { components } from "./_generated/api.js";
+import { v } from "convex/values";
+import { customMutation } from "convex-helpers/server/customFunctions";
+import { Triggers } from "convex-helpers/server/triggers";
+import type { DataModel } from "./_generated/dataModel";
+
+const aggregate = new DirectAggregate<{
+  Key: number;
+  Id: string;
+}>(components.batchedWrites);
+
+// Example with TableAggregate and Triggers
+const leaderboardAggregate = new TableAggregate<{
+  Key: number;
+  DataModel: DataModel;
+  TableName: "leaderboard";
+}>(components.batchedWrites, {
+  sortKey: (doc) => -doc.score, // Negative for descending order
+  sumValue: (doc) => doc.score,
+});
+
+export const reset = internalMutation({
+  args: {},
+  handler: async (ctx) => {
+    await aggregate.clearAll(ctx);
+  },
+});
+
+/**
+ * Basic example: Enable buffering, queue operations, then flush manually.
+ */
+export const basicBatchedWrites = internalMutation({
+  args: {
+    count: v.number(),
+  },
+  handler: async (ctx, { count }) => {
+    // Enable buffering mode - modifies the aggregate instance in place
+    aggregate.startBuffering();
+
+    const initialCount = await aggregate.count(ctx);
+
+    // Queue multiple insert operations
+    for (let i = 0; i < count; i++) {
+      await aggregate.insert(ctx, {
+        key: i + initialCount,
+        id: `item-${i}`,
+        sumValue: i * 10,
+      });
+    }
+
+    // Flush all queued operations in one batch and disable buffering
+    await aggregate.finishBuffering(ctx);
+
+    // Read operations work normally (and auto-flush if needed)
+    const total = await aggregate.count(ctx);
+
+    if (total !== initialCount + count) {
+      console.log({ initialCount, count, total });
+      throw new Error("Total count is incorrect");
+    }
+
+    return { inserted: count, total };
+  },
+});
+
+/**
+ * Advanced example: Use custom functions with Triggers and buffering.
+ *
+ * This is the RECOMMENDED pattern when using triggers!
+ *
+ * When using triggers, each table write triggers an aggregate write.
+ * If you insert 100 rows, that's 100 separate calls to the aggregate component.
+ * With buffering, all 100 writes are batched into a single component call.
+ *
+ * Performance benefits:
+ * - Single component call instead of N calls
+ * - Single tree fetch instead of N fetches
+ * - Better handling of write contention
+ */
+
+// Set up triggers
+const triggers = new Triggers<DataModel>();
+triggers.register("leaderboard", leaderboardAggregate.trigger());
+
+// Create a custom mutation that:
+// 1. Wraps the database with triggers
+// 2. Enables buffering before the mutation runs
+// 3. Flushes after the mutation completes successfully
+const mutationWithTriggers = customMutation(mutation, {
+  args: {},
+  input: async (ctx) => {
+    // Enable buffering for all aggregate operations
+    leaderboardAggregate.startBuffering();
+
+    return {
+      ctx: {
+        // Wrap db with triggers
+        ...triggers.wrapDB(ctx),
+      },
+      args: {},
+      onSuccess: async ({ ctx }) => {
+        // Flush all buffered operations in a single batch
+        await leaderboardAggregate.finishBuffering(ctx);
+      },
+    };
+  },
+});
+
+/**
+ * Example: Add multiple scores with triggers and batching.
+ *
+ * Without buffering: Each insert triggers a separate aggregate.insert call.
+ * With buffering: All inserts are batched into one aggregate.batch call.
+ */
+export const addMultipleScores = mutationWithTriggers({
+  args: {
+    scores: v.array(
+      v.object({
+        name: v.string(),
+        score: v.number(),
+      }),
+    ),
+  },
+  handler: async (ctx, { scores }) => {
+    const initialSumValue = await leaderboardAggregate.sum(ctx);
+
+    // Just insert into the table - the trigger automatically
+    // updates the aggregate, and buffering batches all the updates
+    for (const { name, score } of scores) {
+      await ctx.db.insert("leaderboard", { name, score });
+    }
+
+    const totalSumValue = await leaderboardAggregate.sum(ctx);
+
+    if (
+      totalSumValue !==
+      initialSumValue + scores.reduce((acc, { score }) => acc + score, 0)
+    ) {
+      throw new Error("Total sum value is incorrect");
+    }
+
+    return {
+      inserted: scores.length,
+      message: `Added ${scores.length} scores with batched aggregate updates`,
+    };
+  },
+});
+
+/**
+ * Example: Update multiple scores - shows replace operations are also batched.
+ */
+export const updateMultipleScores = mutationWithTriggers({
+  args: {
+    updates: v.array(
+      v.object({
+        id: v.id("leaderboard"),
+        newScore: v.number(),
+      }),
+    ),
+  },
+  handler: async (ctx, { updates }) => {
+    // Each patch triggers aggregate.replace, all batched together
+    for (const { id, newScore } of updates) {
+      await ctx.db.patch(id, { score: newScore });
+    }
+
+    return {
+      updated: updates.length,
+      message: `Updated ${updates.length} scores with batched aggregate updates`,
+    };
+  },
+});
+
+/**
+ * Example showing the difference with and without batching.
+ */
+export const compareTriggersWithAndWithoutBatching = mutation({
+  args: {
+    count: v.number(),
+    useBatching: v.boolean(),
+  },
+  handler: async (ctx, { count, useBatching }) => {
+    const customCtx = triggers.wrapDB(ctx);
+    const initialCount = await leaderboardAggregate.count(ctx);
+    console.time("total");
+    if (useBatching) {
+      // With batching: all aggregate operations batched into one call
+      leaderboardAggregate.startBuffering();
+
+      console.time("insert loop");
+      for (let i = 0; i < count; i++) {
+        await customCtx.db.insert("leaderboard", {
+          name: `player-${i + initialCount}`,
+          score: Math.floor(Math.random() * 1000),
+        });
+      }
+      console.timeEnd("insert loop");
+
+      console.time("finishBuffering");
+      await leaderboardAggregate.finishBuffering(ctx);
+      console.timeEnd("finishBuffering");
+    } else {
+      // Without batching: each insert makes a separate aggregate call
+
+      for (let i = 0; i < count; i++) {
+        await customCtx.db.insert("leaderboard", {
+          name: `player-${i + initialCount}`,
+          score: Math.floor(Math.random() * 1000),
+        });
+      }
+    }
+
+    console.timeEnd("total");
+
+    return {
+      method: useBatching ? "with batching" : "without batching",
+      count,
+      message: useBatching
+        ? `1 batched call to aggregate component`
+        : `${count} individual calls to aggregate component`,
+    };
+  },
+});
+
+/**
+ * Complex example: Mix different operation types in a batch.
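+ *
+ * Buffered operations are queued in the order they are issued, and the
+ * component's batch mutation applies them in that same order, so a delete
+ * of a key followed by an insert of the same key behaves as expected.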
+ */
+export const complexBatchedOperations = mutation({
+  args: {
+    inserts: v.array(
+      v.object({
+        key: v.number(),
+        id: v.string(),
+        value: v.number(),
+      }),
+    ),
+    deletes: v.array(
+      v.object({
+        key: v.number(),
+        id: v.string(),
+      }),
+    ),
+    updates: v.array(
+      v.object({
+        oldKey: v.number(),
+        newKey: v.number(),
+        id: v.string(),
+        value: v.number(),
+      }),
+    ),
+  },
+  handler: async (ctx, { inserts, deletes, updates }) => {
+    // Enable buffering
+    aggregate.startBuffering();
+
+    // Queue inserts
+    for (const item of inserts) {
+      await aggregate.insert(ctx, {
+        key: item.key,
+        id: item.id,
+        sumValue: item.value,
+      });
+    }
+
+    // Queue deletes
+    for (const item of deletes) {
+      await aggregate.deleteIfExists(ctx, {
+        key: item.key,
+        id: item.id,
+      });
+    }
+
+    // Queue updates (replace operations)
+    for (const item of updates) {
+      await aggregate.replaceOrInsert(
+        ctx,
+        { key: item.oldKey, id: item.id },
+        { key: item.newKey, sumValue: item.value },
+      );
+    }
+
+    // Flush all operations at once and stop buffering
+    await aggregate.finishBuffering(ctx);
+
+    return {
+      operations: {
+        inserts: inserts.length,
+        deletes: deletes.length,
+        updates: updates.length,
+      },
+    };
+  },
+});
+
+/**
+ * Performance comparison: Batched vs unbatched writes.
+ */
+export const comparePerformance = mutation({
+  args: {
+    count: v.number(),
+    useBatching: v.boolean(),
+  },
+  handler: async (ctx, { count, useBatching }) => {
+    console.time("overall");
+
+    if (useBatching) {
+      // Batched approach
+      aggregate.startBuffering();
+
+      for (let i = 0; i < count; i++) {
+        await aggregate.insert(ctx, {
+          key: 1000000 + i,
+          id: `perf-test-${i}`,
+          sumValue: i,
+        });
+      }
+
+      console.time("finishBuffering");
+      await aggregate.finishBuffering(ctx);
+      console.timeEnd("finishBuffering");
+    } else {
+      // Unbatched approach
+      for (let i = 0; i < count; i++) {
+        await aggregate.insert(ctx, {
+          key: 1000000 + i,
+          id: `perf-test-${i}`,
+          sumValue: i,
+        });
+      }
+    }
+
+    console.timeEnd("overall");
+
+    return {
+      method: useBatching ? "batched" : "unbatched",
+      count,
+    };
+  },
+});
+
+/**
+ * Example showing automatic flush on read operations.
+ */
+export const autoFlushOnRead = mutation({
+  args: {
+    count: v.number(),
+  },
+  handler: async (ctx, { count }) => {
+    // Enable buffering
+    aggregate.startBuffering();
+
+    // Queue some operations
+    for (let i = 0; i < count; i++) {
+      await aggregate.insert(ctx, {
+        key: 2000000 + i,
+        id: `auto-flush-${i}`,
+        sumValue: i,
+      });
+    }
+
+    // This read operation automatically flushes the buffer first
+    // So we'll see the correct count including the queued operations
+    const total = await aggregate.count(ctx, {
+      bounds: {
+        lower: { key: 2000000, inclusive: true },
+      },
+    });
+
+    // Flush all operations at once and stop buffering
+    await aggregate.finishBuffering(ctx);
+
+    return {
+      queued: count,
+      totalInRange: total,
+    };
+  },
+});
+
+/**
+ * Example: Batch operations with namespace grouping.
+ *
+ * When you have operations across multiple namespaces,
+ * the batch mutation automatically groups them and fetches
+ * each namespace's tree only once.
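+ *
+ * For example, 100 operations spread across namespaces "a" and "b" fetch
+ * two trees total rather than 100.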
+ */
+export const batchedWritesWithNamespaces = mutation({
+  args: {
+    operations: v.array(
+      v.object({
+        namespace: v.string(),
+        key: v.number(),
+        id: v.string(),
+        value: v.number(),
+      }),
+    ),
+  },
+  handler: async (ctx, { operations }) => {
+    // Create a namespaced aggregate
+    const namespacedAggregate = new DirectAggregate<{
+      Key: number;
+      Id: string;
+      Namespace: string;
+    }>(components.batchedWrites);
+
+    // Enable buffering
+    namespacedAggregate.startBuffering();
+
+    // Queue operations - they'll be grouped by namespace internally
+    for (const op of operations) {
+      await namespacedAggregate.insert(ctx, {
+        namespace: op.namespace,
+        key: op.key,
+        id: op.id,
+        sumValue: op.value,
+      });
+    }
+
+    // Flush all operations and stop buffering
+    // The batch mutation will group by namespace automatically
+    await namespacedAggregate.finishBuffering(ctx);
+
+    // Count unique namespaces
+    const namespaces = new Set(operations.map((op) => op.namespace));
+
+    return {
+      operations: operations.length,
+      namespaces: namespaces.size,
+      message: `Processed ${operations.length} operations across ${namespaces.size} namespaces in a single batch`,
+    };
+  },
+});
diff --git a/example/convex/convex.config.ts b/example/convex/convex.config.ts
index e58e969..18fbf76 100644
--- a/example/convex/convex.config.ts
+++ b/example/convex/convex.config.ts
@@ -9,6 +9,7 @@
 app.use(aggregate, { name: "music" });
 app.use(aggregate, { name: "photos" });
 app.use(aggregate, { name: "stats" });
 app.use(aggregate, { name: "btreeAggregate" });
+app.use(aggregate, { name: "batchedWrites" });
 
 app.use(migrations);
diff --git a/package-lock.json b/package-lock.json
index 8e5edf9..4179207 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -1,12 +1,12 @@
 {
   "name": "@convex-dev/aggregate",
-  "version": "0.2.1",
+  "version": "0.2.2-alpha.0",
   "lockfileVersion": 3,
   "requires": true,
   "packages": {
     "": {
       "name": "@convex-dev/aggregate",
-      "version": "0.2.1",
+      "version": "0.2.2-alpha.0",
       "license": "Apache-2.0",
       "devDependencies": {
         "@convex-dev/migrations": "0.3.1",
diff --git a/package.json b/package.json
index da870ab..9d3c798 100644
--- a/package.json
+++ b/package.json
@@ -1,7 +1,7 @@
 {
   "name": "@convex-dev/aggregate",
   "description": "Convex component to calculate counts and sums of values for efficient aggregation.",
-  "version": "0.2.1",
+  "version": "0.2.2-alpha.0",
   "keywords": [
     "convex",
     "component",
diff --git a/src/client/buffer.test.ts b/src/client/buffer.test.ts
new file mode 100644
index 0000000..2d89969
--- /dev/null
+++ b/src/client/buffer.test.ts
@@ -0,0 +1,49 @@
+import { convexTest } from "convex-test";
+import { expect, test } from "vitest";
+import { DirectAggregate } from "./index.js";
+import { components, modules } from "./setup.test.js";
+import { defineSchema } from "convex/server";
+import { register } from "../test.js";
+
+const schema = defineSchema({});
+
+function setupTest() {
+  const t = convexTest(schema, modules);
+  register(t);
+  return t;
+}
+
+test("buffer flush in mutation context", async () => {
+  const t = setupTest();
+
+  const aggregate = new DirectAggregate<{
+    Key: number;
+    Id: string;
+  }>(components.aggregate);
+
+  // Test that reading with buffered operations in a mutation works (auto-flushes)
+  await t.run(async (ctx) => {
+    aggregate.startBuffering();
+    await aggregate.insert(ctx, { key: 1, id: "a" });
+    await aggregate.insert(ctx, { key: 2, id: "b" });
+
+    // This should work because we're in a mutation context and it auto-flushes
+    const count = await aggregate.count(ctx);
+    expect(count).toBe(2);
+
+    await aggregate.finishBuffering(ctx);
+  });
+
+  // Test manual flush
+  await t.run(async (ctx) => {
+    aggregate.startBuffering();
+    await aggregate.insert(ctx, { key: 3, id: "c" });
+
+    // Manual flush
+    await aggregate.flush(ctx);
+
+    const count = await aggregate.count(ctx);
+    expect(count).toBe(3);
+    await aggregate.finishBuffering(ctx);
+  });
+});
diff --git a/src/client/index.ts b/src/client/index.ts
index f173d13..e7d7500 100644
--- a/src/client/index.ts
+++ b/src/client/index.ts
@@ -36,6 +36,43 @@ export type Item<K extends Key, ID extends string> = {
 
 export type { Key, Bound, Bounds };
 
+type BufferedOperation =
+  | {
+      type: "insert";
+      key: any;
+      value: any;
+      summand?: number;
+      namespace?: any;
+    }
+  | {
+      type: "delete";
+      key: any;
+      namespace?: any;
+    }
+  | {
+      type: "replace";
+      currentKey: any;
+      newKey: any;
+      value: any;
+      summand?: number;
+      namespace?: any;
+      newNamespace?: any;
+    }
+  | {
+      type: "deleteIfExists";
+      key: any;
+      namespace?: any;
+    }
+  | {
+      type: "replaceOrInsert";
+      currentKey: any;
+      newKey: any;
+      value: any;
+      summand?: number;
+      namespace?: any;
+      newNamespace?: any;
+    };
+
 /**
  * Write data to be aggregated, and read aggregated data.
  *
@@ -54,8 +91,82 @@ export class Aggregate<
   ID extends string,
   Namespace extends ConvexValue | undefined = undefined,
 > {
+  private isBuffering = false;
+  private currentFlushPromise: Promise<null> | null = null;
+  private operationQueue: BufferedOperation[] = [];
+
   constructor(protected component: ComponentApi) {}
 
+  /**
+   * Start buffering write operations. When buffering is enabled, write operations are
+   * queued and sent in a batch when flush() or finishBuffering() is called or when any
+   * read operation is performed.
+   *
+   * Example usage:
+   * ```ts
+   * aggregate.startBuffering();
+   * aggregate.insert(ctx, { key: 1, id: "a" });
+   * aggregate.insert(ctx, { key: 2, id: "b" });
+   * await aggregate.finishBuffering(ctx); // Send all buffered operations
+   * ```
+   */
+  startBuffering(): void {
+    this.isBuffering = true;
+  }
+
+  /**
+   * Stop buffering write operations and flush all buffered operations.
+   * @param ctx - The mutation context, used to flush the buffered operations.
+   */
+  async finishBuffering(ctx: RunMutationCtx): Promise<void> {
+    await this.flush(ctx, { stopBuffering: true });
+  }
+
+  /**
+   * Flush all buffered operations to the database.
+   * This sends all queued write operations in a single batch mutation.
+   * Called automatically before any read operation when buffering is enabled.
+   */
+  async flush(
+    ctx: RunMutationCtx,
+    opts?: { stopBuffering?: boolean },
+  ): Promise<void> {
+    const { stopBuffering = false } = opts ?? {};
+    while (this.currentFlushPromise) {
+      await this.currentFlushPromise;
+    }
+    // start critical section (no awaiting allowed)
+    if (stopBuffering) {
+      this.isBuffering = false;
+    }
+    if (this.operationQueue.length === 0) {
+      return;
+    }
+    const operations = this.operationQueue;
+    this.operationQueue = [];
+    this.currentFlushPromise = ctx
+      .runMutation(this.component.public.batch, {
+        operations,
+      })
+      .then(() => (this.currentFlushPromise = null));
+    // end critical section
+    await this.currentFlushPromise;
+  }
+
+  private async flushBeforeRead(ctx: RunQueryCtx | RunMutationCtx) {
+    if (this.isBuffering && this.operationQueue.length > 0) {
+      if (!("runMutation" in ctx)) {
+        throw new Error(
+          "Buffered operations found in a query context: " +
+            this.operationQueue.map((op) => op.type).join(", "),
+        );
+      }
+      await this.flush(ctx);
+    } else if (this.currentFlushPromise) {
+      await this.currentFlushPromise;
+    }
+  }
+
   /// Aggregate queries.
 
   /**
@@ -65,6 +176,7 @@ export class Aggregate<
     ctx: RunQueryCtx,
     ...opts: NamespacedOpts<{ bounds?: Bounds<K, ID> }, Namespace>
   ): Promise<number> {
+    await this.flushBeforeRead(ctx);
     const { count } = await ctx.runQuery(
       this.component.btree.aggregateBetween,
       {
@@ -82,6 +194,7 @@ export class Aggregate<
     ctx: RunQueryCtx,
     queries: NamespacedOptsBatch<{ bounds?: Bounds<K, ID> }, Namespace>,
   ): Promise<number[]> {
+    await this.flushBeforeRead(ctx);
     const queryArgs = queries.map((query) => {
       if (!query) {
         throw new Error("You must pass bounds and/or namespace");
@@ -106,6 +219,7 @@ export class Aggregate<
     ctx: RunQueryCtx,
     ...opts: NamespacedOpts<{ bounds?: Bounds<K, ID> }, Namespace>
   ): Promise<number> {
+    await this.flushBeforeRead(ctx);
     const { sum } = await ctx.runQuery(this.component.btree.aggregateBetween, {
       ...boundsToPositions(opts[0]?.bounds),
       namespace: namespaceFromOpts(opts),
@@ -120,6 +234,7 @@ export class Aggregate<
     ctx: RunQueryCtx,
     queries: NamespacedOptsBatch<{ bounds?: Bounds<K, ID> }, Namespace>,
   ): Promise<number[]> {
+    await this.flushBeforeRead(ctx);
     const queryArgs = queries.map((query) => {
       if (!query) {
         throw new Error("You must pass bounds and/or namespace");
@@ -150,6 +265,7 @@ export class Aggregate<
     offset: number,
     ...opts: NamespacedOpts<{ bounds?: Bounds<K, ID> }, Namespace>
   ): Promise<Item<K, ID>> {
+    await this.flushBeforeRead(ctx);
     if (offset < 0) {
       const item = await ctx.runQuery(this.component.btree.atNegativeOffset, {
         offset: -offset - 1,
@@ -175,6 +291,7 @@ export class Aggregate<
       Namespace
     >,
   ): Promise<Item<K, ID>[]> {
+    await this.flushBeforeRead(ctx);
     const queryArgs = queries.map((q) => ({
       offset: q.offset,
       ...boundsToPositions(q.bounds),
@@ -202,6 +319,7 @@ export class Aggregate<
       Namespace
     >
  ): Promise<number> {
+    await this.flushBeforeRead(ctx);
     const { k1, k2 } = boundsToPositions(opts[0]?.bounds);
     if (opts[0]?.order === "desc") {
       return await ctx.runQuery(this.component.btree.offsetUntil, {
@@ -305,6 +423,7 @@ export class Aggregate<
       Namespace
     >
   ): Promise<{ page: Item<K, ID>[]; cursor: string; isDone: boolean }> {
+    await this.flushBeforeRead(ctx);
     const order = opts[0]?.order ?? "asc";
     const pageSize = opts[0]?.pageSize ?? 100;
     const {
@@ -373,12 +492,14 @@ export class Aggregate<
     id: ID,
     summand?: number,
   ): Promise<void> {
-    await ctx.runMutation(this.component.public.insert, {
-      key: keyToPosition(key, id),
-      summand,
-      value: id,
-      namespace,
-    });
+    const args = { key: keyToPosition(key, id), summand, value: id, namespace };
+    if (this.isBuffering) {
+      this.operationQueue.push({ type: "insert", ...args });
+      return;
+    } else if (this.currentFlushPromise) {
+      await this.currentFlushPromise;
+    }
+    await ctx.runMutation(this.component.public.insert, args);
   }
   async _delete(
     ctx: RunMutationCtx,
@@ -386,10 +507,14 @@ export class Aggregate<
     key: K,
     id: ID,
   ): Promise<void> {
-    await ctx.runMutation(this.component.public.delete_, {
-      key: keyToPosition(key, id),
-      namespace,
-    });
+    const args = { key: keyToPosition(key, id), namespace };
+    if (this.isBuffering) {
+      this.operationQueue.push({ type: "delete", ...args });
+      return;
+    } else if (this.currentFlushPromise) {
+      await this.currentFlushPromise;
+    }
+    await ctx.runMutation(this.component.public.delete_, args);
   }
   async _replace(
     ctx: RunMutationCtx,
@@ -400,14 +525,21 @@ export class Aggregate<
     id: ID,
     summand?: number,
   ): Promise<void> {
-    await ctx.runMutation(this.component.public.replace, {
+    const args = {
       currentKey: keyToPosition(currentKey, id),
       newKey: keyToPosition(newKey, id),
       summand,
       value: id,
       namespace: currentNamespace,
       newNamespace,
-    });
+    };
+    if (this.isBuffering) {
+      this.operationQueue.push({ type: "replace", ...args });
+      return;
+    } else if (this.currentFlushPromise) {
+      await this.currentFlushPromise;
+    }
+    await ctx.runMutation(this.component.public.replace, args);
   }
   async _insertIfDoesNotExist(
     ctx: RunMutationCtx,
@@ -432,10 +564,20 @@ export class Aggregate<
     key: K,
     id: ID,
   ): Promise<void> {
-    await ctx.runMutation(this.component.public.deleteIfExists, {
+    const args = {
       key: keyToPosition(key, id),
       namespace,
-    });
+    };
+    if (this.isBuffering) {
+      this.operationQueue.push({
+        type: "deleteIfExists",
+        ...args,
+      });
+      return;
+    } else if (this.currentFlushPromise) {
+      await this.currentFlushPromise;
+    }
+    await ctx.runMutation(this.component.public.deleteIfExists, args);
   }
   async _replaceOrInsert(
     ctx: RunMutationCtx,
@@ -446,14 +588,21 @@ export class Aggregate<
     id: ID,
     summand?: number,
   ): Promise<void> {
-    await ctx.runMutation(this.component.public.replaceOrInsert, {
+    const args = {
       currentKey: keyToPosition(currentKey, id),
       newKey: keyToPosition(newKey, id),
-      summand,
       value: id,
+      summand,
       namespace: currentNamespace,
       newNamespace,
-    });
+    };
+    if (this.isBuffering) {
+      this.operationQueue.push({ type: "replaceOrInsert", ...args });
+      return;
+    } else if (this.currentFlushPromise) {
+      await this.currentFlushPromise;
+    }
+    await ctx.runMutation(this.component.public.replaceOrInsert, args);
   }
 
   /// Initialization and maintenance.
@@ -477,6 +626,7 @@ export class Aggregate<
       Namespace
     >
   ): Promise<void> {
+    await this.flushBeforeRead(ctx);
     await ctx.runMutation(this.component.public.clear, {
       maxNodeSize: opts[0]?.maxNodeSize,
       rootLazy: opts[0]?.rootLazy,
@@ -503,6 +653,7 @@ export class Aggregate<
     cursor?: string,
     pageSize: number = 100,
   ): Promise<{ page: Namespace[]; cursor: string; isDone: boolean }> {
+    await this.flushBeforeRead(ctx);
     const {
       page,
       cursor: newCursor,
@@ -542,6 +693,7 @@ export class Aggregate<
     ctx: RunMutationCtx & RunQueryCtx,
     opts?: { maxNodeSize?: number; rootLazy?: boolean },
   ): Promise<void> {
+    await this.flushBeforeRead(ctx);
     for await (const namespace of this.iterNamespaces(ctx)) {
       await this.clear(ctx, { ...opts, namespace });
     }
diff --git a/src/component/_generated/component.ts b/src/component/_generated/component.ts
index d3e82f8..cac2341 100644
--- a/src/component/_generated/component.ts
+++ b/src/component/_generated/component.ts
@@ -170,6 +170,43 @@ export type ComponentApi =
     >;
   };
   public: {
+    batch: FunctionReference<
+      "mutation",
+      "internal",
+      {
+        operations: Array<
+          | {
+              key: any;
+              namespace?: any;
+              summand?: number;
+              type: "insert";
+              value: any;
+            }
+          | { key: any; namespace?: any; type: "delete" }
+          | {
+              currentKey: any;
+              namespace?: any;
+              newKey: any;
+              newNamespace?: any;
+              summand?: number;
+              type: "replace";
+              value: any;
+            }
+          | { key: any; namespace?: any; type: "deleteIfExists" }
+          | {
+              currentKey: any;
+              namespace?: any;
+              newKey: any;
+              newNamespace?: any;
+              summand?: number;
+              type: "replaceOrInsert";
+              value: any;
+            }
+        >;
+      },
+      null,
+      Name
+    >;
     clear: FunctionReference<
       "mutation",
       "internal",
diff --git a/src/component/btree.ts b/src/component/btree.ts
index 73300cb..d9982ec 100644
--- a/src/component/btree.ts
+++ b/src/component/btree.ts
@@ -45,13 +45,16 @@ function log(s: string) {
 export async function insertHandler(
   ctx: { db: DatabaseWriter },
   args: { key: Key; value: Value; summand?: number; namespace?: Namespace },
+  treeArg?: Doc<"btree">,
 ) {
-  const tree = await getOrCreateTree(
-    ctx.db,
-    args.namespace,
-    DEFAULT_MAX_NODE_SIZE,
-    true,
-  );
+  const tree =
+    treeArg ||
+    (await getOrCreateTree(
+      ctx.db,
+      args.namespace,
+      DEFAULT_MAX_NODE_SIZE,
+      true,
+    ));
   const summand = args.summand ?? 0;
   const pushUp = await insertIntoNode(ctx, args.namespace, tree.root, {
     k: args.key,
@@ -80,13 +83,16 @@ export async function deleteHandler(
   ctx: { db: DatabaseWriter },
   args: { key: Key; namespace?: Namespace },
+  treeArg?: Doc<"btree">,
 ) {
-  const tree = await getOrCreateTree(
-    ctx.db,
-    args.namespace,
-    DEFAULT_MAX_NODE_SIZE,
-    true,
-  );
+  const tree =
+    treeArg ||
+    (await getOrCreateTree(
+      ctx.db,
+      args.namespace,
+      DEFAULT_MAX_NODE_SIZE,
+      true,
+    ));
   await deleteFromNode(ctx, args.namespace, tree.root, args.key);
   const root = (await ctx.db.get(tree.root))!;
   if (root.items.length === 0 && root.subtrees.length === 1) {
diff --git a/src/component/public.ts b/src/component/public.ts
index 3b7ff90..a0d7a39 100644
--- a/src/component/public.ts
+++ b/src/component/public.ts
@@ -1,13 +1,16 @@
-import { ConvexError, v } from "convex/values";
-import { mutation } from "./_generated/server.js";
+import { ConvexError, convexToJson, v, type Value } from "convex/values";
+import { mutation, type DatabaseWriter } from "./_generated/server.js";
 import {
   DEFAULT_MAX_NODE_SIZE,
   deleteHandler,
   getOrCreateTree,
   getTree,
   insertHandler,
+  type Key,
+  type Namespace,
 } from "./btree.js";
 import { internal } from "./_generated/api.js";
+import type { Doc } from "./_generated/dataModel.js";
 
 export const init = mutation({
   args: {
@@ -78,34 +81,62 @@ export const replace = mutation({
     newNamespace: v.optional(v.any()),
   },
   returns: v.null(),
-  handler: async (ctx, args) => {
-    await deleteHandler(ctx, {
+  handler: replaceHandler,
+});
+
+async function replaceHandler(
+  ctx: { db: DatabaseWriter },
+  args: {
+    currentKey: Key;
+    newKey: Key;
+    value: Value;
+    summand?: number;
+    namespace?: Namespace;
+    newNamespace?: Namespace;
+  },
+  treeArg?: Doc<"btree">,
+  newTreeArg?: Doc<"btree">,
+) {
+  await deleteHandler(
+    ctx,
+    {
       key: args.currentKey,
       namespace: args.namespace,
-    });
-    await insertHandler(ctx, {
+    },
+    treeArg,
+  );
+  await insertHandler(
+    ctx,
+    {
       key: args.newKey,
       value: args.value,
       summand: args.summand,
       namespace: args.newNamespace,
-    });
-  },
-});
+    },
+    newTreeArg,
+  );
+}
 
 export const deleteIfExists = mutation({
   args: { key: v.any(), namespace: v.optional(v.any()) },
-  handler: async (ctx, { key, namespace }) => {
-    try {
-      await deleteHandler(ctx, { key, namespace });
-    } catch (e) {
-      if (e instanceof ConvexError && e.data?.code === "DELETE_MISSING_KEY") {
-        return;
-      }
-      throw e;
-    }
-  },
+  handler: deleteIfExistsHandler,
 });
 
+async function deleteIfExistsHandler(
+  ctx: { db: DatabaseWriter },
+  args: { key: Key; namespace?: Namespace },
+  treeArg?: Doc<"btree">,
+) {
+  try {
+    await deleteHandler(ctx, args, treeArg);
+  } catch (e) {
+    if (e instanceof ConvexError && e.data?.code === "DELETE_MISSING_KEY") {
+      return;
+    }
+    throw e;
+  }
+}
+
 export const replaceOrInsert = mutation({
   args: {
     currentKey: v.any(),
@@ -115,27 +146,47 @@ export const replaceOrInsert = mutation({
     namespace: v.optional(v.any()),
     newNamespace: v.optional(v.any()),
   },
-  handler: async (ctx, args) => {
-    try {
-      await deleteHandler(ctx, {
+  handler: replaceOrInsertHandler,
+});
+
+async function replaceOrInsertHandler(
+  ctx: { db: DatabaseWriter },
+  args: {
+    currentKey: Key;
+    newKey: Key;
+    value: Value;
+    summand?: number;
+    namespace?: Namespace;
+    newNamespace?: Namespace;
+  },
+  treeArg?: Doc<"btree">,
+  newTreeArg?: Doc<"btree">,
+) {
+  try {
+    await deleteHandler(
+      ctx,
+      {
         key: args.currentKey,
         namespace: args.namespace,
-      });
-    } catch (e) {
-      if (
-        !(e instanceof ConvexError && e.data?.code === "DELETE_MISSING_KEY")
-      ) {
-        throw e;
-      }
+      },
+      treeArg,
+    );
+  } catch (e) {
+    if (!(e instanceof ConvexError && e.data?.code === "DELETE_MISSING_KEY")) {
+      throw e;
     }
-    await insertHandler(ctx, {
+  }
+  await insertHandler(
+    ctx,
+    {
       key: args.newKey,
       value: args.value,
       summand: args.summand,
       namespace: args.newNamespace,
-    });
-  },
-});
+    },
+    newTreeArg,
+  );
+}
 
 /**
  * Reinitialize the aggregate data structure, clearing all data.
@@ -171,3 +222,102 @@ export const clear = mutation({
     );
   },
 });
+
+/**
+ * Batch mutation that processes multiple operations efficiently by fetching
+ * the tree once and passing it to all handlers.
+ */
+export const batch = mutation({
+  args: {
+    operations: v.array(
+      v.union(
+        v.object({
+          type: v.literal("insert"),
+          key: v.any(),
+          value: v.any(),
+          summand: v.optional(v.number()),
+          namespace: v.optional(v.any()),
+        }),
+        v.object({
+          type: v.literal("delete"),
+          key: v.any(),
+          namespace: v.optional(v.any()),
+        }),
+        v.object({
+          type: v.literal("replace"),
+          currentKey: v.any(),
+          newKey: v.any(),
+          value: v.any(),
+          summand: v.optional(v.number()),
+          namespace: v.optional(v.any()),
+          newNamespace: v.optional(v.any()),
+        }),
+        v.object({
+          type: v.literal("deleteIfExists"),
+          key: v.any(),
+          namespace: v.optional(v.any()),
+        }),
+        v.object({
+          type: v.literal("replaceOrInsert"),
+          currentKey: v.any(),
+          newKey: v.any(),
+          value: v.any(),
+          summand: v.optional(v.number()),
+          namespace: v.optional(v.any()),
+          newNamespace: v.optional(v.any()),
+        }),
+      ),
+    ),
+  },
+  returns: v.null(),
+  handler: async (ctx, { operations }) => {
+    // Map to store trees for each namespace
+    const treesMap = new Map<string, Promise<Doc<"btree">>>();
+
+    // Helper function to get or create tree for a namespace
+    const getTreeForNamespace = async (namespace: any) => {
+      // Use a sentinel value for undefined namespace since JSON.stringify(undefined) returns undefined
+      const key =
+        namespace === undefined
+          ? "__undefined__"
+          : JSON.stringify(convexToJson(namespace));
+      if (!treesMap.has(key)) {
+        treesMap.set(
+          key,
+          getOrCreateTree(ctx.db, namespace, DEFAULT_MAX_NODE_SIZE, true),
+        );
+      }
+      return await treesMap.get(key)!;
+    };
+
+    // Process operations in order
+    for (const op of operations) {
+      if (op.type === "insert") {
+        const { type: _, ...args } = op;
+        const tree = await getTreeForNamespace(op.namespace);
+        await insertHandler(ctx, args, tree);
+      } else if (op.type === "delete") {
+        const { type: _, ...args } = op;
+        const tree = await getTreeForNamespace(op.namespace);
+        await deleteHandler(ctx, args, tree);
+      } else if (op.type === "replace") {
+        const { type: _, ...args } = op;
+        // Handle delete from original namespace
+        const deleteTree = await getTreeForNamespace(op.namespace);
+        // Handle insert to new namespace (which might be different)
+        const insertTree = await getTreeForNamespace(op.newNamespace);
+        await replaceHandler(ctx, args, deleteTree, insertTree);
+      } else if (op.type === "deleteIfExists") {
+        const { type: _, ...args } = op;
+        const tree = await getTreeForNamespace(op.namespace);
+        await deleteIfExistsHandler(ctx, args, tree);
+      } else if (op.type === "replaceOrInsert") {
+        const { type: _, ...args } = op;
+        // Handle delete from original namespace
+        const deleteTree = await getTreeForNamespace(op.namespace);
+        const newTree = await getTreeForNamespace(op.newNamespace);
+        await replaceOrInsertHandler(ctx, args, deleteTree, newTree);
+      }
+    }
+  },
+});