Skip to content

Commit

Permalink
allows for publishing extra attributes in google pubsub
Browse files Browse the repository at this point in the history
  • Loading branch information
Suraj Keshri committed Nov 12, 2024
1 parent bba5831 commit f031b22
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 0 deletions.
6 changes: 6 additions & 0 deletions .changeset/poor-onions-decide.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"graphql-eventbus": minor
"graphql-eventbus-google-pubsub": minor
---

allows for publishing extra attributes in google pubsub
37 changes: 37 additions & 0 deletions packages/core/src/GraphQLEventbus.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,43 @@ test("Publishing fails if bus is not initialized", async () => {
).rejects.toThrow();
});

test("extra payload is passed to the publish cb", async () => {
const publishCb = jest.fn();
const bus = new GraphQLEventbus({
publisher: {
schema: pubSchema,
publish: async (d) => {
publishCb(d);
},
allowInvalidTopic: true,
},
});
await bus.init();
await bus.publish({
topic: "TestEvent",
payload: {
id: "123",
name: "name",
},
extra: {
a: 1,
},
});
expect(publishCb).toBeCalledTimes(1);
expect(publishCb.mock.calls[0][0]).toMatchObject({
baggage: {
payload: {
id: "123",
name: "name",
},
},
topic: "TestEvent",
extra: {
a: 1,
},
});
});

test("valid events are consumed and hooks are called", async () => {
const consumeCb = jest.fn();
let cbRef!: DataCb;
Expand Down
3 changes: 3 additions & 0 deletions packages/core/src/GraphQLEventbus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ export class GraphQLEventbus {
publish: (args: {
topic: string;
baggage: Baggage;
extra?: Record<string, unknown>;
}) => Promise<unknown>;
allowInvalidTopic?: boolean;
};
Expand Down Expand Up @@ -246,6 +247,7 @@ export class GraphQLEventbus {
topic: string;
payload: {};
metadata?: Partial<GraphQLEventbusMetadata>;
extra?: Record<string, unknown>;
}) => {
if (!this.isInitialized) {
throw new Error("The eventbus must be initialized before publishing.");
Expand Down Expand Up @@ -294,6 +296,7 @@ export class GraphQLEventbus {
metadata,
payload: props.payload,
},
extra: props.extra,
});
if (publishSuccessHooks.length) {
await Promise.all(
Expand Down
10 changes: 10 additions & 0 deletions packages/google-pubsub/src/PubSubEventBus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ export class PubSubEventBus {
publish: async (a) => {
await this.publishTopics[a.topic].publishMessage({
data: Buffer.from(JSON.stringify(a.baggage)),
...a.extra,
});
},
}
Expand Down Expand Up @@ -153,11 +154,20 @@ export class PubSubEventBus {
topic: string;
payload: Record<string, unknown>;
metadata?: Partial<GraphQLEventbusMetadata>;
attributes?:
| {
[k: string]: string;
}
| null
| undefined;
}) => {
await this.bus.publish({
payload: a.payload,
topic: a.topic,
metadata: a.metadata,
extra: {
atttibutes: a.attributes,
},
});
};
}

0 comments on commit f031b22

Please sign in to comment.