wip: interval based API calls #1518

Draft: wants to merge 1 commit into base: main
Changes from all commits
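This draft reworks how the PMDA talks to Elasticsearch: instead of each refresh issuing its own HTTP GET through get_url(), an ElasticsearchApi helper caches the parsed JSON per endpoint and re-fetches it at most once per interval (10 seconds in the diff below). The following is a minimal standalone sketch of that caching idea for orientation only; the IntervalCache name, the bare urllib usage, and the example URL are illustrative and not part of the PR.

# Minimal sketch of interval-based caching of API calls (illustrative, not the PR code).
import json
import time
import urllib.request as httprequest

class IntervalCache:
    """Fetch each endpoint at most once per fixed interval and reuse the parsed JSON."""

    def __init__(self, baseurl, interval=10):
        self.baseurl = baseurl.rstrip('/')
        self.interval_dur = interval
        self.interval_end = 0
        self.cache = {}  # api_suffix -> (end-of-interval timestamp, parsed JSON)

    def fetch(self, api_suffix):
        now = time.time()
        # Advance the interval window until it covers the current time.
        while now > self.interval_end:
            self.interval_end += self.interval_dur
        expiry, data = self.cache.get(api_suffix, (self.interval_end, None))
        if now >= expiry or data is None:
            # Stale or missing entry: perform the GET and cache it for this interval.
            with httprequest.urlopen(f"{self.baseurl}/{api_suffix}") as resp:
                data = json.loads(resp.read())
            self.cache[api_suffix] = (self.interval_end, data)
        return data

# Two metric clusters mapped to the same endpoint then share one HTTP GET per interval:
# api = IntervalCache("http://localhost:9200")
# search = api.fetch("_stats/search")     # performs the GET
# perindex = api.fetch("_stats/search")   # served from the cache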
291 changes: 142 additions & 149 deletions src/pmdas/elasticsearch/pmdaelasticsearch.python
@@ -45,64 +45,76 @@ from cpmda import PMDA_FETCH_NOVALUES

DEFAULT_PORT = 9200
DEFAULT_URL = f"http://localhost:{DEFAULT_PORT}/"
DEFAULT_VERSION = 2
DEFAULT_USER = "root"


def _is_pmda_setup():
"""checks if PMDA is in setup state"""
return os.environ.get('PCP_PYTHON_DOMAIN') or os.environ.get('PCP_PYTHON_PMNS')
class ElasticsearchApi(object):
def __init__(self, user, baseurl, auth, password):
self.user = user
self.baseurl = baseurl
self.auth = auth
self.password = password
passman = httprequest.HTTPPasswordMgrWithDefaultRealm()
passman.add_password(None, baseurl, auth, password)
authhandler = httprequest.HTTPBasicAuthHandler(passman)
opener = httprequest.build_opener(authhandler)
httprequest.install_opener(opener)
# Just make sure we can actually communicate with the server
request = httprequest.urlopen(baseurl)
request.read()
request.close()
self.interval_end = 0
self.interval_dur = 10
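# maps api_suffix -> (end-of-current-interval timestamp, parsed JSON response)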
self.cache = {}

def fetch(self, api_suffix):
""" Perform HTTP GET """
now = time.time()

while now > self.interval_end:
# Move the interval in increments of the interval duration until
# it catches up to the current time.
self.interval_end += self.interval_dur

try:
exp_ts, data = self.cache[api_suffix]
except KeyError:
# We do not have anything in the cache for this api_suffix
exp_ts, data = self.interval_end, None

if now >= exp_ts or data is None:
# self.log(f"url: {self.baseurl}/{api_suffix}")
data = json.loads(httprequest.urlopen(f"{self.baseurl}/{api_suffix}").read())
self.cache[api_suffix] = (self.interval_end, data)

def _setup_urllib(url, auth, pasw):
""" Setup urllib """
passman = httprequest.HTTPPasswordMgrWithDefaultRealm()
passman.add_password(None, url, auth, pasw)
authhandler = httprequest.HTTPBasicAuthHandler(passman)
opener = httprequest.build_opener(authhandler)
httprequest.install_opener(opener)
return httprequest
return data


class ElasticsearchPMDA(PMDA):
""" PCP Elasticsearch PMDA """
def __init__(self, name, domain):
""" Constructor """
PMDA.__init__(self, name, domain)
self.user = DEFAULT_USER
self.baseurl = DEFAULT_URL
self.auth = None
self.password = None
self.request = None
self.error = False
self.version = DEFAULT_VERSION
self.read_config()
# statistics dicts
self.cluster = {}
self.nodes = {}
self.node_info = {}
self.info = {}
self.stat = {}
self.search = {}
self.perindex = {}
self.master_node = None
self.index = {}
self.pmda_setup = os.environ.get('PCP_PYTHON_DOMAIN') or os.environ.get('PCP_PYTHON_PMNS')

user, baseurl, auth, password = self._read_config()

if self.user != DEFAULT_USER and not _is_pmda_setup():
self.log(f"Switching to user '{self.user}'")
self.set_user(self.user)
self.refreshed = None # Timestamp

if user != DEFAULT_USER and not self.pmda_setup:
self.log(f"Switching to user '{user}'")
self.set_user(user)

self.connect_pmcd()

if not _is_pmda_setup():
self.es_api = None
if not self.pmda_setup:
try:
connection = self.baseurl
self.request = _setup_urllib(self.baseurl, self.auth, self.password)
request = self.request.urlopen(self.baseurl)
request.read()
request.close()
self.es_api = ElasticsearchApi(user, baseurl, auth, password)
except (BaseException, Exception):
self.log(f"Failed Elasticsearch connection attempt at {connection}")
self.log(f"Failed Elasticsearch connection attempt at {baseurl}")

self._init_statistics_dicts()

# metrics setup
self.cluster_indom = PM_INDOM_NULL
@@ -471,10 +483,33 @@ class ElasticsearchPMDA(PMDA):
metric[1], self.index_indom,
metric[2], metric[3]),
metric[4], metric[4])

self.url_map = {
self.cluster_cluster: "_cluster/health",
self.nodes_cluster: "_nodes/stats",
self.node_info_cluster: "_nodes",
self.master_node_cluster: "_nodes",
self.version_cluster: "",
self.search_cluster: "_stats/search",
self.perindex_cluster: "_stats/search",
self.index_cluster: "_cluster/state/metadata",
}
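# Note: search_cluster and perindex_cluster intentionally share "_stats/search";
# the ElasticsearchApi cache means that endpoint is fetched at most once per interval.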

self.set_refresh(self.elasticsearch_refresh)
self.set_fetch_callback(self.elasticsearch_fetch_callback)

def read_config(self):
def _init_statistics_dicts(self):
self.cluster = {}
self.nodes = {}
self.node_info = {}
self.info = {}
self.stat = {}
self.search = {}
self.perindex = {}
self.master_node = None
self.index = {}

def _read_config(self):
""" Read Configuration file """

configdir = PCP.pmGetConfig('PCP_PMDAS_DIR')
@@ -489,147 +524,105 @@ class ElasticsearchPMDA(PMDA):
else:
config = ConfigParser.SafeConfigParser()
config.read(configfile)
user = DEFAULT_USER
baseurl = DEFAULT_URL
auth = None
password = None
if config.has_section('pmda'):
for opt in config.options('pmda'):
if opt == 'user':
self.user = config.get('pmda', opt)
user = config.get('pmda', opt)
elif opt == 'baseurl':
self.baseurl = config.get('pmda', opt)
baseurl = config.get('pmda', opt)
elif opt == 'auth':
self.auth = config.get('pmda', opt)
auth = config.get('pmda', opt)
elif opt == 'password':
self.password = config.get('pmda', opt)
password = config.get('pmda', opt)
else:
self.log(f"Ignoring directive '{opt}' in {configfile}.")
return (user, baseurl, auth, password)

def get_url(self, url):
""" Perform HTTP GET """
# self.log(f"url: {url}")
req = None
def elasticsearch_refresh(self, cluster):
""" Refresh current values for a given cluster """
try:
req = self.request.urlopen(url)
data = self.es_api.fetch(self.url_map[cluster])
except KeyError:
# Do nothing for unrecognized cluster
self.error = False
except (BaseException, Exception):
return
except (BaseException, Exception) as e:
if not self.error:
self.log(f"Failed to get URL {url}")
self.log(f"Failed to get URL: {e}")
self.error = True
if not req:
if self.cluster:
self.cluster = {}
if self.nodes:
self.nodes = {}
if self.node_info:
self.node_info = {}
if self.info:
self.info = {}
if self.stat:
self.stat = {}
if self.search:
self.search = {}
if self.perindex:
self.perindex = {}
if self.master_node:
self.master_node = None
if self.index:
self.index = {}
return req
self._init_statistics_dicts()
return
else:
self.error = False

def elasticsearch_refresh(self, cluster):
""" Refresh """
if cluster == self.cluster_cluster:
request = self.get_url(self.baseurl + "_cluster/health")
if request:
self.cluster = json.loads(request.read())
request.close()
return
self.cluster = data

if cluster == self.nodes_cluster:
temp = {}
elif cluster == self.nodes_cluster:
insts = {}
request = self.get_url(self.baseurl + "_nodes/stats")
if request:
temp = json.loads(request.read())
request.close()
self.nodes = {}
try:
self.nodes = {}
for _id, stats in temp['nodes'].items():
for _id, stats in data['nodes'].items():
name = stats['name']
insts[name] = c_int(1)
self.nodes[name] = stats
except KeyError:
pass
self.nodes_insts.set_instances(self.nodes_indom, insts)
self.replace_indom(self.nodes_indom, insts)
return

if cluster in (self.node_info_cluster, self.master_node_cluster):
temp = {}
elif cluster == self.master_node_cluster:
self.master_node = None
try:
for _id, info in data['nodes'].items():
if info['attributes']['master'] == "true":
self.master_node = str(info['name'])
break
except KeyError:
pass

elif cluster == self.node_info_cluster:
insts = {}
request = self.get_url(self.baseurl + "_nodes")
if request:
temp = json.loads(request.read())
request.close()
if cluster == self.master_node_cluster:
self.master_node = None
try:
for _id, info in temp['nodes'].items():
if info['attributes']['master'] == "true":
self.master_node = str(info['name'])
except KeyError:
pass
else:
try:
self.node_info = {}
for _id, info in temp['nodes'].items():
name = info['name']
insts[name] = c_int(1)
self.node_info[name] = info
except KeyError:
pass
self.node_info_insts.set_instances(self.node_info_indom, insts)
self.replace_indom(self.node_info_indom, insts)
return
self.node_info = {}
try:
for _id, info in data['nodes'].items():
name = info['name']
insts[name] = c_int(1)
self.node_info[name] = info
except KeyError:
pass
self.node_info_insts.set_instances(self.node_info_indom, insts)
self.replace_indom(self.node_info_indom, insts)

if cluster == self.version_cluster:
request = self.get_url(self.baseurl)
if request:
self.info = json.loads(request.read())
request.close()
return
elif cluster == self.version_cluster:
self.info = data

if cluster in (self.search_cluster, self.perindex_cluster):
temp = {}
insts = {}
request = self.get_url(self.baseurl + "_stats/search")
if request:
temp = json.loads(request.read())
request.close()
if cluster == self.search_cluster:
self.search = temp
else:
try:
self.perindex = temp['indices']
for index in temp['indices']:
insts[index] = c_int(1)
except KeyError:
pass
self.perindex_insts.set_instances(self.perindex_indom, insts)
self.replace_indom(self.perindex_indom, insts)
return
elif cluster == self.search_cluster:
self.search = data

if cluster == self.index_cluster:
temp = {}
elif cluster == self.perindex_cluster:
try:
self.perindex = data['indices']
except KeyError:
self.perindex = {}
insts = {}
request = self.get_url(self.baseurl + "_cluster/state/metadata")
if request:
temp = json.loads(request.read())
request.close()
for index in self.perindex:
insts[index] = c_int(1)
self.perindex_insts.set_instances(self.perindex_indom, insts)
self.replace_indom(self.perindex_indom, insts)

elif cluster == self.index_cluster:
try:
self.index = temp['metadata']['indices']
for index in temp['metadata']['indices']:
insts[index] = c_int(1)
self.index = data['metadata']['indices']
except KeyError:
pass
self.index = {}
insts = {}
for index in self.index:
insts[index] = c_int(1)
self.index_insts.set_instances(self.index_indom, insts)
self.replace_indom(self.index_indom, insts)
