Skip to content

Commit

Permalink
fix clear-expired-trackers-dead-letter (#264)
Browse files Browse the repository at this point in the history
  • Loading branch information
zachary-rote authored Jul 7, 2022
1 parent 4dc2652 commit 821e225
Showing 1 changed file with 15 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ class KafkaProducerActorImpl(
case RestartProducer => restartPublisher()
case ShutdownProducer => stopPublisher()
case ClearExpiredTrackers =>
context.become(processing(state.clearExpiredTrackers()))
context.become(fenced(state.clearExpiredTrackers(), initialFenceTime))
case CheckKTableProgress => log.trace("KafkaProducerActor ignoring CheckKTableProgress message from the fenced state")
case FlushMessages => log.trace("KafkaProducerActor ignoring FlushMessages message from the fenced state")
case update: KTableProgressUpdate => context.become(fenced(state.processedUpTo(update), initialFenceTime))
Expand All @@ -250,8 +250,8 @@ class KafkaProducerActorImpl(
case msg: InternalMessage => handleProcessingInternalMessage(msg, state)
case msg: KafkaProducerActorImpl.KafkaProducerActorMessage => handleProcessingProducerMessage(msg, state)
case GetHealth => doHealthCheck(state)
case ClearExpiredTrackers =>
context.become(processing(state.clearExpiredTrackers()))
// case ClearExpiredTrackers =>
// context.become(processing(state.clearExpiredTrackers()))
case ShutdownProducer => stopPublisher()
case Status.Failure(e) =>
log.error(s"Saw unhandled exception in producer for $assignedPartition", e)
Expand Down Expand Up @@ -299,12 +299,21 @@ class KafkaProducerActorImpl(
case msg: EventsPublished => handle(state, msg)
case msg: EventsFailedToPublish =>
handleFailedToPublish(state, msg)

case FlushMessages => handleFlushMessages(state)
case msg: AbortTransactionFailed => handle(msg)
case KafkaProducerActorImpl.ClearExpiredTrackers => context.become(processing(state.clearExpiredTrackers()))
case FlushMessages => handleFlushMessages(state)
case msg: AbortTransactionFailed => handle(msg)
case msg: ProducerFenced =>
context.become(fenced(state, Instant.now))
self ! msg
case KafkaProducerActorImpl.InitTransactions => unhandled()
case KafkaProducerActorImpl.InitTransactionSuccess => unhandled()
case KafkaProducerActorImpl.FailedToInitTransactions => unhandled()
case KafkaProducerActorImpl.RestartProducer => unhandled()
case msg: PublishWithSender => unhandled(msg)
case msg: PendingInitialization => unhandled(msg)

case msg: KTableProgressUpdate => unhandled(msg)

case other => unhandled(other)
}
}
Expand Down

0 comments on commit 821e225

Please sign in to comment.