Skip to content

Commit

Permalink
isolate canned metric info in one location (#368)
Browse files Browse the repository at this point in the history
* isolate canned metric info in one location

* fix formatting

* fix test
  • Loading branch information
zachary-rote authored May 30, 2023
1 parent 4752a91 commit 065f64c
Show file tree
Hide file tree
Showing 10 changed files with 133 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -211,11 +211,8 @@ class KafkaProducerActor(
publisherActor.ref ! PoisonPill
}

private val isAggregateStateCurrentTimer: Timer = metrics.timer(
MetricInfo(
s"surge.aggregate.is-current-timer",
"Average time in milliseconds taken to check if a particular aggregate is up to date in the KTable",
tags = Map("aggregate" -> aggregateName)))
private val isAggregateStateCurrentTimer: Timer = metrics.timer(Metrics.SURGE_AGGREGATE_IS_CURRENT_TIMER.withTags(Map("aggregate" -> aggregateName)))

def isAggregateStateCurrent(aggregateId: String): Future[Boolean] = {
implicit val askTimeout: Timeout = Timeout(TimeoutConfig.PublisherActor.aggregateStateCurrentTimeout)
isAggregateStateCurrentTimer.timeFuture {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import surge.internal.domain.SurgeProcessingModel
import surge.internal.kafka.{ HeadersHelper, ProducerActorContext }
import surge.internal.utils.DiagnosticContextFuturePropagation
import surge.kafka.KafkaTopic
import surge.metrics.MetricInfo
import surge.metrics.{ MetricInfo, Metrics }

import java.util.concurrent.Executors
import scala.concurrent.{ ExecutionContext, Future }
Expand All @@ -30,17 +30,9 @@ trait SurgeModel[State, Message, Event] extends ProducerActorContext {
private val serializationExecutionContext: ExecutionContext = new DiagnosticContextFuturePropagation(
ExecutionContext.fromExecutor(Executors.newFixedThreadPool(serializationThreadPoolSize)))

private lazy val serializeEventTimer = metrics.timer(
MetricInfo(
name = s"surge.aggregate.event-serialization-timer",
description = "Average time taken in milliseconds to serialize an individual event to bytes before persisting to Kafka",
tags = Map("aggregate" -> aggregateName)))
private lazy val serializeEventTimer = metrics.timer(Metrics.SURGE_AGGREGATE_EVENT_SERIALIZATION_TIMER.withTags(Map("aggregate" -> aggregateName)))

private lazy val serializeStateTimer = metrics.timer(
MetricInfo(
name = s"surge.aggregate.aggregate-state-serialization-timer",
description = "Average time taken in milliseconds to serialize a new aggregate state to bytes before persisting to Kafka",
tags = Map("aggregate" -> aggregateName)))
private lazy val serializeStateTimer = metrics.timer(Metrics.SURGE_AGGREGATE_AGGREGATE_STATE_SERIALIZATION_TIMER.withTags(Map("aggregate" -> aggregateName)))

def serializeEvents(events: Seq[(Event, Option[KafkaTopic])]): Future[Seq[KafkaProducerActor.MessageToPublish]] = Future {
val eventWriteFormatting = eventWriteFormattingOpt.getOrElse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,8 @@ object KafkaProducerActorImpl {

object AggregateStateRates {
def apply(aggregateName: String, metrics: Metrics): AggregateStateRates = AggregateStateRates(
current = metrics.rate(
MetricInfo(
name = s"surge.aggregate.state-current-rate",
description = "The per-second rate of aggregates that are up-to-date in and can be loaded immediately from the KTable",
tags = Map("aggregate" -> aggregateName))),
notCurrent = metrics.rate(
MetricInfo(
name = s"surge.aggregate.state-not-current-rate",
description = "The per-second rate of aggregates that are not up-to-date in the KTable and must wait to be loaded",
tags = Map("aggregate" -> aggregateName))))
current = metrics.rate(Metrics.SURGE_AGGREGATE_STATE_CURRENT_RATE.withTags(Map("aggregate" -> aggregateName))),
notCurrent = metrics.rate(Metrics.SURGE_AGGREGATE_STATE_NOT_CURRENT_RATE.withTags(Map("aggregate" -> aggregateName))))

}

Expand Down Expand Up @@ -138,11 +130,7 @@ class KafkaProducerActorImpl(
private val nonTransactionalStatePublisher =
kafkaProducerOverride.getOrElse(KafkaProducer.bytesProducer(config, brokers, stateTopic, partitioner = partitioner))

private val kafkaPublisherTimer: Timer = metrics.timer(
MetricInfo(
s"surge.aggregate.kafka-write-timer",
"Average time in milliseconds that it takes the publisher to write a batch of messages (events & state) to Kafka",
tags = Map("aggregate" -> aggregateName)))
private val kafkaPublisherTimer: Timer = metrics.timer(Metrics.SURGE_AGGREGATE_KAFKA_WRITE_TIMER.withTags(Map("aggregate" -> aggregateName)))
private implicit val rates: AggregateStateRates = AggregateStateRates(aggregateName, metrics)

context.system.scheduler.scheduleOnce(10.milliseconds, self, InitTransactions)
Expand Down Expand Up @@ -404,11 +392,8 @@ class KafkaProducerActorImpl(
msg.originalSenders.foreach(sender => sender.actorRef ! KafkaProducerActor.PublishFailure(sender.trackingId, msg.reason))
}

private val eventsPublishedRate: Rate = metrics.rate(
MetricInfo(
name = s"surge.aggregate.message-publish-rate",
description = "The per-second rate at which this aggregate attempts to publish messages to Kafka",
tags = Map("aggregate" -> aggregateName)))
private val eventsPublishedRate: Rate = metrics.rate(Metrics.SURGE_AGGREGATE_MESSAGE_PUBLISH_RATE.withTags(Map("aggregate" -> aggregateName)))

private def handleFlushMessages(state: KafkaProducerActorState): Unit = {
if (state.transactionInProgress) {
if (state.currentTransactionTimeMillis >= transactionTimeWarningThresholdMillis &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,36 +75,12 @@ object PersistentActor {

private[internal] def createMetrics(metrics: Metrics, aggregateName: String): MetricsQuiver = {
MetricsQuiver(
stateInitializationTimer = metrics.timer(
MetricInfo(
name = s"surge.aggregate.actor-state-initialization-timer",
description = "Average time in milliseconds taken to load aggregate state from the KTable",
tags = Map("aggregate" -> aggregateName))),
aggregateDeserializationTimer = metrics.timer(
MetricInfo(
name = s"surge.aggregate.state-deserialization-timer",
description = "Average time taken in milliseconds to deserialize aggregate state after the bytes are read from the KTable",
tags = Map("aggregate" -> aggregateName))),
commandHandlingTimer = metrics.timer(
MetricInfo(
name = s"surge.aggregate.command-handling-timer",
description = "Average time taken in milliseconds for the business logic 'processCommand' function to process a command",
tags = Map("aggregate" -> aggregateName))),
messageHandlingTimer = metrics.timer(
MetricInfo(
name = s"surge.aggregate.command-handling-timer",
description = "Average time taken in milliseconds for the business logic 'processCommand' function to process a message",
tags = Map("aggregate" -> aggregateName))),
eventHandlingTimer = metrics.timer(
MetricInfo(
name = s"surge.aggregate.event-handling-timer",
description = "Average time taken in milliseconds for the business logic 'handleEvent' function to handle an event",
tags = Map("aggregate" -> aggregateName))),
eventPublishTimer = metrics.timer(
MetricInfo(
name = s"surge.aggregate.event-publish-timer",
description = "Average time taken in milliseconds to persist all generated events plus an updated state to Kafka",
tags = Map("aggregate" -> aggregateName))))
stateInitializationTimer = metrics.timer(Metrics.SURGE_AGGREGATE_ACTOR_STATE_INITIALIZATION_TIMER.withTags(Map("aggregate" -> aggregateName))),
aggregateDeserializationTimer = metrics.timer(Metrics.SURGE_AGGREGATE_STATE_DESERIALIZATION_TIMER.withTags(Map("aggregate" -> aggregateName))),
commandHandlingTimer = metrics.timer(Metrics.SURGE_AGGREGATE_COMMAND_HANDLING_TIMER.withTags(Map("aggregate" -> aggregateName))),
messageHandlingTimer = metrics.timer(Metrics.SURGE_AGGREGATE_MESSAGE_HANDLING_TIMER.withTags(Map("aggregate" -> aggregateName))),
eventHandlingTimer = metrics.timer(Metrics.SURGE_AGGREGATE_EVENT_HANDLING_TIMER.withTags(Map("aggregate" -> aggregateName))),
eventPublishTimer = metrics.timer(Metrics.SURGE_AGGREGATE_EVENT_PUBLISH_TIMER.withTags(Map("aggregate" -> aggregateName))))

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,7 @@ class KafkaStreamManagerActor(surgeConsumer: SurgeStateStoreConsumer, partitionT
import KafkaStreamManagerActor._
import context.dispatcher

private val getAggregateBytesTimer = metrics.timer(
MetricInfo(
name = s"surge.state-store.get-aggregate-state-timer",
description = "The time taken to fetch aggregate state from the KTable",
tags = Map("storeName" -> surgeConsumer.storeName)))
private val getAggregateBytesTimer = metrics.timer(Metrics.SURGE_STATE_STORE_GET_AGGREGATE_STATE_TIMER.withTags(Map("storeName" -> surgeConsumer.storeName)))

private var lastConsumerSeen: Option[KafkaStreams] = None

Expand Down
6 changes: 5 additions & 1 deletion modules/metrics/src/main/scala/surge/metrics/Metric.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@ trait MetricValueProvider {
* @param tags
* Additional key/value attributes to associate to the metric
*/
case class MetricInfo(name: String, description: String, tags: Map[String, String] = Map.empty)
case class MetricInfo(name: String, description: String, tags: Map[String, String] = Map.empty) {

  /**
   * Builds a copy of this metric info carrying the supplied tags.
   *
   * The supplied map replaces whatever tags this instance already holds; the two maps are not merged.
   * The name and description are carried over unchanged.
   *
   * @param tags
   *   The complete set of key/value attributes for the returned metric info
   * @return
   *   A new MetricInfo with the same name and description and exactly the given tags
   */
  def withTags(tags: Map[String, String]): MetricInfo = copy(tags = tags)
}

case class MetricValue(name: String, tags: Map[String, String], value: Double)
case class MetricDescription(name: String, description: String, tags: Map[String, String], recordingLevel: RecordingLevel)
Expand Down
99 changes: 99 additions & 0 deletions modules/metrics/src/main/scala/surge/metrics/Metrics.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,107 @@ import surge.metrics.statistics.{ Count, ExponentiallyWeightedMovingAverage, Mos
import scala.collection.mutable
import scala.concurrent.duration._

import scala.jdk.CollectionConverters._

/**
 * Holds the shared metrics registry together with the catalogue of canned Surge metric definitions.
 *
 * Each `SURGE_*` constant is declared with a name and a description only. Call sites that need tags
 * derive a tagged copy (for example with `withTags`) before registering a timer or rate.
 */
object Metrics {
  lazy val globalMetricRegistry: Metrics = Metrics(config = MetricsConfig.fromConfig(ConfigFactory.load()))

  /**
   * Every canned Surge metric definition, keyed by its metric name.
   *
   * Keys are derived from each definition's own `name`, so a key can never drift from the metric it
   * maps to. The map is lazy so it is built once, on first access, rather than on every call, and so
   * that it never observes the constants declared further down before they have been initialised.
   */
  lazy val surgeMetricInfo: Map[String, MetricInfo] = Seq[MetricInfo](
    SURGE_AGGREGATE_AGGREGATE_STATE_SERIALIZATION_TIMER,
    SURGE_STATE_STORE_GET_AGGREGATE_STATE_TIMER,
    SURGE_AGGREGATE_IS_CURRENT_TIMER,
    SURGE_AGGREGATE_EVENT_SERIALIZATION_TIMER,
    SURGE_AGGREGATE_STATE_CURRENT_RATE,
    SURGE_AGGREGATE_STATE_NOT_CURRENT_RATE,
    SURGE_AGGREGATE_KAFKA_WRITE_TIMER,
    SURGE_AGGREGATE_MESSAGE_PUBLISH_RATE,
    SURGE_AGGREGATE_ACTOR_STATE_INITIALIZATION_TIMER,
    SURGE_AGGREGATE_STATE_DESERIALIZATION_TIMER,
    SURGE_AGGREGATE_COMMAND_HANDLING_TIMER,
    SURGE_AGGREGATE_MESSAGE_HANDLING_TIMER,
    SURGE_AGGREGATE_EVENT_HANDLING_TIMER,
    SURGE_AGGREGATE_EVENT_PUBLISH_TIMER,
    SURGE_GRPC_PROCESS_COMMAND_TIMER,
    SURGE_GRPC_HANDLE_EVENTS_TIMER,
    SURGE_GRPC_FORWARD_COMMAND_TIMER,
    SURGE_GRPC_GET_AGGREGATE_STATE_TIMER,
    SURGE_EVENT_DESERIALIZATION_TIMER,
    SURGE_EVENT_SHOULD_PARSE_MESSAGE_TIMER,
    SURGE_EVENT_HANDLER_EXCEPTION_RATE).map(info => info.name -> info).toMap

  val SURGE_EVENT_HANDLER_EXCEPTION_RATE: MetricInfo =
    MetricInfo(name = "surge.event.handler.exception.rate", description = "Rate of exceptions caught while handling an event")

  val SURGE_STATE_STORE_GET_AGGREGATE_STATE_TIMER: MetricInfo =
    MetricInfo(name = "surge.state-store.get-aggregate-state-timer", description = "The time taken to fetch aggregate state from the KTable")

  val SURGE_AGGREGATE_IS_CURRENT_TIMER: MetricInfo = MetricInfo(
    name = "surge.aggregate.is-current-timer",
    description = "Average time in milliseconds taken to check if a particular aggregate is up to date in the KTable")

  val SURGE_AGGREGATE_EVENT_SERIALIZATION_TIMER: MetricInfo = MetricInfo(
    name = "surge.aggregate.event-serialization-timer",
    description = "Average time taken in milliseconds to serialize an individual event to bytes before persisting to Kafka")

  val SURGE_AGGREGATE_AGGREGATE_STATE_SERIALIZATION_TIMER: MetricInfo = MetricInfo(
    name = "surge.aggregate.aggregate-state-serialization-timer",
    description = "Average time taken in milliseconds to serialize a new aggregate state to bytes before persisting to Kafka")

  val SURGE_AGGREGATE_STATE_CURRENT_RATE: MetricInfo = MetricInfo(
    name = "surge.aggregate.state-current-rate",
    description = "The per-second rate of aggregates that are up-to-date in and can be loaded immediately from the KTable")

  val SURGE_AGGREGATE_STATE_NOT_CURRENT_RATE: MetricInfo = MetricInfo(
    name = "surge.aggregate.state-not-current-rate",
    description = "The per-second rate of aggregates that are not up-to-date in the KTable and must wait to be loaded")

  val SURGE_AGGREGATE_KAFKA_WRITE_TIMER: MetricInfo = MetricInfo(
    name = "surge.aggregate.kafka-write-timer",
    description = "Average time in milliseconds that it takes the publisher to write a batch of messages (events & state) to Kafka")

  val SURGE_AGGREGATE_MESSAGE_PUBLISH_RATE: MetricInfo = MetricInfo(
    name = "surge.aggregate.message-publish-rate",
    description = "The per-second rate at which this aggregate attempts to publish messages to Kafka")

  val SURGE_AGGREGATE_ACTOR_STATE_INITIALIZATION_TIMER: MetricInfo = MetricInfo(
    name = "surge.aggregate.actor-state-initialization-timer",
    description = "Average time in milliseconds taken to load aggregate state from the KTable")

  val SURGE_AGGREGATE_STATE_DESERIALIZATION_TIMER: MetricInfo = MetricInfo(
    name = "surge.aggregate.state-deserialization-timer",
    description = "Average time taken in milliseconds to deserialize aggregate state after the bytes are read from the KTable")

  val SURGE_EVENT_DESERIALIZATION_TIMER: MetricInfo =
    MetricInfo(name = "surge.event.deserialization-timer", description = "The average time (in ms) taken to deserialize events")

  val SURGE_EVENT_SHOULD_PARSE_MESSAGE_TIMER: MetricInfo =
    MetricInfo(name = "surge.event.should-parse-message-timer", description = "The average time (in ms) taken by the shouldParseMessage function")

  val SURGE_AGGREGATE_COMMAND_HANDLING_TIMER: MetricInfo = MetricInfo(
    name = "surge.aggregate.command-handling-timer",
    description = "Average time taken in milliseconds for the business logic 'processCommand' function to process a command")

  // The description says "message", not "command", so this timer reads differently from the command-handling timer above.
  val SURGE_AGGREGATE_MESSAGE_HANDLING_TIMER: MetricInfo = MetricInfo(
    name = "surge.aggregate.message-handling-timer",
    description = "Average time taken in milliseconds for the business logic 'processCommand' function to process a message")

  val SURGE_AGGREGATE_EVENT_HANDLING_TIMER: MetricInfo = MetricInfo(
    name = "surge.aggregate.event-handling-timer",
    description = "Average time taken in milliseconds for the business logic 'handleEvent' function to handle an event")

  val SURGE_AGGREGATE_EVENT_PUBLISH_TIMER: MetricInfo = MetricInfo(
    name = "surge.aggregate.event-publish-timer",
    description = "Average time taken in milliseconds to persist all generated events plus an updated state to Kafka")

  val SURGE_GRPC_PROCESS_COMMAND_TIMER: MetricInfo =
    MetricInfo(name = "surge.grpc.process-command-timer", description = "The time taken by the Surge gRPC processCommand business logic callback")

  val SURGE_GRPC_HANDLE_EVENTS_TIMER: MetricInfo =
    MetricInfo(name = "surge.grpc.handle-events-timer", description = "The time taken by gRPC handleEvents business logic to handle the event")

  val SURGE_GRPC_FORWARD_COMMAND_TIMER: MetricInfo =
    MetricInfo(name = "surge.grpc.forward-command-timer", description = "The time taken by gRPC forwardCommand to forward the command to the aggregate")

  val SURGE_GRPC_GET_AGGREGATE_STATE_TIMER: MetricInfo =
    MetricInfo(name = "surge.grpc.get-aggregate-state-timer", description = "The time taken by gRPC getState to get the state of the aggregate")
}

object KafkaMetricListener {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,14 @@ class MetricsSpec extends AnyWordSpec with Matchers with MockitoSugar {
metrics.metricHtml should include(sensorMax.description)
}

"Have MetricInfo prepared that matches names" in {
  val cannedMetrics = Metrics.surgeMetricInfo

  // The catalogue is expected to hold exactly 21 canned metric definitions.
  cannedMetrics.size shouldEqual 21

  // Every key must be the name of the MetricInfo it maps to.
  cannedMetrics.foreach { case (metricName, info) =>
    info.name shouldEqual metricName
  }
}

def setupMockKafkaMetricListener(metrics: Metrics): KafkaMetricListener = {
val mockKafkaMetricListener = mock[KafkaMetricListener]
doNothing().when(mockKafkaMetricListener).onMetricsRegistered(anyString, any(classOf[KafkaMetricListener.KafkaMetricSupplier]))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class GenericAsyncAggregateCommandModel(bridgeToBusinessApp: BusinessLogicServic

private val metric: Metrics = Metrics.globalMetricRegistry
private val processCommandTimerMetric: Timer =
metric.timer(MetricInfo("surge.grpc.process-command-timer", "The time taken by the Surge gRPC processCommand business logic callback"))
metric.timer(Metrics.SURGE_GRPC_PROCESS_COMMAND_TIMER)

override def processCommand(aggregate: Option[SurgeState], surgeCommand: SurgeCmd): Future[Seq[SurgeEvent]] =
processCommandTimerMetric.timeFuture {
Expand Down Expand Up @@ -78,7 +78,7 @@ class GenericAsyncAggregateCommandModel(bridgeToBusinessApp: BusinessLogicServic
}

private val handleEventsTimerMetric: Timer =
metric.timer(MetricInfo("surge.grpc.handle-events-timer", "The time taken by gRPC handleEvents business logic to handle the event"))
metric.timer(Metrics.SURGE_GRPC_HANDLE_EVENTS_TIMER)

override def handleEvents(aggregate: Option[SurgeState], surgeEvents: Seq[SurgeEvent]): Future[Option[SurgeState]] =
handleEventsTimerMetric.timeFuture {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class MultilanguageGatewayServiceImpl(surgeEngine: SurgeCommand[UUID, SurgeState

private val metric: Metrics = Metrics.globalMetricRegistry
private val forwardCommandTimerMetric: Timer =
metric.timer(MetricInfo("surge.grpc.forward-command-timer", "The time taken by gRPC forwardCommand to forward the command to the aggregate"))
metric.timer(Metrics.SURGE_GRPC_FORWARD_COMMAND_TIMER)

override def forwardCommand(in: ForwardCommandRequest): Future[ForwardCommandReply] = forwardCommandTimerMetric.timeFuture {
in.command match {
Expand Down Expand Up @@ -56,7 +56,7 @@ class MultilanguageGatewayServiceImpl(surgeEngine: SurgeCommand[UUID, SurgeState
}

private val getAggregateStateTimerMetric: Timer =
metric.timer(MetricInfo("surge.grpc.get-aggregate-state-timer", "The time taken by gRPC getState to get the state of the aggregate"))
metric.timer(Metrics.SURGE_GRPC_GET_AGGREGATE_STATE_TIMER)

override def getState(in: GetStateRequest): Future[GetStateReply] = getAggregateStateTimerMetric.timeFuture {
logger.info(s"Business app asking for state of aggregate with id ${in.aggregateId}!")
Expand Down

0 comments on commit 065f64c

Please sign in to comment.