diff --git a/src/config/env.ts b/src/config/env.ts index 90de182..0376c80 100644 --- a/src/config/env.ts +++ b/src/config/env.ts @@ -243,6 +243,7 @@ export const config = { | "twilio" | "africas_talking" | "log", + alertEmail: process.env.NOTIFICATION_ALERT_EMAIL || "", twilioAccountSid: process.env.TWILIO_ACCOUNT_SID || "", twilioAuthToken: process.env.TWILIO_AUTH_TOKEN || "", twilioFromNumber: process.env.TWILIO_FROM_NUMBER || "", diff --git a/src/config/rabbitmq.ts b/src/config/rabbitmq.ts index 93666d2..9586039 100644 --- a/src/config/rabbitmq.ts +++ b/src/config/rabbitmq.ts @@ -112,6 +112,8 @@ export const QUEUES = { XLM_TO_ACBU_DLQ: "xlm_to_acbu_dlq", USDC_CONVERT_AND_MINT: "usdc_convert_and_mint", // USDC deposit: convert USDC→XLM (backend), then mint USDC_CONVERT_AND_MINT_DLQ: "usdc_convert_and_mint_dlq", + AUDIT_LOGS: "audit_logs", + AUDIT_LOGS_DLQ: "audit_logs_dlq", } as const; // Exchange names diff --git a/src/index.ts b/src/index.ts index b05bce0..053af8a 100644 --- a/src/index.ts +++ b/src/index.ts @@ -105,6 +105,10 @@ async function startServer() { await import("./jobs/notificationConsumer"); await startNotificationConsumer(); + // Start audit consumer (AUDIT_LOGS → database) + const { startAuditConsumer } = await import("./jobs/auditConsumer"); + await startAuditConsumer(); + // Start outbound webhook consumer (WEBHOOKS → deliver with HMAC-SHA256) const { startWebhookConsumer } = await import("./jobs/webhookConsumer"); await startWebhookConsumer(); diff --git a/src/jobs/auditConsumer.test.ts b/src/jobs/auditConsumer.test.ts new file mode 100644 index 0000000..a04e9c5 --- /dev/null +++ b/src/jobs/auditConsumer.test.ts @@ -0,0 +1,105 @@ +import { startAuditConsumer } from "./auditConsumer"; +import { prisma } from "../config/database"; +import { + getRabbitMQChannel, + QUEUES, + assertQueueWithDLQ, +} from "../config/rabbitmq"; + +jest.mock("../config/database", () => ({ + prisma: { + auditTrail: { + create: jest.fn(), + }, + }, +})); + +jest.mock("../config/rabbitmq", () => ({ + getRabbitMQChannel: jest.fn(), + assertQueueWithDLQ: jest.fn().mockResolvedValue({}), + QUEUES: { + AUDIT_LOGS: "audit_logs", + }, +})); + +jest.mock("../config/logger", () => ({ + logger: { + warn: jest.fn(), + error: jest.fn(), + info: jest.fn(), + }, +})); + +describe("AuditConsumer", () => { + const mockChannel = { + consume: jest.fn(), + ack: jest.fn(), + nack: jest.fn(), + }; + + const entry = { + eventType: "USER_LOGIN", + action: "LOGIN", + performedBy: "user-1", + timestamp: new Date().toISOString(), + }; + + const mockMsg = { + content: Buffer.from(JSON.stringify(entry)), + }; + + beforeEach(() => { + jest.clearAllMocks(); + (getRabbitMQChannel as jest.Mock).mockReturnValue(mockChannel); + }); + + it("should consume and save audit entry to database", async () => { + (prisma.auditTrail.create as jest.Mock).mockResolvedValue({ id: "1" }); + + // Trigger the consumer callback manually + mockChannel.consume.mockImplementation((queue, callback) => { + callback(mockMsg); + }); + + await startAuditConsumer(); + + expect(assertQueueWithDLQ).toHaveBeenCalledWith(QUEUES.AUDIT_LOGS, { + durable: true, + }); + expect(prisma.auditTrail.create).toHaveBeenCalledWith( + expect.objectContaining({ + data: expect.objectContaining({ + eventType: entry.eventType, + action: entry.action, + }), + }), + ); + expect(mockChannel.ack).toHaveBeenCalledWith(mockMsg); + }); + + it("should retry on database failure and eventually nack to DLQ", async () => { + ((prisma.auditTrail.create as jest.Mock).mockRejectedValue( + new Error("DB connection failed"), + ), + // Fast forward timers for retry + jest.useFakeTimers()); + + mockChannel.consume.mockImplementation((queue, callback) => { + callback(mockMsg); + }); + + await startAuditConsumer(); + + // We need to resolve the promises inside the loop + for (let i = 0; i < 4; i++) { + await Promise.resolve(); // allow the loop to run + jest.runAllTimers(); + await Promise.resolve(); + } + + expect(prisma.auditTrail.create).toHaveBeenCalledTimes(4); // 0, 1, 2, 3 + expect(mockChannel.nack).toHaveBeenCalledWith(mockMsg, false, false); + + jest.useRealTimers(); + }); +}); diff --git a/src/jobs/auditConsumer.ts b/src/jobs/auditConsumer.ts new file mode 100644 index 0000000..0ddc7b3 --- /dev/null +++ b/src/jobs/auditConsumer.ts @@ -0,0 +1,67 @@ +import { prisma } from "../config/database"; +import { logger } from "../config/logger"; +import { QUEUES, assertQueueWithDLQ } from "../config/rabbitmq"; +import { getRabbitMQChannel } from "../config/rabbitmq"; + +const MAX_RETRIES = 3; +const INITIAL_BACKOFF_MS = 1000; + +export async function startAuditConsumer() { + const channel = getRabbitMQChannel(); + + await assertQueueWithDLQ(QUEUES.AUDIT_LOGS, { durable: true }); + + channel.consume(QUEUES.AUDIT_LOGS, async (msg) => { + if (!msg) return; + + const content = msg.content.toString(); + const entry = JSON.parse(content); + + let attempt = 0; + let success = false; + + while (attempt <= MAX_RETRIES && !success) { + try { + await prisma.auditTrail.create({ + data: { + eventType: entry.eventType, + entityType: entry.entityType ?? null, + entityId: entry.entityId ?? null, + action: entry.action, + oldValue: entry.oldValue ?? (undefined as any), + newValue: entry.newValue ?? (undefined as any), + performedBy: entry.performedBy ?? null, + timestamp: entry.timestamp ? new Date(entry.timestamp) : undefined, + }, + }); + success = true; + channel.ack(msg); + } catch (error: any) { + attempt++; + if (attempt <= MAX_RETRIES) { + const backoff = INITIAL_BACKOFF_MS * Math.pow(2, attempt - 1); + logger.warn( + `Audit consumer retry ${attempt}/${MAX_RETRIES} in ${backoff}ms`, + { + error: error.message || error, + eventType: entry.eventType, + }, + ); + await new Promise((resolve) => setTimeout(resolve, backoff)); + } else { + logger.error( + "Audit consumer failed after max retries, moving to DLQ", + { + error: error.message || error, + entry, + }, + ); + // Reject to DLQ + channel.nack(msg, false, false); + } + } + } + }); + + logger.info("Audit consumer started"); +} diff --git a/src/services/audit/auditService.test.ts b/src/services/audit/auditService.test.ts new file mode 100644 index 0000000..d9e6d90 --- /dev/null +++ b/src/services/audit/auditService.test.ts @@ -0,0 +1,123 @@ +import { logAudit } from "./auditService"; +import { logger } from "../../config/logger"; +import { sendEmail } from "../notification"; +import fs from "fs"; +import { config } from "../../config/env"; +import { getRabbitMQChannel, QUEUES } from "../../config/rabbitmq"; + +jest.mock("../../config/rabbitmq", () => ({ + getRabbitMQChannel: jest.fn(), + QUEUES: { + AUDIT_LOGS: "audit_logs", + }, +})); + +jest.mock("../notification", () => ({ + sendEmail: jest.fn().mockResolvedValue(undefined), +})); + +jest.mock("../../config/logger", () => ({ + logger: { + warn: jest.fn(), + error: jest.fn(), + info: jest.fn(), + debug: jest.fn(), + verbose: jest.fn(), + }, +})); + +describe("AuditService Reliability (RabbitMQ)", () => { + const entry = { + eventType: "USER_LOGIN", + action: "LOGIN", + performedBy: "user-1", + }; + + const mockChannel = { + sendToQueue: jest.fn(), + }; + + beforeEach(() => { + jest.clearAllMocks(); + (getRabbitMQChannel as jest.Mock).mockReturnValue(mockChannel); + }); + + it("should publish to RabbitMQ successfully", async () => { + mockChannel.sendToQueue.mockReturnValue(true); + + await logAudit(entry); + + expect(mockChannel.sendToQueue).toHaveBeenCalledWith( + QUEUES.AUDIT_LOGS, + expect.any(Buffer), + { persistent: true }, + ); + expect(logger.debug).toHaveBeenCalledWith( + expect.stringContaining("Audit entry published to queue"), + expect.anything(), + ); + }); + + it("should fall back to file if publish fails (returns false)", async () => { + mockChannel.sendToQueue.mockReturnValue(false); + + // Mock FS + const appendFileSyncSpy = jest + .spyOn(fs, "appendFileSync") + .mockImplementation(() => {}); + const mkdirSyncSpy = jest + .spyOn(fs, "mkdirSync") + .mockImplementation(() => ""); + const existsSyncSpy = jest.spyOn(fs, "existsSync").mockReturnValue(true); + + await logAudit(entry); + + expect(logger.warn).toHaveBeenCalledWith( + expect.stringContaining("RabbitMQ audit publish failed"), + expect.anything(), + ); + expect(appendFileSyncSpy).toHaveBeenCalled(); + + appendFileSyncSpy.mockRestore(); + mkdirSyncSpy.mockRestore(); + existsSyncSpy.mockRestore(); + }); + + it("should fall back to file if RabbitMQ throws error", async () => { + (getRabbitMQChannel as jest.Mock).mockImplementation(() => { + throw new Error("RabbitMQ Down"); + }); + + // Mock FS + const appendFileSyncSpy = jest + .spyOn(fs, "appendFileSync") + .mockImplementation(() => {}); + const mkdirSyncSpy = jest + .spyOn(fs, "mkdirSync") + .mockImplementation(() => ""); + const existsSyncSpy = jest.spyOn(fs, "existsSync").mockReturnValue(true); + + // Set alert email in config for this test + const originalAlertEmail = config.notification.alertEmail; + config.notification.alertEmail = "admin@example.com"; + + await logAudit(entry); + + expect(logger.error).toHaveBeenCalledWith( + expect.stringContaining("CRITICAL: Audit logging failed"), + expect.anything(), + ); + expect(appendFileSyncSpy).toHaveBeenCalled(); + expect(sendEmail).toHaveBeenCalledWith( + "admin@example.com", + expect.stringContaining("CRITICAL: Audit Log System Failure"), + expect.anything(), + ); + + // Restore + config.notification.alertEmail = originalAlertEmail; + appendFileSyncSpy.mockRestore(); + mkdirSyncSpy.mockRestore(); + existsSyncSpy.mockRestore(); + }); +}); diff --git a/src/services/audit/auditService.ts b/src/services/audit/auditService.ts index e5b4663..59a7453 100644 --- a/src/services/audit/auditService.ts +++ b/src/services/audit/auditService.ts @@ -1,8 +1,9 @@ -/** - * Audit logging for sensitive actions. Writes to AuditTrail for compliance and debugging. - */ -import { prisma } from "../../config/database"; import { logger } from "../../config/logger"; +import { config } from "../../config/env"; +import fs from "fs"; +import path from "path"; +import { sendEmail } from "../notification"; +import { getRabbitMQChannel, QUEUES } from "../../config/rabbitmq"; export interface AuditEntry { eventType: string; @@ -14,20 +15,89 @@ export interface AuditEntry { performedBy?: string; } +/** + * logAudit: Publishes audit entry to RabbitMQ for asynchronous processing. + * Falls back to local file and alerting if RabbitMQ is unavailable. + */ export async function logAudit(entry: AuditEntry): Promise { try { - await prisma.auditTrail.create({ - data: { + const channel = getRabbitMQChannel(); + const payload = { + ...entry, + timestamp: new Date().toISOString(), + }; + + const success = channel.sendToQueue( + QUEUES.AUDIT_LOGS, + Buffer.from(JSON.stringify(payload)), + { persistent: true }, + ); + + if (success) { + logger.debug("Audit entry published to queue", { eventType: entry.eventType, - entityType: entry.entityType ?? undefined, - entityId: entry.entityId ?? undefined, action: entry.action, - oldValue: entry.oldValue ?? undefined, - newValue: entry.newValue ?? undefined, - performedBy: entry.performedBy ?? undefined, - }, + }); + return; + } + + // If queue is full or other internal issue, treat as failure + throw new Error("RabbitMQ sendToQueue returned false"); + } catch (error: any) { + logger.warn("RabbitMQ audit publish failed, using fallback", { + error: error.message || error, + eventType: entry.eventType, + }); + handleAuditFailure(entry, error); + } +} + +/** + * handleAuditFailure: Final fallback if RabbitMQ publish fails. + */ +function handleAuditFailure(entry: AuditEntry, error: any): void { + logger.error("CRITICAL: Audit logging failed (RabbitMQ unavailable)", { + eventType: entry.eventType, + error: error.message || error, + entry, + }); + + try { + const logDir = path.dirname(config.logFile); + if (!fs.existsSync(logDir)) { + fs.mkdirSync(logDir, { recursive: true }); + } + + const fallbackPath = path.join(logDir, "lost-audits.log"); + const fallbackEntry = { + timestamp: new Date().toISOString(), + ...entry, + error: error instanceof Error ? error.message : String(error), + }; + + fs.appendFileSync(fallbackPath, JSON.stringify(fallbackEntry) + "\n"); + logger.info(`Audit entry saved to fallback file: ${fallbackPath}`); + + // Alert admin + if (config.notification.alertEmail) { + const subject = `CRITICAL: Audit Log System Failure - ${entry.eventType}`; + const body = + `Audit logging failed to publish to RabbitMQ.\n\n` + + `Event Type: ${entry.eventType}\n` + + `Action: ${entry.action}\n` + + `Error: ${error.message || error}\n\n` + + `The audit entry has been saved to the fallback file: ${fallbackPath}\n\n` + + `Entry Data: ${JSON.stringify(entry, null, 2)}`; + + sendEmail(config.notification.alertEmail, subject, body).catch((e) => { + logger.error("Failed to send audit failure alert email", { + error: e.message || e, + }); + }); + } + } catch (fallbackError: any) { + logger.error("FATAL: Failed to write to audit fallback file", { + error: fallbackError.message || fallbackError, }); - } catch (e) { - logger.error("Audit log failed", { entry: entry.eventType, error: e }); } } diff --git a/tests/simple.test.ts b/tests/simple.test.ts new file mode 100644 index 0000000..1084a66 --- /dev/null +++ b/tests/simple.test.ts @@ -0,0 +1,5 @@ +describe('Simple', () => { + it('should pass', () => { + expect(1).toBe(1); + }); +});