Skip to content

Commit 858347c

Browse files
authored
Logger with Admin Client (confluentinc#1758)
* Fixed the logger not working when provided as an argument to AdminClient instead of as a configuration property. * Added examples/adminapi_logger.py to demonstrate usage of a custom logger with AdminClient.
1 parent a6d2e1e commit 858347c

File tree

4 files changed

+104
-16
lines changed

4 files changed

+104
-16
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@
44

55
v2.4.1 is a maintenance release with the following fixes and enhancements:
66

7+
- Added an example to show the usage of the custom logger with `AdminClient`
78
- Removed usage of `strcpy` to enhance security of the client (#1745)
89
- Fixed invalid write in `OAUTHBEARER/OIDC` extensions copy (#1745)
910
- Fixed an issue related to import error of `TopicCollection` and `TopicPartitionInfo` classes when importing through other module like mypy.
1011
- Fixed a segfault when `commit` or `store_offsets` consumer method is called incorrectly with errored Message object
12+
- Fixed `logger` not working when provided as an argument to `AdminClient` instead of a configuration property
1113

1214
confluent-kafka-python is based on librdkafka v2.4.1, see the
1315
[librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.4.1)

examples/adminapi_logger.py

+54
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
import sys
2+
import logging
3+
4+
from confluent_kafka.admin import AdminClient
5+
6+
if len(sys.argv) != 2:
7+
sys.stderr.write("Usage: %s <broker>\n" % sys.argv[0])
8+
sys.exit(1)
9+
10+
broker = sys.argv[1]
11+
12+
# Custom logger
13+
logger = logging.getLogger('AdminClient')
14+
logger.setLevel(logging.DEBUG)
15+
handler = logging.StreamHandler()
16+
handler.setFormatter(logging.Formatter('%(asctime)-15s %(levelname)-8s %(message)s'))
17+
logger.addHandler(handler)
18+
19+
# Create Admin client with logger
20+
a = AdminClient({'bootstrap.servers': broker,
21+
'debug': 'all'},
22+
logger=logger)
23+
24+
# Alternatively, pass the logger as a key.
25+
# When passing it as an argument, it overwrites the key.
26+
#
27+
# a = AdminClient({'bootstrap.servers': broker,
28+
# 'debug': 'all',
29+
# 'logger': logger})
30+
31+
# Sample Admin API call
32+
future = a.list_consumer_groups(request_timeout=10)
33+
34+
while not future.done():
35+
# Log messages through custom logger while waiting for the result
36+
a.poll(0.1)
37+
38+
try:
39+
list_consumer_groups_result = future.result()
40+
print("\n\n\n========================= List consumer groups result Start =========================")
41+
print("{} consumer groups".format(len(list_consumer_groups_result.valid)))
42+
for valid in list_consumer_groups_result.valid:
43+
print(" id: {} is_simple: {} state: {}".format(
44+
valid.group_id, valid.is_simple_consumer_group, valid.state))
45+
print("{} errors".format(len(list_consumer_groups_result.errors)))
46+
for error in list_consumer_groups_result.errors:
47+
print(" error: {}".format(error))
48+
print("========================= List consumer groups result End =========================\n\n\n")
49+
50+
except Exception:
51+
raise
52+
53+
# Log final log messages
54+
a.poll(0)

src/confluent_kafka/admin/__init__.py

+5-4
Original file line numberDiff line numberDiff line change
@@ -109,17 +109,18 @@ class AdminClient (_AdminClientImpl):
109109
Requires broker version v0.11.0.0 or later.
110110
"""
111111

112-
def __init__(self, conf):
112+
def __init__(self, conf, **kwargs):
113113
"""
114114
Create a new AdminClient using the provided configuration dictionary.
115115
116116
The AdminClient is a standard Kafka protocol client, supporting
117117
the standard librdkafka configuration properties as specified at
118-
https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
118+
https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md
119119
120-
At least 'bootstrap.servers' should be configured.
120+
:param dict conf: Configuration properties. At a minimum, ``bootstrap.servers`` should be set.
121+
:param Logger logger: Optional Logger instance to use as a custom log messages handler.
121122
"""
122-
super(AdminClient, self).__init__(conf)
123+
super(AdminClient, self).__init__(conf, **kwargs)
123124

124125
@staticmethod
125126
def _make_topics_result(f, futmap):

tests/test_log.py

+43-12
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from io import StringIO
44
import confluent_kafka
55
import confluent_kafka.avro
6+
import confluent_kafka.admin
67
import logging
78

89

@@ -17,6 +18,16 @@ def filter(self, record):
1718
print(record)
1819

1920

21+
def _setup_string_buffer_logger(name):
22+
stringBuffer = StringIO()
23+
logger = logging.getLogger(name)
24+
logger.setLevel(logging.DEBUG)
25+
handler = logging.StreamHandler(stringBuffer)
26+
handler.setFormatter(logging.Formatter('%(name)s Logger | %(message)s'))
27+
logger.addHandler(handler)
28+
return stringBuffer, logger
29+
30+
2031
def test_logging_consumer():
2132
""" Tests that logging works """
2233

@@ -120,12 +131,7 @@ def test_logging_constructor():
120131
def test_producer_logger_logging_in_given_format():
121132
"""Test that asserts that logging is working by matching part of the log message"""
122133

123-
stringBuffer = StringIO()
124-
logger = logging.getLogger('Producer')
125-
logger.setLevel(logging.DEBUG)
126-
handler = logging.StreamHandler(stringBuffer)
127-
handler.setFormatter(logging.Formatter('%(name)s Logger | %(message)s'))
128-
logger.addHandler(handler)
134+
stringBuffer, logger = _setup_string_buffer_logger('Producer')
129135

130136
p = confluent_kafka.Producer(
131137
{"bootstrap.servers": "test", "logger": logger, "debug": "msg"})
@@ -142,12 +148,7 @@ def test_producer_logger_logging_in_given_format():
142148
def test_consumer_logger_logging_in_given_format():
143149
"""Test that asserts that logging is working by matching part of the log message"""
144150

145-
stringBuffer = StringIO()
146-
logger = logging.getLogger('Consumer')
147-
logger.setLevel(logging.DEBUG)
148-
handler = logging.StreamHandler(stringBuffer)
149-
handler.setFormatter(logging.Formatter('%(name)s Logger | %(message)s'))
150-
logger.addHandler(handler)
151+
stringBuffer, logger = _setup_string_buffer_logger('Consumer')
151152

152153
c = confluent_kafka.Consumer(
153154
{"bootstrap.servers": "test", "group.id": "test", "logger": logger, "debug": "msg"})
@@ -158,3 +159,33 @@ def test_consumer_logger_logging_in_given_format():
158159
c.close()
159160

160161
assert "Consumer Logger | INIT" in logMessage
162+
163+
164+
def test_admin_logger_logging_in_given_format_when_provided_in_conf():
165+
"""Test that asserts that logging is working by matching part of the log message"""
166+
167+
stringBuffer, logger = _setup_string_buffer_logger('Admin')
168+
169+
admin_client = confluent_kafka.admin.AdminClient(
170+
{"bootstrap.servers": "test", "logger": logger, "debug": "admin"})
171+
admin_client.poll(0)
172+
173+
logMessage = stringBuffer.getvalue().strip()
174+
stringBuffer.close()
175+
176+
assert "Admin Logger | INIT" in logMessage
177+
178+
179+
def test_admin_logger_logging_in_given_format_when_provided_as_admin_client_argument():
180+
"""Test that asserts that logging is working by matching part of the log message"""
181+
182+
stringBuffer, logger = _setup_string_buffer_logger('Admin')
183+
184+
admin_client = confluent_kafka.admin.AdminClient(
185+
{"bootstrap.servers": "test", "debug": "admin"}, logger=logger)
186+
admin_client.poll(0)
187+
188+
logMessage = stringBuffer.getvalue().strip()
189+
stringBuffer.close()
190+
191+
assert "Admin Logger | INIT" in logMessage

0 commit comments

Comments
 (0)