diff --git a/libs/core-functions/src/functions/lib/index.ts b/libs/core-functions/src/functions/lib/index.ts index 1c7df6d20..ecf6a6440 100644 --- a/libs/core-functions/src/functions/lib/index.ts +++ b/libs/core-functions/src/functions/lib/index.ts @@ -77,7 +77,7 @@ export type FuncChainResult = { }; export type FunctionExecRes = { - receivedAt?: any; + receivedAt?: Date; eventIndex: number; event?: any; metricsMeta?: MetricsMeta; diff --git a/services/rotor/src/lib/functions-chain.ts b/services/rotor/src/lib/functions-chain.ts index 56990a42a..fc73fe8f8 100644 --- a/services/rotor/src/lib/functions-chain.ts +++ b/services/rotor/src/lib/functions-chain.ts @@ -288,10 +288,10 @@ export async function runChain( const event = events[i]; let result: FuncReturn = undefined; const sw = stopwatch(); - const rat = new Date(event.receivedAt) as any; + const rat = new Date(event.receivedAt); const execLogMeta = { eventIndex: i, - receivedAt: rat && rat != "Invalid Date" ? rat : new Date(), + receivedAt: !isNaN(rat.getTime()) ? rat : new Date(), functionId: f.id, metricsMeta: metricsMeta, }; diff --git a/services/rotor/src/lib/metrics.ts b/services/rotor/src/lib/metrics.ts index 99de26d05..acd99adee 100644 --- a/services/rotor/src/lib/metrics.ts +++ b/services/rotor/src/lib/metrics.ts @@ -69,7 +69,8 @@ export function createMetrics( value: JSON.stringify({ timestamp: d, workspaceId: m.workspaceId, - messageId: m.messageId, + // to count active events use composed key: messageId_eventIndex_receivedAt + messageId: m.key, }), }; }), @@ -183,7 +184,7 @@ export function createMetrics( return prefix + status; })(el); buffer.push({ - key: crypto.randomUUID(), + key: el.metricsMeta.messageId + "_" + el.eventIndex + "_" + d.getTime(), timestamp: d, ...omit(el.metricsMeta, "retries"), functionId: el.functionId,