Merged
1 change: 1 addition & 0 deletions src/config/env.ts
@@ -243,6 +243,7 @@ export const config = {
      | "twilio"
      | "africas_talking"
      | "log",
    alertEmail: process.env.NOTIFICATION_ALERT_EMAIL || "",
    twilioAccountSid: process.env.TWILIO_ACCOUNT_SID || "",
    twilioAuthToken: process.env.TWILIO_AUTH_TOKEN || "",
    twilioFromNumber: process.env.TWILIO_FROM_NUMBER || "",
2 changes: 2 additions & 0 deletions src/config/rabbitmq.ts
@@ -112,6 +112,8 @@ export const QUEUES = {
  XLM_TO_ACBU_DLQ: "xlm_to_acbu_dlq",
  USDC_CONVERT_AND_MINT: "usdc_convert_and_mint", // USDC deposit: convert USDC→XLM (backend), then mint
  USDC_CONVERT_AND_MINT_DLQ: "usdc_convert_and_mint_dlq",
  AUDIT_LOGS: "audit_logs",
  AUDIT_LOGS_DLQ: "audit_logs_dlq",
} as const;

// Exchange names
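The new AUDIT_LOGS/AUDIT_LOGS_DLQ pair follows the repo's existing `<queue>`/`<queue>_dlq` naming convention. As a rough, hypothetical sketch of the wiring a helper like `assertQueueWithDLQ` (used later in this PR, but whose implementation is not part of this diff) typically performs, the snippet below builds the standard RabbitMQ dead-letter x-arguments; `buildQueueWithDLQOptions` and its shape are illustrative assumptions, not code from the PR:

```typescript
// Sketch of the queue/DLQ convention implied by names like
// "audit_logs" / "audit_logs_dlq". Shows the standard RabbitMQ dead-letter
// x-arguments a helper would pass to channel.assertQueue(); no broker needed.
interface QueueOptions {
  durable: boolean;
  arguments?: Record<string, string>;
}

function buildQueueWithDLQOptions(
  queue: string,
  opts: { durable: boolean },
): { main: QueueOptions; dlq: { name: string; options: QueueOptions } } {
  const dlqName = `${queue}_dlq`;
  return {
    main: {
      durable: opts.durable,
      arguments: {
        // The default (nameless) exchange routes by queue name, so a
        // rejected message lands directly on the DLQ.
        "x-dead-letter-exchange": "",
        "x-dead-letter-routing-key": dlqName,
      },
    },
    dlq: { name: dlqName, options: { durable: opts.durable } },
  };
}

const audit = buildQueueWithDLQOptions("audit_logs", { durable: true });
console.log(audit.dlq.name); // "audit_logs_dlq"
```

With this wiring, a `channel.nack(msg, false, false)` on the main queue dead-letters the message straight into the queue named by `x-dead-letter-routing-key`.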
4 changes: 4 additions & 0 deletions src/index.ts
@@ -105,6 +105,10 @@ async function startServer() {
      await import("./jobs/notificationConsumer");
      await startNotificationConsumer();

      // Start audit consumer (AUDIT_LOGS → database)
      const { startAuditConsumer } = await import("./jobs/auditConsumer");
      await startAuditConsumer();
Comment on lines +108 to +110
⚠️ Potential issue | 🟠 Major

Isolate audit-consumer startup failure from API startup.

This await sits inside the outer startServer() try/catch, so any getRabbitMQChannel()/queue-assertion failure from startAuditConsumer() now takes the whole service down at Lines 188-190. That contradicts the earlier “continue without queue-based features” behavior and turns an audit-queue misconfiguration into a full API outage.

Suggested fix
-      const { startAuditConsumer } = await import("./jobs/auditConsumer");
-      await startAuditConsumer();
+      try {
+        const { startAuditConsumer } = await import("./jobs/auditConsumer");
+        await startAuditConsumer();
+      } catch (error) {
+        logger.error(
+          "Audit consumer failed to start; continuing with fallback-only audit persistence.",
+          error,
+        );
+      }


      // Start outbound webhook consumer (WEBHOOKS → deliver with HMAC-SHA256)
      const { startWebhookConsumer } = await import("./jobs/webhookConsumer");
      await startWebhookConsumer();
105 changes: 105 additions & 0 deletions src/jobs/auditConsumer.test.ts
@@ -0,0 +1,105 @@
import { startAuditConsumer } from "./auditConsumer";
import { prisma } from "../config/database";
import {
  getRabbitMQChannel,
  QUEUES,
  assertQueueWithDLQ,
} from "../config/rabbitmq";

jest.mock("../config/database", () => ({
  prisma: {
    auditTrail: {
      create: jest.fn(),
    },
  },
}));

jest.mock("../config/rabbitmq", () => ({
  getRabbitMQChannel: jest.fn(),
  assertQueueWithDLQ: jest.fn().mockResolvedValue({}),
  QUEUES: {
    AUDIT_LOGS: "audit_logs",
  },
}));

jest.mock("../config/logger", () => ({
  logger: {
    warn: jest.fn(),
    error: jest.fn(),
    info: jest.fn(),
  },
}));

describe("AuditConsumer", () => {
  const mockChannel = {
    consume: jest.fn(),
    ack: jest.fn(),
    nack: jest.fn(),
  };

  const entry = {
    eventType: "USER_LOGIN",
    action: "LOGIN",
    performedBy: "user-1",
    timestamp: new Date().toISOString(),
  };

  const mockMsg = {
    content: Buffer.from(JSON.stringify(entry)),
  };

  beforeEach(() => {
    jest.clearAllMocks();
    (getRabbitMQChannel as jest.Mock).mockReturnValue(mockChannel);
  });

  it("should consume and save audit entry to database", async () => {
    (prisma.auditTrail.create as jest.Mock).mockResolvedValue({ id: "1" });

    // Trigger the consumer callback manually
    mockChannel.consume.mockImplementation((queue, callback) => {
      callback(mockMsg);
    });

    await startAuditConsumer();

    expect(assertQueueWithDLQ).toHaveBeenCalledWith(QUEUES.AUDIT_LOGS, {
      durable: true,
    });
    expect(prisma.auditTrail.create).toHaveBeenCalledWith(
      expect.objectContaining({
        data: expect.objectContaining({
          eventType: entry.eventType,
          action: entry.action,
        }),
      }),
    );
    expect(mockChannel.ack).toHaveBeenCalledWith(mockMsg);
  });

  it("should retry on database failure and eventually nack to DLQ", async () => {
    (prisma.auditTrail.create as jest.Mock).mockRejectedValue(
      new Error("DB connection failed"),
    );
    // Fast forward timers for retry
    jest.useFakeTimers();

    mockChannel.consume.mockImplementation((queue, callback) => {
      callback(mockMsg);
    });

    await startAuditConsumer();

    // We need to resolve the promises inside the loop
    for (let i = 0; i < 4; i++) {
      await Promise.resolve(); // allow the loop to run
      jest.runAllTimers();
      await Promise.resolve();
    }

    expect(prisma.auditTrail.create).toHaveBeenCalledTimes(4); // 0, 1, 2, 3
    expect(mockChannel.nack).toHaveBeenCalledWith(mockMsg, false, false);

    jest.useRealTimers();
  });
});
67 changes: 67 additions & 0 deletions src/jobs/auditConsumer.ts
@@ -0,0 +1,67 @@
import { prisma } from "../config/database";
import { logger } from "../config/logger";
import {
  QUEUES,
  assertQueueWithDLQ,
  getRabbitMQChannel,
} from "../config/rabbitmq";

const MAX_RETRIES = 3;
const INITIAL_BACKOFF_MS = 1000;

export async function startAuditConsumer() {
  const channel = getRabbitMQChannel();

  await assertQueueWithDLQ(QUEUES.AUDIT_LOGS, { durable: true });

  channel.consume(QUEUES.AUDIT_LOGS, async (msg) => {
    if (!msg) return;

    const content = msg.content.toString();
    const entry = JSON.parse(content);
Comment on lines +17 to +18

⚠️ Potential issue | 🟠 Major

Nack malformed messages instead of bypassing the DLQ path.

JSON.parse() happens before the retry/DLQ try/catch, so a bad payload exits the handler before either ack() or nack() runs. That leaves poison messages outside the intended DLQ flow.

Suggested fix
-    const content = msg.content.toString();
-    const entry = JSON.parse(content);
+    let entry: any;
+    try {
+      entry = JSON.parse(msg.content.toString());
+    } catch (error: any) {
+      logger.error("Invalid audit payload, moving message to DLQ", {
+        error: error.message || error,
+      });
+      channel.nack(msg, false, false);
+      return;
+    }

Also applies to: 39-55



    let attempt = 0;
    let success = false;

    while (attempt <= MAX_RETRIES && !success) {
      try {
        await prisma.auditTrail.create({
          data: {
            eventType: entry.eventType,
            entityType: entry.entityType ?? null,
            entityId: entry.entityId ?? null,
            action: entry.action,
            oldValue: entry.oldValue ?? (undefined as any),
            newValue: entry.newValue ?? (undefined as any),
            performedBy: entry.performedBy ?? null,
            timestamp: entry.timestamp ? new Date(entry.timestamp) : undefined,
          },
        });
        success = true;
        channel.ack(msg);
      } catch (error: any) {
        attempt++;
        if (attempt <= MAX_RETRIES) {
          const backoff = INITIAL_BACKOFF_MS * Math.pow(2, attempt - 1);
          logger.warn(
            `Audit consumer retry ${attempt}/${MAX_RETRIES} in ${backoff}ms`,
            {
              error: error.message || error,
              eventType: entry.eventType,
            },
          );
          await new Promise((resolve) => setTimeout(resolve, backoff));
        } else {
          logger.error(
            "Audit consumer failed after max retries, moving to DLQ",
            {
              error: error.message || error,
              entry,
            },
          );
          // Reject to DLQ
          channel.nack(msg, false, false);
        }
      }
    }
  });

  logger.info("Audit consumer started");
}
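With MAX_RETRIES = 3 and INITIAL_BACKOFF_MS = 1000, the retry loop above makes four attempts in total, waiting 1 s, 2 s, and then 4 s between them before nacking the message to the DLQ. A standalone sketch of that schedule:

```typescript
// Backoff schedule mirroring the consumer's retry loop:
// delay before retry n (1-based) = INITIAL_BACKOFF_MS * 2^(n-1).
const MAX_RETRIES = 3;
const INITIAL_BACKOFF_MS = 1000;

const schedule = Array.from({ length: MAX_RETRIES }, (_, i) =>
  INITIAL_BACKOFF_MS * Math.pow(2, i),
);
console.log(schedule); // [ 1000, 2000, 4000 ]
```

This is why the retry test drives the fake-timer loop four times: one initial attempt plus three delayed retries.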
123 changes: 123 additions & 0 deletions src/services/audit/auditService.test.ts
@@ -0,0 +1,123 @@
import { logAudit } from "./auditService";
import { logger } from "../../config/logger";
import { sendEmail } from "../notification";
import fs from "fs";
import { config } from "../../config/env";
import { getRabbitMQChannel, QUEUES } from "../../config/rabbitmq";

jest.mock("../../config/rabbitmq", () => ({
  getRabbitMQChannel: jest.fn(),
  QUEUES: {
    AUDIT_LOGS: "audit_logs",
  },
}));

jest.mock("../notification", () => ({
  sendEmail: jest.fn().mockResolvedValue(undefined),
}));

jest.mock("../../config/logger", () => ({
  logger: {
    warn: jest.fn(),
    error: jest.fn(),
    info: jest.fn(),
    debug: jest.fn(),
    verbose: jest.fn(),
  },
}));

describe("AuditService Reliability (RabbitMQ)", () => {
  const entry = {
    eventType: "USER_LOGIN",
    action: "LOGIN",
    performedBy: "user-1",
  };

  const mockChannel = {
    sendToQueue: jest.fn(),
  };

  beforeEach(() => {
    jest.clearAllMocks();
    (getRabbitMQChannel as jest.Mock).mockReturnValue(mockChannel);
  });

  it("should publish to RabbitMQ successfully", async () => {
    mockChannel.sendToQueue.mockReturnValue(true);

    await logAudit(entry);

    expect(mockChannel.sendToQueue).toHaveBeenCalledWith(
      QUEUES.AUDIT_LOGS,
      expect.any(Buffer),
      { persistent: true },
    );
    expect(logger.debug).toHaveBeenCalledWith(
      expect.stringContaining("Audit entry published to queue"),
      expect.anything(),
    );
  });

  it("should fall back to file if publish fails (returns false)", async () => {
    mockChannel.sendToQueue.mockReturnValue(false);

    // Mock FS
    const appendFileSyncSpy = jest
      .spyOn(fs, "appendFileSync")
      .mockImplementation(() => {});
    const mkdirSyncSpy = jest
      .spyOn(fs, "mkdirSync")
      .mockImplementation(() => "");
    const existsSyncSpy = jest.spyOn(fs, "existsSync").mockReturnValue(true);

    await logAudit(entry);

    expect(logger.warn).toHaveBeenCalledWith(
      expect.stringContaining("RabbitMQ audit publish failed"),
      expect.anything(),
    );
    expect(appendFileSyncSpy).toHaveBeenCalled();

    appendFileSyncSpy.mockRestore();
    mkdirSyncSpy.mockRestore();
    existsSyncSpy.mockRestore();
  });

  it("should fall back to file if RabbitMQ throws error", async () => {
    (getRabbitMQChannel as jest.Mock).mockImplementation(() => {
      throw new Error("RabbitMQ Down");
    });

    // Mock FS
    const appendFileSyncSpy = jest
      .spyOn(fs, "appendFileSync")
      .mockImplementation(() => {});
    const mkdirSyncSpy = jest
      .spyOn(fs, "mkdirSync")
      .mockImplementation(() => "");
    const existsSyncSpy = jest.spyOn(fs, "existsSync").mockReturnValue(true);

    // Set alert email in config for this test
    const originalAlertEmail = config.notification.alertEmail;
    config.notification.alertEmail = "admin@example.com";

    await logAudit(entry);

    expect(logger.error).toHaveBeenCalledWith(
      expect.stringContaining("CRITICAL: Audit logging failed"),
      expect.anything(),
    );
    expect(appendFileSyncSpy).toHaveBeenCalled();
    expect(sendEmail).toHaveBeenCalledWith(
      "admin@example.com",
      expect.stringContaining("CRITICAL: Audit Log System Failure"),
      expect.anything(),
    );

    // Restore
    config.notification.alertEmail = originalAlertEmail;
    appendFileSyncSpy.mockRestore();
    mkdirSyncSpy.mockRestore();
    existsSyncSpy.mockRestore();
  });
});
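The fallback chain these tests imply — publish to RabbitMQ first, append to a local file when sendToQueue returns false or the channel throws, and alert an operator on the throw path — can be sketched as below. This is a hypothetical reconstruction: logAuditSketch and its injected dependencies are illustrative stand-ins, and the real src/services/audit/auditService.ts is not part of this diff:

```typescript
// Hypothetical sketch of the logAudit reliability chain exercised by the
// tests above. Dependencies are injected so the sketch runs without a broker:
// publish stands in for channel.sendToQueue, appendToFile for
// fs.appendFileSync, and alert for sendEmail.
type AuditEntry = { eventType: string; action: string; performedBy?: string };

interface AuditDeps {
  publish: (payload: Buffer) => boolean;
  appendToFile: (line: string) => void;
  alert?: (subject: string, body: string) => void;
}

function logAuditSketch(entry: AuditEntry, deps: AuditDeps): "queued" | "file" {
  const payload = Buffer.from(JSON.stringify(entry));
  try {
    if (deps.publish(payload)) return "queued";
    // sendToQueue returned false (internal buffer full): file fallback.
    deps.appendToFile(JSON.stringify(entry) + "\n");
    return "file";
  } catch {
    // Broker unreachable: file fallback plus an operator alert.
    deps.appendToFile(JSON.stringify(entry) + "\n");
    deps.alert?.("CRITICAL: Audit Log System Failure", entry.eventType);
    return "file";
  }
}

const lines: string[] = [];
const result = logAuditSketch(
  { eventType: "USER_LOGIN", action: "LOGIN" },
  { publish: () => false, appendToFile: (l) => lines.push(l) },
);
console.log(result, lines.length); // "file" 1
```

The design point the tests pin down is that an audit entry is never silently dropped: every failure mode degrades to the append-only file, and only the hard-failure path escalates to email.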