diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/resources/diagnostics/diagnostics_collector.py b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/resources/diagnostics/diagnostics_collector.py new file mode 100644 index 0000000000000..374f4054b9ed7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/resources/diagnostics/diagnostics_collector.py @@ -0,0 +1,317 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import print_function + +import argparse +import sys, os +import subprocess +from datetime import datetime, timedelta +from urllib import request, error +import xml.etree.ElementTree as ET +import re +import time + +TEMP_DIR = "/tmp" +HADOOP_CONF_DIR = "/etc/hadoop" +YARN_SITE_XML = "yarn-site.xml" +RM_ADDRESS_PROPERTY_NAME = "yarn.resourcemanager.webapp.address" + +RM_LOG_REGEX = r"(?<=\")\/logs.+?RESOURCEMANAGER.+?(?=\")" +NM_LOG_REGEX = r"(?<=\")\/logs.+?NODEMANAGER.+?(?=\")" +INPUT_TIME_FORMAT = '%a %b %d %H:%M:%S %Z %Y' # e.g. Wed May 28 07:35:39 UTC 2025 +OUTPUT_TIME_FORMAT = '%Y-%m-%d %H:%M:%S,%f' # e.g. 2025-05-28 11:57:05,435 +OUTPUT_TIME_FORMAT_WITHOUT_SECOND = '%Y-%m-%d %H:%M' # e.g. 2025-05-28 11:57 +NUMBER_OF_JSTACK = 3 + + +def application_diagnostic(): + """ + Application Logs, Application Info, Application Attempts + Multiple JStack of Hanging Containers and NodeManager + ResourceManager logs during job duration. + NodeManager logs from NodeManager where hanging containers of jobs run during the duration of containers. + """ + + if args.arguments is None or len(args.arguments) == 0: + print("Missing application or job id, exiting...") + sys.exit(os.EX_USAGE) + + app_id = args.arguments[0] + + output_path = create_output_dir(os.path.join(TEMP_DIR, app_id)) + + # Get JStack of the hanging containers + nm_address = get_nodemanager_address(app_id) + app_jstack = create_request("http://{}/ws/v1/node/apps/{}/jstack".format(nm_address, app_id), False) + write_output(output_path, "application_jstack", app_jstack) + + # Get JStack of the hanging NodeManager + nm_jstack = create_request("http://{}/ws/v1/node/jstack".format(nm_address), False) + write_output(output_path, "nm_{}_jstack".format(nm_address), nm_jstack) + + # Get application info + app_info= create_request("http://{}/ws/v1/cluster/apps/{}".format(RM_ADDRESS, app_id)) + write_output(output_path, "application_info", app_info) + + # Get application attempts + app_attempts = create_request("http://{}/ws/v1/cluster/apps/{}/appattempts".format(RM_ADDRESS, app_id)) + write_output(output_path, "application_attempts", app_attempts) + + # Get start_time and end_time of the application + start_time, end_time = get_application_time(app_info) + + # Get RM log + log_address = get_node_log_address(RM_ADDRESS, RM_LOG_REGEX) + write_output(os.path.join(output_path, "node_log"), "resourcemanager_log", + filter_node_log(log_address, start_time, end_time)) + + # Get NodeManager logs in the duration of containers belonging to app_id + if "amHostHttpAddress" in app_info: + app_info = ET.fromstring(app_info) + nm_address = app_info.find("amHostHttpAddress").text + log_address = get_node_log_address(nm_address, NM_LOG_REGEX) + write_output(os.path.join(output_path, "node_log"), "nodemanager_log", get_container_log(log_address, app_id)) + + # Get application log + command = run_cmd_and_save_output(os.path.join(output_path, "app_logs"), app_id, "yarn", "logs", "-applicationId", + app_id) # TODO user permission? + + command.communicate() + return output_path + + +def scheduler_related_issue(): + """ + ResourceManager Scheduler Logs with DEBUG enabled for 2 minutes. + Multiple Jstack of ResourceManager + YARN and Scheduler Configuration + Cluster Scheduler API /ws/v1/cluster/scheduler and Cluster Nodes API /ws/v1/cluster/nodes response + Scheduler Activities /ws/v1/cluster/scheduler/bulk-activities response + """ + output_path = create_output_dir(os.path.join(TEMP_DIR, "scheduler_related_issue" + str(time.time()).split(".")[0])) + + # Multiple JStack of ResourceManager + rm_pids = get_resourcemanager_pid() + jstacks_output = get_multiple_jstack(rm_pids) + write_output(output_path, "jstacks_resourcemanager", jstacks_output) + + # Get Cluster Scheduler Info + scheduler_info = create_request("http://{}/ws/v1/cluster/scheduler".format(RM_ADDRESS)) + write_output(output_path, "scheduler_info", scheduler_info) + + # Get Cluster Nodes Info + nodes_info = create_request("http://{}/ws/v1/cluster/nodes".format(RM_ADDRESS)) + write_output(output_path, "nodemanager_info", nodes_info) + + # Get Scheduler Activities + scheduler_activities = create_request("http://{}/ws/v1/cluster/scheduler/bulk-activities".format(RM_ADDRESS)) + write_output(output_path, "scheduler_activities", scheduler_activities) + + # Get Scheduler Configuration + scheduler_config = create_request("http://{}/ws/v1/cluster/scheduler-conf".format(RM_ADDRESS)) + write_output(output_path, "scheduler_configuration", scheduler_config) + + # Get YARN configuration yarn-site.xml + yarn_conf = run_command("cat", os.path.join(HADOOP_CONF_DIR, YARN_SITE_XML)) + write_output(output_path, "yarn_site", yarn_conf) + + # Get RM Debug log for the last 2 minutes + enable_debug_log = set_rm_scheduler_log_level("DEBUG") + print(enable_debug_log) + log_address = get_node_log_address(RM_ADDRESS, RM_LOG_REGEX) + start_time, end_time = (format_datetime_no_seconds(datetime.now() - timedelta(seconds=120)), + format_datetime_no_seconds(datetime.now())) + rm_debug_log = filter_node_log(log_address, start_time, end_time) + write_output(output_path, "rm_debug_log_2min", rm_debug_log) + enable_info_log = set_rm_scheduler_log_level("INFO") + print(enable_info_log) + + return output_path + +####################################################### Utils Functions ############################################### + + +def list_issues(): + print("application_failed:appId", "application_hanging:appId", "scheduler_related_issue", + "rm_nm_start_failure:nodeId", sep="\n") + + +def parse_url_from_conf(conf_file, url_property_name): + root = ET.parse(os.path.join(HADOOP_CONF_DIR, conf_file)) + for prop in root.findall("property"): + prop_name = prop.find("name").text + if prop_name == url_property_name: + return prop.find("value").text + + return None + + +def create_output_dir(dir_path): + if not os.path.exists(dir_path): + os.makedirs(dir_path) + return dir_path + + +def write_output(output_path, out_filename, value): + output_path = create_output_dir(output_path) + with open(os.path.join(output_path, out_filename), 'w') as f: + f.write(value) + + +def run_command(*argv): + try: + cmd = " ".join(arg for arg in argv) + print("Running command with arguments:", cmd) + response = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, check=True) + response_str = response.stdout.decode('utf-8') + except subprocess.CalledProcessError as e: + response_str = "Command failed with error: {}".format(e) + print("Unable to run command: ", response_str) + except Exception as e: + response_str = "Exception occurred: {}".format(e) + print("Exception occurred: ", response_str) + + return response_str + + +def run_cmd_and_save_output(output_path, out_filename, *argv): + file_path = os.path.join(create_output_dir(output_path), out_filename) + with open(file_path, 'w') as f: + return subprocess.Popen(argv, stdout=f) + + +def create_request(url, xml_type=True): + headers = {} + # TODO auth can be handled here + if xml_type: + headers["Accept"] = "application/xml" + + try: + req = request.Request(url, headers=headers) + response = request.urlopen(req) + response_str = response.read().decode('utf-8') + except error.HTTPError as e: + response_str = "HTTP error occurred: {} - {}".format(e.code, e.reason) + print("Request failed: ", response_str) + except Exception as e: + response_str = "Unexpected error: {}".format(e) + print("Request failed: {}".format(response_str)) + + return response_str + + +def get_nodemanager_address(app_id): + app_info = create_request("http://{}/ws/v1/cluster/apps/{}".format(RM_ADDRESS, app_id)) + app_info_xml = ET.fromstring(app_info) + return app_info_xml.find("amHostHttpAddress").text + + +def get_node_log_address(node_address, link_regex): + try: + log_page = create_request("http://{}/logs/".format(node_address), False) + matches = re.findall(link_regex, log_page, re.MULTILINE) + if not matches: + return "Warning: No matching log links found at {}/logs/".format(node_address) + return node_address + matches[0] + except Exception as e: + return "Failed to retrieve node logs address from {}: {}".format(node_address, e) + + +def filter_node_log(node_log_address: str, start_time: str, end_time: str): + return run_command("curl", "-s", "http://{}".format(node_log_address), "|", "sed", "-n", + "'/{}/,/{}/p'".format(start_time, end_time)) + + +def get_container_log(log_address, id): + return run_command("curl", "http://{}".format(log_address), "|", "grep", re.sub(r"^(job|application)", "container", id)) + + +def get_application_time(app_info_string): + app_element = ET.fromstring(app_info_string) + start_time_epoch = int(app_element.find("startedTime").text) + finish_time_epoch = int(app_element.find("finishedTime").text) + + start_time_str = datetime.fromtimestamp(start_time_epoch / 1000).strftime(OUTPUT_TIME_FORMAT)[:-4] # -4, the time conversion is not accurrate + finish_time_str = datetime.fromtimestamp(finish_time_epoch / 1000).strftime(OUTPUT_TIME_FORMAT)[:-4] + + return start_time_str, finish_time_str + + +def get_resourcemanager_pid(): + results = run_command("ps", "aux", "|", "grep", "resourcemanager", "|", "grep", "-v", "grep") + + pids = [] + for result in results.strip().splitlines(): + pid = result.split()[1] + pids.append(pid) + + return pids + + +def get_multiple_jstack(pids): + all_jstacks = [] + + for pid in pids: + for i in range(NUMBER_OF_JSTACK): # Get multiple jstack + jstack_output = run_command("jstack", pid) + all_jstacks.append("--- JStack iteration-{} for PID: {} ---\n{}".format(i, pid, jstack_output)) + + return "\n".join(all_jstacks) + + +def set_rm_scheduler_log_level(log_level): + return run_command("yarn", "daemonlog", "-setlevel", RM_ADDRESS, + "org.apache.hadoop.yarn.server.resourcemanager.scheduler", log_level) + + +def format_datetime_no_seconds(datetime_obj): + return datetime_obj.strftime(OUTPUT_TIME_FORMAT_WITHOUT_SECOND) + + +def main(): + + ISSUE_MAP = { + "application_diagnostic": application_diagnostic, + "scheduler_related_issue": scheduler_related_issue, + } + + parser = argparse.ArgumentParser() + parser.add_argument("-l", "--list", help="List the available issue types.", action="store_true") + parser.add_argument("-c", "--command", choices=list(ISSUE_MAP), help="Initiate the diagnostic information collecton" + "for diagnosing the selected issue type.") + parser.add_argument("-a", "--arguments", nargs='*', help="The required arguments for the selected issue type.") + global args + args = parser.parse_args() + + if not (args.list or args.command): + parser.error('No action requested, use --list or --command') + + if args.list: + list_issues() + sys.exit(os.EX_OK) + + global RM_ADDRESS + RM_ADDRESS = parse_url_from_conf(YARN_SITE_XML, RM_ADDRESS_PROPERTY_NAME) + if RM_ADDRESS is None: + print("RM address can't be found, exiting...") + sys.exit(1) + + selected_option = ISSUE_MAP[args.command] + print(selected_option()) # print the resulted output path that will be used by the DiagnosticsService.java + + +if __name__ == "__main__": + main() \ No newline at end of file