Skip to content

Commit

Permalink
feat: 进程操作request_body压缩 (closed #345)
Browse files Browse the repository at this point in the history
  • Loading branch information
wyyalt committed Jul 26, 2024
1 parent 3f496cf commit 0fac58c
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 37 deletions.
67 changes: 48 additions & 19 deletions apps/gsekit/pipeline_plugins/components/collections/gse.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
"""
from collections import defaultdict
import json
import logging
from typing import Dict
from typing import Any, Dict, List

from django.db.models import F
from django.utils.translation import ugettext as _
Expand All @@ -27,6 +28,8 @@
from apps.gsekit.process.models import Process, ProcessInst
from apps.utils.mako_utils.render import mako_render
from dataclasses import dataclass

from env.constants import GseVersion
from .base import CommonData
from apps.adapters.api.gse import get_gse_api_helper
from apps.adapters.api.gse.base import GseApiBaseHelper
Expand Down Expand Up @@ -269,10 +272,24 @@ def _execute(self, data, parent_data, common_data):
op_type = data.get_one_of_inputs("op_type")
data.outputs.proc_op_status_map = {}

proc_operate_req = []
proc_operate_req: Dict[str, Dict[str, Dict[str, Any]]] = defaultdict(dict)
no_agent_id_job_task_ids: List[int] = []
for job_task in job_tasks:

host_info = job_task.extra_data["process_info"]["host"]
if all([common_data.gse_api_helper.version == GseVersion.V2.value, not host_info.get("bk_agent_id", "")]):
# 对于GseV2来说必须使用agentid进行操作,如果没有agentid可能会导致任务整个handling
no_agent_id_job_task_ids.append(job_task.id)
error_msg = _("该主机无bk_agent_id无法进行相关操作, 请检查主机Agent是否正常")
job_task.set_status(
JobStatus.FAILED,
extra_data={
"failed_reason": self.generate_proc_op_error_msg(GseDataErrorCode.OP_FAILED, error_msg),
"err_code": GseDataErrorCode.OP_FAILED,
},
)
continue

process_info = job_task.extra_data["process_info"]["process"]
set_info = job_task.extra_data["process_info"]["set"]
module_info = job_task.extra_data["process_info"]["module"]
Expand All @@ -298,24 +315,24 @@ def _execute(self, data, parent_data, common_data):
}
self.is_op_cmd_configured(op_type, process_info, raise_exception=True)

proc_operate_req.append(
{
local_inst_name: str = f"{process_info['bk_process_name']}_{local_inst_id}"

host_identity: Dict[str, Any] = {
"bk_host_innerip": host_info["bk_host_innerip"],
"bk_cloud_id": host_info["bk_cloud_id"],
"bk_agent_id": host_info.get("bk_agent_id", ""),
}

if op_type in proc_operate_req and local_inst_name in proc_operate_req[op_type]:
proc_operate_req[op_type][local_inst_name]["hosts"].append(host_identity)
else:
proc_operate_req[op_type][local_inst_name] = {
"meta": {
"namespace": NAMESPACE.format(bk_biz_id=process_info["bk_biz_id"]),
"name": f"{process_info['bk_process_name']}_{local_inst_id}",
"labels": {
"bk_process_name": process_info["bk_process_name"],
"bk_process_id": process_info["bk_process_id"],
},
"name": local_inst_name,
},
"op_type": op_type,
"hosts": [
{
"bk_host_innerip": host_info["bk_host_innerip"],
"bk_cloud_id": host_info["bk_cloud_id"],
"bk_agent_id": host_info.get("bk_agent_id", ""),
}
],
"hosts": [host_identity],
"spec": {
"identity": {
"index_key": "",
Expand All @@ -339,24 +356,37 @@ def _execute(self, data, parent_data, common_data):
},
},
}
)

# pipeline-engine会把data转为json,不能用int作为key
data.outputs.proc_op_status_map[str(job_task.id)] = GseDataErrorCode.RUNNING
task_id = common_data.gse_api_helper.operate_proc_multi(proc_operate_req=proc_operate_req)

proc_operate_req_list = []
for local_inst_name_req in proc_operate_req.values():
proc_operate_req_list.extend(local_inst_name_req.values())
if not proc_operate_req_list:
self.finish_schedule()
return True

task_id = common_data.gse_api_helper.operate_proc_multi(proc_operate_req=proc_operate_req_list)

data.outputs.task_id = task_id
data.outputs.no_agent_id_job_task_ids = no_agent_id_job_task_ids
return self.return_data(result=True)

def _schedule(self, data, parent_data, common_data, callback_data=None):
job_tasks = data.get_one_of_inputs("job_tasks")
op_type = data.get_one_of_inputs("op_type")
task_id = data.get_one_of_outputs("task_id")
no_agent_id_job_task_ids = data.get_one_of_outputs("no_agent_id_job_task_ids", [])

gse_api_result = common_data.gse_api_helper.get_proc_operate_result(task_id)
if gse_api_result["code"] == GSE_RUNNING_TASK_CODE:
# 查询的任务等待执行中,还未入到redis,继续下一次查询
return self.return_data(result=True)

for job_task in job_tasks:
if job_task.id in no_agent_id_job_task_ids:
continue
local_inst_id = job_task.extra_data["local_inst_id"]
task_result = self.get_job_task_gse_result(gse_api_result, job_task, common_data)
error_code = task_result.get("error_code")
Expand All @@ -366,7 +396,6 @@ def _schedule(self, data, parent_data, common_data, callback_data=None):
continue

data.outputs.proc_op_status_map[str(job_task.id)] = error_code

if error_code == GseDataErrorCode.SUCCESS:
process_inst = ProcessInst.objects.get(
bk_process_id=job_task.bk_process_id, local_inst_id=local_inst_id
Expand Down
44 changes: 26 additions & 18 deletions apps/gsekit/process/handlers/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from collections import defaultdict
from functools import reduce
from itertools import groupby
from typing import List, Dict, Union
from typing import Any, List, Dict, Union

from django.db import transaction
from django.db.models import Q, QuerySet
Expand Down Expand Up @@ -44,6 +44,7 @@
from common.log import logger
from apps.adapters.api.gse import get_gse_api_helper
from apps.core.gray.tools import GrayTools
from env.constants import GseVersion


class ProcessHandler(APIModel):
Expand Down Expand Up @@ -950,8 +951,8 @@ def sync_proc_status_to_db(self, proc_status_infos=None, gse_api_helper: GseApiB
def get_proc_inst_status_infos(
proc_inst_infos, _request=None, gse_api_helper: GseApiBaseHelper = None
) -> List[Dict]:
proc_operate_req_slice = []
meta_key_uniq_key_map = {}
proc_operate_req: Dict[str, Dict[str, Any]] = {}
for proc_inst_info in proc_inst_infos:
host_info = proc_inst_info["host_info"]
process_info = proc_inst_info["process_info"]
Expand Down Expand Up @@ -986,26 +987,34 @@ def get_proc_inst_status_infos(
meta_key: str = gse_api_helper.get_gse_proc_key(
host_info, namespace=namespace, proc_name=f"{process_info['bk_process_name']}_{local_inst_id}"
)
bk_agent_id: str = host_info.get("bk_agent_id", "")
if gse_api_helper.version == GseVersion.V2.value and not bk_agent_id:
# 对于V2来说必须使用agentid进行查询
logger.info(
f"get_proc_inst_status failed-> namespace: {namespace}, meta_key:{meta_key}, uniq_key: {uniq_key}"
)
continue

meta_key_uniq_key_map[meta_key] = uniq_key
proc_operate_req_slice.append(
{

local_inst_name: str = f"{process_info['bk_process_name']}_{local_inst_id}"

host_identity: Dict[str, Any] = {
"bk_host_innerip": host_info["bk_host_innerip"],
"bk_cloud_id": host_info["bk_cloud_id"],
"bk_agent_id": host_info.get("bk_agent_id", ""),
}

if local_inst_name in proc_operate_req:
proc_operate_req[local_inst_name]["hosts"].append(host_identity)
else:
proc_operate_req[local_inst_name] = {
"meta": {
"namespace": namespace,
"name": f"{process_info['bk_process_name']}_{local_inst_id}",
"labels": {
"bk_process_name": process_info["bk_process_name"],
"bk_process_id": process_info["bk_process_id"],
},
"name": local_inst_name,
},
"op_type": GseOpType.CHECK,
"hosts": [
{
"bk_host_innerip": host_info["bk_host_innerip"],
"bk_cloud_id": host_info["bk_cloud_id"],
"bk_agent_id": host_info.get("bk_agent_id", ""),
}
],
"hosts": [host_identity],
"spec": {
"identity": {
"index_key": "",
Expand All @@ -1029,9 +1038,8 @@ def get_proc_inst_status_infos(
},
},
}
)

gse_task_id: str = gse_api_helper.operate_proc_multi(proc_operate_req=proc_operate_req_slice)
gse_task_id: str = gse_api_helper.operate_proc_multi(proc_operate_req=list(proc_operate_req.values()))

proc_inst_status_infos = []
uniq_keys_recorded = set()
Expand Down

0 comments on commit 0fac58c

Please sign in to comment.