From 600aa846b970001cfe06c3d947f79ca0e5f89b8d Mon Sep 17 00:00:00 2001 From: marian2js Date: Sun, 29 Oct 2023 13:43:51 +0100 Subject: [PATCH] feat: improve campaign retries --- .../api/src/chat/entities/campaign-message.ts | 2 +- .../src/chat/services/broadcast.consumer.ts | 58 ++++++++++++++----- 2 files changed, 45 insertions(+), 15 deletions(-) diff --git a/apps/api/src/chat/entities/campaign-message.ts b/apps/api/src/chat/entities/campaign-message.ts index 1ea6154f..ed610b4f 100644 --- a/apps/api/src/chat/entities/campaign-message.ts +++ b/apps/api/src/chat/entities/campaign-message.ts @@ -11,6 +11,6 @@ export class CampaignMessage extends BaseEntity { @prop({ required: true }) readonly address: string - @prop({ required: true }) + @prop() messageId: string } diff --git a/apps/api/src/chat/services/broadcast.consumer.ts b/apps/api/src/chat/services/broadcast.consumer.ts index 08623374..2b60393e 100644 --- a/apps/api/src/chat/services/broadcast.consumer.ts +++ b/apps/api/src/chat/services/broadcast.consumer.ts @@ -1,4 +1,5 @@ import { JobNonRetriableError } from '@app/common/errors/job-non-retriable-error' +import { wait } from '@app/common/utils/async.utils' import { XmtpLib } from '@app/definitions/integration-definitions/xmtp/xmtp.lib' import { getWalletName } from '@app/definitions/utils/address.utils' import { sendXmtpMessage } from '@chainjet/tools/dist/messages' @@ -62,15 +63,6 @@ export class BroadcastConsumer { }) const client = await XmtpLib.getClient(accountCredential.credentials.keys) - // if this is a retry, filter out contacts that have already been sent a message - if (job.attemptsMade > 0) { - const campaignMessages = await this.campaignMessageService.find({ - campaign: campaign._id, - }) - const campaignMessageAddresses = campaignMessages.map((campaignMessage) => campaignMessage.address) - contacts = contacts.filter((contact) => !campaignMessageAddresses.includes(contact.address)) - } - this.logger.log(`Sending campaign ${campaign._id} to ${contacts.length} contacts`) if (campaign.state === CampaignState.Pending) { @@ -87,9 +79,17 @@ export class BroadcastConsumer { ) } - campaign.delivered = 0 + campaign.delivered = campaign.delivered ?? 0 campaign.total = contacts.length const uniqueAddresses = new Set() + let failed = 0 + + // filter out contacts that have already been sent a message for this campaign + const campaignMessages = await this.campaignMessageService.find({ + campaign: campaign._id, + }) + const campaignMessageAddresses = campaignMessages.map((campaignMessage) => campaignMessage.address) + contacts = contacts.filter((contact) => !campaignMessageAddresses.includes(contact.address)) const walletName = (await getWalletName(user.address)) ?? user.address const unsubscribeMessage = `To unsubscribe from these messages: https://unsubscribe.chainjet.io/${walletName}` @@ -120,10 +120,21 @@ export class BroadcastConsumer { this.logger.log( `Sent broadcast message from ${user.address} to ${sendTo} (${campaign.processed}/${campaign.total})`, ) - } catch {} - campaign.processed++ - job.progress(campaign.processed / campaign.total) - + campaign.processed++ + job.progress(campaign.processed / campaign.total) + } catch (e) { + if (e.message.includes('is not on the XMTP network')) { + await this.campaignMessageService.createOne({ + campaign: campaign._id, + address: contact.address, + }) + campaign.processed++ + job.progress(campaign.processed / campaign.total) + } else { + this.logger.error(`Failed to send broadcast message from ${user.address} to ${sendTo}: ${e.message}`) + failed++ + } + } // update the campaign status every 100 contacts if (campaign.processed > 0 && campaign.processed % 100 === 0) { await this.campaignService.updateOneNative( @@ -141,6 +152,25 @@ export class BroadcastConsumer { } } + // if any messages failed to send with unexpected reasons, retry the job + if (failed > 0) { + await this.campaignService.updateOneNative( + { + _id: campaign._id, + }, + { + $set: { + delivered: campaign.delivered, + processed: campaign.processed, + total: campaign.total, + }, + }, + ) + this.logger.error(`Failed to send ${failed}/${campaign.total} messages for campaign ${campaign._id}. Retrying...`) + await wait(10000) + return await this.send(job) + } + await this.campaignService.updateOneNative( { _id: campaign._id,