Skip to content

Commit

Permalink
Merge pull request #51 from quant-daddy/aws-poll
Browse files Browse the repository at this point in the history
Aws poll
  • Loading branch information
quant-daddy authored Dec 11, 2024
2 parents 0f3214e + 68667ee commit d8422b3
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 17 deletions.
5 changes: 5 additions & 0 deletions .changeset/thick-apples-confess.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"graphql-eventbus-aws-eventbus": patch
---

fix for polling
37 changes: 20 additions & 17 deletions packages/aws-eventbus/src/AWSEventBus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ export class AWSEventBus {
public sqsClient: SQSClient;
public stsClient: STSClient;
private publishTopics: { [topicName: string]: string } = {};
private pollTimers: NodeJS.Timer[] = [];
private closeSignal = false;
private ongoingPublishes = new Set();
private existingTopicsArns: ListTopicsCommandOutput | undefined;
private bus: GraphQLEventbus;
Expand Down Expand Up @@ -199,33 +199,36 @@ export class AWSEventBus {
const receiveMessageCommand = new ReceiveMessageCommand({
QueueUrl: queueUrl, // The URL of the SQS queue
MaxNumberOfMessages: this.config.subscriber?.maxNumberOfMessages ?? 10, // Number of messages to retrieve (max is 10)
WaitTimeSeconds: this.config.subscriber?.pollingTimeSeconds ?? 0, // Long polling (wait for messages up to 20 seconds)
WaitTimeSeconds: this.config.subscriber?.pollingTimeSeconds ?? 20, // Long polling (wait for messages up to 20 seconds)
VisibilityTimeout: 30, // The time for which a message is hidden after being received
AttributeNames: ["All"], // Optionally retrieve additional message attributes
MessageAttributeNames: ["All"], // Optionally retrieve all message attributes
});

// Send the command to receive the message
const response = await this.sqsClient.send(receiveMessageCommand);

if (response.Messages && response.Messages.length > 0) {
const message = response.Messages[0];
for (const message of response.Messages || []) {
const messageBody = JSON.parse(message.Body || "");
// Process the message
// Your custom logic for processing the message goes here
await cb(JSON.parse(messageBody.Message));
// Delete the message from the queue to avoid reprocessing it
const deleteMessageCommand = new DeleteMessageCommand({
QueueUrl: queueUrl,
ReceiptHandle: message.ReceiptHandle, // Required to delete the message
cb(JSON.parse(messageBody.Message)).then(() => {
const deleteMessageCommand = new DeleteMessageCommand({
QueueUrl: queueUrl,
ReceiptHandle: message.ReceiptHandle, // Required to delete the message
});
// Delete the message from the queue to avoid reprocessing it
this.sqsClient.send(deleteMessageCommand);
});
await this.sqsClient.send(deleteMessageCommand);
}
} catch (error) {
console.error("Error receiving or deleting message:", error);
}
};
private pollQueue = (queueUrl: string, topicName: string, cb: DataCb) => {
private pollQueue = async (
queueUrl: string,
topicName: string,
cb: DataCb,
) => {
const foo = () =>
this.receiveMessageFromQueue(queueUrl, async (baggage) => {
await cb({
Expand All @@ -236,8 +239,10 @@ export class AWSEventBus {
},
});
});
foo();
this.pollTimers.push(setInterval(foo, 20000));
while (!this.closeSignal) {
await foo();
}
// this.pollTimers.push(setInterval(foo, 1000));
};
private createTopic = async (topicName: string): Promise<string> => {
if (!this.existingTopicsArns) {
Expand Down Expand Up @@ -338,9 +343,7 @@ export class AWSEventBus {
};
};
closeConsumer = async () => {
this.pollTimers.forEach((timer) => {
clearInterval(timer);
});
this.closeSignal = true;
};
closePublisher = async () => {
await Promise.all(this.ongoingPublishes);
Expand Down

0 comments on commit d8422b3

Please sign in to comment.