
Merge pull request #8 from cirrus-geo/develop
publish 0.3.2
matthewhanson authored Oct 26, 2020
2 parents 408261f + 31f9b75 commit eff2bff
Showing 12 changed files with 300 additions and 113 deletions.
15 changes: 15 additions & 0 deletions CHANGELOG.md
@@ -6,6 +6,21 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

## [Unreleased]

## [v0.3.2] - 2020-10-25

### Added
- `cirruslib.logging` module that configures structured (JSON) logging and provides `get_task_logger` for logging from tasks
- `cirruslib.stac` module for working with the Cirrus static STAC catalog on S3, using PySTAC
- `utils.dict_merged` function for performing recursive dictionary merges (sketched below)
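
A minimal sketch of how a recursive merge helper like `utils.dict_merged` might behave. The two-argument signature and import path below are assumptions; the function itself is not shown in this diff.

```python
from cirruslib.utils import dict_merged  # assumed import path and signature

base = {"process": {"workflow": "publish", "tasks": {"copy": {"public": False}}}}
override = {"process": {"tasks": {"copy": {"public": True}}}}

# A recursive merge deep-merges nested dicts, so 'workflow' survives from
# base while 'public' is overridden; a plain dict.update() would replace
# the entire 'process' value instead.
merged = dict_merged(base, override)  # hypothetical call
assert merged["process"]["workflow"] == "publish"
assert merged["process"]["tasks"]["copy"]["public"] is True
```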

### Changed
- Parsing a payload in a task should now use `Catalog.from_payload` instead of `Catalogs.from_payload`; the new method returns a single `Catalog` rather than an array of `Catalog` objects that always had a length of one (see the migration sketch below)
- Cleaned up logging across all modules
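
A hedged before/after sketch of the migration; the handler function is illustrative and not part of this diff, and the exact unwrapping the old API required depended on the `Catalogs` container:

```python
from cirruslib import Catalog

def handle_task(payload):  # illustrative wrapper, not from this diff
    # Pre-0.3.2: one Catalog had to be pulled out of the Catalogs collection
    # cat = Catalogs.from_payload(payload).catalogs[0]
    # From 0.3.2: parse directly to a single Catalog
    cat = Catalog.from_payload(payload)
    return cat.process()
```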

### Removed
- `Catalogs.from_payload`, replaced by `Catalog.from_payload`
- `QUEUED` as potential processing state

## [v0.3.1] - 2020-09-27

### Changed
1 change: 1 addition & 0 deletions cirruslib/__init__.py
@@ -1,3 +1,4 @@
from .catalog import Catalog, Catalogs
from .logging import get_task_logger
from .statedb import StateDB
from .version import __version__
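
With `get_task_logger` now re-exported here, all four names are importable from the package root; a quick sanity check, assuming cirrus-lib 0.3.2 is installed:

```python
from cirruslib import Catalog, Catalogs, StateDB, get_task_logger, __version__

print(__version__)  # expected to print 0.3.2 for this release
```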
107 changes: 51 additions & 56 deletions cirruslib/catalog.py
@@ -6,28 +6,27 @@
import os
import re
import uuid
from typing import Dict, Optional, List

from boto3utils import s3
from cirruslib.statedb import StateDB
from cirruslib.logging import get_task_logger
from cirruslib.transfer import get_s3_session
from cirruslib.utils import get_path
from traceback import format_exc
from typing import Dict, Optional, List

# envvars
LOG_LEVEL = os.getenv('CIRRUS_LOG_LEVEL', 'INFO')
DATA_BUCKET = os.getenv('CIRRUS_DATA_BUCKET', None)
CATALOG_BUCKET = os.getenv('CIRRUS_CATALOG_BUCKET', None)
PUBLISH_TOPIC_ARN = os.getenv('CIRRUS_PUBLISH_TOPIC_ARN', None)

logger = logging.getLogger(__name__)
logger.setLevel(LOG_LEVEL)

# clients
statedb = StateDB()
snsclient = boto3.client('sns')
stepfunctions = boto3.client('stepfunctions')

# logging
logger = logging.getLogger(__name__)


class Catalog(dict):

@@ -39,9 +38,15 @@ def __init__(self, *args, update=False, state_item=None, **kwargs):
"""
super(Catalog, self).__init__(*args, **kwargs)

# convert old functions field to tasks
if 'functions' in self['process']:
self['process']['tasks'] = self['process'].pop('functions')

if update:
self.update()

self.logger = get_task_logger(__name__, catalog=self)

# validate process block
assert(self['type'] == 'FeatureCollection')
assert('process' in self)
@@ -64,12 +69,37 @@ def __init__(self, *args, update=False, state_item=None, **kwargs):

self.state_item = state_item

def update(self):
@classmethod
def from_payload(cls, payload: Dict, **kwargs) -> Catalog:
"""Parse a Cirrus payload and return a Catalog instance
# convert old functions field to tasks
if 'functions' in self['process']:
self['process']['tasks'] = self['process'].pop('functions')
Args:
payload (Dict): A payload from SNS, SQS, or containing an s3 URL to payload
Returns:
Catalog: A Catalog instance
"""
if 'Records' in payload:
records = [json.loads(r['body']) for r in payload['Records']]
# there should be only one
assert(len(records) == 1)
if 'Message' in records[0]:
# SNS
cat = json.loads(records[0]['Message'])
else:
# SQS
cat = records[0]
elif 'url' in payload:
cat = s3().read_json(payload['url'])
elif 'Parameters' in payload and 'url' in payload['Parameters']:
# this is Batch, get the output payload
url = payload['Parameters']['url'].replace('.json', '_out.json')
cat = s3().read_json(url)
else:
cat = payload
return cls(cat)

def update(self):
# Input collections
if 'input_collections' not in self['process']:
cols = sorted(list(set([i['collection'] for i in self['features'] if 'collection' in i])))
@@ -81,7 +111,7 @@ def update(self):
if 'id' not in self:
self['id'] = f"{collections_str}/workflow-{self['process']['workflow']}/{items_str}"

logger.debug(f"Catalog after validate_and_update: {json.dumps(self)}")
self.logger.debug(f"Catalog after validate_and_update: {json.dumps(self)}")


# assign collections to Items given a mapping of Col ID: ID regex
@@ -96,7 +126,7 @@ def assign_collections(self):
for col in collections:
regex = re.compile(collections[col])
if regex.match(item['id']):
logger.debug(f"Setting {item['id']} collection to {col}")
self.logger.debug(f"Setting collection to {col}")
item['collection'] = col

def get_payload(self) -> Dict:
@@ -158,7 +188,7 @@ def publish_to_s3(self, bucket, public=False) -> List:
extra.update(headers)
s3session.upload_json(item, url, public=public, extra=extra)
s3urls.append(url)
logger.info(f"Uploaded STAC Item {item['id']} as {url}")
self.logger.info("Published to s3")

return s3urls

@@ -212,10 +242,9 @@ def publish_to_sns(self, topic_arn=PUBLISH_TOPIC_ARN):
topic_arn (str, optional): ARN of SNS Topic. Defaults to PUBLISH_TOPIC_ARN.
"""
for item in self['features']:
logger.info(f"Publishing item {item['id']} to {topic_arn}")
response = snsclient.publish(TopicArn=topic_arn, Message=json.dumps(item),
MessageAttributes=self.sns_attributes(item))
logger.debug(f"Response: {json.dumps(response)}")
MessageAttributes=self.sns_attributes(item))
self.logger.debug(f"Published item to {topic_arn}")

def process(self) -> str:
"""Add this Catalog to Cirrus and start workflow
@@ -230,23 +259,19 @@ def process(self) -> str:
# add input catalog to s3
url = f"s3://{CATALOG_BUCKET}/{self['id']}/input.json"
s3().upload_json(self, url)
logger.debug(f"Uploaded {url}")

# invoke step function
arn = os.getenv('BASE_WORKFLOW_ARN') + self['process']['workflow']
logger.info(f"Running {arn} on {self['id']}")
self.logger.debug(f"Running Step Function {arn}")
exe_response = stepfunctions.start_execution(stateMachineArn=arn, input=json.dumps(self.get_payload()))
logger.debug(f"Start execution response: {exe_response}")

# create DynamoDB record - this will always overwrite any existing process
resp = statedb.add_item(self, exe_response['executionArn'])
logger.debug(f"Add state item response: {resp}")

return self['id']
except Exception as err:
msg = f"process: failed starting {self['id']} ({err})"
logger.error(msg)
logger.error(format_exc())
msg = f"failed starting workflow ({err})"
self.logger.error(msg, exc_info=True)
statedb.add_failed_item(self, msg)
raise err

@@ -271,36 +296,6 @@ def catids(self) -> List[str]:
"""
return [c['id'] for c in self.catalogs]

@classmethod
def from_payload(cls, payload: Dict, **kwargs) -> Catalogs:
"""Parse a Cirrus payload and return a Catalogs instance
Args:
payload (Dict): A payload from SNS, SQS, or containing an s3 URL to payload
Returns:
Catalogs: A Catalogs instance
"""
catalogs = []
if 'Records' in payload:
for record in [json.loads(r['body']) for r in payload['Records']]:
if 'Message' in record:
# SNS
cat = Catalog(json.loads(record['Message']))
catalogs.append(cat)
else:
# SQS
catalogs.append(Catalog(record))
elif 'url' in payload:
catalogs = [Catalog(s3().read_json(payload['url']))]
elif 'Parameters' in payload and 'url' in payload['Parameters']:
# this is Batch, get the output payload
url = payload['Parameters']['url'].replace('.json', '_out.json')
catalogs = [Catalog(s3().read_json(url))]
else:
catalogs = [Catalog(payload)]
return cls(catalogs)

@classmethod
def from_catids(cls, catids: List[str], **kwargs) -> Catalogs:
"""Create Catalogs from list of Catalog IDs
@@ -328,15 +323,15 @@ def from_statedb_paged(cls, collections, state, since: str=None, index: str='inp
for it in resp['items']:
cat = Catalog(s3().read_json(it['input_catalog']))
catalogs.append(cat)
logger.debug(f"Retrieved {len(catalogs)} from state db")
self.logger.debug(f"Retrieved {len(catalogs)} from state db")
yield cls(catalogs, state_items=resp['items'])
catalogs = []
while 'nextkey' in resp:
resp = statedb.get_items_page(collections, state, since, index, nextkey=resp['nextkey'])
for it in resp['items']:
cat = Catalog(s3().read_json(it['input_catalog']))
catalogs.append(cat)
logger.debug(f"Retrieved {len(catalogs)} from state db")
self.logger.debug(f"Retrieved {len(catalogs)} from state db")
yield cls(catalogs, state_items=resp['items'])
"""

@@ -390,7 +385,7 @@ def process(self, replace=False):
if state in ['FAILED', ''] or _replace:
catids.append(cat.process())
else:
logger.info(f"Skipping {cat['id']}, in {state} state")
logger.info(f"Skipping, input already in {state} state")
continue

return catids
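
A sketch of the payload shapes the new `Catalog.from_payload` unwraps, based on the branches shown in the diff above. The payload values are illustrative and trimmed; a real Cirrus payload carries more fields than the constructor's visible asserts require.

```python
import json

from cirruslib import Catalog

# Minimal illustrative payload; the full class may validate more fields
payload = {
    "type": "FeatureCollection",
    "features": [],
    "process": {"workflow": "test", "tasks": {}},
}

# 1. A bare payload is used as-is
cat = Catalog.from_payload(payload)

# 2. An SQS event wraps the payload in a single Record body; an SNS event
#    nests it one level deeper, under a 'Message' key inside that body
sqs_event = {"Records": [{"body": json.dumps(payload)}]}
cat = Catalog.from_payload(sqs_event)

# 3. {'url': ...} is read from s3 directly, while a Batch Parameters block
#    ({'Parameters': {'url': '.../input.json'}}) has '.json' rewritten to
#    '_out.json' so the task's output payload is fetched instead
```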
70 changes: 70 additions & 0 deletions cirruslib/logging.py
@@ -0,0 +1,70 @@
import logging
import logging.config
from os import getenv


config = {
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"standard": {
"format": "%(asctime)s %(name)s %(levelname)s %(message)s",
"datefmt": "%Y-%m-%dT%H:%M:%S%z",
},
"json": {
"format": "%(asctime)s %(name)s %(levelname)s %(message)s",
"datefmt": "%Y-%m-%dT%H:%M:%S%z",
"class": "pythonjsonlogger.jsonlogger.JsonFormatter"
}
},
"handlers": {
"standard": {
"class": "logging.StreamHandler",
"formatter": "json"
}
},
"loggers": {
"": {
"propagate": False
},
"lambda_function": {
"handlers": ["standard"],
"level": getenv('CIRRUS_LOG_LEVEL', 'DEBUG')
},
"feeder": {
"handlers": ["standard"],
"level": getenv('CIRRUS_LOG_LEVEL', 'DEBUG')
},
"task": {
"handlers": ["standard"],
"level": getenv('CIRRUS_LOG_LEVEL', 'DEBUG')
},
"cirruslib": {
"handlers": ["standard"],
"level": getenv('CIRRUS_LOG_LEVEL', 'DEBUG')
}
}
}

logging.config.dictConfig(config)


class DynamicLoggerAdapter(logging.LoggerAdapter):

def __init__(self, *args, keys=None, **kwargs):
super(DynamicLoggerAdapter, self).__init__(*args, **kwargs)
self.keys = keys

def process(self, msg, kwargs):
if self.keys is not None:
kwargs = {k: self.extra[k] for k in self.keys if k in self.extra}
return (msg, {"extra": kwargs})
else:
return (msg, kwargs)


def get_task_logger(*args, catalog, **kwargs):
_logger = logging.getLogger(*args, **kwargs)
logger = DynamicLoggerAdapter(_logger, catalog, keys=['id', 'stac_version'])
return logger
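
A sketch of the intended use, matching the call pattern in `Catalog.__init__` above; the catalog dict here is illustrative:

```python
from cirruslib.logging import get_task_logger

# Illustrative payload; the adapter copies only 'id' and 'stac_version'
catalog = {"id": "col1/workflow-test/item1", "stac_version": "1.0.0-beta.2"}

logger = get_task_logger("task", catalog=catalog)

# The 'task' logger is wired to the JSON handler configured above, so this
# emits one JSON object per record, with the catalog's id and stac_version
# attached as extra fields by DynamicLoggerAdapter.process()
logger.info("processing started")
```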
