Skip to content

Commit

Permalink
feat: 进程操作request_body压缩 (closed #345)
Browse files Browse the repository at this point in the history
  • Loading branch information
wyyalt committed Aug 2, 2024
1 parent 0fac58c commit 21b5674
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 11 deletions.
16 changes: 6 additions & 10 deletions apps/gsekit/pipeline_plugins/components/collections/gse.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
"""
from collections import defaultdict
import json
import logging
from typing import Any, Dict, List
Expand Down Expand Up @@ -272,7 +271,7 @@ def _execute(self, data, parent_data, common_data):
op_type = data.get_one_of_inputs("op_type")
data.outputs.proc_op_status_map = {}

proc_operate_req: Dict[str, Dict[str, Dict[str, Any]]] = defaultdict(dict)
proc_operate_req: Dict[str, Dict[str, Any]] = {}
no_agent_id_job_task_ids: List[int] = []
for job_task in job_tasks:

Expand Down Expand Up @@ -323,10 +322,10 @@ def _execute(self, data, parent_data, common_data):
"bk_agent_id": host_info.get("bk_agent_id", ""),
}

if op_type in proc_operate_req and local_inst_name in proc_operate_req[op_type]:
proc_operate_req[op_type][local_inst_name]["hosts"].append(host_identity)
if local_inst_name in proc_operate_req:
proc_operate_req[local_inst_name]["hosts"].append(host_identity)
else:
proc_operate_req[op_type][local_inst_name] = {
proc_operate_req[local_inst_name] = {
"meta": {
"namespace": NAMESPACE.format(bk_biz_id=process_info["bk_biz_id"]),
"name": local_inst_name,
Expand Down Expand Up @@ -360,14 +359,11 @@ def _execute(self, data, parent_data, common_data):
# pipeline-engine会把data转为json,不能用int作为key
data.outputs.proc_op_status_map[str(job_task.id)] = GseDataErrorCode.RUNNING

proc_operate_req_list = []
for local_inst_name_req in proc_operate_req.values():
proc_operate_req_list.extend(local_inst_name_req.values())
if not proc_operate_req_list:
if not proc_operate_req.values():
self.finish_schedule()
return True

task_id = common_data.gse_api_helper.operate_proc_multi(proc_operate_req=proc_operate_req_list)
task_id = common_data.gse_api_helper.operate_proc_multi(proc_operate_req=list(proc_operate_req.values()))

data.outputs.task_id = task_id
data.outputs.no_agent_id_job_task_ids = no_agent_id_job_task_ids
Expand Down
6 changes: 6 additions & 0 deletions apps/gsekit/process/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,9 @@ class ProcessNotMatchException(ProcessBaseException):
ERROR_CODE = "006"
MESSAGE = _("查询进程不匹配")
MESSAGE_TPL = _("查询进程不匹配: {user_bk_process_id} vs {cc_bk_process_id}")


class ProcessNoAgentIDException(ProcessBaseException):
ERROR_CODE = "007"
MESSAGE = _("找不到带有agent_id的进程进行状态同步,请联系管理员")
MESSAGE_TPL = _("找不到带有agent_id的进程进行状态同步,请联系管理员")
3 changes: 2 additions & 1 deletion apps/gsekit/process/handlers/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -1038,7 +1038,8 @@ def get_proc_inst_status_infos(
},
},
}

if not proc_operate_req.values():
raise exceptions.ProcessNoAgentIDException()
gse_task_id: str = gse_api_helper.operate_proc_multi(proc_operate_req=list(proc_operate_req.values()))

proc_inst_status_infos = []
Expand Down

0 comments on commit 21b5674

Please sign in to comment.