Skip to content

Commit

Permalink
Add probe (#187)
Browse files Browse the repository at this point in the history
* Add probe

* Specify encodings

* rename param

* Remove log statements
  • Loading branch information
keiranjprice101 authored Nov 8, 2023
1 parent 7c4c8f3 commit a1c7d42
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 4 deletions.
8 changes: 8 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
tools/
test/
.*/
*.txt
.coverage
venv*/
build/
*egg-info
19 changes: 15 additions & 4 deletions rundetection/run_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,17 +98,18 @@ def process_messages(channel: BlockingChannel, notification_queue: SimpleQueue[J
:param notification_queue: The notification queue
:return: None
"""
logger.info("Listening for messages")
for method_frame, _, body in channel.consume(INGRESS_QUEUE_NAME):
for method_frame, _, body in channel.consume(INGRESS_QUEUE_NAME, inactivity_timeout=5):
try:
process_message(body.decode(), notification_queue)
logger.info("Acking message %s", method_frame.delivery_tag)
channel.basic_ack(method_frame.delivery_tag)
# pylint: disable = broad-exception-caught
except AttributeError:
pass
except Exception as exc:
logger.exception("Problem processing message: %s", body, exc_info=exc)
finally:
logger.info("Acking message %s", method_frame.delivery_tag)
channel.basic_ack(method_frame.delivery_tag)
logger.info("Pausing listener...")
break
# pylint: enable = broad-exception-caught

Expand All @@ -129,6 +130,15 @@ def process_notifications(notification_queue: SimpleQueue[JobRequest]) -> None:
logger.info("Notification queue empty. Continuing...")


def write_readiness_probe_file() -> None:
"""
Write the file with the timestamp for the readinessprobe
:return: None
"""
with open("/tmp/heartbeat", "w", encoding="utf-8") as file:
file.write(time.strftime("%Y-%m-%d %H:%M:%S"))


def start_run_detection() -> None:
"""
Main Coroutine starts the producer and consumer in a loop
Expand All @@ -145,6 +155,7 @@ def start_run_detection() -> None:
while True:
process_messages(consumer_channel, notification_queue)
process_notifications(notification_queue)
write_readiness_probe_file()
time.sleep(0.1)

# pylint: disable = broad-except
Expand Down
47 changes: 47 additions & 0 deletions test/test_run_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
Tests for run detection module
"""
import logging
import os
import re
import time
import unittest
from pathlib import Path
from queue import SimpleQueue
Expand All @@ -18,6 +21,7 @@
verify_archive_access,
get_channel,
producer,
write_readiness_probe_file,
)


Expand Down Expand Up @@ -139,6 +143,25 @@ def test_process_messages_raises_still_acks(mock_process):
channel.basic_ack.assert_called_once_with(method_frame.delivery_tag)


@patch("rundetection.run_detection.process_message")
def test_process_messages_does_not_ack_attribute_error(_):
"""
Test messages are not acked after AttributeError in processing. As this should only occur when no message is
consumed.
:param mock_process: Mock Process messages function
:return: None
"""
channel = MagicMock()
channel.consume.return_value = [(None, None, None)]

notification_queue = Mock()

process_messages(channel, notification_queue)

channel.consume.assert_called_once()
channel.basic_ack.assert_not_called()


@patch("rundetection.run_detection.producer")
def test_process_notifications(mock_producer):
"""
Expand Down Expand Up @@ -269,5 +292,29 @@ def test_producer(mock_get_channel):
mock_channel.connection.close.assert_called_once()


def test_write_readiness_probe_file():
"""
Test the write_readiness_probe
:return: None
"""
# Call the function to create the file
write_readiness_probe_file()

assert os.path.exists("/tmp/heartbeat"), "Heartbeat file does not exist."

with open("/tmp/heartbeat", "r", encoding="utf-8") as file:
content = file.read()

# Check if the content matches the timestamp format
timestamp_format = re.compile(r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}")
assert timestamp_format.match(content), f"Unexpected content format: {content}"

# Assert a reasonable time difference
timestamp = time.strptime(content, "%Y-%m-%d %H:%M:%S")
current_time = time.localtime()
time_difference = time.mktime(current_time) - time.mktime(timestamp)
assert abs(time_difference) < 5, f"The timestamp difference is too large: {time_difference} seconds."


if __name__ == "__main__":
unittest.main()

0 comments on commit a1c7d42

Please sign in to comment.