Async Processing
Core uses Amazon SQS (Simple Queue Service) for asynchronous task processing. This document explains how to use the SQS integration for background tasks.
The system includes:
- Main Queue - Processes general async tasks (member provisioning, emails)
- Sales Email Queue - Specifically for sales-related emails
- Dead Letter Queue (DLQ) - Captures failed messages
Tasks are processed by a dedicated Lambda function that's triggered by new messages.
SQS delivers messages to the Lambda in batches; the batching configuration is defined in cloudformation/sqs.yml. Our SQS wrapper handles partial batch failures and error reporting.
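To make the partial-failure behaviour concrete, here is a minimal sketch of how a batch handler can report only the failed messages back to SQS. It is not the project's actual wrapper: `processBatch`, `SqsRecord`, and `RecordProcessor` are hypothetical names, and the record shape is cut down to the two fields the sketch needs. The `batchItemFailures` response format is the one Lambda expects when `ReportBatchItemFailures` is enabled on the event source mapping.

```typescript
// Cut-down record shape, redeclared here so the sketch is self-contained.
interface SqsRecord {
  messageId: string;
  body: string;
}

// Response shape Lambda expects for partial batch failures.
interface SqsBatchResponse {
  batchItemFailures: { itemIdentifier: string }[];
}

// Hypothetical per-message processor; it rejects (throws) on failure.
type RecordProcessor = (record: SqsRecord) => Promise<void>;

// Run every record independently and collect the IDs of the ones that failed.
// SQS redelivers only the reported messages and deletes the rest of the batch.
async function processBatch(
  records: SqsRecord[],
  processRecord: RecordProcessor,
): Promise<SqsBatchResponse> {
  const failedIds = await Promise.all(
    records.map(async (record) => {
      try {
        await processRecord(record);
        return null; // success: nothing to report
      } catch {
        return record.messageId; // failure: report this message ID
      }
    }),
  );
  const batchItemFailures = failedIds
    .filter((id): id is string => id !== null)
    .map((id) => ({ itemIdentifier: id }));
  return { batchItemFailures };
}
```

Catching per record, rather than letting one rejection fail the whole invocation, is what keeps successful messages from being reprocessed.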
Message types are defined in src/common/types/sqsMessage.ts:
export enum AvailableSQSFunctions {
  Ping = "ping",
  EmailMembershipPass = "emailMembershipPass",
  ProvisionNewMember = "provisionNewMember",
  SendSaleEmail = "sendSaleEmail",
}
Each message type has a corresponding schema and handler.
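The schema definitions themselves are not shown in this document. As a rough illustration of what validating an incoming message involves, the sketch below hand-checks the envelope fields (`function`, `metadata`, `payload`) used in the queueing example. `SqsEnvelope`, `isRecord`, and `parseEnvelope` are hypothetical names, and the real code may well use a schema library instead of manual checks.

```typescript
// Mirrors the enum from src/common/types/sqsMessage.ts, redeclared so the sketch stands alone.
enum AvailableSQSFunctions {
  Ping = "ping",
  EmailMembershipPass = "emailMembershipPass",
  ProvisionNewMember = "provisionNewMember",
  SendSaleEmail = "sendSaleEmail",
}

// Assumed envelope shape, based on the payload built in the queueing example.
interface SqsEnvelope {
  function: AvailableSQSFunctions;
  metadata: { initiator: string; reqId: string };
  payload: Record<string, unknown>;
}

function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === "object" && value !== null && !Array.isArray(value);
}

// Parse a raw message body and reject anything that is not a known envelope.
function parseEnvelope(body: string): SqsEnvelope {
  const parsed: unknown = JSON.parse(body);
  if (!isRecord(parsed)) {
    throw new Error("Message body must be a JSON object");
  }
  const { function: fn, metadata, payload } = parsed;

  const known = Object.values(AvailableSQSFunctions) as string[];
  if (typeof fn !== "string" || !known.includes(fn)) {
    throw new Error(`Unknown SQS function: ${String(fn)}`);
  }

  const initiator = isRecord(metadata) ? metadata.initiator : undefined;
  const reqId = isRecord(metadata) ? metadata.reqId : undefined;
  if (typeof initiator !== "string" || typeof reqId !== "string") {
    throw new Error("metadata.initiator and metadata.reqId are required");
  }

  if (!isRecord(payload)) {
    throw new Error("payload must be a JSON object");
  }
  return {
    function: fn as AvailableSQSFunctions,
    metadata: { initiator, reqId },
    payload,
  };
}
```

Per-function payload checks (for example, requiring `email` for `emailMembershipPass`) would sit on top of this envelope check.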
To queue a task for asynchronous processing:
import { SQSClient, SendMessageCommand } from "@aws-sdk/client-sqs";
import {
  SQSPayload,
  AvailableSQSFunctions,
} from "../../common/types/sqsMessage.js";

// Create payload
const sqsPayload = {
  function: AvailableSQSFunctions.EmailMembershipPass,
  metadata: {
    initiator: request.username || "anonymous",
    reqId: request.id,
  },
  payload: {
    email: "user@example.com", // placeholder address
  },
};

// Initialize SQS client if needed
if (!fastify.sqsClient) {
  fastify.sqsClient = new SQSClient({
    region: genericConfig.AwsRegion,
  });
}

// Send message
const result = await fastify.sqsClient.send(
  new SendMessageCommand({
    QueueUrl: fastify.environmentConfig.SqsQueueUrl,
    MessageBody: JSON.stringify(sqsPayload),
  })
);
SQS message handlers are defined in src/api/sqs/handlers.ts and registered in src/api/sqs/index.ts.
To add a new handler:
- Define the handler function:
export const myTaskHandler: SQSHandlerFunction<
  AvailableSQSFunctions.MyTask
> = async (payload, metadata, logger) => {
  logger.info(`Processing task with ID ${payload.taskId}`);
  // Your processing logic
  logger.info(`Task ${payload.taskId} completed successfully`);
};
- Register the handler in src/api/sqs/index.ts:
const handlers: SQSFunctionPayloadTypes = {
  // Existing handlers...
  [AvailableSQSFunctions.MyTask]: myTaskHandler,
};
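The following sketch shows one way a handler map like this can be typed so that the compiler rejects a registry that is missing a handler for any function name. Everything here is a simplified stand-in rather than the project's real types: `Fn`, `PayloadByFn`, `Handler`, `HandlerRegistry`, and `dispatch` are hypothetical, and the handlers return a string only to keep the example easy to check.

```typescript
// Hypothetical two-member enum standing in for AvailableSQSFunctions.
enum Fn {
  Ping = "ping",
  MyTask = "myTask",
}

// Assumed payload shape for each function name.
interface PayloadByFn {
  [Fn.Ping]: Record<string, never>;
  [Fn.MyTask]: { taskId: string };
}

interface Metadata {
  initiator: string;
  reqId: string;
}

type Handler<K extends Fn> = (
  payload: PayloadByFn[K],
  metadata: Metadata,
) => Promise<string>;

// Mapped type: leaving out a handler for any enum member is a compile error.
type HandlerRegistry = { [K in Fn]: Handler<K> };

const handlers: HandlerRegistry = {
  [Fn.Ping]: async () => "pong",
  [Fn.MyTask]: async (payload) => `done:${payload.taskId}`,
};

// Look up the handler by function name and call it with the matching payload type.
async function dispatch<K extends Fn>(
  fn: K,
  payload: PayloadByFn[K],
  metadata: Metadata,
): Promise<string> {
  return handlers[fn](payload, metadata);
}
```

With this shape, adding a new enum member without registering a handler fails type checking instead of failing at runtime when the first message arrives.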
If a handler function throws an exception:
- The message will be retried (up to 3 times)
- After all retries fail, it goes to the Dead Letter Queue (DLQ)
- CloudWatch alarms monitor the DLQ for failed messages
To signal a failure, throw one of our defined error types from the handler function. The SQS wrapper reports the error and handles batches in which only some messages succeeded.
The system uses SQS for:
- Email Sending - Membership and sales confirmation emails
- Membership Provisioning - Setting up new member accounts
- Stripe Webhook Processing - Handling Stripe payment events
For larger or more complex tasks, consider using SQS to process them asynchronously rather than blocking API responses.