Commit 1135ef1

console: added flag to disable fast store
rotor: create db view on startup
absorbb committed Dec 28, 2023
1 parent 0997a1b commit 1135ef1
Showing 2 changed files with 117 additions and 73 deletions.
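
Taken together: the console can now skip exporting its "fast store" to Redis, while rotor creates a Postgres view at startup so enriched connection data stays available without that export. The export is toggled by an environment variable; a minimal usage sketch, assuming a dotenv-style deployment (the variable name comes from the diff below; the file layout is illustrative):

# .env (illustrative): disable the console's Redis fast-store export
DISABLE_FAST_STORE=true
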
25 changes: 24 additions & 1 deletion services/rotor/src/lib/pg-config-store.ts
@@ -41,7 +41,30 @@ export function createPg(): Pool | undefined {
pool.on("error", error => {
log.atError().withCause(error).log("Pool error");
});

// TODO: move this into console's prisma migrations
setImmediate(() =>
pool.query(`create or replace view enriched_connections_push as select link.id as "id",
json_build_object('id', link.id,
'workspaceId', ws.id,
'destinationId', dst.id,
'streamId', src.id,
'usesBulker', link."data" ?& array['mode', 'dataLayout'],
'type', dst."config" ->> 'destinationType',
'options', link.data,
'updatedAt', to_char(link."updatedAt", 'YYYY-MM-DD"T"HH24:MI:SS"Z"'),
'credentials', dst.config,
'credentialsHash', md5(dst.config::text)
) as "enrichedConnection"
from "ConfigurationObjectLink" link
join "Workspace" ws on link."workspaceId" = ws.id and ws.deleted = false
join "ConfigurationObject" dst
on dst.id = link."toId" and dst.type = 'destination' and dst."workspaceId" = link."workspaceId" and
dst.deleted = false
join "ConfigurationObject" src
on src.id = link."fromId" and src.type = 'stream' and
src."workspaceId" = link."workspaceId" and src.deleted = false
where link.deleted = false`)
);
return pool;
}

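The view flattens each active stream-to-destination link into one JSON document keyed by link id, so rotor can read connection configs straight from Postgres when the Redis fast store is disabled. A hedged consumer sketch (the reader below is hypothetical and not part of this commit; pool is the pg Pool returned by createPg above):

import { Pool } from "pg";

// Hypothetical reader, not from this diff: fetch enriched connections
// directly from the view that createPg() now creates on startup.
async function loadEnrichedConnections(pool: Pool): Promise<unknown[]> {
  const res = await pool.query(`select "enrichedConnection" from enriched_connections_push`);
  return res.rows.map(row => row.enrichedConnection);
}
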
165 changes: 93 additions & 72 deletions webapps/console/lib/server/fast-store.ts
Expand Up @@ -12,6 +12,7 @@ import { redis } from "./redis";
import { getServerLog } from "./log";
import hash from "object-hash";
import { IngestType } from "@jitsu/protocols/async-request";
import { isTruish } from "../shared/chores";

type RedisKey = { tmp(): string; used(): boolean; rename(redis: Redis): Promise<void>; name(): string };
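
The new flag is evaluated with isTruish, which this diff only imports from ../shared/chores. A plausible sketch of its semantics, assuming the usual truthy-string convention (an assumption, not the repository's implementation):

// Assumed behavior of isTruish (../shared/chores is not shown in this diff):
// treat common "yes" spellings of an env value as true.
function isTruish(value: unknown): boolean {
  if (typeof value === "boolean") return value;
  if (typeof value === "string") {
    return ["true", "1", "yes", "on"].includes(value.trim().toLowerCase());
  }
  return false;
}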

@@ -231,11 +232,14 @@ function createStreamWithDestinations(
}

async function saveConnectionsToRedis(db: DatabaseConnection) {
const fastStoreEnabled = !isTruish(process.env.DISABLE_FAST_STORE);
const backupSupported = !!(process.env.S3_REGION && process.env.S3_ACCESS_KEY_ID && process.env.S3_SECRET_ACCESS_KEY);
const bulkerRedisKey = redisKeyRoutes.enrichedConnections();
//const domainsRedisKey = redisKeyRoutes.streamDomain();
//const idsRedisKey = redisKeyRoutes.streamIds();
//const apiKeys = redisKeyRoutes.apiKeys();

const domainsRedisKey = redisKeyRoutes.streamDomain();
const idsRedisKey = redisKeyRoutes.streamIds();
const apiKeys = redisKeyRoutes.apiKeys();

const query = `
select
greatest(link."updatedAt", src."updatedAt", dst."updatedAt") as "updatedAt",
@@ -260,29 +264,32 @@ async function saveConnectionsToRedis(db: DatabaseConnection) {
ws."deleted" = false and src."workspaceId" = link."workspaceId" and src."workspaceId" = link."workspaceId"
order by "fromId", "toId"
`;
//let destinationsBuffer: ShortDestinationConfig[] = [];
//let lastStreamConfig: StreamConfig | undefined = undefined;
//let lastBackupEnabled: boolean = false;
//let streamsByDomain: Record<string, Record<string, StreamWithDestinations>> = {};
//const noDomainKey = "no-domain";
// const addStreamByDomain = (
// streamConfig: StreamConfig,
// destinationsBuffer: ShortDestinationConfig[],
// backupEnabled: boolean
// ) => {
// for (const domain of streamConfig.domains?.length ? streamConfig.domains : [noDomainKey]) {
// let streams = streamsByDomain[domain.toLowerCase()];
// const stream = createStreamWithDestinations(streamConfig, destinationsBuffer, backupEnabled);
// streams = streams ? { ...streams, [stream.stream.id]: stream } : { [stream.stream.id]: stream };
// streamsByDomain[domain.toLowerCase()] = streams;
// }
// };
// // to make sure we have all the streams in redis (even with no destinations) fill the buffer with all the streams first
// await selectConfigObjectRows("stream", db, async row => {
// const obj = ConfigurationObjectDbModel.parse(prefixedWith(row, "obj_"));
// const streamConfig = flatten(obj);
// addStreamByDomain(streamConfig, [], row.backupEnabled);
// });

let destinationsBuffer: ShortDestinationConfig[] = [];
let lastStreamConfig: StreamConfig | undefined = undefined;
let lastBackupEnabled: boolean = false;
let streamsByDomain: Record<string, Record<string, StreamWithDestinations>> = {};
const noDomainKey = "no-domain";
const addStreamByDomain = (
streamConfig: StreamConfig,
destinationsBuffer: ShortDestinationConfig[],
backupEnabled: boolean
) => {
for (const domain of streamConfig.domains?.length ? streamConfig.domains : [noDomainKey]) {
let streams = streamsByDomain[domain.toLowerCase()];
const stream = createStreamWithDestinations(streamConfig, destinationsBuffer, backupEnabled);
streams = streams ? { ...streams, [stream.stream.id]: stream } : { [stream.stream.id]: stream };
streamsByDomain[domain.toLowerCase()] = streams;
}
};
if (fastStoreEnabled) {
// to make sure all streams end up in redis (even those with no destinations), register every stream first
await selectConfigObjectRows("stream", db, async row => {
const obj = ConfigurationObjectDbModel.parse(prefixedWith(row, "obj_"));
const streamConfig = flatten(obj);
addStreamByDomain(streamConfig, [], row.backupEnabled);
});
}

await db.pgHelper().streamQuery(query, async row => {
const workspaceId = row.workspaceId;
@@ -350,53 +357,65 @@ async function saveConnectionsToRedis(db: DatabaseConnection) {
backupDestCreated[row.workspaceId] = true;
}

// // when we reach new stream, we need to save destinationsBuffer for the previous stream
// if (lastStreamConfig && streamConfig.id !== lastStreamConfig.id) {
// addStreamByDomain(lastStreamConfig, destinationsBuffer, row.backupEnabled);
// destinationsBuffer = [];
// }
// destinationsBuffer.push(destinationConfig);
// lastStreamConfig = streamConfig;
// lastBackupEnabled = row.backupEnabled;
if (fastStoreEnabled) {
// when we reach a new stream, save destinationsBuffer for the previous stream
if (lastStreamConfig && streamConfig.id !== lastStreamConfig.id) {
addStreamByDomain(lastStreamConfig, destinationsBuffer, lastBackupEnabled);
destinationsBuffer = [];
}
destinationsBuffer.push(destinationConfig);
lastStreamConfig = streamConfig;
lastBackupEnabled = row.backupEnabled;
}
});

// // save the last stream
// if (lastStreamConfig) {
// addStreamByDomain(lastStreamConfig, destinationsBuffer, lastBackupEnabled);
// }
// for (const [domain, streamDsts] of Object.entries(streamsByDomain)) {
// //store array of StreamWithDestinations by domain
// await redis().hset(domainsRedisKey.tmp(), domain.toLowerCase(), JSON.stringify(Object.values(streamDsts)));
//
// for (const id in streamDsts) {
// const strm = streamDsts[id];
// //store each StreamWithDestinations by stream id separately
// await redis().hset(idsRedisKey.tmp(), id, JSON.stringify(strm));
//
// await Promise.all(
// (strm.stream.publicKeys ?? [])
// .filter(k => !!k.hash)
// .map(k =>
// redis().hset(
// apiKeys.tmp(),
// k.id,
// JSON.stringify({ hash: k.hash as string, streamId: id, keyType: "browser" })
// )
// )
// );
// await Promise.all(
// (strm.stream.privateKeys ?? [])
// .filter(k => !!k.hash)
// .map(k =>
// redis().hset(apiKeys.tmp(), k.id, JSON.stringify({ hash: k.hash as string, streamId: id, keyType: "s2s" }))
// )
// );
// }
// }

// await apiKeys.rename(redis());
// await idsRedisKey.rename(redis());
// await domainsRedisKey.rename(redis());
if (fastStoreEnabled) {
// save the last stream
if (lastStreamConfig) {
addStreamByDomain(lastStreamConfig, destinationsBuffer, lastBackupEnabled);
}
for (const [domain, streamDsts] of Object.entries(streamsByDomain)) {
//store array of StreamWithDestinations by domain
await redis().hset(domainsRedisKey.tmp(), domain.toLowerCase(), JSON.stringify(Object.values(streamDsts)));

for (const id in streamDsts) {
const strm = streamDsts[id];
//store each StreamWithDestinations by stream id separately
await redis().hset(idsRedisKey.tmp(), id, JSON.stringify(strm));

await Promise.all(
(strm.stream.publicKeys ?? [])
.filter(k => !!k.hash)
.map(k =>
redis().hset(
apiKeys.tmp(),
k.id,
JSON.stringify({ hash: k.hash as string, streamId: id, keyType: "browser" })
)
)
);
await Promise.all(
(strm.stream.privateKeys ?? [])
.filter(k => !!k.hash)
.map(k =>
redis().hset(
apiKeys.tmp(),
k.id,
JSON.stringify({
hash: k.hash as string,
streamId: id,
keyType: "s2s",
})
)
)
);
}
}

await apiKeys.rename(redis());
await idsRedisKey.rename(redis());
await domainsRedisKey.rename(redis());
}
await bulkerRedisKey.rename(redis());
}

@@ -431,7 +450,9 @@ export function createFastStore({ db }: { db: DatabaseConnection }): FastStore {
},
async fullRefresh() {
const sw = stopwatch();
await saveConfigObjectsToRedis(["function"], db);
if (!isTruish(process.env.DISABLE_FAST_STORE)) {
await saveConfigObjectsToRedis(["function"], db);
}
await saveConnectionsToRedis(db);
getServerLog().atInfo().log("Export to redis took", sw.elapsedPretty());
},
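Throughout saveConnectionsToRedis the writers fill a temporary key and then rename it over the live one, so readers never observe a half-written hash; note that bulkerRedisKey is renamed even when the fast store is disabled, which keeps the enriched-connections hash fresh for whichever services still read it. A sketch of the swap pattern, assuming the RedisKey shape declared at the top of the file (illustrative, not the repository's factory):

import Redis from "ioredis";

// Illustrative RedisKey factory matching the type declared above:
// build data under a temp key, then atomically swap it into place.
function redisKey(name: string): RedisKey {
  const tmpName = `${name}.tmp`;
  let used = false;
  return {
    name: () => name,
    used: () => used,
    tmp: () => {
      used = true;
      return tmpName;
    },
    // RENAME is atomic in Redis, so readers switch from the old
    // hash to the new one without observing partial state.
    rename: async (redis: Redis) => {
      if (used) {
        await redis.rename(tmpName, name);
      }
    },
  };
}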

1 comment on commit 1135ef1


vercel bot commented on 1135ef1 (Dec 28, 2023)


Successfully deployed to the following URLs:

new-jitsu – ./webapps/console

ag.ru
logu.au
ozon.ru
sse.ere
erxes.io
baidu.dom
ilmiya.io
sambla.se
bobsec.com
sambla.com
agro4u.life
bluetick.ai
myilmiya.io
protontv.eu
t.quenti.io
alicesec.com
d.askloan.tw
dev.aclis.io
docs.dh19.de
docs.dh19.eu
joseviso.com
mydomain.dom
t.democo.dev
t.shoppub.io
t2.jitsu.com
timeplus.com
zoopsign.com
*.d.jitsu.com
beta.mitzu.io
d.versatus.io
data.light.so
data.loudy.co
data.schej.it
dog.jitsu.com
imusician.app
imusician.pro
jitsu.logu.au
jitsu.www1.ru
t.thequack.ai
thinkr.com.br
use.jitsu.com
usepolygon.io
www.sambla.se
ajewellers.com
data.uselog.io
gpt.whatfa.com
sidetrekai.com
t.papermark.io
t.saasmonk.app
use2.jitsu.com
w.d2.jitsu.com
www.kellen.top
*.dataspecc.com
app.bluetick.ai
caddy.jitsu.com
data.askloan.tw
enterticket.com
events.mitzu.io
ildar.jitsu.com
jitsu.efeer.com
jitsu.ivve.tech
krestomatio.com
sevenbillion.co
w2.d2.jitsu.com
xrt.webxr.tools
app.jotverse.com
caddy2.jitsu.com
cname2.jitsu.com
data.mysitee.com
data.toptere.com
dev-t.democo.dev
events.quenti.io
utils.doogma.com
worthsystems.com
data.music2me.com
data.timeplus.com
event-gateway.com
https.bluetick.ai
teste.fazcomex.com.br
analytics.dev.knekt.io
loraboutiquedental.com
notion.twelftree.co.uk
dev-portal.zoopsign.com
event.tradejobsnz.co.nz
investing-poc.jitsu.dev
savvy-replay.jitsu.tech
data.analytics-smart.com
data.handelsregister.app
event.clickncruise.co.uk
jt.fairhopeweb.github.io
savvy-replay2.jitsu.tech
savvy-replay3.jitsu.tech
savvy-replay4.jitsu.tech
track.alquimiaweb.com.br
track.pressance-group.jp
track.uniquecafes.com.br
colectha.agenciavoolu.com
kolectha.agenciavoolu.com
lp.loraboutiquedental.com
stage-portal.zoopsign.com
