Skip to content
This repository has been archived by the owner on Dec 5, 2023. It is now read-only.

Commit

Permalink
changes to enable new update-state cirrus function (#29)
Browse files Browse the repository at this point in the history
- statedb.set_completed() outputs arg now optional, not required
- statedb.set_outputs() added

Co-authored-by: trevorskaggs <[email protected]>
  • Loading branch information
jkeifer and trevorskaggs authored Oct 1, 2021
1 parent 96ff1d8 commit 784456d
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 13 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

## [Unreleased]

## [v0.5.1] - 2021-10-01

### Changed
- The `outputs` parameter to `statedb.set_completed()` is now optional rather than required
- Added `statedb.set_outputs()` to set outputs independent of execution state

## [v0.5.0] - 2021-08-20

### Removed
Expand Down
49 changes: 42 additions & 7 deletions cirruslib/statedb.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def delete_item(self, catid: str):
def delete(self):
# delete table (used for testing)
self.table.delete()
self.table.wait_until_not_exists()
self.table.wait_until_not_exists()

def get_dbitem(self, catid: str) -> Dict:
"""Get a DynamoDB item
Expand Down Expand Up @@ -242,9 +242,9 @@ def set_processing(self, catid, execution):
}
)
logger.debug("Add execution", extra=key.update({'execution': execution}))
return response
return response

def set_completed(self, catid: str, outputs: List[str]) -> str:
def set_outputs(self, catid: str, outputs: List[str]) -> str:
"""Set this catalog as COMPLETED
Args:
Expand All @@ -260,20 +260,55 @@ def set_completed(self, catid: str, outputs: List[str]) -> str:
expr = (
'SET '
'created = if_not_exists(created, :created), '
'state_updated=:state_updated, updated=:updated, '
'updated=:updated, '
'outputs=:outputs'
)
response = self.table.update_item(
Key=key,
UpdateExpression=expr,
ExpressionAttributeValues={
':created': now,
':state_updated': f"COMPLETED_{now}",
':updated': now,
':outputs': outputs
}
)
logger.debug("set completed", extra=key.update({'outputs': outputs}))
logger.debug("set outputs", extra=key.update({'outputs': outputs}))
return response

def set_completed(self, catid: str, outputs: Optional[List[str]]=None) -> str:
    """Set this catalog as COMPLETED

    Updates `state_updated` and `updated` on the item (and `created`, only
    if it is not already set). The `outputs` attribute is written only when
    a value is supplied, so calling this without `outputs` leaves any
    previously stored outputs untouched.

    Args:
        catid (str): The Cirrus Catalog
        outputs (Optional[[str]], optional): List of URLs to output Items. Defaults to None.

    Returns:
        str: DynamoDB response
    """
    now = datetime.now(timezone.utc).isoformat()
    key = self.catid_to_key(catid)

    expr = (
        'SET '
        'created = if_not_exists(created, :created), '
        'state_updated=:state_updated, updated=:updated'
    )
    expr_attrs = {
        ':created': now,
        ':state_updated': f"COMPLETED_{now}",
        ':updated': now,
    }

    # Only touch the outputs attribute when the caller provided one.
    if outputs is not None:
        expr += ', outputs=:outputs'
        expr_attrs[':outputs'] = outputs

    response = self.table.update_item(
        Key=key,
        UpdateExpression=expr,
        ExpressionAttributeValues=expr_attrs,
    )
    # dict.update() returns None, so build a fresh mapping for the log
    # context instead of passing the result of key.update(...).
    logger.debug("set completed", extra={**key, 'outputs': outputs})
    return response

def set_failed(self, catid, msg):
Expand Down Expand Up @@ -351,7 +386,7 @@ def query(self, collections_workflow: str, state: str=None, since: str=None,
Dict: DynamoDB response
"""
index = None if sort_index == 'default' else sort_index


# always use the hash of the table which is same in all Global Secondary Indices
expr = Key('collections_workflow').eq(collections_workflow)
Expand Down
2 changes: 1 addition & 1 deletion cirruslib/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.5.0'
__version__ = '0.5.1'
22 changes: 17 additions & 5 deletions tests/test_statedb.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def setup_table():
schema = json.loads(f.read())
table = client.create_table(**schema)
table.meta.client.get_waiter('table_exists').wait(TableName=table_name)
return StateDB(table_name)
return StateDB(table_name)


class TestClassMethods(unittest.TestCase):
Expand Down Expand Up @@ -170,20 +170,32 @@ def test_set_processing(self):
assert(dbitem['state_updated'].startswith('PROCESSING'))
assert(dbitem['executions'] == ['testarn'])

def test_set_completed(self):
def test_set_outputs(self):
resp = self.statedb.set_completed(test_item['id'], outputs=['output-item'])
assert(resp['ResponseMetadata']['HTTPStatusCode'] == 200)
dbitem = self.statedb.get_dbitem(test_item['id'])
assert(dbitem['state_updated'].startswith('COMPLETED'))
assert(dbitem['outputs'][0] == 'output-item')

def test_set_completed(self):
    """Completing without outputs returns HTTP 200 and a COMPLETED state."""
    catid = test_item['id']
    response = self.statedb.set_completed(catid)
    status_code = response['ResponseMetadata']['HTTPStatusCode']
    assert status_code == 200
    stored = self.statedb.get_dbitem(catid)
    assert stored['state_updated'].startswith('COMPLETED')

def test_set_failed(self):
    """Failing an item returns HTTP 200, a FAILED state and the error message."""
    catid = test_item['id']
    response = self.statedb.set_failed(catid, msg='test failure')
    status_code = response['ResponseMetadata']['HTTPStatusCode']
    assert status_code == 200
    stored = self.statedb.get_dbitem(catid)
    assert stored['state_updated'].startswith('FAILED')
    assert stored['last_error'] == 'test failure'

def test_set_completed_with_outputs(self):
    """Completing with outputs returns HTTP 200 and stores the outputs."""
    catid = test_item['id']
    response = self.statedb.set_completed(catid, outputs=['output-item2'])
    status_code = response['ResponseMetadata']['HTTPStatusCode']
    assert status_code == 200
    stored = self.statedb.get_dbitem(catid)
    assert stored['state_updated'].startswith('COMPLETED')
    first_output = stored['outputs'][0]
    assert first_output == 'output-item2'

def test_set_invalid(self):
resp = self.statedb.set_invalid(test_item['id'], msg='test failure')
assert(resp['ResponseMetadata']['HTTPStatusCode'] == 200)
Expand All @@ -202,11 +214,11 @@ def test_get_counts(self):
assert(count == 1)
count = self.statedb.get_counts(test_dbitem['collections_workflow'], since='1h')


def _test_get_counts_paging(self):
for i in range(5000):
self.statedb.set_processing(test_item['id'] + f"_{i}", execution='arn::test')
count = self.statedb.get_counts(test_dbitem['collections_workflow'])
assert(count == 1004)
for i in range(5000):
self.statedb.delete_item(test_item['id'] + f"_{i}")
self.statedb.delete_item(test_item['id'] + f"_{i}")

0 comments on commit 784456d

Please sign in to comment.