diff --git a/src/InMemoryEventStore.test.ts b/src/InMemoryEventStore.test.ts index a040dbc..3a0cc0e 100644 --- a/src/InMemoryEventStore.test.ts +++ b/src/InMemoryEventStore.test.ts @@ -157,4 +157,76 @@ describe("InMemoryEventStore", () => { ); expect(emptyResult).toBe(""); }); + + it("keeps deterministic ordering even when events share the same timestamp", async () => { + const store = new InMemoryEventStore(); + const streamId = "deterministic-stream"; + + const messages: JSONRPCMessage[] = [ + { id: 1, jsonrpc: "2.0", method: "step/one" }, + { id: 2, jsonrpc: "2.0", method: "step/two" }, + { id: 3, jsonrpc: "2.0", method: "step/three" }, + { id: 4, jsonrpc: "2.0", method: "step/four" }, + ]; + + const fixedTimestamp = 1_730_000_000_000; + const nowSpy = vi.spyOn(Date, "now").mockReturnValue(fixedTimestamp); + + const eventIds: string[] = []; + try { + for (const message of messages) { + eventIds.push(await store.storeEvent(streamId, message)); + } + } finally { + nowSpy.mockRestore(); + } + + // Ensure IDs already arrive sorted since we stored sequentially + expect(eventIds).toEqual([...eventIds].sort()); + + const parts = eventIds.map((eventId) => eventId.split("_")); + + const timestampParts = parts.map(([, timestamp]) => timestamp); + expect(timestampParts).toEqual( + Array(messages.length).fill(fixedTimestamp.toString()) + ); + + const counterSuffixes = parts.map(([, , counter]) => counter); + expect(counterSuffixes).toEqual( + messages.map((_, index) => (index).toString(36).padStart(4, "0")) + ); + + // Random parts should be 3 base36 characters each (due to substring(2, 5)) + const randomParts = parts.map(([, , , random]) => random); + for (const random of randomParts) { + expect(random).toMatch(/^[0-9a-z]{3}$/); + } + + // Replay after the first event and ensure the remainder flow in order + const replayedMessages: JSONRPCMessage[] = []; + const returnedStreamId = await store.replayEventsAfter(eventIds[0], { + send: async (_eventId: string, message: JSONRPCMessage) => { + replayedMessages.push(message); + }, + }); + + expect(returnedStreamId).toBe(streamId); + expect(replayedMessages).toEqual(messages.slice(1)); + + // Now allow timestamp to advance to ensure counter resets + const nextTimestamp = fixedTimestamp + 1; + const secondSpy = vi.spyOn(Date, "now").mockReturnValue(nextTimestamp); + try { + const nextId = await store.storeEvent(streamId, { + id: 5, + jsonrpc: "2.0", + method: "step/five", + }); + const [, , counter, random] = nextId.split("_"); + expect(counter).toBe("0000"); + expect(random).toMatch(/^[0-9a-z]{3}$/); + } finally { + secondSpy.mockRestore(); + } + }); }); \ No newline at end of file diff --git a/src/InMemoryEventStore.ts b/src/InMemoryEventStore.ts index 295a42b..733f736 100644 --- a/src/InMemoryEventStore.ts +++ b/src/InMemoryEventStore.ts @@ -14,6 +14,8 @@ import type { JSONRPCMessage } from "@modelcontextprotocol/sdk/types.js"; export class InMemoryEventStore implements EventStore { private events: Map = new Map(); + private lastTimestamp = 0; + private lastTimestampCounter = 0; /** * Replays events that occurred after a specific event ID @@ -79,10 +81,25 @@ export class InMemoryEventStore implements EventStore { } /** - * Generates a unique event ID for a given stream ID + * Generates a monotonic unique event ID in + * `${streamId}_${timestamp}_${counter}_${random}` format. */ private generateEventId(streamId: string): string { - return `${streamId}_${Date.now()}_${Math.random().toString(36).substring(2, 10)}`; + + const now = Date.now(); + + if (now === this.lastTimestamp) { + this.lastTimestampCounter++; + } else { + this.lastTimestampCounter = 0; + this.lastTimestamp = now; + } + + const timestamp = now.toString(); + const counter = this.lastTimestampCounter.toString(36).padStart(4, "0"); + const random = Math.random().toString(36).substring(2, 5); + + return `${streamId}_${timestamp}_${counter}_${random}`; } /** diff --git a/src/proxyServer.ts b/src/proxyServer.ts index 921d6d8..89a8ded 100644 --- a/src/proxyServer.ts +++ b/src/proxyServer.ts @@ -124,10 +124,12 @@ export const proxyServer = async ({ }); } - server.setRequestHandler(CompleteRequestSchema, async (args) => { - return client.complete( - args.params, - requestTimeout ? { timeout: requestTimeout } : undefined, - ); - }); + if (serverCapabilities?.completions) { + server.setRequestHandler(CompleteRequestSchema, async (args) => { + return client.complete( + args.params, + requestTimeout ? { timeout: requestTimeout } : undefined, + ); + }); + } };