Skip to content

Commit

Permalink
Merge pull request #57 from anmolbabu/fixes
Browse files Browse the repository at this point in the history
Fixes
  • Loading branch information
r0h4n authored Apr 17, 2017
2 parents 34bf1c9 + 4772df0 commit 717eb23
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 53 deletions.
11 changes: 11 additions & 0 deletions tendrl/alerting/central_store/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,17 @@ class AlertingEtcdPersister(central_store.EtcdCentralStore):
def __init__(self):
super(AlertingEtcdPersister, self).__init__()

def get_event_ids(self):
event_ids = []
etcd_events = NS.etcd_orm.client.read(
'/messages/events'
)
for event in etcd_events.leaves:
event_parts = event.key.split('/')
if len(event_parts) >= 4:
event_ids.append(event_parts[3])
return event_ids

def save_config(self, config):
NS.etcd_orm.save(config)

Expand Down
11 changes: 5 additions & 6 deletions tendrl/alerting/handlers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import etcd
import gevent
import importlib
import inspect
import json
import multiprocessing
import os
from tendrl.alerting.utils import list_modules_in_package_path
import six
Expand All @@ -12,7 +12,6 @@
from tendrl.commons.message import ExceptionMessage
from tendrl.commons.message import Message
from tendrl.commons.utils.time_utils import now
import time


class NoHandlerException(Exception):
Expand Down Expand Up @@ -119,7 +118,7 @@ def handle(self, alert_obj):
)


class AlertHandlerManager(multiprocessing.Process):
class AlertHandlerManager(gevent.greenlet.Greenlet):
def load_handlers(self):
path = os.path.dirname(os.path.abspath(__file__))
pkg = 'tendrl.alerting.handlers'
Expand All @@ -135,7 +134,7 @@ def __init__(self):
super(AlertHandlerManager, self).__init__()
try:
self.alert_handlers = []
self.complete = multiprocessing.Event()
self.complete = gevent.event.Event()
self.load_handlers()
self.init_alerttypes()
except (SyntaxError, ValueError, ImportError) as ex:
Expand All @@ -155,11 +154,11 @@ def init_alerttypes(self):
for handler in AlertHandler.handlers:
NS.alert_types.append(handler.representive_name)

def run(self):
def _run(self):
try:
while not self.complete.is_set():
new_msg_id = NS.alert_queue.get()
time.sleep(15)
gevent.sleep(15)
msg_priority = NS.etcd_orm.client.read(
'/messages/events/%s/priority' % new_msg_id
).value
Expand Down
8 changes: 4 additions & 4 deletions tendrl/alerting/manager/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import multiprocessing
from gevent.queue import Queue
import os
import signal
from tendrl.alerting.central_store import AlertingEtcdPersister
Expand All @@ -17,7 +17,7 @@
class TendrlAlertingManager(object):
def __init__(self):
try:
NS.alert_queue = multiprocessing.Queue()
NS.alert_queue = Queue()
NS.alert_types = []
NS.notification_medium = []
self.alert_handler_manager = AlertHandlerManager()
Expand All @@ -38,6 +38,7 @@ def __init__(self):

def start(self):
try:
NS.central_store_thread.start()
self.alert_handler_manager.start()
self.watch_manager.run()
except (
Expand All @@ -59,9 +60,8 @@ def start(self):

def stop(self):
try:
NS.alert_queue.close()
self.watch_manager.stop()
os.system("ps -C tendrl-alerting -o pid=|xargs kill -9")
NS.central_store_thread.stop()
except Exception as ex:
Event(
ExceptionMessage(
Expand Down
69 changes: 26 additions & 43 deletions tendrl/alerting/watcher/__init__.py
Original file line number Diff line number Diff line change
@@ -1,55 +1,38 @@
import multiprocessing
from tendrl.commons.etcdobj import Server as etcd_server
from etcd import EtcdKeyNotFound
import gevent
from tendrl.commons.event import Event
from tendrl.commons.message import ExceptionMessage


class AlertsWatchManager(multiprocessing.Process):
class AlertsWatchManager(gevent.greenlet.Greenlet):
def __init__(self):
super(AlertsWatchManager, self).__init__()
etcd_kwargs = {
'port': int(NS.alerting.config.data["etcd_port"]),
'host': NS.alerting.config.data["etcd_connection"]
}
self.etcd_client = etcd_server(etcd_kwargs=etcd_kwargs).client
self.complete = multiprocessing.Event()
self.complete = gevent.event.Event()
self.handled_msgs = []

def run(self):
try:
while not self.complete.is_set():
new_message = self.etcd_client.watch(
'/messages/events',
recursive=True,
timeout=0
def _run(self):
while not self.complete.is_set():
try:
event_ids = NS.central_store_thread.get_event_ids()
for event_id in event_ids:
if event_id not in self.handled_msgs:
self.handled_msgs.append(event_id)
NS.alert_queue.put(event_id)
except EtcdKeyNotFound:
continue
except Exception as ex:
Event(
ExceptionMessage(
priority="error",
publisher="alerting",
payload={
"message": 'Exception in alert watcher',
"exception": ex
}
)
)
if (
new_message is None or
str(new_message.action) == 'delete' or
not new_message
):
continue
msg_parts = new_message.key.split('/')
if (
new_message.key.startswith('/messages/events') and
len(msg_parts) >= 4
):
message_id = msg_parts[3]
if message_id not in self.handled_msgs:
self.handled_msgs.append(message_id)
NS.alert_queue.put(message_id)
except Exception as ex:
Event(
ExceptionMessage(
priority="error",
publisher="alerting",
payload={
"message": 'Exception in alert watcher',
"exception": ex
}
)
)
raise ex
raise ex
gevent.sleep(30)

def stop(self):
self.complete.set()

0 comments on commit 717eb23

Please sign in to comment.