From 58fe44a657b2fa983837803c7f23c2695eeed3e1 Mon Sep 17 00:00:00 2001 From: pbegle Date: Fri, 26 Feb 2021 17:47:50 -0800 Subject: [PATCH 01/10] Handling cases where records are None. --- tap_youtube_analytics/sync.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tap_youtube_analytics/sync.py b/tap_youtube_analytics/sync.py index 5e8788c..16c722d 100644 --- a/tap_youtube_analytics/sync.py +++ b/tap_youtube_analytics/sync.py @@ -223,6 +223,8 @@ def sync_playlist_items(client, time_extracted = utils.now() for record in records: + if record is None: + continue for key in id_fields: if not record.get(key): raise ValueError('Stream: {}, Missing key: {}'.format(stream_name, key)) @@ -303,6 +305,8 @@ def sync_videos(client, i = 0 for search_record in search_records: + if search_record is None: + continue video_id = search_record.get('id', {}).get('videoId') video_ids.append(video_id) From 23dd7b2b83a2c46237e05bbf2a23367af9dd2555 Mon Sep 17 00:00:00 2001 From: pbegle Date: Mon, 1 Mar 2021 16:38:38 -0800 Subject: [PATCH 02/10] Fixed sync_videos function so that only 'new' videos are synced. --- tap_youtube_analytics/sync.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/tap_youtube_analytics/sync.py b/tap_youtube_analytics/sync.py index 16c722d..3c081c0 100644 --- a/tap_youtube_analytics/sync.py +++ b/tap_youtube_analytics/sync.py @@ -307,18 +307,17 @@ def sync_videos(client, for search_record in search_records: if search_record is None: continue - video_id = search_record.get('id', {}).get('videoId') - video_ids.append(video_id) # Bookmarking bookmark_date = search_record.get('snippet', {}).get('publishedAt') bookmark_dttm = strptime_to_utc(bookmark_date) - if i == 0: - max_bookmark_value = bookmark_date - # Stop looping when bookmark is before last datetime - if bookmark_dttm < last_dttm: + + # Only process records newer than previous run + if bookmark_dttm > last_dttm: + video_id = search_record.get('id', {}).get('videoId') + video_ids.append(video_id) + else: break - i = i + 1 # Break into chunks of 50 video_ids video_id_chunks = chunks(video_ids, 50) From 1ab9d42d823fceb89d46921a513b72e87f49f399 Mon Sep 17 00:00:00 2001 From: pbegle Date: Mon, 1 Mar 2021 19:19:02 -0800 Subject: [PATCH 03/10] Fixed sync_playlist_items function to only sync new records. --- tap_youtube_analytics/sync.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tap_youtube_analytics/sync.py b/tap_youtube_analytics/sync.py index 3c081c0..885d35b 100644 --- a/tap_youtube_analytics/sync.py +++ b/tap_youtube_analytics/sync.py @@ -248,7 +248,7 @@ def sync_playlist_items(client, max_bookmark_value = strftime(bookmark_dttm) # Only sync records whose bookmark is after the last_datetime - if bookmark_dttm >= last_dttm: + if bookmark_dttm > last_dttm: write_record(stream_name, transformed_record, \ time_extracted=time_extracted) counter.increment() From f3c2d5bc1c8f8d1d1b746b9e9cad66b832be2144 Mon Sep 17 00:00:00 2001 From: pbegle Date: Wed, 3 Mar 2021 17:18:29 -0800 Subject: [PATCH 04/10] Fixed bookmark issue with sync_videos. 
--- tap_youtube_analytics/sync.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tap_youtube_analytics/sync.py b/tap_youtube_analytics/sync.py index 885d35b..2e32a33 100644 --- a/tap_youtube_analytics/sync.py +++ b/tap_youtube_analytics/sync.py @@ -316,6 +316,7 @@ def sync_videos(client, if bookmark_dttm > last_dttm: video_id = search_record.get('id', {}).get('videoId') video_ids.append(video_id) + max_bookmark_value = strftime(max(bookmark_dttm, strptime_to_utc(max_bookmark_value))) else: break From 76ff2b7b525520c3431f762cd48ee54d53ec5d33 Mon Sep 17 00:00:00 2001 From: pbegle Date: Wed, 3 Mar 2021 17:27:03 -0800 Subject: [PATCH 05/10] Changed all instances of LOGGER.error to LOGGER.debug. --- tap_youtube_analytics/sync.py | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/tap_youtube_analytics/sync.py b/tap_youtube_analytics/sync.py index 2e32a33..62f0032 100644 --- a/tap_youtube_analytics/sync.py +++ b/tap_youtube_analytics/sync.py @@ -22,7 +22,7 @@ def write_schema(catalog, stream_name): try: singer.write_schema(stream_name, schema, stream.key_properties) except OSError as err: - LOGGER.error('OS Error writing schema for: %s', stream_name) + LOGGER.debug('OS Error writing schema for: %s', stream_name) raise err @@ -30,12 +30,12 @@ def write_record(stream_name, record, time_extracted): try: singer.messages.write_record(stream_name, record, time_extracted=time_extracted) except OSError as err: - LOGGER.error('OS Error writing record for: %s', stream_name) - LOGGER.error('Stream: %s, record: %s', stream_name, record) + LOGGER.debug('OS Error writing record for: %s', stream_name) + LOGGER.debug('Stream: %s, record: %s', stream_name, record) raise err except TypeError as err: - LOGGER.error('Type Error writing record for: %s', stream_name) - LOGGER.error('Stream: %s, record: %s', stream_name, record) + LOGGER.debug('Type Error writing record for: %s', stream_name) + LOGGER.debug('Stream: %s, record: %s', stream_name, record) raise err @@ -110,8 +110,8 @@ def sync_channels(client, schema, stream_metadata) except Exception as err: - LOGGER.error('Transformer Error: %s', err) - LOGGER.error('Stream: %s, record: %s', stream_name, record) + LOGGER.debug('Transformer Error: %s', err) + LOGGER.debug('Stream: %s, record: %s', stream_name, record) raise err write_record(stream_name, transformed_record, time_extracted=time_extracted) @@ -161,8 +161,8 @@ def sync_playlists(client, schema, stream_metadata) except Exception as err: - LOGGER.error('Transformer Error: %s', err) - LOGGER.error('Stream: %s, record: %s', stream_name, record) + LOGGER.debug('Transformer Error: %s', err) + LOGGER.debug('Stream: %s, record: %s', stream_name, record) raise err write_record(stream_name, transformed_record, time_extracted=time_extracted) @@ -236,8 +236,8 @@ def sync_playlist_items(client, schema, stream_metadata) except Exception as err: - LOGGER.error('Transformer Error: %s', err) - LOGGER.error('Stream: %s, record: %s', stream_name, record) + LOGGER.debug('Transformer Error: %s', err) + LOGGER.debug('Stream: %s, record: %s', stream_name, record) raise err # Bookmarking @@ -349,8 +349,8 @@ def sync_videos(client, schema, stream_metadata) except Exception as err: - LOGGER.error('Transformer Error: %s', err) - LOGGER.error('Stream: %s, record: %s', stream_name, record) + LOGGER.debug('Transformer Error: %s', err) + LOGGER.debug('Stream: %s, record: %s', stream_name, record) raise err write_record(stream_name, transformed_record, time_extracted=time_extracted) @@ 
-471,8 +471,8 @@ def sync_report(client, schema, stream_metadata) except Exception as err: - LOGGER.error('Transformer Error: %s', err) - LOGGER.error('Stream: %s, record: %s', stream_name, record) + LOGGER.debug('Transformer Error: %s', err) + LOGGER.debug('Stream: %s, record: %s', stream_name, record) raise err # Bookmarking From d8b243e6f4b31f26b479da0d77a5004f75fedbef Mon Sep 17 00:00:00 2001 From: pbegle Date: Fri, 5 Mar 2021 16:38:39 -0800 Subject: [PATCH 06/10] Implemented feature to write channel_id into video stream state. --- tap_youtube_analytics/sync.py | 34 ++++++++++++++++++++++++---------- 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/tap_youtube_analytics/sync.py b/tap_youtube_analytics/sync.py index 62f0032..f4a8a98 100644 --- a/tap_youtube_analytics/sync.py +++ b/tap_youtube_analytics/sync.py @@ -39,9 +39,16 @@ def write_record(stream_name, record, time_extracted): raise err -def get_bookmark(state, stream, default): +def get_bookmark(state, stream, default, parent_id=None): if (state is None) or ('bookmarks' not in state): return default + if parent_id: + return ( + state + .get('bookmarks', {}) + .get(stream, {}) + .get(parent_id, default) + ) return ( state .get('bookmarks', {}) @@ -49,10 +56,15 @@ def get_bookmark(state, stream, default): ) -def write_bookmark(state, stream, value): +def write_bookmark(state, stream, value, parent_id=None): if 'bookmarks' not in state: state['bookmarks'] = {} - state['bookmarks'][stream] = value + if stream not in state['bookmarks']: + state['bookmarks'][stream] = {} + if parent_id: + state['bookmarks'][stream][parent_id] = value + else: + state['bookmarks'][stream] = value LOGGER.info('Write state for stream: %s, value: %s', stream, value) singer.write_state(state) @@ -276,11 +288,6 @@ def sync_videos(client, params = endpoint_config.get('params', {}) channel_list = channel_ids.split(',') - # Initialize bookmarking - last_datetime = get_bookmark(state, stream_name, start_date) - last_dttm = strptime_to_utc(last_datetime) - max_bookmark_value = last_datetime - search_params = { 'part': 'id,snippet', 'channelId': '{channel_id}', @@ -292,6 +299,12 @@ def sync_videos(client, with metrics.record_counter(stream_name) as counter: # Loop each channel_id from config for channel_id in channel_list: + write_schema(catalog, stream_name) + # Initialize bookmarking + last_datetime = get_bookmark(state, stream_name, start_date, parent_id=channel_id) + last_dttm = strptime_to_utc(last_datetime) + max_bookmark_value = last_datetime + video_ids = [] search_params['channelId'] = channel_id search_records = get_paginated_data( @@ -356,8 +369,9 @@ def sync_videos(client, write_record(stream_name, transformed_record, time_extracted=time_extracted) counter.increment() - # Write bookmark after all records synced due to sort descending (most recent first) - write_bookmark(state, stream_name, max_bookmark_value) + # Write bookmark after all records synced due to sort descending (most recent first) + # Include channel_id in bookmark + write_bookmark(state, stream_name, max_bookmark_value, parent_id=channel_id) LOGGER.info('Stream: {}, Processed {} records'.format(stream_name, counter.value)) return counter.value From 56f5c394fc794f222347518dc030e8d8e0810b2c Mon Sep 17 00:00:00 2001 From: pbegle Date: Fri, 5 Mar 2021 17:41:03 -0800 Subject: [PATCH 07/10] Implemented feature to write playlist_id into playlist_items stream state. 
--- tap_youtube_analytics/sync.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/tap_youtube_analytics/sync.py b/tap_youtube_analytics/sync.py index f4a8a98..0d2da24 100644 --- a/tap_youtube_analytics/sync.py +++ b/tap_youtube_analytics/sync.py @@ -202,11 +202,6 @@ def sync_playlist_items(client, playlist_params = STREAMS.get('playlists', {}).get('params', {}) channel_list = channel_ids.split(',') - # Initialize bookmarking - last_datetime = get_bookmark(state, stream_name, start_date) - last_dttm = strptime_to_utc(last_datetime) - max_bookmark_value = last_datetime - with metrics.record_counter(stream_name) as counter: # Loop each channel_id from config for channel_id in channel_list: @@ -224,6 +219,12 @@ def sync_playlist_items(client, for playlist in playlists: playlist_id = playlist.get('id') params['playlistId'] = playlist_id + + # Initialize bookmarking + last_datetime = get_bookmark(state, stream_name, start_date, parent_id=playlist_id) + last_dttm = strptime_to_utc(last_datetime) + max_bookmark_value = last_datetime + write_schema(catalog, stream_name) records = get_paginated_data( client=client, url=DATA_URL, @@ -265,8 +266,8 @@ def sync_playlist_items(client, time_extracted=time_extracted) counter.increment() - # Youtube API does not allow page/batch sorting for playlist_items - write_bookmark(state, stream_name, max_bookmark_value) + # Youtube API does not allow page/batch sorting for playlist_items + write_bookmark(state, stream_name, max_bookmark_value, parent_id=playlist_id) LOGGER.info('Stream: {}, Processed {} records'.format(stream_name, counter.value)) return counter.value From 545cdea8a7984fe820e86aaab62e2e20bf2c212a Mon Sep 17 00:00:00 2001 From: pbegle Date: Mon, 8 Mar 2021 15:20:15 -0800 Subject: [PATCH 08/10] Adding job_id to report bookmarks. 
--- tap_youtube_analytics/sync.py | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/tap_youtube_analytics/sync.py b/tap_youtube_analytics/sync.py index 0d2da24..e634f88 100644 --- a/tap_youtube_analytics/sync.py +++ b/tap_youtube_analytics/sync.py @@ -393,19 +393,6 @@ def sync_report(client, report_type = endpoint_config.get('report_type') dimensions = endpoint_config.get('dimensions', []) - # Initialize bookmarking - # There is a 2-3 day lag (sometimes up to 6-7 days lag) in YouTube results reconcilliation - now_dttm = utils.now() - attribution_start_dttm = now_dttm - timedelta(days=ATTRIBUTION_DAYS) - last_datetime = get_bookmark(state, stream_name, start_date) - last_dttm = strptime_to_utc(last_datetime) - - if attribution_start_dttm < last_dttm: - last_dttm = attribution_start_dttm - last_datetime = strftime(last_dttm) - - max_bookmark_value = last_datetime - with metrics.record_counter(stream_name) as counter: job_id = None job_params = { @@ -431,6 +418,19 @@ def sync_report(client, job_id = job.get('id') break + # Initialize bookmarking + # There is a 2-3 day lag (sometimes up to 6-7 days lag) in YouTube results reconcilliation + now_dttm = utils.now() + attribution_start_dttm = now_dttm - timedelta(days=ATTRIBUTION_DAYS) + last_datetime = get_bookmark(state, stream_name, start_date, parent_id=job_id) + last_dttm = strptime_to_utc(last_datetime) + + if attribution_start_dttm < last_dttm: + last_dttm = attribution_start_dttm + last_datetime = strftime(last_dttm) + + max_bookmark_value = last_datetime + # Create job for stream if not job_exists if not job_exists: body = { @@ -503,7 +503,7 @@ def sync_report(client, counter.increment() # Write bookmark after all records synced due to sort descending (most recent first) - write_bookmark(state, stream_name, max_bookmark_value) + write_bookmark(state, stream_name, max_bookmark_value, parent_id=job_id) LOGGER.info('Stream: {}, Processed {} records'.format(stream_name, counter.value)) return counter.value From f96f18bf479661cae9ca87a549a45c9b471f3df4 Mon Sep 17 00:00:00 2001 From: pbegle Date: Tue, 9 Mar 2021 12:04:59 -0800 Subject: [PATCH 09/10] Removing dimension mapping logic that altered the raw data. 
--- tap_youtube_analytics/transform.py | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/tap_youtube_analytics/transform.py b/tap_youtube_analytics/transform.py index 776bf55..370552e 100644 --- a/tap_youtube_analytics/transform.py +++ b/tap_youtube_analytics/transform.py @@ -38,10 +38,6 @@ def transform_report_record(record, dimensions, report): new_record = record.copy() - map_path = get_abs_path('dim_lookup_map.json') - with open(map_path) as file: - dim_lookup_map = json.load(file) - dimension_values = {} for key, val in list(record.items()): @@ -49,16 +45,6 @@ def transform_report_record(record, dimensions, report): if key in dimensions: dimension_values[key] = val - # Transform dim values from codes to names using dim_lookup_map - if key in dim_lookup_map: - # lookup new_val, with a default for existing val (if not found) - new_val = dim_lookup_map[key].get(val, val) - if val == new_val: - LOGGER.warning('dim_lookup_map value not found; key: {}, value: {}'.format(key, val)) - new_record[key] = new_val - else: - new_record[key] = val - # Add report fields to data new_record['report_id'] = report.get('id') new_record['report_type_id'] = report.get('reportTypeId') From c7bac8047e8576cc8aa32609e7c16c2cacb6a998 Mon Sep 17 00:00:00 2001 From: pbegle Date: Mon, 15 Mar 2021 19:30:07 -0700 Subject: [PATCH 10/10] Updating README. --- README.md | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/README.md b/README.md index 566aa0d..a26e154 100644 --- a/README.md +++ b/README.md @@ -130,6 +130,20 @@ The [**Youtube Analytics Authentication**](https://docs.google.com/document/d/1F ```bash TBD ``` + +## Notes about the state.json file + +The following streams nest their bookmarks in the state file under a parent id: +| stream | parent id | +| ------ | --------- | +| videos | channel_id | +| playlist_items | playlist_id | +| reports | job_id | + +For example, the file will look like this: +```json +{"bookmarks": {"playlist_items": {"id12345": "2020-12-01T15:10:32.000000Z", "id45678": "2005-01-01T00:00:00Z"}}} +``` --- Copyright © 2019 Stitch
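
Note on the state layout documented above: below is a minimal, self-contained sketch (not part of the tap itself) of how the parent_id-aware get_bookmark/write_bookmark helpers added in PATCH 06/10 and reused in PATCH 07/10 and 08/10 produce the nested bookmark shape shown in the README example. The singer.write_state call is left out so the snippet runs standalone, and the ids ('id12345', 'UCxxxx_channel_a', ...) are placeholders rather than real YouTube ids.

```python
# Sketch of the parent_id-aware bookmark helpers; mirrors the logic added to
# tap_youtube_analytics/sync.py, with singer.write_state omitted so it runs
# standalone. All ids below are placeholders.

def write_bookmark(state, stream, value, parent_id=None):
    # Nest the bookmark under the parent id when one is given,
    # otherwise keep the flat stream -> value layout.
    if 'bookmarks' not in state:
        state['bookmarks'] = {}
    if stream not in state['bookmarks']:
        state['bookmarks'][stream] = {}
    if parent_id:
        state['bookmarks'][stream][parent_id] = value
    else:
        state['bookmarks'][stream] = value
    return state


def get_bookmark(state, stream, default, parent_id=None):
    # Fall back to `default` (normally the configured start_date) when the
    # stream or the parent id has no bookmark yet.
    if (state is None) or ('bookmarks' not in state):
        return default
    if parent_id:
        return state.get('bookmarks', {}).get(stream, {}).get(parent_id, default)
    return state.get('bookmarks', {}).get(stream, default)


if __name__ == '__main__':
    state = {}
    # playlist_items bookmarks are keyed by playlist_id (PATCH 07/10).
    write_bookmark(state, 'playlist_items', '2020-12-01T15:10:32.000000Z', parent_id='id12345')
    write_bookmark(state, 'playlist_items', '2005-01-01T00:00:00Z', parent_id='id45678')
    # videos bookmarks are keyed by channel_id (PATCH 06/10).
    write_bookmark(state, 'videos', '2021-03-01T00:00:00Z', parent_id='UCxxxx_channel_a')
    print(state)
    # An unseen parent id falls back to the provided default start_date.
    print(get_bookmark(state, 'videos', '2019-01-01T00:00:00Z', parent_id='UCxxxx_channel_b'))
```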