Skip to content

Commit

Permalink
feat: 进程操作request_body压缩 (closed #345)
Browse files Browse the repository at this point in the history
  • Loading branch information
wyyalt committed Aug 6, 2024
1 parent 25f3572 commit 968a513
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 38 deletions.
63 changes: 44 additions & 19 deletions apps/gsekit/pipeline_plugins/components/collections/gse.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
"""
import json
import logging
from typing import Dict
from typing import Any, Dict, List

from django.db.models import F
from django.utils.translation import ugettext as _
Expand All @@ -27,6 +27,8 @@
from apps.gsekit.process.models import Process, ProcessInst
from apps.utils.mako_utils.render import mako_render
from dataclasses import dataclass

from env.constants import GseVersion
from .base import CommonData
from apps.adapters.api.gse import get_gse_api_helper
from apps.adapters.api.gse.base import GseApiBaseHelper
Expand Down Expand Up @@ -269,10 +271,24 @@ def _execute(self, data, parent_data, common_data):
op_type = data.get_one_of_inputs("op_type")
data.outputs.proc_op_status_map = {}

proc_operate_req = []
proc_operate_req: Dict[str, Dict[str, Any]] = {}
no_agent_id_job_task_ids: List[int] = []
for job_task in job_tasks:

host_info = job_task.extra_data["process_info"]["host"]
if all([common_data.gse_api_helper.version == GseVersion.V2.value, not host_info.get("bk_agent_id", "")]):
# 对于GseV2来说必须使用agentid进行操作,如果没有agentid可能会导致任务整个handling
no_agent_id_job_task_ids.append(job_task.id)
error_msg = _("该主机无bk_agent_id无法进行相关操作, 请检查主机Agent是否正常")
job_task.set_status(
JobStatus.FAILED,
extra_data={
"failed_reason": self.generate_proc_op_error_msg(GseDataErrorCode.OP_FAILED, error_msg),
"err_code": GseDataErrorCode.OP_FAILED,
},
)
continue

process_info = job_task.extra_data["process_info"]["process"]
set_info = job_task.extra_data["process_info"]["set"]
module_info = job_task.extra_data["process_info"]["module"]
Expand All @@ -298,24 +314,24 @@ def _execute(self, data, parent_data, common_data):
}
self.is_op_cmd_configured(op_type, process_info, raise_exception=True)

proc_operate_req.append(
{
local_inst_name: str = f"{process_info['bk_process_name']}_{local_inst_id}"

host_identity: Dict[str, Any] = {
"bk_host_innerip": host_info["bk_host_innerip"],
"bk_cloud_id": host_info["bk_cloud_id"],
"bk_agent_id": host_info.get("bk_agent_id", ""),
}

if local_inst_name in proc_operate_req:
proc_operate_req[local_inst_name]["hosts"].append(host_identity)
else:
proc_operate_req[local_inst_name] = {
"meta": {
"namespace": NAMESPACE.format(bk_biz_id=process_info["bk_biz_id"]),
"name": f"{process_info['bk_process_name']}_{local_inst_id}",
"labels": {
"bk_process_name": process_info["bk_process_name"],
"bk_process_id": process_info["bk_process_id"],
},
"name": local_inst_name,
},
"op_type": op_type,
"hosts": [
{
"bk_host_innerip": host_info["bk_host_innerip"],
"bk_cloud_id": host_info["bk_cloud_id"],
"bk_agent_id": host_info.get("bk_agent_id", ""),
}
],
"hosts": [host_identity],
"spec": {
"identity": {
"index_key": "",
Expand All @@ -339,24 +355,34 @@ def _execute(self, data, parent_data, common_data):
},
},
}
)

# pipeline-engine会把data转为json,不能用int作为key
data.outputs.proc_op_status_map[str(job_task.id)] = GseDataErrorCode.RUNNING
task_id = common_data.gse_api_helper.operate_proc_multi(proc_operate_req=proc_operate_req)

if not proc_operate_req.values():
self.finish_schedule()
return True

task_id = common_data.gse_api_helper.operate_proc_multi(proc_operate_req=list(proc_operate_req.values()))

data.outputs.task_id = task_id
data.outputs.no_agent_id_job_task_ids = no_agent_id_job_task_ids
return self.return_data(result=True)

def _schedule(self, data, parent_data, common_data, callback_data=None):
job_tasks = data.get_one_of_inputs("job_tasks")
op_type = data.get_one_of_inputs("op_type")
task_id = data.get_one_of_outputs("task_id")
no_agent_id_job_task_ids = data.get_one_of_outputs("no_agent_id_job_task_ids", [])

gse_api_result = common_data.gse_api_helper.get_proc_operate_result(task_id)
if gse_api_result["code"] == GSE_RUNNING_TASK_CODE:
# 查询的任务等待执行中,还未入到redis,继续下一次查询
return self.return_data(result=True)

for job_task in job_tasks:
if job_task.id in no_agent_id_job_task_ids:
continue
local_inst_id = job_task.extra_data["local_inst_id"]
task_result = self.get_job_task_gse_result(gse_api_result, job_task, common_data)
error_code = task_result.get("error_code")
Expand All @@ -366,7 +392,6 @@ def _schedule(self, data, parent_data, common_data, callback_data=None):
continue

data.outputs.proc_op_status_map[str(job_task.id)] = error_code

if error_code == GseDataErrorCode.SUCCESS:
process_inst = ProcessInst.objects.get(
bk_process_id=job_task.bk_process_id, local_inst_id=local_inst_id
Expand Down
6 changes: 6 additions & 0 deletions apps/gsekit/process/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,9 @@ class ProcessNotMatchException(ProcessBaseException):
ERROR_CODE = "006"
MESSAGE = _("查询进程不匹配")
MESSAGE_TPL = _("查询进程不匹配: {user_bk_process_id} vs {cc_bk_process_id}")


class ProcessNoAgentIDException(ProcessBaseException):
    """Error 007: no process carrying an agent_id is available for status sync.

    The message asks the user to contact an administrator.
    """

    ERROR_CODE = "007"
    MESSAGE = _("找不到带有agent_id的进程进行状态同步,请联系管理员")
    # The template has no placeholders and is the same text as MESSAGE,
    # so it simply reuses the already-translated value.
    MESSAGE_TPL = MESSAGE
47 changes: 28 additions & 19 deletions apps/gsekit/process/handlers/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from collections import defaultdict
from functools import reduce
from itertools import groupby
from typing import List, Dict, Union
from typing import Any, List, Dict, Union

from django.db import transaction
from django.db.models import Q, QuerySet
Expand Down Expand Up @@ -44,6 +44,7 @@
from common.log import logger
from apps.adapters.api.gse import get_gse_api_helper
from apps.core.gray.tools import GrayTools
from env.constants import GseVersion


class ProcessHandler(APIModel):
Expand Down Expand Up @@ -950,8 +951,8 @@ def sync_proc_status_to_db(self, proc_status_infos=None, gse_api_helper: GseApiB
def get_proc_inst_status_infos(
proc_inst_infos, _request=None, gse_api_helper: GseApiBaseHelper = None
) -> List[Dict]:
proc_operate_req_slice = []
meta_key_uniq_key_map = {}
proc_operate_req: Dict[str, Dict[str, Any]] = {}
for proc_inst_info in proc_inst_infos:
host_info = proc_inst_info["host_info"]
process_info = proc_inst_info["process_info"]
Expand Down Expand Up @@ -986,26 +987,34 @@ def get_proc_inst_status_infos(
meta_key: str = gse_api_helper.get_gse_proc_key(
host_info, namespace=namespace, proc_name=f"{process_info['bk_process_name']}_{local_inst_id}"
)
bk_agent_id: str = host_info.get("bk_agent_id", "")
if gse_api_helper.version == GseVersion.V2.value and not bk_agent_id:
# 对于V2来说必须使用agentid进行查询
logger.info(
f"get_proc_inst_status failed-> namespace: {namespace}, meta_key:{meta_key}, uniq_key: {uniq_key}"
)
continue

meta_key_uniq_key_map[meta_key] = uniq_key
proc_operate_req_slice.append(
{

local_inst_name: str = f"{process_info['bk_process_name']}_{local_inst_id}"

host_identity: Dict[str, Any] = {
"bk_host_innerip": host_info["bk_host_innerip"],
"bk_cloud_id": host_info["bk_cloud_id"],
"bk_agent_id": host_info.get("bk_agent_id", ""),
}

if local_inst_name in proc_operate_req:
proc_operate_req[local_inst_name]["hosts"].append(host_identity)
else:
proc_operate_req[local_inst_name] = {
"meta": {
"namespace": namespace,
"name": f"{process_info['bk_process_name']}_{local_inst_id}",
"labels": {
"bk_process_name": process_info["bk_process_name"],
"bk_process_id": process_info["bk_process_id"],
},
"name": local_inst_name,
},
"op_type": GseOpType.CHECK,
"hosts": [
{
"bk_host_innerip": host_info["bk_host_innerip"],
"bk_cloud_id": host_info["bk_cloud_id"],
"bk_agent_id": host_info.get("bk_agent_id", ""),
}
],
"hosts": [host_identity],
"spec": {
"identity": {
"index_key": "",
Expand All @@ -1029,9 +1038,9 @@ def get_proc_inst_status_infos(
},
},
}
)

gse_task_id: str = gse_api_helper.operate_proc_multi(proc_operate_req=proc_operate_req_slice)
if not proc_operate_req.values():
raise exceptions.ProcessNoAgentIDException()
gse_task_id: str = gse_api_helper.operate_proc_multi(proc_operate_req=list(proc_operate_req.values()))

proc_inst_status_infos = []
uniq_keys_recorded = set()
Expand Down

0 comments on commit 968a513

Please sign in to comment.