Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Slow (timeout) shutdown and rebalancing with withRebalanceSafeCommits #1132

Open
jgordijn opened this issue Dec 7, 2023 · 14 comments
Open

Comments

@jgordijn
Copy link

jgordijn commented Dec 7, 2023

I tried out the new withRebalanceSafeCommits feature and it has unexpected behavior:

  1. Shutdown takes long time now. Result is that all partitions handled by the stopping application are stuck
    It seems that on shutdown the stream is stopped, but the rebalanceListener is waiting until the last message is committed. As the stream is stopped, this will never happen and it takes 3 min (3/5 of the maxPollInterval) to stop.
  2. Joining the group takes a long time.
    When I start the second application it seems that rebalancing is started. However it still takes 3 minutes to join.

Maybe it has something to do with the slow (100ms) processing per message, but having a 10 message poll should mitigate this. I would thus expect rebalancing to happen in (worst case) 1 sec (10x100ms).

package listener

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.slf4j.LoggerFactory
import zio.kafka.consumer.{Consumer, ConsumerSettings, RebalanceListener, Subscription}
import zio.kafka.serde.Serde
import zio.stream.ZStream
import zio.{ZIO, ZIOAppDefault, ZLayer, durationInt}

object Main extends ZIOAppDefault {
  private val LOG = LoggerFactory.getLogger(Main.getClass)

  private val TOPIC = "topic1"

  def consumer: ZStream[Consumer, Throwable, Nothing] = Consumer
    .plainStream(Subscription.topics(TOPIC), Serde.string, Serde.string)
    .tap(r =>
      ZIO.succeed(LOG.info(
        s"${r.value} p:${r.offset.partition}, o:${r.offset.offset}"
      ))
    )
    .mapZIO(ZIO.succeed(_).delay(100.millis))
    .map(_.offset)
    .aggregateAsync(Consumer.offsetBatches)
    .tap(offsets =>
      ZIO.succeed(LOG.info(s"Going to commit: ${offsets.offsets.map { case (k, v) => s"${k.partition()}: ${v.offset()}" }}"))
    )
    .mapZIO(_.commit)
    .drain

  def consumerLayer = {
    val value: Map[String, AnyRef] = Map[String, AnyRef](
      ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG -> "60000",
      ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG -> "250"
    )
    ZLayer.scoped(
      Consumer.make(
        ConsumerSettings(List("localhost:29092"))
          .withGroupId("tester")
          .withMaxPollRecords(10)
          .withRebalanceSafeCommits(true)
          .withOffsetRetrieval(
            Consumer.OffsetRetrieval.Auto(Consumer.AutoOffsetStrategy.Earliest)
          )
          .withProperties(
            value
          )
          .withRebalanceListener(RebalanceListener.apply(
            (assigned, _) => {
              val text = s"Assigned Partitions: ${assigned.map(_.partition())}"
              ZIO.succeed(LOG.info(text))
            },
            (revoked, _) => {
              val text = s"Revoked Partitions: ${revoked.map(_.partition())}"
              ZIO.succeed(LOG.info(text))
            }
          ))
      )
    )
  }

  override def run = {
    (for {
      _ <- ZIO.succeed(LOG.info(s"Starting"))
      _ <- consumer.runDrain
      _ <- ZIO.succeed(LOG.info("DONE"))
    } yield ())
      .provide(consumerLayer)
  }
}
@erikvanoosten
Copy link
Collaborator

Thanks @jgordijn, that is a really interesting finding!
Indeed, shutdown is something that we need to improve. I want to look at this beginning next year.

For now you could apply the workaround of setting a shorter maxRebalanceDuration, e.g. 15 seconds.

@jgordijn
Copy link
Author

jgordijn commented Dec 7, 2023

If I set maxRebalanceDuration, won't I fallback to old behaviour and get duplicates.

Also, please look at the rebalance time. It is not only the shutdown that seems to fail.

@erikvanoosten
Copy link
Collaborator

erikvanoosten commented Dec 8, 2023

If I set maxRebalanceDuration, won't I fallback to old behaviour and get duplicates.

It depends. In the old behavior the program gets no chance at all to do commits. If you set a maxRebalanceTime, it at least gets some chance. Most programs commit everything within a few seconds. With slow processing like here, it will be necessary to reduce the amount of records that are pre-fetched (withMaxPollRecords(10) is a good start, but I recommend you also disable prefetching with withoutPartitionPreFetching) to be done processing and committing before the deadline.

Also, please look at the rebalance time. It is not only the shutdown that seems to fail.

Can you elaborate on that please? What error do you see?

@jgordijn
Copy link
Author

I start consumer1. Then I start consumer 2, which doesn't start immediately (or after a short delay). Meanwhile the new consumer doesn't show anything in the log, and in consumer1 I see:

09:11:35.370 zio-kafka-runloop-thread-0 INFO  logger - [Consumer clientId=consumer-tester1-1, groupId=tester1] Request joining group due to: group is already rebalancing

It takes nearly 3 minutes (differs per run), before consumer2 starts consuming.

@erikvanoosten
Copy link
Collaborator

Did you add withoutPartitionPreFetching to the consumer settings already?

@jgordijn
Copy link
Author

A yes, flag (withoutPartitionPreFetching) seems to work. Why is this flag there? The Kafka Client also has prefetching, right?

I'm a bit worried about the amount of flags and combination I need to use to get it to work.

@erikvanoosten
Copy link
Collaborator

The kafka-client does not do any pre-fetching. By default zio-kafka does quite a bit of pre-fetching.

I'm a bit worried about the amount of flags and combination I need to use to get it to work.

I know... Kafka has a huge amount of knobs that can be turned. Its a pain to support people with it because there is always one more setting that can be tweaked.

I am really happy that this solved the issue though! We will need to add this gotcha to the documentation.

@jgordijn
Copy link
Author

@erikvanoosten
Copy link
Collaborator

I can only speculate. Conduktor uses zio-kafka, so most probably they are describing how their product works, not the underlying java client.

@jgordijn
Copy link
Author

Why did you close this issue? The issue with shutdown is not resolved.

@erikvanoosten erikvanoosten reopened this Dec 12, 2023
@erikvanoosten
Copy link
Collaborator

@jgordijn Yep, you are right. Thanks for correcting me.

@svroonland
Copy link
Collaborator

svroonland commented Apr 4, 2024

It seems that on shutdown the stream is stopped, but the rebalanceListener is waiting until the last message is committed.

@erikvanoosten You think this would be solved by #1201 ?

@erikvanoosten
Copy link
Collaborator

It seems that on shutdown the stream is stopped, but the rebalanceListener is waiting until the last message is committed.

@erikvanoosten You think this would be solved by #1201 ?

Yes, that would be my expectation 😄

@svroonland
Copy link
Collaborator

Sounds related and perhaps fully fixed by #1358.

Shutdown of one of the members of a consumer group will result in a rebalancing. Is that the situation here as well, or was shutdown of a member of a single-instance consmer group resulting in long times shutdown times?

During rebalancing sometimes partitions are assigned and then removed without any records having been fetched for those partitions. Partition streams are created and emitted in the top-level stream but may not have pulled from by the downstream stages (in user code). In that case, the safe rebalance mechanism would wait for an end signal for those partition streams that never arrived and timeout only after the maxRebalanceDuration.

We will be adding some logging for this situation.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants