From 3d76de0745bfe3fe197b11667696e884685190c7 Mon Sep 17 00:00:00 2001 From: Max Burnette Date: Wed, 29 Jul 2020 09:40:43 -0500 Subject: [PATCH 1/3] Add Clowder URL hooks --- Dockerfile | 1 + pyclowder/connectors.py | 11 ++++++----- pyclowder/extractors.py | 3 +++ 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/Dockerfile b/Dockerfile index 4456c78..4d40a69 100644 --- a/Dockerfile +++ b/Dockerfile @@ -8,6 +8,7 @@ ENV PYTHON_VERSION=${PYTHON_VERSION} \ RABBITMQ_URI="amqp://guest:guest@rabbitmq:5672/%2F" \ RABBITMQ_EXCHANGE="clowder" \ RABBITMQ_QUEUE="" \ + CLOWDER_URL="" \ REGISTRATION_ENDPOINTS="" \ EMAIL_SERVER="" \ EMAIL_SENDER="extractor" \ diff --git a/pyclowder/connectors.py b/pyclowder/connectors.py index e0c3af7..f345a4a 100644 --- a/pyclowder/connectors.py +++ b/pyclowder/connectors.py @@ -66,7 +66,7 @@ class Connector(object): registered_clowder = list() def __init__(self, extractor_name, extractor_info, check_message=None, process_message=None, ssl_verify=True, - mounted_paths=None): + mounted_paths=None, clowder_url=None): self.extractor_name = extractor_name self.extractor_info = extractor_info self.check_message = check_message @@ -76,6 +76,7 @@ def __init__(self, extractor_name, extractor_info, check_message=None, process_m self.mounted_paths = {} else: self.mounted_paths = mounted_paths + self.clowder_url = clowder_url filename = 'notifications.json' self.smtp_server = None @@ -374,7 +375,7 @@ def _process_message(self, body): if body.get('notifies'): emailaddrlist = body.get('notifies') logger.debug(emailaddrlist) - host = body.get('host', '') + host = self.clowder_url if self.clowder_url is not None else body.get('host', '') if host == '': return elif not host.endswith('/'): @@ -623,9 +624,9 @@ class RabbitMQConnector(Connector): def __init__(self, extractor_name, extractor_info, rabbitmq_uri, rabbitmq_exchange=None, rabbitmq_key=None, rabbitmq_queue=None, check_message=None, process_message=None, ssl_verify=True, mounted_paths=None, - heartbeat=5*60): + heartbeat=5*60, clowder_url=None): super(RabbitMQConnector, self).__init__(extractor_name, extractor_info, check_message, process_message, - ssl_verify, mounted_paths) + ssl_verify, mounted_paths, clowder_url) self.rabbitmq_uri = rabbitmq_uri self.rabbitmq_exchange = rabbitmq_exchange self.rabbitmq_key = rabbitmq_key @@ -638,7 +639,7 @@ def __init__(self, extractor_name, extractor_info, self.consumer_tag = None self.worker = None self.announcer = None - self.heartbeat = 5*60 + self.heartbeat = heartbeat def connect(self): """connect to rabbitmq using URL parameters""" diff --git a/pyclowder/extractors.py b/pyclowder/extractors.py index a062947..e56c17f 100644 --- a/pyclowder/extractors.py +++ b/pyclowder/extractors.py @@ -63,6 +63,7 @@ def __init__(self): rabbitmq_queuename = self.extractor_info['name'] rabbitmq_uri = os.getenv('RABBITMQ_URI', "amqp://guest:guest@127.0.0.1/%2f") rabbitmq_exchange = os.getenv('RABBITMQ_EXCHANGE', "clowder") + clowder_url = os.getenv("CLOWDER_URL", "") registration_endpoints = os.getenv('REGISTRATION_ENDPOINTS', "") logging_config = os.getenv("LOGGING") mounted_paths = os.getenv("MOUNTED_PATHS", "{}") @@ -84,6 +85,8 @@ def __init__(self): self.parser.add_argument('--pickle', nargs='*', dest="hpc_picklefile", default=None, action='append', help='pickle file that needs to be processed (only needed for HPC)') + self.parser.add_argument('--clowderURL', nargs='?', dest='clowder_url', default=clowder_url, + help='Clowder host URL') self.parser.add_argument('--register', '-r', nargs='?', dest="registration_endpoints", default=registration_endpoints, help='Clowder registration URL (default=%s)' % registration_endpoints) From 91ad98aea111edb43ae69db26410d3d3b7105c8a Mon Sep 17 00:00:00 2001 From: Max Burnette Date: Wed, 19 Aug 2020 09:25:12 -0500 Subject: [PATCH 2/3] Pass clowder_url into Handler as well --- pyclowder/connectors.py | 6 +++--- pyclowder/extractors.py | 3 ++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/pyclowder/connectors.py b/pyclowder/connectors.py index f345a4a..4b8f6ef 100644 --- a/pyclowder/connectors.py +++ b/pyclowder/connectors.py @@ -762,7 +762,7 @@ def on_message(self, channel, method, header, body): json_body['routing_key'] = method.routing_key self.worker = RabbitMQHandler(self.extractor_name, self.extractor_info, self.check_message, - self.process_message, self.ssl_verify, self.mounted_paths, + self.process_message, self.ssl_verify, self.mounted_paths, self.clowder_url, method, header, body) self.worker.start_thread(json_body) @@ -837,9 +837,9 @@ class RabbitMQHandler(Connector): """ def __init__(self, extractor_name, extractor_info, check_message=None, process_message=None, ssl_verify=True, - mounted_paths=None, method=None, header=None, body=None): + mounted_paths=None, clowder_url=None, method=None, header=None, body=None): super(RabbitMQHandler, self).__init__(extractor_name, extractor_info, check_message, process_message, - ssl_verify, mounted_paths) + ssl_verify, mounted_paths, clowder_url) self.method = method self.header = header self.body = body diff --git a/pyclowder/extractors.py b/pyclowder/extractors.py index e56c17f..2a9c6ab 100644 --- a/pyclowder/extractors.py +++ b/pyclowder/extractors.py @@ -161,7 +161,8 @@ def start(self): rabbitmq_exchange=self.args.rabbitmq_exchange, rabbitmq_key=rabbitmq_key, rabbitmq_queue=self.args.rabbitmq_queuename, - mounted_paths=json.loads(self.args.mounted_paths)) + mounted_paths=json.loads(self.args.mounted_paths), + clowder_url=self.args.clowder_url) rconn.connect() rconn.register_extractor(self.args.registration_endpoints) connectors.append(rconn) From 872625372d47a2c2f72a934a9d1dd491d8d202da Mon Sep 17 00:00:00 2001 From: Max Burnette Date: Mon, 24 Aug 2020 12:46:42 -0500 Subject: [PATCH 3/3] Send original host to extractors --- pyclowder/connectors.py | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/pyclowder/connectors.py b/pyclowder/connectors.py index 1374346..31c1349 100644 --- a/pyclowder/connectors.py +++ b/pyclowder/connectors.py @@ -375,11 +375,13 @@ def _process_message(self, body): if body.get('notifies'): emailaddrlist = body.get('notifies') logger.debug(emailaddrlist) - host = self.clowder_url if self.clowder_url is not None else body.get('host', '') - if host == '': + # source_host is original from the message, host is remapped to CLOWDER_URL if given + source_host = body.get('host', '') + host = self.clowder_url if self.clowder_url is not None else source_host + if host == '' or source_host == '': return - elif not host.endswith('/'): - host += '/' + if not source_host.endswith('/'): source_host += '/' + if not host.endswith('/'): host += '/' secret_key = body.get('secretKey', '') retry_count = 0 if 'retry_count' not in body else body['retry_count'] resource = self._build_resource(body, host, secret_key) @@ -387,7 +389,7 @@ def _process_message(self, body): return # register extractor - url = "%sapi/extractors" % host + url = "%sapi/extractors" % source_host if url not in Connector.registered_clowder: Connector.registered_clowder.append(url) self.register_extractor("%s?key=%s" % (url, secret_key)) @@ -400,7 +402,7 @@ def _process_message(self, body): try: check_result = pyclowder.utils.CheckMessage.download if self.check_message: - check_result = self.check_message(self, host, secret_key, resource, body) + check_result = self.check_message(self, source_host, secret_key, resource, body) if check_result != pyclowder.utils.CheckMessage.ignore: if self.process_message: @@ -420,10 +422,10 @@ def _process_message(self, body): found_local = True resource['local_paths'] = [file_path] - self.process_message(self, host, secret_key, resource, body) + self.process_message(self, source_host, secret_key, resource, body) - clowderurl = "%sfiles/%s" % (host, body.get('id', '')) - # notificatino of extraction job is done by email. + clowderurl = "%sfiles/%s" % (source_host, body.get('id', '')) + # notification of extraction job is done by email. self.email(emailaddrlist, clowderurl) finally: if file_path is not None and not found_local: @@ -440,8 +442,8 @@ def _process_message(self, body): (file_paths, tmp_files, tmp_dirs) = self._prepare_dataset(host, secret_key, resource) resource['local_paths'] = file_paths - self.process_message(self, host, secret_key, resource, body) - clowderurl = "%sdatasets/%s" % (host, body.get('datasetId', '')) + self.process_message(self, source_host, secret_key, resource, body) + clowderurl = "%sdatasets/%s" % (source_host, body.get('datasetId', '')) # notificatino of extraction job is done by email. self.email(emailaddrlist, clowderurl) finally: