Skip to content

Commit

Permalink
wip: interval based API calls
Browse files Browse the repository at this point in the history
  • Loading branch information
portante committed Feb 5, 2022
1 parent 909d235 commit 34f8520
Showing 1 changed file with 142 additions and 149 deletions.
291 changes: 142 additions & 149 deletions src/pmdas/elasticsearch/pmdaelasticsearch.python
Original file line number Diff line number Diff line change
Expand Up @@ -45,64 +45,76 @@ from cpmda import PMDA_FETCH_NOVALUES

DEFAULT_PORT = 9200
DEFAULT_URL = f"http://localhost:{DEFAULT_PORT}/"
DEFAULT_VERSION = 2
DEFAULT_USER = "root"


def _is_pmda_setup():
"""checks if PMDA is in setup state"""
return os.environ.get('PCP_PYTHON_DOMAIN') or os.environ.get('PCP_PYTHON_PMNS')
class ElasticsearchApi(object):
def __init__(self, user, baseurl, auth, password):
self.user = user
self.baseurl = baseurl
self.auth = auth
self.password = password
passman = httprequest.HTTPPasswordMgrWithDefaultRealm()
passman.add_password(None, url, auth, pasw)
authhandler = httprequest.HTTPBasicAuthHandler(passman)
opener = httprequest.build_opener(authhandler)
httprequest.install_opener(opener)
# Just make sure we can actually communicate with
request = httprequest.urlopen(baseurl)
request.read()
request.close()
self.interval_end = 0
self.interval_dur = 10
self.cache = {}

def fetch(self, api_suffix):
""" Perform HTTP GET """
now = time.time()

while now > self.interval_end:
# Move the interval in increments of the interval duration until
# it catches up to the current time.
self.interval_end += self.interval_dur

try:
exp_ts, data = self.cache[api_suffix]
except KeyError:
# We do not have anything in the cache for this api_suffix
exp_ts, data = self.interval_end, None

if now >= exp_ts or data is None:
# self.log(f"url: {self.baseurl}/{api_suffix}")
data = json.loads(httprequest.urlopen(f"{self.baseurl}/{api_suffix}").read())
self.cache[api_suffix] = (self.interval_end, data)

def _setup_urllib(url, auth, pasw):
""" Setup urllib """
passman = httprequest.HTTPPasswordMgrWithDefaultRealm()
passman.add_password(None, url, auth, pasw)
authhandler = httprequest.HTTPBasicAuthHandler(passman)
opener = httprequest.build_opener(authhandler)
httprequest.install_opener(opener)
return httprequest
return data


class ElasticsearchPMDA(PMDA):
""" PCP Elasticsearch PMDA """
def __init__(self, name, domain):
""" Constructor """
PMDA.__init__(self, name, domain)
self.user = DEFAULT_USER
self.baseurl = DEFAULT_URL
self.auth = None
self.password = None
self.request = None
self.error = False
self.version = DEFAULT_VERSION
self.read_config()
# statistics dicts
self.cluster = {}
self.nodes = {}
self.node_info = {}
self.info = {}
self.stat = {}
self.search = {}
self.perindex = {}
self.master_node = None
self.index = {}
self.pmda_setup = os.environ.get('PCP_PYTHON_DOMAIN') or os.environ.get('PCP_PYTHON_PMNS')

user, baseurl, auth, password, = self._read_config()

if self.user != DEFAULT_USER and not _is_pmda_setup():
self.log(f"Switching to user '{self.user}'")
self.set_user(self.user)
self.refreshed = None # Timestamp

if user != DEFAULT_USER and not self.pmda_setup:
self.log(f"Switching to user '{user}'")
self.set_user(user)

self.connect_pmcd()

if not _is_pmda_setup():
self.es_api = None
if not self.pmda_setup:
try:
connection = self.baseurl
self.request = _setup_urllib(self.baseurl, self.auth, self.password)
request = self.request.urlopen(self.baseurl)
request.read()
request.close()
self.es_api = ElasticsearchApi(user, baseurl, auth, password)
except (BaseException, Exception):
self.log(f"Failed Elasticsearch connection attempt at {connection}")
self.log(f"Failed Elasticsearch connection attempt at {baseurl}")

self._init_statistics_dicts()

# metrics setup
self.cluster_indom = PM_INDOM_NULL
Expand Down Expand Up @@ -471,10 +483,33 @@ class ElasticsearchPMDA(PMDA):
metric[1], self.index_indom,
metric[2], metric[3]),
metric[4], metric[4])

self.url_map = {
self.cluster_cluster: "_cluster/health",
self.nodes_cluster: "_nodes/stats",
self.nodes_info_cluster: "_nodes",
self.master_node_cluster: "_nodes",
self.version_cluster: "",
self.search_cluster: "_stats/search",
self.perindex_cluster: "_stats/search",
self.index_cluster: "_cluster/state/metadata",
}

self.set_refresh(self.elasticsearch_refresh)
self.set_fetch_callback(self.elasticsearch_fetch_callback)

def read_config(self):
def _init_statistics_dicts(self):
self.cluster = {}
self.nodes = {}
self.node_info = {}
self.info = {}
self.stat = {}
self.search = {}
self.perindex = {}
self.master_node = None
self.index = {}

def _read_config(self):
""" Read Configuration file """

configdir = PCP.pmGetConfig('PCP_PMDAS_DIR')
Expand All @@ -489,147 +524,105 @@ class ElasticsearchPMDA(PMDA):
else:
config = ConfigParser.SafeConfigParser()
config.read(configfile)
user = DEFAULT_USER
baseurl = DEFAULT_URL
auth = None
password = None
if config.has_section('pmda'):
for opt in config.options('pmda'):
if opt == 'user':
self.user = config.get('pmda', opt)
user = config.get('pmda', opt)
elif opt == 'baseurl':
self.baseurl = config.get('pmda', opt)
baseurl = config.get('pmda', opt)
elif opt == 'auth':
self.auth = config.get('pmda', opt)
auth = config.get('pmda', opt)
elif opt == 'password':
self.password = config.get('pmda', opt)
password = config.get('pmda', opt)
else:
self.log(f"Ignoring directive '{opt}' in {configfile}.")
return (user, baseurl, auth, password)

def get_url(self, url):
""" Perform HTTP GET """
# self.log(f"url: {url}")
req = None
def elasticsearch_refresh(self, cluster):
""" Refresh current values for a given cluster """
try:
req = self.request.urlopen(url)
data = self.es_api.fetch(self.url_map[cluster])
except KeyError:
# Do nothing for unrecognized cluster
self.error = False
except (BaseException, Exception):
return
except (BaseException, Exception) as e:
if not self.error:
self.log(f"Failed to get URL {url}")
self.log(f"Failed to get URL: {e}")
self.error = True
if not req:
if self.cluster:
self.cluster = {}
if self.nodes:
self.nodes = {}
if self.node_info:
self.node_info = {}
if self.info:
self.info = {}
if self.stat:
self.stat = {}
if self.search:
self.search = {}
if self.perindex:
self.perindex = {}
if self.master_node:
self.master_node = None
if self.index:
self.index = {}
return req
self._init_statistics_dicts()
return
else:
self.error = False

def elasticsearch_refresh(self, cluster):
""" Refresh """
if cluster == self.cluster_cluster:
request = self.get_url(self.baseurl + "_cluster/health")
if request:
self.cluster = json.loads(request.read())
request.close()
return
self.cluster = data

if cluster == self.nodes_cluster:
temp = {}
elif cluster == self.nodes_cluster:
insts = {}
request = self.get_url(self.baseurl + "_nodes/stats")
if request:
temp = json.loads(request.read())
request.close()
self.nodes = {}
try:
self.nodes = {}
for _id, stats in temp['nodes'].items():
for _id, stats in data['nodes'].items():
name = stats['name']
insts[name] = c_int(1)
self.nodes[name] = stats
except KeyError:
pass
self.nodes_insts.set_instances(self.nodes_indom, insts)
self.replace_indom(self.nodes_indom, insts)
return

if cluster in (self.node_info_cluster, self.master_node_cluster):
temp = {}
elif cluster == self.master_node_cluster:
self.master_node = None
try:
for _id, info in data['nodes'].items():
if info['attributes']['master'] == "true":
self.master_node = str(info['name'])
break
except KeyError:
pass

elif cluster == self.node_info_cluster:
insts = {}
request = self.get_url(self.baseurl + "_nodes")
if request:
temp = json.loads(request.read())
request.close()
if cluster == self.master_node_cluster:
self.master_node = None
try:
for _id, info in temp['nodes'].items():
if info['attributes']['master'] == "true":
self.master_node = str(info['name'])
except KeyError:
pass
else:
try:
self.node_info = {}
for _id, info in temp['nodes'].items():
name = info['name']
insts[name] = c_int(1)
self.node_info[name] = info
except KeyError:
pass
self.node_info_insts.set_instances(self.node_info_indom, insts)
self.replace_indom(self.node_info_indom, insts)
return
self.node_info = {}
try:
for _id, info in data['nodes'].items():
name = info['name']
insts[name] = c_int(1)
self.node_info[name] = info
except KeyError:
pass
self.node_info_insts.set_instances(self.node_info_indom, insts)
self.replace_indom(self.node_info_indom, insts)

if cluster == self.version_cluster:
request = self.get_url(self.baseurl)
if request:
self.info = json.loads(request.read())
request.close()
return
elif cluster == self.version_cluster:
self.info = data

if cluster in (self.search_cluster, self.perindex_cluster):
temp = {}
insts = {}
request = self.get_url(self.baseurl + "_stats/search")
if request:
temp = json.loads(request.read())
request.close()
if cluster == self.search_cluster:
self.search = temp
else:
try:
self.perindex = temp['indices']
for index in temp['indices']:
insts[index] = c_int(1)
except KeyError:
pass
self.perindex_insts.set_instances(self.perindex_indom, insts)
self.replace_indom(self.perindex_indom, insts)
return
elif cluster == self.search_cluster:
self.search = data

if cluster == self.index_cluster:
temp = {}
elif cluster == self.perindex_cluster:
try:
self.perindex = data['indices']
except KeyError:
self.perindex = {}
insts = {}
request = self.get_url(self.baseurl + "_cluster/state/metadata")
if request:
temp = json.loads(request.read())
request.close()
for index in self.perindex:
insts[index] = c_int(1)
self.perindex_insts.set_instances(self.perindex_indom, insts)
self.replace_indom(self.perindex_indom, insts)

elif cluster == self.index_cluster:
try:
self.index = temp['metadata']['indices']
for index in temp['metadata']['indices']:
insts[index] = c_int(1)
self.index = data['metadata']['indices']
except KeyError:
pass
self.index = {}
insts = {}
for index in self.index:
insts[index] = c_int(1)
self.index_insts.set_instances(self.index_indom, insts)
self.replace_indom(self.index_indom, insts)

Expand Down

0 comments on commit 34f8520

Please sign in to comment.