Skip to content

Commit

Permalink
feat: 智能监控策略通过开关切换到SDK来执行检测逻辑 --story=120679992 (#4105)
Browse files Browse the repository at this point in the history
  • Loading branch information
dengyh authored Nov 29, 2024
1 parent af9540e commit a570356
Show file tree
Hide file tree
Showing 22 changed files with 599 additions and 96 deletions.
10 changes: 10 additions & 0 deletions bkmonitor/alarm_backends/core/cache/key.py
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,16 @@ def register_key_with_config(config):
}
)

# Cache key registered for the "preparation" service, one key per strategy id.
# NOTE(review): judging by the name this is a per-strategy lock held while the
# preparation work for that strategy runs — confirm against the caller.
# The key is a plain string stored in the "service" backend and expires after
# CONST_MINUTES (constant defined elsewhere in this module).
SERVICE_LOCK_PREPARATION = register_key_with_config(
{
"label": "preparation.lock.strategy_{strategy_id}",
"key_type": "string",
"key_tpl": "preparation.lock.{strategy_id}",
"ttl": CONST_MINUTES,
"backend": "service",
}
)

ACCESS_END_TIME_KEY = register_key_with_config(
{
"label": "[access]数据拉取的结束时间",
Expand Down
2 changes: 1 addition & 1 deletion bkmonitor/alarm_backends/core/cache/strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ def handle_strategy(cls, strategy: Dict, invalid_strategy_dict=None) -> bool:
return False

# 智能异常检测算法,结果表是存在intelligent_detect中,需要用这个配置
if query_config.get("intelligent_detect"):
if query_config.get("intelligent_detect") and not query_config["intelligent_detect"].get("use_sdk", False):
raw_query_config = query_config.copy()
raw_query_config.pop("intelligent_detect")
query_config["raw_query_config"] = raw_query_config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@


class Command(BaseCommand):
_SERVICE_TYPE_ = "" # access/detect/trigger/event/action/recovery .etc
_SERVICE_TYPE_ = "" # access/detect/trigger/event/action/recovery/preparation .etc
_HANDLER_TYPE_ = "" # process/celery

def add_arguments(self, parser):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,9 @@ def check_no_data(self, alert: Alert, latest_item, latest_strategy):
window_unit = Strategy.get_check_window_unit(latest_item, self.DEFAULT_CHECK_WINDOW_UNIT)

# TODO:智能异常检测,目前只支持单指标
if query_configs[0].get("intelligent_detect", {}):
if query_configs[0].get("intelligent_detect", {}) and not query_configs[0]["intelligent_detect"].get(
"use_sdk", False
):
# 智能异常检测在计算平台会经过几层dataflow,会有一定的周期延时,所以这里需要再加上这个延时窗口
trigger_window_size = trigger_window_size + settings.BK_DATA_INTELLIGENT_DETECT_DELAY_WINDOW

Expand Down
56 changes: 2 additions & 54 deletions bkmonitor/alarm_backends/service/detect/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,58 +9,6 @@
specific language governing permissions and limitations under the License.
"""

from alarm_backends.service.detect.core import AnomalyDataPoint, DataPoint

import arrow
import six


class DataPoint(object):
    """
    Wrapper used by the detect module around one record pulled by access.

    Every key of the raw record is copied onto the instance as an attribute,
    so detectors can read e.g. ``value``, ``time`` or ``record_id`` directly.
    """

    # Attributes every DataPoint is required to expose.
    context_field = ["value", "timestamp", "unit", "item"]

    def __init__(self, accessed_data, item):
        """
        :param accessed_data: mapping holding the raw record; kept as-is and
            also mirrored onto the instance attribute by attribute
        :param item: the item this record belongs to
        """
        self.item = item
        self._raw_input = accessed_data
        for field_name, field_value in six.iteritems(accessed_data):
            setattr(self, field_name, field_value)

    def as_dict(self):
        """Return the raw record exactly as it was passed in (not a copy)."""
        return self._raw_input

    # data_point attribute
    @property
    def unit(self):
        """Unit of the item, or an empty string for multi-source items."""
        # Items with more than one data source get no unit handling at all.
        single_source = len(self.item.data_sources) <= 1
        return self.item.unit if single_source else ""

    @property
    def timestamp(self):
        """Alias for the ``time`` attribute copied from the raw record."""
        return self.time

    def __str__(self):
        return "{}:{}".format(self.record_id, self.value)

    def __repr__(self):
        return str(self.as_dict())


class AnomalyDataPoint(object):
    """
    A DataPoint after it has been processed by a detector and judged anomalous.

    Holds the original data point, the detector that flagged it, and a set of
    initially empty fields (message, snapshot key, child detectors, context)
    for later stages to fill in.
    """

    def __init__(self, data_point, detector):
        """
        :param data_point: the DataPoint that was judged anomalous
        :param detector: the detector that produced this anomaly
        """
        # Creation time of the anomaly, taken in UTC.
        created_at = arrow.utcnow()

        self.data_point = data_point
        self.detector = detector
        self.anomaly_message = ""
        self.anomaly_time = created_at.format("YYYY-MM-DD HH:mm:ss")
        self.strategy_snapshot_key = ""
        self.child_detector = []
        self.context = {}
__all__ = ["AnomalyDataPoint", "DataPoint"]
67 changes: 67 additions & 0 deletions bkmonitor/alarm_backends/service/detect/core.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# -*- coding: utf-8 -*-
"""
Tencent is pleased to support the open source community by making 蓝鲸智云 - 监控平台 (BlueKing - Monitor) available.
Copyright (C) 2017-2021 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at http://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""


import arrow
import six


class DataPoint(object):
    """
    Wrapper used by the detect module around one record pulled by access.

    Keys of the raw record are copied onto the instance as attributes, except
    keys starting with a double underscore, which are left only in the raw
    mapping returned by ``as_dict``.
    """

    # Attributes every DataPoint is required to expose.
    context_field = ["value", "timestamp", "unit", "item"]

    def __init__(self, accessed_data, item):
        """
        :param accessed_data: mapping holding the raw record; kept as-is and
            mirrored onto the instance attribute by attribute
        :param item: the item this record belongs to
        """
        self.item = item
        self._raw_input = accessed_data
        for field_name, field_value in six.iteritems(accessed_data):
            # Keys prefixed with "__" are not exposed as attributes.
            if field_name.startswith("__"):
                continue
            setattr(self, field_name, field_value)

    def as_dict(self):
        """Return the raw record exactly as it was passed in (not a copy)."""
        return self._raw_input

    # data_point attribute
    @property
    def unit(self):
        """Unit of the item, or an empty string for multi-source items."""
        # Items with more than one data source get no unit handling at all.
        single_source = len(self.item.data_sources) <= 1
        return self.item.unit if single_source else ""

    @property
    def timestamp(self):
        """Alias for the ``time`` attribute copied from the raw record."""
        return self.time

    def __str__(self):
        return "{}:{}".format(self.record_id, self.value)

    def __repr__(self):
        return str(self.as_dict())


class AnomalyDataPoint(object):
    """
    A DataPoint after it has been processed by a detector and judged anomalous.

    Holds the original data point, the detector that flagged it, and a set of
    initially empty fields (message, snapshot key, child detectors, context)
    for later stages to fill in.
    """

    def __init__(self, data_point, detector):
        """
        :param data_point: the DataPoint that was judged anomalous
        :param detector: the detector that produced this anomaly
        """
        # Creation time of the anomaly, taken in UTC.
        created_at = arrow.utcnow()

        self.data_point = data_point
        self.detector = detector
        self.anomaly_message = ""
        self.anomaly_time = created_at.format("YYYY-MM-DD HH:mm:ss")
        self.strategy_snapshot_key = ""
        self.child_detector = []
        self.context = {}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,13 @@
from django.conf import settings
from django.utils.translation import ugettext as _

from alarm_backends.service.detect.strategy import ExprDetectAlgorithms, RangeRatioAlgorithmsCollection
from alarm_backends.service.detect import DataPoint
from alarm_backends.service.detect.strategy import (
ExprDetectAlgorithms,
RangeRatioAlgorithmsCollection,
)
from constants.aiops import SDKDetectStatus
from core.drf_resource import api

logger = logging.getLogger("detect")

Expand All @@ -34,6 +40,47 @@ class IntelligentDetect(RangeRatioAlgorithmsCollection):
智能异常检测(动态阈值算法)
"""

def detect(self, data_point):
    """
    Run detection for one data point, via the SDK or the legacy path.

    When the first query config carries an ``intelligent_detect`` section with
    ``use_sdk`` enabled, detection is delegated to ``detect_by_sdk``; in every
    other case the parent class implementation is used.

    :param data_point: the data point to check; its item's first query config
        decides which path is taken
    :return: whatever the chosen detection path returns
    :raises Exception: when SDK detection is enabled but the strategy's
        history dependency is still in the PREPARING state
    """
    # The section may be missing (or None) for strategies that never opted
    # into the SDK; treat that as "SDK disabled" instead of raising KeyError,
    # which keeps the behavior such strategies had before this switch existed.
    intelligent_detect = data_point.item.query_configs[0].get("intelligent_detect") or {}
    if not intelligent_detect.get("use_sdk", False):
        return super().detect(data_point)

    # Detection only starts once the history dependency is ready.
    if intelligent_detect["status"] == SDKDetectStatus.PREPARING:
        raise Exception("Strategy history dependency data not ready")

    return self.detect_by_sdk(data_point)

def detect_by_sdk(self, data_point):
    """
    Ask the AIOps SDK for a KPI prediction and run the parent detection on it.

    The current point is sent to ``api.aiops_sdk.kpi_predict``; the first entry
    of the response is wrapped in a new DataPoint (as ``values``, with its
    ``timestamp`` as ``time``) and handed to the parent ``detect``.

    :param data_point: the data point to check
    :return: the result of the parent class ``detect`` on the wrapped prediction
    """
    item = data_point.item

    # Work on a copy so the strategy id is not written into the caller's dimensions.
    sdk_dimensions = copy.deepcopy(data_point.dimensions)
    sdk_dimensions["strategy_id"] = item.strategy.id

    # NOTE(review): the "* 1000" looks like a seconds -> milliseconds conversion
    # expected by the SDK — confirm against the SDK contract.
    predict_result = api.aiops_sdk.kpi_predict(
        data=[{"value": data_point.value, "timestamp": data_point.timestamp * 1000}],
        dimensions=sdk_dimensions,
        interval=item.query_configs[0]["agg_interval"],
        predict_args={
            "alert_up": self.validated_config["args"].get("$alert_up"),
            "alert_down": self.validated_config["args"].get("$alert_down"),
            "sensitivity": self.validated_config["args"].get("$sensitivity"),
        },
        extra_data={
            "history_anomaly": [],
        },
    )

    # Only the first prediction entry is used.
    first_prediction = predict_result[0]
    predicted_point = DataPoint(
        accessed_data={
            "record_id": data_point.record_id,
            "value": data_point.value,
            "values": first_prediction,
            "time": first_prediction["timestamp"],
        },
        item=item,
    )
    return super().detect(predicted_point)

def gen_expr(self):
expr = "is_anomaly > 0"
yield ExprDetectAlgorithms(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"""
TimeSeriesForecasting:时序预测算法,基于计算平台的预测结果进行静态阈值检测
"""
import copy
import json
import logging
import operator
Expand All @@ -24,6 +25,8 @@
from alarm_backends.templatetags.unit import unit_convert_min, unit_suffix
from bkmonitor.strategy.serializers import TimeSeriesForecastingSerializer
from bkmonitor.utils.time_tools import hms_string
from constants.aiops import SDKDetectStatus
from core.drf_resource import api
from core.unit import load_unit

logger = logging.getLogger("detect")
Expand Down Expand Up @@ -55,6 +58,44 @@ class TimeSeriesForecasting(BasicAlgorithmsCollection):
desc_tpl = "{method_desc} {threshold}{unit_suffix}"

def detect(self, data_point):
    """
    Run detection for one data point, via the SDK or the bkdata-based path.

    When the first query config carries an ``intelligent_detect`` section with
    ``use_sdk`` enabled, detection is delegated to ``detect_by_sdk``; in every
    other case ``detect_by_bkdata`` is used.

    :param data_point: the data point to check; its item's first query config
        decides which path is taken
    :return: whatever the chosen detection path returns
    :raises Exception: when SDK detection is enabled but the strategy's
        history dependency is still in the PREPARING state
    """
    # The section may be missing (or None) for strategies that never opted
    # into the SDK; treat that as "SDK disabled" instead of raising KeyError,
    # which keeps the behavior such strategies had before this switch existed.
    intelligent_detect = data_point.item.query_configs[0].get("intelligent_detect") or {}
    if not intelligent_detect.get("use_sdk", False):
        return self.detect_by_bkdata(data_point)

    # Detection only starts once the history dependency is ready.
    if intelligent_detect["status"] == SDKDetectStatus.PREPARING:
        raise Exception("Strategy history dependency data not ready")

    return self.detect_by_sdk(data_point)

def detect_by_sdk(self, data_point):
    """
    Ask the AIOps SDK for a forecast and run the bkdata-style detection on it.

    The current point is sent to ``api.aiops_sdk.tf_predict``; the first entry
    of the response is wrapped in a new DataPoint (as ``values``, with its
    ``timestamp`` as ``time``) and handed to ``detect_by_bkdata``.

    :param data_point: the data point to check
    :return: the result of ``detect_by_bkdata`` on the wrapped prediction
    """
    item = data_point.item

    # Work on a copy so the strategy id is not written into the caller's dimensions.
    sdk_dimensions = copy.deepcopy(data_point.dimensions)
    sdk_dimensions["strategy_id"] = item.strategy.id

    # NOTE(review): the "* 1000" looks like a seconds -> milliseconds conversion
    # expected by the SDK — confirm against the SDK contract.
    predict_result = api.aiops_sdk.tf_predict(
        data=[{"value": data_point.value, "timestamp": data_point.timestamp * 1000}],
        dimensions=sdk_dimensions,
        predict_args={
            "granularity": "T",
            "range_level": self.validated_config["args"].get("$range_level"),
            "forecast_mode": self.validated_config["args"].get("$forecast_mode"),
            "mode": "serving",
        },
    )

    # Only the first prediction entry is used.
    first_prediction = predict_result[0]
    predicted_point = DataPoint(
        accessed_data={
            "record_id": data_point.record_id,
            "value": data_point.value,
            "values": first_prediction,
            "time": first_prediction["timestamp"],
        },
        item=item,
    )
    return self.detect_by_bkdata(predicted_point)

def detect_by_bkdata(self, data_point):
bound_type = self.validated_config.get("bound_type", TimeSeriesForecastingSerializer.BoundType.MIDDLE)

if bound_type == TimeSeriesForecastingSerializer.BoundType.UPPER:
Expand Down
10 changes: 10 additions & 0 deletions bkmonitor/alarm_backends/service/preparation/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# -*- coding: utf-8 -*-
"""
Tencent is pleased to support the open source community by making 蓝鲸智云 - 监控平台 (BlueKing - Monitor) available.
Copyright (C) 2017-2021 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at http://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
10 changes: 10 additions & 0 deletions bkmonitor/alarm_backends/service/preparation/aiops/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# -*- coding: utf-8 -*-
"""
Tencent is pleased to support the open source community by making 蓝鲸智云 - 监控平台 (BlueKing - Monitor) available.
Copyright (C) 2017-2021 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at http://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
Loading

0 comments on commit a570356

Please sign in to comment.