Adding most recent changes made to repo #1

Open

wants to merge 10 commits into main
14 changes: 14 additions & 0 deletions README.md
@@ -130,6 +130,20 @@ The [**Youtube Analytics Authentication**](https://docs.google.com/document/d/1F
```bash
TBD
```

## Notes about the state.json file

The following streams key their bookmarks on a parent id in the state file:
| stream | parent id |
| ------ | --------- |
| videos | channel_id |
| playlist_items | playlist_id |
| reports | job_id |

For example, the file will look like this:
```json
{"bookmarks": {"playlist_items": {"id12345": "2020-12-01T15:10:32.000000Z", "id45678": "2005-01-01T00:00:00Z"}}}
```
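
With more than one parent-keyed stream, a hypothetical state (the channel, playlist, and job ids below are placeholders; stream names follow the table above) might look like:
```json
{
  "bookmarks": {
    "videos": {"UC_example_channel": "2021-03-15T08:00:00.000000Z"},
    "playlist_items": {"PL_example_playlist": "2020-12-01T15:10:32.000000Z"},
    "reports": {"example_job_id": "2021-03-01T00:00:00.000000Z"}
  }
}
```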
---

Copyright © 2019 Stitch
127 changes: 73 additions & 54 deletions tap_youtube_analytics/sync.py
@@ -22,37 +22,49 @@ def write_schema(catalog, stream_name):
try:
singer.write_schema(stream_name, schema, stream.key_properties)
except OSError as err:
LOGGER.error('OS Error writing schema for: %s', stream_name)
LOGGER.debug('OS Error writing schema for: %s', stream_name)
raise err


def write_record(stream_name, record, time_extracted):
try:
singer.messages.write_record(stream_name, record, time_extracted=time_extracted)
except OSError as err:
LOGGER.error('OS Error writing record for: %s', stream_name)
LOGGER.error('Stream: %s, record: %s', stream_name, record)
LOGGER.debug('OS Error writing record for: %s', stream_name)
LOGGER.debug('Stream: %s, record: %s', stream_name, record)
raise err
except TypeError as err:
LOGGER.error('Type Error writing record for: %s', stream_name)
LOGGER.error('Stream: %s, record: %s', stream_name, record)
LOGGER.debug('Type Error writing record for: %s', stream_name)
LOGGER.debug('Stream: %s, record: %s', stream_name, record)
raise err


def get_bookmark(state, stream, default):
def get_bookmark(state, stream, default, parent_id=None):
if (state is None) or ('bookmarks' not in state):
return default
if parent_id:
return (
state
.get('bookmarks', {})
.get(stream, {})
.get(parent_id, default)
)
return (
state
.get('bookmarks', {})
.get(stream, default)
)


def write_bookmark(state, stream, value):
def write_bookmark(state, stream, value, parent_id=None):
if 'bookmarks' not in state:
state['bookmarks'] = {}
state['bookmarks'][stream] = value
if stream not in state['bookmarks']:
state['bookmarks'][stream] = {}
if parent_id:
state['bookmarks'][stream][parent_id] = value
else:
state['bookmarks'][stream] = value
LOGGER.info('Write state for stream: %s, value: %s', stream, value)
singer.write_state(state)

@@ -110,8 +122,8 @@ def sync_channels(client,
schema,
stream_metadata)
except Exception as err:
LOGGER.error('Transformer Error: %s', err)
LOGGER.error('Stream: %s, record: %s', stream_name, record)
LOGGER.debug('Transformer Error: %s', err)
LOGGER.debug('Stream: %s, record: %s', stream_name, record)
raise err

write_record(stream_name, transformed_record, time_extracted=time_extracted)
@@ -161,8 +173,8 @@ def sync_playlists(client,
schema,
stream_metadata)
except Exception as err:
LOGGER.error('Transformer Error: %s', err)
LOGGER.error('Stream: %s, record: %s', stream_name, record)
LOGGER.debug('Transformer Error: %s', err)
LOGGER.debug('Stream: %s, record: %s', stream_name, record)
raise err

write_record(stream_name, transformed_record, time_extracted=time_extracted)
@@ -190,11 +202,6 @@ def sync_playlist_items(client,
playlist_params = STREAMS.get('playlists', {}).get('params', {})
channel_list = channel_ids.split(',')

# Initialize bookmarking
last_datetime = get_bookmark(state, stream_name, start_date)
last_dttm = strptime_to_utc(last_datetime)
max_bookmark_value = last_datetime

with metrics.record_counter(stream_name) as counter:
# Loop each channel_id from config
for channel_id in channel_list:
@@ -212,6 +219,12 @@
for playlist in playlists:
playlist_id = playlist.get('id')
params['playlistId'] = playlist_id

# Initialize bookmarking
last_datetime = get_bookmark(state, stream_name, start_date, parent_id=playlist_id)
last_dttm = strptime_to_utc(last_datetime)
max_bookmark_value = last_datetime
write_schema(catalog, stream_name)
records = get_paginated_data(
client=client,
url=DATA_URL,
@@ -223,6 +236,8 @@
time_extracted = utils.now()

for record in records:
if record is None:
continue
for key in id_fields:
if not record.get(key):
raise ValueError('Stream: {}, Missing key: {}'.format(stream_name, key))
@@ -234,8 +249,8 @@
schema,
stream_metadata)
except Exception as err:
LOGGER.error('Transformer Error: %s', err)
LOGGER.error('Stream: %s, record: %s', stream_name, record)
LOGGER.debug('Transformer Error: %s', err)
LOGGER.debug('Stream: %s, record: %s', stream_name, record)
raise err

# Bookmarking
@@ -246,13 +261,13 @@
max_bookmark_value = strftime(bookmark_dttm)

# Only sync records whose bookmark is after the last_datetime
if bookmark_dttm >= last_dttm:
if bookmark_dttm > last_dttm:
write_record(stream_name, transformed_record, \
time_extracted=time_extracted)
counter.increment()

# Youtube API does not allow page/batch sorting for playlist_items
write_bookmark(state, stream_name, max_bookmark_value)
# Youtube API does not allow page/batch sorting for playlist_items
write_bookmark(state, stream_name, max_bookmark_value, parent_id=playlist_id)

LOGGER.info('Stream: {}, Processed {} records'.format(stream_name, counter.value))
return counter.value
@@ -274,11 +289,6 @@ def sync_videos(client,
params = endpoint_config.get('params', {})
channel_list = channel_ids.split(',')

# Initialize bookmarking
last_datetime = get_bookmark(state, stream_name, start_date)
last_dttm = strptime_to_utc(last_datetime)
max_bookmark_value = last_datetime

search_params = {
'part': 'id,snippet',
'channelId': '{channel_id}',
@@ -290,6 +300,12 @@
with metrics.record_counter(stream_name) as counter:
# Loop each channel_id from config
for channel_id in channel_list:
write_schema(catalog, stream_name)
# Initialize bookmarking
last_datetime = get_bookmark(state, stream_name, start_date, parent_id=channel_id)
last_dttm = strptime_to_utc(last_datetime)
max_bookmark_value = last_datetime

video_ids = []
search_params['channelId'] = channel_id
search_records = get_paginated_data(
@@ -303,18 +319,20 @@

i = 0
for search_record in search_records:
video_id = search_record.get('id', {}).get('videoId')
video_ids.append(video_id)
if search_record is None:
continue

# Bookmarking
bookmark_date = search_record.get('snippet', {}).get('publishedAt')
bookmark_dttm = strptime_to_utc(bookmark_date)
if i == 0:
max_bookmark_value = bookmark_date
# Stop looping when bookmark is before last datetime
if bookmark_dttm < last_dttm:

# Only process records newer than previous run
if bookmark_dttm > last_dttm:
video_id = search_record.get('id', {}).get('videoId')
video_ids.append(video_id)
max_bookmark_value = strftime(max(bookmark_dttm, strptime_to_utc(max_bookmark_value)))
else:
break
i = i + 1

# Break into chunks of 50 video_ids
video_id_chunks = chunks(video_ids, 50)
@@ -345,15 +363,16 @@
schema,
stream_metadata)
except Exception as err:
LOGGER.error('Transformer Error: %s', err)
LOGGER.error('Stream: %s, record: %s', stream_name, record)
LOGGER.debug('Transformer Error: %s', err)
LOGGER.debug('Stream: %s, record: %s', stream_name, record)
raise err

write_record(stream_name, transformed_record, time_extracted=time_extracted)
counter.increment()

# Write bookmark after all records synced due to sort descending (most recent first)
write_bookmark(state, stream_name, max_bookmark_value)
# Write bookmark after all records synced due to sort descending (most recent first)
# Include channel_id in bookmark
write_bookmark(state, stream_name, max_bookmark_value, parent_id=channel_id)

LOGGER.info('Stream: {}, Processed {} records'.format(stream_name, counter.value))
return counter.value
@@ -374,19 +393,6 @@ def sync_report(client,
report_type = endpoint_config.get('report_type')
dimensions = endpoint_config.get('dimensions', [])

# Initialize bookmarking
# There is a 2-3 day lag (sometimes up to 6-7 days) in YouTube results reconciliation
now_dttm = utils.now()
attribution_start_dttm = now_dttm - timedelta(days=ATTRIBUTION_DAYS)
last_datetime = get_bookmark(state, stream_name, start_date)
last_dttm = strptime_to_utc(last_datetime)

if attribution_start_dttm < last_dttm:
last_dttm = attribution_start_dttm
last_datetime = strftime(last_dttm)

max_bookmark_value = last_datetime

with metrics.record_counter(stream_name) as counter:
job_id = None
job_params = {
@@ -412,6 +418,19 @@
job_id = job.get('id')
break

# Initialize bookmarking
# There is a 2-3 day lag (sometimes up to 6-7 days) in YouTube results reconciliation
now_dttm = utils.now()
attribution_start_dttm = now_dttm - timedelta(days=ATTRIBUTION_DAYS)
last_datetime = get_bookmark(state, stream_name, start_date, parent_id=job_id)
last_dttm = strptime_to_utc(last_datetime)

if attribution_start_dttm < last_dttm:
last_dttm = attribution_start_dttm
last_datetime = strftime(last_dttm)

max_bookmark_value = last_datetime

# Create job for stream if not job_exists
if not job_exists:
body = {
@@ -467,8 +486,8 @@
schema,
stream_metadata)
except Exception as err:
LOGGER.error('Transformer Error: %s', err)
LOGGER.error('Stream: %s, record: %s', stream_name, record)
LOGGER.debug('Transformer Error: %s', err)
LOGGER.debug('Stream: %s, record: %s', stream_name, record)
raise err

# Bookmarking
@@ -484,7 +503,7 @@
counter.increment()

# Write bookmark after all records synced due to sort descending (most recent first)
write_bookmark(state, stream_name, max_bookmark_value)
write_bookmark(state, stream_name, max_bookmark_value, parent_id=job_id)

LOGGER.info('Stream: {}, Processed {} records'.format(stream_name, counter.value))
return counter.value
14 changes: 0 additions & 14 deletions tap_youtube_analytics/transform.py
@@ -38,27 +38,13 @@ def transform_report_record(record, dimensions, report):

new_record = record.copy()

map_path = get_abs_path('dim_lookup_map.json')
with open(map_path) as file:
dim_lookup_map = json.load(file)

dimension_values = {}

for key, val in list(record.items()):
# Add dimension key-val to dimension_values dictionary
if key in dimensions:
dimension_values[key] = val

# Transform dim values from codes to names using dim_lookup_map
if key in dim_lookup_map:
# lookup new_val, with a default for existing val (if not found)
new_val = dim_lookup_map[key].get(val, val)
if val == new_val:
LOGGER.warning('dim_lookup_map value not found; key: {}, value: {}'.format(key, val))
new_record[key] = new_val
else:
new_record[key] = val

# Add report fields to data
new_record['report_id'] = report.get('id')
new_record['report_type_id'] = report.get('reportTypeId')