Skip to content

KafkaConsumerConsistentRegion

Dan Debrunner edited this page Jan 10, 2018 · 2 revisions

The current design page does not describe what happens with the operator in a consistent region so using this page as notes to try and understand it.

Overview

The operator has two threads, related to two queues.

  • processThread started by AbstractKafkaConsumerOperator.allPortsReady() that calls the method produceTuples(). This pulls messages of the blocking queue messageQueue and submits them as tuples.
  • eventThread started by KafkaConsumerClient that pulls events off eventQueue and processes them. Events are added by:
    • StateHandler processing
    • Control tuples on the optional input port
    • AbstractKafkaConsumerOperator.shutdown()

Events are used to start the KafkaConsumerClient polling (fetching messages) which are added to the messageQueue. While the KafkaConsumerClient is polling messages are added to messageQueue regardless of what the operator is doing, specifically messages are being read from Kafka:

  • before all ports ready
  • while any operator consistent region state processing is occurring (rest, checkpoint, drain etc.)

Assumptions:

  • Event thread is used to have a single thread interact with the KafkaConsumer since they are not thread safe.
  • Process thread and messageQueue is to disconnect tuple (message) processing from KafkaConsumer to avoid it timing out and its subscriptions being revoked.

https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

messageQueue is bounded to 100*max_poll_size, thus if each poll can return 500 messages (the default) then messageQueue can contain 50,000 messages (instances of ConsumerRecord).

Operator driven consistent region

When operator driven the region is made consistent after N tuples (set by operator parameter triggerCount). The current offset for a topic is updated after the submit of the tuple for a message using the offset in the ConsumerRecord.

The checkpoint corresponds to the offset for the last tuple submitted for each subscribed topic. Note that drain() does nothing, since the saved offset is the last tuple submitted. Specifically messageQueue may contain outstanding messages that are after the offset to be saved, these are not drained and will be submitted (up to N) during the next cut.

The checkpoint saves the offsets of the last tuple submitted for each topic (using a serialized OffsetManager).

Upon a reset the offsets are reset from the checkpoint and messageQueue cleared.

Periodic region

Currently broken - issue #53.

Initialization

The operator kicks off the "process thread" in all ports ready but if the relaunch count > 0 it creates a reset semaphore that processTuples waits for. The semaphore is cleared once reset processing is complete.

  • I don't believe this is required, the permit acquisition will wait for the reset (need to verify).

ProduceTuples

Potential issues:

  • Line 468 - Permit is acquired in the try block that releases it, this is not standard practice.