Skip to content
This repository has been archived by the owner on Jun 10, 2022. It is now read-only.

Commit

Permalink
Merge pull request #133 from noname007/master
Browse files Browse the repository at this point in the history
信息消费模式改变
  • Loading branch information
nmred committed Oct 23, 2017
2 parents 16b8a7b + 43cb74a commit d5b395a
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 10 deletions.
42 changes: 32 additions & 10 deletions src/Kafka/Consumer/Process.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

namespace Kafka\Consumer;

use Kafka\ConsumerConfig;

/**
+------------------------------------------------------------------------------
* Kafka protocol since Kafka v0.8
Expand Down Expand Up @@ -660,8 +662,31 @@ public function succFetch($result, $fd)
// }}}
// {{{ protected function commit()

protected function consume_msg()
{
foreach ($this->messages as $topic => $value) {
foreach ($value as $part => $messages) {
foreach ($messages as $message) {
if ($this->consumer != null) {
call_user_func($this->consumer, $topic, $part, $message);
}
}
}
}

$this->messages = array();
}


protected function commit()
{
$config= ConsumerConfig::getInstance();
if($config->getConsumeMode() == ConsumerConfig::CONSUME_BEFORE_COMMIT_OFFSET)
{
$this->consume_msg();
}


$broker = \Kafka\Broker::getInstance();
$groupBrokerId = $broker->getGroupBrokerId();
$connect = $broker->getMetaConnect($groupBrokerId);
Expand Down Expand Up @@ -704,6 +729,10 @@ protected function commit()
// }}}
// {{{ public function succCommit()

/**
* @var State
*/
public $state;
public function succCommit($result)
{
$this->debug('Commit success, result:' . json_encode($result));
Expand All @@ -716,17 +745,10 @@ public function succCommit($result)
}
}
}

foreach ($this->messages as $topic => $value) {
foreach ($value as $part => $messages) {
foreach ($messages as $message) {
if ($this->consumer != null) {
call_user_func($this->consumer, $topic, $part, $message);
}
}
}
if(ConsumerConfig::getInstance()->getConsumeMode() == ConsumerConfig::CONSUME_AFTER_COMMIT_OFFSET)
{
$this->consume_msg();
}
$this->messages = array();
}

// }}}
Expand Down
16 changes: 16 additions & 0 deletions src/Kafka/ConsumerConfig.php
Original file line number Diff line number Diff line change
Expand Up @@ -130,4 +130,20 @@ public function setTopics($topics)

// }}}
// }}}

protected $runtime_options = [
'consume_mode' => self::CONSUME_AFTER_COMMIT_OFFSET
];
const CONSUME_AFTER_COMMIT_OFFSET = 1;
const CONSUME_BEFORE_COMMIT_OFFSET = 2;

public function setConsumeMode($mode)
{
$this->runtime_options['consume_mode'] = $mode;
}

public function getConsumeMode()
{
return $this->runtime_options['consume_mode'];
}
}

0 comments on commit d5b395a

Please sign in to comment.