Skip to content

Commit

Permalink
rotor: GA4 bring back engagement_time_msec workaround
Browse files Browse the repository at this point in the history
rotor: error and metrics reporting improvements.
  • Loading branch information
absorbb committed Nov 28, 2023
1 parent d59ee62 commit a68c3f4
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 13 deletions.
6 changes: 5 additions & 1 deletion libs/core-functions/src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@ export function createFullContext(
headers: init?.headers ? hideSensitiveHeaders(init.headers) : undefined,
event: event,
};
const timeout = 10000;
const controller = new AbortController();
setTimeout(() => {
controller.abort();
}, 10000);
}, timeout);

let internalInit: RequestInit = {
...init,
Expand All @@ -49,6 +50,9 @@ export function createFullContext(
try {
fetchResult = await nodeFetch(url, internalInit);
} catch (err) {
if (err.name === "AbortError") {
err.message = `Fetch request exceeded timeout ${timeout}ms and was aborted`;
}
if (logToRedis) {
eventsStore.log(connectionId, true, { ...baseInfo, error: getErrorMessage(err), elapsedMs: sw.elapsedMs() });
}
Expand Down
2 changes: 2 additions & 0 deletions libs/core-functions/src/functions/ga4-destination.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ function pageViewEvent(event: AnalyticsServerEvent): Ga4Event {
page_location: customProperties.url || "",
page_referrer: customProperties.referrer || "",
page_title: customProperties.title || "",
engagement_time_msec: 1,
},
};
}
Expand Down Expand Up @@ -281,6 +282,7 @@ function trackEvent(event: AnalyticsServerEvent): Ga4Event {
params.value = evp.value || evp.total || evp.revenue;
break;
}
params.engagement_time_msec = 1;
return {
name,
params,
Expand Down
22 changes: 12 additions & 10 deletions services/rotor/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,16 +175,18 @@ export async function rotorMessageHandler(
.filter(f => f.functionId.startsWith("udf."))
.map(async f => {
const functionId = f.functionId.substring(4);
const userFunctionObj = requireDefined(
await getCachedOrLoad(functionsCache, functionId, key => {
return fastStore.getConfig("function", key);
}),
`Unknown function: ${functionId}`
);
if (userFunctionObj.workspaceId !== connection.workspaceId) {
throw newError(
`Function ${functionId} is not in the same workspace as connection ${connection.id} (${connection.workspaceId})`
);
const userFunctionObj = await getCachedOrLoad(functionsCache, functionId, key => {
return fastStore.getConfig("function", key);
});
if (!userFunctionObj || userFunctionObj.workspaceId !== connection.workspaceId) {
return {
id: f.functionId as string,
config: {},
exec: async (event, ctx) => {
throw newError(`Function ${functionId} not found in workspace: ${connection.workspaceId}`);
},
context: {},
};
}
const code = userFunctionObj.code;
const codeHash = hash(code);
Expand Down
8 changes: 7 additions & 1 deletion services/rotor/src/lib/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,18 @@ export function createMetrics(producer: Producer): Metrics {
const buffer: MetricsEvent[] = [];

const flush = async (buf: MetricsEvent[]) => {
const d = new Date();
d.setMilliseconds(0);
d.setSeconds(0);
await Promise.all([
producer.send({
topic: `in.id.metrics.m.batch.t.metrics`,
compression: getCompressionType(),
messages: buf.map(m => ({
value: JSON.stringify(m),
value: JSON.stringify({
...m,
timestamp: d.toISOString(),
}),
})),
}),
producer.send({
Expand Down
4 changes: 3 additions & 1 deletion services/rotor/src/lib/rotor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ export function kafkaRotor(cfg: KafkaRotorConfig): KafkaRotor {
.atError()
.withCause(e)
.log(
`Failed to process message for ${message.key || "(no key set)"}. ${retryLogMessage(retryPolicy, retries)}`
`Failed to process function ${e.functionId} for message ${
message.key || "(no key set)"
}. ${retryLogMessage(retryPolicy, retries)}`
);
if (!retryTime) {
messagesDeadLettered.inc({ topic });
Expand Down

0 comments on commit a68c3f4

Please sign in to comment.