Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DO NOT MERGE] [EventHubs] add logging to async consumer for checkpoints + ownership lost #36447

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions sdk/eventhub/azure-eventhub/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
# Release History

## 5.12.2 (Unreleased)
## 5.12.2a1 (Unreleased)

### Features Added

### Breaking Changes

### Bugs Fixed

### Other Changes
- Add more debug logging for checkpoint store.

## 5.12.1 (2024-06-11)

Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhub/azure-eventhub/azure/eventhub/_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
# Licensed under the MIT License.
# ------------------------------------

VERSION = "5.12.2"
VERSION = "5.12.2a1"
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ def __init__(self, client: "EventHubConsumerClient", source: str, **kwargs: Any)
self._track_last_enqueued_event_properties = (
track_last_enqueued_event_properties
)
_LOGGER.debug("%r - offset passed to consumer constructor: %r", self._name, self._offset)
self._message_buffer: Deque["uamqp_Message"] = deque()
self._last_received_event: Optional[EventData] = None
self._message_buffer_lock = asyncio.Lock()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,13 @@ async def _receive(
)
raise
except Exception as error: # pylint:disable=broad-except
checkpoints = await self._ownership_manager.get_checkpoints() if self._checkpoint_store else None
_LOGGER.debug(f'{self._consumers[partition_id]._name} - Updated Checkpoints from blob: {checkpoints}')
_LOGGER.debug(
"%r - offset on exception: %r",
self._consumers[partition_id]._name,
self._consumers[partition_id]._offset,
)
_LOGGER.warning(
"EventProcessor instance %r of eventhub %r partition %r consumer group %r. "
"An error occurred while receiving. The exception is %r.",
Expand Down Expand Up @@ -379,6 +386,7 @@ async def start(self) -> None:
if self._checkpoint_store
else None
)
_LOGGER.debug("Get checkpoints to claim ownership: %r", checkpoints)
self._create_tasks_for_claimed_ownership(
newly_claimed_pids, checkpoints
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,13 +240,15 @@ async def _callback_task(consumer, batch, max_batch_size, max_wait_time):
consumer._next_message_in_buffer() # pylint: disable=protected-access
for _ in range(min(max_batch_size, len(consumer._message_buffer))) # pylint: disable=protected-access
]
_LOGGER.debug(f'{consumer._name} - Length of message buffer: {len(consumer._message_buffer)}')
now_time = time.time()
if len(events) > 0:
await consumer._on_event_received(events if batch else events[0]) # pylint: disable=protected-access
consumer._last_callback_called_time = now_time # pylint: disable=protected-access
else:
if max_wait_time and (now_time - consumer._last_callback_called_time) > max_wait_time: # pylint: disable=protected-access
# no events received, and need to callback
_LOGGER.debug(f'{consumer._name} - No events received')
await consumer._on_event_received([] if batch else None) # pylint: disable=protected-access
consumer._last_callback_called_time = now_time # pylint: disable=protected-access
# backoff a bit to avoid throttling CPU when no events are coming
Expand Down Expand Up @@ -278,6 +280,15 @@ async def _receive_task(consumer):
return
if consumer._last_received_event:
consumer._offset = consumer._last_received_event.offset
_LOGGER.error(f'{consumer._name} - Exception {exception}')
_LOGGER.debug(f'{consumer._name} - last received offset: {consumer._offset}')
fully_qualified_namespace = consumer._client._address.hostname
eventhub_name = consumer._client._address.path
consumer_group = consumer._client._consumer_group
checkpoints = await consumer._client._checkpoint_store.list_checkpoints(
fully_qualified_namespace, eventhub_name, consumer_group
)
_LOGGER.debug(f'{consumer._name} - Updated Checkpoints from blob: {checkpoints}')
last_exception = await consumer._handle_exception(exception)
retried_times += 1
if retried_times > max_retries:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

import logging
import sys
# Debug logging setup for this sample: attach a stdout stream handler to the
# 'azure' logger and lower that logger's threshold to DEBUG.
handler = logging.StreamHandler(stream=sys.stdout)
logger = logging.getLogger('azure')
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)

CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"]
EVENTHUB_NAME = os.environ['EVENT_HUB_NAME']
STORAGE_CONNECTION_STR = os.environ["AZURE_STORAGE_CONN_STR"]
Expand Down Expand Up @@ -45,6 +52,7 @@ async def receive_batch():
consumer_group="$Default",
eventhub_name=EVENTHUB_NAME,
checkpoint_store=checkpoint_store,
logging_enable=True
)
async with client:
await client.receive_batch(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@
BLOB_CONTAINER_NAME = "your-blob-container-name" # Please make sure the blob container resource exists.


import logging
import sys
#logging.basicConfig(level=logging.DEBUG, format='amqpConnection: %(amqpConnection)s, amqpSession: %(amqpSession)s, amqpLink: %(amqpLink)s', stream=sys.stdout)
# Debug logging setup for this sample: attach a stdout stream handler to the
# 'azure.eventhub' logger and lower that logger's threshold to DEBUG.
handler = logging.StreamHandler(stream=sys.stdout)
logger = logging.getLogger('azure.eventhub')
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)
# Could also record each event's offset so it can be cross-checked against its sequence number.
#offset_seqnum = {offset: seqnum}

async def on_event(partition_context, event):
# Put your code here.
print("Received event from partition: {}.".format(partition_context.partition_id))
Expand All @@ -37,7 +47,6 @@ async def receive(client):
"""
await client.receive(
on_event=on_event,
starting_position="-1", # "-1" is from the beginning of the partition.
)
# With specified partition_id, load-balance will be disabled, for example:
# await client.receive(on_event=on_event, partition_id='0')
Expand All @@ -50,6 +59,7 @@ async def main():
consumer_group="$Default",
eventhub_name=EVENTHUB_NAME,
checkpoint_store=checkpoint_store, # For load-balancing and checkpoint. Leave None for no load-balancing.
logging_enable=True
)
async with client:
await receive(client)
Expand Down
15 changes: 8 additions & 7 deletions sdk/eventhub/azure-eventhub/samples/sync_samples/send.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,14 @@ def send_event_data_list(producer):
# Without specifying partition_id or partition_key
# the events will be distributed to available partitions via round-robin.

event_data_list = [EventData('Event Data {}'.format(i)) for i in range(10)]
try:
producer.send_batch(event_data_list)
except ValueError: # Size exceeds limit. This shouldn't happen if you make sure before hand.
print("Size of the event data list exceeds the size limit of a single send")
except EventHubError as eh_err:
print("Sending error: ", eh_err)
for _ in range(100):
event_data_list = [EventData('Event Data {}'.format(i)) for i in range(10)]
try:
producer.send_batch(event_data_list)
except ValueError: # Size exceeds limit. This shouldn't happen if you make sure before hand.
print("Size of the event data list exceeds the size limit of a single send")
except EventHubError as eh_err:
print("Sending error: ", eh_err)


producer = EventHubProducerClient.from_connection_string(
Expand Down
Loading