Skip to content

Commit

Permalink
feat: Metadata: ES支持Object别名,更改为新表实现方案,无需声明配置字段,QueryDataLink接口修复tran…
Browse files Browse the repository at this point in the history
…sfer集群ID为空问题 --story=121013310 (#4150)
  • Loading branch information
EASYGOING45 authored Dec 2, 2024
1 parent aee66b9 commit 12df6f2
Show file tree
Hide file tree
Showing 9 changed files with 383 additions and 94 deletions.
29 changes: 29 additions & 0 deletions bkmonitor/metadata/migrations/0199_esfieldqueryaliasoption.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Generated by Django 3.2.15 on 2024-12-02 06:26

from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
('metadata', '0198_esstorage_archive_index_days'),
]

operations = [
migrations.CreateModel(
name='ESFieldQueryAliasOption',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('creator', models.CharField(max_length=64, verbose_name='创建者')),
('create_time', models.DateTimeField(auto_now_add=True, verbose_name='创建时间')),
('updater', models.CharField(max_length=64, verbose_name='更新者')),
('update_time', models.DateTimeField(auto_now=True, verbose_name='更新时间')),
('table_id', models.CharField(max_length=128, verbose_name='结果表名')),
('field_path', models.CharField(max_length=256, verbose_name='原始字段路径')),
('query_alias', models.CharField(max_length=256, verbose_name='查询别名')),
('is_deleted', models.BooleanField(default=False, verbose_name='是否已删除')),
],
options={
'abstract': False,
},
),
]
2 changes: 2 additions & 0 deletions bkmonitor/metadata/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
from .record_rule import RecordRule, ResultTableFlow
from .result_table import (
CMDBLevelRecord,
ESFieldQueryAliasOption,
ResultTable,
ResultTableField,
ResultTableFieldOption,
Expand Down Expand Up @@ -105,6 +106,7 @@
"CMDBLevelRecord",
"ResultTableOption",
"ResultTableFieldOption",
"ESFieldQueryAliasOption",
# storage
"ClusterInfo",
"KafkaTopicInfo",
Expand Down
117 changes: 104 additions & 13 deletions bkmonitor/metadata/models/result_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from typing import Any, Dict, List, Optional, Set, Tuple, Union

from django.conf import settings
from django.db import models
from django.db import models, transaction
from django.db.models import sql
from django.db.transaction import atomic, on_commit
from django.utils.translation import ugettext as _
Expand All @@ -30,7 +30,7 @@
from metadata.models.constants import BULK_CREATE_BATCH_SIZE
from metadata.utils.basic import getitems

from .common import Label, OptionBase
from .common import BaseModel, Label, OptionBase
from .data_source import DataSource, DataSourceOption, DataSourceResultTable
from .result_table_manage import EnableManager
from .space import SpaceDataSource
Expand Down Expand Up @@ -1435,7 +1435,10 @@ def clean_metric_split(self, cmdb_level, operator):
return True

def to_json(self):
return {
query_alias_settings = None
if self.default_storage == ClusterInfo.TYPE_ES:
query_alias_settings = ESFieldQueryAliasOption.generate_query_alias_settings(self.table_id)
data = {
"table_id": self.table_id,
"table_name_zh": _(self.table_name_zh),
"is_custom_table": self.is_custom_table,
Expand All @@ -1457,6 +1460,10 @@ def to_json(self):
"data_label": self.data_label,
}

if query_alias_settings:
data["query_alias_settings"] = query_alias_settings
return data

def to_json_self_only(self):
"""
仅返回自身相关的信息json格式,其他的内容需要调用方自行追加,目前已知需要用户自定义添加的内容
Expand Down Expand Up @@ -2698,17 +2705,101 @@ def get_field_option_es_format(cls, table_id, field_name):

return es_config


class ESFieldQueryAliasOption(BaseModel):
"""
ES字段关联别名配置
"""

table_id = models.CharField("结果表名", max_length=128)
field_path = models.CharField("原始字段路径", max_length=256)
query_alias = models.CharField("查询别名", max_length=256)
is_deleted = models.BooleanField("是否已删除", default=False)

@classmethod
def get_field_es_read_alias(cls, table_id, field_name):
def generate_query_alias_settings(cls, table_id):
"""
寻找ES字段关联别名,若有对应Option记录,回写至IndexBody.Properties中
生成指定 table_id 的别名配置
:param table_id: 结果表ID
:param field_name: 字段名(原始字段名,非别名)
:return: dict {alias_name:{"type":alias,"path":origin_field_name}}
:return: dict -> {query_alias: {"type": "alias", "path": field_path}}
"""
origin_option = cls.get_field_option(table_id=table_id, field_name=field_name)
es_config = {}
for config_name, config_value in list(origin_option.items()):
if config_name == 'query_alias':
es_config[config_value] = {"type": "alias", "path": field_name}
return es_config
logger.info("Generating alias configuration for table_id=[%s]", table_id)
try:
# 获取未软删除的所有记录
alias_records = cls.objects.filter(table_id=table_id, is_deleted=False)

# 构建结果字典
alias_config = {
record.query_alias: {
"type": "alias",
"path": record.field_path,
}
for record in alias_records
}

logger.info("Alias configuration generated for table_id=[%s]: %s", table_id, alias_config)
return alias_config

except Exception as e:
logger.error("Error generating alias configuration for table_id=[%s]: %s", table_id, str(e), exc_info=True)
raise

@staticmethod
@transaction.atomic
def manage_query_alias_settings(table_id, query_alias_settings, operator):
"""
管理ES字段关联别名配置记录(支持一个field_path对应多个alias)
:param table_id: 结果表ID
:param query_alias_settings: 用户传入的query_alias_settings列表
:param operator: 操作者
"""
logger.info(
"manage_query_alias_settings: try to manage alias settings for table_id->[%s],"
"query_alias_settings->[%s]",
table_id,
query_alias_settings,
)
# 获取当前数据库中的记录(包括软删除记录)
existing_records = ESFieldQueryAliasOption.objects.filter(table_id=table_id)
existing_map = {(record.field_path, record.query_alias): record for record in existing_records}

# 提取用户传入的数据组合,对于一个采集项而言,field_path+query_alias为一个组合
incoming_combinations = {(item["field_name"], item["query_alias"]) for item in query_alias_settings}

try:
# 新增或更新记录
for item in query_alias_settings:
field_path = item["field_name"]
query_alias = item["query_alias"]

if (field_path, query_alias) in existing_map:
# 更新记录
record = existing_map[(field_path, query_alias)]
record.is_deleted = False # 重置软删除标记
record.updater = operator
record.save()
else:
# 新增记录
ESFieldQueryAliasOption.objects.create(
table_id=table_id,
field_path=field_path,
query_alias=query_alias,
creator=operator,
is_deleted=False,
)

# 标记未提供的记录为软删除
for (field_path, query_alias), record in existing_map.items():
if (field_path, query_alias) not in incoming_combinations and not record.is_deleted:
record.is_deleted = True
record.updater = operator
record.save()
except Exception as e: # pylint: disable=broad-except
logger.error(
"manage_query_alias_settings: failed to manage alias settings for table_id->[%s], "
"query_alias_settings->[%s], error->[%s]",
table_id,
query_alias_settings,
e,
)
raise e
39 changes: 18 additions & 21 deletions bkmonitor/metadata/models/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -1979,7 +1979,7 @@ def index_body(self):
ES创建索引的配置内容
:return: dict, 可以直接
"""
from metadata.models import ResultTableField
from metadata.models import ESFieldQueryAliasOption, ResultTableField

logger.info("index_body: try to compose index_body for table_id->[%s]", self.table_id)
body = {"settings": json.loads(self.index_settings), "mappings": json.loads(self.mapping_settings)}
Expand All @@ -1992,12 +1992,6 @@ def index_body(self):
properties[field.field_name] = ResultTableFieldOption.get_field_option_es_format(
table_id=self.table_id, field_name=field.field_name
)
alias_settings = ResultTableFieldOption.get_field_es_read_alias(
table_id=self.table_id, field_name=field.field_name
)

if alias_settings: # 当别名配置不为空时,将别名添加至索引body中
properties.update(alias_settings)
except Exception as e: # pylint: disable=broad-except
logger.error(
"index_body: error occurs for table_id->[%s],field->[%s],error->[%s]",
Expand All @@ -2006,6 +2000,15 @@ def index_body(self):
e,
)

try:
logger.info("index_body: try to add alias_config for table_id->[%s]", self.table_id)
alias_config = ESFieldQueryAliasOption.generate_query_alias_settings(self.table_id)
logger.info("index_body: table_id->[%s] got alias_config->[%s]", self.table_id, alias_config)
if alias_config:
properties.update(alias_config)
except Exception as e:
logger.warning("index_body: add alias_config failed,table_id->[%s],error->[%s]", self.table_id, e)

# 按ES版本返回构建body内容
if self.es_version < self.ES_REMOVE_TYPE_VERSION:
body["mappings"] = {self.table_id: body["mappings"]}
Expand All @@ -2017,21 +2020,15 @@ def compose_field_alias_settings(self):
组装采集项的别名配置
:return: dict {"properties":{"alias":"type":"alias","path":"xxx"}}
"""
properties = {}
from metadata.models import ESFieldQueryAliasOption

logger.info("compose_field_alias_settings: try to compose field alias mapping for->[%s]", self.table_id)
for field in ResultTableField.objects.filter(table_id=self.table_id):
try:
properties.update(
ResultTableFieldOption.get_field_es_read_alias(table_id=self.table_id, field_name=field.field_name)
)
except Exception as e: # pylint: disable=broad-except
logger.error(
"compose_field_alias_settings: unexpected error occurs for table_id->[%s],field_name->[%s],"
"error->[%s]",
self.table_id,
field.field_name,
e,
)
properties = ESFieldQueryAliasOption.generate_query_alias_settings(self.table_id)
logger.info(
"compose_field_alias_settings: compose alias mapping for->[%s] success,alias_settings->[%s]",
self.table_id,
properties,
)
return {"properties": properties}

def put_field_alias_mapping_to_es(self):
Expand Down
4 changes: 2 additions & 2 deletions bkmonitor/metadata/resources/bkdata_link.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ def _get_data_id_details(self, ds):
"数据源来源": ds.created_from,
"消息队列集群ID": ds.mq_cluster_id,
'Consul路径': ds.consul_config_path,
"Transfer集群ID": ds.transfer_cluster_id if ds.created_from == DataIdCreatedFromSystem.BKDATA.value else None,
"链路版本": "V4链路" if ds.created_from == 'bkdata' else 'V3链路',
"Transfer集群ID": ds.transfer_cluster_id,
"链路版本": "V4链路" if ds.created_from == DataIdCreatedFromSystem.BKDATA.value else 'V3链路',
}

def _get_etl_details(self, ds):
Expand Down
61 changes: 61 additions & 0 deletions bkmonitor/metadata/resources/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ class FieldSerializer(serializers.Serializer):
is_reserved_check = serializers.BooleanField(required=False, label="是否进行保留字检查", default=True)


class QueryAliasSettingSerializer(serializers.Serializer):
field_name = serializers.CharField(required=True, label="字段名", help_text="需要设置查询别名的字段名")
query_alias = serializers.CharField(required=True, label="查询别名", help_text="字段的查询别名")


class CreateDataIDResource(Resource):
"""创建数据源ID"""

Expand Down Expand Up @@ -143,6 +148,9 @@ class RequestSerializer(serializers.Serializer):
default_storage = serializers.CharField(required=True, label="默认存储方案")
default_storage_config = serializers.DictField(required=False, label="默认存储参数")
field_list = FieldSerializer(many=True, required=False, label="字段列表")
query_alias_settings = QueryAliasSettingSerializer(
many=True, required=False, label="查询别名设置", help_text="字段查询别名的配置"
)
bk_biz_id = serializers.IntegerField(required=False, label="结果表所属ID", default=0)
label = serializers.CharField(required=False, label="结果表标签", default=models.Label.RESULT_TABLE_LABEL_OTHER)
external_storage = serializers.JSONField(required=False, label="额外存储配置", default=None)
Expand All @@ -154,6 +162,32 @@ class RequestSerializer(serializers.Serializer):
data_label = serializers.CharField(required=False, label="数据标签", default="")

def perform_request(self, request_data):
query_alias_settings = request_data.pop("query_alias_settings", [])
table_id = request_data.get("table_id", None)
operator = request_data.get("operator", None)

if query_alias_settings:
try:
logger.info(
"CreateResultTableResource: try to manage alias_settings,table_id->[%s],"
"query_alias_settings->[%s]",
table_id,
query_alias_settings,
)
models.ESFieldQueryAliasOption.manage_query_alias_settings(
table_id=table_id,
query_alias_settings=query_alias_settings,
operator=operator,
)
except Exception as e: # pylint: disable=broad-except
logger.warning(
"CreateResultTableResource: manage alias_settings failed,table_id->[%s],query_alias_settings->["
"%s],error->[%s]",
table_id,
query_alias_settings,
e,
)

new_result_table = models.ResultTable.create_result_table(**request_data)

return {"table_id": new_result_table.table_id}
Expand Down Expand Up @@ -233,6 +267,9 @@ class RequestSerializer(serializers.Serializer):
table_id = serializers.CharField(required=True, label="结果表ID")
operator = serializers.CharField(required=True, label="操作者")
field_list = FieldSerializer(many=True, required=False, label="字段列表", default=None)
query_alias_settings = QueryAliasSettingSerializer(
many=True, required=False, label="查询别名设置", help_text="字段查询别名的配置"
)
table_name_zh = serializers.CharField(required=False, label="结果表中文名")
default_storage = serializers.CharField(required=False, label="默认存储方案")
label = serializers.CharField(required=False, label="结果表标签", default=None)
Expand All @@ -246,6 +283,30 @@ class RequestSerializer(serializers.Serializer):

def perform_request(self, request_data):
table_id = request_data.pop("table_id")
query_alias_settings = request_data.pop("query_alias_settings", [])
operator = request_data.get("operator", None)

if query_alias_settings:
try:
logger.info(
"ModifyResultTableResource: try to manage alias_settings,table_id->[%s],"
"query_alias_settings->[%s]",
table_id,
query_alias_settings,
)
models.ESFieldQueryAliasOption.manage_query_alias_settings(
table_id=table_id,
query_alias_settings=query_alias_settings,
operator=operator,
)
except Exception as e: # pylint: disable=broad-except
logger.warning(
"ModifyResultTableResource: manage alias_settings failed,table_id->[%s],query_alias_settings->["
"%s],error->[%s]",
table_id,
query_alias_settings,
e,
)

result_table = models.ResultTable.objects.get(table_id=table_id)
result_table.modify(**request_data)
Expand Down
Loading

0 comments on commit 12df6f2

Please sign in to comment.