Skip to content

Commit 9532e76

Browse files
authored
Merge pull request #5 from ricash-org/Aissata
Add RabbitMQ
2 parents f518835 + 1368004 commit 9532e76

File tree

14 files changed

+406
-10
lines changed

14 files changed

+406
-10
lines changed

Docker-compose.yml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ services:
1010
env_file: .env
1111
depends_on:
1212
- postgres
13+
- rabbitmq
1314
networks:
1415
- notification-net
1516

@@ -27,6 +28,18 @@ services:
2728
- pgdata:/var/lib/postgresql/data
2829
networks:
2930
- notification-net
31+
32+
rabbitmq:
33+
image: rabbitmq:3-management
34+
container_name: rabbitmq
35+
ports:
36+
- "5672:5672"
37+
- "15672:15672"
38+
environment:
39+
RABBITMQ_DEFAULT_USER: ricash
40+
RABBITMQ_DEFAULT_PASS: ricash123
41+
42+
3043

3144
networks:
3245
notification-net:

Dockerfile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,5 +31,7 @@ COPY --from=builder /app/dist ./dist
3131
# Exposer le port
3232
EXPOSE 3000
3333

34+
3435
# Lancer la version compilée
3536
CMD ["node", "dist/server.js"]
37+

package-lock.json

Lines changed: 53 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,18 @@
22
"name": "notification-service",
33
"version": "1.0.0",
44
"description": "",
5-
"main": "dist/server.js",
5+
"main": "src/server.ts",
66
"scripts": {
77
"dev": "ts-node-dev src/server.ts",
8-
"start": "node dist/server.js",
8+
"start": "node src/server.ts",
99
"build": "tsc"
1010
},
1111
"keywords": [],
1212
"author": "",
1313
"license": "ISC",
1414
"type": "commonjs",
1515
"dependencies": {
16+
"amqplib": "^0.10.9",
1617
"body-parser": "^2.2.0",
1718
"dotenv": "^17.2.3",
1819
"express": "^5.1.0",
@@ -23,6 +24,7 @@
2324
"typeorm": "^0.3.27"
2425
},
2526
"devDependencies": {
27+
"@types/amqplib": "^0.10.8",
2628
"@types/express": "^5.0.5",
2729
"@types/jsonwebtoken": "^9.0.10",
2830
"@types/node": "^24.10.0",

src/config/rabbitmq.ts

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
// import amqp from "amqplib";
2+
3+
// let channel: amqp.Channel;
4+
5+
// export const connectRabbitMQ = async () => {
6+
// try {
7+
// const connection = await amqp.connect("amqp://admin:admin@rabbitmq:5672");
8+
// channel = await connection.createChannel();
9+
// console.log("RabbitMQ connecté");
10+
// } catch (error) {
11+
// console.error("Erreur RabbitMQ:", error);
12+
// }
13+
// };
14+
15+
// export const getChannel = () => channel;
16+
17+
// src/messaging/rabbitmq.ts
18+
/**
19+
* Connexion RabbitMQ partagée pour otp-service
20+
* - garde la même connexion/channel pendant tout le runtime
21+
* - gère reconnect automatique en cas de fermeture/erreur
22+
*
23+
* Variables d'environnement :
24+
* - RABBITMQ_URL (ex: amqp://admin:admin@rabbitmq:5672)
25+
*/
26+
27+
// src/messaging/rabbitmq.ts
28+
import * as amqp from "amqplib";
29+
import type { Connection, Channel } from "amqplib";
30+
31+
let connection: amqp.Connection | null = null;
32+
let channel: Channel | null = null;
33+
34+
// Nom des files
35+
export const QUEUE_MAIN = "notifications.main";
36+
export const QUEUE_RETRY = "notifications.retry";
37+
export const QUEUE_DLQ = "notifications.dlq";
38+
39+
// Fonction de connexion + reconnexion automatique
40+
export async function ensureChannel() {
41+
try {
42+
console.log("Tentative de connexion à RabbitMQ...");
43+
44+
let connection = await amqp.connect(process.env.RABBITMQ_URL!);
45+
46+
// garder une référence locale non-nulle pour satisfaire TypeScript
47+
const conn = connection!;
48+
49+
conn.on("error", (err) => {
50+
console.error("⚠️ Erreur RabbitMQ :", err);
51+
// connection = null;
52+
});
53+
54+
conn.on("close", () => {
55+
console.error("Connexion RabbitMQ fermée. Reconnexion...");
56+
//connection = null;
57+
setTimeout(ensureChannel, 3000);
58+
});
59+
60+
channel = await conn.createChannel();
61+
const ch = channel!;
62+
63+
console.log(" Connecté à RabbitMQ !");
64+
65+
// ---- Déclaration des files ----
66+
67+
// File principale
68+
await ch.assertQueue(QUEUE_MAIN, { durable: true });
69+
70+
// File de retry avec TTL de 5s
71+
await ch.assertQueue(QUEUE_RETRY, {
72+
durable: true,
73+
arguments: {
74+
"x-message-ttl": 5000,
75+
"x-dead-letter-exchange": "",
76+
"x-dead-letter-routing-key": QUEUE_MAIN,
77+
},
78+
});
79+
80+
// Dead-Letter Queue
81+
await ch.assertQueue(QUEUE_DLQ, { durable: true });
82+
83+
return ch;
84+
} catch (err) {
85+
console.error("Erreur de connexion à RabbitMQ :", err);
86+
setTimeout(ensureChannel, 3000);
87+
}
88+
}
89+
90+
export function getRabbitChannel(): Channel {
91+
if (!channel) {
92+
throw new Error("RabbitMQ non connecté !");
93+
}
94+
return channel;
95+
}

src/controllers/notificationController.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { Request, Response } from "express";
22
import { NotificationService } from "../services/notificationService";
3+
import { publishNotification } from "../messaging/publisher";
34

45
const service = new NotificationService();
56

@@ -16,3 +17,11 @@ export const getNotifications = async (req: Request, res: Response) => {
1617
const list = await service.getAll();
1718
res.json(list);
1819
};
20+
21+
22+
export async function testRabbitMQ(req: Request, res: Response) {
23+
const { queueName, message } = req.body;
24+
await publishNotification(queueName, message);
25+
res.json({ success: true });
26+
}
27+

src/index.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
import { Router } from "express";
22
import { generateOtp, verifyOtp } from "./controllers/optController";
3-
import { envoyerNotification } from "./controllers/notificationController";
3+
import { envoyerNotification, testRabbitMQ } from "./controllers/notificationController";
44

55
const router = Router();
66

77
// Notifications
88
router.post("/notifications/envoyer", envoyerNotification);
9+
router.post("/rabbitmq", testRabbitMQ);
910

1011
// OTP
1112
router.post("/otp/generate", generateOtp);

src/messaging/consumer.ts

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
2+
// import { getChannel } from "../config/rabbitmq";
3+
// import { NotificationService } from "../services/notificationService";
4+
5+
// export const startConsumer = async () => {
6+
// const channel = getChannel();
7+
// const queue = "notifications_queue";
8+
// const exchange = "notifications_exchange";
9+
10+
// await channel.assertExchange(exchange, "direct", { durable: true });
11+
// await channel.assertQueue(queue, { durable: true });
12+
// await channel.bindQueue(queue, exchange, "send-notification");
13+
14+
// console.log("📥 En attente de messages...");
15+
16+
// channel.consume(queue, async (msg) => {
17+
// if (msg) {
18+
// const data = JSON.parse(msg.content.toString());
19+
// console.log("📩 Message reçu :", data);
20+
21+
// const service = new NotificationService();
22+
// await service.envoyerNotification(data);
23+
24+
// channel.ack(msg);
25+
// }
26+
// });
27+
// };
28+
29+
30+
31+
import { getRabbitChannel, QUEUE_MAIN, QUEUE_RETRY, QUEUE_DLQ } from "../config/rabbitmq";
32+
import { NotificationService } from "../services/notificationService";
33+
34+
const notifService = new NotificationService();
35+
36+
export async function startConsumer() {
37+
const channel = getRabbitChannel();
38+
39+
console.log("Consumer prêt. En attente de messages...");
40+
41+
channel.consume(QUEUE_MAIN, async (msg) => {
42+
if (!msg) return;
43+
44+
const content = JSON.parse(msg.content.toString());
45+
console.log("Message reçu :", content);
46+
47+
try {
48+
// Envoi de la vraie notification
49+
await notifService.envoyerNotification(content);
50+
51+
channel.ack(msg);
52+
console.log("Notification envoyée avec succès !");
53+
} catch (error) {
54+
console.error("Erreur lors du traitement :", error);
55+
56+
const retryCount = Number(msg.properties.headers?.["x-retries"] ?? 0);
57+
58+
if (retryCount < 3) {
59+
console.log(`Retry ${retryCount + 1}/3...`);
60+
61+
channel.sendToQueue(QUEUE_RETRY, msg.content, {
62+
headers: { "x-retries": retryCount + 1 },
63+
});
64+
} else {
65+
console.log("Trop d'erreurs → envoi en DLQ");
66+
67+
const errorMessage = error instanceof Error ? error.message : String(error);
68+
69+
channel.sendToQueue(QUEUE_DLQ, msg.content, {
70+
headers: { "x-final-error": errorMessage },
71+
});
72+
}
73+
74+
channel.ack(msg);
75+
}
76+
});
77+
}

0 commit comments

Comments
 (0)