diff --git a/.changeset/poor-onions-decide.md b/.changeset/poor-onions-decide.md new file mode 100644 index 0000000..43258e9 --- /dev/null +++ b/.changeset/poor-onions-decide.md @@ -0,0 +1,6 @@ +--- +"graphql-eventbus": minor +"graphql-eventbus-google-pubsub": minor +--- + +allows for publishing extra attributes in google pubsub diff --git a/packages/core/src/GraphQLEventbus.test.ts b/packages/core/src/GraphQLEventbus.test.ts index eb8fa3c..37a34d0 100644 --- a/packages/core/src/GraphQLEventbus.test.ts +++ b/packages/core/src/GraphQLEventbus.test.ts @@ -122,6 +122,43 @@ test("Publishing fails if bus is not initialized", async () => { ).rejects.toThrow(); }); +test("extra payload is passed to the publish cb", async () => { + const publishCb = jest.fn(); + const bus = new GraphQLEventbus({ + publisher: { + schema: pubSchema, + publish: async (d) => { + publishCb(d); + }, + allowInvalidTopic: true, + }, + }); + await bus.init(); + await bus.publish({ + topic: "TestEvent", + payload: { + id: "123", + name: "name", + }, + extra: { + a: 1, + }, + }); + expect(publishCb).toBeCalledTimes(1); + expect(publishCb.mock.calls[0][0]).toMatchObject({ + baggage: { + payload: { + id: "123", + name: "name", + }, + }, + topic: "TestEvent", + extra: { + a: 1, + }, + }); +}); + test("valid events are consumed and hooks are called", async () => { const consumeCb = jest.fn(); let cbRef!: DataCb; diff --git a/packages/core/src/GraphQLEventbus.ts b/packages/core/src/GraphQLEventbus.ts index f5a5604..8faf638 100644 --- a/packages/core/src/GraphQLEventbus.ts +++ b/packages/core/src/GraphQLEventbus.ts @@ -92,6 +92,7 @@ export class GraphQLEventbus { publish: (args: { topic: string; baggage: Baggage; + extra?: Record; }) => Promise; allowInvalidTopic?: boolean; }; @@ -246,6 +247,7 @@ export class GraphQLEventbus { topic: string; payload: {}; metadata?: Partial; + extra?: Record; }) => { if (!this.isInitialized) { throw new Error("The eventbus must be initialized before publishing."); @@ -294,6 +296,7 @@ export class GraphQLEventbus { metadata, payload: props.payload, }, + extra: props.extra, }); if (publishSuccessHooks.length) { await Promise.all( diff --git a/packages/google-pubsub/src/PubSubEventBus.ts b/packages/google-pubsub/src/PubSubEventBus.ts index 427e4da..25f13b4 100644 --- a/packages/google-pubsub/src/PubSubEventBus.ts +++ b/packages/google-pubsub/src/PubSubEventBus.ts @@ -57,6 +57,7 @@ export class PubSubEventBus { publish: async (a) => { await this.publishTopics[a.topic].publishMessage({ data: Buffer.from(JSON.stringify(a.baggage)), + ...a.extra, }); }, } @@ -153,11 +154,20 @@ export class PubSubEventBus { topic: string; payload: Record; metadata?: Partial; + attributes?: + | { + [k: string]: string; + } + | null + | undefined; }) => { await this.bus.publish({ payload: a.payload, topic: a.topic, metadata: a.metadata, + extra: { + atttibutes: a.attributes, + }, }); }; }