Skip to content

Commit aadb1da

Browse files
committed
feat: add kafka healthcheck
1 parent 9cfe2ea commit aadb1da

File tree

4 files changed

+130
-0
lines changed

4 files changed

+130
-0
lines changed
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
import django
2+
3+
if django.VERSION < (3, 2):
4+
default_app_config = "health_check.contrib.kafka.apps.HealthCheckConfig"

health_check/contrib/kafka/apps.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
from django.apps import AppConfig
2+
3+
from health_check.plugins import plugin_dir
4+
5+
6+
class HealthCheckConfig(AppConfig):
7+
name = "health_check.contrib.kafka"
8+
9+
def ready(self):
10+
from .backends import KafkaHealthCheck
11+
12+
plugin_dir.register(KafkaHealthCheck)
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
import logging
2+
import importlib
3+
4+
from django.conf import settings
5+
6+
from health_check.backends import BaseHealthCheckBackend
7+
from health_check.exceptions import ServiceUnavailable
8+
9+
logger = logging.getLogger(__name__)
10+
11+
12+
try:
13+
kafka_module = importlib.import_module("kafka")
14+
except ImportError:
15+
kafka_module = None
16+
17+
if not kafka_module:
18+
raise ImportError(
19+
"No kafka-python or kafka-python-ng library found. Please install one of them."
20+
)
21+
22+
KafkaAdminClient = getattr(kafka_module, "KafkaAdminClient", None)
23+
KafkaError = getattr(importlib.import_module("kafka.errors"), "KafkaError", None)
24+
25+
if not KafkaAdminClient or not KafkaError:
26+
raise ImportError(
27+
"KafkaAdminClient or KafkaError not available. Please check your installations."
28+
)
29+
30+
31+
class KafkaHealthCheck(BaseHealthCheckBackend):
32+
"""Health check for Kafka."""
33+
34+
namespace = None
35+
36+
def check_status(self):
37+
"""Check Kafka service by opening and closing a broker channel."""
38+
logger.debug("Checking for a KAFKA_URL on django settings...")
39+
40+
bootstrap_servers = getattr(settings, "KAFKA_URL", None)
41+
42+
logger.debug(
43+
"Got %s as the kafka_url. Connecting to kafka...", bootstrap_servers
44+
)
45+
46+
logger.debug("Attempting to connect to kafka...")
47+
try:
48+
admin_client = KafkaAdminClient(
49+
bootstrap_servers=bootstrap_servers,
50+
request_timeout_ms=3000, # 3 secondes max
51+
api_version_auto_timeout_ms=1000,
52+
)
53+
# Ping léger : on liste les topics (lecture metadata)
54+
admin_client.list_topics()
55+
except KafkaError as e:
56+
self.add_error(ServiceUnavailable("Unknown error"), e)
57+
else:
58+
logger.debug("Connection established. Kafka is healthy.")

tests/test_kafka.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
from unittest import mock
2+
3+
from kafka.errors import BrokerResponseError
4+
5+
from health_check.contrib.kafka.backends import KafkaHealthCheck
6+
7+
8+
class TestKafkaHealthCheck:
9+
"""Test Kafka health check."""
10+
11+
@mock.patch("health_check.contrib.kafka.backends.getattr")
12+
@mock.patch("health_check.contrib.kafka.backends.Connection")
13+
def test_broker_refused_connection(self, mocked_connection, mocked_getattr):
14+
"""Test when the connection to Kafka is refused."""
15+
mocked_getattr.return_value = "kafka_url"
16+
17+
conn_exception = ConnectionRefusedError("Refused connection")
18+
19+
# mock returns
20+
mocked_conn = mock.MagicMock()
21+
mocked_connection.return_value.__enter__.return_value = mocked_conn
22+
mocked_conn.connect.side_effect = conn_exception
23+
24+
# instantiates the class
25+
kafka_healthchecker = KafkaHealthCheck()
26+
27+
# invokes the method check_status()
28+
kafka_healthchecker.check_status()
29+
assert len(kafka_healthchecker.errors), 1
30+
31+
# mock assertions
32+
mocked_connection.assert_called_once_with("kafka_url")
33+
34+
@mock.patch("health_check.contrib.kafka.backends.getattr")
35+
@mock.patch("health_check.contrib.kafka.backends.Connection")
36+
def test_broker_auth_error(self, mocked_connection, mocked_getattr):
37+
"""Test that the connection to Kafka has an authentication error."""
38+
mocked_getattr.return_value = "kafka_url"
39+
40+
conn_exception = BrokerResponseError("Refused connection")
41+
42+
# mock returns
43+
mocked_conn = mock.MagicMock()
44+
mocked_connection.return_value.__enter__.return_value = mocked_conn
45+
mocked_conn.connect.side_effect = conn_exception
46+
47+
# instantiates the class
48+
rabbitmq_healthchecker = KafkaHealthCheck()
49+
50+
# invokes the method check_status()
51+
rabbitmq_healthchecker.check_status()
52+
assert len(rabbitmq_healthchecker.errors), 1
53+
54+
# mock assertions
55+
mocked_connection.assert_called_once_with("kafka_url")
56+

0 commit comments

Comments
 (0)