diff --git a/renderers/web_core/package-lock.json b/renderers/web_core/package-lock.json index 8bf2ca709..0d8881461 100644 --- a/renderers/web_core/package-lock.json +++ b/renderers/web_core/package-lock.json @@ -1,12 +1,12 @@ { "name": "@a2ui/web_core", - "version": "0.8.2", + "version": "0.8.5", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@a2ui/web_core", - "version": "0.8.2", + "version": "0.8.5", "license": "Apache-2.0", "dependencies": { "@preact/signals-core": "^1.13.0", diff --git a/renderers/web_core/src/v0_9/processing/message-processor.test.ts b/renderers/web_core/src/v0_9/processing/message-processor.test.ts index a4c4b0ad1..671fa9350 100644 --- a/renderers/web_core/src/v0_9/processing/message-processor.test.ts +++ b/renderers/web_core/src/v0_9/processing/message-processor.test.ts @@ -356,4 +356,146 @@ describe("MessageProcessor", () => { assert.strictEqual(processor.resolvePath("foo", "/bar/"), "/bar/foo"); assert.strictEqual(processor.resolvePath("foo"), "/foo"); }); + + describe("Batch Processing", () => { + it("batches multiple updateDataModel messages into single notification cycle", () => { + processor.processMessages([ + { + version: "v0.9", + createSurface: { surfaceId: "s1", catalogId: "test-catalog" }, + }, + ]); + + const surface = processor.model.getSurface("s1")!; + const notifications: string[] = []; + + surface.dataModel.subscribe("/user/name", () => notifications.push("name")); + surface.dataModel.subscribe("/user/age", () => notifications.push("age")); + surface.dataModel.subscribe("/user", () => notifications.push("user")); + + // Process multiple updates in one batch + processor.processMessages([ + { + version: "v0.9", + updateDataModel: { surfaceId: "s1", path: "/user/name", value: "Alice" }, + }, + { + version: "v0.9", + updateDataModel: { surfaceId: "s1", path: "/user/age", value: 30 }, + }, + { + version: "v0.9", + updateDataModel: { surfaceId: "s1", path: "/user/email", value: "alice@example.com" }, + }, + ]); + + // All paths should be notified + assert.ok(notifications.includes("name"), "name path notified"); + assert.ok(notifications.includes("age"), "age path notified"); + assert.ok(notifications.includes("user"), "user path notified"); + + // Each path should only be notified once (deduplication) + assert.strictEqual( + notifications.filter((n) => n === "name").length, + 1, + "name notified exactly once" + ); + }); + + it("does not batch single message", () => { + processor.processMessages([ + { + version: "v0.9", + createSurface: { surfaceId: "s1", catalogId: "test-catalog" }, + }, + ]); + + const surface = processor.model.getSurface("s1")!; + let notificationCount = 0; + + surface.dataModel.subscribe("/value", () => notificationCount++); + + // Single message - should not use batch mode + processor.processMessages([ + { + version: "v0.9", + updateDataModel: { surfaceId: "s1", path: "/value", value: "test" }, + }, + ]); + + assert.strictEqual(notificationCount, 1, "Single message notified immediately"); + }); + + it("handles multiple surfaces in one batch", () => { + processor.processMessages([ + { + version: "v0.9", + createSurface: { surfaceId: "s1", catalogId: "test-catalog" }, + }, + { + version: "v0.9", + createSurface: { surfaceId: "s2", catalogId: "test-catalog" }, + }, + ]); + + const surface1 = processor.model.getSurface("s1")!; + const surface2 = processor.model.getSurface("s2")!; + + let count1 = 0; + let count2 = 0; + + surface1.dataModel.subscribe("/value", () => count1++); + surface2.dataModel.subscribe("/value", () => count2++); + + // Update both surfaces in one batch + processor.processMessages([ + { + version: "v0.9", + updateDataModel: { surfaceId: "s1", path: "/value", value: "a" }, + }, + { + version: "v0.9", + updateDataModel: { surfaceId: "s2", path: "/value", value: "b" }, + }, + ]); + + assert.strictEqual(count1, 1, "surface-1 notified once"); + assert.strictEqual(count2, 1, "surface-2 notified once"); + }); + + it("clears pending notifications on error", () => { + processor.processMessages([ + { + version: "v0.9", + createSurface: { surfaceId: "s1", catalogId: "test-catalog" }, + }, + ]); + + const surface = processor.model.getSurface("s1")!; + let notificationCount = 0; + + surface.dataModel.subscribe("/value", () => notificationCount++); + + // Try to process messages that will cause an error + try { + processor.processMessages([ + { + version: "v0.9", + updateDataModel: { surfaceId: "s1", path: "/value", value: "updated" }, + }, + { + version: "v0.9", + updateDataModel: { surfaceId: "non-existent", path: "/value", value: "error" }, + }, + ]); + } catch (e) { + // Expected error + } + + // Data was written before the error + assert.strictEqual(surface.dataModel.get("/value"), "updated"); + // Pending notifications should have been cleared, not fired + assert.strictEqual(notificationCount, 0, "No notifications fired due to error"); + }); + }); }); diff --git a/renderers/web_core/src/v0_9/processing/message-processor.ts b/renderers/web_core/src/v0_9/processing/message-processor.ts index df8c95c52..57c34a651 100644 --- a/renderers/web_core/src/v0_9/processing/message-processor.ts +++ b/renderers/web_core/src/v0_9/processing/message-processor.ts @@ -68,13 +68,67 @@ export class MessageProcessor { /** * Processes a list of messages. + * When multiple messages are provided, data model updates are batched + * to reduce redundant notifications and re-renders. * * @param messages The messages to process. */ processMessages(messages: A2uiMessage[]): void { - for (const message of messages) { - this.processMessage(message); + if (messages.length <= 1) { + // Single message: process directly without batch overhead + for (const message of messages) { + this.processMessage(message); + } + return; } + + // Collect all surfaces that will be affected by these messages + const surfaces = new Set>(); + for (const msg of messages) { + const surfaceId = this.extractSurfaceId(msg); + if (surfaceId) { + const surface = this.model.getSurface(surfaceId); + if (surface) { + surfaces.add(surface); + } + } + } + + // Enter batch mode for all affected surfaces + for (const surface of surfaces) { + surface.dataModel.beginBatch(); + } + + try { + // Process all messages (writes are batched, notifications are deferred) + for (const message of messages) { + this.processMessage(message); + } + } catch (e) { + // On error, clear pending notifications to avoid inconsistent state + for (const surface of surfaces) { + surface.dataModel.clearPending(); + } + throw e; + } finally { + // Always exit batch mode and trigger notifications + for (const surface of surfaces) { + surface.dataModel.endBatch(); + } + } + } + + /** + * Extracts the surface ID from a message. + * @param msg The message to extract from. + * @returns The surface ID or undefined if not applicable. + */ + private extractSurfaceId(msg: A2uiMessage): string | undefined { + if ("createSurface" in msg) return msg.createSurface.surfaceId; + if ("updateComponents" in msg) return msg.updateComponents.surfaceId; + if ("updateDataModel" in msg) return msg.updateDataModel.surfaceId; + if ("deleteSurface" in msg) return msg.deleteSurface.surfaceId; + return undefined; } private processMessage(message: A2uiMessage): void { diff --git a/renderers/web_core/src/v0_9/state/data-model.test.ts b/renderers/web_core/src/v0_9/state/data-model.test.ts index 300e5e713..6968555f3 100644 --- a/renderers/web_core/src/v0_9/state/data-model.test.ts +++ b/renderers/web_core/src/v0_9/state/data-model.test.ts @@ -335,4 +335,103 @@ describe("DataModel", () => { assert.strictEqual(isDescendant("/user", "/"), true); assert.strictEqual(isDescendant("/", "/"), false); }); + + // --- Batch Processing --- + + describe("Batch Processing", () => { + it("batches notifications when in batch mode", () => { + const batchModel = new DataModel({}); + const notifications: string[] = []; + + batchModel.subscribe("/user/name", () => notifications.push("name")); + batchModel.subscribe("/user/age", () => notifications.push("age")); + batchModel.subscribe("/user", () => notifications.push("user")); + batchModel.subscribe("/", () => notifications.push("root")); + + batchModel.beginBatch(); + batchModel.set("/user/name", "Alice"); + batchModel.set("/user/age", 30); + batchModel.set("/user/email", "alice@example.com"); + + assert.strictEqual(notifications.length, 0, "No notifications during batch"); + + batchModel.endBatch(); + + assert.ok(notifications.includes("name"), "name path notified"); + assert.ok(notifications.includes("age"), "age path notified"); + assert.ok(notifications.includes("user"), "user path notified"); + assert.ok(notifications.includes("root"), "root path notified"); + + const nameCount = notifications.filter((n) => n === "name").length; + assert.strictEqual(nameCount, 1, "name notified exactly once"); + }); + + it("notifies immediately when not in batch mode", () => { + const batchModel = new DataModel({}); + const notifications: string[] = []; + + batchModel.subscribe("/user/name", () => notifications.push("name")); + batchModel.set("/user/name", "Alice"); + + assert.strictEqual(notifications.length, 1, "Immediate notification"); + }); + + it("clears pending notifications on clearPending", () => { + const batchModel = new DataModel({}); + const notifications: string[] = []; + + batchModel.subscribe("/user/name", () => notifications.push("name")); + batchModel.beginBatch(); + batchModel.set("/user/name", "Alice"); + batchModel.clearPending(); + batchModel.endBatch(); + + assert.strictEqual(notifications.length, 0, "Notifications cleared"); + }); + + it("handles nested batch calls", () => { + const batchModel = new DataModel({}); + const notifications: string[] = []; + + batchModel.subscribe("/user/name", () => notifications.push("name")); + + batchModel.beginBatch(); + batchModel.set("/user/name", "Alice"); + batchModel.beginBatch(); // nested + batchModel.set("/user/name", "Bob"); + batchModel.endBatch(); // outer batch still active + assert.strictEqual(notifications.length, 0, "Still in batch after first endBatch"); + batchModel.endBatch(); // now notifications should fire + assert.strictEqual(notifications.length, 1, "Notified after second endBatch"); + }); + + it("deduplicates same path updates in batch", () => { + const batchModel = new DataModel({}); + let callCount = 0; + + batchModel.subscribe("/counter", () => callCount++); + batchModel.beginBatch(); + batchModel.set("/counter", 1); + batchModel.set("/counter", 2); + batchModel.set("/counter", 3); + batchModel.endBatch(); + + assert.strictEqual(callCount, 1, "Same path deduplicated"); + }); + + it("receives correct final values after batch", () => { + const batchModel = new DataModel({}); + const values: (number | undefined)[] = []; + + batchModel.subscribe("/counter", (val) => values.push(val)); + batchModel.beginBatch(); + batchModel.set("/counter", 1); + batchModel.set("/counter", 2); + batchModel.set("/counter", 3); + batchModel.endBatch(); + + assert.strictEqual(values.length, 1); + assert.strictEqual(values[0], 3); + }); + }); }); diff --git a/renderers/web_core/src/v0_9/state/data-model.ts b/renderers/web_core/src/v0_9/state/data-model.ts index 7c9866c2c..6c08531ba 100644 --- a/renderers/web_core/src/v0_9/state/data-model.ts +++ b/renderers/web_core/src/v0_9/state/data-model.ts @@ -41,6 +41,10 @@ export class DataModel { private readonly signals: Map> = new Map(); private readonly subscriptions: Set<() => void> = new Set(); // To track direct subscriptions for dispose + // Batch processing state + private _batchDepth = 0; + private _pendingPaths = new Set(); + /** * Creates a new data model. * @@ -132,10 +136,83 @@ export class DataModel { current[lastSegment] = value; } - this.notifySignals(path); + if (this._batchDepth > 0) { + this._pendingPaths.add(path); + } else { + this.notifySignals(path); + } return this; } + /** + * Enters batch mode. Notifications will be deferred until endBatch() is called. + * Supports nested batch calls - only the outermost endBatch() triggers notifications. + */ + beginBatch(): void { + this._batchDepth++; + } + + /** + * Exits batch mode and triggers all pending notifications. + * Only triggers when the outermost batch ends (nested batch support). + */ + endBatch(): void { + this._batchDepth--; + if (this._batchDepth > 0) { + return; // Still in a nested batch + } + + // Step 1: Collect all pending paths and their ancestors into a Set. + // This gives us O(1) lookups for descendant checking in Step 2. + const modifiedPaths = new Set(); + for (const path of this._pendingPaths) { + const normalizedPath = this.normalizePath(path); + modifiedPaths.add(normalizedPath); + + // Walk up the ancestor chain + let parentPath = normalizedPath; + while (parentPath !== "/" && parentPath !== "") { + parentPath = + parentPath.substring(0, parentPath.lastIndexOf("/")) || "/"; + modifiedPaths.add(parentPath); + } + } + + // Step 2: Find descendant subscriptions in a single pass over subscriptions. + // For each subscription, walk up its hierarchy to check if any ancestor + // is a pending path. This is O(S * D) instead of O(P * S). + const pathsToNotify = new Set(modifiedPaths); + for (const subPath of this.subscriptions.keys()) { + if (pathsToNotify.has(subPath)) { + continue; // Already included + } + // Walk up from subscription path to check if any ancestor was modified + let ancestor = subPath; + while (ancestor !== "/" && ancestor !== "") { + ancestor = ancestor.substring(0, ancestor.lastIndexOf("/")) || "/"; + if (this._pendingPaths.has(ancestor)) { + pathsToNotify.add(subPath); + break; + } + } + } + + this._pendingPaths.clear(); + + // Notify all collected paths + for (const path of pathsToNotify) { + this.notify(path); + } + } + + /** + * Clears all pending notifications without triggering them. + * Useful when an error occurs during batch processing. + */ + clearPending(): void { + this._pendingPaths.clear(); + } + /** * Retrieves data at a specific JSON pointer path. *