Skip to content

Commit

Permalink
Merge pull request #18 from clowder-framework/no-rabbitmq-stop
Browse files Browse the repository at this point in the history
No rabbitmq stop
  • Loading branch information
robkooper authored Sep 30, 2020
2 parents 288c366 + cef0d75 commit 325d1d6
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 59 deletions.
17 changes: 16 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,27 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).

## 2.3.2 - 2020-09-24

### Fixed
- When RabbitMQ restarted, the extractor would not stop and restart, resulting
in the extractor no longer receiving any messages. #17

### Added
- Can specify the URL to use for extractor downloads. This is helpful for instances
that have access to the internal URL for Clowder, for example in Docker/Kubernetes.

### Removed
- Removed ability to run multiple connectors in the same python process. If
parallelism is needed, use multiple processes (or containers).

## 2.3.1 - 2020-09-18

With this version we no longer guarantee support for versions of Python below 3.

### Fixed
- There was an issue where status messages could cause an exception. This would prevent most extractors from running correctly.
- There was an issue where status messages could cause an exception. This would
prevent most extractors from running correctly.

## 2.3.0 - 2020-09-15

Expand Down
7 changes: 7 additions & 0 deletions pyclowder/connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -731,6 +731,9 @@ def listen(self):
self.connection.close()
except Exception:
logging.getLogger(__name__).exception("Error while closing connection.")
if self.announcer:
self.announcer.stop_thread()

self.connection = None

def stop(self):
Expand Down Expand Up @@ -811,6 +814,9 @@ def start_thread(self):
self.thread.setDaemon(True)
self.thread.start()

def stop_thread(self):
self.thread = None

def send_heartbeat(self):
# create the message we will send
message = {
Expand Down Expand Up @@ -872,6 +878,7 @@ def start_thread(self, json_body):
"""
self.thread = threading.Thread(target=self._process_message, args=(json_body,))
self.thread.start()
self.thread.setDaemon(True)

def is_finished(self):
with self.lock:
Expand Down
108 changes: 51 additions & 57 deletions pyclowder/extractors.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,6 @@ def __init__(self):
help='connector to use (default=RabbitMQ)')
self.parser.add_argument('--logging', '-l', nargs='?', default=logging_config,
help='file or url or logging coonfiguration (default=None)')
self.parser.add_argument('--num', '-n', type=int, nargs='?', default=1,
help='number of parallel instances (default=1)')
self.parser.add_argument('--pickle', nargs='*', dest="hpc_picklefile",
default=None, action='append',
help='pickle file that needs to be processed (only needed for HPC)')
Expand Down Expand Up @@ -129,32 +127,32 @@ def setup(self):
def start(self):
"""Create the connector and start listening.
Based on the num command line argument this will start multiple instances of a connector and run each of them
in their own thread. Once the connector(s) are created this function will go into a endless loop until either
Start a single instance of a connector and run it in their own thread.
Once the connector(s) are created this function will go into a endless loop until either
all connectors have stopped or the user kills the program.
"""
logger = logging.getLogger(__name__)
connectors = list()
for connum in range(self.args.num):
if self.args.connector == "RabbitMQ":
if 'rabbitmq_uri' not in self.args:
logger.error("Missing URI for RabbitMQ")
else:
rabbitmq_key = []
if not self.args.nobind:
for key, value in self.extractor_info['process'].items():
for mt in value:
# Replace trailing '*' with '#'
mt = re.sub('(\*$)', '#', mt)
if mt.find('*') > -1:
logger.error("Invalid '*' found in rabbitmq_key: %s" % mt)
connector = None

if self.args.connector == "RabbitMQ":
if 'rabbitmq_uri' not in self.args:
logger.error("Missing URI for RabbitMQ")
else:
rabbitmq_key = []
if not self.args.nobind:
for key, value in self.extractor_info['process'].items():
for mt in value:
# Replace trailing '*' with '#'
mt = re.sub('(\*$)', '#', mt)
if mt.find('*') > -1:
logger.error("Invalid '*' found in rabbitmq_key: %s" % mt)
else:
if mt == "":
rabbitmq_key.append("*.%s.#" % key)
else:
if mt == "":
rabbitmq_key.append("*.%s.#" % key)
else:
rabbitmq_key.append("*.%s.%s" % (key, mt.replace("/", ".")))
rabbitmq_key.append("*.%s.%s" % (key, mt.replace("/", ".")))

rconn = RabbitMQConnector(self.args.rabbitmq_queuename,
connector = RabbitMQConnector(self.args.rabbitmq_queuename,
self.extractor_info,
check_message=self.check_message,
process_message=self.process_message,
Expand All @@ -164,54 +162,50 @@ def start(self):
rabbitmq_queue=self.args.rabbitmq_queuename,
mounted_paths=json.loads(self.args.mounted_paths),
clowder_url=self.args.clowder_url)
rconn.connect()
rconn.register_extractor(self.args.registration_endpoints)
connectors.append(rconn)
threading.Thread(target=rconn.listen, name="Connector-" + str(connum)).start()
elif self.args.connector == "HPC":
if 'hpc_picklefile' not in self.args:
logger.error("Missing hpc_picklefile for HPCExtractor")
else:
hconn = HPCConnector(self.extractor_info['name'],
connector.connect()
connector.register_extractor(self.args.registration_endpoints)
threading.Thread(target=connector.listen, name="RabbitMQConnector").start()

elif self.args.connector == "HPC":
if 'hpc_picklefile' not in self.args:
logger.error("Missing hpc_picklefile for HPCExtractor")
else:
connector = HPCConnector(self.extractor_info['name'],
self.extractor_info,
check_message=self.check_message,
process_message=self.process_message,
picklefile=self.args.hpc_picklefile,
mounted_paths=json.loads(self.args.mounted_paths))
hconn.register_extractor(self.args.registration_endpoints)
connectors.append(hconn)
threading.Thread(target=hconn.listen, name="Connector-" + str(connum)).start()
elif self.args.connector == "Local":

if self.args.input_file_path is None:
logger.error("Environment variable INPUT_FILE_PATH or parameter --input-file-path is not set. "
"Please try again after setting one of these")
elif not os.path.isfile(self.args.input_file_path):
logger.error("Local input file is not a regular file. Please check the path.")
else:
local_connector = LocalConnector(self.extractor_info['name'],
self.extractor_info,
self.args.input_file_path,
process_message=self.process_message,
output_file_path=self.args.output_file_path)
connectors.append(local_connector)
threading.Thread(target=local_connector.listen, name="Connector-" + str(connum)).start()
connector.register_extractor(self.args.registration_endpoints)
threading.Thread(target=connector.listen, name="HPCConnector").start()

elif self.args.connector == "Local":
if self.args.input_file_path is None:
logger.error("Environment variable INPUT_FILE_PATH or parameter "
"--input-file-path is not set. Please try again after "
"setting one of these")
elif not os.path.isfile(self.args.input_file_path):
logger.error("Local input file is not a regular file. Please check the path.")
else:
logger.error("Could not create instance of %s connector.", self.args.connector)
sys.exit(-1)
connector = LocalConnector(self.extractor_info['name'],
self.extractor_info,
self.args.input_file_path,
process_message=self.process_message,
output_file_path=self.args.output_file_path)
threading.Thread(target=connector.listen, name="LocalConnector").start()
else:
logger.error("Could not create instance of %s connector.", self.args.connector)
sys.exit(-1)

logger.info("Waiting for messages. To exit press CTRL+C")
try:
while connectors:
while connector.alive():
time.sleep(1)
connectors = filter(lambda x: x.alive(), connectors)
except KeyboardInterrupt:
pass
except BaseException:
logger.exception("Error while consuming messages.")

for c in connectors:
c.stop()
connector.stop()

def get_metadata(self, content, resource_type, resource_id, server=None):
"""Generate a metadata field.
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def description():


setup(name='pyclowder',
version='2.3.1',
version='2.3.2',
packages=find_packages(),
description='Python SDK for the Clowder Data Management System',
long_description=description(),
Expand Down

0 comments on commit 325d1d6

Please sign in to comment.