From b00d9dd89764babd86d8ea7fcfaebac3b0fae917 Mon Sep 17 00:00:00 2001 From: yunchao Date: Mon, 24 Jun 2024 13:07:19 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20IP=E9=80=89=E6=8B=A9=E5=99=A8=E5=B7=AE?= =?UTF-8?q?=E9=87=8F=E5=90=8C=E6=AD=A5=E4=B8=BB=E6=9C=BA(closed=20#2294)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/core/ipchooser/handlers/host_handler.py | 161 +++++++++++++++++-- apps/core/ipchooser/tests/test_handlers.py | 11 ++ apps/node_man/tests/utils.py | 11 +- 3 files changed, 169 insertions(+), 14 deletions(-) diff --git a/apps/core/ipchooser/handlers/host_handler.py b/apps/core/ipchooser/handlers/host_handler.py index bca3da809..00c7b5e98 100644 --- a/apps/core/ipchooser/handlers/host_handler.py +++ b/apps/core/ipchooser/handlers/host_handler.py @@ -9,11 +9,18 @@ specific language governing permissions and limitations under the License. """ import typing +from collections import defaultdict from django_mysql.models import QuerySet +from apps.core.concurrent import controller from apps.core.ipchooser.tools.host_tool import HostTool +from apps.node_man.constants import QUERY_CMDB_LIMIT from apps.node_man.models import GlobalSettings +from apps.node_man.periodic_tasks.sync_cmdb_host import bulk_differential_sync_biz_hosts +from apps.utils import batch_request, concurrent +from common.api import CCApi +from common.log import logger from .. import constants, types from ..tools import base @@ -21,8 +28,128 @@ class HostHandler: - @staticmethod + @classmethod + def query_host_by_cloud_ip(cls, cloud_inner_ip: typing.List[str]) -> typing.List[int]: + """ + 查询主机host_id用于查询主机的业务信息 + """ + bk_cloud_ids: typing.Set[int] = set() + bk_host_innerip: typing.Set[str] = set() + for cloud_ip in cloud_inner_ip: + bk_cloud_ids.add(int(cloud_ip.split(constants.CommonEnum.SEP.value)[0])) + bk_host_innerip.add(cloud_ip.split(constants.CommonEnum.SEP.value)[1]) + + query_hosts_params: typing.Dict[str, typing.Any] = { + "fields": ["bk_host_id"], + "host_property_filter": { + "condition": "AND", + "rules": [ + {"field": "bk_host_innerip", "operator": "in", "value": list(bk_host_innerip)}, + {"field": "bk_cloud_id", "operator": "in", "value": list(bk_cloud_ids)}, + ], + }, + "no_request": True, + } + + cmdb_host_infos: typing.List[typing.Dict[str, typing.Any]] = batch_request.batch_request( + func=CCApi.list_hosts_without_biz, params=query_hosts_params + ) + + logger.info(f"need_differential_sync_cloud_ip count: {len(cmdb_host_infos)} -> {cmdb_host_infos}") + + return [host["bk_host_id"] for host in cmdb_host_infos] + + @controller.ConcurrentController( + data_list_name="bk_host_ids", + batch_call_func=concurrent.batch_call, + get_config_dict_func=lambda: {"limit": QUERY_CMDB_LIMIT}, + ) + @classmethod + def find_host_biz_relations( + cls, bk_host_ids: typing.List[int] + ) -> typing.List[typing.Optional[typing.Dict[str, typing.Any]]]: + """ + 查询主机业务关系信息 + :param bk_host_ids: 主机列表 + :return: 主机业务关系列表 + """ + # 接口对于空数组的处理是报错,这里需要提前处理返回 + if not bk_host_ids: + return [] + + return CCApi.find_host_biz_relations({"bk_host_id": bk_host_ids}) + + @classmethod + def fetch_need_differential_sync_bk_host_ids( + cls, untreated_host_infos, or_conditions + ) -> typing.List[typing.Optional[int]]: + """ + 获取所有需要差量同步的主机id + """ + exist_hosts: typing.Dict[str, typing.Set] = { + "bk_host_id": set(), + "cloud_inner_ip": set(), + } + for host in untreated_host_infos: + exist_hosts["bk_host_id"].add(host["bk_host_id"]) + exist_hosts["cloud_inner_ip"].add( + f"{host['bk_cloud_id']}{constants.CommonEnum.SEP.value}{host['inner_ip']}" + ) + or_conditions_map: typing.Dict = {item["key"]: item["val"] for item in or_conditions} + + need_differential_sync_bk_host_ids: typing.List[int] = list( + or_conditions_map.get("bk_host_id", set()) - exist_hosts["bk_host_id"] + ) + need_differential_sync_cloud_ip: typing.List[str] = list( + or_conditions_map.get("cloud_inner_ip", set()) - exist_hosts["cloud_inner_ip"] + ) + if need_differential_sync_cloud_ip: + # 如果是IP形式需要查询出主机ID + need_differential_sync_cloud_ip_host_ids = cls.query_host_by_cloud_ip(need_differential_sync_cloud_ip) + # 所有需求差量同步的主机ID + need_differential_sync_bk_host_ids += need_differential_sync_cloud_ip_host_ids + + logger.info( + f"need_differential_sync_bk_host_ids " + f"count:{len(need_differential_sync_bk_host_ids)} -> {need_differential_sync_bk_host_ids}" + ) + + return need_differential_sync_bk_host_ids + + @classmethod + def bulk_differential_sync_hosts(cls, need_differential_sync_bk_host_ids): + """ + 差量同步所有需要同步的主机 + """ + # 查询主机id所属业务 + host_biz_relations: typing.List[typing.Optional[typing.Dict[str, typing.Any]]] = cls.find_host_biz_relations( + bk_host_ids=need_differential_sync_bk_host_ids + ) + + expected_bk_host_ids_gby_bk_biz_id: typing.Dict[str, typing.List[int]] = defaultdict(list) + for host_biz_realtion in host_biz_relations: + expected_bk_host_ids_gby_bk_biz_id[host_biz_realtion["bk_biz_id"]].append(host_biz_realtion["bk_host_id"]) + + bulk_differential_sync_biz_hosts(expected_bk_host_ids_gby_bk_biz_id) + + @classmethod + def fetch_untreated_host_infos( + cls, limit_host_ids, or_conditions, scope_list + ) -> typing.List[typing.Dict[str, typing.Any]]: + host_queryset: QuerySet = base.HostQuerySqlHelper.multiple_cond_sql( + params={}, biz_scope=[scope["bk_biz_id"] for scope in scope_list], return_all_node_type=True + ) + host_queryset = base.HostQueryHelper.or_query_hosts(host_queryset, or_conditions=or_conditions) + if limit_host_ids is not None: + host_queryset = host_queryset.filter(bk_host_id__in=limit_host_ids) + # 获取主机信息 + host_fields: typing.List[str] = constants.CommonEnum.DEFAULT_HOST_FIELDS.value + untreated_host_infos: typing.List[types.HostInfo] = list(host_queryset.values(*host_fields)) + return untreated_host_infos + + @classmethod def details_base( + cls, scope_list: types.ScopeList, or_conditions: typing.List[types.Condition], limit_host_ids: typing.Optional[typing.List[int]] = None, @@ -36,17 +163,26 @@ def details_base( :param show_agent_realtime_state: 是否展示Agent实时状态 :return: """ - host_queryset: QuerySet = base.HostQuerySqlHelper.multiple_cond_sql( - params={}, biz_scope=[scope["bk_biz_id"] for scope in scope_list], return_all_node_type=True + untreated_host_infos: typing.List[typing.Dict[str, typing.Any]] = cls.fetch_untreated_host_infos( + limit_host_ids, or_conditions, scope_list ) - host_queryset = base.HostQueryHelper.or_query_hosts(host_queryset, or_conditions=or_conditions) - if limit_host_ids is not None: - host_queryset = host_queryset.filter(bk_host_id__in=limit_host_ids) - # 获取主机信息 - host_fields: typing.List[str] = constants.CommonEnum.DEFAULT_HOST_FIELDS.value - untreated_host_infos: typing.List[types.HostInfo] = list(host_queryset.values(*host_fields)) + + need_differential_sync_bk_host_ids: typing.List[ + typing.Optional[int] + ] = cls.fetch_need_differential_sync_bk_host_ids(untreated_host_infos, or_conditions) + + if need_differential_sync_bk_host_ids: + # 差量同步主机 + cls.bulk_differential_sync_hosts(need_differential_sync_bk_host_ids) + + # 查询差量主机信息回填查询结果 + differential_host_infos: typing.List[typing.Dict[str, typing.Any]] = cls.fetch_untreated_host_infos( + need_differential_sync_bk_host_ids, [], scope_list + ) + untreated_host_infos += differential_host_infos bk_biz_ids: typing.List[int] = [scope["bk_biz_id"] for scope in scope_list] + biz_whitelist: typing.List[int] = GlobalSettings.get_config( key=GlobalSettings.KeyEnum.IP_CHOOSER_BIZ_WHITELIST.value, default=[] ) @@ -60,12 +196,11 @@ def details_base( return BaseHandler.format_hosts(untreated_host_infos) if show_agent_realtime_state: - enable = GlobalSettings.get_config( + enable: bool = GlobalSettings.get_config( key=GlobalSettings.KeyEnum.IP_CHOOSER_ENABLE_SHOW_REALTIME_AGENT_STATE.value, default=False ) - if not enable: - return BaseHandler.format_hosts(untreated_host_infos) - HostTool.fill_agent_state_info_to_hosts(host_infos=untreated_host_infos) + if enable: + HostTool.fill_agent_state_info_to_hosts(host_infos=untreated_host_infos) return BaseHandler.format_hosts(untreated_host_infos) diff --git a/apps/core/ipchooser/tests/test_handlers.py b/apps/core/ipchooser/tests/test_handlers.py index 808936476..57bfe1e9c 100644 --- a/apps/core/ipchooser/tests/test_handlers.py +++ b/apps/core/ipchooser/tests/test_handlers.py @@ -60,6 +60,8 @@ def test_check(self): self.assertEqual(res[0]["ipv6"], "0000:0000:0000:0000:0000:ffff:7f00:0002") @patch("apps.core.ipchooser.query.resource.CCApi", MockClient.cc) + @patch("apps.core.ipchooser.handlers.host_handler.CCApi", MockClient.cc) + @patch("apps.node_man.periodic_tasks.sync_cmdb_host.client_v2", MockClient) @patch( "apps.node_man.periodic_tasks.sync_agent_status_task.get_gse_api_helper", get_gse_api_helper(GseVersion.V2.value, GseApiMockClient()), @@ -88,3 +90,12 @@ def test_details(self): ) self.assertEqual(result[0]["alive"], 1) self.assertEqual(result[0]["bk_agent_alive"], 1) + + # 验证主机不存进同步 + result = HostHandler.details( + scope_list=[{"scope_type": "biz", "scope_id": "100001", "bk_biz_id": 100001}], + host_list=[{"host_id": 14110, "meta": {"scope_type": "biz", "scope_id": "100001", "bk_biz_id": 100001}}], + show_agent_realtime_state=False, + ) + self.assertEqual(result[0]["alive"], 0) + self.assertEqual(result[0]["bk_agent_alive"], 0) diff --git a/apps/node_man/tests/utils.py b/apps/node_man/tests/utils.py index 8edfa912d..576f7883d 100644 --- a/apps/node_man/tests/utils.py +++ b/apps/node_man/tests/utils.py @@ -1066,7 +1066,7 @@ def list_biz_hosts_topo(cls, *args, **kwargs): @classmethod def list_biz_hosts(cls, *args, **kwargs): - if args[0]["bk_biz_id"] == 1: + if args[0]["bk_biz_id"] in [1, 100001]: return { "count": 1, "info": [ @@ -1161,6 +1161,15 @@ def get_host_by_topo_node(cls, bk_biz_id, *args, **kwargs): def list_service_template(cls, *args, **kwargs): return {"count": 1, "info": [{"id": 1}, {"id": 2}]} + @classmethod + def find_host_biz_relations(cls, *args, **kwargs): + return [ + { + "bk_biz_id": 100001, + "bk_host_id": 14110, + } + ] + class MockPermission(object): def get_apply_data(self, *args, **kwargs):