diff --git a/CHANGELOG.md b/CHANGELOG.md
index af684f2..e09f211 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,21 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
 
 ## [Unreleased]
 
+## [v0.3.2] - 2020-10-25
+
+### Added
+- `cirruslib.logging` module configuring structured (JSON) logging and `get_task_logger` for logging from tasks
+- `cirruslib.stac` module for working with the Cirrus static STAC catalog on s3, uses PySTAC
+- `utils.dict_merge` function for doing recursive merges
+
+### Changed
+- Parsing payload for a task should now use `Catalog.from_payload` instead of `Catalogs.from_payload`, which returns a `Catalog` instead of an array of `Catalog` objects that always had a length of one
+- Cleaned up logging across all modules
+
+### Removed
+- `Catalogs.from_payload`, replaced by `Catalog.from_payload`
+- `QUEUED` as potential processing state
+
 ## [v0.3.1] - 2020-09-27
 
 ### Changed
diff --git a/cirruslib/__init__.py b/cirruslib/__init__.py
index e4c63c1..581be1a 100644
--- a/cirruslib/__init__.py
+++ b/cirruslib/__init__.py
@@ -1,3 +1,4 @@
 from .catalog import Catalog, Catalogs
+from .logging import get_task_logger
 from .statedb import StateDB
 from .version import __version__
\ No newline at end of file
diff --git a/cirruslib/catalog.py b/cirruslib/catalog.py
index 2f3b986..d74f3a1 100644
--- a/cirruslib/catalog.py
+++ b/cirruslib/catalog.py
@@ -6,28 +6,27 @@ import os
 import re
 import uuid
 
+from typing import Dict, Optional, List
 from boto3utils import s3
 from cirruslib.statedb import StateDB
+from cirruslib.logging import get_task_logger
 from cirruslib.transfer import get_s3_session
 from cirruslib.utils import get_path
-from traceback import format_exc
-from typing import Dict, Optional, List
 
 # envvars
-LOG_LEVEL = os.getenv('CIRRUS_LOG_LEVEL', 'INFO')
 DATA_BUCKET = os.getenv('CIRRUS_DATA_BUCKET', None)
 CATALOG_BUCKET = os.getenv('CIRRUS_CATALOG_BUCKET', None)
 PUBLISH_TOPIC_ARN = os.getenv('CIRRUS_PUBLISH_TOPIC_ARN', None)
 
-logger = logging.getLogger(__name__)
-logger.setLevel(LOG_LEVEL)
-
 # clients
 statedb = StateDB()
 snsclient = boto3.client('sns')
 stepfunctions = boto3.client('stepfunctions')
 
+# logging
+logger = logging.getLogger(__name__)
+
 
 class Catalog(dict):
 
@@ -39,9 +38,15 @@ def __init__(self, *args, update=False, state_item=None, **kwargs):
         """
         super(Catalog, self).__init__(*args, **kwargs)
 
+        # convert old functions field to tasks
+        if 'functions' in self['process']:
+            self['process']['tasks'] = self['process'].pop('functions')
+
+        # assign the logger before update() runs, since update() logs through it
+        self.logger = get_task_logger(__name__, catalog=self)
+
         if update:
             self.update()
 
         # validate process block
         assert(self['type'] == 'FeatureCollection')
         assert('process' in self)
@@ -64,12 +69,37 @@ def __init__(self, *args, update=False, state_item=None, **kwargs):
 
         self.state_item = state_item
 
-    def update(self):
+    @classmethod
+    def from_payload(cls, payload: Dict, **kwargs) -> Catalog:
+        """Parse a Cirrus payload and return a Catalog instance
 
-        # convert old functions field to tasks
-        if 'functions' in self['process']:
-            self['process']['tasks'] = self['process'].pop('functions')
-
+        Args:
+            payload (Dict): A payload from SNS, SQS, or Batch, or a Dict containing an s3 URL to the payload
+
+        Returns:
+            Catalog: A Catalog instance
+        """
+        if 'Records' in payload:
+            records = [json.loads(r['body']) for r in payload['Records']]
+            # there should be only one
+            assert(len(records) == 1)
+            if 'Message' in records[0]:
+                # SNS
+                cat = json.loads(records[0]['Message'])
+            else:
+                # SQS
+                cat = records[0]
+        elif 'url' in payload:
+            cat = s3().read_json(payload['url'])
+        elif 'Parameters' in payload and 'url' in payload['Parameters']:
+            # this is Batch, get the output payload
+            url = payload['Parameters']['url'].replace('.json', '_out.json')
+            cat = s3().read_json(url)
+        else:
+            cat = payload
+        return cls(cat)
 
+    def update(self):
         # Input collections
         if 'input_collections' not in self['process']:
             cols = sorted(list(set([i['collection'] for i in self['features'] if 'collection' in i])))
@@ -81,7 +111,7 @@ def update(self):
         if 'id' not in self:
             self['id'] = f"{collections_str}/workflow-{self['process']['workflow']}/{items_str}"
 
-        logger.debug(f"Catalog after validate_and_update: {json.dumps(self)}")
+        self.logger.debug(f"Catalog after validate_and_update: {json.dumps(self)}")
 
     # assign collections to Items given a mapping of Col ID: ID regex
@@ -96,7 +126,7 @@ def assign_collections(self):
         for col in collections:
             regex = re.compile(collections[col])
             if regex.match(item['id']):
-                logger.debug(f"Setting {item['id']} collection to {col}")
+                self.logger.debug(f"Setting collection to {col}")
                 item['collection'] = col
 
     def get_payload(self) -> Dict:
@@ -158,7 +188,7 @@ def publish_to_s3(self, bucket, public=False) -> List:
                     extra.update(headers)
                 s3session.upload_json(item, url, public=public, extra=extra)
                 s3urls.append(url)
-                logger.info(f"Uploaded STAC Item {item['id']} as {url}")
+                self.logger.info("Published to s3")
 
         return s3urls
 
@@ -212,10 +242,9 @@ def publish_to_sns(self, topic_arn=PUBLISH_TOPIC_ARN):
             topic_arn (str, optional): ARN of SNS Topic. Defaults to PUBLISH_TOPIC_ARN.
         """
         for item in self['features']:
-            logger.info(f"Publishing item {item['id']} to {topic_arn}")
             response = snsclient.publish(TopicArn=topic_arn, Message=json.dumps(item),
-                                         MessageAttributes=self.sns_attributes(item))
-            logger.debug(f"Response: {json.dumps(response)}")
+                                        MessageAttributes=self.sns_attributes(item))
+            self.logger.debug(f"Published item to {topic_arn}")
 
     def process(self) -> str:
         """Add this Catalog to Cirrus and start workflow
@@ -230,23 +259,19 @@ def process(self) -> str:
             # add input catalog to s3
             url = f"s3://{CATALOG_BUCKET}/{self['id']}/input.json"
             s3().upload_json(self, url)
-            logger.debug(f"Uploaded {url}")
 
             # invoke step function
             arn = os.getenv('BASE_WORKFLOW_ARN') + self['process']['workflow']
-            logger.info(f"Running {arn} on {self['id']}")
+            self.logger.debug(f"Running Step Function {arn}")
             exe_response = stepfunctions.start_execution(stateMachineArn=arn, input=json.dumps(self.get_payload()))
-            logger.debug(f"Start execution response: {exe_response}")
 
             # create DynamoDB record - this will always overwrite any existing process
             resp = statedb.add_item(self, exe_response['executionArn'])
-            logger.debug(f"Add state item response: {resp}")
 
             return self['id']
 
         except Exception as err:
-            msg = f"process: failed starting {self['id']} ({err})"
-            logger.error(msg)
-            logger.error(format_exc())
+            msg = f"failed starting workflow ({err})"
+            self.logger.error(msg, exc_info=True)
            statedb.add_failed_item(self, msg)
            raise err
 
@@ -271,36 +296,6 @@ def catids(self) -> List[str]:
         """
         return [c['id'] for c in self.catalogs]
 
-    @classmethod
-    def from_payload(cls, payload: Dict, **kwargs) -> Catalogs:
-        """Parse a Cirrus payload and return a Catalogs instance
-
-        Args:
-            payload (Dict): A payload from SNS, SQS, or containing an s3 URL to payload
-
-        Returns:
-            Catalogs: A Catalogs instance
-        """
-        catalogs = []
-        if 'Records' in payload:
-            for record in [json.loads(r['body']) for r in payload['Records']]:
-                if 'Message' in record:
-                    # SNS
-                    cat = Catalog(json.loads(record['Message']))
-                    catalogs.append(cat)
-                else:
-                    # SQS
-                    catalogs.append(Catalog(record))
-        elif 'url' in payload:
-            catalogs = [Catalog(s3().read_json(payload['url']))]
-        elif 'Parameters' in payload and 'url' in payload['Parameters']:
-            # this is Batch, get the output payload
-            url = payload['Parameters']['url'].replace('.json', '_out.json')
-            catalogs = [Catalog(s3().read_json(url))]
-        else:
-            catalogs = [Catalog(payload)]
-        return cls(catalogs)
-
     @classmethod
     def from_catids(cls, catids: List[str], **kwargs) -> Catalogs:
         """Create Catalogs from list of Catalog IDs
@@ -328,7 +323,7 @@ def from_statedb_paged(cls, collections, state, since: str=None, index: str='inp
         for it in resp['items']:
             cat = Catalog(s3().read_json(it['input_catalog']))
             catalogs.append(cat)
-        logger.debug(f"Retrieved {len(catalogs)} from state db")
+        self.logger.debug(f"Retrieved {len(catalogs)} from state db")
         yield cls(catalogs, state_items=resp['items'])
         catalogs = []
         while 'nextkey' in resp:
@@ -336,7 +331,7 @@ def from_statedb_paged(cls, collections, state, since: str=None, index: str='inp
             for it in resp['items']:
                 cat = Catalog(s3().read_json(it['input_catalog']))
                 catalogs.append(cat)
-            logger.debug(f"Retrieved {len(catalogs)} from state db")
+            self.logger.debug(f"Retrieved {len(catalogs)} from state db")
             yield cls(catalogs, state_items=resp['items'])
     """
 
@@ -390,7 +385,7 @@ def process(self, replace=False):
             if state in ['FAILED', ''] or _replace:
                 catids.append(cat.process())
             else:
-                logger.info(f"Skipping {cat['id']}, in {state} state")
+                logger.info(f"Skipping, input already in {state} state")
                 continue
 
         return catids
diff --git a/cirruslib/logging.py b/cirruslib/logging.py
new file mode 100644
index 0000000..e44fefa
--- /dev/null
+++ b/cirruslib/logging.py
@@ -0,0 +1,70 @@
+import logging
+import logging.config
+from os import getenv
+
+
+config = {
+    "version": 1,
+    "disable_existing_loggers": False,
+    "formatters": {
+        "standard": {
+            "format": "%(asctime)s %(name)s %(levelname)s %(message)s",
+            "datefmt": "%Y-%m-%dT%H:%M:%S%z",
+        },
+        "json": {
+            "format": "%(asctime)s %(name)s %(levelname)s %(message)s",
+            "datefmt": "%Y-%m-%dT%H:%M:%S%z",
+            "class": "pythonjsonlogger.jsonlogger.JsonFormatter"
+        }
+    },
+    "handlers": {
+        "standard": {
+            "class": "logging.StreamHandler",
+            "formatter": "json"
+        }
+    },
+    "loggers": {
+        "": {
+            "propagate": False
+        },
+        "lambda_function": {
+            "handlers": ["standard"],
+            "level": getenv('CIRRUS_LOG_LEVEL', 'DEBUG')
+        },
+        "feeder": {
+            "handlers": ["standard"],
+            "level": getenv('CIRRUS_LOG_LEVEL', 'DEBUG')
+        },
+        "task": {
+            "handlers": ["standard"],
+            "level": getenv('CIRRUS_LOG_LEVEL', 'DEBUG')
+        },
+        "cirruslib": {
+            "handlers": ["standard"],
+            "level": getenv('CIRRUS_LOG_LEVEL', 'DEBUG')
+        }
+    }
+}
+
+logging.config.dictConfig(config)
+
+
+class DynamicLoggerAdapter(logging.LoggerAdapter):
+
+    def __init__(self, *args, keys=None, **kwargs):
+        super(DynamicLoggerAdapter, self).__init__(*args, **kwargs)
+        self.keys = keys
+
+    def process(self, msg, kwargs):
+        if self.keys is not None:
+            # keep caller kwargs (e.g. exc_info) and inject only the selected extra fields
+            extra = {k: self.extra[k] for k in self.keys if k in self.extra}
+            kwargs["extra"] = extra
+        return (msg, kwargs)
+
+
+def get_task_logger(*args, catalog, **kwargs):
+    _logger = logging.getLogger(*args, **kwargs)
+    logger = DynamicLoggerAdapter(_logger, catalog, keys=['id', 'stac_version'])
+    return logger
+
diff --git a/cirruslib/stac.py b/cirruslib/stac.py
index e820436..769ad96 100644
--- a/cirruslib/stac.py
+++ b/cirruslib/stac.py
@@ -6,11 +6,9 @@
 from boto3utils import s3
 from typing import Dict, Optional, List
 
-from pystac import STAC_IO, Catalog, CatalogType, Collection
+from pystac import STAC_IO, Catalog, CatalogType, Collection, Link
 
 logger = logging.getLogger(__name__)
-logger.setLevel(os.getenv('CIRRUS_LOG_LEVEL', 'INFO'))
-
 
 # envvars
 DATA_BUCKET = os.getenv('CIRRUS_DATA_BUCKET', None)
@@ -18,48 +16,67 @@ STAC_VERSION = os.getenv('CIRRUS_STAC_VERSION', '1.0.0-beta.2')
 DESCRIPTION = os.getenv('CIRRUS_STAC_DESCRIPTION', 'Cirrus STAC')
 AWS_REGION = os.getenv('AWS_REGION')
+PUBLISH_TOPIC = os.getenv('CIRRUS_PUBLISH_TOPIC_ARN', None)
+
+# root catalog
+ROOT_URL = f"s3://{DATA_BUCKET}"
+if PUBLIC_CATALOG:
+    ROOT_URL = s3.s3_to_https(ROOT_URL, region=AWS_REGION)
 
-ROOT_URL = f"s3://{DATA_BUCKET}/catalog.json"
+# boto3 clients
+snsclient = boto3.client('sns')
 
 def s3stac_read(uri):
-    if uri.startswith('s3'):
-        return json.dumps(s3().read_json(uri))
-    else:
-        return STAC_IO.default_read_text_method(uri)
+    if uri.startswith('http'):
+        uri = s3.https_to_s3(uri)
+    return json.dumps(s3().read_json(uri))
 
 def s3stac_write(uri, txt):
     extra = {
         'ContentType': 'application/json'
     }
-    if uri.startswith('s3'):
-        s3().upload_json(json.loads(txt), uri, extra=extra, public=PUBLIC_CATALOG)
-    else:
-        STAC_IO.default_write_text_method(uri, txt)
+    if uri.startswith('http'):
+        uri = s3.https_to_s3(uri)
+    s3().upload_json(json.loads(txt), uri, extra=extra, public=PUBLIC_CATALOG)
 
 STAC_IO.read_text_method = s3stac_read
 STAC_IO.write_text_method = s3stac_write
 
-def get_root_catalog() -> Dict:
+def get_root_catalog():
     """Get Cirrus root catalog from s3
 
     Returns:
-        Dict: STAC root catalog
+        Catalog: STAC root catalog
     """
-    if s3().exists(ROOT_URL):
-        cat = Catalog.from_file(ROOT_URL)
+    caturl = f"{ROOT_URL}/catalog.json"
+    if s3().exists(caturl):
+        cat = Catalog.from_file(caturl)
     else:
         catid = DATA_BUCKET.split('-data-')[0]
         cat = Catalog(id=catid, description=DESCRIPTION)
+        cat.normalize_and_save(ROOT_URL, CatalogType.ABSOLUTE_PUBLISHED)
-    logger.debug(f"Fetched {cat.describe()}")
+    logger.debug(f"Fetched root catalog {cat.id}")
     return cat
 
 
 # add this collection to Cirrus catalog
-def add_collection(collection):
-    cat = get_root_catalog()
-    col = Collection(**collection)
-    cat.add_child(col)
-    cat.normalize_and_save(ROOT_URL, CatalogType=CatalogType.ABSOLUTE_PUBLISHED)
-    return cat
\ No newline at end of file
+def add_collections(collections, publish=True):
+
+    for collection in collections:
+        collection.remove_links('child')
+        link = Link('copied_from', collection)
+        collection.add_link(link, collection.get_self_href())
+        ROOT_CATALOG.add_child(collection)
+        if publish:
+            child_json = json.dumps(collection.to_dict())
+            logger.debug(f"Publishing {collection.id}: {child_json}")
+            response = snsclient.publish(TopicArn=PUBLISH_TOPIC, Message=child_json)
+            logger.debug(f"SNS Publish response: {json.dumps(response)}")
+
+    ROOT_CATALOG.normalize_and_save(ROOT_URL, CatalogType.ABSOLUTE_PUBLISHED)
+    return ROOT_CATALOG
+
+
+ROOT_CATALOG = get_root_catalog()
\ No newline at end of file
diff --git a/cirruslib/statedb.py b/cirruslib/statedb.py
index d628d00..22d9ba8 100644
--- a/cirruslib/statedb.py
+++ b/cirruslib/statedb.py
@@ -1,26 +1,25 @@
 import boto3
 import json
+import logging
 import os
 
 from boto3utils import s3
 from boto3.dynamodb.conditions import Key
-from datetime import datetime, timedelta
-from logging import getLogger
-from traceback import format_exc
+from datetime import datetime, timedelta, timezone
 from typing import Dict, Optional, List
 
-logger = getLogger(__name__)
-logger.setLevel(os.getenv('CIRRUS_LOG_LEVEL', 'INFO'))
-
 # envvars
 CATALOG_BUCKET = os.getenv('CIRRUS_CATALOG_BUCKET')
 
-STATES = ['QUEUED', 'PROCESSING', 'COMPLETED', 'FAILED', 'INVALID']
+STATES = ['PROCESSING', 'COMPLETED', 'FAILED', 'INVALID']
 INDEX_KEYS = {
     'input_state': 'input_collections',
     'output_state': 'output_collections'
 }
 
+# logging
+logger = logging.getLogger(__name__)
+
 
 class StateDB:
 
@@ -35,14 +34,14 @@ def __init__(self, table_name: str=os.getenv('CIRRUS_STATE_DB', 'test')):
         self.table_name = table_name
         self.table = self.db.Table(table_name)
 
-    def create_item(self, catalog: Dict, state: str='QUEUED'):
+    def create_item(self, catalog: Dict, state: str='PROCESSING'):
         """Create an item in DynamoDB
 
         Args:
             catalog (Dict): A Cirrus Input Catalog
-            state (str, optional): Set items to this state. Defaults to 'QUEUED'.
+            state (str, optional): Set items to this state. Defaults to 'PROCESSING'.
         """
-        now = datetime.now().isoformat()
+        now = datetime.now(timezone.utc).isoformat()
         opts = catalog['process']['output_options']
         output_collections = '/'.join(sorted(opts['collections'].keys()))
         key = self.catid_to_key(catalog['id'])
@@ -60,7 +59,7 @@ def create_item(self, catalog: Dict, state: str='QUEUED'):
 
     def add_item(self, catalog, execution):
         """ Adds new item with state function execution """
-        now = datetime.now().isoformat()
+        now = datetime.now(timezone.utc).isoformat()
         opts = catalog['process']['output_options']
         output_collections = '/'.join(sorted(opts['collections'].keys()))
         key = self.catid_to_key(catalog['id'])
@@ -74,13 +73,13 @@ def add_item(self, catalog, execution):
                 'execution': execution
             }
         )
-        logger.debug(f"Created DynamoDB Item {catalog['id']}")
+        logger.debug("Created DynamoDB Item", extra={'id': catalog['id']})
         return response
 
     def add_failed_item(self, catalog, error_message):
         """ Adds new item as failed """
         """ Adds new item with state function execution """
-        now = datetime.now().isoformat()
+        now = datetime.now(timezone.utc).isoformat()
         opts = catalog['process']['output_options']
         output_collections = '/'.join(sorted(opts['collections'].keys()))
         key = self.catid_to_key(catalog['id'])
@@ -94,13 +93,13 @@ def add_failed_item(self, catalog, error_message):
                 'error_message': error_message
             }
         )
-        logger.debug(f"Created DynamoDB Item {catalog['id']}")
+        logger.debug("Created DynamoDB Item", extra={'id': catalog['id']})
         return response
 
     def delete_item(self, catid: str):
         key = self.catid_to_key(catid)
         response = self.table.delete_item(Key=key)
-        logger.debug(f"Removed DynamoDB Item {catid}")
+        logger.debug("Removed DynamoDB Item", extra={'id': catid})
         return response
 
     def get_dbitem(self, catid: str) -> Dict:
@@ -117,7 +116,6 @@ def get_dbitem(self, catid: str) -> Dict:
         """
         try:
             response = self.table.get_item(Key=self.catid_to_key(catid))
-            logger.debug(f"Fetched {response['Item']}")
             return response['Item']
         except Exception as err:
             logger.info(f"Error fetching item {catid}: {err}")
@@ -149,8 +147,7 @@ def get_dbitems(self, catids: List[str]) -> List[Dict]:
             return items
         except Exception as err:
             msg = f"Error fetching items {catids} ({err})"
-            logger.error(msg)
-            logger.error(format_exc())
+            logger.error(msg, exc_info=True)
             raise Exception(msg) from err
 
     def get_counts(self, collection: str, state: str=None, since: str=None,
@@ -199,7 +196,7 @@ def get_items_page(self, collection: str, state: str,
                        since: Optional[str]=None,
 
         Args:
             collection (str): /-separated list of collections (input or output depending on index)
-            state (str): State of Items to get (QUEUED, PROCESSING, COMPLETED, FAILED, INVALID)
+            state (str): State of Items to get (PROCESSING, COMPLETED, FAILED, INVALID)
             since (Optional[str], optional): Get Items since this amount of time in the past. Defaults to None.
             index (str, optional): Query this index (input_state or output_state). Defaults to 'input_state'.
@@ -238,10 +235,8 @@ def get_items(self, *args, limit=None, **kwargs) -> Dict:
         """
         resp = self.get_items_page(*args, **kwargs)
         items = resp['items']
-        logger.debug(f"Fetched page of {len(items)} items from statedb")
         while 'nextkey' in resp and (limit is None or len(items) < limit):
             resp = self.get_items_page(*args, nextkey=resp['nextkey'], **kwargs)
-            logger.debug(f"Fetched page of {len(resp['items'])} items from statedb")
             items += resp['items']
         if limit is None or len(items) < limit:
             return items
@@ -254,7 +249,7 @@ def get_state(self, catid: str) -> str:
             catid (str): The catalog ID
 
         Returns:
-            str: Current state: QUEUED, PROCESSING, COMPLETED, FAILED, INVALID
+            str: Current state: PROCESSING, COMPLETED, FAILED, INVALID
         """
         response = self.table.get_item(Key=self.catid_to_key(catid))
         if 'Item' in response:
@@ -292,7 +287,7 @@ def set_processing(self, catid: str, execution: str) -> str:
             Key=self.catid_to_key(catid),
             UpdateExpression='SET current_state=:p, execution=:exe',
             ExpressionAttributeValues={
-                ':p': f"PROCESSING_{datetime.now().isoformat()}",
+                ':p': f"PROCESSING_{datetime.now(timezone.utc).isoformat()}",
                 ':exe': execution
             }
         )
@@ -312,7 +307,7 @@ def set_completed(self, catid: str, urls: List[str]) -> str:
             Key=self.catid_to_key(catid),
             UpdateExpression='SET current_state=:p, output_urls=:urls',
             ExpressionAttributeValues={
-                ':p': f"COMPLETED_{datetime.now().isoformat()}",
+                ':p': f"COMPLETED_{datetime.now(timezone.utc).isoformat()}",
                 ':urls': urls
             }
         )
@@ -332,7 +327,7 @@ def set_failed(self, catid: str, msg: str) -> str:
             Key=self.catid_to_key(catid),
             UpdateExpression='SET current_state=:p, error_message=:err',
             ExpressionAttributeValues={
-                ':p': f"FAILED_{datetime.now().isoformat()}",
+                ':p': f"FAILED_{datetime.now(timezone.utc).isoformat()}",
                 ':err': msg
             }
         )
@@ -352,7 +347,7 @@ def set_invalid(self, catid: str, msg: str) -> str:
             Key=self.catid_to_key(catid),
             UpdateExpression='SET current_state=:p, error_message=:err',
             ExpressionAttributeValues={
-                ':p': f"INVALID_{datetime.now().isoformat()}",
+                ':p': f"INVALID_{datetime.now(timezone.utc).isoformat()}",
                 ':err': msg
             }
         )
@@ -374,9 +369,9 @@ def query(self, collection: str, state: str=None, since: str=None,
         """
         expr = Key(INDEX_KEYS[index]).eq(collection)
         if state and since:
-            start = datetime.now() - self.since_to_timedelta(since)
+            start = datetime.now(timezone.utc) - self.since_to_timedelta(since)
             begin = f"{state}_{start.isoformat()}"
-            end = f"{state}_{datetime.now().isoformat()}"
+            end = f"{state}_{datetime.now(timezone.utc).isoformat()}"
             expr = expr & Key('current_state').between(begin, end)
         elif state:
             expr = expr & Key('current_state').begins_with(state)
diff --git a/cirruslib/transfer.py b/cirruslib/transfer.py
index acc6251..03f7d83 100644
--- a/cirruslib/transfer.py
+++ b/cirruslib/transfer.py
@@ -13,7 +13,6 @@ from typing import Dict, Optional, List
 
 logger = logging.getLogger(__name__)
-logger.setLevel(getenv('CIRRUS_LOG_LEVEL', 'INFO'))
 
 # get data bucket to upload to
 DATA_BUCKET = getenv('CIRRUS_DATA_BUCKET')
@@ -49,7 +48,9 @@ def get_s3_session(bucket: str=None, s3url: str=None, **kwargs) -> s3:
             creds.update(_creds)
             logger.debug(f"Using credentials for bucket {bucket}: {json.dumps(creds)}")
         except ClientError:
-            logger.debug(f"Using default credentials for bucket {bucket}")
+            # using default credentials
+            pass
+
     requester_pays = creds.pop('requester_pays', False)
     session = boto3.Session(**creds)
@@ -168,7 +169,7 @@ def upload_item_assets(item: Dict, assets: List[str]=None, public_assets: List[s
         s3_session = get_s3_session(parts['bucket'])
 
         # upload
-        logger.info(f"Uploading {filename} to {url}")
+        logger.debug(f"Uploading {filename} to {url}")
         url_out = s3_session.upload(filename, url, public=public, extra=_headers, http_url=not s3_urls)
         _item['assets'][key]['href'] = url_out
     return _item
diff --git a/cirruslib/utils.py b/cirruslib/utils.py
index 8e72e14..6bb6d18 100644
--- a/cirruslib/utils.py
+++ b/cirruslib/utils.py
@@ -9,10 +9,9 @@ from os import getenv
 from string import Formatter, Template
 from typing import Dict, Optional, List
+from collections.abc import Mapping
 
-# configure logger - CRITICAL, ERROR, WARNING, INFO, DEBUG
 logger = logging.getLogger(__name__)
-logger.setLevel(getenv('CIRRUS_LOG_LEVEL', 'INFO'))
 
 batch_client = boto3.client('batch')
 
@@ -73,4 +72,39 @@ def get_path(item: Dict, template: str='${collection}/${id}') -> str:
         # Item property
         else:
             subs[key] = item['properties'][key.replace('__colon__', ':')]
-    return Template(_template).substitute(**subs).replace('__colon__', ':')
\ No newline at end of file
+    return Template(_template).substitute(**subs).replace('__colon__', ':')
+
+
+# from https://gist.github.com/angstwad/bf22d1822c38a92ec0a9#gistcomment-2622319
+def dict_merge(dct, merge_dct, add_keys=True):
+    """ Recursive dict merge. Inspired by :meth:`dict.update`, instead of
+    updating only top-level keys, dict_merge recurses down into dicts nested
+    to an arbitrary depth, updating keys. The ``merge_dct`` is merged into
+    ``dct``.
+    This version will return a copy of the dictionary and leave the original
+    arguments untouched.
+    The optional argument ``add_keys`` determines whether keys which are
+    present in ``merge_dct`` but not ``dct`` should be included in the
+    new dict.
+    Args:
+        dct (dict): dict onto which the merge is executed
+        merge_dct (dict): dict merged into ``dct``
+        add_keys (bool): whether to add new keys
+
+    Returns:
+        dict: updated dict
+    """
+    dct = dct.copy()
+    if not add_keys:
+        merge_dct = {
+            k: merge_dct[k]
+            for k in set(dct).intersection(set(merge_dct))
+        }
+
+    for k, v in merge_dct.items():
+        if (k in dct and isinstance(dct[k], dict)
+                and isinstance(merge_dct[k], Mapping)):
+            dct[k] = dict_merge(dct[k], merge_dct[k], add_keys=add_keys)
+        else:
+            dct[k] = merge_dct[k]
+
+    return dct
\ No newline at end of file
diff --git a/cirruslib/version.py b/cirruslib/version.py
index e1424ed..73e3bb4 100644
--- a/cirruslib/version.py
+++ b/cirruslib/version.py
@@ -1 +1 @@
-__version__ = '0.3.1'
+__version__ = '0.3.2'
diff --git a/requirements.txt b/requirements.txt
index 0f2f760..6e10b72 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,3 +1,4 @@
 boto3-utils~=0.3.1
-pystac==0.5.0-RC1
+pystac~=0.5
 requests>=2.24.0
+python-json-logger~=2.0
diff --git a/tests/fixtures/test-catalog.json b/tests/fixtures/test-catalog.json
new file mode 100644
index 0000000..9a948de
--- /dev/null
+++ b/tests/fixtures/test-catalog.json
@@ -0,0 +1,38 @@
+{
+    "type": "FeatureCollection",
+    "features": [
+        {
+            "type": "Feature",
+            "id": "tiles-32-R-MS-2020-10-20-0",
+            "collection": "sentinel-s2-l2a-aws",
+            "properties": {},
+            "assets": {
+                "json": {
+                    "href": "https://roda.sentinel-hub.com/sentinel-s2-l2a/tiles/32/R/MS/2020/10/20/0/tileInfo.json"
+                }
+            },
+            "links": []
+        }
+    ],
+    "process": {
+        "description": "Convert Original Sentinel-2 metadata to STAC and publish",
+        "input_collections": [
+            "sentinel-s2-l2a-aws"
+        ],
+        "workflow": "publish-sentinel",
+        "output_options": {
+            "path_template": "/${collection}/${sentinel:utm_zone}/${sentinel:latitude_band}/${sentinel:grid_square}/${year}/${month}/${id}",
+            "collections": {
+                "sentinel-s2-l1c": ".*L1C",
+                "sentinel-s2-l2a": ".*L2A"
+            }
+        },
+        "tasks": {
+            "publish": {
+                "public": true
+            }
+        },
+        "replace": false
+    },
+    "id": "sentinel-s2-l2a-aws/workflow-publish-sentinel/tiles-32-R-MS-2020-10-20-0"
+}
\ No newline at end of file
diff --git a/tests/test_catalog.py b/tests/test_catalog.py
new file mode 100644
index 0000000..b1d7a6d
--- /dev/null
+++ b/tests/test_catalog.py
@@ -0,0 +1,20 @@
+import os
+import json
+import unittest
+
+from cirruslib import Catalog
+
+testpath = os.path.dirname(__file__)
+
+
+class TestClassMethods(unittest.TestCase):
+
+    def open_fixture(self, filename='test-catalog.json'):
+        # fixture lives in tests/fixtures/
+        with open(os.path.join(testpath, 'fixtures', filename)) as f:
+            data = json.loads(f.read())
+        return data
+
+    def test_open_catalog(self):
+        data = self.open_fixture()
+        cat = Catalog(**data)
+        self.assertEqual(cat['id'], data['id'])
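
A minimal sketch of how a task would consume the new task-side API in this release. The handler name and the final processing step are illustrative placeholders, not part of the library; `Catalog.from_payload` and `get_task_logger` are the functions introduced by this diff:

    from cirruslib import Catalog
    from cirruslib.logging import get_task_logger

    def lambda_handler(payload, context=None):
        # Catalog.from_payload replaces Catalogs.from_payload and
        # returns a single Catalog rather than a one-element list
        catalog = Catalog.from_payload(payload)

        # "task" is one of the loggers configured in cirruslib.logging;
        # the adapter attaches the catalog's 'id' and 'stac_version'
        # to every structured (JSON) log record
        logger = get_task_logger("task", catalog=catalog)
        logger.debug("Processing input catalog")

        # ... task-specific work on catalog['features'] goes here ...
        return catalog

Note that because `Catalog.__init__` now builds its own `get_task_logger`, library-side log calls such as `catalog.logger.error(msg, exc_info=True)` also flow through the same JSON formatter.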