Skip to content

Dynamic generated CLI #112

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
302 changes: 121 additions & 181 deletions yarn_api_client/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import argparse
from pprint import pprint

from .auth import SimpleAuth
from .base import get_logger
from .constants import (YarnApplicationState, FinalApplicationStatus,
ApplicationState, JobStateInternal)
Expand All @@ -16,6 +17,10 @@ def get_parser():
description='Client for Hadoop® YARN API')

parser.add_argument('--endpoint', help='API endpoint (https://test.cluster.com:8090)')
#parser.add_argument('--api_class', help='Please provide api class - rm, hs, nm, am', required=True)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this intentionally commented out? If so, perhaps an additional comment as to why or when it should be uncommented would be helpful.

parser.add_argument('--timeout', help='Request timeout', default=30)
parser.add_argument('--auth', help='Authentication type', default=None, choices=['simple', None])
parser.add_argument('--verify', help='Verify cert or not', default=True)

subparsers = parser.add_subparsers()
populate_resource_manager_arguments(subparsers)
Expand All @@ -26,69 +31,71 @@ def get_parser():
return parser


def create_parsers(subparsers_instance, module_class, module_name, listing_of_apis):
for api in listing_of_apis:
_help_message = module_name + " " + api.replace("_", " ").title() + " API"
_new_parser = subparsers_instance.add_parser(
api, help=_help_message
)
_new_parser.set_defaults(method=api)
_method = getattr(module_class, api)
for _arg in _method.__code__.co_varnames[:_method.__code__.co_argcount]:
if _arg != 'self':
_new_parser.add_argument(_arg)


def populate_resource_manager_arguments(subparsers):
rm_parser = subparsers.add_parser(
'rm', help='ResourceManager REST API\'s')
rm_parser.set_defaults(api_class=ResourceManager)

rm_subparsers = rm_parser.add_subparsers()

ci_parser = rm_subparsers.add_parser(
'info', help='Cluster Information API')
ci_parser.set_defaults(method='cluster_information')

cm_parser = rm_subparsers.add_parser(
'metrics', help='Cluster Metrics API')
cm_parser.set_defaults(method='cluster_metrics')

cs_parser = rm_subparsers.add_parser(
'scheduler', help='Cluster Scheduler API')
cs_parser.set_defaults(method='cluster_scheduler')

cas_parser = rm_subparsers.add_parser(
'apps', help='Cluster Applications API')
cas_parser.add_argument('--state',
help='states of the applications',
choices=dict(YarnApplicationState).keys())
cas_parser.add_argument('--final-status',
choices=dict(FinalApplicationStatus).keys())
cas_parser.add_argument('--user')
cas_parser.add_argument('--queue')
cas_parser.add_argument('--limit')
cas_parser.add_argument('--started-time-begin')
cas_parser.add_argument('--started-time-end')
cas_parser.add_argument('--finished-time-begin')
cas_parser.add_argument('--finished-time-end')
cas_parser.set_defaults(method='cluster_applications')
cas_parser.set_defaults(method_kwargs=[
'state', 'user', 'queue', 'limit',
'started_time_begin', 'started_time_end', 'finished_time_begin',
'finished_time_end', 'final_status'])

ca_parser = rm_subparsers.add_parser(
'app', help='Cluster Application API')
ca_parser.add_argument('application_id')
ca_parser.set_defaults(method='cluster_application')
ca_parser.set_defaults(method_args=['application_id'])

caa_parser = rm_subparsers.add_parser(
'app_attempts', help='Cluster Application Attempts API')
caa_parser.add_argument('application_id')
caa_parser.set_defaults(method='cluster_application_attempts')
caa_parser.set_defaults(method_args=['application_id'])

cns_parser = rm_subparsers.add_parser(
'nodes', help='Cluster Nodes API')
cns_parser.add_argument('--state', help='the state of the node')
cns_parser.add_argument('--healthy', help='true or false')
cns_parser.set_defaults(method='cluster_nodes')
cns_parser.set_defaults(method_kargs=['state', 'healthy'])

cn_parser = rm_subparsers.add_parser(
'node', help='Cluster Node API')
cn_parser.add_argument('node_id')
cn_parser.set_defaults(method='cluster_node')
cn_parser.set_defaults(method_args=['node_id'])
listing_of_apis = [
'cluster_information',
'cluster_metrics',
'cluster_scheduler',
'cluster_applications',
'cluster_application_statistics',
'cluster_application',
'cluster_application_attempts',
'cluster_application_attempt_info',
'cluster_application_attempt_containers',
'cluster_application_attempt_container_info',
'cluster_application_state',
'cluster_application_kill',
'cluster_nodes',
'cluster_node',
'cluster_node_update_resource',
'cluster_submit_application',
'cluster_new_application',
'cluster_get_application_queue',
'cluster_change_application_queue',
'cluster_get_application_priority',
'cluster_change_application_priority',
'cluster_node_container_memory',
'cluster_scheduler_queue',
'cluster_scheduler_queue_availability',
'cluster_queue_partition',
'cluster_reservations',
'cluster_new_delegation_token',
'cluster_renew_delegation_token',
'cluster_cancel_delegation_token',
'cluster_new_reservation',
'cluster_submit_reservation',
'cluster_update_reservation',
'cluster_delete_reservation',
'cluster_application_timeouts',
'cluster_application_timeout',
'cluster_update_application_timeout',
'cluster_scheduler_conf_mutation',
'cluster_modify_scheduler_conf_mutation',
'cluster_container_signal',
'scheduler_activities',
'application_activities'
]

create_parsers(rm_subparsers, ResourceManager, "Resource Manager", listing_of_apis)


def populate_node_manager_arguments(subparsers):
Expand All @@ -98,44 +105,44 @@ def populate_node_manager_arguments(subparsers):

nm_subparsers = nm_parser.add_subparsers()

ni_parser = nm_subparsers.add_parser(
'info', help='NodeManager Information API')
ni_parser.set_defaults(method='node_information')

nas_parser = nm_subparsers.add_parser(
'apps', help='Applications API')
nas_parser.add_argument('--state',
help='application state',
choices=dict(ApplicationState).keys())
nas_parser.add_argument('--user',
help='user name')
nas_parser.set_defaults(method='node_applications')
nas_parser.set_defaults(method_kwargs=['state', 'user'])

na_parser = nm_subparsers.add_parser(
'app', help='Application API')
na_parser.add_argument('application_id')
na_parser.set_defaults(method='node_application')
na_parser.set_defaults(method_args=['application_id'])

ncs_parser = nm_subparsers.add_parser(
'containers', help='Containers API')
ncs_parser.set_defaults(method='node_containers')

nc_parser = nm_subparsers.add_parser(
'container', help='Container API')
nc_parser.add_argument('container_id')
nc_parser.set_defaults(method='node_container')
nc_parser.set_defaults(method_args=['container_id'])
listing_of_apis = [
'node_information',
'node_applications',
'node_application',
'node_containers',
'node_container',
'auxiliary_services',
'auxiliary_services_update'
]

create_parsers(nm_subparsers, NodeManager, "Node Manager", listing_of_apis)


def populate_application_master_arguments(subparsers):
am_parser = subparsers.add_parser(
'am', help='MapReduce Application Master REST API\'s')
am_parser.set_defaults(api_class=ApplicationMaster)
am_parser.add_argument('application_id')

# TODO: not implemented
am_subparsers = am_parser.add_subparsers()

listing_of_apis = [
'application_information',
'jobs',
'job',
'job_attempts',
'job_counters',
'job_conf',
'job_tasks',
'job_task',
'task_counters',
'task_attempts',
'task_attempt',
'task_attempt_state',
'task_attempt_state_kill',
'task_attempt_counters'
]

create_parsers(am_subparsers, ApplicationMaster, "Application Master", listing_of_apis)


def populate_history_server_arguments(subparsers):
Expand All @@ -145,113 +152,46 @@ def populate_history_server_arguments(subparsers):

hs_subparsers = hs_parser.add_subparsers()

hi_parser = hs_subparsers.add_parser(
'info', help='History Server Information API')
hi_parser.set_defaults(method='application_information')

hjs_parser = hs_subparsers.add_parser(
'jobs', help='Jobs API')
hjs_parser.add_argument('--state',
help='states of the applications',
choices=dict(JobStateInternal).keys())
hjs_parser.add_argument('--user')
hjs_parser.add_argument('--queue')
hjs_parser.add_argument('--limit')
hjs_parser.add_argument('--started-time-begin')
hjs_parser.add_argument('--started-time-end')
hjs_parser.add_argument('--finished-time-begin')
hjs_parser.add_argument('--finished-time-end')
hjs_parser.set_defaults(method='jobs')
hjs_parser.set_defaults(method_kwargs=[
'state', 'user', 'queue', 'limit',
'started_time_begin', 'started_time_end', 'finished_time_begin',
'finished_time_end'])

hj_parser = hs_subparsers.add_parser('job', help='Job API')
hj_parser.add_argument('job_id')
hj_parser.set_defaults(method='job')
hj_parser.set_defaults(method_args=['job_id'])

hja_parser = hs_subparsers.add_parser(
'job_attempts', help='Job Attempts API')
hja_parser.add_argument('job_id')
hja_parser.set_defaults(method='job_attempts')
hja_parser.set_defaults(method_args=['job_id'])

hjc_parser = hs_subparsers.add_parser(
'job_counters', help='Job Counters API')
hjc_parser.add_argument('job_id')
hjc_parser.set_defaults(method='job_counters')
hjc_parser.set_defaults(method_args=['job_id'])

hjcn_parser = hs_subparsers.add_parser(
'job_conf', help='Job Conf API')
hjcn_parser.add_argument('job_id')
hjcn_parser.set_defaults(method='job_conf')
hjcn_parser.set_defaults(method_args=['job_id'])

hts_parser = hs_subparsers.add_parser(
'tasks', help='Tasks API')
hts_parser.add_argument('job_id')
hts_parser.add_argument('--type', choices=['m', 'r'],
help=('type of task, m for map task '
'or r for reduce task.'))
hts_parser.set_defaults(method='job_tasks')
hts_parser.set_defaults(method_args=['job_id'])
hts_parser.set_defaults(method_kwargs=['type'])

ht_parser = hs_subparsers.add_parser(
'task', help='Task API')
ht_parser.add_argument('job_id')
ht_parser.add_argument('task_id')
ht_parser.set_defaults(method='job_task')
ht_parser.set_defaults(method_args=['job_id', 'task_id'])

htc_parser = hs_subparsers.add_parser(
'task_counters', help='Task Counters API')
htc_parser.add_argument('job_id')
htc_parser.add_argument('task_id')
htc_parser.set_defaults(method='task_counters')
htc_parser.set_defaults(method_args=['job_id', 'task_id'])

htas_parser = hs_subparsers.add_parser(
'task_attempts', help='Task Attempts API')
htas_parser.add_argument('job_id')
htas_parser.add_argument('task_id')
htas_parser.set_defaults(method='task_attempts')
htas_parser.set_defaults(method_args=['job_id', 'task_id'])

hta_parser = hs_subparsers.add_parser(
'task_attempt', help='Task Attempt API')
hta_parser.add_argument('job_id')
hta_parser.add_argument('task_id')
hta_parser.add_argument('attempt_id')
hta_parser.set_defaults(method='task_attempt')
hta_parser.set_defaults(method_args=['job_id', 'task_id', 'attempt_id'])

htac_parser = hs_subparsers.add_parser(
'task_attempt_counters', help='Task Attempt Counters API')
htac_parser.add_argument('job_id')
htac_parser.add_argument('task_id')
htac_parser.add_argument('attempt_id')
htac_parser.set_defaults(method='task_attempt_counters')
htac_parser.set_defaults(method_args=['job_id', 'task_id', 'attempt_id'])
listing_of_apis = [
'application_information',
'jobs',
'job',
'job_attempts',
'job_counters',
'job_conf',
'job_tasks',
'job_task',
'task_counters',
'task_attempts',
'task_attempt',
'task_attempt_counters'
]

create_parsers(hs_subparsers, HistoryServer, "History Server", listing_of_apis)


def main():
parser = get_parser()
opts = parser.parse_args()

class_kwargs = {}
if not hasattr(opts, 'api_class'):
raise Exception("Please provide api class - rm, hs, nm, am")
# Only ResourceManager supports HA
elif opts.endpoint:
if opts.endpoint:
if opts.api_class == ResourceManager:
class_kwargs['service_endpoints'] = opts.endpoint.split(",")
else:
class_kwargs['service_endpoint'] = opts.endpoint

# CLI requires some special accommodation for Auth - custom class imports
if opts.auth:
# Currenly only hadoop's SimpleAuth and none are supported out of the box
if opts.auth == 'simple':
class_kwargs['auth'] = SimpleAuth()
else:
raise Exception(
"This auth mentod is not supported by CLI, please write your own python script if needed"
)

api = opts.api_class(**class_kwargs)
# Construct positional arguments for method
if 'method_args' in opts:
Expand Down