Skip to content
This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Commit

Permalink
Merge pull request #712 from prateek1306/issue-704
Browse files Browse the repository at this point in the history
Raise worker exception in RdKafkaSimpleConsumer
  • Loading branch information
emmettbutler authored Aug 24, 2017
2 parents 2a98573 + 7a83a37 commit 878b1ff
Showing 1 changed file with 4 additions and 0 deletions.
4 changes: 4 additions & 0 deletions pykafka/rdkafka/simple_consumer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from contextlib import contextmanager
import logging
from pkg_resources import parse_version
import sys
import time

from pykafka.exceptions import RdKafkaStoppedException, ConsumerStoppedException
Expand Down Expand Up @@ -93,6 +94,8 @@ def poll(rdk_handle, stop_event):
rdk_handle.poll(timeout_ms=1000)
except RdKafkaStoppedException:
break
except:
self._worker_exception = sys.exc_info()
log.debug("Exiting RdKafkaSimpleConsumer poller thread cleanly.")

self._stop_poller_thread.clear()
Expand All @@ -117,6 +120,7 @@ def consume(self, block=True):
if msg is not None:
# set offset in OwnedPartition so the autocommit_worker can find it
self._partitions_by_id[msg.partition_id].set_offset(msg.offset)
self._raise_worker_exceptions()
return msg

def _consume(self, timeout_ms):
Expand Down

0 comments on commit 878b1ff

Please sign in to comment.