Skip to content

Commit

Permalink
feat: IP选择器差量同步主机(closed TencentBlueKing#2294)
Browse files Browse the repository at this point in the history
  • Loading branch information
wyyalt committed Jun 25, 2024
1 parent 5f06b20 commit b00d9dd
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 14 deletions.
161 changes: 148 additions & 13 deletions apps/core/ipchooser/handlers/host_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,147 @@
specific language governing permissions and limitations under the License.
"""
import typing
from collections import defaultdict

from django_mysql.models import QuerySet

from apps.core.concurrent import controller
from apps.core.ipchooser.tools.host_tool import HostTool
from apps.node_man.constants import QUERY_CMDB_LIMIT
from apps.node_man.models import GlobalSettings
from apps.node_man.periodic_tasks.sync_cmdb_host import bulk_differential_sync_biz_hosts
from apps.utils import batch_request, concurrent
from common.api import CCApi
from common.log import logger

from .. import constants, types
from ..tools import base
from .base import BaseHandler


class HostHandler:
@staticmethod
@classmethod
def query_host_by_cloud_ip(cls, cloud_inner_ip: typing.List[str]) -> typing.List[int]:
"""
查询主机host_id用于查询主机的业务信息
"""
bk_cloud_ids: typing.Set[int] = set()
bk_host_innerip: typing.Set[str] = set()
for cloud_ip in cloud_inner_ip:
bk_cloud_ids.add(int(cloud_ip.split(constants.CommonEnum.SEP.value)[0]))
bk_host_innerip.add(cloud_ip.split(constants.CommonEnum.SEP.value)[1])

query_hosts_params: typing.Dict[str, typing.Any] = {
"fields": ["bk_host_id"],
"host_property_filter": {
"condition": "AND",
"rules": [
{"field": "bk_host_innerip", "operator": "in", "value": list(bk_host_innerip)},
{"field": "bk_cloud_id", "operator": "in", "value": list(bk_cloud_ids)},
],
},
"no_request": True,
}

cmdb_host_infos: typing.List[typing.Dict[str, typing.Any]] = batch_request.batch_request(
func=CCApi.list_hosts_without_biz, params=query_hosts_params
)

logger.info(f"need_differential_sync_cloud_ip count: {len(cmdb_host_infos)} -> {cmdb_host_infos}")

return [host["bk_host_id"] for host in cmdb_host_infos]

@controller.ConcurrentController(
data_list_name="bk_host_ids",
batch_call_func=concurrent.batch_call,
get_config_dict_func=lambda: {"limit": QUERY_CMDB_LIMIT},
)
@classmethod
def find_host_biz_relations(
cls, bk_host_ids: typing.List[int]
) -> typing.List[typing.Optional[typing.Dict[str, typing.Any]]]:
"""
查询主机业务关系信息
:param bk_host_ids: 主机列表
:return: 主机业务关系列表
"""
# 接口对于空数组的处理是报错,这里需要提前处理返回
if not bk_host_ids:
return []

return CCApi.find_host_biz_relations({"bk_host_id": bk_host_ids})

@classmethod
def fetch_need_differential_sync_bk_host_ids(
cls, untreated_host_infos, or_conditions
) -> typing.List[typing.Optional[int]]:
"""
获取所有需要差量同步的主机id
"""
exist_hosts: typing.Dict[str, typing.Set] = {
"bk_host_id": set(),
"cloud_inner_ip": set(),
}
for host in untreated_host_infos:
exist_hosts["bk_host_id"].add(host["bk_host_id"])
exist_hosts["cloud_inner_ip"].add(
f"{host['bk_cloud_id']}{constants.CommonEnum.SEP.value}{host['inner_ip']}"
)
or_conditions_map: typing.Dict = {item["key"]: item["val"] for item in or_conditions}

need_differential_sync_bk_host_ids: typing.List[int] = list(
or_conditions_map.get("bk_host_id", set()) - exist_hosts["bk_host_id"]
)
need_differential_sync_cloud_ip: typing.List[str] = list(
or_conditions_map.get("cloud_inner_ip", set()) - exist_hosts["cloud_inner_ip"]
)
if need_differential_sync_cloud_ip:
# 如果是IP形式需要查询出主机ID
need_differential_sync_cloud_ip_host_ids = cls.query_host_by_cloud_ip(need_differential_sync_cloud_ip)
# 所有需求差量同步的主机ID
need_differential_sync_bk_host_ids += need_differential_sync_cloud_ip_host_ids

logger.info(
f"need_differential_sync_bk_host_ids "
f"count:{len(need_differential_sync_bk_host_ids)} -> {need_differential_sync_bk_host_ids}"
)

return need_differential_sync_bk_host_ids

@classmethod
def bulk_differential_sync_hosts(cls, need_differential_sync_bk_host_ids):
"""
差量同步所有需要同步的主机
"""
# 查询主机id所属业务
host_biz_relations: typing.List[typing.Optional[typing.Dict[str, typing.Any]]] = cls.find_host_biz_relations(
bk_host_ids=need_differential_sync_bk_host_ids
)

expected_bk_host_ids_gby_bk_biz_id: typing.Dict[str, typing.List[int]] = defaultdict(list)
for host_biz_realtion in host_biz_relations:
expected_bk_host_ids_gby_bk_biz_id[host_biz_realtion["bk_biz_id"]].append(host_biz_realtion["bk_host_id"])

bulk_differential_sync_biz_hosts(expected_bk_host_ids_gby_bk_biz_id)

@classmethod
def fetch_untreated_host_infos(
cls, limit_host_ids, or_conditions, scope_list
) -> typing.List[typing.Dict[str, typing.Any]]:
host_queryset: QuerySet = base.HostQuerySqlHelper.multiple_cond_sql(
params={}, biz_scope=[scope["bk_biz_id"] for scope in scope_list], return_all_node_type=True
)
host_queryset = base.HostQueryHelper.or_query_hosts(host_queryset, or_conditions=or_conditions)
if limit_host_ids is not None:
host_queryset = host_queryset.filter(bk_host_id__in=limit_host_ids)
# 获取主机信息
host_fields: typing.List[str] = constants.CommonEnum.DEFAULT_HOST_FIELDS.value
untreated_host_infos: typing.List[types.HostInfo] = list(host_queryset.values(*host_fields))
return untreated_host_infos

@classmethod
def details_base(
cls,
scope_list: types.ScopeList,
or_conditions: typing.List[types.Condition],
limit_host_ids: typing.Optional[typing.List[int]] = None,
Expand All @@ -36,17 +163,26 @@ def details_base(
:param show_agent_realtime_state: 是否展示Agent实时状态
:return:
"""
host_queryset: QuerySet = base.HostQuerySqlHelper.multiple_cond_sql(
params={}, biz_scope=[scope["bk_biz_id"] for scope in scope_list], return_all_node_type=True
untreated_host_infos: typing.List[typing.Dict[str, typing.Any]] = cls.fetch_untreated_host_infos(
limit_host_ids, or_conditions, scope_list
)
host_queryset = base.HostQueryHelper.or_query_hosts(host_queryset, or_conditions=or_conditions)
if limit_host_ids is not None:
host_queryset = host_queryset.filter(bk_host_id__in=limit_host_ids)
# 获取主机信息
host_fields: typing.List[str] = constants.CommonEnum.DEFAULT_HOST_FIELDS.value
untreated_host_infos: typing.List[types.HostInfo] = list(host_queryset.values(*host_fields))

need_differential_sync_bk_host_ids: typing.List[
typing.Optional[int]
] = cls.fetch_need_differential_sync_bk_host_ids(untreated_host_infos, or_conditions)

if need_differential_sync_bk_host_ids:
# 差量同步主机
cls.bulk_differential_sync_hosts(need_differential_sync_bk_host_ids)

# 查询差量主机信息回填查询结果
differential_host_infos: typing.List[typing.Dict[str, typing.Any]] = cls.fetch_untreated_host_infos(
need_differential_sync_bk_host_ids, [], scope_list
)
untreated_host_infos += differential_host_infos

bk_biz_ids: typing.List[int] = [scope["bk_biz_id"] for scope in scope_list]

biz_whitelist: typing.List[int] = GlobalSettings.get_config(
key=GlobalSettings.KeyEnum.IP_CHOOSER_BIZ_WHITELIST.value, default=[]
)
Expand All @@ -60,12 +196,11 @@ def details_base(
return BaseHandler.format_hosts(untreated_host_infos)

if show_agent_realtime_state:
enable = GlobalSettings.get_config(
enable: bool = GlobalSettings.get_config(
key=GlobalSettings.KeyEnum.IP_CHOOSER_ENABLE_SHOW_REALTIME_AGENT_STATE.value, default=False
)
if not enable:
return BaseHandler.format_hosts(untreated_host_infos)
HostTool.fill_agent_state_info_to_hosts(host_infos=untreated_host_infos)
if enable:
HostTool.fill_agent_state_info_to_hosts(host_infos=untreated_host_infos)

return BaseHandler.format_hosts(untreated_host_infos)

Expand Down
11 changes: 11 additions & 0 deletions apps/core/ipchooser/tests/test_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ def test_check(self):
self.assertEqual(res[0]["ipv6"], "0000:0000:0000:0000:0000:ffff:7f00:0002")

@patch("apps.core.ipchooser.query.resource.CCApi", MockClient.cc)
@patch("apps.core.ipchooser.handlers.host_handler.CCApi", MockClient.cc)
@patch("apps.node_man.periodic_tasks.sync_cmdb_host.client_v2", MockClient)
@patch(
"apps.node_man.periodic_tasks.sync_agent_status_task.get_gse_api_helper",
get_gse_api_helper(GseVersion.V2.value, GseApiMockClient()),
Expand Down Expand Up @@ -88,3 +90,12 @@ def test_details(self):
)
self.assertEqual(result[0]["alive"], 1)
self.assertEqual(result[0]["bk_agent_alive"], 1)

# 验证主机不存进同步
result = HostHandler.details(
scope_list=[{"scope_type": "biz", "scope_id": "100001", "bk_biz_id": 100001}],
host_list=[{"host_id": 14110, "meta": {"scope_type": "biz", "scope_id": "100001", "bk_biz_id": 100001}}],
show_agent_realtime_state=False,
)
self.assertEqual(result[0]["alive"], 0)
self.assertEqual(result[0]["bk_agent_alive"], 0)
11 changes: 10 additions & 1 deletion apps/node_man/tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1066,7 +1066,7 @@ def list_biz_hosts_topo(cls, *args, **kwargs):

@classmethod
def list_biz_hosts(cls, *args, **kwargs):
if args[0]["bk_biz_id"] == 1:
if args[0]["bk_biz_id"] in [1, 100001]:
return {
"count": 1,
"info": [
Expand Down Expand Up @@ -1161,6 +1161,15 @@ def get_host_by_topo_node(cls, bk_biz_id, *args, **kwargs):
def list_service_template(cls, *args, **kwargs):
return {"count": 1, "info": [{"id": 1}, {"id": 2}]}

@classmethod
def find_host_biz_relations(cls, *args, **kwargs):
return [
{
"bk_biz_id": 100001,
"bk_host_id": 14110,
}
]


class MockPermission(object):
def get_apply_data(self, *args, **kwargs):
Expand Down

0 comments on commit b00d9dd

Please sign in to comment.