From 11305d068c9d7fc2f05804dfed8ad49bfbdfa473 Mon Sep 17 00:00:00 2001 From: Vladimir Klimontovich Date: Wed, 22 Nov 2023 14:42:49 -0500 Subject: [PATCH 1/2] feat: product analytics improvements, now certain events are being sent server to server --- libs/jitsu-js/src/analytics-plugin.ts | 41 ++++-- libs/jitsu-js/src/index.ts | 14 +- types/protocols/analytics.d.ts | 2 +- .../PageLayout/WorkspacePageLayout.tsx | 12 +- .../WorkspaceNameAndSlugEditor.tsx | 11 +- webapps/console/lib/nextauth.config.ts | 4 + webapps/console/lib/schema/index.ts | 1 - webapps/console/lib/server/telemetry.ts | 126 +++++++++++++++--- webapps/console/pages/_app.tsx | 1 - .../api/[workspaceId]/config/[type]/index.ts | 10 +- webapps/console/pages/api/app-config.ts | 7 +- webapps/console/pages/api/init-user.ts | 8 +- webapps/console/pages/api/ping.ts | 2 +- .../workspace/[workspaceIdOrSlug]/index.ts | 23 +++- webapps/console/pages/api/workspace/index.ts | 3 +- 15 files changed, 217 insertions(+), 48 deletions(-) diff --git a/libs/jitsu-js/src/analytics-plugin.ts b/libs/jitsu-js/src/analytics-plugin.ts index 7b27ac9f1..76b642d2b 100644 --- a/libs/jitsu-js/src/analytics-plugin.ts +++ b/libs/jitsu-js/src/analytics-plugin.ts @@ -1,7 +1,7 @@ /* global analytics */ import { JitsuOptions, PersistentStorage, RuntimeFacade } from "./jitsu"; -import { AnalyticsClientEvent, Callback, ID, JSONObject, Options } from "@jitsu/protocols/analytics"; +import { AnalyticsClientEvent, Callback, DispatchedEvent, ID, JSONObject, Options } from "@jitsu/protocols/analytics"; import parse from "./index"; import { AnalyticsInstance, AnalyticsPlugin } from "analytics"; @@ -63,11 +63,18 @@ function safeCall(f: () => T, defaultVal?: T): T | undefined { } function restoreTraits(storage: PersistentStorage) { - const val = storage.getItem("__user_traits"); + let val = storage.getItem("__user_traits"); if (typeof val === "string") { - return safeCall(() => JSON.parse(val), {}); + val = safeCall(() => JSON.parse(val), {}); } - return val; + let groupVal = storage.getItem("__group_traits"); + if (typeof groupVal === "string") { + groupVal = safeCall(() => JSON.parse(groupVal), {}); + } + return { + ...(groupVal || {}), + ...(val || {}), //user traits override group traits + }; } export type StorageFactory = (cookieDomain: string, cookie2key: Record) => PersistentStorage; @@ -326,6 +333,7 @@ function adjustPayload(payload: any, config: JitsuOptions, storage: PersistentSt sentAt: new Date().toISOString(), messageId: randomId(properties.path || (parsedUrl && parsedUrl.pathname)), writeKey: maskWriteKey(config.writeKey), + groupId: storage.getItem("__group_id"), context: deepMerge(context, customContext), }; delete withContext.meta; @@ -478,7 +486,7 @@ async function send( jitsuConfig: Required, instance: AnalyticsInstance, store: PersistentStorage -): Promise { +): Promise { if (jitsuConfig.echoEvents) { console.log(`[JITSU DEBUG] sending '${method}' event:`, payload); return; @@ -544,11 +552,19 @@ async function send( } if (responseJson.destinations) { - if (jitsuConfig.debug) { - console.log(`[JITSU] Processing device destinations: `, JSON.stringify(responseJson.destinations, null, 2)); + if (jitsuConfig.s2s) { + console.warn( + `[JITSU] ${payload.type} responded with list of ${responseJson.destinations.length} destinations. However, this code is running in server-to-server mode, so destinations will be ignored`, + jitsuConfig.debug ? JSON.stringify(responseJson.destinations, null, 2) : undefined + ); + } else { + if (jitsuConfig.debug) { + console.log(`[JITSU] Processing device destinations: `, JSON.stringify(responseJson.destinations, null, 2)); + } + return processDestinations(responseJson.destinations, method, adjustedPayload, !!jitsuConfig.debug, instance); } - return processDestinations(responseJson.destinations, method, adjustedPayload, !!jitsuConfig.debug, instance); } + return adjustedPayload; } export type JitsuPluginConfig = JitsuOptions & { @@ -610,6 +626,11 @@ const jitsuAnalyticsPlugin = (pluginConfig: JitsuPluginConfig = {}): AnalyticsPl methods: { //analytics doesn't support group as a base method, so we need to add it manually group(groupId?: ID, traits?: JSONObject | null, options?: Options, callback?: Callback) { + if (typeof groupId === "number") { + //fix potential issues with group id being used incorrectly + groupId = groupId + ""; + } + const analyticsInstance = this.instance; const cacheWrap = pluginConfig.storageWrapper ? pluginConfig.storageWrapper(analyticsInstance.storage) @@ -617,6 +638,10 @@ const jitsuAnalyticsPlugin = (pluginConfig: JitsuPluginConfig = {}): AnalyticsPl const user = analyticsInstance.user(); const userId = options?.userId || user?.userId; const anonymousId = options?.anonymousId || user?.anonymousId || cacheWrap.getItem("__anon_id"); + cacheWrap.setItem("__group_id", groupId); + if (traits && typeof traits === "object") { + cacheWrap.setItem("__group_traits", traits); + } return send( "group", { type: "group", groupId, traits, ...(anonymousId ? { anonymousId } : {}), ...(userId ? { userId } : {}) }, diff --git a/libs/jitsu-js/src/index.ts b/libs/jitsu-js/src/index.ts index ff6553462..4d1ab9521 100644 --- a/libs/jitsu-js/src/index.ts +++ b/libs/jitsu-js/src/index.ts @@ -133,13 +133,21 @@ function createUnderlyingAnalyticsInstance( (analytics as any).setAnonymousId(id); } }, - group(groupId?: ID, traits?: JSONObject | null, options?: Options, callback?: Callback): Promise { + async group( + groupId?: ID, + traits?: JSONObject | null, + options?: Options, + callback?: Callback + ): Promise { + const results: any[] = []; for (const plugin of Object.values(analytics.plugins)) { if (plugin["group"]) { - plugin["group"](groupId, traits, options, callback); + results.push(await plugin["group"](groupId, traits, options, callback)); } } - return Promise.resolve({}); + //It's incorrect at many levels. First, it's not a dispatched event. Second, we take a first result + //However, since returned values are used for debugging purposes only, it's ok + return results[0]; }, } as AnalyticsInterface; } diff --git a/types/protocols/analytics.d.ts b/types/protocols/analytics.d.ts index b0fb8afa9..ac056cd3c 100644 --- a/types/protocols/analytics.d.ts +++ b/types/protocols/analytics.d.ts @@ -63,7 +63,7 @@ interface ProcessingContext { export type AnalyticsServerEvent = AnalyticsClientEvent & ServerContext & ProcessingContext; -export type JSONPrimitive = string | number | boolean | null; +export type JSONPrimitive = string | number | boolean | null | undefined; export type JSONValue = JSONPrimitive | JSONObject | JSONArray; export type JSONObject = { [member: string]: JSONValue }; export type JSONArray = Array; diff --git a/webapps/console/components/PageLayout/WorkspacePageLayout.tsx b/webapps/console/components/PageLayout/WorkspacePageLayout.tsx index 394859573..0395d4c82 100644 --- a/webapps/console/components/PageLayout/WorkspacePageLayout.tsx +++ b/webapps/console/components/PageLayout/WorkspacePageLayout.tsx @@ -577,7 +577,14 @@ function PageHeader() { ); } -const WorkspaceSettingsModal: React.FC<{ onSuccess: () => void }> = ({ onSuccess }) => { +/** + * @param onboarding if the dialog is shown on onboarding page. For onboarding, + * we should issue an event that onboarding is completed + */ +const WorkspaceSettingsModal: React.FC<{ onSuccess: () => void; onboarding: boolean }> = ({ + onSuccess, + onboarding, +}) => { const appConfig = useAppConfig(); const domains = getDomains(appConfig); const { analytics } = useJitsu(); @@ -610,7 +617,7 @@ const WorkspaceSettingsModal: React.FC<{ onSuccess: () => void }> = ({ onSuccess {domains.appBase}/your-slug {" "} - +
Got here by mistake?{" "} > =
{!workspace.slug && ( { router.reload(); }} diff --git a/webapps/console/components/WorkspaceNameAndSlugEditor/WorkspaceNameAndSlugEditor.tsx b/webapps/console/components/WorkspaceNameAndSlugEditor/WorkspaceNameAndSlugEditor.tsx index f43bd4d76..fd99582d1 100644 --- a/webapps/console/components/WorkspaceNameAndSlugEditor/WorkspaceNameAndSlugEditor.tsx +++ b/webapps/console/components/WorkspaceNameAndSlugEditor/WorkspaceNameAndSlugEditor.tsx @@ -28,14 +28,20 @@ function pickSlug(email, name): string { return ensureLength(username.replace(/[^a-z0-9]/g, "")); } +/** + * @param onboarding if the dialog is shown on onboarding page. For onboarding, + * we should issue an event that onboarding is completed + */ export function WorkspaceNameAndSlugEditor({ onSuccess, displayId, offerClassic, + onboarding, }: { onSuccess?: (newVals: { name: string; slug: string }) => void; displayId?: boolean; offerClassic?: boolean; + onboarding?: boolean; }) { const workspace = useWorkspace(); const appConfig = useAppConfig(); @@ -144,7 +150,10 @@ export function WorkspaceNameAndSlugEditor({ return; } } - await get(`/api/workspace/${workspace.id}`, { method: "PUT", body: { name, slug } }); + await get(`/api/workspace/${workspace.id}?onboarding=${!!onboarding}`, { + method: "PUT", + body: { name, slug }, + }); feedbackSuccess("Workspace name has been saved"); if (onSuccess) { onSuccess({ name, slug }); diff --git a/webapps/console/lib/nextauth.config.ts b/webapps/console/lib/nextauth.config.ts index 876bd0750..daa885566 100644 --- a/webapps/console/lib/nextauth.config.ts +++ b/webapps/console/lib/nextauth.config.ts @@ -6,6 +6,7 @@ import { db } from "./server/db"; import { checkHash, requireDefined } from "juava"; import { ApiError } from "./shared/errors"; import { getServerLog } from "./server/log"; +import { withProductAnalytics } from "./server/telemetry"; const crypto = require("crypto"); @@ -88,6 +89,9 @@ export async function getOrCreateUser(opts: { admin, }, }); + await withProductAnalytics(p => p.track("user_created"), { + user: { email, name, internalId: user.id, externalId, loginProvider }, + }); } else if (user.name !== name || user.email !== email) { await db.prisma().userProfile.update({ where: { id: user.id }, data: { name, email } }); } diff --git a/webapps/console/lib/schema/index.ts b/webapps/console/lib/schema/index.ts index 60aff18c4..7a67519e1 100644 --- a/webapps/console/lib/schema/index.ts +++ b/webapps/console/lib/schema/index.ts @@ -75,7 +75,6 @@ export const AppConfig = z.object({ frontendTelemetry: z.object({ enabled: z.boolean(), host: z.string().optional(), - writeKey: z.string().optional(), }), logLevel: z.enum(["debug", "info", "warn", "error"]), syncs: z.object({ diff --git a/webapps/console/lib/server/telemetry.ts b/webapps/console/lib/server/telemetry.ts index 3913039a9..2b7bbb70d 100644 --- a/webapps/console/lib/server/telemetry.ts +++ b/webapps/console/lib/server/telemetry.ts @@ -1,42 +1,136 @@ import { db } from "./db"; import { getLog, randomId } from "juava"; import { isTruish } from "../shared/chores"; -import { emptyAnalytics, jitsuAnalytics } from "@jitsu/js/compiled/src"; +import { AnalyticsInterface, emptyAnalytics, jitsuAnalytics } from "@jitsu/js/compiled/src"; +import { SessionUser } from "../schema"; +import { Workspace } from "@prisma/client"; /** * Server telemetry is enabled by default. We need it to see the usage * of self-hosted instance. It's disabled for Jitsu Cloud. */ -export const serverTelemetryEnabled = !isTruish(process.env.DISABLE_JITSU_TELEMETRY); -export const serverTelemetryJitsuKey = - process.env.JITSU_SERVER_TELEMETRY_KEY || "n5ZDLVfXZD5JkqpcSly8I66xz3we16sv:YszSaTV83VAc6vNcFuz7Ry9dCHHV8zJY"; +export const anonymousTelemetryEnabled = !isTruish(process.env.JITSU_DISABLE_ANONYMOUS_TELEMETRY); +export const anonymousTelemetryJitsuKey = + process.env.JITSU_SERVER_ANONYMOUS_TELEMETRY_KEY || + "n5ZDLVfXZD5JkqpcSly8I66xz3we16sv:YszSaTV83VAc6vNcFuz7Ry9dCHHV8zJY"; /** * Frontend telemetry is opposite from server telemetry. It's disabled by default, * since we need it ONLY for Jitsu Cloud. We don't need to track UI usage of self-hosted instances */ -export const frontendTelemetryHost = process.env.TELEMETRY_HOST || process.env.JITSU_FRONTEND_TELEMETRY_HOST; //support old and new env vars - -export const frontendTelemetryEnabled = !!frontendTelemetryHost; - -//support old and new env vars. Do we ever need to use telemetry key? -export const frontendTelemetryWriteKey = - process.env.TELEMETRY_WRITE_KEY || process.env.JITSU_FRONTEND_TELEMETRY_WRITE_KEY; +export const productTelemetryHost = process.env.JITSU_PRODUCT_TELEMETRY_HOST; //support old and new env vars +export const productTelemetryEnabled = !!productTelemetryHost; +export const productTelemetryBackendKey = process.env.JITSU_PRODUCT_BACKEND_TELEMETRY_WRITE_KEY; const log = getLog("telemetry"); -const jitsu = serverTelemetryEnabled +const anonymousTelemetry = anonymousTelemetryEnabled ? jitsuAnalytics({ host: "https://use.jitsu.com", - writeKey: serverTelemetryJitsuKey, - debug: true, + writeKey: anonymousTelemetryJitsuKey, }) : emptyAnalytics; +function createAnalytics() { + return productTelemetryEnabled + ? jitsuAnalytics({ + host: productTelemetryHost, + writeKey: productTelemetryBackendKey, + s2s: true, + //debug: true, + }) + : emptyAnalytics; +} + +type WorkspaceProps = { slug?: string; name?: string }; +type WorkspaceIdAndProps = { id: string } & WorkspaceProps; + +type TrackEvents = + | "user_created" + | "workspace_created" + | "workspace_onboarded" + | "workspace_ping" + | "workspace_access" + | "create_object"; + +export interface ProductAnalytics extends AnalyticsInterface { + identifyUser(sessionUser: TrackedUser): Promise; + + workspace(workspaceId: string, opts?: WorkspaceProps): Promise; + + workspace(workspace: Workspace): Promise; + + /** + * Typed version of track method + */ + track(event: TrackEvents, props?: any): Promise; +} + +function createProductAnalytics(analytics: AnalyticsInterface): ProductAnalytics { + return { + identifyUser(sessionUser: SessionUser): Promise { + return analytics.identify(sessionUser.internalId, { + email: sessionUser.email, + name: sessionUser.name, + externalId: sessionUser.externalId, + }); + }, + workspace(idOrObject: string | Workspace, opts?: WorkspaceProps) { + if (typeof idOrObject === "string") { + return analytics.group(idOrObject, opts ? { workspaceSlug: opts.slug, workspaceName: opts.name } : {}); + } else { + return analytics.group(idOrObject.id, { workspaceSlug: idOrObject.slug, workspaceName: idOrObject.name }); + } + }, + ...analytics, + }; +} + +export type TrackedUser = Pick; + +/** + * Entry point for all analytics events. The method makes sure that all indentify events + * are properly sent, and calls a `callback` on configured analytics instance. + */ +export function withProductAnalytics( + callback: (p: ProductAnalytics) => Promise, + opts: { + user: TrackedUser; + workspace?: Workspace | WorkspaceIdAndProps; + } +): Promise { + //we create new instance every time since analytics.js saves state in props and not thread safe + //creating of an instance is cheap operation + const instance = createProductAnalytics(createAnalytics()); + const allPromises: Promise[] = []; + if (opts.user) { + allPromises.push(instance.identifyUser(opts.user)); + } + if (opts.workspace) { + allPromises.push( + instance.workspace(opts.workspace.id, { + slug: opts.workspace.slug || undefined, + name: opts.workspace.name || undefined, + }) + ); + } + allPromises.push( + (async () => { + try { + return await callback(instance); + } catch (e) { + log.atWarn().withCause(e).log("Failed to send product analytics event"); + } + return {}; + })() + ); + return Promise.all(allPromises); +} + let deploymentId = "unknown"; export async function initTelemetry(): Promise<{ deploymentId: string }> { - if (serverTelemetryEnabled) { + if (anonymousTelemetryEnabled) { try { const instanceIdVal = await db.prisma().globalProps.findFirst({ where: { name: "deploymentId" } }); if (instanceIdVal) { @@ -55,7 +149,7 @@ export async function initTelemetry(): Promise<{ deploymentId: string }> { export async function trackTelemetryEvent(event: string, props: any = {}): Promise { try { - const result = await jitsu.track(`console.${event}`, { + const result = await anonymousTelemetry.track(`console.${event}`, { ...props, deploymentId, source: "console", diff --git a/webapps/console/pages/_app.tsx b/webapps/console/pages/_app.tsx index bea6786c3..c4aadb36a 100644 --- a/webapps/console/pages/_app.tsx +++ b/webapps/console/pages/_app.tsx @@ -389,7 +389,6 @@ function AppLoader({ children, pageProps }: PropsWithChildren) { //debug: data?.logLevel === "debug", debug: false, host: trackingHost, - writeKey: data!.frontendTelemetry.writeKey, } : { disabled: true } } diff --git a/webapps/console/pages/api/[workspaceId]/config/[type]/index.ts b/webapps/console/pages/api/[workspaceId]/config/[type]/index.ts index 153191d47..eaadabe80 100644 --- a/webapps/console/pages/api/[workspaceId]/config/[type]/index.ts +++ b/webapps/console/pages/api/[workspaceId]/config/[type]/index.ts @@ -7,7 +7,7 @@ import { getConfigObjectType, parseObject } from "../../../../../lib/schema/conf import { ApiError } from "../../../../../lib/shared/errors"; import { isReadOnly } from "../../../../../lib/server/read-only-mode"; import { enableAuditLog } from "../../../../../lib/server/audit-log"; -import { trackTelemetryEvent } from "../../../../../lib/server/telemetry"; +import { trackTelemetryEvent, withProductAnalytics } from "../../../../../lib/server/telemetry"; export const config = { api: { @@ -65,6 +65,14 @@ export const api: Api = { }); await trackTelemetryEvent("config-object-delete", { objectType: type }); await fastStore.fullRefresh(); + await withProductAnalytics( + p => + p.track("create_object", { + objectType: type, + }), + //there's no workspace name / id available. Maybe that's fine? + { user, workspace: { id: workspaceId } } + ); if (enableAuditLog) { await db.prisma().auditLog.create({ data: { diff --git a/webapps/console/pages/api/app-config.ts b/webapps/console/pages/api/app-config.ts index d637c11c9..5ef0b3505 100644 --- a/webapps/console/pages/api/app-config.ts +++ b/webapps/console/pages/api/app-config.ts @@ -7,7 +7,7 @@ import { isFirebaseEnabled, requireFirebaseOptions } from "../../lib/server/fire import { nangoConfig } from "../../lib/server/oauth/nango-config"; import { isTruish } from "../../lib/shared/chores"; import { readOnlyUntil } from "../../lib/server/read-only-mode"; -import { frontendTelemetryEnabled, frontendTelemetryHost, frontendTelemetryWriteKey } from "../../lib/server/telemetry"; +import { productTelemetryEnabled, productTelemetryHost } from "../../lib/server/telemetry"; export default createRoute() .GET({ result: AppConfig, auth: false }) @@ -39,9 +39,8 @@ export default createRoute() }, jitsuClassicUrl: process.env.JITSU_CLASSIC_URL || "https://cloud.jitsu.com", frontendTelemetry: { - enabled: frontendTelemetryEnabled, - host: frontendTelemetryHost === "__self__" ? publicEndpoints.baseUrl : frontendTelemetryHost, - writeKey: frontendTelemetryWriteKey, + enabled: productTelemetryEnabled, + host: productTelemetryHost === "__self__" ? publicEndpoints.baseUrl : productTelemetryHost, }, publicEndpoints: { protocol: publicEndpoints.protocol, diff --git a/webapps/console/pages/api/init-user.ts b/webapps/console/pages/api/init-user.ts index 734178f05..2c6a07a93 100644 --- a/webapps/console/pages/api/init-user.ts +++ b/webapps/console/pages/api/init-user.ts @@ -7,7 +7,7 @@ import { publicEmailDomains } from "../../lib/shared/email-domains"; import { getUserPreferenceService } from "../../lib/server/user-preferences"; import { ApiError } from "../../lib/shared/errors"; import { z } from "zod"; -import { initTelemetry } from "../../lib/server/telemetry"; +import { initTelemetry, withProductAnalytics } from "../../lib/server/telemetry"; function capitalize(s: string) { return s.charAt(0).toUpperCase() + s.slice(1); @@ -38,7 +38,7 @@ export default createRoute() }), }) .handler(async ({ query, user }) => { - initTelemetry(); + await initTelemetry(); getServerLog() .atInfo() .log(`Looking for workspace for user ${JSON.stringify(user)}`); @@ -93,9 +93,7 @@ export default createRoute() externalId: user.externalId, }, }); - await db.prisma().$queryRaw`UPDATE "UserProfile" - SET "id" = ${user.internalId} - WHERE "id" = ${newUser.id}`; + await withProductAnalytics(p => p.track("user_created"), { user: { ...newUser, internalId: newUser.id } }); } const newWorkspace = await db .prisma() diff --git a/webapps/console/pages/api/ping.ts b/webapps/console/pages/api/ping.ts index 11577e17b..4672ef5a2 100644 --- a/webapps/console/pages/api/ping.ts +++ b/webapps/console/pages/api/ping.ts @@ -3,7 +3,7 @@ import { initTelemetry, trackTelemetryEvent } from "../../lib/server/telemetry"; export default createRoute() .GET({ auth: false }) - .handler(async () => { + .handler(async ({ user }) => { const { deploymentId } = await initTelemetry(); await trackTelemetryEvent("ping"); return { diff --git a/webapps/console/pages/api/workspace/[workspaceIdOrSlug]/index.ts b/webapps/console/pages/api/workspace/[workspaceIdOrSlug]/index.ts index d7b2b0f5a..82f60c293 100644 --- a/webapps/console/pages/api/workspace/[workspaceIdOrSlug]/index.ts +++ b/webapps/console/pages/api/workspace/[workspaceIdOrSlug]/index.ts @@ -5,6 +5,7 @@ import { ApiError } from "../../../../lib/shared/errors"; import { getUserPreferenceService } from "../../../../lib/server/user-preferences"; import { getServerLog } from "../../../../lib/server/log"; import { SessionUser } from "../../../../lib/schema"; +import { withProductAnalytics } from "../../../../lib/server/telemetry"; const log = getServerLog(); @@ -54,6 +55,14 @@ export const api: Api = { { status: 403 } ); } + //if slug is not set, means that workspace is not yet onboarded. We shouldn't track + if (workspace.slug) { + //send event asynchronously to prevent increased response time + //theoretically, event can get lost, however this is not the type of event that + //requires 100% reliability + withProductAnalytics(callback => callback.track("workspace_access"), { user, workspace }); + } + //it doesn't have to by sync since the preferences are optional savePreferences(user, workspace).catch(e => { log @@ -68,13 +77,21 @@ export const api: Api = { auth: true, types: { body: z.object({ name: z.string(), slug: z.string() }), - query: z.object({ workspaceIdOrSlug: z.string() }), + query: z.object({ + //true if the changed done during onboarding + onboarding: z.boolean().optional(), + workspaceIdOrSlug: z.string(), + }), }, - handle: async ({ query: { workspaceIdOrSlug }, body, user }) => { + handle: async ({ query: { workspaceIdOrSlug, onboarding }, body, user }) => { await verifyAccess(user, workspaceIdOrSlug); - return await db + const workspace = await db .prisma() .workspace.update({ where: { id: workspaceIdOrSlug }, data: { name: body.name, slug: body.slug } }); + if (onboarding) { + await withProductAnalytics(callback => callback.track("workspace_onboarded"), { user, workspace }); + } + return workspace; }, }, }; diff --git a/webapps/console/pages/api/workspace/index.ts b/webapps/console/pages/api/workspace/index.ts index aeb5762d2..7a914ac5c 100644 --- a/webapps/console/pages/api/workspace/index.ts +++ b/webapps/console/pages/api/workspace/index.ts @@ -2,6 +2,7 @@ import { Api, inferUrl, nextJsApiHandler } from "../../../lib/api"; import { z } from "zod"; import { db } from "../../../lib/server/db"; import { requireDefined } from "juava"; +import { withProductAnalytics } from "../../../lib/server/telemetry"; const api: Api = { url: inferUrl(__filename), @@ -35,7 +36,7 @@ const api: Api = { const newWorkspace = await db.prisma().workspace.create({ data: { name: body.name || `${user.name || user.email || user.externalId}'s new workspace` }, }); - await db.prisma().workspaceAccess.create({ data: { userId: user.internalId, workspaceId: newWorkspace.id } }); + await withProductAnalytics(p => p.workspace("workspace_created"), { user, workspace: newWorkspace }); return { id: newWorkspace.id }; }, }, From f1a45517e5d5ce04ddc3e315f7c4226df654d6b3 Mon Sep 17 00:00:00 2001 From: Vladimir Klimontovich Date: Wed, 22 Nov 2023 15:36:56 -0500 Subject: [PATCH 2/2] feat: product analytics - improved tracking precision --- types/protocols/async-request.d.ts | 2 ++ webapps/console/lib/nextauth.config.ts | 4 ++++ webapps/console/lib/server/telemetry.ts | 16 +++++++++++++--- .../api/[workspaceId]/config/[type]/index.ts | 4 ++-- webapps/console/pages/api/fb-auth/create-user.ts | 1 + webapps/console/pages/api/init-user.ts | 4 ++-- webapps/console/pages/api/s/[...type].ts | 16 +++++++++++++++- .../api/workspace/[workspaceIdOrSlug]/index.ts | 8 ++++---- webapps/console/pages/api/workspace/index.ts | 4 ++-- 9 files changed, 45 insertions(+), 14 deletions(-) diff --git a/types/protocols/async-request.d.ts b/types/protocols/async-request.d.ts index 8f7afceee..c1fbdae83 100644 --- a/types/protocols/async-request.d.ts +++ b/types/protocols/async-request.d.ts @@ -11,6 +11,8 @@ export type IngestMessage = { messageId: string; //currently this not being filled connectionId: string; + //id of a stream where this message should eventually go. For debugging purposes so far + streamId?: string; type: string; origin: { baseUrl: string; diff --git a/webapps/console/lib/nextauth.config.ts b/webapps/console/lib/nextauth.config.ts index daa885566..a86e49e01 100644 --- a/webapps/console/lib/nextauth.config.ts +++ b/webapps/console/lib/nextauth.config.ts @@ -7,6 +7,7 @@ import { checkHash, requireDefined } from "juava"; import { ApiError } from "./shared/errors"; import { getServerLog } from "./server/log"; import { withProductAnalytics } from "./server/telemetry"; +import { NextApiRequest } from "next"; const crypto = require("crypto"); @@ -70,6 +71,8 @@ export async function getOrCreateUser(opts: { loginProvider: string; name?: string; email: string; + // we only need this for product analytics, so it's optional + req?: NextApiRequest; }): Promise { const { externalId, loginProvider, email, name = email } = opts; log.atDebug().log(`Signing in user ${JSON.stringify(opts)}`); @@ -91,6 +94,7 @@ export async function getOrCreateUser(opts: { }); await withProductAnalytics(p => p.track("user_created"), { user: { email, name, internalId: user.id, externalId, loginProvider }, + req: opts.req, }); } else if (user.name !== name || user.email !== email) { await db.prisma().userProfile.update({ where: { id: user.id }, data: { name, email } }); diff --git a/webapps/console/lib/server/telemetry.ts b/webapps/console/lib/server/telemetry.ts index 2b7bbb70d..11fefd47a 100644 --- a/webapps/console/lib/server/telemetry.ts +++ b/webapps/console/lib/server/telemetry.ts @@ -4,6 +4,8 @@ import { isTruish } from "../shared/chores"; import { AnalyticsInterface, emptyAnalytics, jitsuAnalytics } from "@jitsu/js/compiled/src"; import { SessionUser } from "../schema"; import { Workspace } from "@prisma/client"; +import { NextApiRequest } from "next"; +import { AnalyticsContext } from "@jitsu/protocols/analytics"; /** * Server telemetry is enabled by default. We need it to see the usage @@ -66,8 +68,9 @@ export interface ProductAnalytics extends AnalyticsInterface { track(event: TrackEvents, props?: any): Promise; } -function createProductAnalytics(analytics: AnalyticsInterface): ProductAnalytics { +function createProductAnalytics(analytics: AnalyticsInterface, req?: NextApiRequest): ProductAnalytics { return { + ...analytics, identifyUser(sessionUser: SessionUser): Promise { return analytics.identify(sessionUser.internalId, { email: sessionUser.email, @@ -82,7 +85,13 @@ function createProductAnalytics(analytics: AnalyticsInterface): ProductAnalytics return analytics.group(idOrObject.id, { workspaceSlug: idOrObject.slug, workspaceName: idOrObject.name }); } }, - ...analytics, + track(event: TrackEvents, props?: any): Promise { + const context: AnalyticsContext = { + ip: (req?.headers["x-forwarded-for"] as string)?.split(",")[0]?.trim() || req?.socket?.remoteAddress, + //userAgent: req?.headers["user-agent"] as string, + }; + return analytics.track(event, { ...(props || {}), context }); + }, }; } @@ -97,11 +106,12 @@ export function withProductAnalytics( opts: { user: TrackedUser; workspace?: Workspace | WorkspaceIdAndProps; + req?: NextApiRequest; } ): Promise { //we create new instance every time since analytics.js saves state in props and not thread safe //creating of an instance is cheap operation - const instance = createProductAnalytics(createAnalytics()); + const instance = createProductAnalytics(createAnalytics(), opts?.req); const allPromises: Promise[] = []; if (opts.user) { allPromises.push(instance.identifyUser(opts.user)); diff --git a/webapps/console/pages/api/[workspaceId]/config/[type]/index.ts b/webapps/console/pages/api/[workspaceId]/config/[type]/index.ts index eaadabe80..b0b94c1af 100644 --- a/webapps/console/pages/api/[workspaceId]/config/[type]/index.ts +++ b/webapps/console/pages/api/[workspaceId]/config/[type]/index.ts @@ -50,7 +50,7 @@ export const api: Api = { query: z.object({ workspaceId: z.string(), type: z.string() }), body: z.any(), }, - handle: async ({ body, user, query: { workspaceId, type } }) => { + handle: async ({ req, body, user, query: { workspaceId, type } }) => { await verifyAccess(user, workspaceId); if (isReadOnly) { throw new ApiError("Console is in read-only mode. Modifications of objects are not allowed"); @@ -71,7 +71,7 @@ export const api: Api = { objectType: type, }), //there's no workspace name / id available. Maybe that's fine? - { user, workspace: { id: workspaceId } } + { user, workspace: { id: workspaceId }, req } ); if (enableAuditLog) { await db.prisma().auditLog.create({ diff --git a/webapps/console/pages/api/fb-auth/create-user.ts b/webapps/console/pages/api/fb-auth/create-user.ts index 8e352ff6a..53e525303 100644 --- a/webapps/console/pages/api/fb-auth/create-user.ts +++ b/webapps/console/pages/api/fb-auth/create-user.ts @@ -15,6 +15,7 @@ export const api: Api = { loginProvider: "firebase", email: user.email, name: user.name || user.email, + req, }); await linkFirebaseUser(user.externalId, dbUser.id); } diff --git a/webapps/console/pages/api/init-user.ts b/webapps/console/pages/api/init-user.ts index 2c6a07a93..0adf8db8d 100644 --- a/webapps/console/pages/api/init-user.ts +++ b/webapps/console/pages/api/init-user.ts @@ -37,7 +37,7 @@ export default createRoute() invite: z.string().optional(), }), }) - .handler(async ({ query, user }) => { + .handler(async ({ req, query, user }) => { await initTelemetry(); getServerLog() .atInfo() @@ -93,7 +93,7 @@ export default createRoute() externalId: user.externalId, }, }); - await withProductAnalytics(p => p.track("user_created"), { user: { ...newUser, internalId: newUser.id } }); + await withProductAnalytics(p => p.track("user_created"), { user: { ...newUser, internalId: newUser.id }, req }); } const newWorkspace = await db .prisma() diff --git a/webapps/console/pages/api/s/[...type].ts b/webapps/console/pages/api/s/[...type].ts index 997a69e82..06fdba3d5 100644 --- a/webapps/console/pages/api/s/[...type].ts +++ b/webapps/console/pages/api/s/[...type].ts @@ -22,6 +22,7 @@ import { satisfyDomainFilter, satisfyFilter, } from "@jitsu/js/compiled/src/destination-plugins"; + const jsondiffpatchInstance = jsondiffpatch.create({}); function isInternalHeader(headerName: string) { @@ -225,6 +226,7 @@ export async function sendEventToBulker(req: NextApiRequest, ingestType: IngestT const message: IngestMessage = { geo: fromHeaders(req.headers), connectionId: "", + ingestType, messageCreated: new Date().toISOString(), messageId: event.messageId, @@ -269,6 +271,7 @@ export async function sendEventToBulker(req: NextApiRequest, ingestType: IngestT let stream: StreamWithDestinations | undefined; try { stream = await getStream(loc); + message.streamId = stream?.stream.id; if (stream) { if (stream.asynchronousDestinations?.length > 0 || stream.synchronousDestinations?.length > 0) { response = await buildResponse(message, stream); @@ -386,13 +389,24 @@ export function patchEvent( event.request_ip = (req.headers["x-real-ip"] as string) || (req.headers["x-forwarded-for"] as string) || req.socket.remoteAddress; + //make sure that context is initialized + event.context = event.context || {}; + if (ingestType === "browser") { - event.context = event.context || {}; + //if ip comes from browser, don't trust i event.context.ip = event.request_ip; } if (context) { event.context = { ...context, ...event.context }; } + if (!event.context.userAgent) { + event.context.userAgent = req.headers["user-agent"] as string; + } + if (!event.context.locale) { + event.context.locale = (req.headers["accept-language"] as string | undefined)?.split(",")[0]?.trim() || undefined; + } + //get geo from headers, so we can display it in the console + event.context.geo = fromHeaders(req.headers); const nowIsoDate = new Date().toISOString(); event.receivedAt = nowIsoDate; diff --git a/webapps/console/pages/api/workspace/[workspaceIdOrSlug]/index.ts b/webapps/console/pages/api/workspace/[workspaceIdOrSlug]/index.ts index 82f60c293..feb367a7f 100644 --- a/webapps/console/pages/api/workspace/[workspaceIdOrSlug]/index.ts +++ b/webapps/console/pages/api/workspace/[workspaceIdOrSlug]/index.ts @@ -37,7 +37,7 @@ export const api: Api = { description: "Get workspace", auth: true, types: { query: z.object({ workspaceIdOrSlug: z.string() }) }, - handle: async ({ query: { workspaceIdOrSlug }, user }) => { + handle: async ({ req, query: { workspaceIdOrSlug }, user }) => { const workspace = await db .prisma() .workspace.findFirst({ where: { OR: [{ id: workspaceIdOrSlug }, { slug: workspaceIdOrSlug }] } }); @@ -60,7 +60,7 @@ export const api: Api = { //send event asynchronously to prevent increased response time //theoretically, event can get lost, however this is not the type of event that //requires 100% reliability - withProductAnalytics(callback => callback.track("workspace_access"), { user, workspace }); + withProductAnalytics(callback => callback.track("workspace_access"), { user, workspace, req }); } //it doesn't have to by sync since the preferences are optional @@ -83,13 +83,13 @@ export const api: Api = { workspaceIdOrSlug: z.string(), }), }, - handle: async ({ query: { workspaceIdOrSlug, onboarding }, body, user }) => { + handle: async ({ req, query: { workspaceIdOrSlug, onboarding }, body, user }) => { await verifyAccess(user, workspaceIdOrSlug); const workspace = await db .prisma() .workspace.update({ where: { id: workspaceIdOrSlug }, data: { name: body.name, slug: body.slug } }); if (onboarding) { - await withProductAnalytics(callback => callback.track("workspace_onboarded"), { user, workspace }); + await withProductAnalytics(callback => callback.track("workspace_onboarded"), { user, workspace, req }); } return workspace; }, diff --git a/webapps/console/pages/api/workspace/index.ts b/webapps/console/pages/api/workspace/index.ts index 7a914ac5c..33485f055 100644 --- a/webapps/console/pages/api/workspace/index.ts +++ b/webapps/console/pages/api/workspace/index.ts @@ -32,11 +32,11 @@ const api: Api = { types: { body: z.object({ name: z.string().optional() }), }, - handle: async ({ user, body }) => { + handle: async ({ req, user, body }) => { const newWorkspace = await db.prisma().workspace.create({ data: { name: body.name || `${user.name || user.email || user.externalId}'s new workspace` }, }); - await withProductAnalytics(p => p.workspace("workspace_created"), { user, workspace: newWorkspace }); + await withProductAnalytics(p => p.workspace("workspace_created"), { user, workspace: newWorkspace, req }); return { id: newWorkspace.id }; }, },