Skip to content
This repository has been archived by the owner on Jun 10, 2022. It is now read-only.

Commit

Permalink
先消费,后提交offset,防止未能处理完成中,程序崩溃,导致数据消息丢失。
Browse files Browse the repository at this point in the history
  • Loading branch information
noname007 committed Oct 23, 2017
1 parent 08d8a31 commit 1017f7d
Showing 1 changed file with 16 additions and 11 deletions.
27 changes: 16 additions & 11 deletions src/Kafka/Consumer/Process.php
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,18 @@ public function succFetch($result, $fd)

protected function commit()
{
foreach ($this->messages as $topic => $value) {
foreach ($value as $part => $messages) {
foreach ($messages as $message) {
if ($this->consumer != null) {
call_user_func($this->consumer, $topic, $part, $message);
}
}
}
}

$this->messages = array();

$broker = \Kafka\Broker::getInstance();
$groupBrokerId = $broker->getGroupBrokerId();
$connect = $broker->getMetaConnect($groupBrokerId);
Expand Down Expand Up @@ -704,6 +716,10 @@ protected function commit()
// }}}
// {{{ public function succCommit()

/**
* @var State
*/
public $state;
public function succCommit($result)
{
$this->debug('Commit success, result:' . json_encode($result));
Expand All @@ -716,17 +732,6 @@ public function succCommit($result)
}
}
}

foreach ($this->messages as $topic => $value) {
foreach ($value as $part => $messages) {
foreach ($messages as $message) {
if ($this->consumer != null) {
call_user_func($this->consumer, $topic, $part, $message);
}
}
}
}
$this->messages = array();
}

// }}}
Expand Down

0 comments on commit 1017f7d

Please sign in to comment.