Filter events that shouldn't be persisted later in the flow (#284)
jeffboutotte authored Aug 17, 2022
1 parent fb73b65 · commit db07057
Showing 4 changed files with 24 additions and 24 deletions.
@@ -42,21 +42,23 @@ trait SurgeModel[State, Message, Event] extends ProducerActorContext {
       description = "Average time taken in milliseconds to serialize a new aggregate state to bytes before persisting to Kafka",
       tags = Map("aggregate" -> aggregateName)))

-  def serializeEvents(events: Seq[(Event, KafkaTopic)]): Future[Seq[KafkaProducerActor.MessageToPublish]] = Future {
+  def serializeEvents(events: Seq[(Event, Option[KafkaTopic])]): Future[Seq[KafkaProducerActor.MessageToPublish]] = Future {
     val eventWriteFormatting = eventWriteFormattingOpt.getOrElse {
       throw new IllegalStateException("businessLogic.eventWriteFormattingOpt must not be None")
     }
-    events.map { case (event, topic) =>
-      val serializedMessage = serializeEventTimer.time(eventWriteFormatting.writeEvent(event))
-      log.trace(s"Publishing event for {} {}", Seq(aggregateName, serializedMessage.key): _*)
-      KafkaProducerActor.MessageToPublish(
-        // Using null here since we need to add the headers but we don't want to explicitly assign the partition
-        new ProducerRecord(
-          topic.name,
-          null, // scalastyle:ignore null
-          serializedMessage.key,
-          serializedMessage.value,
-          HeadersHelper.createHeaders(serializedMessage.headers)))
+    events.flatMap { case (event, topicOpt) =>
+      topicOpt.map { topic =>
+        val serializedMessage = serializeEventTimer.time(eventWriteFormatting.writeEvent(event))
+        log.trace(s"Publishing event for {} {}", Seq(aggregateName, serializedMessage.key): _*)
+        KafkaProducerActor.MessageToPublish(
+          // Using null here since we need to add the headers but we don't want to explicitly assign the partition
+          new ProducerRecord(
+            topic.name,
+            null, // scalastyle:ignore null
+            serializedMessage.key,
+            serializedMessage.value,
+            HeadersHelper.createHeaders(serializedMessage.headers)))
+      }
     }
   }(serializationExecutionContext)
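
The behavioral core of this hunk is the flatMap over the optional topic: an event paired with None never reaches serialization or Kafka. A minimal self-contained sketch of the same pattern, with illustrative stand-in names (Topic, Record, and serialize are not the Surge API):

// A minimal sketch of the filtering pattern above; Topic, Record, and
// serialize are illustrative stand-ins, not Surge types.
object TopicFilterSketch {
  final case class Topic(name: String)
  final case class Record(topic: String, payload: String)

  // flatMap over the Option drops (event, None) pairs: events with no
  // destination topic are filtered here, late in the flow, instead of
  // being rejected when they were first persisted.
  def serialize(events: Seq[(String, Option[Topic])]): Seq[Record] =
    events.flatMap { case (payload, topicOpt) =>
      topicOpt.map(topic => Record(topic.name, payload))
    }

  def main(args: Array[String]): Unit = {
    val events = Seq("created" -> Some(Topic("events")), "internal-only" -> None)
    println(serialize(events)) // List(Record(events,created))
  }
}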

@@ -25,6 +25,7 @@ trait SurgeContext[State, Event] {
   def persistEvent(event: Event): SurgeContext[State, Event]
   def persistEvents(events: Seq[Event]): SurgeContext[State, Event]
   def persistToTopic(event: Event, topic: KafkaTopic): SurgeContext[State, Event]
+  def persistToTopics(eventsWithTopics: Seq[(Event, KafkaTopic)]): SurgeContext[State, Event]
   def persistRecord(record: ProducerRecord[String, Array[Byte]]): SurgeContext[State, Event]
   def persistRecords(records: Seq[ProducerRecord[String, Array[Byte]]]): SurgeContext[State, Event]
   def updateState(state: Option[State]): SurgeContext[State, Event]
@@ -38,19 +39,17 @@ case class SurgeContextImpl[State, Event](
     defaultEventTopicOpt: Option[KafkaTopic],
     sideEffects: Seq[SurgeSideEffect[State]] = Seq.empty,
     isRejected: Boolean = false,
-    events: Seq[(Event, KafkaTopic)] = Seq.empty,
+    events: Seq[(Event, Option[KafkaTopic])] = Seq.empty,
     records: Seq[ProducerRecord[String, Array[Byte]]] = Seq.empty)
     extends SurgeContext[State, Event] {
-  override def persistEvent(event: Event): SurgeContextImpl[State, Event] = {
-    val maybeEventAndTopic = defaultEventTopicOpt.map(topic => Seq(event -> topic)).getOrElse(Seq.empty)
-    copy(events = events ++ maybeEventAndTopic)
-  }
-  override def persistEvents(events: Seq[Event]): SurgeContextImpl[State, Event] = {
-    val maybeEventAndTopic = defaultEventTopicOpt.map(topic => events.map(e => e -> topic)).getOrElse(Seq.empty)
-    copy(events = this.events ++ maybeEventAndTopic)
-  }
+  override def persistEvent(event: Event): SurgeContextImpl[State, Event] =
+    copy(events = events :+ event -> defaultEventTopicOpt)
+  override def persistEvents(events: Seq[Event]): SurgeContextImpl[State, Event] =
+    copy(events = this.events ++ events.map(e => e -> defaultEventTopicOpt))

-  override def persistToTopic(event: Event, topic: KafkaTopic): SurgeContext[State, Event] = copy(events = this.events :+ event -> topic)
+  override def persistToTopic(event: Event, topic: KafkaTopic): SurgeContext[State, Event] = copy(events = this.events :+ event -> Some(topic))
+  override def persistToTopics(eventsWithTopics: Seq[(Event, KafkaTopic)]): SurgeContext[State, Event] =
+    copy(events = this.events ++ eventsWithTopics.map(tup => tup._1 -> Some(tup._2)))

   override def persistRecord(record: ProducerRecord[String, Array[Byte]]): SurgeContextImpl[State, Event] = copy(records = records :+ record)
   override def persistRecords(records: Seq[ProducerRecord[String, Array[Byte]]]): SurgeContextImpl[State, Event] = copy(records = this.records ++ records)
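
Taken together with the serializeEvents change above, persistEvent now defers the "no default topic" decision: it always records (event, defaultEventTopicOpt), and a None topic is filtered at serialization time instead of the event being dropped inside persistEvent. An illustrative simplified model of that bookkeeping (ContextSketch and its String topics are stand-ins, not the real Surge types):

// Simplified stand-in for SurgeContextImpl (not the real Surge API): the
// persist* methods only queue (event, topicOpt) pairs; nothing is dropped here.
final case class ContextSketch[Event](
    defaultTopicOpt: Option[String],
    events: Seq[(Event, Option[String])] = Seq.empty) {
  // Before this commit the equivalent method returned an unchanged context
  // when defaultTopicOpt was None; now the pair is kept and filtered later.
  def persistEvent(event: Event): ContextSketch[Event] =
    copy(events = events :+ (event -> defaultTopicOpt))
  def persistToTopic(event: Event, topic: String): ContextSketch[Event] =
    copy(events = events :+ (event -> Some(topic)))
  def persistToTopics(pairs: Seq[(Event, String)]): ContextSketch[Event] =
    copy(events = events ++ pairs.map { case (e, t) => e -> Some(t) })
}

For example, ContextSketch[String](defaultTopicOpt = None).persistEvent("evt").events now yields Seq("evt" -> None), where the old implementation produced an empty Seq at this point.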
@@ -144,7 +144,7 @@ class PersistentActorSpec

     probe.expectMsg(PersistentActor.ACKSuccess(expectedState))
     val expectedStateSerialized = businessLogic.serializeState(state.aggregateId, expectedState, mockProducer.assignedPartition).futureValue
-    val expectedEventSerialized = businessLogic.serializeEvents(Seq(expectedEvent -> businessLogic.kafka.eventsTopic)).futureValue
+    val expectedEventSerialized = businessLogic.serializeEvents(Seq(expectedEvent -> Some(businessLogic.kafka.eventsTopic))).futureValue

     val messagesCaptor: ArgumentCaptor[Seq[KafkaProducerActor.MessageToPublish]] = ArgumentCaptor.forClass(classOf[Seq[KafkaProducerActor.MessageToPublish]])
     verify(mockProducer).publish(any[UUID], argEquals(state.aggregateId), messagesCaptor.capture())
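
Existing call sites only need a Some(...) wrapper around the topic to satisfy the new Seq[(Event, Option[KafkaTopic])] signature; an event deliberately excluded from persistence would instead be paired with None and never appear among the captured messages.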
@@ -11,7 +11,6 @@ import org.scalatest.wordspec.AnyWordSpec
 import surge.health.windows.Window

 import scala.concurrent.duration._
-import scala.languageFeature.postfixOps

 class WindowSpec extends AnyWordSpec with Matchers with Eventually {
   implicit override val patienceConfig: PatienceConfig =