feat(inbound-mail): write request log row early to avoid loss on worker crash fixes NV-7936#11391
Conversation
Move the canonical inbound-email `requests` row from the worker into
`apps/inbound-mail` so the row is created immediately after attachment
processing, before the BullMQ enqueue. Worker crashes, exhausted retries,
and silent shared-agent drops can no longer skip the analytics record.
- Add `InboundMailRequestLogger` and `InboundMailTenantResolver` in
application-generic with the three lifecycle phases:
1. inbound-mail: `logReceived` writes the row (status 202) plus
`request_received` trace.
2. inbound-mail: `logQueued` / `logQueueFailed` trace after BullMQ.
3. worker: `logCompleted` writes the terminal
`request_delivered` / `request_failed` trace linked via
`requestLogId` on the queue payload.
- Tenant resolution: reply-to addresses get environmentId from the
encoded address, domain-route addresses look up the domain via
`DomainRepository.findByName`.
- Worker refactor: stop writing the row; emit only the terminal trace.
Silent shared-agent drops, malformed addresses (`BadRequestException`),
and unsupported route types now surface as warning traces. A BullMQ
`failed` event handler emits the terminal trace once retries are
exhausted for unhandled exceptions.
- Schema: add `request_delivered` event type and title mapping.
Co-authored-by: George Djabarov <djabarovgeorge@users.noreply.github.com>
…letion - Rewrite worker LogInboundEmailRequest spec for trace-only behavior (request_delivered / request_failed via requestLogId) - Add unit tests for InboundMailRequestLogger / InboundMailTenantResolver and the metadata helpers in application-generic - Add SMTP integration test in inbound-mail that asserts logReceived runs before BullMQ, requestLogId is threaded onto the queue payload, and request_queued / request_failed traces fire on the right paths - Expose the inboundMailService singleton from inbound-mail/index.ts as __testInboundMailService for stub injection in tests Co-authored-by: George Djabarov <djabarovgeorge@users.noreply.github.com>
|
Important Review skippedAuto reviews are disabled on base/target branches other than the default branch. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Path: .coderabbit.yaml Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
LaunchDarkly flag references🔍 1 flag added or modified
|
| if (error instanceof BadRequestException) { | ||
| await this.logInboundEmailRequest.logUnresolvedFailure({ | ||
| requestLogId: command.requestLogId ?? '', | ||
| message: extractMessage(error), | ||
| severity: 'warning', | ||
| }); | ||
| } | ||
|
|
||
| // For all other throws (DB error, unhandled exception, etc.) we let | ||
| // BullMQ retry. The terminal trace for retries that exhaust comes from | ||
| // the `failed` handler on the worker process. | ||
| throw error; | ||
| } |
There was a problem hiding this comment.
BadRequestException falls through to throw error — BullMQ will retry the job and write duplicate traces
The BadRequestException block writes a terminal request_failed trace but does not return afterwards, so execution falls through to the unconditional throw error on line 100. BullMQ will then retry the job; on every subsequent attempt the usecase catches the same BadRequestException, writes another trace, and re-throws again. On the final exhausted attempt the safety net in InboundParseWorker fires a third time. The design doc comments this path as "non-retriable (malformed address / unknown domain)", but the missing return makes it retriable in practice. Compare with the InboundParseDroppedError handler directly above, which does return after logging.
Prompt To Fix With AI
This is a comment left during a code review.
Path: apps/worker/src/app/workflow/usecases/inbound-email-parse/inbound-email-parse.usecase.ts
Line: 89-101
Comment:
**`BadRequestException` falls through to `throw error` — BullMQ will retry the job and write duplicate traces**
The `BadRequestException` block writes a terminal `request_failed` trace but does **not** `return` afterwards, so execution falls through to the unconditional `throw error` on line 100. BullMQ will then retry the job; on every subsequent attempt the usecase catches the same `BadRequestException`, writes another trace, and re-throws again. On the final exhausted attempt the safety net in `InboundParseWorker` fires a third time. The design doc comments this path as "non-retriable (malformed address / unknown domain)", but the missing `return` makes it retriable in practice. Compare with the `InboundParseDroppedError` handler directly above, which does `return` after logging.
How can I resolve this? If you propose a fix, please make it concise.| * Safety net: BullMQ's `failed` event fires after every failed attempt. We | ||
| * only emit a terminal `request_failed` trace once BullMQ has exhausted all | ||
| * configured retries — intermediate 5xx retries do not get duplicate traces | ||
| * on the request, only the final outcome does. Handles unhandled exceptions | ||
| * that bypass `InboundEmailParse.execute()`'s own catch block (e.g. | ||
| * unexpected throws during DB access, OOM, etc.). | ||
| */ | ||
| private registerFailedSafetyNet(): void { | ||
| const worker = this.bullMqWorker; | ||
|
|
||
| if (!worker) { | ||
| return; | ||
| } | ||
|
|
||
| worker.on('failed', (job, error) => { | ||
| if (!job) { | ||
| return; | ||
| } | ||
|
|
||
| const attemptsMade = job.attemptsMade ?? 0; | ||
| const maxAttempts = job.opts?.attempts ?? 1; | ||
|
|
||
| // Wait until the final attempt before recording the terminal trace, so | ||
| // retries don't generate noise. Strategies still write terminal traces | ||
| // for resolved failures via `InboundEmailParse` — this handler only | ||
| // catches unhandled exceptions and exhausted retries. | ||
| if (attemptsMade < maxAttempts) { | ||
| return; | ||
| } | ||
|
|
||
| const data = job.data as IInboundParseDataDto | undefined; | ||
| if (!data?.requestLogId) { | ||
| return; | ||
| } | ||
|
|
||
| this.inboundMailRequestLogger | ||
| .logCompleted({ | ||
| requestLogId: data.requestLogId, | ||
| organizationId: '', | ||
| environmentId: '', | ||
| transactionId: data.messageId ?? '', | ||
| delivered: false, | ||
| severity: 'error', | ||
| message: error instanceof Error ? error.message : 'Inbound mail processing failed after exhausted retries', | ||
| }) | ||
| .catch((traceError) => { | ||
| Logger.warn( | ||
| { err: traceError, jobId: job.id, requestLogId: data.requestLogId }, | ||
| 'Failed to write inbound-email exhausted-retries trace', | ||
| LOG_CONTEXT | ||
| ); | ||
| }); | ||
| }); | ||
| } |
There was a problem hiding this comment.
Safety net fires for
InboundParseProcessingError on the final attempt — duplicate terminal trace with empty org/env
The failed event handler fires for every failure that reaches BullMQ's final attempt, including InboundParseProcessingError. Since InboundEmailParse.execute() catches InboundParseProcessingError, writes a trace (with the correct organizationId/environmentId from the outcome), and then re-throws, BullMQ still marks the job as failed. On the final attempt the usecase already wrote a request_failed trace with outcome context; then this safety net fires and writes a second request_failed trace with organizationId: '' and environmentId: ''. The result is two conflicting terminal traces in the dashboard for every InboundParseProcessingError that exhausts its retry budget. The safety net was intended for unhandled exceptions that never reach the usecase catch block, but it lacks a signal to distinguish that case.
Prompt To Fix With AI
This is a comment left during a code review.
Path: apps/worker/src/app/workflow/workers/inbound-parse.worker.service.ts
Line: 46-99
Comment:
**Safety net fires for `InboundParseProcessingError` on the final attempt — duplicate terminal trace with empty org/env**
The `failed` event handler fires for every failure that reaches BullMQ's final attempt, including `InboundParseProcessingError`. Since `InboundEmailParse.execute()` catches `InboundParseProcessingError`, writes a trace (with the correct `organizationId`/`environmentId` from the outcome), and then re-throws, BullMQ still marks the job as failed. On the final attempt the usecase already wrote a `request_failed` trace with outcome context; then this safety net fires and writes a second `request_failed` trace with `organizationId: ''` and `environmentId: ''`. The result is two conflicting terminal traces in the dashboard for every `InboundParseProcessingError` that exhausts its retry budget. The safety net was intended for unhandled exceptions that never reach the usecase catch block, but it lacks a signal to distinguish that case.
How can I resolve this? If you propose a fix, please make it concise.| * without standing up real ClickHouse / MongoDB. Production code should not | ||
| * read from this export. | ||
| */ | ||
| export const __testInboundMailService = inboundMailService; | ||
|
|
||
| class Mailin extends events.EventEmitter { | ||
| public configuration: IConfiguration; |
There was a problem hiding this comment.
Test-only backdoor exported from production module
__testInboundMailService is exported from the module that is the main entry point of the inbound-mail service so tests can inject mock requestLogger/tenantResolver. Exporting internal state from a production file couples the test fixture to the runtime module graph; if this file is imported by anything other than tests in CI the export pollutes the public surface. Consider injecting the service instance via a dedicated test-helper module or using dependency injection rather than exporting module-level state.
Prompt To Fix With AI
This is a comment left during a code review.
Path: apps/inbound-mail/src/server/index.ts
Line: 33-39
Comment:
**Test-only backdoor exported from production module**
`__testInboundMailService` is exported from the module that is the main entry point of the `inbound-mail` service so tests can inject mock `requestLogger`/`tenantResolver`. Exporting internal state from a production file couples the test fixture to the runtime module graph; if this file is imported by anything other than tests in CI the export pollutes the public surface. Consider injecting the service instance via a dedicated test-helper module or using dependency injection rather than exporting module-level state.
How can I resolve this? If you propose a fix, please make it concise.Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
4bfe9fd
into
cursor/inbound-mail-request-logs-a083
Summary
Move the inbound-email
requestsrow creation from the worker intoapps/inbound-mailso the row is written immediately after attachment processing, before the BullMQ enqueue. Worker crashes, exhausted retries, and silent shared-agent drops can no longer skip the analytics record.Lifecycle
sequenceDiagram participant MTA as Sending_MTA participant IM as inbound-mail participant CH as ClickHouse participant Redis as BullMQ participant W as worker MTA->>IM: SMTP DATA IM->>IM: parse + attachments IM->>CH: requests row (202) + request_received IM->>Redis: enqueue with requestLogId alt queue success IM->>CH: request_queued IM-->>MTA: 250 OK else queue failure IM->>CH: request_failed IM-->>MTA: 451 retry end Redis->>W: dequeue alt delivery success W->>CH: request_delivered else terminal failure W->>CH: request_failed endWhat changed
libs/application-generic/src/services/inbound-mail-logging/— newInboundMailRequestLogger,InboundMailTenantResolver, and metadata helpers shared by both services.apps/inbound-mail— wiresClickHouseService+DalService+DomainRepositorybehind the existingIS_ANALYTICS_LOGS_ENABLED/IS_INBOUND_ANALYTICS_LOGS_ENABLEDflags; callslogReceivedbetween attachment processing andpostQueue, thenlogQueued/logQueueFailedaround the BullMQadd.IInboundParseDataDtonow carriesrequestLogId?: stringso the worker can append its terminal trace.apps/worker—LogInboundEmailRequestbecomes trace-only (request_delivered/request_failed).InboundEmailParsenow writes a terminal trace for every exit path: 200 (success), 422 (warning), 502 (error),BadRequestException,InboundParseDroppedError. A newfailedevent handler onInboundParseWorkercovers unhandled exceptions on the final BullMQ attempt (5/5).request_deliveredevent type + title; status issuccess/warning(4xx, non-retriable) /error(5xx, retriable).InboundParseDroppedErrorinstead of returningundefined, so every drop produces a trace with the reason.status_codeon the early row is202(accepted for processing). Final outcome (200/422/502) lives in the terminal trace in the request detail view. The existing dashboard list keeps its filter semantics; surfacing terminal status in the list view is a separate follow-up.All analytics writes are best-effort and never block SMTP acceptance.
Environment variable update
inbound-mail
IS_ANALYTICS_LOGS_ENABLED=true
IS_INBOUND_ANALYTICS_LOGS_ENABLED=true
MONGO_URL
CLICK_HOUSE_URL
CLICK_HOUSE_DATABASE
CLICK_HOUSE_USER
CLICK_HOUSE_PASSWORD
LAUNCH_DARKLY_SDK_KEY
worker
IS_INBOUND_ANALYTICS_LOGS_ENABLED=true
dashboard
VITE_IS_INBOUND_LOGS_ENABLED=true
Testing
pnpm --filter @novu/application-generic buildpnpm --filter @novu/worker buildpnpm --filter @novu/inbound-mail buildpnpm --filter @novu/worker test --grep "LogInboundEmailRequest|Should handle the new arrived mail"— 14 passing (8 new + 6 existing)PORT=2525 pnpm --filter @novu/inbound-mail test— 26 passing (3 new SMTP integration tests + 23 existing; pre-existingCannot obliterate queue with active jobsfailure is unrelated to this change)pnpm --filter @novu/application-generic jest— pre-existing jest-environment-node version mismatch in this package prevents the new specs from running locally; they follow the package's existing jest style and will run once the upstream config is fixed.Manual end-to-end testing (SMTP → worker → ClickHouse) was not performed: it requires the full SMTP stack plus live ClickHouse and the analytics feature flags toggled on, which is more invasive than the SMTP integration test already covers.
Greptile Summary
This PR moves the inbound-mail
requestsrow creation from the BullMQ worker intoapps/inbound-mailso that the analytics record is written immediately after attachment processing, before the job is enqueued. Worker crashes, exhausted retries, and silent shared-agent drops can no longer cause a missing analytics row.InboundMailRequestLogger,InboundMailTenantResolver,InboundRequestMetadatainlibs/application-generic) centralises all write logic;apps/inbound-mailcallslogReceived→logQueued/logQueueFailed, and the worker callslogCompletedvia the trimmedLogInboundEmailRequestusecase.return undefinedpaths inDomainRouteStrategy's shared-agent handler are replaced byInboundParseDroppedErrorthrows so every drop produces a traceablerequest_failedevent.failedevent handler onInboundParseWorkerwrites a terminal trace when BullMQ exhausts retries for unhandled exceptions; two issues affect correctness:BadRequestExceptionis re-thrown after trace write (causing retry loops and duplicate traces), andInboundParseProcessingErroralso produces duplicate traces on the final attempt because both the usecase catch block and the safety net fire independently.Confidence Score: 3/5
The SMTP acceptance path is safe and best-effort analytics writes never block mail delivery, but the worker-side trace deduplication logic has gaps that will produce incorrect analytics data for certain error paths.
The
BadRequestExceptionhandler writes a terminal trace but does notreturn, so BullMQ retries the job and duplicate traces accumulate on every retry. Separately,InboundParseProcessingErroron the final retry gets two conflicting terminal traces — one from the usecase catch block (with correct org/env) and one from the safety net (with empty strings).apps/worker/src/app/workflow/usecases/inbound-email-parse/inbound-email-parse.usecase.ts and apps/worker/src/app/workflow/workers/inbound-parse.worker.service.ts need attention for the trace-deduplication issues.
Important Files Changed
returnafter BadRequestException trace means non-retriable errors are re-thrown and re-queued, producing duplicate traces per retry.failedsafety-net handler; the handler fires unconditionally on the final attempt, causing duplicate terminal traces for InboundParseProcessingError that already wrote a trace inside the usecase before re-throwing.return undefineddrops in the shared-agent path to InboundParseDroppedError throws; enables tracing of previously invisible drops.Sequence Diagram
sequenceDiagram participant MTA as Sending MTA participant IM as inbound-mail participant CH as ClickHouse participant Redis as BullMQ participant W as Worker MTA->>IM: SMTP DATA IM->>IM: parse + attachments IM->>CH: requests row (202) + request_received IM->>Redis: enqueue with requestLogId alt queue success IM->>CH: request_queued (fire-and-forget) IM-->>MTA: 250 OK else queue failure IM->>CH: request_failed IM-->>MTA: 451 retry end Redis->>W: dequeue alt outcome resolved (200) W->>CH: request_delivered via logCompleted else InboundParseProcessingError (re-thrown) W->>CH: request_failed via logCompleted W->>CH: duplicate request_failed via safety net (final attempt) else BadRequestException (missing return) W->>CH: request_failed per retry attempt W->>Redis: BullMQ retries non-retriable job else InboundParseDroppedError W->>CH: request_failed warning (returns, no retry) else unhandled exception W->>Redis: BullMQ retries W->>CH: request_failed via safety net (final attempt only) endPrompt To Fix All With AI
Reviews (1): Last reviewed commit: "test(inbound-mail): cover early request ..." | Re-trigger Greptile