Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into no-rabbitmq-stop
Browse files Browse the repository at this point in the history
  • Loading branch information
robkooper committed Sep 30, 2020
2 parents fe56f53 + 288c366 commit cef0d75
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 19 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- When rabbitmq restarts the extractor would not stop and restart, resulting
in the extractor no longer receiving any messages. #17

### Added
- Can specify the URL to use for extractor downloads; this is helpful for instances
  that have access to the internal URL for Clowder, for example in docker/kubernetes.

### Removed
- Removed ability to run multiple connectors in the same python process. If
parallelism is needed, use multiple processes (or containers).
Expand Down
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ ENV PYTHON_VERSION=${PYTHON_VERSION} \
RABBITMQ_URI="amqp://guest:guest@rabbitmq:5672/%2F" \
RABBITMQ_EXCHANGE="clowder" \
RABBITMQ_QUEUE="" \
CLOWDER_URL="" \
REGISTRATION_ENDPOINTS="" \
EMAIL_SERVER="" \
EMAIL_SENDER="extractor" \
Expand Down
40 changes: 22 additions & 18 deletions pyclowder/connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class Connector(object):
registered_clowder = list()

def __init__(self, extractor_name, extractor_info, check_message=None, process_message=None, ssl_verify=True,
mounted_paths=None):
mounted_paths=None, clowder_url=None):
self.extractor_name = extractor_name
self.extractor_info = extractor_info
self.check_message = check_message
Expand All @@ -76,6 +76,7 @@ def __init__(self, extractor_name, extractor_info, check_message=None, process_m
self.mounted_paths = {}
else:
self.mounted_paths = mounted_paths
self.clowder_url = clowder_url

filename = 'notifications.json'
self.smtp_server = None
Expand Down Expand Up @@ -373,19 +374,21 @@ def _process_message(self, body):
if body.get('notifies'):
emailaddrlist = body.get('notifies')
logger.debug(emailaddrlist)
host = body.get('host', '')
if host == '':
# source_host is original from the message, host is remapped to CLOWDER_URL if given
source_host = body.get('host', '')
host = self.clowder_url if self.clowder_url is not None else source_host
if host == '' or source_host == '':
return
elif not host.endswith('/'):
host += '/'
if not source_host.endswith('/'): source_host += '/'
if not host.endswith('/'): host += '/'
secret_key = body.get('secretKey', '')
retry_count = 0 if 'retry_count' not in body else body['retry_count']
resource = self._build_resource(body, host, secret_key)
if not resource:
return

# register extractor
url = "%sapi/extractors" % host
url = "%sapi/extractors" % source_host
if url not in Connector.registered_clowder:
Connector.registered_clowder.append(url)
self.register_extractor("%s?key=%s" % (url, secret_key))
Expand All @@ -398,7 +401,7 @@ def _process_message(self, body):
try:
check_result = pyclowder.utils.CheckMessage.download
if self.check_message:
check_result = self.check_message(self, host, secret_key, resource, body)
check_result = self.check_message(self, source_host, secret_key, resource, body)
if check_result != pyclowder.utils.CheckMessage.ignore:
if self.process_message:

Expand All @@ -418,10 +421,10 @@ def _process_message(self, body):
found_local = True
resource['local_paths'] = [file_path]

self.process_message(self, host, secret_key, resource, body)
self.process_message(self, source_host, secret_key, resource, body)

clowderurl = "%sfiles/%s" % (host, body.get('id', ''))
# notificatino of extraction job is done by email.
clowderurl = "%sfiles/%s" % (source_host, body.get('id', ''))
# notification of extraction job is done by email.
self.email(emailaddrlist, clowderurl)
finally:
if file_path is not None and not found_local:
Expand All @@ -438,8 +441,8 @@ def _process_message(self, body):
(file_paths, tmp_files, tmp_dirs) = self._prepare_dataset(host, secret_key, resource)
resource['local_paths'] = file_paths

self.process_message(self, host, secret_key, resource, body)
clowderurl = "%sdatasets/%s" % (host, body.get('datasetId', ''))
self.process_message(self, source_host, secret_key, resource, body)
clowderurl = "%sdatasets/%s" % (source_host, body.get('datasetId', ''))
# notification of extraction job is done by email.
self.email(emailaddrlist, clowderurl)
finally:
Expand Down Expand Up @@ -624,9 +627,9 @@ class RabbitMQConnector(Connector):
def __init__(self, extractor_name, extractor_info,
rabbitmq_uri, rabbitmq_exchange=None, rabbitmq_key=None, rabbitmq_queue=None,
check_message=None, process_message=None, ssl_verify=True, mounted_paths=None,
heartbeat=5*60):
heartbeat=5*60, clowder_url=None):
super(RabbitMQConnector, self).__init__(extractor_name, extractor_info, check_message, process_message,
ssl_verify, mounted_paths)
ssl_verify, mounted_paths, clowder_url)
self.rabbitmq_uri = rabbitmq_uri
self.rabbitmq_exchange = rabbitmq_exchange
self.rabbitmq_key = rabbitmq_key
Expand All @@ -639,7 +642,7 @@ def __init__(self, extractor_name, extractor_info,
self.consumer_tag = None
self.worker = None
self.announcer = None
self.heartbeat = 5*60
self.heartbeat = heartbeat

def connect(self):
"""connect to rabbitmq using URL parameters"""
Expand Down Expand Up @@ -770,7 +773,7 @@ def on_message(self, channel, method, header, body):
job_id = json_body['jobid']

self.worker = RabbitMQHandler(self.extractor_name, self.extractor_info, job_id, self.check_message,
self.process_message, self.ssl_verify, self.mounted_paths,
self.process_message, self.ssl_verify, self.mounted_paths, self.clowder_url,
method, header, body)
self.worker.start_thread(json_body)

Expand Down Expand Up @@ -848,9 +851,10 @@ class RabbitMQHandler(Connector):
"""

def __init__(self, extractor_name, extractor_info, job_id, check_message=None, process_message=None, ssl_verify=True,
mounted_paths=None, method=None, header=None, body=None):
mounted_paths=None, clowder_url=None, method=None, header=None, body=None):

super(RabbitMQHandler, self).__init__(extractor_name, extractor_info, check_message, process_message,
ssl_verify, mounted_paths)
ssl_verify, mounted_paths, clowder_url)
self.method = method
self.header = header
self.body = body
Expand Down
6 changes: 5 additions & 1 deletion pyclowder/extractors.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ def __init__(self):
rabbitmq_queuename = self.extractor_info['name']
rabbitmq_uri = os.getenv('RABBITMQ_URI', "amqp://guest:[email protected]/%2f")
rabbitmq_exchange = os.getenv('RABBITMQ_EXCHANGE', "clowder")
clowder_url = os.getenv("CLOWDER_URL", "")
registration_endpoints = os.getenv('REGISTRATION_ENDPOINTS', "")
logging_config = os.getenv("LOGGING")
mounted_paths = os.getenv("MOUNTED_PATHS", "{}")
Expand All @@ -83,6 +84,8 @@ def __init__(self):
self.parser.add_argument('--pickle', nargs='*', dest="hpc_picklefile",
default=None, action='append',
help='pickle file that needs to be processed (only needed for HPC)')
self.parser.add_argument('--clowderURL', nargs='?', dest='clowder_url', default=clowder_url,
help='Clowder host URL')
self.parser.add_argument('--register', '-r', nargs='?', dest="registration_endpoints",
default=registration_endpoints,
help='Clowder registration URL (default=%s)' % registration_endpoints)
Expand Down Expand Up @@ -157,7 +160,8 @@ def start(self):
rabbitmq_exchange=self.args.rabbitmq_exchange,
rabbitmq_key=rabbitmq_key,
rabbitmq_queue=self.args.rabbitmq_queuename,
mounted_paths=json.loads(self.args.mounted_paths))
mounted_paths=json.loads(self.args.mounted_paths),
clowder_url=self.args.clowder_url)
connector.connect()
connector.register_extractor(self.args.registration_endpoints)
threading.Thread(target=connector.listen, name="RabbitMQConnector").start()
Expand Down

0 comments on commit cef0d75

Please sign in to comment.