Skip to content
This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Commit

Permalink
Merge pull request #552 from Parsely/feature/param_validation
Browse files Browse the repository at this point in the history
perform some permissive validation on common parameters
  • Loading branch information
emmettbutler committed May 19, 2016
2 parents ef0836f + 7af1f0a commit 71497ac
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 43 deletions.
28 changes: 16 additions & 12 deletions pykafka/balancedconsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from .handlers import GEventHandler
from .simpleconsumer import SimpleConsumer
from .utils.compat import range, get_bytes, itervalues, iteritems, get_string
from .utils.error_handlers import valid_int
try:
from . import rdkafka
except ImportError:
Expand Down Expand Up @@ -194,20 +195,23 @@ def __init__(self,
self._topic = topic

self._auto_commit_enable = auto_commit_enable
self._auto_commit_interval_ms = auto_commit_interval_ms
self._fetch_message_max_bytes = fetch_message_max_bytes
self._fetch_min_bytes = fetch_min_bytes
self._rebalance_max_retries = rebalance_max_retries
self._num_consumer_fetchers = num_consumer_fetchers
self._queued_max_messages = queued_max_messages
self._fetch_wait_max_ms = fetch_wait_max_ms
self._rebalance_backoff_ms = rebalance_backoff_ms
self._consumer_timeout_ms = consumer_timeout_ms
self._offsets_channel_backoff_ms = offsets_channel_backoff_ms
self._offsets_commit_max_retries = offsets_commit_max_retries
self._auto_commit_interval_ms = valid_int(auto_commit_interval_ms)
self._fetch_message_max_bytes = valid_int(fetch_message_max_bytes)
self._fetch_min_bytes = valid_int(fetch_min_bytes)
self._rebalance_max_retries = valid_int(rebalance_max_retries, allow_zero=True)
self._num_consumer_fetchers = valid_int(num_consumer_fetchers)
self._queued_max_messages = valid_int(queued_max_messages)
self._fetch_wait_max_ms = valid_int(fetch_wait_max_ms, allow_zero=True)
self._rebalance_backoff_ms = valid_int(rebalance_backoff_ms)
self._consumer_timeout_ms = valid_int(consumer_timeout_ms,
allow_zero=True, allow_negative=True)
self._offsets_channel_backoff_ms = valid_int(offsets_channel_backoff_ms)
self._offsets_commit_max_retries = valid_int(offsets_commit_max_retries,
allow_zero=True)
self._auto_offset_reset = auto_offset_reset
self._zookeeper_connect = zookeeper_connect
self._zookeeper_connection_timeout_ms = zookeeper_connection_timeout_ms
self._zookeeper_connection_timeout_ms = valid_int(zookeeper_connection_timeout_ms,
allow_zero=True)
self._reset_offset_on_start = reset_offset_on_start
self._post_rebalance_callback = post_rebalance_callback
self._generation_id = -1
Expand Down
27 changes: 15 additions & 12 deletions pykafka/managedbalancedconsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
GroupCoordinatorNotAvailable, ERROR_CODES, GroupLoadInProgress)
from .protocol import MemberAssignment
from .utils.compat import iterkeys
from .utils.error_handlers import valid_int

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -164,22 +165,24 @@ def __init__(self,
self._topic = topic

self._auto_commit_enable = auto_commit_enable
self._auto_commit_interval_ms = auto_commit_interval_ms
self._fetch_message_max_bytes = fetch_message_max_bytes
self._fetch_min_bytes = fetch_min_bytes
self._num_consumer_fetchers = num_consumer_fetchers
self._queued_max_messages = queued_max_messages
self._fetch_wait_max_ms = fetch_wait_max_ms
self._consumer_timeout_ms = consumer_timeout_ms
self._offsets_channel_backoff_ms = offsets_channel_backoff_ms
self._offsets_commit_max_retries = offsets_commit_max_retries
self._auto_commit_interval_ms = valid_int(auto_commit_interval_ms)
self._fetch_message_max_bytes = valid_int(fetch_message_max_bytes)
self._fetch_min_bytes = valid_int(fetch_min_bytes)
self._num_consumer_fetchers = valid_int(num_consumer_fetchers)
self._queued_max_messages = valid_int(queued_max_messages)
self._fetch_wait_max_ms = valid_int(fetch_wait_max_ms, allow_zero=True)
self._consumer_timeout_ms = valid_int(consumer_timeout_ms,
allow_zero=True, allow_negative=True)
self._offsets_channel_backoff_ms = valid_int(offsets_channel_backoff_ms)
self._offsets_commit_max_retries = valid_int(offsets_commit_max_retries,
allow_zero=True)
self._auto_offset_reset = auto_offset_reset
self._reset_offset_on_start = reset_offset_on_start
self._rebalance_max_retries = rebalance_max_retries
self._rebalance_backoff_ms = rebalance_backoff_ms
self._rebalance_max_retries = valid_int(rebalance_max_retries, allow_zero=True)
self._rebalance_backoff_ms = valid_int(rebalance_backoff_ms)
self._post_rebalance_callback = post_rebalance_callback
self._is_compacted_topic = compacted_topic
self._heartbeat_interval_ms = heartbeat_interval_ms
self._heartbeat_interval_ms = valid_int(heartbeat_interval_ms)
if use_rdkafka is True:
raise ImportError("use_rdkafka is not available for {}".format(
self.__class__.__name__))
Expand Down
19 changes: 11 additions & 8 deletions pykafka/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from .partitioners import random_partitioner
from .protocol import Message, ProduceRequest
from .utils.compat import iteritems, itervalues, Empty
from .utils.error_handlers import valid_int

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -159,15 +160,17 @@ def __init__(self,
platform.python_implementation == "PyPy":
log.warning("Caution: python-snappy segfaults when attempting to compress "
"large messages under PyPy")
self._max_retries = max_retries
self._retry_backoff_ms = retry_backoff_ms
self._required_acks = required_acks
self._ack_timeout_ms = ack_timeout_ms
self._max_queued_messages = max_queued_messages
self._min_queued_messages = max(1, min_queued_messages if not sync else 1)
self._linger_ms = linger_ms
self._max_retries = valid_int(max_retries, allow_zero=True)
self._retry_backoff_ms = valid_int(retry_backoff_ms)
self._required_acks = valid_int(required_acks, allow_zero=True,
allow_negative=True)
self._ack_timeout_ms = valid_int(ack_timeout_ms, allow_zero=True)
self._max_queued_messages = valid_int(max_queued_messages, allow_zero=True)
self._min_queued_messages = max(1, valid_int(min_queued_messages)
if not sync else 1)
self._linger_ms = valid_int(linger_ms, allow_zero=True)
self._block_on_queue_full = block_on_queue_full
self._max_request_size = max_request_size
self._max_request_size = valid_int(max_request_size)
self._synchronous = sync
self._worker_exception = None
self._worker_trace_logged = False
Expand Down
26 changes: 15 additions & 11 deletions pykafka/simpleconsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
from .protocol import (PartitionFetchRequest, PartitionOffsetCommitRequest,
PartitionOffsetFetchRequest, PartitionOffsetRequest)
from .utils.error_handlers import (handle_partition_responses, raise_error,
build_parts_by_error)
build_parts_by_error, valid_int)


log = logging.getLogger(__name__)
Expand Down Expand Up @@ -145,35 +145,40 @@ def __init__(self,
requests
:type consumer_id: bytes
"""
self._running = False
self._cluster = cluster
if not (isinstance(consumer_group, bytes) or consumer_group is None):
raise TypeError("consumer_group must be a bytes object")
self._consumer_group = consumer_group
self._topic = topic
self._fetch_message_max_bytes = fetch_message_max_bytes
self._fetch_min_bytes = fetch_min_bytes
self._queued_max_messages = queued_max_messages
self._num_consumer_fetchers = num_consumer_fetchers
self._fetch_wait_max_ms = fetch_wait_max_ms
self._consumer_timeout_ms = consumer_timeout_ms
self._offsets_channel_backoff_ms = offsets_channel_backoff_ms
self._fetch_message_max_bytes = valid_int(fetch_message_max_bytes)
self._fetch_min_bytes = valid_int(fetch_min_bytes)
self._queued_max_messages = valid_int(queued_max_messages)
self._num_consumer_fetchers = valid_int(num_consumer_fetchers)
self._fetch_wait_max_ms = valid_int(fetch_wait_max_ms, allow_zero=True)
self._consumer_timeout_ms = valid_int(consumer_timeout_ms,
allow_zero=True, allow_negative=True)
self._offsets_channel_backoff_ms = valid_int(offsets_channel_backoff_ms)
self._auto_offset_reset = auto_offset_reset
offsets_commit_max_retries = valid_int(offsets_commit_max_retries,
allow_zero=True)
self._offsets_commit_max_retries = offsets_commit_max_retries
# not directly configurable
self._offsets_fetch_max_retries = offsets_commit_max_retries
self._offsets_reset_max_retries = offsets_commit_max_retries
self._auto_start = auto_start
self._reset_offset_on_start = reset_offset_on_start
self._is_compacted_topic = compacted_topic
self._generation_id = generation_id
self._generation_id = valid_int(generation_id, allow_zero=True,
allow_negative=True)
self._consumer_id = consumer_id

# incremented for any message arrival from any partition
# the initial value is 0 (no messages waiting)
self._messages_arrived = self._cluster.handler.Semaphore(value=0)

self._auto_commit_enable = auto_commit_enable
self._auto_commit_interval_ms = auto_commit_interval_ms
self._auto_commit_interval_ms = valid_int(auto_commit_interval_ms)
self._last_auto_commit = time.time()
self._worker_exception = None
self._worker_trace_logged = False
Expand All @@ -200,7 +205,6 @@ def __init__(self,

self._default_error_handlers = self._build_default_error_handlers()

self._running = False
if self._auto_start:
self.start()

Expand Down
15 changes: 15 additions & 0 deletions pykafka/utils/error_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,18 @@ def build_parts_by_error(response, partitions_by_id):
def raise_error(error, info=""):
    """Instantiate the given exception class with `info` and raise it.

    :param error: the exception class to raise
    :param info: message passed to the exception constructor
    """
    exc = error(info)
    raise exc


def valid_int(param, allow_zero=False, allow_negative=False):
    """Validate that param is an integer, raise an exception if not

    Deliberately permissive: anything that supports addition with an int
    (ints, floats, numpy scalars, ...) passes the type check.

    :param param: the value to validate
    :param allow_zero: if False, reject a value equal to zero
    :param allow_negative: if False, reject a value less than zero
    :returns: `param`, unchanged, when it passes validation
    :raises TypeError: if `param` does not behave like a number
    :raises ValueError: if `param` violates the zero/negative constraints
    """
    try:
        param + 1  # duck-typed numeric check; raises TypeError for non-numbers
    except TypeError:
        raise TypeError(
            "Expected integer but found argument of type '{}'".format(type(param)))
    # the two range checks are mutually exclusive, so order is immaterial
    if param == 0 and not allow_zero:
        raise ValueError("Expected nonzero number but got '{}'".format(param))
    if param < 0 and not allow_negative:
        raise ValueError("Expected nonnegative number but got '{}'".format(param))
    return param

0 comments on commit 71497ac

Please sign in to comment.