diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml new file mode 100644 index 0000000..fd4646b --- /dev/null +++ b/.github/workflows/ci.yaml @@ -0,0 +1,46 @@ +name: Python package + +on: [push] + +jobs: + build: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: [2.7, 3.5, 3.6, 3.7, 3.8] + + steps: + - uses: actions/checkout@v2 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v2 + with: + python-version: ${{ matrix.python-version }} + + - name: Cache pip + uses: actions/cache@v2 + with: + # This path is specific to Ubuntu + path: ~/.cache/pip + # Look to see if there is a cache hit for the corresponding requirements file + key: ${{ runner.os }}-pip-${{ hashFiles('requirements.txt') }} + restore-keys: | + ${{ runner.os }}-pip- + ${{ runner.os }}- + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install flake8 pytest pipenv + pip install -r requirements.txt + + - name: Lint with flake8 + run: | + # stop the build if there are Python syntax errors or undefined names + flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics + # exit-zero treats all errors as warnings. + flake8 . --count --exit-zero --max-complexity=10 --max-line-length=120 --statistics + +# - name: Test with pytest +# run: | +# pytest diff --git a/.github/workflows/pypi.yaml b/.github/workflows/pypi.yaml new file mode 100644 index 0000000..bb3faa2 --- /dev/null +++ b/.github/workflows/pypi.yaml @@ -0,0 +1,46 @@ +name: Publish Python package + +on: + push: + tags: + +jobs: + publish: + name: Build and publish python packages + runs-on: ubuntu-18.04 + steps: + - uses: actions/checkout@v2 + + - name: Set up Python 3.7 + uses: actions/setup-python@v2 + with: + python-version: 3.7 + + - name: Install dependencies + run: python -m pip install --upgrade pip setuptools wheel + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + + - name: Cache pip + uses: actions/cache@v2 + with: + # This path is specific to Ubuntu + path: ~/.cache/pip + # Look to see if there is a cache hit for the corresponding requirements file + key: ${{ runner.os }}-pip-${{ hashFiles('requirements.txt') }} + restore-keys: | + ${{ runner.os }}-pip- + ${{ runner.os }}- + + - name: Build dist file + run: | + python setup.py sdist bdist_wheel + + - name: Publish distribution to PyPI + if: startsWith(github.ref, 'refs/tags') + uses: pypa/gh-action-pypi-publish@master + with: + password: ${{ secrets.pypi_password }} diff --git a/CHANGELOG.md b/CHANGELOG.md index 937ca7f..dfef20f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,25 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/). +## 2.3.0 - 2020-09-15 + +Removed the develop branch; all pull requests now need to be made against +master. Please update the version number in setup.py in each PR. + +From this version on, no Docker images are built; please use pip install to +install pyclowder. + +### Added +- Simple extractors now support datasets and can also create new datasets. +- Ability to add tags from the simple extractor to files and datasets. +- Ability to add additional files (outputs) to a dataset from the simple extractor. +- Use pipenv to manage dependencies. +- Add job_id to each status message returned by pyclowder.
+- PyClowderExtractionAbort to indicate the message should not be retried. + +### Changed +- Better handling of status messages. + ## 2.2.3 - 2019-10-14 ### Fixed diff --git a/Pipfile b/Pipfile new file mode 100644 index 0000000..5441353 --- /dev/null +++ b/Pipfile @@ -0,0 +1,15 @@ +[[source]] +name = "pypi" +url = "https://pypi.org/simple" +verify_ssl = true + +[dev-packages] + +[packages] +pika = "*" +requests = "*" +requests-toolbelt = "*" +pyyaml = "==5.1" + +[requires] +python_version = "3.7" diff --git a/Pipfile.lock b/Pipfile.lock new file mode 100644 index 0000000..bac9acb --- /dev/null +++ b/Pipfile.lock @@ -0,0 +1,92 @@ +{ + "_meta": { + "hash": { + "sha256": "d33480b241f0335d12224043e531f6982e24a7e3593cfdbb64bb93dbed8325e1" + }, + "pipfile-spec": 6, + "requires": { + "python_version": "3.7" + }, + "sources": [ + { + "name": "pypi", + "url": "https://pypi.org/simple", + "verify_ssl": true + } + ] + }, + "default": { + "certifi": { + "hashes": [ + "sha256:5930595817496dd21bb8dc35dad090f1c2cd0adfaf21204bf6732ca5d8ee34d3", + "sha256:8fc0819f1f30ba15bdb34cceffb9ef04d99f420f68eb75d901e9560b8749fc41" + ], + "version": "==2020.6.20" + }, + "chardet": { + "hashes": [ + "sha256:84ab92ed1c4d4f16916e05906b6b75a6c0fb5db821cc65e70cbd64a3e2a5eaae", + "sha256:fc323ffcaeaed0e0a02bf4d117757b98aed530d9ed4531e3e15460124c106691" + ], + "version": "==3.0.4" + }, + "idna": { + "hashes": [ + "sha256:b307872f855b18632ce0c21c5e45be78c0ea7ae4c15c828c20788b26921eb3f6", + "sha256:b97d804b1e9b523befed77c48dacec60e6dcb0b5391d57af6a65a312a90648c0" + ], + "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'", + "version": "==2.10" + }, + "pika": { + "hashes": [ + "sha256:4e1a1a6585a41b2341992ec32aadb7a919d649eb82904fd8e4a4e0871c8cf3af", + "sha256:9fa76ba4b65034b878b2b8de90ff8660a59d925b087c5bb88f8fdbb4b64a1dbf" + ], + "index": "pypi", + "version": "==1.1.0" + }, + "pyyaml": { + "hashes": [ + "sha256:1adecc22f88d38052fb787d959f003811ca858b799590a5eaa70e63dca50308c", + "sha256:436bc774ecf7c103814098159fbb84c2715d25980175292c648f2da143909f95", + "sha256:460a5a4248763f6f37ea225d19d5c205677d8d525f6a83357ca622ed541830c2", + "sha256:5a22a9c84653debfbf198d02fe592c176ea548cccce47553f35f466e15cf2fd4", + "sha256:7a5d3f26b89d688db27822343dfa25c599627bc92093e788956372285c6298ad", + "sha256:9372b04a02080752d9e6f990179a4ab840227c6e2ce15b95e1278456664cf2ba", + "sha256:a5dcbebee834eaddf3fa7366316b880ff4062e4bcc9787b78c7fbb4a26ff2dd1", + "sha256:aee5bab92a176e7cd034e57f46e9df9a9862a71f8f37cad167c6fc74c65f5b4e", + "sha256:c51f642898c0bacd335fc119da60baae0824f2cde95b0330b56c0553439f0673", + "sha256:c68ea4d3ba1705da1e0d85da6684ac657912679a649e8868bd850d2c299cce13", + "sha256:e23d0cc5299223dcc37885dae624f382297717e459ea24053709675a976a3e19" + ], + "index": "pypi", + "version": "==5.1" + }, + "requests": { + "hashes": [ + "sha256:b3559a131db72c33ee969480840fff4bb6dd111de7dd27c8ee1f820f4f00231b", + "sha256:fe75cc94a9443b9246fc7049224f75604b113c36acb93f87b80ed42c44cbb898" + ], + "index": "pypi", + "version": "==2.24.0" + }, + "requests-toolbelt": { + "hashes": [ + "sha256:380606e1d10dc85c3bd47bf5a6095f815ec007be7a8b69c878507068df059e6f", + "sha256:968089d4584ad4ad7c171454f0a5c6dac23971e9472521ea3b6d49d610aa6fc0" + ], + "index": "pypi", + "version": "==0.9.1" + }, + "urllib3": { + "hashes": [ + "sha256:91056c15fa70756691db97756772bb1eb9678fa585d9184f24534b100dc60f4a", + "sha256:e7983572181f5e1522d9c98453462384ee92a0be7fac5f1413a1e35c56cc0461" + ], + "markers": "python_version >= '2.7' and python_version not
in '3.0, 3.1, 3.2, 3.3, 3.4' and python_version < '4'", + "version": "==1.25.10" + } + }, + "develop": {} +} diff --git a/pyclowder/collections.py b/pyclowder/collections.py index c1c34e7..8795e57 100644 --- a/pyclowder/collections.py +++ b/pyclowder/collections.py @@ -8,7 +8,6 @@ import requests from pyclowder.client import ClowderClient -from pyclowder.utils import StatusMessage def create_empty(connector, host, key, collectionname, description, parentid=None, spaceid=None): @@ -121,8 +120,7 @@ def upload_preview(connector, host, key, collectionid, previewfile, previewmetad section this preview should be associated with. """ - connector.status_update(StatusMessage.processing, {"type": "collection", "id": collectionid}, - "Uploading collection preview.") + connector.message_process({"type": "collection", "id": collectionid}, "Uploading collection preview.") logger = logging.getLogger(__name__) headers = {'Content-Type': 'application/json'} diff --git a/pyclowder/connectors.py b/pyclowder/connectors.py index e0c3af7..d39e477 100644 --- a/pyclowder/connectors.py +++ b/pyclowder/connectors.py @@ -230,7 +230,6 @@ def _build_resource(self, body, host, secret_key): "type": "dataset", "id": datasetid } - self.status_update(pyclowder.utils.StatusMessage.error, resource, msg) self.message_error(resource) return None @@ -392,7 +391,7 @@ def _process_message(self, body): self.register_extractor("%s?key=%s" % (url, secret_key)) # tell everybody we are starting to process the file - self.status_update(pyclowder.utils.StatusMessage.start, resource, "Started processing") + self.status_update(pyclowder.utils.StatusMessage.start.value, resource, "Started processing.") # checks whether to process the file in this message or not # pylint: disable=too-many-nested-blocks @@ -456,41 +455,41 @@ def _process_message(self, body): logger.exception("Error removing temporary dataset directory") else: - self.status_update(pyclowder.utils.StatusMessage.processing, resource, "Skipped in check_message") + self.status_update(pyclowder.utils.StatusMessage.skip.value, resource, "Skipped in check_message") self.message_ok(resource) except SystemExit as exc: - status = "sys.exit : " + str(exc) - logger.exception("[%s] %s", resource['id'], status) - self.status_update(pyclowder.utils.StatusMessage.error, resource, status) - self.message_resubmit(resource, retry_count) + message = str.format("sys.exit: {}", str(exc)) + logger.exception("[%s] %s", resource['id'], message) + self.message_resubmit(resource, retry_count, message) raise except KeyboardInterrupt: - status = "keyboard interrupt" - logger.exception("[%s] %s", resource['id'], status) - self.status_update(pyclowder.utils.StatusMessage.error, resource, status) - self.message_resubmit(resource, retry_count) + message = "keyboard interrupt" + logger.exception("[%s] %s", resource['id'], message) + self.message_resubmit(resource, retry_count, message) raise except GeneratorExit: - status = "generator exit" - logger.exception("[%s] %s", resource['id'], status) - self.status_update(pyclowder.utils.StatusMessage.error, resource, status) - self.message_resubmit(resource, retry_count) + message = "generator exit" + logger.exception("[%s] %s", resource['id'], message) + self.message_resubmit(resource, retry_count, message) raise except subprocess.CalledProcessError as exc: - status = str.format("Error processing [exit code={}]\n{}", exc.returncode, exc.output) - logger.exception("[%s] %s", resource['id'], status) - self.status_update(pyclowder.utils.StatusMessage.error, resource, 
status) - self.message_error(resource) + message = str.format("Error in subprocess [exit code={}]:\n{}", exc.returncode, exc.output) + logger.exception("[%s] %s", resource['id'], message) + self.message_error(resource, message) + except PyClowderExtractionAbort as exc: + message = str.format("Aborting message: {}", exc.message) + logger.exception("[%s] %s", resource['id'], message) + self.message_error(resource, message) except Exception as exc: # pylint: disable=broad-except - status = "Error processing : " + str(exc) - logger.exception("[%s] %s", resource['id'], status) - self.status_update(pyclowder.utils.StatusMessage.error, resource, status) + message = str(exc) + logger.exception("[%s] %s", resource['id'], message) if retry_count < 10: - self.message_resubmit(resource, retry_count + 1) + message = "(#%s) %s" % (retry_count+1, message) + self.message_resubmit(resource, retry_count+1, message) else: - self.message_error(resource) + self.message_error(resource, message) def register_extractor(self, endpoints): """Register extractor info with Clowder. @@ -528,21 +527,23 @@ def status_update(self, status, resource, message): the instance know the progress of the extractor. Keyword arguments: - status - START | PROCESSING | DONE | ERROR + status - pyclowder.utils.StatusMessage value resource - descriptor object with {"type", "id"} fields message - contents of the status update """ logging.getLogger(__name__).info("[%s] : %s: %s", resource["id"], status, message) - def message_ok(self, resource): - self.status_update(pyclowder.utils.StatusMessage.done, resource, "Done processing") + def message_ok(self, resource, message="Done processing."): + self.status_update(pyclowder.utils.StatusMessage.done.value, resource, message) + + def message_error(self, resource, message="Error processing message."): + self.status_update(pyclowder.utils.StatusMessage.error.value, resource, message) - def message_error(self, resource): - self.status_update(pyclowder.utils.StatusMessage.error, resource, "Error processing message") + def message_resubmit(self, resource, retry_count, message="Resubmitting message."): + self.status_update(pyclowder.utils.StatusMessage.retry.value, resource, message) - def message_resubmit(self, resource, retry_count): - self.status_update(pyclowder.utils.StatusMessage.processing, resource, "Resubmitting message (attempt #%s)" - % retry_count) + def message_process(self, resource, message): + self.status_update(pyclowder.utils.StatusMessage.processing.value, resource, message) def get(self, url, params=None, raise_status=True, **kwargs): """ @@ -760,7 +761,12 @@ def on_message(self, channel, method, header, body): if 'routing_key' not in json_body and method.routing_key: json_body['routing_key'] = method.routing_key - self.worker = RabbitMQHandler(self.extractor_name, self.extractor_info, self.check_message, + if 'jobid' not in json_body: + job_id = None + else: + job_id = json_body['jobid'] + + self.worker = RabbitMQHandler(self.extractor_name, self.extractor_info, job_id, self.check_message, self.process_message, self.ssl_verify, self.mounted_paths, method, header, body) self.worker.start_thread(json_body) @@ -835,13 +841,14 @@ class RabbitMQHandler(Connector): a queue of messages that the super- loop can access and send later. 
""" - def __init__(self, extractor_name, extractor_info, check_message=None, process_message=None, ssl_verify=True, + def __init__(self, extractor_name, extractor_info, job_id, check_message=None, process_message=None, ssl_verify=True, mounted_paths=None, method=None, header=None, body=None): super(RabbitMQHandler, self).__init__(extractor_name, extractor_info, check_message, process_message, ssl_verify, mounted_paths) self.method = method self.header = header self.body = body + self.job_id = job_id self.messages = [] self.thread = None self.finished = False @@ -871,19 +878,22 @@ def process_messages(self, channel, rabbitmq_queue): with self.lock: msg = self.messages.pop(0) + # PROCESSING - Standard update message during extractor processing if msg["type"] == 'status': if self.header.reply_to: properties = pika.BasicProperties(delivery_mode=2, correlation_id=self.header.correlation_id) channel.basic_publish(exchange='', routing_key=self.header.reply_to, properties=properties, - body=json.dumps(msg['status'])) + body=json.dumps(msg['payload'])) + # DONE - Extractor finished without error elif msg["type"] == 'ok': channel.basic_ack(self.method.delivery_tag) with self.lock: self.finished = True + # ERROR - Extractor encountered error and message goes to error queue elif msg["type"] == 'error': properties = pika.BasicProperties(delivery_mode=2, reply_to=self.header.reply_to) channel.basic_publish(exchange='', @@ -894,18 +904,18 @@ def process_messages(self, channel, rabbitmq_queue): with self.lock: self.finished = True + # RESUBMITTING - Extractor encountered error and message is resubmitted to same queue elif msg["type"] == 'resubmit': - retry_count = msg['retry_count'] - queue = rabbitmq_queue - properties = pika.BasicProperties(delivery_mode=2, reply_to=self.header.reply_to) jbody = json.loads(self.body) - jbody['retry_count'] = retry_count + jbody['retry_count'] = msg['retry_count'] if 'exchange' not in jbody and self.method.exchange: jbody['exchange'] = self.method.exchange - if 'routing_key' not in jbody and self.method.routing_key and self.method.routing_key != queue: + if 'routing_key' not in jbody and self.method.routing_key and self.method.routing_key != rabbitmq_queue: jbody['routing_key'] = self.method.routing_key + + properties = pika.BasicProperties(delivery_mode=2, reply_to=self.header.reply_to) channel.basic_publish(exchange='', - routing_key=queue, + routing_key=rabbitmq_queue, properties=properties, body=json.dumps(jbody)) channel.basic_ack(self.method.delivery_tag) @@ -917,30 +927,35 @@ def process_messages(self, channel, rabbitmq_queue): def status_update(self, status, resource, message): super(RabbitMQHandler, self).status_update(status, resource, message) - status_report = dict() - # TODO: Update this to check resource["type"] once Clowder better supports dataset events - status_report['file_id'] = resource["id"] - status_report['extractor_id'] = self.extractor_info['name'] - status_report['status'] = "%s: %s" % (status, message) - status_report['start'] = pyclowder.utils.iso8601time() + with self.lock: + # TODO: Remove 'status' from payload later and read from message_type and message in Clowder 2.0 self.messages.append({"type": "status", - "status": status_report, "resource": resource, - "message": message}) - - def message_ok(self, resource): - super(RabbitMQHandler, self).message_ok(resource) + "payload": { + "file_id": resource["id"], + "extractor_id": self.extractor_info['name'], + "job_id": self.job_id, + "status": "%s: %s" % (status, message), + "start": 
pyclowder.utils.iso8601time(), + "message_type": status, + "message": message + }}) + + def message_ok(self, resource, message="Done processing."): + super(RabbitMQHandler, self).message_ok(resource, message) with self.lock: self.messages.append({"type": "ok"}) - def message_error(self, resource): - super(RabbitMQHandler, self).message_error(resource) + def message_error(self, resource, message="Error processing message."): + super(RabbitMQHandler, self).message_error(resource, message) with self.lock: self.messages.append({"type": "error"}) - def message_resubmit(self, resource, retry_count): - super(RabbitMQHandler, self).message_resubmit(resource, retry_count) + def message_resubmit(self, resource, retry_count, message=None): + if message is None: + message = "(#%s)" % retry_count + super(RabbitMQHandler, self).message_resubmit(resource, retry_count, message) with self.lock: self.messages.append({"type": "resubmit", "retry_count": retry_count}) @@ -949,10 +964,11 @@ class HPCConnector(Connector): """Takes pickle files and processes them.""" # pylint: disable=too-many-arguments - def __init__(self, extractor_name, extractor_info, picklefile, + def __init__(self, extractor_name, extractor_info, picklefile, job_id=None, check_message=None, process_message=None, ssl_verify=True, mounted_paths=None): super(HPCConnector, self).__init__(extractor_name, extractor_info, check_message, process_message, ssl_verify, mounted_paths) + self.job_id = job_id self.picklefile = picklefile self.logfile = None @@ -991,6 +1007,7 @@ def status_update(self, status, resource, message): statusreport = dict() statusreport['file_id'] = resource["id"] statusreport['extractor_id'] = self.extractor_info['name'] + statusreport['job_id'] = self.job_id statusreport['status'] = "%s: %s" % (status, message) statusreport['start'] = time.strftime('%Y-%m-%dT%H:%M:%S') log.write(json.dumps(statusreport) + '\n') @@ -1096,3 +1113,14 @@ def put(self, url, data=None, raise_status=True, **kwargs): def delete(self, url, raise_status=True, **kwargs): logging.getLogger(__name__).debug("DELETE: " + url) return None + + +class PyClowderExtractionAbort(Exception): + """Raise exception that will not be subject to retry attempts (i.e. errors that are expected to fail again). + + Attributes: + message -- explanation of the error + """ + + def __init__(self, message): + self.message = message diff --git a/pyclowder/datasets.py b/pyclowder/datasets.py index cb04f93..2566d91 100644 --- a/pyclowder/datasets.py +++ b/pyclowder/datasets.py @@ -113,7 +113,7 @@ def download(connector, host, key, datasetid): datasetid -- the file that is currently being processed """ - connector.status_update(StatusMessage.processing, {"type": "dataset", "id": datasetid}, "Downloading dataset.") + connector.message_process({"type": "dataset", "id": datasetid}, "Downloading dataset.") # fetch dataset zipfile url = '%sapi/datasets/%s/download?key=%s' % (host, datasetid, key) @@ -257,6 +257,25 @@ def submit_extractions_by_collection(connector, host, key, collectionid, extract submit_extractions_by_collection(connector, host, key, coll['id'], extractorname, recursive) +def upload_tags(connector, host, key, datasetid, tags): + """Upload dataset tag to Clowder. 
+ + Keyword arguments: + connector -- connector information, used to get missing parameters and send status updates + host -- the clowder host, including http and port, should end with a / + key -- the secret key to login to clowder + datasetid -- the dataset that is currently being processed + tags -- the tags to be uploaded + """ + + connector.status_update(StatusMessage.processing, {"type": "dataset", "id": datasetid}, "Uploading dataset tags.") + + headers = {'Content-Type': 'application/json'} + url = '%sapi/datasets/%s/tags?key=%s' % (host, datasetid, key) + result = connector.post(url, headers=headers, data=json.dumps(tags), + verify=connector.ssl_verify if connector else True) + + def upload_metadata(connector, host, key, datasetid, metadata): """Upload dataset JSON-LD metadata to Clowder. @@ -268,8 +287,7 @@ def upload_metadata(connector, host, key, datasetid, metadata): metadata -- the metadata to be uploaded """ - connector.status_update(StatusMessage.processing, {"type": "dataset", "id": datasetid}, - "Uploading dataset metadata.") + connector.message_process({"type": "dataset", "id": datasetid}, "Uploading dataset metadata.") headers = {'Content-Type': 'application/json'} url = '%sapi/datasets/%s/metadata.jsonld?key=%s' % (host, datasetid, key) diff --git a/pyclowder/extractors.py b/pyclowder/extractors.py index 7d9bf8e..7a31bb1 100644 --- a/pyclowder/extractors.py +++ b/pyclowder/extractors.py @@ -22,6 +22,7 @@ from pyclowder.connectors import RabbitMQConnector, HPCConnector, LocalConnector from pyclowder.utils import CheckMessage, setup_logging import pyclowder.files +import pyclowder.datasets class Extractor(object): @@ -308,36 +309,110 @@ def __init__(self): self.logger = logging.getLogger('__main__') self.logger.setLevel(logging.INFO) + # TODO: Support check_message() in simple extractors + def process_message(self, connector, host, secret_key, resource, parameters): """ - Process a clowder message. This will download the file to local disk and call the - process_file to do the actual processing of the file. The resulting dict is then + Process a clowder message. This will download the file(s) to local disk and call + process_file or process_dataset to do the actual processing. The resulting dict is then parsed and based on the keys in the dict it will upload the results to the right location in clowder. 
""" - input_file = resource["local_paths"][0] - file_id = resource['id'] + if 'files' in resource: + type = 'dataset' + input_files = resource['local_paths'] + dataset_id = resource['id'] + + elif 'local_paths' in resource: + type = 'file' + input_file = resource['local_paths'][0] + file_id = resource['id'] + dataset_id = resource['parent']['id'] + else: + # TODO: Eventually support other messages such as metadata.added + type = 'unknown' - # call the actual function that processes the file - if file_id and input_file: + # call the actual function that processes the message + if type == 'file' and file_id and input_file: result = self.process_file(input_file) + elif type == 'dataset' and dataset_id and input_files: + result = self.process_dataset(input_files) else: result = dict() - # return information to clowder try: + # upload metadata to the processed file or dataset if 'metadata' in result.keys(): - metadata = self.get_metadata(result.get('metadata'), 'file', file_id, host) - self.logger.info("upload metadata") - self.logger.debug(metadata) - pyclowder.files.upload_metadata(connector, host, secret_key, file_id, metadata) + self.logger.debug("upload metadata") + if type == 'file': + metadata = self.get_metadata(result.get('metadata'), 'file', file_id, host) + self.logger.debug(metadata) + pyclowder.files.upload_metadata(connector, host, secret_key, file_id, metadata) + elif type == 'dataset': + metadata = self.get_metadata(result.get('metadata'), 'dataset', dataset_id, host) + self.logger.debug(metadata) + pyclowder.datasets.upload_metadata(connector, host, secret_key, dataset_id, metadata) + else: + self.logger.error("unable to attach metadata to resource type: %s" % type) + + # upload previews to the processed file if 'previews' in result.keys(): - self.logger.info("upload previews") - for preview in result['previews']: - if os.path.exists(str(preview)): - preview = {'file': preview} - self.logger.info("upload preview") - pyclowder.files.upload_preview(connector, host, secret_key, file_id, str(preview)) + if type == 'file': + for preview in result['previews']: + if os.path.exists(str(preview)): + preview = {'file': preview} + self.logger.debug("upload preview") + pyclowder.files.upload_preview(connector, host, secret_key, file_id, str(preview)) + else: + # TODO: Add Clowder endpoint (& pyclowder method) to attach previews to datasets + self.logger.error("previews not currently supported for resource type: %s" % type) + + if 'tags' in result.keys(): + self.logger.debug("upload tags") + tags = {"tags": result["tags"]} + if type == 'file': + pyclowder.files.upload_tags(connector, host, secret_key, file_id, tags) + else: + pyclowder.datasets.upload_tags(connector, host, secret_key, dataset_id, tags) + + # upload output files to the processed file's parent dataset or processed dataset + if 'outputs' in result.keys(): + self.logger.debug("upload output files") + if type == 'file' or type == 'dataset': + for output in result['outputs']: + if os.path.exists(str(output)): + pyclowder.files.upload_to_dataset(connector, host, secret_key, dataset_id, str(output)) + else: + self.logger.error("unable to upload outputs to resource type: %s" % type) + + if 'new_dataset' in result.keys(): + if type == 'dataset': + nds = result['new_dataset'] + if 'name' not in nds.keys(): + self.logger.error("new datasets require a name") + else: + description = nds['description'] if 'description' in nds.keys() else "" + new_dataset_id = pyclowder.datasets.create_empty(connector, host, secret_key, nds['name'], + 
description) + self.logger.debug("created new dataset: %s" % new_dataset_id) + + if 'metadata' in nds.keys(): + self.logger.debug("upload metadata to new dataset") + metadata = self.get_metadata(nds.get('metadata'), 'dataset', new_dataset_id, host) + self.logger.debug(metadata) + pyclowder.datasets.upload_metadata(connector, host, secret_key, new_dataset_id, metadata) + + if 'outputs' in nds.keys(): + self.logger.debug("upload output files to new dataset") + for output in nds['outputs']: + if os.path.exists(str(output)): + pyclowder.files.upload_to_dataset(connector, host, secret_key, new_dataset_id, + str(output)) + + if 'previews' in nds.keys(): + # TODO: Add Clowder endpoint (& pyclowder method) to attach previews to datasets + self.logger.error("previews not currently supported for resource type: %s" % type) + finally: self.cleanup_data(result) @@ -345,13 +420,33 @@ def process_file(self, input_file): """ This function will process the file and return a dict that contains the result. This dict can have the following keys: - - metadata: the metadata to be associated with the file - - previews: files on disk with the preview to be uploaded + - metadata: the metadata to be associated with the processed file + - previews: images on disk with the preview to be uploaded to the processed file + - outputs: files on disk to be added to processed file's parent :param input_file: the file to be processed. :return: the specially formatted dict. """ return dict() + def process_dataset(self, input_files): + """ + This function will process the file list and return a dict that contains the result. This + dict can have the following keys: + - metadata: the metadata to be associated with the processed dataset + - outputs: files on disk to be added to the dataset + - previews: images to be associated with the dataset + - new_dataset: a dict describing a new dataset to be created for the outputs, with the following keys: + - name: the name of the new dataset to be created (including adding the outputs, + metadata and previews contained in new_dataset) + - description: description for the new dataset to be created + - previews: (see above) + - metadata: (see above) + - outputs: (see above) + :param input_files: the files to be processed. + :return: the specially formatted dict. + """ + return dict() + def cleanup_data(self, result): """ Once the information is uploaded to clowder this function is called for cleanup. This diff --git a/pyclowder/files.py b/pyclowder/files.py index 15ef634..1b5a9a5 100644 --- a/pyclowder/files.py +++ b/pyclowder/files.py @@ -14,7 +14,6 @@ from pyclowder.datasets import get_file_list from pyclowder.collections import get_datasets, get_child_collections -from pyclowder.utils import StatusMessage # Some sources of urllib3 support warning suppression, but not all try: @@ -38,7 +37,7 @@ def download(connector, host, key, fileid, intermediatefileid=None, ext=""): ext -- the file extension, the downloaded file will end with this extension """ - connector.status_update(StatusMessage.processing, {"type": "file", "id": fileid}, "Downloading file.") + connector.message_process({"type": "file", "id": fileid}, "Downloading file.") # TODO: intermediateid doesn't really seem to be used here, can we remove entirely? 
if not intermediatefileid: @@ -180,7 +179,7 @@ def upload_metadata(connector, host, key, fileid, metadata): metadata -- the metadata to be uploaded """ - connector.status_update(StatusMessage.processing, {"type": "file", "id": fileid}, "Uploading file metadata.") + connector.message_process({"type": "file", "id": fileid}, "Uploading file metadata.") headers = {'Content-Type': 'application/json'} url = '%sapi/files/%s/metadata.jsonld?key=%s' % (host, fileid, key) @@ -204,7 +203,7 @@ def upload_preview(connector, host, key, fileid, previewfile, previewmetadata=No file itself and this parameter can be ignored. E.g. 'application/vnd.clowder+custom+xml' """ - connector.status_update(StatusMessage.processing, {"type": "file", "id": fileid}, "Uploading file preview.") + connector.message_process({"type": "file", "id": fileid}, "Uploading file preview.") logger = logging.getLogger(__name__) headers = {'Content-Type': 'application/json'} @@ -248,7 +247,7 @@ def upload_tags(connector, host, key, fileid, tags): tags -- the tags to be uploaded """ - connector.status_update(StatusMessage.processing, {"type": "file", "id": fileid}, "Uploading file tags.") + connector.message_process({"type": "file", "id": fileid}, "Uploading file tags.") headers = {'Content-Type': 'application/json'} url = '%sapi/files/%s/tags?key=%s' % (host, fileid, key) diff --git a/pyclowder/sections.py b/pyclowder/sections.py index ef6800a..fab0bd4 100644 --- a/pyclowder/sections.py +++ b/pyclowder/sections.py @@ -8,8 +8,6 @@ import requests -from pyclowder.utils import StatusMessage - def upload(connector, host, key, sectiondata): """Upload section to Clowder. @@ -47,7 +45,7 @@ def upload_tags(connector, host, key, sectionid, tags): tags -- the tags to be uploaded """ - connector.status_update(StatusMessage.processing, {"type": "section", "id": sectionid}, "Uploading section tags.") + connector.message_process({"type": "section", "id": sectionid}, "Uploading section tags.") headers = {'Content-Type': 'application/json'} url = '%sapi/sections/%s/tags?key=%s' % (host, sectionid, key) @@ -67,8 +65,8 @@ def upload_description(connector, host, key, sectionid, description): description -- the description to be uploaded """ - connector.status_update(StatusMessage.processing, {"type": "section", "id": sectionid}, - "Uploading section description.") + connector.message_process({"type": "section", "id": sectionid}, + "Uploading section description.") headers = {'Content-Type': 'application/json'} url = '%sapi/sections/%s/description?key=%s' % (host, sectionid, key) diff --git a/pyclowder/utils.py b/pyclowder/utils.py index f71ecb8..8839f84 100644 --- a/pyclowder/utils.py +++ b/pyclowder/utils.py @@ -44,10 +44,12 @@ class StatusMessage(Enum): full string will be STATUS: MESSAGE.
""" - start = "START" + start = "STARTED" processing = "PROCESSING" - done = "DONE" + done = "SUCCEEDED" + skip = "SKIPPED" error = "ERROR" + retry = "RESUBMITTED" def iso8601time(): diff --git a/requirements.txt b/requirements.txt index b03e32c..27a4ff8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,9 +1,4 @@ -enum34==1.1.6 -pika==1.0.0 +pika==1.1.0 PyYAML==5.1 -requests==2.21.0 -wheel==0.33.1 -urllib3==1.24.1 -pytest==4.3.1 -pytest-pep8==1.0.6 -requests-toolbelt==0.9.1 \ No newline at end of file +requests==2.24.0 +requests-toolbelt==0.9.1 diff --git a/sample-extractors/simple-extractor/README.md b/sample-extractors/simple-extractor/README.md index 4d9ee42..5cfc304 100644 --- a/sample-extractors/simple-extractor/README.md +++ b/sample-extractors/simple-extractor/README.md @@ -18,12 +18,13 @@ you have to write your extractor the normal way using [PyClowder](https://openso To write an extractor using the Simple Extractor, you need to have your Python program available. The main function of this Python program is supposed to take an input file path as its parameter. It needs to return a Python dictionary that -can contain either metadata information ("metadata"), details about file previews ("previews") or both. For example: +can contain metadata information ("metadata"), details about file previews ("previews"), tags for the file ("tags"), or any combination of these. For example: ``` json { "metadata": dict(), - "previews": array() + "previews": array(), + "tags": array() } ``` diff --git a/sample-extractors/simple-extractor/simple_extractor.py b/sample-extractors/simple-extractor/simple_extractor.py index c923361..5146906 100644 --- a/sample-extractors/simple-extractor/simple_extractor.py +++ b/sample-extractors/simple-extractor/simple_extractor.py @@ -10,3 +10,6 @@ def __init__(self, extraction): def process_file(self, input_file): return self.extraction(input_file) + + def process_dataset(self, input_files): + return self.extraction(input_files) diff --git a/sample-extractors/wordcount/wordcount.py b/sample-extractors/wordcount/wordcount.py index 1f5055f..eb4693d 100755 --- a/sample-extractors/wordcount/wordcount.py +++ b/sample-extractors/wordcount/wordcount.py @@ -32,24 +32,30 @@ def process_message(self, connector, host, secret_key, resource, parameters): inputfile = resource["local_paths"][0] file_id = resource['id'] - # call actual program + # These process messages will appear in the Clowder UI under Extractions. + connector.message_process(resource, "Loading contents of file...") + + # Call actual program result = subprocess.check_output(['wc', inputfile], stderr=subprocess.STDOUT) result = result.decode('utf-8') (lines, words, characters, _) = result.split() - # store results as metadata + connector.message_process(resource, "Found %s lines and %s words..." % (lines, words)) + + # Store results as metadata result = { 'lines': lines, 'words': words, 'characters': characters } metadata = self.get_metadata(result, 'file', file_id, host) + + # Normal logs will appear in the extractor log, but NOT in the Clowder UI.
logger.debug(metadata) - # upload metadata + # Upload metadata to original file pyclowder.files.upload_metadata(connector, host, secret_key, file_id, metadata) - if __name__ == "__main__": extractor = WordCount() extractor.start() diff --git a/setup.cfg b/setup.cfg deleted file mode 100644 index 1531e1a..0000000 --- a/setup.cfg +++ /dev/null @@ -1,2 +0,0 @@ -[tool:pytest] -pep8maxlinelength = 120 diff --git a/setup.py b/setup.py index aadf4cb..91c17b3 100644 --- a/setup.py +++ b/setup.py @@ -9,16 +9,16 @@ def description(): setup(name='pyclowder', - version='2.2.3', + version='2.3.0', packages=find_packages(), description='Python SDK for the Clowder Data Management System', long_description=description(), author='Rob Kooper', author_email='kooper@illinois.edu', - url='https://clowder.ncsa.illinois.edu', + url='https://clowderframework.org', project_urls={ - 'Source': 'https://opensource.ncsa.illinois.edu/bitbucket/scm/cats/pyclowder.git', + 'Source': 'https://github.com/clowder-framework/pyclowder', }, license='BSD', @@ -32,10 +32,9 @@ def description(): keywords=['clowder', 'data management system'], install_requires=[ - 'enum34==1.1.6', - 'pika==1.0.0', + 'pika==1.1.0', 'PyYAML==5.1', - 'requests==2.21.0', + 'requests==2.24.0', 'requests-toolbelt==0.9.1', ],
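
Below is a minimal, illustrative sketch (not part of the diff above) of a dataset-level extractor written against this branch. It exercises the new SimpleExtractor.process_dataset() hook, the "tags" result key, and the new PyClowderExtractionAbort exception. The DatasetWordCount class, the use of the wc command, and the metadata field names are assumptions made only for illustration; it also assumes the simple extractor base class is importable as pyclowder.extractors.SimpleExtractor and is started with .start() like the WordCount sample, with the usual extractor_info.json and connection settings in place.

```python
# Illustrative sketch only: a dataset extractor built on the new SimpleExtractor hooks.
import subprocess

from pyclowder.connectors import PyClowderExtractionAbort
from pyclowder.extractors import SimpleExtractor


class DatasetWordCount(SimpleExtractor):
    """Count words across every file in a dataset and attach the totals as metadata."""

    def process_dataset(self, input_files):
        if not input_files:
            # New in 2.3.0: this exception is reported as an error, but the message
            # is not resubmitted for another retry.
            raise PyClowderExtractionAbort("dataset contains no files to count")

        total_words = 0
        for input_file in input_files:
            output = subprocess.check_output(['wc', '-w', input_file]).decode('utf-8')
            total_words += int(output.split()[0])

        # Only result keys documented in process_dataset() are used here: 'metadata'
        # is wrapped in JSON-LD and attached to the dataset, 'tags' are added to it.
        return {
            'metadata': {'files': len(input_files), 'total_words': total_words},
            'tags': ['wordcount'],
        }


if __name__ == "__main__":
    DatasetWordCount().start()
```

The same result dict could also carry 'outputs' (files uploaded back into the dataset) or a 'new_dataset' block, as described in the process_dataset() docstring added above.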