From 1017f7dd6bb9cf59d9a365f1f653830a3fb9242f Mon Sep 17 00:00:00 2001 From: soul11201 Date: Mon, 23 Oct 2017 16:51:59 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E5=85=88=E6=B6=88=E8=B4=B9=EF=BC=8C?= =?UTF-8?q?=E5=90=8E=E6=8F=90=E4=BA=A4offset=EF=BC=8C=E9=98=B2=E6=AD=A2?= =?UTF-8?q?=E6=9C=AA=E8=83=BD=E5=A4=84=E7=90=86=E5=AE=8C=E6=88=90=E4=B8=AD?= =?UTF-8?q?=EF=BC=8C=E7=A8=8B=E5=BA=8F=E5=B4=A9=E6=BA=83=EF=BC=8C=E5=AF=BC?= =?UTF-8?q?=E8=87=B4=E6=95=B0=E6=8D=AE=E6=B6=88=E6=81=AF=E4=B8=A2=E5=A4=B1?= =?UTF-8?q?=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Kafka/Consumer/Process.php | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/src/Kafka/Consumer/Process.php b/src/Kafka/Consumer/Process.php index d6066c2a..56a32220 100644 --- a/src/Kafka/Consumer/Process.php +++ b/src/Kafka/Consumer/Process.php @@ -662,6 +662,18 @@ public function succFetch($result, $fd) protected function commit() { + foreach ($this->messages as $topic => $value) { + foreach ($value as $part => $messages) { + foreach ($messages as $message) { + if ($this->consumer != null) { + call_user_func($this->consumer, $topic, $part, $message); + } + } + } + } + + $this->messages = array(); + $broker = \Kafka\Broker::getInstance(); $groupBrokerId = $broker->getGroupBrokerId(); $connect = $broker->getMetaConnect($groupBrokerId); @@ -704,6 +716,10 @@ protected function commit() // }}} // {{{ public function succCommit() + /** + * @var State + */ + public $state; public function succCommit($result) { $this->debug('Commit success, result:' . json_encode($result)); @@ -716,17 +732,6 @@ public function succCommit($result) } } } - - foreach ($this->messages as $topic => $value) { - foreach ($value as $part => $messages) { - foreach ($messages as $message) { - if ($this->consumer != null) { - call_user_func($this->consumer, $topic, $part, $message); - } - } - } - } - $this->messages = array(); } // }}} From 43cb74a01f5efe402b55935f5aec64e3510d4416 Mon Sep 17 00:00:00 2001 From: soul11201 Date: Mon, 23 Oct 2017 17:28:12 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E9=85=8D=E7=BD=AE?= =?UTF-8?q?=E5=8F=AF=E9=80=89=E9=A1=B9=EF=BC=8C=E6=98=AF=E9=80=89=E6=8B=A9?= =?UTF-8?q?=E4=B8=80=E6=AC=A1=E6=80=A7=E6=B6=88=E8=B4=B9=EF=BC=8C=E8=BF=98?= =?UTF-8?q?=E6=98=AF=E5=B9=82=E7=AD=89=E6=80=A7=E6=B6=88=E8=B4=B9=EF=BC=8C?= =?UTF-8?q?=20=E6=98=AF=E5=85=88=E6=8F=90=E4=BA=A4offset=20=E8=BF=98?= =?UTF-8?q?=E6=98=AF=E5=90=8E=E6=8F=90=E4=BA=A4offset.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Kafka/Consumer/Process.php | 21 +++++++++++++++++++-- src/Kafka/ConsumerConfig.php | 16 ++++++++++++++++ 2 files changed, 35 insertions(+), 2 deletions(-) diff --git a/src/Kafka/Consumer/Process.php b/src/Kafka/Consumer/Process.php index 56a32220..bb5ff167 100644 --- a/src/Kafka/Consumer/Process.php +++ b/src/Kafka/Consumer/Process.php @@ -14,6 +14,8 @@ namespace Kafka\Consumer; +use Kafka\ConsumerConfig; + /** +------------------------------------------------------------------------------ * Kafka protocol since Kafka v0.8 @@ -660,7 +662,7 @@ public function succFetch($result, $fd) // }}} // {{{ protected function commit() - protected function commit() + protected function consume_msg() { foreach ($this->messages as $topic => $value) { foreach ($value as $part => $messages) { @@ -673,7 +675,18 @@ protected function commit() } $this->messages = array(); - + } + + + protected function commit() + { + $config= ConsumerConfig::getInstance(); + if($config->getConsumeMode() == ConsumerConfig::CONSUME_BEFORE_COMMIT_OFFSET) + { + $this->consume_msg(); + } + + $broker = \Kafka\Broker::getInstance(); $groupBrokerId = $broker->getGroupBrokerId(); $connect = $broker->getMetaConnect($groupBrokerId); @@ -732,6 +745,10 @@ public function succCommit($result) } } } + if(ConsumerConfig::getInstance()->getConsumeMode() == ConsumerConfig::CONSUME_AFTER_COMMIT_OFFSET) + { + $this->consume_msg(); + } } // }}} diff --git a/src/Kafka/ConsumerConfig.php b/src/Kafka/ConsumerConfig.php index 9a8a8cf3..b983f225 100644 --- a/src/Kafka/ConsumerConfig.php +++ b/src/Kafka/ConsumerConfig.php @@ -130,4 +130,20 @@ public function setTopics($topics) // }}} // }}} + + protected $runtime_options = [ + 'consume_mode' => self::CONSUME_AFTER_COMMIT_OFFSET + ]; + const CONSUME_AFTER_COMMIT_OFFSET = 1; + const CONSUME_BEFORE_COMMIT_OFFSET = 2; + + public function setConsumeMode($mode) + { + $this->runtime_options['consume_mode'] = $mode; + } + + public function getConsumeMode() + { + return $this->runtime_options['consume_mode']; + } }