Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/newjitsu/rotor http #1075

Merged
merged 5 commits into from
Dec 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions libs/core-functions/__tests__/lib/mem-store.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
import { AnonymousEventsStore, SetOpts, Store } from "@jitsu/protocols/functions";
import { AnonymousEventsStore, SetOpts, Store, TTLStore } from "@jitsu/protocols/functions";
import { AnalyticsServerEvent } from "@jitsu/protocols/analytics";

export function createStore(): Store {
export function createStore(): TTLStore {
return {
del(key: string): Promise<void> {
throw new Error("Method not implemented.");
},
get(key: string): Promise<any> {
throw new Error("Method not implemented.");
},
getWithTTL(key: string): Promise<{ value: any; ttl: number } | undefined> {
throw new Error("Method not implemented.");
},
set(key: string, value: any, opts?: SetOpts): Promise<void> {
throw new Error("Method not implemented.");
},
Expand Down
24 changes: 14 additions & 10 deletions libs/core-functions/src/functions/amplitude-destination.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,26 @@ const AmplitudeDestination: JitsuFunction<AnalyticsServerEvent, AmplitudeDestina
) => {
try {
const deviceId = event.anonymousId;
let sessionId: string | undefined = undefined;
let sessionId: number | undefined = undefined;
if (deviceId) {
const systemContext = requireDefined((ctx as any as SystemContext).$system, `$system context is not available`);
const ttlStore = systemContext.store;
const ttlSec = 60 * (props.sessionWindow ?? 30);
const sessionKey = `${ctx.source.id}_${deviceId}_sess`;
const newSession = new Date().getTime();
const savedSession = await ttlStore.get(sessionKey);
if (savedSession) {
log.debug(
`Amplitude session found: ${savedSession} for deviceId: ${deviceId} ttl: ${await ttlStore.ttl(sessionKey)}`
);
const savedSessionValue = await ttlStore.getWithTTL(sessionKey);
if (savedSessionValue) {
sessionId = savedSessionValue.value;
const ttl = savedSessionValue.ttl;
log.debug(`Amplitude session found: ${sessionId} for deviceId: ${deviceId} ttl: ${ttl}`);
if (ttl < ttlSec - 60) {
// refresh ttl not often than once per minute
await ttlStore.set(sessionKey, sessionId, { ttl: ttlSec });
}
} else {
log.debug(`Amplitude session not found for deviceId: ${deviceId} new session: ${newSession}`);
sessionId = new Date().getTime();
log.debug(`Amplitude session not found for deviceId: ${deviceId} new session: ${sessionId}`);
await ttlStore.set(sessionKey, sessionId, { ttl: ttlSec });
}
sessionId = savedSession || newSession;
await ttlStore.set(sessionKey, sessionId, { ttl: 60 * (props.sessionWindow ?? 30) });
}
const groupType = props.groupType || "group";
const endpoint =
Expand Down
7 changes: 6 additions & 1 deletion libs/core-functions/src/functions/ga4-destination.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,11 @@ function pageViewEvent(event: AnalyticsServerEvent): Ga4Event {
};
}

function adjustName(name: string): string {
name.replace(/[^a-zA-Z0-9_]/g, "_");
return name.substring(0, 40);
}

function trackEvent(event: AnalyticsServerEvent): Ga4Event {
const evp = event.properties || {};
let params: Record<string, any> = {};
Expand Down Expand Up @@ -275,7 +280,7 @@ function trackEvent(event: AnalyticsServerEvent): Ga4Event {
params.items = getItems(event);
break;
default:
name = eventName;
name = adjustName(eventName);
params = { ...evp };
params = removeProperties(params, StandardProperties);
params.currency = evp.currency;
Expand Down
5 changes: 3 additions & 2 deletions libs/core-functions/src/functions/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
ProcessingContext,
ServerContextReservedProps,
} from "@jitsu/protocols/analytics";
import { AnonymousEventsStore, Store } from "@jitsu/protocols/functions";
import { AnonymousEventsStore, EventsStore, TTLStore } from "@jitsu/protocols/functions";

export type MetricsMeta = {
workspaceId: string;
Expand All @@ -24,7 +24,8 @@ export type SystemContext = {
$system: {
anonymousEventsStore: AnonymousEventsStore;
metricsMeta: MetricsMeta;
store: Store;
store: TTLStore;
eventsStore: EventsStore;
};
};

Expand Down
2 changes: 1 addition & 1 deletion libs/core-functions/src/functions/lib/mongodb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ async function createClient() {
log.atInfo().log(`Connecting to MongoDB server...`);

// Create a new MongoClient
const client = new MongoClient(mongodbURL, { compressors: ["zstd"] });
const client = new MongoClient(mongodbURL);
// Connect the client to the server (optional starting in v4.7)
await client.connect();
// Establish and verify connection
Expand Down
15 changes: 13 additions & 2 deletions libs/core-functions/src/functions/lib/store.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { SetOpts, Store } from "@jitsu/protocols/functions";
import { SetOpts, Store, TTLStore } from "@jitsu/protocols/functions";
import type { Redis } from "ioredis";
import parse from "parse-duration";
import type { MongoClient } from "mongodb";
Expand Down Expand Up @@ -62,7 +62,7 @@ export const createTtlStore = (namespace: string, redisClient: Redis, defaultTtl
},
});

export const createMongoStore = (namespace: string, mongo: MongoClient, defaultTtlSec: number): Store => {
export const createMongoStore = (namespace: string, mongo: MongoClient, defaultTtlSec: number): TTLStore => {
interface StoreValue {
_id: string;
value: any;
Expand Down Expand Up @@ -101,6 +101,17 @@ export const createMongoStore = (namespace: string, mongo: MongoClient, defaultT
.findOne({ _id: key }, { readPreference: "nearest" });
return res ? res.value : undefined;
},
getWithTTL: async (key: string) => {
const res = await mongo
.db(dbName)
.collection<StoreValue>(namespace)
.findOne({ _id: key }, { readPreference: "nearest" });
if (!res) {
return undefined;
}
const ttl = res.expireAt ? Math.max(Math.floor((res.expireAt.getTime() - new Date().getTime()) / 1000), 0) : -1;
return { value: res.value, ttl };
},
set: async (key: string, obj: any, opts?: SetOpts) => {
await ensureCollection();
const colObj: any = { value: obj };
Expand Down
1 change: 1 addition & 0 deletions libs/core-functions/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,5 +67,6 @@ export * as posthogDestination from "./functions/posthog-destination";
export * as mongodbDestination from "./functions/mongodb-destination";
export { mongodb, mongoAnonymousEventsStore } from "./functions/lib/mongodb";
export type { SystemContext, MetricsMeta } from "./functions/lib/index";
export { httpAgent, httpsAgent } from "./functions/lib/http-agent";
export * from "./functions/lib/store";
export * from "./functions/lib/ua";
Loading
Loading