From 784456d92ac20b43ba9e06e4ab4c5b047c23bc2c Mon Sep 17 00:00:00 2001 From: Jarrett Keifer Date: Fri, 1 Oct 2021 13:30:28 -0700 Subject: [PATCH] changes to enable new update-state cirrus function (#29) - statedb.set_completed() outputs arg now optional, not required - statedb.set_outputs() added Co-authored-by: trevorskaggs --- CHANGELOG.md | 6 ++++++ cirruslib/statedb.py | 49 ++++++++++++++++++++++++++++++++++++------- cirruslib/version.py | 2 +- tests/test_statedb.py | 22 ++++++++++++++----- 4 files changed, 66 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9628b5e..6badee9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,12 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## [Unreleased] +## [v0.5.1] - 2021-10-01 + +### Changed +- The `outputs` parameter to `stateddb.set_completed()` is no longer required, but now optional +- Added `statedb.set_outputs() to set outputs indepentent of execution state + ## [v0.5.0] - 2021-08-20 ### Removed diff --git a/cirruslib/statedb.py b/cirruslib/statedb.py index f841edd..e6d6f3b 100644 --- a/cirruslib/statedb.py +++ b/cirruslib/statedb.py @@ -39,7 +39,7 @@ def delete_item(self, catid: str): def delete(self): # delete table (used for testing) self.table.delete() - self.table.wait_until_not_exists() + self.table.wait_until_not_exists() def get_dbitem(self, catid: str) -> Dict: """Get a DynamoDB item @@ -242,9 +242,9 @@ def set_processing(self, catid, execution): } ) logger.debug("Add execution", extra=key.update({'execution': execution})) - return response + return response - def set_completed(self, catid: str, outputs: List[str]) -> str: + def set_outputs(self, catid: str, outputs: List[str]) -> str: """Set this catalog as COMPLETED Args: @@ -260,7 +260,7 @@ def set_completed(self, catid: str, outputs: List[str]) -> str: expr = ( 'SET ' 'created = if_not_exists(created, :created), ' - 'state_updated=:state_updated, updated=:updated, ' + 'updated=:updated, ' 'outputs=:outputs' ) response = self.table.update_item( @@ -268,12 +268,47 @@ def set_completed(self, catid: str, outputs: List[str]) -> str: UpdateExpression=expr, ExpressionAttributeValues={ ':created': now, - ':state_updated': f"COMPLETED_{now}", ':updated': now, ':outputs': outputs } ) - logger.debug("set completed", extra=key.update({'outputs': outputs})) + logger.debug("set outputs", extra=key.update({'outputs': outputs})) + return response + + def set_completed(self, catid: str, outputs: Optional[List[str]]=None) -> str: + """Set this catalog as COMPLETED + + Args: + catid (str): The Cirrus Catalog + outputs (Optional[[str]], optional): List of URLs to output Items. Defaults to None. + + Returns: + str: DynamoDB response + """ + now = datetime.now(timezone.utc).isoformat() + key = self.catid_to_key(catid) + + expr = ( + 'SET ' + 'created = if_not_exists(created, :created), ' + 'state_updated=:state_updated, updated=:updated' + ) + expr_attrs = { + ':created': now, + ':state_updated': f"COMPLETED_{now}", + ':updated': now, + } + + if outputs is not None: + expr += ', outputs=:outputs' + expr_attrs[':outputs'] = outputs + + response = self.table.update_item( + Key=key, + UpdateExpression=expr, + ExpressionAttributeValues=expr_attrs, + ) + logger.debug("set outputs", extra=key.update({'outputs': outputs})) return response def set_failed(self, catid, msg): @@ -351,7 +386,7 @@ def query(self, collections_workflow: str, state: str=None, since: str=None, Dict: DynamoDB response """ index = None if sort_index == 'default' else sort_index - + # always use the hash of the table which is same in all Global Secondary Indices expr = Key('collections_workflow').eq(collections_workflow) diff --git a/cirruslib/version.py b/cirruslib/version.py index 2b8877c..93b60a1 100644 --- a/cirruslib/version.py +++ b/cirruslib/version.py @@ -1 +1 @@ -__version__ = '0.5.0' +__version__ = '0.5.1' diff --git a/tests/test_statedb.py b/tests/test_statedb.py index da9d60d..56e231a 100644 --- a/tests/test_statedb.py +++ b/tests/test_statedb.py @@ -41,7 +41,7 @@ def setup_table(): schema = json.loads(f.read()) table = client.create_table(**schema) table.meta.client.get_waiter('table_exists').wait(TableName=table_name) - return StateDB(table_name) + return StateDB(table_name) class TestClassMethods(unittest.TestCase): @@ -170,13 +170,18 @@ def test_set_processing(self): assert(dbitem['state_updated'].startswith('PROCESSING')) assert(dbitem['executions'] == ['testarn']) - def test_set_completed(self): + def test_set_outputs(self): resp = self.statedb.set_completed(test_item['id'], outputs=['output-item']) assert(resp['ResponseMetadata']['HTTPStatusCode'] == 200) dbitem = self.statedb.get_dbitem(test_item['id']) - assert(dbitem['state_updated'].startswith('COMPLETED')) assert(dbitem['outputs'][0] == 'output-item') + def test_set_completed(self): + resp = self.statedb.set_completed(test_item['id']) + assert(resp['ResponseMetadata']['HTTPStatusCode'] == 200) + dbitem = self.statedb.get_dbitem(test_item['id']) + assert(dbitem['state_updated'].startswith('COMPLETED')) + def test_set_failed(self): resp = self.statedb.set_failed(test_item['id'], msg='test failure') assert(resp['ResponseMetadata']['HTTPStatusCode'] == 200) @@ -184,6 +189,13 @@ def test_set_failed(self): assert(dbitem['state_updated'].startswith('FAILED')) assert(dbitem['last_error'] == 'test failure') + def test_set_completed_with_outputs(self): + resp = self.statedb.set_completed(test_item['id'], outputs=['output-item2']) + assert(resp['ResponseMetadata']['HTTPStatusCode'] == 200) + dbitem = self.statedb.get_dbitem(test_item['id']) + assert(dbitem['state_updated'].startswith('COMPLETED')) + assert(dbitem['outputs'][0] == 'output-item2') + def test_set_invalid(self): resp = self.statedb.set_invalid(test_item['id'], msg='test failure') assert(resp['ResponseMetadata']['HTTPStatusCode'] == 200) @@ -202,11 +214,11 @@ def test_get_counts(self): assert(count == 1) count = self.statedb.get_counts(test_dbitem['collections_workflow'], since='1h') - + def _test_get_counts_paging(self): for i in range(5000): self.statedb.set_processing(test_item['id'] + f"_{i}", execution='arn::test') count = self.statedb.get_counts(test_dbitem['collections_workflow']) assert(count == 1004) for i in range(5000): - self.statedb.delete_item(test_item['id'] + f"_{i}") \ No newline at end of file + self.statedb.delete_item(test_item['id'] + f"_{i}")