Skip to content

Add metrics for last committed watermarks #50

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into the base branch from the source branch on
Sep 23, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -35,12 +35,12 @@ ThisBuild / git.remoteRepo := {
}

val scalaTestVersion = "3.2.19"
val scalaCheckVersion = "1.18.0"
val scalaCheckVersion = "1.18.1"
val scalaCheckTestVersion = "3.2.19.0"

val hadoopVersion = "3.4.0"
val parquetVersion = "1.14.1"
val icebergVersion = "1.6.0"
val parquetVersion = "1.14.2"
val icebergVersion = "1.6.1"

lazy val `stream-loader-core` = project
.in(file("stream-loader-core"))
@@ -53,17 +53,17 @@ lazy val `stream-loader-core` = project
"org.scala-lang" % "scala-reflect" % scalaVersion.value,
"org.apache.kafka" % "kafka-clients" % "3.8.0",
"org.log4s" %% "log4s" % "1.10.0",
"org.apache.commons" % "commons-compress" % "1.26.2",
"org.xerial.snappy" % "snappy-java" % "1.1.10.5",
"org.apache.commons" % "commons-compress" % "1.27.1",
"org.xerial.snappy" % "snappy-java" % "1.1.10.7",
"org.lz4" % "lz4-java" % "1.8.0",
"com.github.luben" % "zstd-jni" % "1.5.6-4",
"com.github.luben" % "zstd-jni" % "1.5.6-5",
"com.univocity" % "univocity-parsers" % "2.9.1",
"org.json4s" %% "json4s-native" % "4.0.7",
"io.micrometer" % "micrometer-core" % "1.13.2",
"io.micrometer" % "micrometer-core" % "1.13.4",
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"org.scalatestplus" %% "scalacheck-1-18" % scalaCheckTestVersion % "test",
"org.scalacheck" %% "scalacheck" % scalaCheckVersion % "test",
"ch.qos.logback" % "logback-classic" % "1.5.6" % "test"
"ch.qos.logback" % "logback-classic" % "1.5.8" % "test"
)
)

@@ -75,7 +75,7 @@ lazy val `stream-loader-clickhouse` = project
resolvers += "jitpack" at "https://jitpack.io",
libraryDependencies ++= Seq(
"org.apache.httpcomponents.client5" % "httpclient5" % "5.3.1",
"com.clickhouse" % "clickhouse-jdbc" % "0.6.3",
"com.clickhouse" % "clickhouse-jdbc" % "0.6.5",
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"org.scalatestplus" %% "scalacheck-1-18" % scalaCheckTestVersion % "test",
"org.scalacheck" %% "scalacheck" % scalaCheckVersion % "test"
@@ -116,9 +116,9 @@ lazy val `stream-loader-s3` = project
.settings(commonSettings)
.settings(
libraryDependencies ++= Seq(
"software.amazon.awssdk" % "s3" % "2.26.25",
"software.amazon.awssdk" % "s3" % "2.28.3",
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"com.amazonaws" % "aws-java-sdk-s3" % "1.12.765" % "test",
"com.amazonaws" % "aws-java-sdk-s3" % "1.12.772" % "test",
"org.gaul" % "s3proxy" % "2.2.0" % "test"
)
)
@@ -138,7 +138,7 @@ lazy val `stream-loader-vertica` = project
)
)

val duckdbVersion = "1.0.0"
val duckdbVersion = "1.1.0"

lazy val packAndSplitJars =
taskKey[(File, File)]("Runs pack and splits out the application jars from the external dependency jars")
@@ -161,16 +161,16 @@ lazy val `stream-loader-tests` = project
.settings(
libraryDependencies ++= Seq(
"com.typesafe" % "config" % "1.4.3",
"ch.qos.logback" % "logback-classic" % "1.5.6",
"ch.qos.logback" % "logback-classic" % "1.5.8",
"com.zaxxer" % "HikariCP" % "5.1.0",
"org.apache.iceberg" % "iceberg-parquet" % icebergVersion,
"com.vertica.jdbc" % "vertica-jdbc" % verticaVersion,
"org.scalacheck" %% "scalacheck" % scalaCheckVersion,
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"org.scalatestplus" %% "scalacheck-1-18" % scalaCheckTestVersion % "test",
"org.slf4j" % "log4j-over-slf4j" % "2.0.13" % "test",
"org.slf4j" % "log4j-over-slf4j" % "2.0.16" % "test",
"org.mandas" % "docker-client" % "7.0.8" % "test",
"org.jboss.resteasy" % "resteasy-client" % "6.2.9.Final" % "test",
"org.jboss.resteasy" % "resteasy-client" % "6.2.10.Final" % "test",
"com.fasterxml.jackson.jakarta.rs" % "jackson-jakarta-rs-json-provider" % "2.17.2" % "test",
"org.duckdb" % "duckdb_jdbc" % duckdbVersion % "test"
),
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=1.10.1
sbt.version=1.10.2
4 changes: 2 additions & 2 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -16,10 +16,10 @@ addSbtPlugin("com.github.sbt" % "sbt-unidoc" % "0.5.0")

addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.10.0")

libraryDependencies += "net.sourceforge.plantuml" % "plantuml" % "1.2024.6"
libraryDependencies += "net.sourceforge.plantuml" % "plantuml" % "1.2024.7"

addSbtPlugin("com.github.sbt" % "sbt-ghpages" % "0.8.0")

addSbtPlugin("com.github.sbt" % "sbt-pgp" % "2.2.1")

addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "3.11.1")
addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "3.11.3")
Original file line number Diff line number Diff line change
@@ -9,11 +9,11 @@
package com.adform.streamloader.sink.batch

import com.adform.streamloader.model._
import com.adform.streamloader.util.Retry._
import com.adform.streamloader.util._
import com.adform.streamloader.sink.PartitionGroupSinker
import com.adform.streamloader.sink.batch.storage.RecordBatchStorage
import com.adform.streamloader.source.KafkaContext
import com.adform.streamloader.util.Retry._
import com.adform.streamloader.util._
import io.micrometer.core.instrument.{Counter, Gauge, Meter, Timer}
import org.apache.kafka.common.TopicPartition

@@ -71,10 +71,14 @@ class RecordBatchingSinker[B <: RecordBatch](
batchStorage.commitBatch(batch)
}
)

batch.recordRanges.foreach(range => {
Metrics.committedWatermarks(range.topicPartition).set(range.end.watermark.millis)
})

if (!batch.discard()) {
log.warn("Failed discarding batch")
}

} catch {
case e if isInterruptionException(e) =>
log.debug("Batch commit thread interrupted")
@@ -175,14 +179,25 @@ class RecordBatchingSinker[B <: RecordBatch](
val recordsWritten: Map[TopicPartition, Counter] =
groupPartitions.map(tp => tp -> createCounter("records.written", commonTags ++ partitionTags(tp))).toMap

// Per-partition gauges reporting how far behind "now" the last committed watermark is:
// the gauge samples System.currentTimeMillis() minus the assigned watermark (milliseconds).
// Values are pushed via AssignableGauge.set() after each successful batch commit.
val committedWatermarks: Map[TopicPartition, AssignableGauge[java.lang.Long]] =
  groupPartitions
    .map(tp =>
      tp -> createAssignableGauge(
        "committed.watermark.delay.ms",
        (latestWatermark: java.lang.Long) => (System.currentTimeMillis() - latestWatermark).toDouble,
        commonTags ++ partitionTags(tp)
      )
    )
    .toMap

val recordsBatched: Map[TopicPartition, Counter] =
groupPartitions.map(tp => tp -> createCounter("records.batched", commonTags ++ partitionTags(tp))).toMap

val commitDuration: Timer = createTimer("commit.duration", commonTags, maxDuration = Duration.ofMinutes(5))
val commitQueueSize: Gauge =
createGauge("commit.queue.size", self, (_: RecordBatchingSinker[B]) => self.commitQueue.size(), commonTags)

val allMeters: Seq[Meter] =
Seq(commitDuration, commitQueueSize) ++ recordsWritten.values
val allMeters: Seq[Meter] = Seq(commitDuration, commitQueueSize) ++
recordsWritten.values ++ committedWatermarks.values.map(_.underlying) ++ recordsBatched.values
}
}
Original file line number Diff line number Diff line change
@@ -8,12 +8,11 @@

package com.adform.streamloader.util

import java.time.Duration
import java.util.function.ToDoubleFunction

import io.micrometer.core.instrument._
import io.micrometer.core.instrument.composite.CompositeMeterRegistry

import java.time.Duration
import java.util.function.ToDoubleFunction
import scala.collection.concurrent
import scala.collection.concurrent.TrieMap

@@ -65,6 +64,27 @@ trait Metrics {
.register(Metrics.registry)
}

/** A gauge metric whose sampled value can be assigned imperatively via [[set]].
  *
  * The underlying micrometer gauge is created lazily on first access and reports
  * `Double.NaN` until a value has been assigned, following the micrometer convention
  * for gauges that have no data yet. Previously, sampling the gauge before the first
  * `set()` passed a null boxed value into `tdf`, throwing a NullPointerException.
  *
  * @param name metric name, passed through to `createGauge`
  * @param tdf  function mapping the currently assigned value to the reported double
  * @param tags tags attached to the underlying gauge
  */
class AssignableGauge[T <: AnyRef](name: String, tdf: ToDoubleFunction[T], tags: Seq[MetricTag]) {
  private var currentValue: T = _

  lazy val underlying: Gauge =
    createGauge(
      name,
      this,
      // Guard against sampling before the first assignment: report NaN instead of
      // letting tdf dereference a null value.
      (_: AssignableGauge[T]) => if (currentValue == null) Double.NaN else tdf.applyAsDouble(currentValue),
      tags
    )

  /** Assigns a new value and returns the gauge's resulting sampled reading. */
  def set(value: T): Double = {
    currentValue = value
    underlying.value()
  }

  /** Closes the underlying gauge, removing it from the registry. */
  def close(): Unit = underlying.close()
}

/** Creates a gauge whose reported value is set explicitly through [[AssignableGauge.set]].
  *
  * @param name metric name
  * @param tdf  function converting the assigned value to the reported double
  * @param tags optional metric tags
  */
protected def createAssignableGauge[T <: AnyRef](
    name: String,
    tdf: ToDoubleFunction[T],
    tags: Seq[MetricTag] = Seq()
): AssignableGauge[T] =
  new AssignableGauge[T](name, tdf, tags)

protected def createDistribution(name: String, tags: Seq[MetricTag] = Seq()): DistributionSummary =
DistributionSummary
.builder(joinPrefixes(Seq(metricsRoot, name)))