Skip to content

Commit

Permalink
make mongodb optional again
Browse files Browse the repository at this point in the history
  • Loading branch information
absorbb committed Dec 6, 2024
1 parent e5db1d8 commit b9330e1
Show file tree
Hide file tree
Showing 6 changed files with 12 additions and 7 deletions.
5 changes: 3 additions & 2 deletions libs/core-functions/src/functions/lib/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type { Redis } from "ioredis";
import parse from "parse-duration";
import { MongoClient, ReadPreference, Collection } from "mongodb";
import { RetryError } from "@jitsu/functions-lib";
import { Singleton } from "juava";

export const defaultTTL = 60 * 60 * 24 * 31; // 31 days
export const maxAllowedTTL = 2147483647; // max allowed value for ttl in redis (68years)
Expand Down Expand Up @@ -64,7 +65,7 @@ const MongoCreatedCollections: Record<string, Collection<StoreValue>> = {};

export const createMongoStore = (
namespace: string,
mongo: MongoClient,
mongo: Singleton<MongoClient>,
useLocalCache: boolean,
fast: boolean
): TTLStore => {
Expand All @@ -87,7 +88,7 @@ export const createMongoStore = (
return collection;
}
try {
const db = mongo.db(dbName);
const db = mongo().db(dbName);

const col = db.collection<StoreValue>(namespace);
const collStatus = await col
Expand Down
2 changes: 1 addition & 1 deletion services/profiles/src/lib/functions-chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ export function buildFunctionChain(
fetchTimeoutMs: number = 2000
): FuncChain {
const pbLongId = `${profileBuilder.workspaceId}-${profileBuilder.id}-v${profileBuilder.version}`;
const store = createMongoStore(profileBuilder.workspaceId, mongodb(), false, true);
const store = createMongoStore(profileBuilder.workspaceId, mongodb, false, true);

const chainCtx: FunctionChainContext = {
fetch: makeFetch(profileBuilder.id, eventsLogger, "info", fetchTimeoutMs),
Expand Down
2 changes: 1 addition & 1 deletion services/rotor/src/http/profiles-udf.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const log = getLog("profile-udf-run");
export const ProfileUDFRunHandler = async (req, res) => {
const body = req.body as ProfileUDFTestRequest;
log.atInfo().log(`Running profile func: ${body?.id} workspace: ${body?.workspaceId}`, JSON.stringify(body));
body.store = createMongoStore(body?.workspaceId, mongodb(), true, false);
body.store = createMongoStore(body?.workspaceId, mongodb, true, false);
const result = await ProfileUDFTestRun(body);
if (result.error) {
log
Expand Down
2 changes: 1 addition & 1 deletion services/rotor/src/http/udf.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const log = getLog("udf_run");
export const UDFRunHandler = async (req, res) => {
const body = req.body as UDFTestRequest;
log.atInfo().log(`Running function: ${body?.functionId} workspace: ${body?.workspaceId}`, JSON.stringify(body));
body.store = createMongoStore(body?.workspaceId, mongodb(), true, false);
body.store = createMongoStore(body?.workspaceId, mongodb, true, false);
const result = await UDFTestRun(body);
if (result.error) {
log
Expand Down
6 changes: 5 additions & 1 deletion services/rotor/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,11 @@ async function main() {
let redisClient: Redis | undefined;
try {
Prometheus.collectDefaultMetrics();
await mongodb.waitInit();
try {
await mongodb.waitInit();
} catch (e: any) {
log.atWarn().log("Failed to connect to mongodb. Functions Persistent Store won't work: " + e.message);
}
if (process.env.CLICKHOUSE_HOST || process.env.CLICKHOUSE_URL) {
eventsLogger = createClickhouseLogger();
} else {
Expand Down
2 changes: 1 addition & 1 deletion services/rotor/src/lib/functions-chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ export function buildFunctionChain(
if (!store) {
store = createMongoStore(
connection.workspaceId,
mongodb(),
mongodb,
false,
fastStoreWorkspaceId.includes(connection.workspaceId)
);
Expand Down

0 comments on commit b9330e1

Please sign in to comment.