Skip to content

Commit

Permalink
rotor: fast & slow mode for mongo store. fixes and tweaks
Browse files Browse the repository at this point in the history
  • Loading branch information
absorbb committed Dec 28, 2023
1 parent 1135ef1 commit bcd814f
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 44 deletions.
8 changes: 6 additions & 2 deletions libs/core-functions/src/functions/lib/mongodb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,12 @@ async function ensureMongoCollection(mongo: MongoClient, collectionName: string,
}
try {
const db = mongo.db();
const collStatus = await db.collection(collectionName).stats();
if (collStatus.wiredTiger) {
const collStatus = await db
.collection(collectionName)
.aggregate([{ $collStats: { count: {} } }])
.next()
.catch(e => {});
if (collStatus) {
//collection already exists
MongoCreatedCollections.add(collectionName);
return;
Expand Down
117 changes: 77 additions & 40 deletions libs/core-functions/src/functions/lib/store.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import { SetOpts, Store, TTLStore } from "@jitsu/protocols/functions";
import type { Redis } from "ioredis";
import parse from "parse-duration";
import type { MongoClient } from "mongodb";
import { MongoClient, ReadPreference, Collection, ClientSession } from "mongodb";
import { getLog } from "juava";

export const defaultTTL = 60 * 60 * 24 * 31; // 31 days
export const maxAllowedTTL = 2147483647; // max allowed value for ttl in redis (68years)

export const log = getLog("store");

function getTtlSec(opts?: SetOpts): number {
let seconds = defaultTTL;
if (typeof opts === "number") {
Expand Down Expand Up @@ -62,82 +65,116 @@ export const createTtlStore = (namespace: string, redisClient: Redis, defaultTtl
},
});

export const createMongoStore = (namespace: string, mongo: MongoClient, defaultTtlSec: number): TTLStore => {
interface StoreValue {
_id: string;
value: any;
expireAt: Date;
}
const collections = new Set<string>();
interface StoreValue {
_id: string;
value: any;
expireAt: Date;
}

const MongoCreatedCollections: Record<string, Collection<StoreValue>> = {};

export const createMongoStore = (namespace: string, mongo: MongoClient, fast: boolean): TTLStore => {
const localCache: Record<string, StoreValue> = {};
const readOptions = fast ? { readPreference: ReadPreference.NEAREST } : {};
const writeOptions = fast ? { writeConcern: { w: 1, journal: false } } : {};

const dbName = `persistent_store`;
async function ensureCollection() {
if (collections.has(namespace)) {
return;

async function ensureCollection(): Promise<Collection<StoreValue>> {
let collection = MongoCreatedCollections[namespace];
if (collection) {
return collection;
}
try {
const db = mongo.db(dbName);
const collStatus = await db.collection<StoreValue>(namespace).stats();
if (collStatus.wiredTiger) {

const col = db.collection<StoreValue>(namespace);
const collStatus = await col
.aggregate([{ $collStats: { count: {} } }])
.next()
.catch(e => {});
if (collStatus) {
//collection already exists
collections.add(namespace);
return;
MongoCreatedCollections[namespace] = col;
return col;
}
const collection = await db.createCollection<StoreValue>(namespace, {
writeConcern: { w: 1, journal: false },
collection = await db.createCollection<StoreValue>(namespace, {
storageEngine: { wiredTiger: { configString: "block_compressor=zstd" } },
});
await collection.createIndex({ expireAt: 1 }, { expireAfterSeconds: 0 });
collections.add(namespace);
MongoCreatedCollections[namespace] = collection;
return collection;
} catch (err) {
throw new Error(`Failed to create collection ${namespace}: ${err}`);
}
}

return {
get: async (key: string) => {
const res = await mongo
.db(dbName)
.collection<StoreValue>(namespace)
.findOne({ _id: key }, { readPreference: "nearest" });
const res =
localCache[key] ||
(await ensureCollection()
.then(c => c.findOne({ _id: key }, readOptions))
.catch(e => {
log.atError().withCause(e).log(`Error getting key ${key} from mongo store ${namespace}`);
}));
return res ? res.value : undefined;
},
getWithTTL: async (key: string) => {
const res = await mongo
.db(dbName)
.collection<StoreValue>(namespace)
.findOne({ _id: key }, { readPreference: "nearest" });
const res =
localCache[key] ||
(await ensureCollection()
.then(c => c.findOne({ _id: key }, readOptions))
.catch(e => {
log.atError().withCause(e).log(`Error getting key ${key} from mongo store ${namespace}`);
}));
if (!res) {
return undefined;
}
const ttl = res.expireAt ? Math.max(Math.floor((res.expireAt.getTime() - new Date().getTime()) / 1000), 0) : -1;
return { value: res.value, ttl };
},
set: async (key: string, obj: any, opts?: SetOpts) => {
await ensureCollection();
const colObj: any = { value: obj };
const ttl = getTtlSec(opts);
if (ttl >= 0) {
const expireAt = new Date();
expireAt.setSeconds(expireAt.getSeconds() + ttl);
colObj.expireAt = expireAt;
}
await mongo
.db(dbName)
.collection<StoreValue>(namespace)
.replaceOne({ _id: key }, colObj, { upsert: true, writeConcern: { w: 1, journal: false } });

await ensureCollection()
.then(c =>
c.replaceOne({ _id: key }, colObj, {
upsert: true,
...writeOptions,
})
)
.then(() => {
localCache[key] = colObj;
})
.catch(e => {
log.atError().withCause(e).log(`Error setting key ${key} from mongo store ${namespace}`);
});
},
del: async (key: string) => {
await ensureCollection();
await mongo
.db(dbName)
.collection<StoreValue>(namespace)
.deleteOne({ _id: key }, { writeConcern: { w: 1, journal: false } });
await ensureCollection()
.then(c => c.deleteOne({ _id: key }, writeOptions))
.then(() => {
delete localCache[key];
})
.catch(e => {
log.atError().withCause(e).log(`Error deleting key ${key} from mongo store ${namespace}`);
});
},
ttl: async (key: string) => {
const res = await mongo
.db(dbName)
.collection<StoreValue>(namespace)
.findOne({ _id: key }, { readPreference: "nearest" });
const res =
localCache[key] ||
(await ensureCollection()
.then(c => c.findOne({ _id: key }, readOptions))
.catch(e => {
log.atError().withCause(e).log(`Error getting key ${key} from mongo store ${namespace}`);
}));
return res
? res.expireAt
? Math.max(Math.floor((res.expireAt.getTime() - new Date().getTime()) / 1000), 0)
Expand Down
2 changes: 1 addition & 1 deletion services/rotor/src/http/udf.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export const UDFRunHandler = async (req, res) => {
const body = req.body as UDFTestRequest;
log.atInfo().log(`Running function: ${body?.functionId} workspace: ${body?.workspaceId}`, JSON.stringify(body));
body.store = process.env.MONGODB_URL
? createMongoStore(body?.workspaceId, mongodb(), defaultTTL)
? createMongoStore(body?.workspaceId, mongodb(), false)
: createTtlStore(body?.workspaceId, redis(), defaultTTL);
const result = await UDFTestRun(body);
if (result.error) {
Expand Down
7 changes: 6 additions & 1 deletion services/rotor/src/lib/message-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import { buildFunctionChain, checkError, runChain } from "./functions-chain";
export const log = getLog("rotor");

const anonymousEventsStore = mongoAnonymousEventsStore();
const fastStoreWorskpaceId = (process.env.FAST_STORE_WORKSPACE_ID ?? "").split(",").filter(x => x.length > 0);

export async function rotorMessageHandler(
_message: string | object | undefined,
Expand Down Expand Up @@ -86,7 +87,11 @@ export async function rotorMessageHandler(
connectionId: connection.id,
retries,
};
const store = createMongoStore(connection.workspaceId, mongodb(), defaultTTL);
const store = createMongoStore(
connection.workspaceId,
mongodb(),
fastStoreWorskpaceId.includes(connection.workspaceId)
);
//system context for builtin functions only
const systemContext: SystemContext = {
$system: {
Expand Down

1 comment on commit bcd814f

@vercel
Copy link

@vercel vercel bot commented on bcd814f Dec 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

new-jitsu – ./webapps/console

thinkr.com.br
use.jitsu.com
usepolygon.io
www.sambla.se
ajewellers.com
data.uselog.io
gpt.whatfa.com
sidetrekai.com
t.papermark.io
t.saasmonk.app
use2.jitsu.com
w.d2.jitsu.com
www.kellen.top
*.dataspecc.com
app.bluetick.ai
caddy.jitsu.com
data.askloan.tw
enterticket.com
events.mitzu.io
ildar.jitsu.com
jitsu.efeer.com
jitsu.ivve.tech
krestomatio.com
sevenbillion.co
w2.d2.jitsu.com
xrt.webxr.tools
app.jotverse.com
caddy2.jitsu.com
cname2.jitsu.com
data.mysitee.com
data.toptere.com
dev-t.democo.dev
events.quenti.io
utils.doogma.com
worthsystems.com
data.music2me.com
data.timeplus.com
event-gateway.com
https.bluetick.ai
ji.degulesider.dk
jitsu.ivve.health
metabase.erxes.io
t.clickncruise.hu
test.d2.jitsu.com
cloud.yupaopao.com
data.investing.com
data.mycompany.com
data.usepolygon.io
demosite.jitsu.com
dev.driverdeck.app
n8n.paziresh24.com
new.enterticket.es
t-dev.papermark.io
test2.d2.jitsu.com
uniquecafes.com.br
www.sidetrekai.com
colectha.voolu.shop
crm.myguestcare.com
data.sidetrekai.com
data.timeplus.cloud
localhost.jitsu.com
report.improvado.io
www.sevenbillion.co
teste.fazcomex.com.br
analytics.dev.knekt.io
loraboutiquedental.com
notion.twelftree.co.uk
dev-portal.zoopsign.com
event.tradejobsnz.co.nz
investing-poc.jitsu.dev
savvy-replay.jitsu.tech
data.analytics-smart.com
data.handelsregister.app
event.clickncruise.co.uk
jt.fairhopeweb.github.io
savvy-replay2.jitsu.tech
savvy-replay3.jitsu.tech
savvy-replay4.jitsu.tech
track.alquimiaweb.com.br
track.pressance-group.jp
track.uniquecafes.com.br
colectha.agenciavoolu.com
kolectha.agenciavoolu.com
lp.loraboutiquedental.com
stage-portal.zoopsign.com
new-jitsu-jitsu.vercel.app
lodercom-colectha.voolu.shop
warehouse1.trendstyle.com.au
d0.livingdesignsfurniture.com
ingest-load-testing.jitsu.dev
jitsu.precisaosistemas.com.br
analytics.inspiresolutions.app
betteruptime-monitoring.jitsu.dev
canvas.livingdesignsfurniture.com
analytics.dev.inspiresolutions.app
cl9vt45z50001znkunc6v8fmm.d.jitsu.com
clm2jikrm00002v6r5l6niws3.d.jitsu.com
new-jitsu-git-newjitsu-jitsu.vercel.app
3000-rajaraodv-customerdemo-nmpsqwflswt.ws-us102.gitpod.io
new.jitsu.dev

Please sign in to comment.