Skip to content

Commit 8214052

Browse files
committed
migrated to bullmq
1 parent 185703f commit 8214052

File tree

8 files changed

+289
-196
lines changed

8 files changed

+289
-196
lines changed

server/package.json

+2-1
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
"axios": "^1.4.0",
5757
"bcryptjs": "^2.4.3",
5858
"bull": "^4.10.4",
59+
"bullmq": "^5.7.1",
5960
"cheerio": "1.0.0-rc.12",
6061
"cohere-ai": "^6.2.1",
6162
"concurrently": "^7.0.0",
@@ -71,7 +72,7 @@
7172
"grammy": "^1.16.2",
7273
"html-to-text": "^9.0.5",
7374
"ignore": "^5.2.4",
74-
"ioredis": "^5.3.2",
75+
"ioredis": "^5.4.1",
7576
"langchain": "^0.1.25",
7677
"mammoth": "^1.6.0",
7778
"pdf-parse": "^1.1.1",

server/src/app.ts

+30
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import fastifySession from "@fastify/session";
1010
import { getSessionSecret, isCookieSecure } from "./utils/session";
1111
import swagger from "@fastify/swagger";
1212
import swaggerUi from "@fastify/swagger-ui";
13+
import { pathToFileURL } from "url";
14+
import { Worker } from "bullmq";
1315
declare module "fastify" {
1416
interface Session {
1517
is_bot_allowed: boolean;
@@ -82,5 +84,33 @@ const app: FastifyPluginAsync<AppOptions> = async (
8284
});
8385
};
8486

87+
const redis_url = process.env.DB_REDIS_URL || process.env.REDIS_URL;
88+
if (!redis_url) {
89+
throw new Error("Redis url is not defined");
90+
}
91+
const username = redis_url.split(":")[1].replace("//", "");
92+
const password = redis_url.split(":")[2].split("@")[0];
93+
const host = redis_url.split("@")[1].split(":")[0];
94+
const port = parseInt(redis_url.split(":")[3]);
95+
const path = join(__dirname, "./queue/index.js");
96+
const workerUrl = pathToFileURL(path);
97+
const concurrency = parseInt(process.env.DB_QUEUE_CONCURRENCY || "1");
98+
const workerThreads = process.env.DB_QUEUE_THREADS || "false";
99+
const worker = new Worker("vector", workerUrl, {
100+
connection: {
101+
host,
102+
port,
103+
password,
104+
username,
105+
},
106+
concurrency,
107+
useWorkerThreads: workerThreads === "true",
108+
});
109+
110+
process.on("SIGINT", async () => {
111+
await worker.close();
112+
process.exit();
113+
});
114+
85115
export default app;
86116
export { app, options };

0 commit comments

Comments
 (0)