From f26319be0cf7ad7d5b92e527908ca3a644ae340f Mon Sep 17 00:00:00 2001 From: Matthew Hanson Date: Fri, 2 Oct 2020 16:23:48 -0400 Subject: [PATCH 01/32] bump pystac version --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 0f2f760..c0c4fb7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,3 @@ boto3-utils~=0.3.1 -pystac==0.5.0-RC1 +pystac~=0.5 requests>=2.24.0 From a306716950c9dafc864e9925eab84ffc22f0a0b0 Mon Sep 17 00:00:00 2001 From: Matthew Hanson Date: Mon, 5 Oct 2020 16:09:10 -0400 Subject: [PATCH 02/32] add in stac logic from stac lambda --- cirruslib/stac.py | 62 ++++++++++++++++++++++++++++++++--------------- 1 file changed, 42 insertions(+), 20 deletions(-) diff --git a/cirruslib/stac.py b/cirruslib/stac.py index e820436..b5573bc 100644 --- a/cirruslib/stac.py +++ b/cirruslib/stac.py @@ -6,60 +6,82 @@ from boto3utils import s3 from typing import Dict, Optional, List -from pystac import STAC_IO, Catalog, CatalogType, Collection +from pystac import STAC_IO, Catalog, CatalogType, Collection, Link logger = logging.getLogger(__name__) logger.setLevel(os.getenv('CIRRUS_LOG_LEVEL', 'INFO')) - # envvars DATA_BUCKET = os.getenv('CIRRUS_DATA_BUCKET', None) PUBLIC_CATALOG = os.getenv('CIRRUS_PUBLIC_CATALOG', False) STAC_VERSION = os.getenv('CIRRUS_STAC_VERSION', '1.0.0-beta.2') DESCRIPTION = os.getenv('CIRRUS_STAC_DESCRIPTION', 'Cirrus STAC') AWS_REGION = os.getenv('AWS_REGION') +PUBLISH_TOPIC = os.getenv('CIRRUS_PUBLISH_TOPIC_ARN', None) + +# root catalog +ROOT_URL = f"s3://{DATA_BUCKET}" +PUBLIC_ROOT_URL = s3.s3_to_https(ROOT_URL, region=AWS_REGION) +ROOT_CATALOG = get_root_catalog() -ROOT_URL = f"s3://{DATA_BUCKET}/catalog.json" +# boto3 clients +snsclient = boto3.client('sns') def s3stac_read(uri): - if uri.startswith('s3'): - return json.dumps(s3().read_json(uri)) - else: - return STAC_IO.default_read_text_method(uri) + if uri.startswith('http'): + uri = 
s3.https_to_s3(uri) + return json.dumps(s3().read_json(uri)) def s3stac_write(uri, txt): extra = { 'ContentType': 'application/json' } - if uri.startswith('s3'): - s3().upload_json(json.loads(txt), uri, extra=extra, public=PUBLIC_CATALOG) - else: - STAC_IO.default_write_text_method(uri, txt) + if uri.startswith('http'): + uri = s3.https_to_s3(uri) + s3().upload_json(json.loads(txt), uri, extra=extra, public=PUBLIC_CATALOG) STAC_IO.read_text_method = s3stac_read STAC_IO.write_text_method = s3stac_write -def get_root_catalog() -> Dict: +def get_root_catalog(): """Get Cirrus root catalog from s3 Returns: Dict: STAC root catalog """ - if s3().exists(ROOT_URL): - cat = Catalog.from_file(ROOT_URL) + caturl = f"{ROOT_URL}/catalog.json" + if s3().exists(caturl): + cat = Catalog.from_file(caturl) else: catid = DATA_BUCKET.split('-data-')[0] cat = Catalog(id=catid, description=DESCRIPTION) + if PUBLIC_CATALOG: + cat.normalize_and_save(PUBLIC_ROOT_URL, CatalogType.ABSOLUTE_PUBLISHED) + else: + cat.normalize_and_save(ROOT_URL, CatalogType.ABSOLUTE_PUBLISHED) logger.debug(f"Fetched {cat.describe()}") return cat # add this collection to Cirrus catalog -def add_collection(collection): - cat = get_root_catalog() - col = Collection(**collection) - cat.add_child(col) - cat.normalize_and_save(ROOT_URL, CatalogType=CatalogType.ABSOLUTE_PUBLISHED) - return cat \ No newline at end of file +def add_collections(collections, publish=True): + + for collection in collections: + collection.remove_links('child') + link = Link('copied_from', collection) + collection.add_link(link, collection.get_self_href()) + ROOT_CATALOG.add_child(collection) + if publish: + child_json = json.dumps(collection.to_dict()) + logger.debug(f"Publishing {collection.id}: {child_json}") + response = snsclient.publish(TopicArn=PUBLISH_TOPIC, Message=child_json) + logger.debug(f"SNS Publish response: {json.dumps(response)}") + + if PUBLIC_CATALOG: + ROOT_CATALOG.normalize_and_save(PUBLIC_ROOT_URL, 
CatalogType.ABSOLUTE_PUBLISHED) + else: + ROOT_CATALOG.normalize_and_save(ROOT_URL, CatalogType.ABSOLUTE_PUBLISHED) + + return ROOT_CATALOG \ No newline at end of file From e1a344d764565f1f78a4d58715aeaad196ebd6e4 Mon Sep 17 00:00:00 2001 From: Matthew Hanson Date: Mon, 5 Oct 2020 20:50:31 -0400 Subject: [PATCH 03/32] move root_catalog init to end of module --- cirruslib/stac.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/cirruslib/stac.py b/cirruslib/stac.py index b5573bc..9071f66 100644 --- a/cirruslib/stac.py +++ b/cirruslib/stac.py @@ -22,7 +22,6 @@ # root catalog ROOT_URL = f"s3://{DATA_BUCKET}" PUBLIC_ROOT_URL = s3.s3_to_https(ROOT_URL, region=AWS_REGION) -ROOT_CATALOG = get_root_catalog() # boto3 clients snsclient = boto3.client('sns') @@ -84,4 +83,7 @@ def add_collections(collections, publish=True): else: ROOT_CATALOG.normalize_and_save(ROOT_URL, CatalogType.ABSOLUTE_PUBLISHED) - return ROOT_CATALOG \ No newline at end of file + return ROOT_CATALOG + + +ROOT_CATALOG = get_root_catalog() \ No newline at end of file From abe8dde379fe88a9e3fea0919e139391eb9fc4b3 Mon Sep 17 00:00:00 2001 From: Matthew Hanson Date: Mon, 5 Oct 2020 20:50:45 -0400 Subject: [PATCH 04/32] remove QUEUED as valid state --- cirruslib/statedb.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/cirruslib/statedb.py b/cirruslib/statedb.py index d628d00..8627d8f 100644 --- a/cirruslib/statedb.py +++ b/cirruslib/statedb.py @@ -15,7 +15,7 @@ # envvars CATALOG_BUCKET = os.getenv('CIRRUS_CATALOG_BUCKET') -STATES = ['QUEUED', 'PROCESSING', 'COMPLETED', 'FAILED', 'INVALID'] +STATES = ['PROCESSING', 'COMPLETED', 'FAILED', 'INVALID'] INDEX_KEYS = { 'input_state': 'input_collections', 'output_state': 'output_collections' @@ -35,12 +35,12 @@ def __init__(self, table_name: str=os.getenv('CIRRUS_STATE_DB', 'test')): self.table_name = table_name self.table = self.db.Table(table_name) - def create_item(self, catalog: Dict, state: str='QUEUED'): + 
def create_item(self, catalog: Dict, state: str='PROCESSING'): """Create an item in DynamoDB Args: catalog (Dict): A Cirrus Input Catalog - state (str, optional): Set items to this state. Defaults to 'QUEUED'. + state (str, optional): Set items to this state. Defaults to 'PROCESSING'. """ now = datetime.now().isoformat() opts = catalog['process']['output_options'] @@ -199,7 +199,7 @@ def get_items_page(self, collection: str, state: str, since: Optional[str]=None, Args: collection (str): /-separated list of collections (input or output depending on index) - state (str): State of Items to get (QUEUED, PROCESSING, COMPLETED, FAILED, INVALID) + state (str): State of Items to get (PROCESSING, COMPLETED, FAILED, INVALID) since (Optional[str], optional): Get Items since this amount of time in the past. Defaults to None. index (str, optional): Query this index (input_state or output_state). Defaults to 'input_state'. @@ -254,7 +254,7 @@ def get_state(self, catid: str) -> str: catid (str): The catalog ID Returns: - str: Current state: QUEUED, PROCESSING, COMPLETED, FAILED, INVALID + str: Current state: PROCESSING, COMPLETED, FAILED, INVALID """ response = self.table.get_item(Key=self.catid_to_key(catid)) if 'Item' in response: From c9a54b12ea799ed99b0ee28ba49c222979cfd454 Mon Sep 17 00:00:00 2001 From: Matthew Hanson Date: Wed, 7 Oct 2020 00:46:29 -0400 Subject: [PATCH 05/32] add logic for public catalogs into stac module --- cirruslib/stac.py | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/cirruslib/stac.py b/cirruslib/stac.py index 9071f66..b4331e4 100644 --- a/cirruslib/stac.py +++ b/cirruslib/stac.py @@ -21,7 +21,8 @@ # root catalog ROOT_URL = f"s3://{DATA_BUCKET}" -PUBLIC_ROOT_URL = s3.s3_to_https(ROOT_URL, region=AWS_REGION) +if PUBLIC_CATALOG: + ROOT_URL = s3.s3_to_https(ROOT_URL, region=AWS_REGION) # boto3 clients snsclient = boto3.client('sns') @@ -56,10 +57,7 @@ def get_root_catalog(): else: catid = 
DATA_BUCKET.split('-data-')[0] cat = Catalog(id=catid, description=DESCRIPTION) - if PUBLIC_CATALOG: - cat.normalize_and_save(PUBLIC_ROOT_URL, CatalogType.ABSOLUTE_PUBLISHED) - else: - cat.normalize_and_save(ROOT_URL, CatalogType.ABSOLUTE_PUBLISHED) + cat.normalize_and_save(ROOT_URL, CatalogType.ABSOLUTE_PUBLISHED) logger.debug(f"Fetched {cat.describe()}") return cat @@ -76,13 +74,9 @@ def add_collections(collections, publish=True): child_json = json.dumps(collection.to_dict()) logger.debug(f"Publishing {collection.id}: {child_json}") response = snsclient.publish(TopicArn=PUBLISH_TOPIC, Message=child_json) - logger.debug(f"SNS Publish response: {json.dumps(response)}") - - if PUBLIC_CATALOG: - ROOT_CATALOG.normalize_and_save(PUBLIC_ROOT_URL, CatalogType.ABSOLUTE_PUBLISHED) - else: - ROOT_CATALOG.normalize_and_save(ROOT_URL, CatalogType.ABSOLUTE_PUBLISHED) - + logger.debug(f"SNS Publish response: {json.dumps(response)}") + + ROOT_CATALOG.normalize_and_save(ROOT_URL, CatalogType.ABSOLUTE_PUBLISHED) return ROOT_CATALOG From b37c13d8ed30a631a9ba0e32fa540d1a5fd50f30 Mon Sep 17 00:00:00 2001 From: Matthew Hanson Date: Wed, 7 Oct 2020 12:01:50 -0400 Subject: [PATCH 06/32] quiet some excessive logging --- cirruslib/statedb.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cirruslib/statedb.py b/cirruslib/statedb.py index 8627d8f..e74edb9 100644 --- a/cirruslib/statedb.py +++ b/cirruslib/statedb.py @@ -117,7 +117,7 @@ def get_dbitem(self, catid: str) -> Dict: """ try: response = self.table.get_item(Key=self.catid_to_key(catid)) - logger.debug(f"Fetched {response['Item']}") + #logger.debug(f"Fetched {response['Item']}") return response['Item'] except Exception as err: logger.info(f"Error fetching item {catid}: {err}") @@ -238,10 +238,10 @@ def get_items(self, *args, limit=None, **kwargs) -> Dict: """ resp = self.get_items_page(*args, **kwargs) items = resp['items'] - logger.debug(f"Fetched page of {len(items)} items from statedb") + 
#logger.debug(f"Fetched page of {len(items)} items from statedb") while 'nextkey' in resp and (limit is None or len(items) < limit): resp = self.get_items_page(*args, nextkey=resp['nextkey'], **kwargs) - logger.debug(f"Fetched page of {len(resp['items'])} items from statedb") + #logger.debug(f"Fetched page of {len(resp['items'])} items from statedb") items += resp['items'] if limit is None or len(items) < limit: return items From 965382a9ded4e1a1e051145fea19799b0d898c39 Mon Sep 17 00:00:00 2001 From: Matthew Hanson Date: Thu, 8 Oct 2020 23:32:28 -0400 Subject: [PATCH 07/32] enforce timestamps as all UTC --- cirruslib/statedb.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/cirruslib/statedb.py b/cirruslib/statedb.py index e74edb9..deed9e0 100644 --- a/cirruslib/statedb.py +++ b/cirruslib/statedb.py @@ -4,7 +4,7 @@ from boto3utils import s3 from boto3.dynamodb.conditions import Key -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from logging import getLogger from traceback import format_exc from typing import Dict, Optional, List @@ -42,7 +42,7 @@ def create_item(self, catalog: Dict, state: str='PROCESSING'): catalog (Dict): A Cirrus Input Catalog state (str, optional): Set items to this state. Defaults to 'PROCESSING'. 
""" - now = datetime.now().isoformat() + now = datetime.now(timezone.utc).isoformat() opts = catalog['process']['output_options'] output_collections = '/'.join(sorted(opts['collections'].keys())) key = self.catid_to_key(catalog['id']) @@ -60,7 +60,7 @@ def create_item(self, catalog: Dict, state: str='PROCESSING'): def add_item(self, catalog, execution): """ Adds new item with state function execution """ - now = datetime.now().isoformat() + now = datetime.now(timezone.utc).isoformat() opts = catalog['process']['output_options'] output_collections = '/'.join(sorted(opts['collections'].keys())) key = self.catid_to_key(catalog['id']) @@ -80,7 +80,7 @@ def add_item(self, catalog, execution): def add_failed_item(self, catalog, error_message): """ Adds new item as failed """ """ Adds new item with state function execution """ - now = datetime.now().isoformat() + now = datetime.now(timezone.utc).isoformat() opts = catalog['process']['output_options'] output_collections = '/'.join(sorted(opts['collections'].keys())) key = self.catid_to_key(catalog['id']) @@ -292,7 +292,7 @@ def set_processing(self, catid: str, execution: str) -> str: Key=self.catid_to_key(catid), UpdateExpression='SET current_state=:p, execution=:exe', ExpressionAttributeValues={ - ':p': f"PROCESSING_{datetime.now().isoformat()}", + ':p': f"PROCESSING_{datetime.now(timezone.utc).isoformat()}", ':exe': execution } ) @@ -312,7 +312,7 @@ def set_completed(self, catid: str, urls: List[str]) -> str: Key=self.catid_to_key(catid), UpdateExpression='SET current_state=:p, output_urls=:urls', ExpressionAttributeValues={ - ':p': f"COMPLETED_{datetime.now().isoformat()}", + ':p': f"COMPLETED_{datetime.now(timezone.utc).isoformat()}", ':urls': urls } ) @@ -332,7 +332,7 @@ def set_failed(self, catid: str, msg: str) -> str: Key=self.catid_to_key(catid), UpdateExpression='SET current_state=:p, error_message=:err', ExpressionAttributeValues={ - ':p': f"FAILED_{datetime.now().isoformat()}", + ':p': 
f"FAILED_{datetime.now(timezone.utc).isoformat()}", ':err': msg } ) @@ -352,7 +352,7 @@ def set_invalid(self, catid: str, msg: str) -> str: Key=self.catid_to_key(catid), UpdateExpression='SET current_state=:p, error_message=:err', ExpressionAttributeValues={ - ':p': f"INVALID_{datetime.now().isoformat()}", + ':p': f"INVALID_{datetime.now(timezone.utc).isoformat()}", ':err': msg } ) @@ -374,9 +374,9 @@ def query(self, collection: str, state: str=None, since: str=None, """ expr = Key(INDEX_KEYS[index]).eq(collection) if state and since: - start = datetime.now() - self.since_to_timedelta(since) + start = datetime.now(timezone.utc) - self.since_to_timedelta(since) begin = f"{state}_{start.isoformat()}" - end = f"{state}_{datetime.now().isoformat()}" + end = f"{state}_{datetime.now(timezone.utc).isoformat()}" expr = expr & Key('current_state').between(begin, end) elif state: expr = expr & Key('current_state').begins_with(state) From 72c5d6203837b8efdf98090dcc2f804a6c27a816 Mon Sep 17 00:00:00 2001 From: Matthew Hanson Date: Fri, 9 Oct 2020 15:36:11 -0400 Subject: [PATCH 08/32] add dict_merge function to utils --- cirruslib/utils.py | 38 +++++++++++++++++++++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/cirruslib/utils.py b/cirruslib/utils.py index 8e72e14..f6e3d9b 100644 --- a/cirruslib/utils.py +++ b/cirruslib/utils.py @@ -9,6 +9,7 @@ from os import getenv from string import Formatter, Template from typing import Dict, Optional, List +from collections.abc import Mapping # configure logger - CRITICAL, ERROR, WARNING, INFO, DEBUG logger = logging.getLogger(__name__) @@ -73,4 +74,39 @@ def get_path(item: Dict, template: str='${collection}/${id}') -> str: # Item property else: subs[key] = item['properties'][key.replace('__colon__', ':')] - return Template(_template).substitute(**subs).replace('__colon__', ':') \ No newline at end of file + return Template(_template).substitute(**subs).replace('__colon__', ':') + + +# from 
https://gist.github.com/angstwad/bf22d1822c38a92ec0a9#gistcomment-2622319 +def dict_merge(dct, merge_dct, add_keys=True): + """ Recursive dict merge. Inspired by :meth:``dict.update()``, instead of + updating only top-level keys, dict_merge recurses down into dicts nested + to an arbitrary depth, updating keys. The ``merge_dct`` is merged into + ``dct``. + This version will return a copy of the dictionary and leave the original + arguments untouched. + The optional argument ``add_keys``, determines whether keys which are + present in ``merge_dict`` but not ``dct`` should be included in the + new dict. + Args: + dct (dict) onto which the merge is executed + merge_dct (dict): dct merged into dct + add_keys (bool): whether to add new keys + Returns: + dict: updated dict + """ + dct = dct.copy() + if not add_keys: + merge_dct = { + k: merge_dct[k] + for k in set(dct).intersection(set(merge_dct)) + } + + for k, v in merge_dct.items(): + if (k in dct and isinstance(dct[k], dict) + and isinstance(merge_dct[k], Mapping)): + dct[k] = dict_merge(dct[k], merge_dct[k], add_keys=add_keys) + else: + dct[k] = merge_dct[k] + + return dct \ No newline at end of file From 9fffebc8d2c2be4da38759b6bbf6371bc0670288 Mon Sep 17 00:00:00 2001 From: Matthew Hanson Date: Thu, 15 Oct 2020 21:08:39 -0400 Subject: [PATCH 09/32] backwards compatible check --- cirruslib/catalog.py | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/cirruslib/catalog.py b/cirruslib/catalog.py index 2f3b986..d3926f2 100644 --- a/cirruslib/catalog.py +++ b/cirruslib/catalog.py @@ -39,6 +39,10 @@ def __init__(self, *args, update=False, state_item=None, **kwargs): """ super(Catalog, self).__init__(*args, **kwargs) + # convert old functions field to tasks + if 'functions' in self['process']: + self['process']['tasks'] = self['process'].pop('functions') + if update: self.update() @@ -66,10 +70,6 @@ def __init__(self, *args, update=False, state_item=None, **kwargs): def update(self): - # 
convert old functions field to tasks - if 'functions' in self['process']: - self['process']['tasks'] = self['process'].pop('functions') - # Input collections if 'input_collections' not in self['process']: cols = sorted(list(set([i['collection'] for i in self['features'] if 'collection' in i]))) @@ -212,10 +212,9 @@ def publish_to_sns(self, topic_arn=PUBLISH_TOPIC_ARN): topic_arn (str, optional): ARN of SNS Topic. Defaults to PUBLISH_TOPIC_ARN. """ for item in self['features']: - logger.info(f"Publishing item {item['id']} to {topic_arn}") + logger.debug(f"Publishing item {item['id']} to {topic_arn}") response = snsclient.publish(TopicArn=topic_arn, Message=json.dumps(item), - MessageAttributes=self.sns_attributes(item)) - logger.debug(f"Response: {json.dumps(response)}") + MessageAttributes=self.sns_attributes(item)) def process(self) -> str: """Add this Catalog to Cirrus and start workflow @@ -234,13 +233,11 @@ def process(self) -> str: # invoke step function arn = os.getenv('BASE_WORKFLOW_ARN') + self['process']['workflow'] - logger.info(f"Running {arn} on {self['id']}") + logger.debug(f"Running {arn} on {self['id']}") exe_response = stepfunctions.start_execution(stateMachineArn=arn, input=json.dumps(self.get_payload())) - logger.debug(f"Start execution response: {exe_response}") # create DynamoDB record - this will always overwrite any existing process resp = statedb.add_item(self, exe_response['executionArn']) - logger.debug(f"Add state item response: {resp}") return self['id'] except Exception as err: From 1e69a8c80b470760f21981b519a1792e56b95094 Mon Sep 17 00:00:00 2001 From: Matthew Hanson Date: Fri, 16 Oct 2020 23:24:50 -0400 Subject: [PATCH 10/32] add JSON logger to Catalog class --- cirruslib/catalog.py | 15 +++++++++++++-- requirements.txt | 1 + 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/cirruslib/catalog.py b/cirruslib/catalog.py index d3926f2..0d47566 100644 --- a/cirruslib/catalog.py +++ b/cirruslib/catalog.py @@ -6,13 +6,15 @@ 
import os import re import uuid +from traceback import format_exc +from typing import Dict, Optional, List from boto3utils import s3 from cirruslib.statedb import StateDB from cirruslib.transfer import get_s3_session from cirruslib.utils import get_path -from traceback import format_exc -from typing import Dict, Optional, List +from pythonjsonlogger import jsonlogger + # envvars LOG_LEVEL = os.getenv('CIRRUS_LOG_LEVEL', 'INFO') @@ -39,6 +41,15 @@ def __init__(self, *args, update=False, state_item=None, **kwargs): """ super(Catalog, self).__init__(*args, **kwargs) + # setup logger + logger = logging.getLogger(__name__) + syslog = logging.StreamHandler() + formatter = formatter = jsonlogger.JsonFormatter() + syslog.setFormatter(formatter) + logger.setLevel(logging.INFO) + logger.addHandler(syslog) + self.logger = logging.LoggerAdapter(logger, self) + # convert old functions field to tasks if 'functions' in self['process']: self['process']['tasks'] = self['process'].pop('functions') diff --git a/requirements.txt b/requirements.txt index c0c4fb7..6e10b72 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ boto3-utils~=0.3.1 pystac~=0.5 requests>=2.24.0 +python-json-logger~=2.0 From 008bbbadac364eee9b45074fa632bedc92904857 Mon Sep 17 00:00:00 2001 From: Matthew Hanson Date: Tue, 20 Oct 2020 23:13:05 -0400 Subject: [PATCH 11/32] add logging module for structured JSON logs --- cirruslib/__init__.py | 1 + cirruslib/logging.py | 49 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+) create mode 100644 cirruslib/logging.py diff --git a/cirruslib/__init__.py b/cirruslib/__init__.py index e4c63c1..aaeacfb 100644 --- a/cirruslib/__init__.py +++ b/cirruslib/__init__.py @@ -1,3 +1,4 @@ from .catalog import Catalog, Catalogs +from .logging import logger from .statedb import StateDB from .version import __version__ \ No newline at end of file diff --git a/cirruslib/logging.py b/cirruslib/logging.py new file mode 100644 index 
0000000..c347db0 --- /dev/null +++ b/cirruslib/logging.py @@ -0,0 +1,49 @@ +import logging +from os import getenv + + +config = { + "version": 1, + "disable_existing_loggers": False, + "formatters": { + "standard": { + "format": "%(asctime)s %(name)s %(levelname)s %(message)s", + "datefmt": "%Y-%m-%dT%H:%M:%S%z", + }, + "json": { + "format": "%(asctime)s %(name)s %(levelname)s %(message)s", + "datefmt": "%Y-%m-%dT%H:%M:%S%z", + "class": "pythonjsonlogger.jsonlogger.JsonFormatter" + } + }, + "handlers": { + "standard": { + "class": "logging.StreamHandler", + "formatter": "json" + } + }, + "loggers": { + "": { + "handlers": ["standard"], + "level": getenv('CIRRUS_LOG_LEVEL', 'DEBUG') + } + } +} + +logging.config.dictConfig(config) + +logger = logging.getLogger(__name__) + + +class DynamicLoggerAdapter(logging.LoggerAdapter): + + def __init__(self, *args, keys=None, **kwargs): + super(DynamicLoggerAdapter, self).__init__(*args, **kwargs) + self.keys = keys + + def process(self, msg, kwargs): + if self.keys is not None: + kwargs = {k: self.extra[k] for k in self.keys if k in self.extra} + return (msg, {"extra": kwargs}) + else: + return (msg, kwargs) \ No newline at end of file From 6c601afe1b5bf605bd9ae959a5018e64ae6e74f4 Mon Sep 17 00:00:00 2001 From: Matthew Hanson Date: Tue, 20 Oct 2020 23:14:00 -0400 Subject: [PATCH 12/32] use dynamic logger adapter in Catalog --- cirruslib/catalog.py | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) diff --git a/cirruslib/catalog.py b/cirruslib/catalog.py index 0d47566..1f7c61a 100644 --- a/cirruslib/catalog.py +++ b/cirruslib/catalog.py @@ -10,21 +10,18 @@ from typing import Dict, Optional, List from boto3utils import s3 -from cirruslib.statedb import StateDB +from cirruslib import StateDB, logger +from cirruslib.logging import DynamicLoggerAdapter from cirruslib.transfer import get_s3_session from cirruslib.utils import get_path from pythonjsonlogger import jsonlogger # envvars -LOG_LEVEL = 
os.getenv('CIRRUS_LOG_LEVEL', 'INFO') DATA_BUCKET = os.getenv('CIRRUS_DATA_BUCKET', None) CATALOG_BUCKET = os.getenv('CIRRUS_CATALOG_BUCKET', None) PUBLISH_TOPIC_ARN = os.getenv('CIRRUS_PUBLISH_TOPIC_ARN', None) -logger = logging.getLogger(__name__) -logger.setLevel(LOG_LEVEL) - # clients statedb = StateDB() snsclient = boto3.client('sns') @@ -41,14 +38,7 @@ def __init__(self, *args, update=False, state_item=None, **kwargs): """ super(Catalog, self).__init__(*args, **kwargs) - # setup logger - logger = logging.getLogger(__name__) - syslog = logging.StreamHandler() - formatter = formatter = jsonlogger.JsonFormatter() - syslog.setFormatter(formatter) - logger.setLevel(logging.INFO) - logger.addHandler(syslog) - self.logger = logging.LoggerAdapter(logger, self) + self.logger = DynamicLoggerAdapter(logger, self, keys=['id', 'stac_version']) # convert old functions field to tasks if 'functions' in self['process']: From 35cdb10ebde1cf3845a9dcf10903d9d3e058ed68 Mon Sep 17 00:00:00 2001 From: Matthew Hanson Date: Wed, 21 Oct 2020 00:07:45 -0400 Subject: [PATCH 13/32] start of catalog test --- tests/fixtures/test-catalog.json | 38 ++++++++++++++++++++++++++++++++ tests/test_catalog.py | 20 +++++++++++++++++ 2 files changed, 58 insertions(+) create mode 100644 tests/fixtures/test-catalog.json create mode 100644 tests/test_catalog.py diff --git a/tests/fixtures/test-catalog.json b/tests/fixtures/test-catalog.json new file mode 100644 index 0000000..9a948de --- /dev/null +++ b/tests/fixtures/test-catalog.json @@ -0,0 +1,38 @@ +{ + "type": "FeatureCollection", + "features": [ + { + "type": "Feature", + "id": "tiles-32-R-MS-2020-10-20-0", + "collection": "sentinel-s2-l2a-aws", + "properties": {}, + "assets": { + "json": { + "href": "https://roda.sentinel-hub.com/sentinel-s2-l2a/tiles/32/R/MS/2020/10/20/0/tileInfo.json" + } + }, + "links": [] + } + ], + "process": { + "description": "Convert Original Sentinel-2 metadata to STAC and publish", + "input_collections": [ + 
"sentinel-s2-l2a-aws" + ], + "workflow": "publish-sentinel", + "output_options": { + "path_template": "/${collection}/${sentinel:utm_zone}/${sentinel:latitude_band}/${sentinel:grid_square}/${year}/${month}/${id}", + "collections": { + "sentinel-s2-l1c": ".*L1C", + "sentinel-s2-l2a": ".*L2A" + } + }, + "tasks": { + "publish": { + "public": true + } + }, + "replace": false + }, + "id": "sentinel-s2-l2a-aws/workflow-publish-sentinel/tiles-32-R-MS-2020-10-20-0" +} \ No newline at end of file diff --git a/tests/test_catalog.py b/tests/test_catalog.py new file mode 100644 index 0000000..b1d7a6d --- /dev/null +++ b/tests/test_catalog.py @@ -0,0 +1,20 @@ +import os +import json +import unittest + +from cirruslib import Catalog + +testpath = os.path.dirname(__file__) + + +class TestClassMethods(unittest.TestCase): + + def open_fixture(self, filename='test-catalog.json'): + with open(os.path.join(testpath, filename)) as f: + data = json.loads(f.read()) + return data + + def test_open_catalog(self): + data = self.open_fixture() + cat = Catalog(**data) + import pdb; pdb.set_trace() From f0bfcc09f5737277539a1362d6d6bf3c4cd8d5e8 Mon Sep 17 00:00:00 2001 From: Matthew Hanson Date: Wed, 21 Oct 2020 00:09:19 -0400 Subject: [PATCH 14/32] fix logging module import config --- cirruslib/logging.py | 1 + 1 file changed, 1 insertion(+) diff --git a/cirruslib/logging.py b/cirruslib/logging.py index c347db0..30d7167 100644 --- a/cirruslib/logging.py +++ b/cirruslib/logging.py @@ -1,4 +1,5 @@ import logging +import logging.config from os import getenv From 69a96f5260c5d11fcf41a88080cf304bf4fc2151 Mon Sep 17 00:00:00 2001 From: Matthew Hanson Date: Wed, 21 Oct 2020 00:09:42 -0400 Subject: [PATCH 15/32] fix imports from Catalog --- cirruslib/catalog.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cirruslib/catalog.py b/cirruslib/catalog.py index 1f7c61a..358d9ef 100644 --- a/cirruslib/catalog.py +++ b/cirruslib/catalog.py @@ -10,8 +10,8 @@ from typing import Dict, 
Optional, List from boto3utils import s3 -from cirruslib import StateDB, logger -from cirruslib.logging import DynamicLoggerAdapter +from cirruslib.statedb import StateDB +from cirruslib.logging import logger, DynamicLoggerAdapter from cirruslib.transfer import get_s3_session from cirruslib.utils import get_path from pythonjsonlogger import jsonlogger From 44cc7add4de1b8c6ff20cbb2a0be736fcae9aaed Mon Sep 17 00:00:00 2001 From: Matthew Hanson Date: Wed, 21 Oct 2020 00:36:50 -0400 Subject: [PATCH 16/32] do not create global module level logger --- cirruslib/__init__.py | 1 - cirruslib/catalog.py | 4 +++- cirruslib/logging.py | 5 +++-- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/cirruslib/__init__.py b/cirruslib/__init__.py index aaeacfb..e4c63c1 100644 --- a/cirruslib/__init__.py +++ b/cirruslib/__init__.py @@ -1,4 +1,3 @@ from .catalog import Catalog, Catalogs -from .logging import logger from .statedb import StateDB from .version import __version__ \ No newline at end of file diff --git a/cirruslib/catalog.py b/cirruslib/catalog.py index 358d9ef..ce5d427 100644 --- a/cirruslib/catalog.py +++ b/cirruslib/catalog.py @@ -11,11 +11,13 @@ from boto3utils import s3 from cirruslib.statedb import StateDB -from cirruslib.logging import logger, DynamicLoggerAdapter +from cirruslib.logging import DynamicLoggerAdapter from cirruslib.transfer import get_s3_session from cirruslib.utils import get_path from pythonjsonlogger import jsonlogger +logger = logging.getLogger(__name__) + # envvars DATA_BUCKET = os.getenv('CIRRUS_DATA_BUCKET', None) diff --git a/cirruslib/logging.py b/cirruslib/logging.py index 30d7167..869fd70 100644 --- a/cirruslib/logging.py +++ b/cirruslib/logging.py @@ -27,14 +27,15 @@ "": { "handlers": ["standard"], "level": getenv('CIRRUS_LOG_LEVEL', 'DEBUG') + }, + "botocore": { + "propagate": False } } } logging.config.dictConfig(config) -logger = logging.getLogger(__name__) - class DynamicLoggerAdapter(logging.LoggerAdapter): From 
b23c977589dd4f1e01a201c02444ac21b2b754d9 Mon Sep 17 00:00:00 2001 From: Matthew Hanson Date: Wed, 21 Oct 2020 00:47:23 -0400 Subject: [PATCH 17/32] do not config root logger --- cirruslib/logging.py | 7 ++++--- cirruslib/version.py | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/cirruslib/logging.py b/cirruslib/logging.py index 869fd70..73c5d22 100644 --- a/cirruslib/logging.py +++ b/cirruslib/logging.py @@ -24,12 +24,13 @@ } }, "loggers": { - "": { + "lambda_function": { "handlers": ["standard"], "level": getenv('CIRRUS_LOG_LEVEL', 'DEBUG') }, - "botocore": { - "propagate": False + "cirruslib": { + "handlers": ["standard"], + "level": getenv('CIRRUS_LOG_LEVEL', 'DEBUG') } } } diff --git a/cirruslib/version.py b/cirruslib/version.py index e1424ed..bfd5071 100644 --- a/cirruslib/version.py +++ b/cirruslib/version.py @@ -1 +1 @@ -__version__ = '0.3.1' +__version__ = '0.3.2-b1' From a9fb722d9f507b4727f9e903e30885567e3735ea Mon Sep 17 00:00:00 2001 From: Matthew Hanson Date: Wed, 21 Oct 2020 01:25:21 -0400 Subject: [PATCH 18/32] clean up logging --- cirruslib/transfer.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/cirruslib/transfer.py b/cirruslib/transfer.py index acc6251..b4992df 100644 --- a/cirruslib/transfer.py +++ b/cirruslib/transfer.py @@ -49,7 +49,9 @@ def get_s3_session(bucket: str=None, s3url: str=None, **kwargs) -> s3: creds.update(_creds) logger.debug(f"Using credentials for bucket {bucket}: {json.dumps(creds)}") except ClientError: - logger.debug(f"Using default credentials for bucket {bucket}") + # using default credentials + pass + requester_pays = creds.pop('requester_pays', False) session = boto3.Session(**creds) @@ -168,7 +170,7 @@ def upload_item_assets(item: Dict, assets: List[str]=None, public_assets: List[s s3_session = get_s3_session(parts['bucket']) # upload - logger.info(f"Uploading {filename} to {url}") + logger.debug(f"Uploading {filename} to {url}") url_out = s3_session.upload(filename, url, 
public=public, extra=_headers, http_url=not s3_urls) _item['assets'][key]['href'] = url_out return _item From 2be7f2de70fe14e3668e6a58a7e16e97ce0df493 Mon Sep 17 00:00:00 2001 From: Matthew Hanson Date: Wed, 21 Oct 2020 01:33:14 -0400 Subject: [PATCH 19/32] disable other loggers --- cirruslib/logging.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cirruslib/logging.py b/cirruslib/logging.py index 73c5d22..1c5ae5c 100644 --- a/cirruslib/logging.py +++ b/cirruslib/logging.py @@ -5,7 +5,7 @@ config = { "version": 1, - "disable_existing_loggers": False, + "disable_existing_loggers": True, "formatters": { "standard": { "format": "%(asctime)s %(name)s %(levelname)s %(message)s", From bfd669e2ea2e140ad2083b56afb5fba1cc996414 Mon Sep 17 00:00:00 2001 From: Matthew Hanson Date: Wed, 21 Oct 2020 01:58:11 -0400 Subject: [PATCH 20/32] no popagate on root logger --- cirruslib/logging.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/cirruslib/logging.py b/cirruslib/logging.py index 1c5ae5c..ef0e147 100644 --- a/cirruslib/logging.py +++ b/cirruslib/logging.py @@ -5,7 +5,7 @@ config = { "version": 1, - "disable_existing_loggers": True, + "disable_existing_loggers": False, "formatters": { "standard": { "format": "%(asctime)s %(name)s %(levelname)s %(message)s", @@ -24,6 +24,9 @@ } }, "loggers": { + "": { + "propagate": False + }, "lambda_function": { "handlers": ["standard"], "level": getenv('CIRRUS_LOG_LEVEL', 'DEBUG') From 723e64290d39a0b630080cfad5c03edd3448d7d4 Mon Sep 17 00:00:00 2001 From: Matthew Hanson Date: Thu, 22 Oct 2020 16:51:18 -0400 Subject: [PATCH 21/32] move from_payload from Catalgos to Catalog --- cirruslib/catalog.py | 61 ++++++++++++++++++++++---------------------- 1 file changed, 30 insertions(+), 31 deletions(-) diff --git a/cirruslib/catalog.py b/cirruslib/catalog.py index ce5d427..ac44d0b 100644 --- a/cirruslib/catalog.py +++ b/cirruslib/catalog.py @@ -71,8 +71,37 @@ def __init__(self, *args, update=False, 
state_item=None, **kwargs): self.state_item = state_item - def update(self): + @classmethod + def from_payload(cls, payload: Dict, **kwargs) -> Catalog: + """Parse a Cirrus payload and return a Catalog instance + + Args: + payload (Dict): A payload from SNS, SQS, or containing an s3 URL to payload + + Returns: + Catalog: A Catalog instance + """ + if 'Records' in payload: + records = [json.loads(r['body']) for r in payload['Records']] + # there should be only one + assert(len(records) == 1) + if 'Message' in records[0]: + # SNS + cat = json.loads(records[0]['Message']) + else: + # SQS + cat = records[0] + elif 'url' in payload: + cat = s3().read_json(payload['url']) + elif 'Parameters' in payload and 'url' in payload['Parameters']: + # this is Batch, get the output payload + url = payload['Parameters']['url'].replace('.json', '_out.json') + cat = s3().read_json(url) + else: + cat = payload + return cls(cat) + def update(self): # Input collections if 'input_collections' not in self['process']: cols = sorted(list(set([i['collection'] for i in self['features'] if 'collection' in i]))) @@ -271,36 +300,6 @@ def catids(self) -> List[str]: """ return [c['id'] for c in self.catalogs] - @classmethod - def from_payload(cls, payload: Dict, **kwargs) -> Catalogs: - """Parse a Cirrus payload and return a Catalogs instance - - Args: - payload (Dict): A payload from SNS, SQS, or containing an s3 URL to payload - - Returns: - Catalogs: A Catalogs instance - """ - catalogs = [] - if 'Records' in payload: - for record in [json.loads(r['body']) for r in payload['Records']]: - if 'Message' in record: - # SNS - cat = Catalog(json.loads(record['Message'])) - catalogs.append(cat) - else: - # SQS - catalogs.append(Catalog(record)) - elif 'url' in payload: - catalogs = [Catalog(s3().read_json(payload['url']))] - elif 'Parameters' in payload and 'url' in payload['Parameters']: - # this is Batch, get the output payload - url = payload['Parameters']['url'].replace('.json', '_out.json') - 
catalogs = [Catalog(s3().read_json(url))] - else: - catalogs = [Catalog(payload)] - return cls(catalogs) - @classmethod def from_catids(cls, catids: List[str], **kwargs) -> Catalogs: """Create Catalogs from list of Catalog IDs From 5d70c49547f7870989b1a96fda05141f2a12ef5d Mon Sep 17 00:00:00 2001 From: Matthew Hanson Date: Thu, 22 Oct 2020 23:40:25 -0400 Subject: [PATCH 22/32] log task and feeders --- cirruslib/logging.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/cirruslib/logging.py b/cirruslib/logging.py index ef0e147..69f90db 100644 --- a/cirruslib/logging.py +++ b/cirruslib/logging.py @@ -31,6 +31,14 @@ "handlers": ["standard"], "level": getenv('CIRRUS_LOG_LEVEL', 'DEBUG') }, + "feeder": { + "handlers": ["standard"], + "level": getenv('CIRRUS_LOG_LEVEL', 'DEBUG') + }, + "task": { + "handlers": ["standard"], + "level": getenv('CIRRUS_LOG_LEVEL', 'DEBUG') + }, "cirruslib": { "handlers": ["standard"], "level": getenv('CIRRUS_LOG_LEVEL', 'DEBUG') From 88d220523078a9b70cd471fdf7db92a52b154fdb Mon Sep 17 00:00:00 2001 From: Matthew Hanson Date: Fri, 23 Oct 2020 00:01:28 -0400 Subject: [PATCH 23/32] update logger config --- cirruslib/logging.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/cirruslib/logging.py b/cirruslib/logging.py index 69f90db..c47ce78 100644 --- a/cirruslib/logging.py +++ b/cirruslib/logging.py @@ -25,6 +25,13 @@ }, "loggers": { "": { + "handlers": ["standard"], + "level": getenv('CIRRUS_LOG_LEVEL', 'DEBUG') + }, + "botocore": { + "propagate": False + }, + "boto3": { "propagate": False }, "lambda_function": { From c06546e27741054eb44fae82bd2fdbd1e677d2db Mon Sep 17 00:00:00 2001 From: Matthew Hanson Date: Fri, 23 Oct 2020 00:20:25 -0400 Subject: [PATCH 24/32] disable root logger --- cirruslib/logging.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cirruslib/logging.py b/cirruslib/logging.py index c47ce78..d61af54 100644 --- a/cirruslib/logging.py +++ b/cirruslib/logging.py @@ -25,8 +25,7 @@ }, 
"loggers": { "": { - "handlers": ["standard"], - "level": getenv('CIRRUS_LOG_LEVEL', 'DEBUG') + "propagate": False }, "botocore": { "propagate": False From c80f7ed82ac8ef3f7605f3d2234b89c2d5d4e415 Mon Sep 17 00:00:00 2001 From: Matthew Hanson Date: Sat, 24 Oct 2020 00:03:43 -0400 Subject: [PATCH 25/32] add get_task_logger --- cirruslib/__init__.py | 1 + cirruslib/catalog.py | 4 ++-- cirruslib/logging.py | 15 ++++++++------- 3 files changed, 11 insertions(+), 9 deletions(-) diff --git a/cirruslib/__init__.py b/cirruslib/__init__.py index e4c63c1..581be1a 100644 --- a/cirruslib/__init__.py +++ b/cirruslib/__init__.py @@ -1,3 +1,4 @@ from .catalog import Catalog, Catalogs +from .logging import get_task_logger from .statedb import StateDB from .version import __version__ \ No newline at end of file diff --git a/cirruslib/catalog.py b/cirruslib/catalog.py index ac44d0b..0ac9ca7 100644 --- a/cirruslib/catalog.py +++ b/cirruslib/catalog.py @@ -40,8 +40,6 @@ def __init__(self, *args, update=False, state_item=None, **kwargs): """ super(Catalog, self).__init__(*args, **kwargs) - self.logger = DynamicLoggerAdapter(logger, self, keys=['id', 'stac_version']) - # convert old functions field to tasks if 'functions' in self['process']: self['process']['tasks'] = self['process'].pop('functions') @@ -49,6 +47,8 @@ def __init__(self, *args, update=False, state_item=None, **kwargs): if update: self.update() + self.logger = DynamicLoggerAdapter(logger, self, keys=['id', 'stac_version']) + # validate process block assert(self['type'] == 'FeatureCollection') assert('process' in self) diff --git a/cirruslib/logging.py b/cirruslib/logging.py index d61af54..e44fefa 100644 --- a/cirruslib/logging.py +++ b/cirruslib/logging.py @@ -27,12 +27,6 @@ "": { "propagate": False }, - "botocore": { - "propagate": False - }, - "boto3": { - "propagate": False - }, "lambda_function": { "handlers": ["standard"], "level": getenv('CIRRUS_LOG_LEVEL', 'DEBUG') @@ -66,4 +60,11 @@ def process(self, msg, kwargs): 
kwargs = {k: self.extra[k] for k in self.keys if k in self.extra} return (msg, {"extra": kwargs}) else: - return (msg, kwargs) \ No newline at end of file + return (msg, kwargs) + + +def get_task_logger(*args, catalog, **kwargs): + _logger = logging.getLogger(*args, **kwargs) + logger = DynamicLoggerAdapter(_logger, catalog, keys=['id', 'stac_version']) + return logger + From dad5abd4fa37af2713fdcec8b3de9029174b355a Mon Sep 17 00:00:00 2001 From: Matthew Hanson Date: Sat, 24 Oct 2020 03:02:35 -0400 Subject: [PATCH 26/32] update getting loggers --- cirruslib/catalog.py | 8 ++------ cirruslib/stac.py | 1 - cirruslib/statedb.py | 1 - cirruslib/transfer.py | 1 - cirruslib/utils.py | 2 -- 5 files changed, 2 insertions(+), 11 deletions(-) diff --git a/cirruslib/catalog.py b/cirruslib/catalog.py index 0ac9ca7..39937fb 100644 --- a/cirruslib/catalog.py +++ b/cirruslib/catalog.py @@ -2,7 +2,6 @@ import boto3 import json -import logging import os import re import uuid @@ -11,14 +10,11 @@ from boto3utils import s3 from cirruslib.statedb import StateDB -from cirruslib.logging import DynamicLoggerAdapter +from cirruslib.logging import DynamicLoggerAdapter, get_task_logger from cirruslib.transfer import get_s3_session from cirruslib.utils import get_path from pythonjsonlogger import jsonlogger -logger = logging.getLogger(__name__) - - # envvars DATA_BUCKET = os.getenv('CIRRUS_DATA_BUCKET', None) CATALOG_BUCKET = os.getenv('CIRRUS_CATALOG_BUCKET', None) @@ -47,7 +43,7 @@ def __init__(self, *args, update=False, state_item=None, **kwargs): if update: self.update() - self.logger = DynamicLoggerAdapter(logger, self, keys=['id', 'stac_version']) + self.logger = get_task_logger(__name__, catalog=self) # validate process block assert(self['type'] == 'FeatureCollection') diff --git a/cirruslib/stac.py b/cirruslib/stac.py index b4331e4..769ad96 100644 --- a/cirruslib/stac.py +++ b/cirruslib/stac.py @@ -9,7 +9,6 @@ from pystac import STAC_IO, Catalog, CatalogType, Collection, Link logger = 
logging.getLogger(__name__) -logger.setLevel(os.getenv('CIRRUS_LOG_LEVEL', 'INFO')) # envvars DATA_BUCKET = os.getenv('CIRRUS_DATA_BUCKET', None) diff --git a/cirruslib/statedb.py b/cirruslib/statedb.py index deed9e0..2da2bec 100644 --- a/cirruslib/statedb.py +++ b/cirruslib/statedb.py @@ -10,7 +10,6 @@ from typing import Dict, Optional, List logger = getLogger(__name__) -logger.setLevel(os.getenv('CIRRUS_LOG_LEVEL', 'INFO')) # envvars CATALOG_BUCKET = os.getenv('CIRRUS_CATALOG_BUCKET') diff --git a/cirruslib/transfer.py b/cirruslib/transfer.py index b4992df..03f7d83 100644 --- a/cirruslib/transfer.py +++ b/cirruslib/transfer.py @@ -13,7 +13,6 @@ from typing import Dict, Optional, List logger = logging.getLogger(__name__) -logger.setLevel(getenv('CIRRUS_LOG_LEVEL', 'INFO')) # get data bucket to upload to DATA_BUCKET = getenv('CIRRUS_DATA_BUCKET') diff --git a/cirruslib/utils.py b/cirruslib/utils.py index f6e3d9b..6bb6d18 100644 --- a/cirruslib/utils.py +++ b/cirruslib/utils.py @@ -11,9 +11,7 @@ from typing import Dict, Optional, List from collections.abc import Mapping -# configure logger - CRITICAL, ERROR, WARNING, INFO, DEBUG logger = logging.getLogger(__name__) -logger.setLevel(getenv('CIRRUS_LOG_LEVEL', 'INFO')) batch_client = boto3.client('batch') From 6f02495b6de1ed85e04dd2e60fa052b163cea372 Mon Sep 17 00:00:00 2001 From: Matthew Hanson Date: Sun, 25 Oct 2020 22:26:52 -0400 Subject: [PATCH 27/32] Catalog use self logger --- cirruslib/catalog.py | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/cirruslib/catalog.py b/cirruslib/catalog.py index 39937fb..5ba9ef1 100644 --- a/cirruslib/catalog.py +++ b/cirruslib/catalog.py @@ -2,18 +2,17 @@ import boto3 import json +import logging import os import re import uuid -from traceback import format_exc from typing import Dict, Optional, List from boto3utils import s3 from cirruslib.statedb import StateDB -from cirruslib.logging import DynamicLoggerAdapter, get_task_logger 
+from cirruslib.logging import get_task_logger from cirruslib.transfer import get_s3_session from cirruslib.utils import get_path -from pythonjsonlogger import jsonlogger # envvars DATA_BUCKET = os.getenv('CIRRUS_DATA_BUCKET', None) @@ -25,6 +24,9 @@ snsclient = boto3.client('sns') stepfunctions = boto3.client('stepfunctions') +# logging +logger = logging.getLogger(__name__) + class Catalog(dict): @@ -109,7 +111,7 @@ def update(self): if 'id' not in self: self['id'] = f"{collections_str}/workflow-{self['process']['workflow']}/{items_str}" - logger.debug(f"Catalog after validate_and_update: {json.dumps(self)}") + self.logger.debug(f"Catalog after validate_and_update: {json.dumps(self)}") # assign collections to Items given a mapping of Col ID: ID regex @@ -124,7 +126,7 @@ def assign_collections(self): for col in collections: regex = re.compile(collections[col]) if regex.match(item['id']): - logger.debug(f"Setting {item['id']} collection to {col}") + self.logger.debug(f"Setting {item['id']} collection to {col}") item['collection'] = col def get_payload(self) -> Dict: @@ -186,7 +188,7 @@ def publish_to_s3(self, bucket, public=False) -> List: extra.update(headers) s3session.upload_json(item, url, public=public, extra=extra) s3urls.append(url) - logger.info(f"Uploaded STAC Item {item['id']} as {url}") + self.logger.info(f"Uploaded STAC Item {item['id']} as {url}") return s3urls @@ -240,7 +242,7 @@ def publish_to_sns(self, topic_arn=PUBLISH_TOPIC_ARN): topic_arn (str, optional): ARN of SNS Topic. Defaults to PUBLISH_TOPIC_ARN. 
""" for item in self['features']: - logger.debug(f"Publishing item {item['id']} to {topic_arn}") + self.logger.debug(f"Publishing item to {topic_arn}") response = snsclient.publish(TopicArn=topic_arn, Message=json.dumps(item), MessageAttributes=self.sns_attributes(item)) @@ -257,11 +259,11 @@ def process(self) -> str: # add input catalog to s3 url = f"s3://{CATALOG_BUCKET}/{self['id']}/input.json" s3().upload_json(self, url) - logger.debug(f"Uploaded {url}") + self.logger.debug(f"Uploaded {url}") # invoke step function arn = os.getenv('BASE_WORKFLOW_ARN') + self['process']['workflow'] - logger.debug(f"Running {arn} on {self['id']}") + self.logger.debug(f"Running {arn} on {self['id']}") exe_response = stepfunctions.start_execution(stateMachineArn=arn, input=json.dumps(self.get_payload())) # create DynamoDB record - this will always overwrite any existing process @@ -270,8 +272,7 @@ def process(self) -> str: return self['id'] except Exception as err: msg = f"process: failed starting {self['id']} ({err})" - logger.error(msg) - logger.error(format_exc()) + self.logger.error(msg, exc_info=True) statedb.add_failed_item(self, msg) raise err @@ -323,7 +324,7 @@ def from_statedb_paged(cls, collections, state, since: str=None, index: str='inp for it in resp['items']: cat = Catalog(s3().read_json(it['input_catalog'])) catalogs.append(cat) - logger.debug(f"Retrieved {len(catalogs)} from state db") + self.logger.debug(f"Retrieved {len(catalogs)} from state db") yield cls(catalogs, state_items=resp['items']) catalogs = [] while 'nextkey' in resp: @@ -331,7 +332,7 @@ def from_statedb_paged(cls, collections, state, since: str=None, index: str='inp for it in resp['items']: cat = Catalog(s3().read_json(it['input_catalog'])) catalogs.append(cat) - logger.debug(f"Retrieved {len(catalogs)} from state db") + self.logger.debug(f"Retrieved {len(catalogs)} from state db") yield cls(catalogs, state_items=resp['items']) """ From 5908ff5d235276b2a649598747c7247e3cedec89 Mon Sep 17 00:00:00 
2001 From: Matthew Hanson Date: Sun, 25 Oct 2020 22:27:05 -0400 Subject: [PATCH 28/32] update logger in StateDB --- cirruslib/statedb.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/cirruslib/statedb.py b/cirruslib/statedb.py index 2da2bec..6860666 100644 --- a/cirruslib/statedb.py +++ b/cirruslib/statedb.py @@ -1,16 +1,13 @@ import boto3 import json +import logging import os from boto3utils import s3 from boto3.dynamodb.conditions import Key from datetime import datetime, timedelta, timezone -from logging import getLogger -from traceback import format_exc from typing import Dict, Optional, List -logger = getLogger(__name__) - # envvars CATALOG_BUCKET = os.getenv('CIRRUS_CATALOG_BUCKET') @@ -20,6 +17,9 @@ 'output_state': 'output_collections' } +# logging +logger = logging.getLogger(f"{__name__}.aws-landsat") + class StateDB: @@ -148,8 +148,7 @@ def get_dbitems(self, catids: List[str]) -> List[Dict]: return items except Exception as err: msg = f"Error fetching items {catids} ({err})" - logger.error(msg) - logger.error(format_exc()) + logger.error(msg, exc_info=True) raise Exception(msg) from err def get_counts(self, collection: str, state: str=None, since: str=None, From 70bef670bdebbcd76bf20e1697095b054aee9e41 Mon Sep 17 00:00:00 2001 From: Matthew Hanson Date: Sun, 25 Oct 2020 22:34:32 -0400 Subject: [PATCH 29/32] update catalog logging --- cirruslib/catalog.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/cirruslib/catalog.py b/cirruslib/catalog.py index 5ba9ef1..d74f3a1 100644 --- a/cirruslib/catalog.py +++ b/cirruslib/catalog.py @@ -126,7 +126,7 @@ def assign_collections(self): for col in collections: regex = re.compile(collections[col]) if regex.match(item['id']): - self.logger.debug(f"Setting {item['id']} collection to {col}") + self.logger.debug(f"Setting collection to {col}") item['collection'] = col def get_payload(self) -> Dict: @@ -188,7 +188,7 @@ def publish_to_s3(self, bucket, 
public=False) -> List: extra.update(headers) s3session.upload_json(item, url, public=public, extra=extra) s3urls.append(url) - self.logger.info(f"Uploaded STAC Item {item['id']} as {url}") + self.logger.info("Published to s3") return s3urls @@ -242,9 +242,9 @@ def publish_to_sns(self, topic_arn=PUBLISH_TOPIC_ARN): topic_arn (str, optional): ARN of SNS Topic. Defaults to PUBLISH_TOPIC_ARN. """ for item in self['features']: - self.logger.debug(f"Publishing item to {topic_arn}") response = snsclient.publish(TopicArn=topic_arn, Message=json.dumps(item), MessageAttributes=self.sns_attributes(item)) + self.logger.debug(f"Published item to {topic_arn}") def process(self) -> str: """Add this Catalog to Cirrus and start workflow @@ -259,11 +259,10 @@ def process(self) -> str: # add input catalog to s3 url = f"s3://{CATALOG_BUCKET}/{self['id']}/input.json" s3().upload_json(self, url) - self.logger.debug(f"Uploaded {url}") # invoke step function arn = os.getenv('BASE_WORKFLOW_ARN') + self['process']['workflow'] - self.logger.debug(f"Running {arn} on {self['id']}") + self.logger.debug(f"Running Step Function {arn}") exe_response = stepfunctions.start_execution(stateMachineArn=arn, input=json.dumps(self.get_payload())) # create DynamoDB record - this will always overwrite any existing process @@ -271,7 +270,7 @@ def process(self) -> str: return self['id'] except Exception as err: - msg = f"process: failed starting {self['id']} ({err})" + msg = f"failed starting workflow ({err})" self.logger.error(msg, exc_info=True) statedb.add_failed_item(self, msg) raise err @@ -386,7 +385,7 @@ def process(self, replace=False): if state in ['FAILED', ''] or _replace: catids.append(cat.process()) else: - logger.info(f"Skipping {cat['id']}, in {state} state") + logger.info(f"Skipping, input already in {state} state") continue return catids From 9a616acd1d6045eaf1076993e6ad4ef77d1365b1 Mon Sep 17 00:00:00 2001 From: Matthew Hanson Date: Sun, 25 Oct 2020 22:39:45 -0400 Subject: [PATCH 30/32] 
cleanup logging --- cirruslib/statedb.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/cirruslib/statedb.py b/cirruslib/statedb.py index 6860666..f5329da 100644 --- a/cirruslib/statedb.py +++ b/cirruslib/statedb.py @@ -73,7 +73,7 @@ def add_item(self, catalog, execution): 'execution': execution } ) - logger.debug(f"Created DynamoDB Item {catalog['id']}") + logger.debug("Created DynamoDB Item", extra={'id':catalog['id']}) return response def add_failed_item(self, catalog, error_message): @@ -93,13 +93,13 @@ def add_failed_item(self, catalog, error_message): 'error_message': error_message } ) - logger.debug(f"Created DynamoDB Item {catalog['id']}") + logger.debug("Created DynamoDB Item", extra={'id':catalog['id']}) return response def delete_item(self, catid: str): key = self.catid_to_key(catid) response = self.table.delete_item(Key=key) - logger.debug(f"Removed DynamoDB Item {catid}") + logger.debug("Removed DynamoDB Item", extra={'id': catid}) return response def get_dbitem(self, catid: str) -> Dict: @@ -116,7 +116,6 @@ def get_dbitem(self, catid: str) -> Dict: """ try: response = self.table.get_item(Key=self.catid_to_key(catid)) - #logger.debug(f"Fetched {response['Item']}") return response['Item'] except Exception as err: logger.info(f"Error fetching item {catid}: {err}") @@ -236,10 +235,8 @@ def get_items(self, *args, limit=None, **kwargs) -> Dict: """ resp = self.get_items_page(*args, **kwargs) items = resp['items'] - #logger.debug(f"Fetched page of {len(items)} items from statedb") while 'nextkey' in resp and (limit is None or len(items) < limit): resp = self.get_items_page(*args, nextkey=resp['nextkey'], **kwargs) - #logger.debug(f"Fetched page of {len(resp['items'])} items from statedb") items += resp['items'] if limit is None or len(items) < limit: return items From d30a5f4d58af7e2cd1a0e7afbc083c5c529fc737 Mon Sep 17 00:00:00 2001 From: Matthew Hanson Date: Mon, 26 Oct 2020 00:00:39 -0400 Subject: [PATCH 31/32] fix logger name 
--- cirruslib/statedb.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cirruslib/statedb.py b/cirruslib/statedb.py index f5329da..22d9ba8 100644 --- a/cirruslib/statedb.py +++ b/cirruslib/statedb.py @@ -18,7 +18,7 @@ } # logging -logger = logging.getLogger(f"{__name__}.aws-landsat") +logger = logging.getLogger(__name__) class StateDB: From 31f9b758c7f43801af33466489013ceb38dda56a Mon Sep 17 00:00:00 2001 From: Matthew Hanson Date: Mon, 26 Oct 2020 00:00:47 -0400 Subject: [PATCH 32/32] update CHANGELOG and bump version --- CHANGELOG.md | 15 +++++++++++++++ cirruslib/version.py | 2 +- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index af684f2..e09f211 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,21 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## [Unreleased] +## [v0.3.2] - 2020-10-25 + +### Added +- `cirruslib.logging` module configuring structured (JSON) logging and get_task_logger for logging from tasks +- `cirruslib.stac` module for working with the Cirrus static STAC catalog on s3, uses PySTAC +- `utils.dict_merged` function for doing recursive merges + +### Changed +- Parsing payload for a task should now use `Catalog.from_payload` instead of `Catalogs.from_payload`, which returns a `Catalog` instead of an array of `Catalog` objects that always had a length of one +- Cleaned up logging across all modules + +### Removed +- `Catalogs.from_payload`, replaced by `Catalog.from_payload` +- `QUEUED` as potential processing state + ## [v0.3.1] - 2020-09-27 ### Changed diff --git a/cirruslib/version.py b/cirruslib/version.py index bfd5071..73e3bb4 100644 --- a/cirruslib/version.py +++ b/cirruslib/version.py @@ -1 +1 @@ -__version__ = '0.3.2-b1' +__version__ = '0.3.2'