Skip to content

Commit

Permalink
rotor: count each multiplied event sent to destination as additional …
Browse files Browse the repository at this point in the history
…"active event"
  • Loading branch information
absorbb committed Jan 27, 2025
1 parent 901c174 commit 70852f9
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 60 deletions.
2 changes: 1 addition & 1 deletion services/rotor/src/lib/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ export function createMetrics(
return prefix + status;
})(el);
buffer.push({
key: el.metricsMeta.messageId + "_" + el.eventIndex + "_" + d.getTime(),
key: el.metricsMeta.messageId + "_" + el.eventIndex + "_" + (el.receivedAt || new Date()).getTime(),
timestamp: d,
...omit(el.metricsMeta, "retries"),
functionId: el.functionId,
Expand Down
131 changes: 72 additions & 59 deletions webapps/console/prisma/metrics.sql
Original file line number Diff line number Diff line change
Expand Up @@ -28,44 +28,35 @@ GROUP BY
workspaceId,
timestamp;

CREATE MATERIALIZED VIEW newjitsu_metrics.mv_active_incoming on cluster jitsu_cluster
(
`timestamp` DateTime,
`workspaceId` LowCardinality(String),
`count` AggregateFunction(uniq, String)
)
ENGINE = ReplicatedAggregatingMergeTree('/clickhouse/tables/{shard}/newjitsu_metrics/mv_active_incoming5',
'{replica}')
ORDER BY (timestamp, workspaceId)
SETTINGS index_granularity = 8192


create table newjitsu_metrics.mv_active_incoming3 on cluster jitsu_cluster
(
`workspaceId` LowCardinality(String),
`timestamp` DateTime,
`messageId` String
)
engine = ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/newjitsu_metrics/mv_active_incoming3_1', '{replica}')
ORDER BY (workspaceId, timestamp, messageId)
PARTITION BY toYYYYMM(timestamp)
;

--drop table newjitsu_metrics.mv_active_incoming3 on cluster jitsu_cluster;
--drop view newjitsu_metrics.to_mv_active_incoming3 on cluster jitsu_cluster;

CREATE MATERIALIZED VIEW newjitsu_metrics.to_mv_active_incoming3 on cluster jitsu_cluster
TO newjitsu_metrics.mv_active_incoming3
(
`workspaceId` LowCardinality(String),
`timestamp` DateTime,
`messageId` String
)
AS
SELECT
timestamp,
workspaceId,
uniqState(messageId) AS count
FROM newjitsu_metrics.active_incoming
GROUP BY
timestamp,
workspaceId;
--
-- create table newjitsu_metrics.metrics on cluster jitsu_cluster
-- (
-- timestamp DateTime,
-- messageId String,
-- workspaceId LowCardinality(String),
-- streamId LowCardinality(String),
-- connectionId LowCardinality(String),
-- functionId LowCardinality(String),
-- destinationId LowCardinality(String),
-- status LowCardinality(String),
-- events Int64,
-- tmpTotal Int64,
-- tmpUniq Int64
-- )
-- engine = ReplicatedMergeTree('/clickhouse/tables/{shard}/newjitsu_metrics/metrics_real', '{replica}')
-- PARTITION BY toYYYYMM(timestamp)
-- ORDER BY (workspaceId, connectionId, timestamp)
-- SETTINGS index_granularity = 8192;
messageId
FROM newjitsu_metrics.active_incoming;


CREATE TABLE IF NOT EXISTS newjitsu_metrics.metrics on cluster jitsu_cluster
Expand All @@ -82,29 +73,51 @@ CREATE TABLE IF NOT EXISTS newjitsu_metrics.metrics on cluster jitsu_cluster
)
ENGINE = Null;

CREATE MATERIALIZED VIEW newjitsu_metrics.mv_metrics on cluster jitsu_cluster
(
timestamp DateTime,
workspaceId LowCardinality(String),
streamId LowCardinality(String),
connectionId LowCardinality(String),
functionId LowCardinality(String),
destinationId LowCardinality(String),
status LowCardinality(String),
events AggregateFunction(sum, Int64),
uniqEvents AggregateFunction(uniq, String)
create table newjitsu_metrics.mv_metrics on cluster jitsu_cluster
(
timestamp DateTime,
workspaceId LowCardinality(String),
streamId LowCardinality(String),
connectionId LowCardinality(String),
functionId LowCardinality(String),
destinationId LowCardinality(String),
status LowCardinality(String),
events AggregateFunction(sum, Int64)
)
engine = ReplicatedAggregatingMergeTree('/clickhouse/tables/{shard}/newjitsu_metrics/mv_metrics', '{replica}')
ORDER BY (timestamp, workspaceId, streamId, connectionId, functionId, destinationId, status)
SETTINGS index_granularity = 8192;


CREATE MATERIALIZED VIEW newjitsu_metrics.to_mv_metrics on cluster jitsu_cluster
TO newjitsu_metrics.mv_metrics
(
`timestamp` DateTime,
`workspaceId` LowCardinality(String),
`streamId` LowCardinality(String),
`connectionId` LowCardinality(String),
`functionId` LowCardinality(String),
`destinationId` LowCardinality(String),
`status` LowCardinality(String),
`events` AggregateFunction(sum, Int64)
)
AS
SELECT
timestamp,
workspaceId,
streamId,
connectionId,
functionId,
destinationId,
status,
sumState(events) AS events
FROM newjitsu_metrics.metrics
GROUP BY
timestamp,
workspaceId,
streamId,
connectionId,
functionId,
destinationId,
status;

)
ENGINE = ReplicatedAggregatingMergeTree('/clickhouse/tables/{shard}/newjitsu_metrics/mv_metrics', '{replica}') ORDER BY (timestamp, workspaceId, streamId, connectionId, functionId, destinationId, status)
AS SELECT
timestamp,
workspaceId,
streamId,
connectionId,
functionId,
destinationId,
status,
sumState(events) AS events,
uniqState(messageId) AS uniqEvents
FROM newjitsu_metrics.metrics
GROUP BY timestamp, workspaceId, streamId, connectionId, functionId, destinationId, status;

0 comments on commit 70852f9

Please sign in to comment.