From a68c3f4e419264f5302b7b23bbaf65dcee621c70 Mon Sep 17 00:00:00 2001 From: Ildar Nurislamov Date: Tue, 28 Nov 2023 11:26:56 +0400 Subject: [PATCH] rotor: GA4 bring back engagement_time_msec workaround rotor: error and metrics reporting improvements. --- libs/core-functions/src/context.ts | 6 ++++- .../src/functions/ga4-destination.ts | 2 ++ services/rotor/src/index.ts | 22 ++++++++++--------- services/rotor/src/lib/metrics.ts | 8 ++++++- services/rotor/src/lib/rotor.ts | 4 +++- 5 files changed, 29 insertions(+), 13 deletions(-) diff --git a/libs/core-functions/src/context.ts b/libs/core-functions/src/context.ts index d3410ce97..50aa3d6b9 100644 --- a/libs/core-functions/src/context.ts +++ b/libs/core-functions/src/context.ts @@ -35,10 +35,11 @@ export function createFullContext( headers: init?.headers ? hideSensitiveHeaders(init.headers) : undefined, event: event, }; + const timeout = 10000; const controller = new AbortController(); setTimeout(() => { controller.abort(); - }, 10000); + }, timeout); let internalInit: RequestInit = { ...init, @@ -49,6 +50,9 @@ export function createFullContext( try { fetchResult = await nodeFetch(url, internalInit); } catch (err) { + if (err.name === "AbortError") { + err.message = `Fetch request exceeded timeout ${timeout}ms and was aborted`; + } if (logToRedis) { eventsStore.log(connectionId, true, { ...baseInfo, error: getErrorMessage(err), elapsedMs: sw.elapsedMs() }); } diff --git a/libs/core-functions/src/functions/ga4-destination.ts b/libs/core-functions/src/functions/ga4-destination.ts index 60829b946..66a2f20ce 100644 --- a/libs/core-functions/src/functions/ga4-destination.ts +++ b/libs/core-functions/src/functions/ga4-destination.ts @@ -154,6 +154,7 @@ function pageViewEvent(event: AnalyticsServerEvent): Ga4Event { page_location: customProperties.url || "", page_referrer: customProperties.referrer || "", page_title: customProperties.title || "", + engagement_time_msec: 1, }, }; } @@ -281,6 +282,7 @@ function trackEvent(event: AnalyticsServerEvent): Ga4Event { params.value = evp.value || evp.total || evp.revenue; break; } + params.engagement_time_msec = 1; return { name, params, diff --git a/services/rotor/src/index.ts b/services/rotor/src/index.ts index 78320b755..54c50c777 100644 --- a/services/rotor/src/index.ts +++ b/services/rotor/src/index.ts @@ -175,16 +175,18 @@ export async function rotorMessageHandler( .filter(f => f.functionId.startsWith("udf.")) .map(async f => { const functionId = f.functionId.substring(4); - const userFunctionObj = requireDefined( - await getCachedOrLoad(functionsCache, functionId, key => { - return fastStore.getConfig("function", key); - }), - `Unknown function: ${functionId}` - ); - if (userFunctionObj.workspaceId !== connection.workspaceId) { - throw newError( - `Function ${functionId} is not in the same workspace as connection ${connection.id} (${connection.workspaceId})` - ); + const userFunctionObj = await getCachedOrLoad(functionsCache, functionId, key => { + return fastStore.getConfig("function", key); + }); + if (!userFunctionObj || userFunctionObj.workspaceId !== connection.workspaceId) { + return { + id: f.functionId as string, + config: {}, + exec: async (event, ctx) => { + throw newError(`Function ${functionId} not found in workspace: ${connection.workspaceId}`); + }, + context: {}, + }; } const code = userFunctionObj.code; const codeHash = hash(code); diff --git a/services/rotor/src/lib/metrics.ts b/services/rotor/src/lib/metrics.ts index 5692de840..dec6459b9 100644 --- a/services/rotor/src/lib/metrics.ts +++ b/services/rotor/src/lib/metrics.ts @@ -28,12 +28,18 @@ export function createMetrics(producer: Producer): Metrics { const buffer: MetricsEvent[] = []; const flush = async (buf: MetricsEvent[]) => { + const d = new Date(); + d.setMilliseconds(0); + d.setSeconds(0); await Promise.all([ producer.send({ topic: `in.id.metrics.m.batch.t.metrics`, compression: getCompressionType(), messages: buf.map(m => ({ - value: JSON.stringify(m), + value: JSON.stringify({ + ...m, + timestamp: d.toISOString(), + }), })), }), producer.send({ diff --git a/services/rotor/src/lib/rotor.ts b/services/rotor/src/lib/rotor.ts index c9675e517..e27faeafa 100644 --- a/services/rotor/src/lib/rotor.ts +++ b/services/rotor/src/lib/rotor.ts @@ -147,7 +147,9 @@ export function kafkaRotor(cfg: KafkaRotorConfig): KafkaRotor { .atError() .withCause(e) .log( - `Failed to process message for ${message.key || "(no key set)"}. ${retryLogMessage(retryPolicy, retries)}` + `Failed to process function ${e.functionId} for message ${ + message.key || "(no key set)" + }. ${retryLogMessage(retryPolicy, retries)}` ); if (!retryTime) { messagesDeadLettered.inc({ topic });