diff --git a/apps/gsekit/pipeline_plugins/components/collections/gse.py b/apps/gsekit/pipeline_plugins/components/collections/gse.py index a726ffd..e780d4f 100644 --- a/apps/gsekit/pipeline_plugins/components/collections/gse.py +++ b/apps/gsekit/pipeline_plugins/components/collections/gse.py @@ -8,7 +8,6 @@ an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ -from collections import defaultdict import json import logging from typing import Any, Dict, List @@ -272,7 +271,7 @@ def _execute(self, data, parent_data, common_data): op_type = data.get_one_of_inputs("op_type") data.outputs.proc_op_status_map = {} - proc_operate_req: Dict[str, Dict[str, Dict[str, Any]]] = defaultdict(dict) + proc_operate_req: Dict[str, Dict[str, Any]] = {} no_agent_id_job_task_ids: List[int] = [] for job_task in job_tasks: @@ -323,10 +322,10 @@ def _execute(self, data, parent_data, common_data): "bk_agent_id": host_info.get("bk_agent_id", ""), } - if op_type in proc_operate_req and local_inst_name in proc_operate_req[op_type]: - proc_operate_req[op_type][local_inst_name]["hosts"].append(host_identity) + if local_inst_name in proc_operate_req: + proc_operate_req[local_inst_name]["hosts"].append(host_identity) else: - proc_operate_req[op_type][local_inst_name] = { + proc_operate_req[local_inst_name] = { "meta": { "namespace": NAMESPACE.format(bk_biz_id=process_info["bk_biz_id"]), "name": local_inst_name, @@ -360,14 +359,11 @@ def _execute(self, data, parent_data, common_data): # pipeline-engine会把data转为json,不能用int作为key data.outputs.proc_op_status_map[str(job_task.id)] = GseDataErrorCode.RUNNING - proc_operate_req_list = [] - for local_inst_name_req in proc_operate_req.values(): - proc_operate_req_list.extend(local_inst_name_req.values()) - if not proc_operate_req_list: + if not proc_operate_req.values(): self.finish_schedule() return True - task_id = common_data.gse_api_helper.operate_proc_multi(proc_operate_req=proc_operate_req_list) + task_id = common_data.gse_api_helper.operate_proc_multi(proc_operate_req=list(proc_operate_req.values())) data.outputs.task_id = task_id data.outputs.no_agent_id_job_task_ids = no_agent_id_job_task_ids diff --git a/apps/gsekit/process/exceptions.py b/apps/gsekit/process/exceptions.py index 3bef7dd..08f2354 100644 --- a/apps/gsekit/process/exceptions.py +++ b/apps/gsekit/process/exceptions.py @@ -56,3 +56,9 @@ class ProcessNotMatchException(ProcessBaseException): ERROR_CODE = "006" MESSAGE = _("查询进程不匹配") MESSAGE_TPL = _("查询进程不匹配: {user_bk_process_id} vs {cc_bk_process_id}") + + +class ProcessNoAgentIDException(ProcessBaseException): + ERROR_CODE = "007" + MESSAGE = _("找不到带有agent_id的进程进行状态同步,请联系管理员") + MESSAGE_TPL = _("找不到带有agent_id的进程进行状态同步,请联系管理员") diff --git a/apps/gsekit/process/handlers/process.py b/apps/gsekit/process/handlers/process.py index 8583dc9..514e6cc 100644 --- a/apps/gsekit/process/handlers/process.py +++ b/apps/gsekit/process/handlers/process.py @@ -1038,7 +1038,8 @@ def get_proc_inst_status_infos( }, }, } - + if not proc_operate_req.values(): + raise exceptions.ProcessNoAgentIDException() gse_task_id: str = gse_api_helper.operate_proc_multi(proc_operate_req=list(proc_operate_req.values())) proc_inst_status_infos = []