Kafka Consumer dies after a reconnect following a network hiccup #1408

Open
L7R7 opened this issue Aug 27, 2021 · 12 comments

@L7R7

L7R7 commented Aug 27, 2021

Versions used

akka version: 2.6.15
akka-stream-kafka version: 2.1.1
scala version: 2.13.6

Expected Behavior

We're using a committableSource to consume messages from Kafka. We read the messages, parse the JSON payload, and persist it to a database.
We're also using Draining Control for graceful shutdown.
I was hoping that reconnects after network glitches would be handled properly by either akka-stream-kafka or the underlying Kafka client library.
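
A minimal sketch of the kind of pipeline described here, not the actual code from this service. It assumes Alpakka Kafka 2.1.x, where `DrainingControl.apply` can be passed directly to `toMat`. The bootstrap server, group id, topic name, and the `parseAndPersist` helper are hypothetical placeholders.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.kafka.{CommitterSettings, ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.{Committer, Consumer}
import akka.kafka.scaladsl.Consumer.DrainingControl
import org.apache.kafka.common.serialization.StringDeserializer

import scala.concurrent.Future

object ConsumerSketch {
  // Hypothetical stand-in for "parse the JSON payload and persist it to a database".
  def parseAndPersist(json: String): Future[Done] = Future.successful(Done)

  def run()(implicit system: ActorSystem): DrainingControl[Done] = {
    import system.dispatcher // execution context for the Future.map below

    val consumerSettings =
      ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
        .withBootstrapServers("localhost:9092") // assumed address
        .withGroupId("example-group")           // assumed group id

    Consumer
      .committableSource(consumerSettings, Subscriptions.topics("example-topic"))
      // Persist first, then hand the offset to the committer.
      .mapAsync(1)(msg => parseAndPersist(msg.record.value()).map(_ => msg.committableOffset))
      // The materialized DrainingControl combines the consumer control
      // with the stream's completion future.
      .toMat(Committer.sink(CommitterSettings(system)))(DrainingControl.apply)
      .run()
  }
}
```

The returned `DrainingControl` exposes `streamCompletion`, which is the future that completes when the stream stops, and `drainAndShutdown()` for graceful shutdown.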

Actual Behavior

The consumer works as expected most of the time. However, very sporadically it dies after a short network hiccup (we see error logs from our database connection, so I'm fairly sure it's a brief network issue that is resolved by a reconnect shortly afterwards).
The first thing I see in the logs is that the consumer fails to commit to the broker, then loses its assigned partitions and has to rejoin the consumer group. It does so (as I expect), but shortly afterwards it leaves the consumer group. To me, it looks as if the Kafka consumer is shutting down while the service itself keeps running. I can't see a reason why the consumer should shut down.

At this point I'm not sure whether we're missing something on our side (should we use something other than committableSource? Are we missing some configuration? ...), whether it's a bug, or whether it's just bad luck. Or do we have to handle such problems ourselves by restarting the Kafka source in these cases, as mentioned in this issue?

What's even more confusing is that we have two consumer groups that read from the same topic and do similar things with the messages. We observed this behavior twice over a couple of weeks, once for each consumer group, and both times the other group kept consuming without any issues.

Relevant logs

I'm putting the (almost) full JSON from our logs here, so it's clear when each entry was logged, by which logger, and with what message (I replaced sensitive values with ...). I'll try to group them logically (in the way that makes sense to me, at least):

consumer fails to commit and loses partition assignments after network issues
[
{"ts":"2021-08-27T06:27:13.442Z","message":"[Consumer clientId=..., groupId=...] Group coordinator ... (id: ... rack: null) is unavailable or invalid due to cause: session timed out without receiving a heartbeat response.isDisconnected: false. Rediscovery will be attempted.","logger_name":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","thread_name":"kafka-coordinator-heartbeat-thread | ...","level":"INFO"},
{"ts":"2021-08-27T06:27:13.512Z","message":"[76f50] Kafka commit is to be retried, after=7619 ms, commitsInProgress=0, cause=null","logger_name":"akka.kafka.internal.KafkaConsumerActor","thread_name":"system-akka.actor.default-dispatcher-5","level":"WARN","akkaAddress":"akka://system","sourceThread":"system-akka.kafka.default-dispatcher-29","akkaSource":"akka://system/system/kafka-consumer-1","sourceActorSystem":"system","akkaTimestamp":"06:27:13.512UTC"},
{"ts":"2021-08-27T06:27:20.853Z","message":"[fae5a] Completing","logger_name":"akka.kafka.internal.SingleSourceLogic","thread_name":"system-akka.actor.default-dispatcher-18","level":"INFO","akkaAddress":"akka://system","sourceThread":"system-akka.actor.default-dispatcher-5","akkaSource":"SingleSourceLogic(akka://system)","sourceActorSystem":"system","akkaTimestamp":"06:27:20.853UTC"},
{"ts":"2021-08-27T06:27:33.732Z","message":"[Consumer clientId=..., groupId=...] Error sending fetch request (sessionId=..., epoch=431315) to node 6:","logger_name":"org.apache.kafka.clients.FetchSessionHandler","thread_name":"system-akka.kafka.default-dispatcher-28","level":"INFO","stack_trace":"org.apache.kafka.common.errors.DisconnectException: null\n"},
{"ts":"2021-08-27T06:27:33.931Z","message":"[Consumer clientId=..., groupId=...] Error sending fetch request (sessionId=..., epoch=426654) to node 8:","logger_name":"org.apache.kafka.clients.FetchSessionHandler","thread_name":"system-akka.kafka.default-dispatcher-24","level":"INFO","stack_trace":"org.apache.kafka.common.errors.DisconnectException: null\n"},
{"ts":"2021-08-27T06:27:34.064Z","message":"[Consumer clientId=..., groupId=...] Error sending fetch request (sessionId=1096616561, epoch=426840) to node 7:","logger_name":"org.apache.kafka.clients.FetchSessionHandler","thread_name":"system-akka.kafka.default-dispatcher-24","level":"INFO","stack_trace":"org.apache.kafka.common.errors.DisconnectException: null\n"},
{"ts":"2021-08-27T06:27:34.132Z","message":"[76f50] Kafka commit is to be retried, after=20569 ms, commitsInProgress=0, cause=null","logger_name":"akka.kafka.internal.KafkaConsumerActor","thread_name":"system-akka.actor.default-dispatcher-18","level":"WARN","akkaAddress":"akka://system","sourceThread":"system-akka.kafka.default-dispatcher-29","akkaSource":"akka://system/system/kafka-consumer-1","sourceActorSystem":"system","akkaTimestamp":"06:27:34.132UTC"},
{"ts":"2021-08-27T06:27:34.151Z","message":"[Consumer clientId=..., groupId=...] Discovered group coordinator ... (id: 2147483639 rack: null)","logger_name":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","thread_name":"system-akka.kafka.default-dispatcher-29","level":"INFO"},
{"ts":"2021-08-27T06:27:34.313Z","message":"[Consumer clientId=..., groupId=...] Lost previously assigned partitions topic-0, topic-3, topic-4, topic-1, topic-2, topic-7, topic-8, topic-5, topic-6, topic-11, topic-9, topic-10","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"system-akka.kafka.default-dispatcher-22","level":"INFO"},
{"ts":"2021-08-27T06:27:34.313Z","message":"[Consumer clientId=..., groupId=...] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"system-akka.kafka.default-dispatcher-22","level":"INFO"},
{"ts":"2021-08-27T06:27:34.313Z","message":"[Consumer clientId=..., groupId=...] Attempt to heartbeat with Generation{generationId=521, memberId='...', protocol='range'} and group instance id Optional.empty failed due to UNKNOWN_MEMBER_ID, resetting generation","logger_name":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","thread_name":"system-akka.kafka.default-dispatcher-22","level":"INFO"},
{"ts":"2021-08-27T06:27:34.314Z","message":"[Consumer clientId=..., groupId=...] Offset commit failed on partition topic-2 at offset 6283765: The coordinator is not aware of this member.","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"system-akka.kafka.default-dispatcher-22","level":"ERROR"}
]
consumer starts re-joining
[
{"ts":"2021-08-27T06:27:34.314Z","message":"[Consumer clientId=..., groupId=...] (Re-)joining group","logger_name":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","thread_name":"system-akka.kafka.default-dispatcher-22","level":"INFO"},
{"ts":"2021-08-27T06:27:34.314Z","message":"[Consumer clientId=..., groupId=...] OffsetCommit failed with Generation{generationId=521, memberId='...', protocol='range'}: The coordinator is not aware of this member.","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"system-akka.kafka.default-dispatcher-22","level":"INFO"},
{"ts":"2021-08-27T06:27:34.319Z","message":"[Consumer clientId=..., groupId=...] (Re-)joining group","logger_name":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","thread_name":"system-akka.kafka.default-dispatcher-22","level":"INFO"},
{"ts":"2021-08-27T06:27:34.319Z","message":"[76f50] Kafka commit is to be retried, after=161 ms, commitsInProgress=0, cause=null","logger_name":"akka.kafka.internal.KafkaConsumerActor","thread_name":"system-akka.actor.default-dispatcher-8","level":"WARN","akkaAddress":"akka://system","sourceThread":"system-akka.kafka.default-dispatcher-22","akkaSource":"akka://system/system/kafka-consumer-1","sourceActorSystem":"system","akkaTimestamp":"06:27:34.319UTC"}
]
consumer re-joins successfully
[
{"ts":"2021-08-27T06:27:37.322Z","message":"[Consumer clientId=..., groupId=...] Successfully joined group with generation Generation{generationId=523, memberId='...', protocol='range'}","logger_name":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","thread_name":"system-akka.kafka.default-dispatcher-28","level":"INFO"},
{"ts":"2021-08-27T06:27:37.322Z","message":"[Consumer clientId=..., groupId=...] Finished assignment for group at generation 523: {...=Assignment(partitions=[topic-0, topic-1, topic-2, topic-3, topic-4, topic-5, topic-6, topic-7, topic-8, topic-9, topic-10, topic-11])}","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"system-akka.kafka.default-dispatcher-28","level":"INFO"},
{"ts":"2021-08-27T06:27:37.392Z","message":"[Consumer clientId=..., groupId=...] Notifying assignor about the new Assignment(partitions=[topic-0, topic-1, topic-2, topic-3, topic-4, topic-5, topic-6, topic-7, topic-8, topic-9, topic-10, topic-11])","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"system-akka.kafka.default-dispatcher-27","level":"INFO"},
{"ts":"2021-08-27T06:27:37.392Z","message":"[Consumer clientId=..., groupId=...] Adding newly assigned partitions: topic-0, topic-3, topic-4, topic-1, topic-2, topic-7, topic-8, topic-5, topic-6, topic-11, topic-9, topic-10","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"system-akka.kafka.default-dispatcher-27","level":"INFO"},
{"ts":"2021-08-27T06:27:37.392Z","message":"[Consumer clientId=..., groupId=...] Successfully synced group in generation Generation{generationId=523, memberId='...', protocol='range'}","logger_name":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","thread_name":"system-akka.kafka.default-dispatcher-27","level":"INFO"},
{"ts":"2021-08-27T06:27:37.394Z","message":"[Consumer clientId=..., groupId=...] Setting offset for partition topic-6 to the committed offset FetchPosition{offset=6264460, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[... (id: 6 rack: null)], epoch=17}}","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"system-akka.kafka.default-dispatcher-27","level":"INFO"},
{"ts":"2021-08-27T06:27:37.394Z","message":"[Consumer clientId=..., groupId=...] Setting offset for partition topic-7 to the committed offset FetchPosition{offset=6242663, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[... (id: 8 rack: null)], epoch=18}}","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"system-akka.kafka.default-dispatcher-27","level":"INFO"},
{"ts":"2021-08-27T06:27:37.394Z","message":"[Consumer clientId=..., groupId=...] Setting offset for partition topic-4 to the committed offset FetchPosition{offset=6282332, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[... (id: 7 rack: null)], epoch=20}}","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"system-akka.kafka.default-dispatcher-27","level":"INFO"},
{"ts":"2021-08-27T06:27:37.394Z","message":"[Consumer clientId=..., groupId=...] Setting offset for partition topic-10 to the committed offset FetchPosition{offset=6257160, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[... (id: 7 rack: null)], epoch=18}}","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"system-akka.kafka.default-dispatcher-27","level":"INFO"},
{"ts":"2021-08-27T06:27:37.394Z","message":"[Consumer clientId=..., groupId=...] Setting offset for partition topic-8 to the committed offset FetchPosition{offset=6326935, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[.... (id: 6 rack: null)], epoch=15}}","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"system-akka.kafka.default-dispatcher-27","level":"INFO"},
{"ts":"2021-08-27T06:27:37.394Z","message":"[Consumer clientId=..., groupId=...] Setting offset for partition topic-2 to the committed offset FetchPosition{offset=6283764, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[... (id: 8 rack: null)], epoch=15}}","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"system-akka.kafka.default-dispatcher-27","level":"INFO"},
{"ts":"2021-08-27T06:27:37.394Z","message":"[Consumer clientId=..., groupId=...] Setting offset for partition topic-0 to the committed offset FetchPosition{offset=6347171, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[... (id: 8 rack: null)], epoch=15}}","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"system-akka.kafka.default-dispatcher-27","level":"INFO"},
{"ts":"2021-08-27T06:27:37.394Z","message":"[Consumer clientId=..., groupId=...] Setting offset for partition topic-11 to the committed offset FetchPosition{offset=6326452, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[... (id: 7 rack: null)], epoch=16}}","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"system-akka.kafka.default-dispatcher-27","level":"INFO"},
{"ts":"2021-08-27T06:27:37.394Z","message":"[Consumer clientId=..., groupId=...] Setting offset for partition topic-5 to the committed offset FetchPosition{offset=6235477, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[... (id: 8 rack: null)], epoch=15}}","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"system-akka.kafka.default-dispatcher-27","level":"INFO"},
{"ts":"2021-08-27T06:27:37.394Z","message":"[Consumer clientId=..., groupId=...] Setting offset for partition topic-3 to the committed offset FetchPosition{offset=6238460, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[... (id: 6 rack: null)], epoch=14}}","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"system-akka.kafka.default-dispatcher-27","level":"INFO"},
{"ts":"2021-08-27T06:27:37.394Z","message":"[Consumer clientId=..., groupId=...] Setting offset for partition topic-9 to the committed offset FetchPosition{offset=6253467, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[... (id: 7 rack: null)], epoch=18}}","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"system-akka.kafka.default-dispatcher-27","level":"INFO"}
]
consumer shuts down and leaves the group
[
{"ts":"2021-08-27T06:27:50.875Z","message":"[Consumer clientId=..., groupId=...] Member ... sending LeaveGroup request to coordinator ... (id: 2147483639 rack: null) due to the consumer is being closed","logger_name":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","thread_name":"system-akka.kafka.default-dispatcher-34","level":"INFO"},
{"ts":"2021-08-27T06:27:50.875Z","message":"[Consumer clientId=..., groupId=...] Revoke previously assigned partitions topic-0, topic-3, topic-4, topic-1, topic-2, topic-7, topic-8, topic-5, topic-6, topic-11, topic-9, topic-10","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"system-akka.kafka.default-dispatcher-34","level":"INFO"},
{"ts":"2021-08-27T06:27:50.882Z","message":"App info kafka.consumer for ... unregistered","logger_name":"org.apache.kafka.common.utils.AppInfoParser","thread_name":"system-akka.kafka.default-dispatcher-34","level":"INFO"}
]

Consumer config

kafka.consumer {
    kafka-clients {
        ssl.keystore.location = ${?KAFKA_KEYSTORE_LOCATION}
        ssl.keystore.password = ${?KAFKA_KEYSTORE_PASSWORD}
        ssl.truststore.location = ${?KAFKA_KEYSTORE_LOCATION}
        ssl.truststore.password = ${?KAFKA_KEYSTORE_PASSWORD}
        security.protocol = "PLAINTEXT"
        security.protocol = ${?KAFKA_SECURITY_PROTOCOL}
        enable.auto.commit = "false"
        auto.offset.reset = "earliest"
        fetch.max.bytes = "16000000"
        max.poll.records = "50"
        client.id = ${?POD_NAME}
    }
}

We're consuming rather large messages, so fetch.max.bytes is set to an unusual value, but I don't think that's relevant here.

Reproducible Test Case

I'm currently not able to reproduce the behavior. It happened to us twice over the course of several weeks, and I think it's triggered by a network glitch, which is hard to simulate.

@mrubin

mrubin commented Sep 14, 2021

We are experiencing the same thing (I mentioned this in #1264 as well): ever since we switched from Kafka's Java SDK to alpakka-kafka, some of our consumers sporadically die after a network hiccup. Others are fine.

Around this time we see "Completing" in the logs - it looks like the Stream completed for those consumers (and never restarted).

image (137)

Our typical setup (Play Framework):

private val config = configuration.underlying.getConfig("akka.kafka.consumer")

private val consumerSettings = ConsumerSettings(config, new StringDeserializer, new StringDeserializer).withGroupId("...")

private val committerSettings = CommitterSettings(configuration.underlying.getConfig("akka.kafka.committer"))

private val parallelism = configuration.get[Int]("kafka.foo-consumer.map-async-parallelism")

private val lastConsumerControl = new AtomicReference[Consumer.Control](Consumer.NoopControl)

val restartSettings = RestartSettings(
  minBackoff = 3.seconds,
  maxBackoff = 300.seconds,
  randomFactor = 0.0)

private val source = RestartSource.withBackoff(restartSettings) { () =>
  Consumer.committableSource(
    consumerSettings,
    Subscriptions.topicPattern("..."))
    .mapMaterializedValue(c => lastConsumerControl.set(c))
}

private val materializer = source
  .map(msg => (Json.parse(msg.record.value()), msg.committableOffset))
  .mapAsync(parallelism) {
    case (js, committableOffset) =>
      val f = // some logic to handle record

      Future.sequence(Seq(f).flatten)
        .map(_ => committableOffset)
        // Log before recovering: once recover has run, the Future is never a Failure.
        .andThen { case Failure(e: Exception) => serviceLog.logException(e) }
        .recover { case _ => committableOffset }
  }
  .log("...")
  .addAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
  .addAttributes(Attributes.logLevels(
    onElement = Attributes.LogLevels.Off,
    onFinish = Attributes.LogLevels.Off,
    onFailure = Attributes.LogLevels.Error))
  .toMat(Committer.sink(committerSettings))(Keep.right)
  .run()

coordinatedShutdown.addTask(
  phase = CoordinatedShutdown.PhaseServiceStop,
  taskName = "...") {
  () => lastConsumerControl.get().drainAndShutdown(materializer).map(_ => Done)
}

@L7R7
Author

L7R7 commented Sep 15, 2021

@mrubin thanks for adding the observations from your systems. To me it looks like you have a problem similar to the one we observe.

@mrubin

mrubin commented Sep 16, 2021

@seglo do you see anything interesting, or anything that stands out, in the above configuration for the alpakka-kafka consumer? Thank you

@L7R7
Author

L7R7 commented Sep 24, 2021

We implemented a workaround in the meantime. We watch the stream completion via the draining control and fail the Kubernetes liveness probe when the stream completes, so that the pod gets restarted.
It's better than nothing, but it's not elegant, and it isn't an ideal solution either, because IMHO there should be a way to make this robust without requiring Kubernetes.
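
The workaround above could be sketched roughly as follows. This is an assumption about its shape, not the original code: a flag is flipped once the stream's completion future finishes (successfully or not), and a liveness endpoint (not shown) would report unhealthy while the flag is false. `StreamLiveness` and its method names are hypothetical; with Alpakka Kafka the future passed in would be `drainingControl.streamCompletion`.

```scala
import java.util.concurrent.atomic.AtomicBoolean

import scala.concurrent.{ExecutionContext, Future}
import scala.util.Success

// Hypothetical helper: tracks whether the consumer stream is still running.
final class StreamLiveness {
  private val alive = new AtomicBoolean(true)

  // Marks the stream as dead when `streamCompletion` finishes, whether it
  // succeeded or failed. The returned Future completes after the flag is set.
  def watch[T](streamCompletion: Future[T])(implicit ec: ExecutionContext): Future[Unit] =
    streamCompletion.transform { _ =>
      alive.set(false)
      Success(())
    }

  // To be called from the liveness probe handler.
  def isAlive: Boolean = alive.get()
}
```

A probe handler would then return a non-2xx status whenever `isAlive` is false, which lets Kubernetes restart the pod.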

@Aniket-Singla

Faced a similar issue with:
com.typesafe.akka:akka-stream-kafka_2.13:2.0.2

17:20:10.480 INFO 1 --- [ispatcher-33695] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=xx-xxxx-xxxxxxxxxx, groupId=xxx-xxxxxx] Group coordinator b-1.xxxxxxxxxxxx.amazonaws.com:9092 (id: 2147483646 rack: null) is unavailable or invalid, will attempt rediscovery

17:20:10.484 INFO 1 --- [t-dispatcher-17] akka.kafka.internal.SingleSourceLogic : [85b0f] Completing

17:20:14.410 INFO 1 --- [ead | xx-xxxxxx] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=xx-xxxx-xxxxxxxxx, groupId=xx-xxxxxx] Discovered group coordinator b-1.xxxxxxxxxxxx.amazonaws.com:9092 (id: 2147483646 rack: null)

17:20:14.682 INFO 1 --- [ispatcher-33673] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=xxx-xxxx-xxxxxxxx, groupId=xxxx-xxxxxxx] Member xx-xxxx-xxxxxxxx-50755c38-5ee2-4934-9522-165ad3f7c94f sending LeaveGroup request to coordinator b-1.xxxxxxxx.amazonaws.com:9092 (id: 2147483646 rack: null)

After printing these logs, the Kafka consumer stopped consuming messages. It just left the consumer group and did not rejoin.

@weigoxp

weigoxp commented Aug 26, 2022

Same issue here

@L7R7
Author

L7R7 commented Sep 19, 2022

FTR: so far we haven't found a better solution than restarting the whole app. Unfortunately, due to the upcoming license changes to Akka, we won't dig deeper into that.

@seglo
Contributor

seglo commented Sep 24, 2022

FYI I have no plans to upgrade Akka to releases under Lightbend's new license, but I understand the concern.

Replacing Akka would be a huge effort, so I hope that a viable Akka fork (such as the Apache Akka fork that's beginning to gain attention) emerges. In the meantime, Lightbend has committed to releasing security patches for Akka 2.6 until September of next year, which buys this and other OSS projects built on Akka some time to pivot.

@L7R7
Author

L7R7 commented Apr 6, 2023

FYI I lost interest in this issue because we don't use Akka anymore. I would be fine with closing it, but I'll leave it open for now as somebody else might be affected by this problem

@tpanagos

tpanagos commented Feb 2, 2024

I believe that we are experiencing this issue as well, and we have not found a configuration that avoids it. We are running in k8s with Kafka, the Strimzi operator, and Akka 2.6. Keep the issue open, please. I am trying to crash the responsible actor using stream completion detection in the draining control, but haven't managed to find a reliable way. @L7R7 Would you share a snippet of your approach to the workaround, please?

@L7R7
Author

L7R7 commented Feb 2, 2024

@tpanagos We migrated away from Akka to a different stack a while ago so I don't recall the details. But I can dig through the git history next week and see if I can provide you with some details

@L7R7
Author

L7R7 commented Feb 7, 2024

@tpanagos From what I can see in our git history, we combined this approach: https://doc.akka.io/docs/alpakka-kafka/current/consumer.html#draining-control with a message to the Supervisor that stops the whole application. So basically a drainingControl.streamCompletion.onComplete(result => /* some logging and then a supervisor ! Stop */).
I hope that helps!
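
Spelled out a little more, that approach might look like the sketch below. It is a reconstruction under assumptions, not the original code: the `Stop` message, the `StopOnStreamCompletion` object, and the `println` logging are hypothetical, and `supervisor` stands for whatever classic actor is responsible for stopping the application.

```scala
import akka.Done
import akka.actor.ActorRef

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

object StopOnStreamCompletion {
  // Hypothetical message that the application's supervisor actor understands.
  case object Stop

  // `streamCompletion` is expected to be `drainingControl.streamCompletion`.
  def install(streamCompletion: Future[Done], supervisor: ActorRef)(
      implicit ec: ExecutionContext): Unit =
    streamCompletion.onComplete { result =>
      result match {
        case Success(_)  => println("Kafka stream completed; stopping the application")
        case Failure(ex) => println(s"Kafka stream failed (${ex.getMessage}); stopping the application")
      }
      // Completion is treated like failure: either way the consumer is gone.
      supervisor ! Stop
    }
}
```

Handling the successful case matters here, because the logs in this thread show the source logging "Completing" rather than failing with an exception.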
