diff --git a/tap_shopify/__init__.py b/tap_shopify/__init__.py index f6cce69e..a822cd89 100644 --- a/tap_shopify/__init__.py +++ b/tap_shopify/__init__.py @@ -16,6 +16,8 @@ REQUIRED_CONFIG_KEYS = ["shop", "api_key"] LOGGER = singer.get_logger() +SDC_KEYS = {'id': 'integer', 'name': 'string', 'myshopify_domain': 'string'} + def initialize_shopify_client(): api_key = Context.config['api_key'] @@ -23,6 +25,8 @@ def initialize_shopify_client(): version = '2020-07' session = shopify.Session(shop, version, api_key) shopify.ShopifyResource.activate_session(session) + return shopify.Shop.current().attributes + def get_abs_path(path): return os.path.join(os.path.dirname(os.path.realpath(__file__)), path) @@ -68,6 +72,13 @@ def load_schema_references(): return refs + +def add_synthetic_keys_to_schema(schema): + for k in SDC_KEYS: + schema['properties']['_sdc_shop_' + k] = {'type': SDC_KEYS[k]} + return schema + + def discover(): raw_schemas = load_schemas() streams = [] @@ -79,12 +90,14 @@ def discover(): stream = Context.stream_objects[schema_name]() + catalog_schema = add_synthetic_keys_to_schema(singer.resolve_schema_references(schema, refs)) + # create and add catalog entry catalog_entry = { 'stream': schema_name, 'tap_stream_id': schema_name, - 'schema': singer.resolve_schema_references(schema, refs), - 'metadata' : get_discovery_metadata(stream, schema), + 'schema': catalog_schema, + 'metadata': get_discovery_metadata(stream, schema), 'key_properties': stream.key_properties, 'replication_key': stream.replication_key, 'replication_method': stream.replication_method @@ -107,7 +120,8 @@ def shuffle_streams(stream_name): Context.catalog["streams"] = top_half + bottom_half def sync(): - initialize_shopify_client() + shop_attributes = initialize_shopify_client() + sdc_fields = {"_sdc_shop_" + x: shop_attributes[x] for x in SDC_KEYS} # Emit all schemas first so we have them for child streams for stream in Context.catalog["streams"]: @@ -144,7 +158,7 @@ def sync(): extraction_time = singer.utils.now() record_schema = catalog_entry['schema'] record_metadata = metadata.to_map(catalog_entry['metadata']) - rec = transformer.transform(rec, record_schema, record_metadata) + rec = transformer.transform({**rec, **sdc_fields}, record_schema, record_metadata) singer.write_record(stream_id, rec, time_extracted=extraction_time)