From 70852f9612edcb3aa3ded32ed5ae6e156bbfbc04 Mon Sep 17 00:00:00 2001 From: Ildar Nurislamov Date: Mon, 27 Jan 2025 11:32:52 +0400 Subject: [PATCH] rotor: count each multiplied event sent to destination as additional "active event" --- services/rotor/src/lib/metrics.ts | 2 +- webapps/console/prisma/metrics.sql | 131 ++++++++++++++++------------- 2 files changed, 73 insertions(+), 60 deletions(-) diff --git a/services/rotor/src/lib/metrics.ts b/services/rotor/src/lib/metrics.ts index acd99adee..8c026be46 100644 --- a/services/rotor/src/lib/metrics.ts +++ b/services/rotor/src/lib/metrics.ts @@ -184,7 +184,7 @@ export function createMetrics( return prefix + status; })(el); buffer.push({ - key: el.metricsMeta.messageId + "_" + el.eventIndex + "_" + d.getTime(), + key: el.metricsMeta.messageId + "_" + el.eventIndex + "_" + (el.receivedAt || new Date()).getTime(), timestamp: d, ...omit(el.metricsMeta, "retries"), functionId: el.functionId, diff --git a/webapps/console/prisma/metrics.sql b/webapps/console/prisma/metrics.sql index 4ac360ca8..79dd708d6 100644 --- a/webapps/console/prisma/metrics.sql +++ b/webapps/console/prisma/metrics.sql @@ -28,44 +28,35 @@ GROUP BY workspaceId, timestamp; -CREATE MATERIALIZED VIEW newjitsu_metrics.mv_active_incoming on cluster jitsu_cluster - ( - `timestamp` DateTime, - `workspaceId` LowCardinality(String), - `count` AggregateFunction(uniq, String) - ) - ENGINE = ReplicatedAggregatingMergeTree('/clickhouse/tables/{shard}/newjitsu_metrics/mv_active_incoming5', - '{replica}') - ORDER BY (timestamp, workspaceId) - SETTINGS index_granularity = 8192 + + +create table newjitsu_metrics.mv_active_incoming3 on cluster jitsu_cluster +( + `workspaceId` LowCardinality(String), + `timestamp` DateTime, + `messageId` String +) + engine = ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/newjitsu_metrics/mv_active_incoming3_1', '{replica}') + ORDER BY (workspaceId, timestamp, messageId) + PARTITION BY toYYYYMM(timestamp) +; + +--drop table newjitsu_metrics.mv_active_incoming3 on cluster jitsu_cluster; +--drop view newjitsu_metrics.to_mv_active_incoming3 on cluster jitsu_cluster; + +CREATE MATERIALIZED VIEW newjitsu_metrics.to_mv_active_incoming3 on cluster jitsu_cluster + TO newjitsu_metrics.mv_active_incoming3 + ( + `workspaceId` LowCardinality(String), + `timestamp` DateTime, + `messageId` String + ) AS SELECT - timestamp, workspaceId, - uniqState(messageId) AS count -FROM newjitsu_metrics.active_incoming -GROUP BY timestamp, - workspaceId; --- --- create table newjitsu_metrics.metrics on cluster jitsu_cluster --- ( --- timestamp DateTime, --- messageId String, --- workspaceId LowCardinality(String), --- streamId LowCardinality(String), --- connectionId LowCardinality(String), --- functionId LowCardinality(String), --- destinationId LowCardinality(String), --- status LowCardinality(String), --- events Int64, --- tmpTotal Int64, --- tmpUniq Int64 --- ) --- engine = ReplicatedMergeTree('/clickhouse/tables/{shard}/newjitsu_metrics/metrics_real', '{replica}') --- PARTITION BY toYYYYMM(timestamp) --- ORDER BY (workspaceId, connectionId, timestamp) --- SETTINGS index_granularity = 8192; + messageId +FROM newjitsu_metrics.active_incoming; CREATE TABLE IF NOT EXISTS newjitsu_metrics.metrics on cluster jitsu_cluster @@ -82,29 +73,51 @@ CREATE TABLE IF NOT EXISTS newjitsu_metrics.metrics on cluster jitsu_cluster ) ENGINE = Null; -CREATE MATERIALIZED VIEW newjitsu_metrics.mv_metrics on cluster jitsu_cluster - ( - timestamp DateTime, - workspaceId LowCardinality(String), - streamId LowCardinality(String), - connectionId LowCardinality(String), - functionId LowCardinality(String), - destinationId LowCardinality(String), - status LowCardinality(String), - events AggregateFunction(sum, Int64), - uniqEvents AggregateFunction(uniq, String) +create table newjitsu_metrics.mv_metrics on cluster jitsu_cluster +( + timestamp DateTime, + workspaceId LowCardinality(String), + streamId LowCardinality(String), + connectionId LowCardinality(String), + functionId LowCardinality(String), + destinationId LowCardinality(String), + status LowCardinality(String), + events AggregateFunction(sum, Int64) +) + engine = ReplicatedAggregatingMergeTree('/clickhouse/tables/{shard}/newjitsu_metrics/mv_metrics', '{replica}') + ORDER BY (timestamp, workspaceId, streamId, connectionId, functionId, destinationId, status) + SETTINGS index_granularity = 8192; + + +CREATE MATERIALIZED VIEW newjitsu_metrics.to_mv_metrics on cluster jitsu_cluster + TO newjitsu_metrics.mv_metrics + ( + `timestamp` DateTime, + `workspaceId` LowCardinality(String), + `streamId` LowCardinality(String), + `connectionId` LowCardinality(String), + `functionId` LowCardinality(String), + `destinationId` LowCardinality(String), + `status` LowCardinality(String), + `events` AggregateFunction(sum, Int64) + ) +AS +SELECT + timestamp, + workspaceId, + streamId, + connectionId, + functionId, + destinationId, + status, + sumState(events) AS events +FROM newjitsu_metrics.metrics +GROUP BY + timestamp, + workspaceId, + streamId, + connectionId, + functionId, + destinationId, + status; - ) - ENGINE = ReplicatedAggregatingMergeTree('/clickhouse/tables/{shard}/newjitsu_metrics/mv_metrics', '{replica}') ORDER BY (timestamp, workspaceId, streamId, connectionId, functionId, destinationId, status) -AS SELECT - timestamp, - workspaceId, - streamId, - connectionId, - functionId, - destinationId, - status, - sumState(events) AS events, - uniqState(messageId) AS uniqEvents - FROM newjitsu_metrics.metrics - GROUP BY timestamp, workspaceId, streamId, connectionId, functionId, destinationId, status; \ No newline at end of file