diff --git a/src/Kafka/Consumer/Process.php b/src/Kafka/Consumer/Process.php index d6066c2a..bb5ff167 100644 --- a/src/Kafka/Consumer/Process.php +++ b/src/Kafka/Consumer/Process.php @@ -14,6 +14,8 @@ namespace Kafka\Consumer; +use Kafka\ConsumerConfig; + /** +------------------------------------------------------------------------------ * Kafka protocol since Kafka v0.8 @@ -660,8 +662,31 @@ public function succFetch($result, $fd) // }}} // {{{ protected function commit() + protected function consume_msg() + { + foreach ($this->messages as $topic => $value) { + foreach ($value as $part => $messages) { + foreach ($messages as $message) { + if ($this->consumer != null) { + call_user_func($this->consumer, $topic, $part, $message); + } + } + } + } + + $this->messages = array(); + } + + protected function commit() { + $config= ConsumerConfig::getInstance(); + if($config->getConsumeMode() == ConsumerConfig::CONSUME_BEFORE_COMMIT_OFFSET) + { + $this->consume_msg(); + } + + $broker = \Kafka\Broker::getInstance(); $groupBrokerId = $broker->getGroupBrokerId(); $connect = $broker->getMetaConnect($groupBrokerId); @@ -704,6 +729,10 @@ protected function commit() // }}} // {{{ public function succCommit() + /** + * @var State + */ + public $state; public function succCommit($result) { $this->debug('Commit success, result:' . json_encode($result)); @@ -716,17 +745,10 @@ public function succCommit($result) } } } - - foreach ($this->messages as $topic => $value) { - foreach ($value as $part => $messages) { - foreach ($messages as $message) { - if ($this->consumer != null) { - call_user_func($this->consumer, $topic, $part, $message); - } - } - } + if(ConsumerConfig::getInstance()->getConsumeMode() == ConsumerConfig::CONSUME_AFTER_COMMIT_OFFSET) + { + $this->consume_msg(); } - $this->messages = array(); } // }}} diff --git a/src/Kafka/ConsumerConfig.php b/src/Kafka/ConsumerConfig.php index 9a8a8cf3..b983f225 100644 --- a/src/Kafka/ConsumerConfig.php +++ b/src/Kafka/ConsumerConfig.php @@ -130,4 +130,20 @@ public function setTopics($topics) // }}} // }}} + + protected $runtime_options = [ + 'consume_mode' => self::CONSUME_AFTER_COMMIT_OFFSET + ]; + const CONSUME_AFTER_COMMIT_OFFSET = 1; + const CONSUME_BEFORE_COMMIT_OFFSET = 2; + + public function setConsumeMode($mode) + { + $this->runtime_options['consume_mode'] = $mode; + } + + public function getConsumeMode() + { + return $this->runtime_options['consume_mode']; + } }