diff --git a/libs/core-functions/package.json b/libs/core-functions/package.json index b25a8c55e..49fed9e4d 100644 --- a/libs/core-functions/package.json +++ b/libs/core-functions/package.json @@ -40,6 +40,7 @@ "juava": "workspace:*", "lodash": "^4.17.21", "mongodb": "^5.9.2", + "@mongodb-js/zstd": "^1.2.0", "ioredis": "^5.3.2", "posthog-node": "^2.4.0", "tslib": "^2.4.0", diff --git a/libs/core-functions/src/functions/lib/mongodb.ts b/libs/core-functions/src/functions/lib/mongodb.ts index 22180944e..1330a3875 100644 --- a/libs/core-functions/src/functions/lib/mongodb.ts +++ b/libs/core-functions/src/functions/lib/mongodb.ts @@ -17,7 +17,7 @@ async function createClient() { log.atInfo().log(`Connecting to MongoDB server...`); // Create a new MongoClient - const client = new MongoClient(mongodbURL); + const client = new MongoClient(mongodbURL, { compressors: ["zstd"] }); // Connect the client to the server (optional starting in v4.7) await client.connect(); // Establish and verify connection diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 142f97d28..770011c18 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -261,6 +261,9 @@ importers: '@amplitude/ua-parser-js': specifier: ^0.7.33 version: 0.7.33 + '@mongodb-js/zstd': + specifier: ^1.2.0 + version: 1.2.0 agentkeepalive: specifier: 4.3.0 version: 4.3.0 @@ -281,7 +284,7 @@ importers: version: 4.17.21 mongodb: specifier: ^5.9.2 - version: 5.9.2 + version: 5.9.2(@mongodb-js/zstd@1.2.0) parse-duration: specifier: ^1.1.0 version: 1.1.0 @@ -5077,6 +5080,82 @@ packages: dev: false optional: true + /@mongodb-js/zstd-darwin-arm64@1.2.0: + resolution: {integrity: sha512-QWgW6IkWp3ErBXOvlOj9lw3lwMfey7eXh/p/Srb/7sEiu1e0yEO+LQ8IctmDWh8bfznKXmwUC0h7LKDbYR30yw==} + engines: {node: '>= 10'} + cpu: [arm64] + os: [darwin] + requiresBuild: true + dev: false + optional: true + + /@mongodb-js/zstd-darwin-x64@1.2.0: + resolution: {integrity: sha512-VnxYO8P2SWubdnydGId5+6veO6Ki6nxCr/pTaDZd8s4Urn6bDdXSX6YsZ0r42dO3Fa0FVYzrlcVAuNB67e2b6w==} + engines: {node: '>= 10'} + cpu: [x64] + os: [darwin] + requiresBuild: true + dev: false + optional: true + + /@mongodb-js/zstd-linux-arm64-gnu@1.2.0: + resolution: {integrity: sha512-TYF0XgNJW6UrvtY2u4Uuo5HiVWNgWNZ/ae2BhVp8hNsDhwFqb/YNoyiZqBei6whUwr8hecMy0UaHAXm3h+O2+Q==} + engines: {node: '>= 10'} + cpu: [arm64] + os: [linux] + requiresBuild: true + dev: false + optional: true + + /@mongodb-js/zstd-linux-arm64-musl@1.2.0: + resolution: {integrity: sha512-e2ClmJI1BvJq23VSLH14hgjjjcMOad3R/Ap7Q7dTa1uiVSJG4xKd2CmrWQgX1Az4/EfUMWEI7pb4yuanbdd2AQ==} + engines: {node: '>= 10'} + cpu: [arm64] + os: [linux] + requiresBuild: true + dev: false + optional: true + + /@mongodb-js/zstd-linux-x64-gnu@1.2.0: + resolution: {integrity: sha512-JuoK8lxUlkFPDBfsBUJKnLxpXA5ar+v7G43lIUlBKgjOp5aEWO/qQp5sNgCRnYA7x6PItYqIkEJjsays4N6JOA==} + engines: {node: '>= 10'} + cpu: [x64] + os: [linux] + requiresBuild: true + dev: false + optional: true + + /@mongodb-js/zstd-linux-x64-musl@1.2.0: + resolution: {integrity: sha512-pSb1iUF3Gc/qrJuP/Mi5ry4YFAUdUVFKNRZh1KTDDhSWyRCLd9gKcNdRnXqJjIdeGGEKf4bhtZAbYw4i/g0foA==} + engines: {node: '>= 10'} + cpu: [x64] + os: [linux] + requiresBuild: true + dev: false + optional: true + + /@mongodb-js/zstd-win32-x64-msvc@1.2.0: + resolution: {integrity: sha512-iz4Yl+WK3yr/4Yg6F4tKz3X9+yMZDK6pyBMA0CdXydSDZs6o2XQ2I0ZSu3oSk/ACfaZX3SNfRi3XTGgAM1eKZA==} + engines: {node: '>= 10'} + cpu: [x64] + os: [win32] + requiresBuild: true + dev: false + optional: true + + /@mongodb-js/zstd@1.2.0: + resolution: {integrity: sha512-sKHsJU2MXsp822IFXOHw/4mpFulScNHpZzVy1Zi5k5wBsdiAPx1QramyOXZkpacla+2QPEC/s7TxPlEhG/HuNQ==} + engines: {node: '>= 10'} + optionalDependencies: + '@mongodb-js/zstd-darwin-arm64': 1.2.0 + '@mongodb-js/zstd-darwin-x64': 1.2.0 + '@mongodb-js/zstd-linux-arm64-gnu': 1.2.0 + '@mongodb-js/zstd-linux-arm64-musl': 1.2.0 + '@mongodb-js/zstd-linux-x64-gnu': 1.2.0 + '@mongodb-js/zstd-linux-x64-musl': 1.2.0 + '@mongodb-js/zstd-win32-x64-msvc': 1.2.0 + dev: false + /@nangohq/frontend@0.21.11: resolution: {integrity: sha512-oNOqM8gZhRaaHt63zyaWxsWi3pip4a78S9IzstEBVvUUzEFOU3+ys0r3TmyQCbA1veaAZjzeMyq+LtZem/Nt9g==} dev: false @@ -9437,7 +9516,6 @@ packages: dependencies: webpack: 5.88.2(webpack-cli@5.1.4) webpack-cli: 5.1.4(@webpack-cli/generators@3.0.7)(webpack@5.88.2) - dev: true /@webpack-cli/generators@3.0.1(mem-fs-editor@9.7.0)(mem-fs@2.3.0)(prettier@2.8.7)(webpack-cli@5.0.1)(webpack@5.78.0): resolution: {integrity: sha512-kgtqwN13udxC2wV2LfEmJQ/yGz6+j8cgy17jE9ybyHrmT0PJcwjSpsvj1fDWYUb7brSOB4B03s1mEPjZTRfwCQ==} @@ -9484,7 +9562,6 @@ packages: - mem-fs - mem-fs-editor - supports-color - dev: true /@webpack-cli/info@2.0.1(webpack-cli@5.0.1)(webpack@5.78.0): resolution: {integrity: sha512-fE1UEWTwsAxRhrJNikE7v4EotYflkEhBL7EbajfkPlf6E37/2QshOy/D48Mw8G5XMFlQtS6YV42vtbG9zBpIQA==} @@ -9505,7 +9582,6 @@ packages: dependencies: webpack: 5.88.2(webpack-cli@5.1.4) webpack-cli: 5.1.4(@webpack-cli/generators@3.0.7)(webpack@5.88.2) - dev: true /@webpack-cli/serve@2.0.1(webpack-cli@5.0.1)(webpack@5.78.0): resolution: {integrity: sha512-0G7tNyS+yW8TdgHwZKlDWYXFA6OJQnoLCQvYKkQP0Q2X205PSQ6RNUj0M+1OB/9gRQaUZ/ccYfaxd0nhaWKfjw==} @@ -9534,7 +9610,6 @@ packages: dependencies: webpack: 5.88.2(webpack-cli@5.1.4) webpack-cli: 5.1.4(@webpack-cli/generators@3.0.7)(webpack@5.88.2) - dev: true /@xobotyi/scrollbar-width@1.9.5: resolution: {integrity: sha512-N8tkAACJx2ww8vFMneJmaAgmjAG1tnVBZJRLRcx061tmsLRZHSEZSLuGWnwPtunsSLvSqXQ2wfp7Mgqg1I+2dQ==} @@ -11205,7 +11280,6 @@ packages: /commander@10.0.1: resolution: {integrity: sha512-y4Mg2tXshplEbSGzx7amzPwKKOCGuoSRP/CjEdwwk0FOGlUbq6lKuoyDZTNZkmxHdJtp54hdfY/JUrdL7Xfdug==} engines: {node: '>=14'} - dev: true /commander@11.0.0: resolution: {integrity: sha512-9HMlXtt/BNoYr8ooyjjNRdIilOTkVJXB+GhxMTtOKwk0R4j4lS4NpjuqmRxroBfnfTSHQIHQB7wryHhXarNjmQ==} @@ -18990,7 +19064,7 @@ packages: whatwg-url: 11.0.0 dev: false - /mongodb@5.9.2: + /mongodb@5.9.2(@mongodb-js/zstd@1.2.0): resolution: {integrity: sha512-H60HecKO4Bc+7dhOv4sJlgvenK4fQNqqUIlXxZYQNbfEWSALGAwGoyJd/0Qwk4TttFXUOHJ2ZJQe/52ScaUwtQ==} engines: {node: '>=14.20.1'} peerDependencies: @@ -19011,6 +19085,7 @@ packages: snappy: optional: true dependencies: + '@mongodb-js/zstd': 1.2.0 bson: 5.5.0 mongodb-connection-string-url: 2.6.0 socks: 2.7.1 @@ -24514,7 +24589,7 @@ packages: schema-utils: 3.3.0 serialize-javascript: 6.0.1 terser: 5.19.4 - webpack: 5.88.2(webpack-cli@5.0.1) + webpack: 5.88.2(webpack-cli@5.1.4) /terser@4.8.1: resolution: {integrity: sha512-4GnLC0x667eJG0ewJTa6z/yXrbLGv80D9Ru6HIpCQmO+Q4PfEtBFi0ObSckqwL6VyQv/7ENJieXHo2ANmdQwgw==} @@ -25802,7 +25877,6 @@ packages: rechoir: 0.8.0 webpack: 5.88.2(webpack-cli@5.1.4) webpack-merge: 5.8.0 - dev: true /webpack-dev-middleware@5.3.3(webpack@5.88.2): resolution: {integrity: sha512-hj5CYrY0bZLB+eTO+x/j67Pkrquiy7kWepMHmUMoPsmcUaeEnQJqFzHJOyxgWlq746/wUuA64p9ta34Kyb01pA==} @@ -26025,7 +26099,6 @@ packages: - '@swc/core' - esbuild - uglify-js - dev: true /websocket-driver@0.7.4: resolution: {integrity: sha512-b17KeDIQVjvb0ssuSDF2cYXSg2iztliJ4B9WdsuB6J952qCPKmnVq4DyW5motImXHDC1cBT/1UezrJVsKw5zjg==} diff --git a/services/rotor/dist_package.json b/services/rotor/dist_package.json index 182e271e0..e6ac9fc2c 100644 --- a/services/rotor/dist_package.json +++ b/services/rotor/dist_package.json @@ -3,6 +3,7 @@ "version": "0.0.0", "dependencies": { "isolated-vm": "4.6.0", - "@sensejs/kafkajs-zstd-support": "^0.11.0" + "@sensejs/kafkajs-zstd-support": "^0.11.0", + "@mongodb-js/zstd": "^1.2.0" } } diff --git a/services/rotor/src/http/udf.ts b/services/rotor/src/http/udf.ts index ce19b73bf..3dd0124bc 100644 --- a/services/rotor/src/http/udf.ts +++ b/services/rotor/src/http/udf.ts @@ -1,12 +1,22 @@ -import { UDFTestRun, UDFTestRequest, mongodb, defaultTTL, createMongoStore } from "@jitsu/core-functions"; +import { + UDFTestRun, + UDFTestRequest, + mongodb, + defaultTTL, + createMongoStore, + createTtlStore, +} from "@jitsu/core-functions"; import { getLog } from "juava"; +import { redis } from "@jitsu-internal/console/lib/server/redis"; const log = getLog("udf_run"); export const UDFRunHandler = async (req, res) => { const body = req.body as UDFTestRequest; - log.atInfo().log(`Running function: ${body?.functionId} workspace: ${body?.workspaceId}`, JSON.stringify(body)); - body.store = createMongoStore(body?.workspaceId, await mongodb.waitInit(), defaultTTL); + //log.atInfo().log(`Running function: ${body?.functionId} workspace: ${body?.workspaceId}`, JSON.stringify(body)); + body.store = process.env.MONGODB_URL + ? createMongoStore(body?.workspaceId, mongodb(), defaultTTL) + : createTtlStore(body?.workspaceId, redis(), defaultTTL); const result = await UDFTestRun(body); if (result.error) { log diff --git a/services/rotor/src/index.ts b/services/rotor/src/index.ts index 4858666a9..51b251104 100644 --- a/services/rotor/src/index.ts +++ b/services/rotor/src/index.ts @@ -6,7 +6,7 @@ import { rotorConsumerGroupId, } from "./lib/kafka-config"; import { kafkaRotor } from "./lib/rotor"; - +import { mongodb } from "@jitsu/core-functions"; import minimist from "minimist"; import { glob } from "glob"; import fs from "fs"; @@ -16,6 +16,7 @@ import Prometheus from "prom-client"; import { FunctionsHandler, FunctionsHandlerMulti } from "./http/functions"; import { initMaxMindClient } from "./lib/maxmind"; import { rotorMessageHandler } from "./lib/message-handler"; +import { redis } from "@jitsu-internal/console/lib/server/redis"; export const log = getLog("rotor"); @@ -54,6 +55,8 @@ async function main() { }); log.atInfo().log("Starting kafka processing"); Prometheus.collectDefaultMetrics(); + await mongodb.waitInit(); + await redis.waitInit(); rotor .start() .then(chMetrics => { diff --git a/services/rotor/src/lib/message-handler.ts b/services/rotor/src/lib/message-handler.ts index f587a41d5..396a1da1a 100644 --- a/services/rotor/src/lib/message-handler.ts +++ b/services/rotor/src/lib/message-handler.ts @@ -108,8 +108,12 @@ export async function rotorMessageHandler( }, }; const oldStore = createTtlStore(connection.workspaceId, redis(), defaultTTL); - const newStore = createMongoStore(connection.workspaceId, await mongodb.waitInit(), defaultTTL); - const store = newStoreWorskpaceId.includes(connection.workspaceId) ? newStore : createMultiStore(newStore, oldStore); + const newStore = process.env.MONGODB_URL ? createMongoStore(connection.workspaceId, mongodb(), defaultTTL) : oldStore; + const store = newStoreWorskpaceId.includes(connection.workspaceId) + ? newStore + : newStore != oldStore + ? createMultiStore(newStore, oldStore) + : oldStore; const rl = await redisLogger.waitInit(); diff --git a/services/rotor/webpack.config.js b/services/rotor/webpack.config.js index 4a2dccaab..034eba502 100644 --- a/services/rotor/webpack.config.js +++ b/services/rotor/webpack.config.js @@ -6,6 +6,8 @@ const config = { target: "node", externals: { "isolated-vm": "require('isolated-vm')", + "@sensejs/kafkajs-zstd-support": "require('@sensejs/kafkajs-zstd-support')", + "@mongodb-js/zstd": "require('@mongodb-js/zstd')", }, node: { __dirname: false,