diff --git a/src/pmdas/elasticsearch/pmdaelasticsearch.python b/src/pmdas/elasticsearch/pmdaelasticsearch.python index c9bc65b5d5..2590b456ad 100644 --- a/src/pmdas/elasticsearch/pmdaelasticsearch.python +++ b/src/pmdas/elasticsearch/pmdaelasticsearch.python @@ -45,23 +45,49 @@ from cpmda import PMDA_FETCH_NOVALUES DEFAULT_PORT = 9200 DEFAULT_URL = f"http://localhost:{DEFAULT_PORT}/" -DEFAULT_VERSION = 2 DEFAULT_USER = "root" -def _is_pmda_setup(): - """checks if PMDA is in setup state""" - return os.environ.get('PCP_PYTHON_DOMAIN') or os.environ.get('PCP_PYTHON_PMNS') +class ElasticsearchApi(object): + def __init__(self, user, baseurl, auth, password): + self.user = user + self.baseurl = baseurl + self.auth = auth + self.password = password + passman = httprequest.HTTPPasswordMgrWithDefaultRealm() + passman.add_password(None, url, auth, pasw) + authhandler = httprequest.HTTPBasicAuthHandler(passman) + opener = httprequest.build_opener(authhandler) + httprequest.install_opener(opener) + # Just make sure we can actually communicate with + request = httprequest.urlopen(baseurl) + request.read() + request.close() + self.interval_end = 0 + self.interval_dur = 10 + self.cache = {} + def fetch(self, api_suffix): + """ Perform HTTP GET """ + now = time.time() + + while now > self.interval_end: + # Move the interval in increments of the interval duration until + # it catches up to the current time. + self.interval_end += self.interval_dur + + try: + exp_ts, data = self.cache[api_suffix] + except KeyError: + # We do not have anything in the cache for this api_suffix + exp_ts, data = self.interval_end, None + + if now >= exp_ts or data is None: + # self.log(f"url: {self.baseurl}/{api_suffix}") + data = json.loads(httprequest.urlopen(f"{self.baseurl}/{api_suffix}").read()) + self.cache[api_suffix] = (self.interval_end, data) -def _setup_urllib(url, auth, pasw): - """ Setup urllib """ - passman = httprequest.HTTPPasswordMgrWithDefaultRealm() - passman.add_password(None, url, auth, pasw) - authhandler = httprequest.HTTPBasicAuthHandler(passman) - opener = httprequest.build_opener(authhandler) - httprequest.install_opener(opener) - return httprequest + return data class ElasticsearchPMDA(PMDA): @@ -69,40 +95,26 @@ class ElasticsearchPMDA(PMDA): def __init__(self, name, domain): """ Constructor """ PMDA.__init__(self, name, domain) - self.user = DEFAULT_USER - self.baseurl = DEFAULT_URL - self.auth = None - self.password = None - self.request = None - self.error = False - self.version = DEFAULT_VERSION - self.read_config() - # statistics dicts - self.cluster = {} - self.nodes = {} - self.node_info = {} - self.info = {} - self.stat = {} - self.search = {} - self.perindex = {} - self.master_node = None - self.index = {} + self.pmda_setup = os.environ.get('PCP_PYTHON_DOMAIN') or os.environ.get('PCP_PYTHON_PMNS') + + user, baseurl, auth, password, = self._read_config() - if self.user != DEFAULT_USER and not _is_pmda_setup(): - self.log(f"Switching to user '{self.user}'") - self.set_user(self.user) + self.refreshed = None # Timestamp + + if user != DEFAULT_USER and not self.pmda_setup: + self.log(f"Switching to user '{user}'") + self.set_user(user) self.connect_pmcd() - if not _is_pmda_setup(): + self.es_api = None + if not self.pmda_setup: try: - connection = self.baseurl - self.request = _setup_urllib(self.baseurl, self.auth, self.password) - request = self.request.urlopen(self.baseurl) - request.read() - request.close() + self.es_api = ElasticsearchApi(user, baseurl, auth, password) except (BaseException, Exception): - self.log(f"Failed Elasticsearch connection attempt at {connection}") + self.log(f"Failed Elasticsearch connection attempt at {baseurl}") + + self._init_statistics_dicts() # metrics setup self.cluster_indom = PM_INDOM_NULL @@ -471,10 +483,33 @@ class ElasticsearchPMDA(PMDA): metric[1], self.index_indom, metric[2], metric[3]), metric[4], metric[4]) + + self.url_map = { + self.cluster_cluster: "_cluster/health", + self.nodes_cluster: "_nodes/stats", + self.nodes_info_cluster: "_nodes", + self.master_node_cluster: "_nodes", + self.version_cluster: "", + self.search_cluster: "_stats/search", + self.perindex_cluster: "_stats/search", + self.index_cluster: "_cluster/state/metadata", + } + self.set_refresh(self.elasticsearch_refresh) self.set_fetch_callback(self.elasticsearch_fetch_callback) - def read_config(self): + def _init_statistics_dicts(self): + self.cluster = {} + self.nodes = {} + self.node_info = {} + self.info = {} + self.stat = {} + self.search = {} + self.perindex = {} + self.master_node = None + self.index = {} + + def _read_config(self): """ Read Configuration file """ configdir = PCP.pmGetConfig('PCP_PMDAS_DIR') @@ -489,70 +524,49 @@ class ElasticsearchPMDA(PMDA): else: config = ConfigParser.SafeConfigParser() config.read(configfile) + user = DEFAULT_USER + baseurl = DEFAULT_URL + auth = None + password = None if config.has_section('pmda'): for opt in config.options('pmda'): if opt == 'user': - self.user = config.get('pmda', opt) + user = config.get('pmda', opt) elif opt == 'baseurl': - self.baseurl = config.get('pmda', opt) + baseurl = config.get('pmda', opt) elif opt == 'auth': - self.auth = config.get('pmda', opt) + auth = config.get('pmda', opt) elif opt == 'password': - self.password = config.get('pmda', opt) + password = config.get('pmda', opt) else: self.log(f"Ignoring directive '{opt}' in {configfile}.") + return (user, baseurl, auth, password) - def get_url(self, url): - """ Perform HTTP GET """ - # self.log(f"url: {url}") - req = None + def elasticsearch_refresh(self, cluster): + """ Refresh current values for a given cluster """ try: - req = self.request.urlopen(url) + data = self.es_api.fetch(self.url_map[cluster]) + except KeyError: + # Do nothing for unrecognized cluster self.error = False - except (BaseException, Exception): + return + except (BaseException, Exception) as e: if not self.error: - self.log(f"Failed to get URL {url}") + self.log(f"Failed to get URL: {e}") self.error = True - if not req: - if self.cluster: - self.cluster = {} - if self.nodes: - self.nodes = {} - if self.node_info: - self.node_info = {} - if self.info: - self.info = {} - if self.stat: - self.stat = {} - if self.search: - self.search = {} - if self.perindex: - self.perindex = {} - if self.master_node: - self.master_node = None - if self.index: - self.index = {} - return req + self._init_statistics_dicts() + return + else: + self.error = False - def elasticsearch_refresh(self, cluster): - """ Refresh """ if cluster == self.cluster_cluster: - request = self.get_url(self.baseurl + "_cluster/health") - if request: - self.cluster = json.loads(request.read()) - request.close() - return + self.cluster = data - if cluster == self.nodes_cluster: - temp = {} + elif cluster == self.nodes_cluster: insts = {} - request = self.get_url(self.baseurl + "_nodes/stats") - if request: - temp = json.loads(request.read()) - request.close() + self.nodes = {} try: - self.nodes = {} - for _id, stats in temp['nodes'].items(): + for _id, stats in data['nodes'].items(): name = stats['name'] insts[name] = c_int(1) self.nodes[name] = stats @@ -560,76 +574,55 @@ class ElasticsearchPMDA(PMDA): pass self.nodes_insts.set_instances(self.nodes_indom, insts) self.replace_indom(self.nodes_indom, insts) - return - if cluster in (self.node_info_cluster, self.master_node_cluster): - temp = {} + elif cluster == self.master_node_cluster: + self.master_node = None + try: + for _id, info in data['nodes'].items(): + if info['attributes']['master'] == "true": + self.master_node = str(info['name']) + break + except KeyError: + pass + + elif cluster == self.node_info_cluster: insts = {} - request = self.get_url(self.baseurl + "_nodes") - if request: - temp = json.loads(request.read()) - request.close() - if cluster == self.master_node_cluster: - self.master_node = None - try: - for _id, info in temp['nodes'].items(): - if info['attributes']['master'] == "true": - self.master_node = str(info['name']) - except KeyError: - pass - else: - try: - self.node_info = {} - for _id, info in temp['nodes'].items(): - name = info['name'] - insts[name] = c_int(1) - self.node_info[name] = info - except KeyError: - pass - self.node_info_insts.set_instances(self.node_info_indom, insts) - self.replace_indom(self.node_info_indom, insts) - return + self.node_info = {} + try: + for _id, info in data['nodes'].items(): + name = info['name'] + insts[name] = c_int(1) + self.node_info[name] = info + except KeyError: + pass + self.node_info_insts.set_instances(self.node_info_indom, insts) + self.replace_indom(self.node_info_indom, insts) - if cluster == self.version_cluster: - request = self.get_url(self.baseurl) - if request: - self.info = json.loads(request.read()) - request.close() - return + elif cluster == self.version_cluster: + self.info = data - if cluster in (self.search_cluster, self.perindex_cluster): - temp = {} - insts = {} - request = self.get_url(self.baseurl + "_stats/search") - if request: - temp = json.loads(request.read()) - request.close() - if cluster == self.search_cluster: - self.search = temp - else: - try: - self.perindex = temp['indices'] - for index in temp['indices']: - insts[index] = c_int(1) - except KeyError: - pass - self.perindex_insts.set_instances(self.perindex_indom, insts) - self.replace_indom(self.perindex_indom, insts) - return + elif cluster == self.search_cluster: + self.search = data - if cluster == self.index_cluster: - temp = {} + elif cluster == self.perindex_cluster: + try: + self.perindex = data['indices'] + except KeyError: + self.perindex = {} insts = {} - request = self.get_url(self.baseurl + "_cluster/state/metadata") - if request: - temp = json.loads(request.read()) - request.close() + for index in self.perindex: + insts[index] = c_int(1) + self.perindex_insts.set_instances(self.perindex_indom, insts) + self.replace_indom(self.perindex_indom, insts) + + elif cluster == self.index_cluster: try: - self.index = temp['metadata']['indices'] - for index in temp['metadata']['indices']: - insts[index] = c_int(1) + self.index = data['metadata']['indices'] except KeyError: - pass + self.index = {} + insts = {} + for index in self.index: + insts[index] = c_int(1) self.index_insts.set_instances(self.index_indom, insts) self.replace_indom(self.index_indom, insts)