From b140557f3d163c9630c5de8d168eed321ffe789d Mon Sep 17 00:00:00 2001 From: Dmitry Romanenko <Dmitry@Romanenko.in> Date: Sun, 21 Nov 2021 23:13:28 -0500 Subject: [PATCH 1/3] Dynamic generated CLI --- yarn_api_client/main.py | 283 +++++++++++++++------------------------- 1 file changed, 105 insertions(+), 178 deletions(-) diff --git a/yarn_api_client/main.py b/yarn_api_client/main.py index adb64ad..3522ccd 100644 --- a/yarn_api_client/main.py +++ b/yarn_api_client/main.py @@ -26,6 +26,19 @@ def get_parser(): return parser +def create_parsers(subparsers_instance, module_class, module_name, listing_of_apis): + for api in listing_of_apis: + _help_message = module_name + " " + api.replace("_", " ").title() + " API" + _new_parser = subparsers_instance.add_parser( + api, help=_help_message + ) + _new_parser.set_defaults(method=api) + _method = getattr(module_class, api) + for _arg in _method.__code__.co_varnames: + if _arg != 'self': + _new_parser.add_argument(_arg) + + def populate_resource_manager_arguments(subparsers): rm_parser = subparsers.add_parser( 'rm', help='ResourceManager REST API\'s') @@ -33,62 +46,51 @@ def populate_resource_manager_arguments(subparsers): rm_subparsers = rm_parser.add_subparsers() - ci_parser = rm_subparsers.add_parser( - 'info', help='Cluster Information API') - ci_parser.set_defaults(method='cluster_information') - - cm_parser = rm_subparsers.add_parser( - 'metrics', help='Cluster Metrics API') - cm_parser.set_defaults(method='cluster_metrics') - - cs_parser = rm_subparsers.add_parser( - 'scheduler', help='Cluster Scheduler API') - cs_parser.set_defaults(method='cluster_scheduler') - - cas_parser = rm_subparsers.add_parser( - 'apps', help='Cluster Applications API') - cas_parser.add_argument('--state', - help='states of the applications', - choices=dict(YarnApplicationState).keys()) - cas_parser.add_argument('--final-status', - choices=dict(FinalApplicationStatus).keys()) - cas_parser.add_argument('--user') - cas_parser.add_argument('--queue') - cas_parser.add_argument('--limit') - cas_parser.add_argument('--started-time-begin') - cas_parser.add_argument('--started-time-end') - cas_parser.add_argument('--finished-time-begin') - cas_parser.add_argument('--finished-time-end') - cas_parser.set_defaults(method='cluster_applications') - cas_parser.set_defaults(method_kwargs=[ - 'state', 'user', 'queue', 'limit', - 'started_time_begin', 'started_time_end', 'finished_time_begin', - 'finished_time_end', 'final_status']) - - ca_parser = rm_subparsers.add_parser( - 'app', help='Cluster Application API') - ca_parser.add_argument('application_id') - ca_parser.set_defaults(method='cluster_application') - ca_parser.set_defaults(method_args=['application_id']) - - caa_parser = rm_subparsers.add_parser( - 'app_attempts', help='Cluster Application Attempts API') - caa_parser.add_argument('application_id') - caa_parser.set_defaults(method='cluster_application_attempts') - caa_parser.set_defaults(method_args=['application_id']) - - cns_parser = rm_subparsers.add_parser( - 'nodes', help='Cluster Nodes API') - cns_parser.add_argument('--state', help='the state of the node') - cns_parser.add_argument('--healthy', help='true or false') - cns_parser.set_defaults(method='cluster_nodes') - cns_parser.set_defaults(method_kargs=['state', 'healthy']) - - cn_parser = rm_subparsers.add_parser( - 'node', help='Cluster Node API') - cn_parser.add_argument('node_id') - cn_parser.set_defaults(method='cluster_node') - cn_parser.set_defaults(method_args=['node_id']) + listing_of_apis = [ + 'cluster_information', + 'cluster_metrics', + 'cluster_scheduler', + 'cluster_applications', + 'cluster_application_statistics', + 'cluster_application', + 'cluster_application_attempts', + 'cluster_application_attempt_info', + 'cluster_application_attempt_containers', + 'cluster_application_attempt_container_info', + 'cluster_application_state', + 'cluster_application_kill', + 'cluster_nodes', + 'cluster_node', + 'cluster_node_update_resource', + 'cluster_submit_application', + 'cluster_new_application', + 'cluster_get_application_queue', + 'cluster_change_application_queue', + 'cluster_get_application_priority', + 'cluster_change_application_priority', + 'cluster_node_container_memory', + 'cluster_scheduler_queue', + 'cluster_scheduler_queue_availability', + 'cluster_queue_partition', + 'cluster_reservations', + 'cluster_new_delegation_token', + 'cluster_renew_delegation_token', + 'cluster_cancel_delegation_token', + 'cluster_new_reservation', + 'cluster_submit_reservation', + 'cluster_update_reservation', + 'cluster_delete_reservation', + 'cluster_application_timeouts', + 'cluster_application_timeout', + 'cluster_update_application_timeout', + 'cluster_scheduler_conf_mutation', + 'cluster_modify_scheduler_conf_mutation', + 'cluster_container_signal', + 'scheduler_activities', + 'application_activities' + ] + + create_parsers(rm_subparsers, ResourceManager, "Resource Manager", listing_of_apis) def populate_node_manager_arguments(subparsers): @@ -98,44 +100,44 @@ def populate_node_manager_arguments(subparsers): nm_subparsers = nm_parser.add_subparsers() - ni_parser = nm_subparsers.add_parser( - 'info', help='NodeManager Information API') - ni_parser.set_defaults(method='node_information') - - nas_parser = nm_subparsers.add_parser( - 'apps', help='Applications API') - nas_parser.add_argument('--state', - help='application state', - choices=dict(ApplicationState).keys()) - nas_parser.add_argument('--user', - help='user name') - nas_parser.set_defaults(method='node_applications') - nas_parser.set_defaults(method_kwargs=['state', 'user']) - - na_parser = nm_subparsers.add_parser( - 'app', help='Application API') - na_parser.add_argument('application_id') - na_parser.set_defaults(method='node_application') - na_parser.set_defaults(method_args=['application_id']) - - ncs_parser = nm_subparsers.add_parser( - 'containers', help='Containers API') - ncs_parser.set_defaults(method='node_containers') - - nc_parser = nm_subparsers.add_parser( - 'container', help='Container API') - nc_parser.add_argument('container_id') - nc_parser.set_defaults(method='node_container') - nc_parser.set_defaults(method_args=['container_id']) + listing_of_apis = [ + 'node_information', + 'node_applications', + 'node_application', + 'node_containers', + 'node_container', + 'auxiliary_services', + 'auxiliary_services_update' + ] + + create_parsers(nm_subparsers, NodeManager, "Node Manager", listing_of_apis) def populate_application_master_arguments(subparsers): am_parser = subparsers.add_parser( 'am', help='MapReduce Application Master REST API\'s') am_parser.set_defaults(api_class=ApplicationMaster) - am_parser.add_argument('application_id') - # TODO: not implemented + am_subparsers = am_parser.add_subparsers() + + listing_of_apis = [ + 'application_information', + 'jobs', + 'job', + 'job_attempts', + 'job_counters', + 'job_conf', + 'job_tasks', + 'job_task', + 'task_counters', + 'task_attempts', + 'task_attempt', + 'task_attempt_state', + 'task_attempt_state_kill', + 'task_attempt_counters' + ] + + create_parsers(am_subparsers, ApplicationMaster, "Application Master", listing_of_apis) def populate_history_server_arguments(subparsers): @@ -145,97 +147,22 @@ def populate_history_server_arguments(subparsers): hs_subparsers = hs_parser.add_subparsers() - hi_parser = hs_subparsers.add_parser( - 'info', help='History Server Information API') - hi_parser.set_defaults(method='application_information') - - hjs_parser = hs_subparsers.add_parser( - 'jobs', help='Jobs API') - hjs_parser.add_argument('--state', - help='states of the applications', - choices=dict(JobStateInternal).keys()) - hjs_parser.add_argument('--user') - hjs_parser.add_argument('--queue') - hjs_parser.add_argument('--limit') - hjs_parser.add_argument('--started-time-begin') - hjs_parser.add_argument('--started-time-end') - hjs_parser.add_argument('--finished-time-begin') - hjs_parser.add_argument('--finished-time-end') - hjs_parser.set_defaults(method='jobs') - hjs_parser.set_defaults(method_kwargs=[ - 'state', 'user', 'queue', 'limit', - 'started_time_begin', 'started_time_end', 'finished_time_begin', - 'finished_time_end']) - - hj_parser = hs_subparsers.add_parser('job', help='Job API') - hj_parser.add_argument('job_id') - hj_parser.set_defaults(method='job') - hj_parser.set_defaults(method_args=['job_id']) - - hja_parser = hs_subparsers.add_parser( - 'job_attempts', help='Job Attempts API') - hja_parser.add_argument('job_id') - hja_parser.set_defaults(method='job_attempts') - hja_parser.set_defaults(method_args=['job_id']) - - hjc_parser = hs_subparsers.add_parser( - 'job_counters', help='Job Counters API') - hjc_parser.add_argument('job_id') - hjc_parser.set_defaults(method='job_counters') - hjc_parser.set_defaults(method_args=['job_id']) - - hjcn_parser = hs_subparsers.add_parser( - 'job_conf', help='Job Conf API') - hjcn_parser.add_argument('job_id') - hjcn_parser.set_defaults(method='job_conf') - hjcn_parser.set_defaults(method_args=['job_id']) - - hts_parser = hs_subparsers.add_parser( - 'tasks', help='Tasks API') - hts_parser.add_argument('job_id') - hts_parser.add_argument('--type', choices=['m', 'r'], - help=('type of task, m for map task ' - 'or r for reduce task.')) - hts_parser.set_defaults(method='job_tasks') - hts_parser.set_defaults(method_args=['job_id']) - hts_parser.set_defaults(method_kwargs=['type']) - - ht_parser = hs_subparsers.add_parser( - 'task', help='Task API') - ht_parser.add_argument('job_id') - ht_parser.add_argument('task_id') - ht_parser.set_defaults(method='job_task') - ht_parser.set_defaults(method_args=['job_id', 'task_id']) - - htc_parser = hs_subparsers.add_parser( - 'task_counters', help='Task Counters API') - htc_parser.add_argument('job_id') - htc_parser.add_argument('task_id') - htc_parser.set_defaults(method='task_counters') - htc_parser.set_defaults(method_args=['job_id', 'task_id']) - - htas_parser = hs_subparsers.add_parser( - 'task_attempts', help='Task Attempts API') - htas_parser.add_argument('job_id') - htas_parser.add_argument('task_id') - htas_parser.set_defaults(method='task_attempts') - htas_parser.set_defaults(method_args=['job_id', 'task_id']) - - hta_parser = hs_subparsers.add_parser( - 'task_attempt', help='Task Attempt API') - hta_parser.add_argument('job_id') - hta_parser.add_argument('task_id') - hta_parser.add_argument('attempt_id') - hta_parser.set_defaults(method='task_attempt') - hta_parser.set_defaults(method_args=['job_id', 'task_id', 'attempt_id']) - - htac_parser = hs_subparsers.add_parser( - 'task_attempt_counters', help='Task Attempt Counters API') - htac_parser.add_argument('job_id') - htac_parser.add_argument('task_id') - htac_parser.add_argument('attempt_id') - htac_parser.set_defaults(method='task_attempt_counters') - htac_parser.set_defaults(method_args=['job_id', 'task_id', 'attempt_id']) + listing_of_apis = [ + 'application_information', + 'jobs', + 'job', + 'job_attempts', + 'job_counters', + 'job_conf', + 'job_tasks', + 'job_task', + 'task_counters', + 'task_attempts', + 'task_attempt', + 'task_attempt_counters' + ] + + create_parsers(hs_subparsers, HistoryServer, "History Server", listing_of_apis) def main(): From 7af126800fecf37dff47bb946ae35494a2fff525 Mon Sep 17 00:00:00 2001 From: Dmitry Romanenko <Dmitry@Romanenko.in> Date: Sun, 5 Dec 2021 23:32:57 -0500 Subject: [PATCH 2/3] Add support for other constructor values, fix leak of local vars --- yarn_api_client/main.py | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/yarn_api_client/main.py b/yarn_api_client/main.py index 3522ccd..a8f84f8 100644 --- a/yarn_api_client/main.py +++ b/yarn_api_client/main.py @@ -3,6 +3,7 @@ import argparse from pprint import pprint +from .auth import SimpleAuth from .base import get_logger from .constants import (YarnApplicationState, FinalApplicationStatus, ApplicationState, JobStateInternal) @@ -16,6 +17,10 @@ def get_parser(): description='Client for Hadoop® YARN API') parser.add_argument('--endpoint', help='API endpoint (https://test.cluster.com:8090)') + #parser.add_argument('--api_class', help='Please provide api class - rm, hs, nm, am', required=True) + parser.add_argument('--timeout', help='Request timeout', default=30) + parser.add_argument('--auth', help='Authentication type', default=None, choices=['simple', None]) + parser.add_argument('--verify', help='Verify cert or not', default=True) subparsers = parser.add_subparsers() populate_resource_manager_arguments(subparsers) @@ -34,7 +39,7 @@ def create_parsers(subparsers_instance, module_class, module_name, listing_of_ap ) _new_parser.set_defaults(method=api) _method = getattr(module_class, api) - for _arg in _method.__code__.co_varnames: + for _arg in _method.__code__.co_varnames[:_method.__code__.co_argcount]: if _arg != 'self': _new_parser.add_argument(_arg) @@ -170,15 +175,23 @@ def main(): opts = parser.parse_args() class_kwargs = {} - if not hasattr(opts, 'api_class'): - raise Exception("Please provide api class - rm, hs, nm, am") # Only ResourceManager supports HA - elif opts.endpoint: + if opts.endpoint: if opts.api_class == ResourceManager: class_kwargs['service_endpoints'] = opts.endpoint.split(",") else: class_kwargs['service_endpoint'] = opts.endpoint + # CLI requires some special accommodation for Auth - custom class imports + if opts.auth: + # Currenly only hadoop's SimpleAuth and none are supported out of the box + if opts.auth == 'simple': + class_kwargs['auth'] = SimpleAuth() + else: + raise Exception( + "This auth mentod is not supported by CLI, please write your own python script if needed" + ) + api = opts.api_class(**class_kwargs) # Construct positional arguments for method if 'method_args' in opts: From 2fce182b2d6ff3f25bfdfa2bab6ee2c195b990cb Mon Sep 17 00:00:00 2001 From: Dmitry Romanenko <Dmitry@Romanenko.in> Date: Tue, 7 Dec 2021 21:01:29 +0000 Subject: [PATCH 3/3] Simpler exception handling Co-authored-by: Kevin Bates <kbates4@gmail.com> --- yarn_api_client/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yarn_api_client/main.py b/yarn_api_client/main.py index a8f84f8..e55923f 100644 --- a/yarn_api_client/main.py +++ b/yarn_api_client/main.py @@ -189,7 +189,7 @@ def main(): class_kwargs['auth'] = SimpleAuth() else: raise Exception( - "This auth mentod is not supported by CLI, please write your own python script if needed" + f"This auth method ({opts.auth}) is not supported by the CLI." ) api = opts.api_class(**class_kwargs)