diff --git a/README.md b/README.md
index 566aa0d..a26e154 100644
--- a/README.md
+++ b/README.md
@@ -130,6 +130,26 @@ The [**Youtube Analytics Authentication**](https://docs.google.com/document/d/1F
 ```bash
 TBD
 ```
+
+## Notes about the state.json file
+
+The following streams nest their bookmarks under a parent id in the state file:
+
+| stream | parent id |
+| ------ | --------- |
+| videos | channel_id |
+| playlist_items | playlist_id |
+| reports | job_id |
+
+For example, a state file for the `playlist_items` stream will look like this:
+```json
+{"bookmarks": {"playlist_items": {"id12345": "2020-12-01T15:10:32.000000Z", "id45678": "2005-01-01T00:00:00Z"}}}
+```
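+
+Since bookmarks are nested per parent id, a state file covering several streams might look like the following (all IDs here are illustrative):
+```json
+{"bookmarks": {"videos": {"channel_id_1": "2020-12-01T15:10:32.000000Z"}, "playlist_items": {"playlist_id_1": "2020-11-15T08:00:00.000000Z"}, "reports": {"job_id_1": "2020-12-05T00:00:00.000000Z"}}}
+```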
 ---
 
 Copyright © 2019 Stitch
diff --git a/tap_youtube_analytics/sync.py b/tap_youtube_analytics/sync.py
index 5e8788c..e634f88 100644
--- a/tap_youtube_analytics/sync.py
+++ b/tap_youtube_analytics/sync.py
@@ -22,7 +22,7 @@ def write_schema(catalog, stream_name):
     stream = catalog.get_stream(stream_name)
     schema = stream.schema.to_dict()
     try:
         singer.write_schema(stream_name, schema, stream.key_properties)
     except OSError as err:
-        LOGGER.error('OS Error writing schema for: %s', stream_name)
+        LOGGER.debug('OS Error writing schema for: %s', stream_name)
         raise err
@@ -30,18 +30,25 @@ def write_record(stream_name, record, time_extracted):
     try:
         singer.messages.write_record(stream_name, record, time_extracted=time_extracted)
     except OSError as err:
-        LOGGER.error('OS Error writing record for: %s', stream_name)
-        LOGGER.error('Stream: %s, record: %s', stream_name, record)
+        LOGGER.debug('OS Error writing record for: %s', stream_name)
+        LOGGER.debug('Stream: %s, record: %s', stream_name, record)
         raise err
     except TypeError as err:
-        LOGGER.error('Type Error writing record for: %s', stream_name)
-        LOGGER.error('Stream: %s, record: %s', stream_name, record)
+        LOGGER.debug('Type Error writing record for: %s', stream_name)
+        LOGGER.debug('Stream: %s, record: %s', stream_name, record)
         raise err
 
 
-def get_bookmark(state, stream, default):
+def get_bookmark(state, stream, default, parent_id=None):
     if (state is None) or ('bookmarks' not in state):
         return default
+    if parent_id:
+        return (
+            state
+            .get('bookmarks', {})
+            .get(stream, {})
+            .get(parent_id, default)
+        )
     return (
         state
         .get('bookmarks', {})
@@ -49,10 +56,15 @@ def get_bookmark(state, stream, default):
         .get(stream, default)
     )
 
 
-def write_bookmark(state, stream, value):
+def write_bookmark(state, stream, value, parent_id=None):
     if 'bookmarks' not in state:
         state['bookmarks'] = {}
-    state['bookmarks'][stream] = value
+    if stream not in state['bookmarks']:
+        state['bookmarks'][stream] = {}
+    if parent_id:
+        state['bookmarks'][stream][parent_id] = value
+    else:
+        state['bookmarks'][stream] = value
     LOGGER.info('Write state for stream: %s, value: %s', stream, value)
     singer.write_state(state)
@@ -110,8 +122,8 @@ def sync_channels(client,
                     schema,
                     stream_metadata)
             except Exception as err:
-                LOGGER.error('Transformer Error: %s', err)
-                LOGGER.error('Stream: %s, record: %s', stream_name, record)
+                LOGGER.debug('Transformer Error: %s', err)
+                LOGGER.debug('Stream: %s, record: %s', stream_name, record)
                 raise err
 
             write_record(stream_name, transformed_record, time_extracted=time_extracted)
@@ -161,8 +173,8 @@ def sync_playlists(client,
                     schema,
                     stream_metadata)
             except Exception as err:
-                LOGGER.error('Transformer Error: %s', err)
-                LOGGER.error('Stream: %s, record: %s', stream_name, record)
+                LOGGER.debug('Transformer Error: %s', err)
+                LOGGER.debug('Stream: %s, record: %s', stream_name, record)
                 raise err
 
             write_record(stream_name, transformed_record, time_extracted=time_extracted)
@@ -190,11 +202,6 @@ def sync_playlist_items(client,
     playlist_params = STREAMS.get('playlists', {}).get('params', {})
     channel_list = channel_ids.split(',')
 
-    # Initialize bookmarking
-    last_datetime = get_bookmark(state, stream_name, start_date)
-    last_dttm = strptime_to_utc(last_datetime)
-    max_bookmark_value = last_datetime
-
     with metrics.record_counter(stream_name) as counter:
         # Loop each channel_id from config
         for channel_id in channel_list:
@@ -212,6 +219,12 @@ def sync_playlist_items(client,
             for playlist in playlists:
                 playlist_id = playlist.get('id')
                 params['playlistId'] = playlist_id
+
+                # Initialize bookmarking
+                last_datetime = get_bookmark(state, stream_name, start_date, parent_id=playlist_id)
+                last_dttm = strptime_to_utc(last_datetime)
+                max_bookmark_value = last_datetime
+                write_schema(catalog, stream_name)
                 records = get_paginated_data(
                     client=client,
                     url=DATA_URL,
@@ -223,6 +236,8 @@ def sync_playlist_items(client,
 
                 time_extracted = utils.now()
                 for record in records:
+                    if record is None:
+                        continue
                     for key in id_fields:
                         if not record.get(key):
                             raise ValueError('Stream: {}, Missing key: {}'.format(stream_name, key))
@@ -234,8 +249,8 @@ def sync_playlist_items(client,
                             schema,
                             stream_metadata)
                     except Exception as err:
-                        LOGGER.error('Transformer Error: %s', err)
-                        LOGGER.error('Stream: %s, record: %s', stream_name, record)
+                        LOGGER.debug('Transformer Error: %s', err)
+                        LOGGER.debug('Stream: %s, record: %s', stream_name, record)
                         raise err
 
                     # Bookmarking
@@ -246,13 +261,13 @@ def sync_playlist_items(client,
                         max_bookmark_value = strftime(bookmark_dttm)
 
                     # Only sync records whose bookmark is after the last_datetime
-                    if bookmark_dttm >= last_dttm:
+                    if bookmark_dttm > last_dttm:
                         write_record(stream_name, transformed_record, \
                             time_extracted=time_extracted)
                         counter.increment()
 
-            # Youtube API does not allow page/batch sorting for playlist_items
-            write_bookmark(state, stream_name, max_bookmark_value)
+                # Youtube API does not allow page/batch sorting for playlist_items
+                write_bookmark(state, stream_name, max_bookmark_value, parent_id=playlist_id)
 
     LOGGER.info('Stream: {}, Processed {} records'.format(stream_name, counter.value))
     return counter.value
@@ -274,11 +289,6 @@ def sync_videos(client,
     params = endpoint_config.get('params', {})
     channel_list = channel_ids.split(',')
 
-    # Initialize bookmarking
-    last_datetime = get_bookmark(state, stream_name, start_date)
-    last_dttm = strptime_to_utc(last_datetime)
-    max_bookmark_value = last_datetime
-
     search_params = {
         'part': 'id,snippet',
         'channelId': '{channel_id}',
@@ -290,6 +300,12 @@ def sync_videos(client,
     with metrics.record_counter(stream_name) as counter:
         # Loop each channel_id from config
         for channel_id in channel_list:
+            write_schema(catalog, stream_name)
+            # Initialize bookmarking
+            last_datetime = get_bookmark(state, stream_name, start_date, parent_id=channel_id)
+            last_dttm = strptime_to_utc(last_datetime)
+            max_bookmark_value = last_datetime
+
             video_ids = []
             search_params['channelId'] = channel_id
             search_records = get_paginated_data(
@@ -303,18 +319,20 @@ def sync_videos(client,
-            i = 0
             for search_record in search_records:
-                video_id = search_record.get('id', {}).get('videoId')
-                video_ids.append(video_id)
+                if search_record is None:
+                    continue
 
                 # Bookmarking
                 bookmark_date = search_record.get('snippet', {}).get('publishedAt')
                 bookmark_dttm = strptime_to_utc(bookmark_date)
-                if i == 0:
-                    max_bookmark_value = bookmark_date
-
-                # Stop looping when bookmark is before last datetime
-                if bookmark_dttm < last_dttm:
+
+                # Only process records newer than the previous run
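+                # (search results are sorted by publishedAt descending, so the loop can stop at the first older record)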
+                if bookmark_dttm > last_dttm:
+                    video_id = search_record.get('id', {}).get('videoId')
+                    video_ids.append(video_id)
+                    max_bookmark_value = strftime(max(bookmark_dttm, strptime_to_utc(max_bookmark_value)))
+                else:
                     break
-                i = i + 1
 
             # Break into chunks of 50 video_ids
             video_id_chunks = chunks(video_ids, 50)
@@ -345,15 +363,16 @@ def sync_videos(client,
                             schema,
                             stream_metadata)
                     except Exception as err:
-                        LOGGER.error('Transformer Error: %s', err)
-                        LOGGER.error('Stream: %s, record: %s', stream_name, record)
+                        LOGGER.debug('Transformer Error: %s', err)
+                        LOGGER.debug('Stream: %s, record: %s', stream_name, record)
                         raise err
 
                     write_record(stream_name, transformed_record, time_extracted=time_extracted)
                     counter.increment()
 
-        # Write bookmark after all records synced due to sort descending (most recent first)
-        write_bookmark(state, stream_name, max_bookmark_value)
+            # Write bookmark after all records synced due to sort descending (most recent first)
+            # Include channel_id in bookmark
+            write_bookmark(state, stream_name, max_bookmark_value, parent_id=channel_id)
 
     LOGGER.info('Stream: {}, Processed {} records'.format(stream_name, counter.value))
     return counter.value
@@ -374,19 +393,6 @@ def sync_report(client,
     report_type = endpoint_config.get('report_type')
     dimensions = endpoint_config.get('dimensions', [])
 
-    # Initialize bookmarking
-    # There is a 2-3 day lag (sometimes up to 6-7 days lag) in YouTube results reconcilliation
-    now_dttm = utils.now()
-    attribution_start_dttm = now_dttm - timedelta(days=ATTRIBUTION_DAYS)
-    last_datetime = get_bookmark(state, stream_name, start_date)
-    last_dttm = strptime_to_utc(last_datetime)
-
-    if attribution_start_dttm < last_dttm:
-        last_dttm = attribution_start_dttm
-        last_datetime = strftime(last_dttm)
-
-    max_bookmark_value = last_datetime
-
     with metrics.record_counter(stream_name) as counter:
         job_id = None
         job_params = {
@@ -412,6 +418,20 @@ def sync_report(client,
                 job_id = job.get('id')
                 break
 
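+        # NOTE: bookmarking is initialized after job discovery because report bookmarks are keyed by job_id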
+        # Initialize bookmarking
+        # There is a 2-3 day lag (sometimes up to 6-7 days) in YouTube results reconciliation
+        now_dttm = utils.now()
+        attribution_start_dttm = now_dttm - timedelta(days=ATTRIBUTION_DAYS)
+        last_datetime = get_bookmark(state, stream_name, start_date, parent_id=job_id)
+        last_dttm = strptime_to_utc(last_datetime)
+
+        if attribution_start_dttm < last_dttm:
+            last_dttm = attribution_start_dttm
+            last_datetime = strftime(last_dttm)
+
+        max_bookmark_value = last_datetime
+
         # Create job for stream if not job_exists
         if not job_exists:
             body = {
@@ -467,8 +486,8 @@ def sync_report(client,
                             schema,
                             stream_metadata)
                     except Exception as err:
-                        LOGGER.error('Transformer Error: %s', err)
-                        LOGGER.error('Stream: %s, record: %s', stream_name, record)
+                        LOGGER.debug('Transformer Error: %s', err)
+                        LOGGER.debug('Stream: %s, record: %s', stream_name, record)
                         raise err
 
                     # Bookmarking
@@ -484,7 +503,7 @@ def sync_report(client,
                     counter.increment()
 
         # Write bookmark after all records synced due to sort descending (most recent first)
-        write_bookmark(state, stream_name, max_bookmark_value)
+        write_bookmark(state, stream_name, max_bookmark_value, parent_id=job_id)
 
     LOGGER.info('Stream: {}, Processed {} records'.format(stream_name, counter.value))
     return counter.value
diff --git a/tap_youtube_analytics/transform.py b/tap_youtube_analytics/transform.py
index 776bf55..370552e 100644
--- a/tap_youtube_analytics/transform.py
+++ b/tap_youtube_analytics/transform.py
@@ -38,10 +38,6 @@ def transform_report_record(record, dimensions, report):
 
     new_record = record.copy()
 
-    map_path = get_abs_path('dim_lookup_map.json')
-    with open(map_path) as file:
-        dim_lookup_map = json.load(file)
-
     dimension_values = {}
 
     for key, val in list(record.items()):
@@ -49,16 +45,6 @@ def transform_report_record(record, dimensions, report):
         if key in dimensions:
             dimension_values[key] = val
 
-        # Transform dim values from codes to names using dim_lookup_map
-        if key in dim_lookup_map:
-            # lookup new_val, with a default for existing val (if not found)
-            new_val = dim_lookup_map[key].get(val, val)
-            if val == new_val:
-                LOGGER.warning('dim_lookup_map value not found; key: {}, value: {}'.format(key, val))
-            new_record[key] = new_val
-        else:
-            new_record[key] = val
-
     # Add report fields to data
     new_record['report_id'] = report.get('id')
     new_record['report_type_id'] = report.get('reportTypeId')