Skip to content
This repository has been archived by the owner on Jun 10, 2022. It is now read-only.

Commit

Permalink
添加配置可选项,是选择一次性消费,还是幂等性消费,
Browse files Browse the repository at this point in the history
是先提交offset 还是后提交offset.
  • Loading branch information
noname007 committed Oct 23, 2017
1 parent 1017f7d commit 43cb74a
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 2 deletions.
21 changes: 19 additions & 2 deletions src/Kafka/Consumer/Process.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

namespace Kafka\Consumer;

use Kafka\ConsumerConfig;

/**
+------------------------------------------------------------------------------
* Kafka protocol since Kafka v0.8
Expand Down Expand Up @@ -660,7 +662,7 @@ public function succFetch($result, $fd)
// }}}
// {{{ protected function commit()

protected function commit()
protected function consume_msg()
{
foreach ($this->messages as $topic => $value) {
foreach ($value as $part => $messages) {
Expand All @@ -673,7 +675,18 @@ protected function commit()
}

$this->messages = array();

}


protected function commit()
{
$config= ConsumerConfig::getInstance();
if($config->getConsumeMode() == ConsumerConfig::CONSUME_BEFORE_COMMIT_OFFSET)
{
$this->consume_msg();
}


$broker = \Kafka\Broker::getInstance();
$groupBrokerId = $broker->getGroupBrokerId();
$connect = $broker->getMetaConnect($groupBrokerId);
Expand Down Expand Up @@ -732,6 +745,10 @@ public function succCommit($result)
}
}
}
if(ConsumerConfig::getInstance()->getConsumeMode() == ConsumerConfig::CONSUME_AFTER_COMMIT_OFFSET)
{
$this->consume_msg();
}
}

// }}}
Expand Down
16 changes: 16 additions & 0 deletions src/Kafka/ConsumerConfig.php
Original file line number Diff line number Diff line change
Expand Up @@ -130,4 +130,20 @@ public function setTopics($topics)

// }}}
// }}}

protected $runtime_options = [
'consume_mode' => self::CONSUME_AFTER_COMMIT_OFFSET
];
const CONSUME_AFTER_COMMIT_OFFSET = 1;
const CONSUME_BEFORE_COMMIT_OFFSET = 2;

public function setConsumeMode($mode)
{
$this->runtime_options['consume_mode'] = $mode;
}

public function getConsumeMode()
{
return $this->runtime_options['consume_mode'];
}
}

0 comments on commit 43cb74a

Please sign in to comment.