Commit 9a10227

Fix #388: Add check for data platform pipeline latency (#389)

BenWu authored Mar 24, 2020
1 parent e605f2b commit 9a10227
Showing 4 changed files with 271 additions and 0 deletions.
checks/data_platform/live_main_ping_age.py (40 additions, 0 deletions)
@@ -0,0 +1,40 @@
"""
Age, in seconds, of the latest live main ping measured as the time since the submission_timestamp
of the ping. This is meant to be used as an approximation of the time it takes for pings to
travel from the edge server to the live tables.
Return failure if more than `max_error_rate` of the last `value_count` values
are above `max_threshold`
"""
from poucave.typings import CheckResult
from poucave.utils import fetch_redash


REDASH_QUERY_ID = 69148

EXPOSED_PARAMETERS = ["max_threshold", "value_count", "max_over_rate"]


async def run(
    api_key: str,
    max_threshold: float = 700,
    value_count: int = 4,
    max_over_rate: float = 0.5,
) -> CheckResult:
    rows = await fetch_redash(REDASH_QUERY_ID, api_key)

    # Keep only the most recent `value_count` measurements.
    latest_rows = sorted(rows, key=lambda row: row["current_timestamp"])[-value_count:]

    # Count how many of those measurements exceed the threshold.
    over_count = sum(
        1 for row in latest_rows if row["seconds_since_last"] > max_threshold
    )
    over_rate = over_count / value_count

    data_output = {
        "results": {
            row["current_timestamp"]: row["seconds_since_last"] for row in latest_rows
        },
        "over_count": over_count,
    }

    return over_rate <= max_over_rate, data_output
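
To make the failure criterion concrete: with the defaults (value_count=4, max_over_rate=0.5), two of the last four measurements over max_threshold gives over_rate = 2/4 = 0.5, which still passes; a third puts it over. A minimal sketch of invoking the check directly, outside the poucave server (which presumably supplies api_key via its check configuration), assuming the repository is on the Python path and using a hypothetical REDASH_API_KEY environment variable:

import asyncio
import os

from checks.data_platform.live_main_ping_age import run


async def main():
    # REDASH_API_KEY is an assumed name, used here only for illustration.
    success, data = await run(api_key=os.environ["REDASH_API_KEY"])
    print("OK" if success else "FAILING", data)


asyncio.run(main())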
checks/data_platform/pipeline_latency.py (46 additions, 0 deletions)
@@ -0,0 +1,46 @@
"""
Measured time it takes for pings to move from edge server to live tables.
This is the sum of latencies of the relevant pubsub topics/subscriptions and dataflow jobs.
Return failure if more than `max_error_rate` of the last `value_count` values
are above `max_threshold`
"""
from collections import defaultdict

from poucave.typings import CheckResult
from poucave.utils import fetch_redash


REDASH_QUERY_ID = 69304

EXPOSED_PARAMETERS = ["max_threshold", "value_count", "max_over_rate"]


async def run(
    api_key: str,
    max_threshold: float = 2000,
    value_count: int = 6,
    max_over_rate: float = 0.5,
) -> CheckResult:
    rows = await fetch_redash(REDASH_QUERY_ID, api_key)

    # Keep only the rows belonging to the `value_count` most recent timestamps.
    latest_timestamps = sorted({row["timestamp"] for row in rows}, reverse=True)[
        :value_count
    ]
    latest_rows = [row for row in rows if row["timestamp"] in latest_timestamps]

    # Total latency per timestamp is the sum of each component's latency.
    latency_sums = defaultdict(float)
    for row in latest_rows:
        latency_sums[row["timestamp"]] += row["value"]

    # Count how many timestamps have a total latency over the threshold.
    over_count = sum(
        1 for latency in latency_sums.values() if latency > max_threshold
    )
    over_rate = over_count / value_count

    data_output = {
        "results": latency_sums,
        "over_count": over_count,
    }

    return over_rate <= max_over_rate, data_output
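
The grouping step above can be seen in isolation: a timestamp's total latency is the sum of the values of every component row sharing that timestamp. A minimal, self-contained sketch using two rows shaped like the fixtures in the tests below:

from collections import defaultdict

rows = [
    {"timestamp": "2020-03-19T22:00:00", "value": 20, "component": "decoder_sub_unacked"},
    {"timestamp": "2020-03-19T22:00:00", "value": 20, "component": "decoder_watermark_age"},
]

latency_sums = defaultdict(float)
for row in rows:
    # Both components fall under the same timestamp, so their values add up.
    latency_sums[row["timestamp"]] += row["value"]

assert latency_sums["2020-03-19T22:00:00"] == 40.0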
tests/checks/data_platform/test_live_main_ping_age.py (79 additions, 0 deletions)
@@ -0,0 +1,79 @@
from checks.data_platform.live_main_ping_age import run
from tests.utils import patch_async


MODULE = "checks.data_platform.live_main_ping_age"

INPUT_ROWS = [
    {
        "current_timestamp": "2020-03-19T22:00:00",
        "latest_timestamp": "2020-03-19T21:57:00",
        "seconds_since_last": 180,
    },
    {
        "current_timestamp": "2020-03-19T21:50:00",
        "latest_timestamp": "2020-03-19T21:44:00",
        "seconds_since_last": 360,
    },
    {
        "current_timestamp": "2020-03-19T21:40:00",
        "latest_timestamp": "2020-03-19T21:38:00",
        "seconds_since_last": 120,
    },
    {
        "current_timestamp": "2020-03-19T21:30:00",
        "latest_timestamp": "2020-03-19T21:29:00",
        "seconds_since_last": 60,
    },
    {
        "current_timestamp": "2020-03-19T21:20:00",
        "latest_timestamp": "2020-03-19T21:18:00",
        "seconds_since_last": 120,
    },
]


async def test_success():
    with patch_async(f"{MODULE}.fetch_redash", return_value=INPUT_ROWS):
        success, data = await run(
            api_key="", max_threshold=120, value_count=4, max_over_rate=0.5
        )

    assert success
    assert data["results"] == {
        "2020-03-19T22:00:00": 180,
        "2020-03-19T21:50:00": 360,
        "2020-03-19T21:40:00": 120,
        "2020-03-19T21:30:00": 60,
    }
    assert data["over_count"] == 2


async def test_max_threshold():
    with patch_async(f"{MODULE}.fetch_redash", return_value=INPUT_ROWS):
        success, data = await run(
            api_key="", max_threshold=100, value_count=4, max_over_rate=0.5
        )

    assert success is False
    assert len(data["results"]) == 4


async def test_value_count():
    with patch_async(f"{MODULE}.fetch_redash", return_value=INPUT_ROWS):
        success, data = await run(
            api_key="", max_threshold=120, value_count=2, max_over_rate=0.5
        )

    assert success is False
    assert len(data["results"]) == 2


async def test_max_over_rate():
    with patch_async(f"{MODULE}.fetch_redash", return_value=INPUT_ROWS):
        success, data = await run(
            api_key="", max_threshold=120, value_count=2, max_over_rate=0.499
        )

    assert success is False
    assert len(data["results"]) == 2
tests/checks/data_platform/test_pipeline_latency.py (106 additions, 0 deletions)
@@ -0,0 +1,106 @@
from checks.data_platform.pipeline_latency import run
from tests.utils import patch_async


MODULE = "checks.data_platform.pipeline_latency"

INPUT_ROWS = [
    {
        "timestamp": "2020-03-19T22:00:00",
        "value": 20,
        "component": "decoder_sub_unacked",
    },
    {
        "timestamp": "2020-03-19T22:00:00",
        "value": 20,
        "component": "decoder_watermark_age",
    },
    {
        "timestamp": "2020-03-19T21:00:00",
        "value": 50,
        "component": "decoder_sub_unacked",
    },
    {
        "timestamp": "2020-03-19T21:00:00",
        "value": 10,
        "component": "decoder_watermark_age",
    },
    {
        "timestamp": "2020-03-19T20:00:00",
        "value": 10,
        "component": "decoder_sub_unacked",
    },
    {
        "timestamp": "2020-03-19T20:00:00",
        "value": 10,
        "component": "decoder_watermark_age",
    },
    {
        "timestamp": "2020-03-19T19:00:00",
        "value": 10,
        "component": "decoder_sub_unacked",
    },
    {
        "timestamp": "2020-03-19T19:00:00",
        "value": 20,
        "component": "decoder_watermark_age",
    },
    {
        "timestamp": "2020-03-19T18:00:00",
        "value": 10,
        "component": "decoder_sub_unacked",
    },
    {
        "timestamp": "2020-03-19T18:00:00",
        "value": 0,
        "component": "decoder_watermark_age",
    },
]


async def test_success():
    with patch_async(f"{MODULE}.fetch_redash", return_value=INPUT_ROWS):
        success, data = await run(
            api_key="", max_threshold=35, value_count=4, max_over_rate=0.5
        )

    assert success
    assert data == {
        "results": {
            "2020-03-19T22:00:00": 40,
            "2020-03-19T21:00:00": 60,
            "2020-03-19T20:00:00": 20,
            "2020-03-19T19:00:00": 30,
        },
        "over_count": 2,
    }


async def test_max_threshold():
    with patch_async(f"{MODULE}.fetch_redash", return_value=INPUT_ROWS):
        success, data = await run(
            api_key="", max_threshold=20, value_count=4, max_over_rate=0.5
        )

    assert success is False
    assert len(data["results"]) == 4


async def test_value_count():
    with patch_async(f"{MODULE}.fetch_redash", return_value=INPUT_ROWS):
        success, data = await run(
            api_key="", max_threshold=35, value_count=2, max_over_rate=0.5
        )

    assert success is False
    assert len(data["results"]) == 2


async def test_max_over_rate():
    with patch_async(f"{MODULE}.fetch_redash", return_value=INPUT_ROWS):
        success, data = await run(
            api_key="", max_threshold=35, value_count=2, max_over_rate=0.3
        )

    assert success is False
    assert len(data["results"]) == 2
