Adding a job that will send users with canonical_donor_id to BigQuery #4

Open · wants to merge 1 commit into base: master
156 changes: 156 additions & 0 deletions gae_dashboard/sailthru_to_bigquery.py
@@ -369,6 +369,147 @@ def _send_campaign_report(status, start_date, end_date, temp_file, verbose,
return recent_blast_ids


def _send_list_data_to_bq(list_name, temp_file, verbose, dry_run, keep_temp):
"""Send users list data from Sailthru to BigQuery.

User data fields are specified in sailthru_user_list_export_schema.json

Arguments:
list_name: name of list in sailthru to export
temp_file: A file to store the data, to be used by 'bq load'.
verbose: True if you want to show debug messages, else False.
dry_run: True if we should skip writing to bq.
keep_temp: True if we should keep the temp_file that we write.
"""

    # 'vars' will be filled with the names of the custom user-data fields
    # to request from sailthru, each mapped to 1, e.g. {'email': 1}.
fields = {
'vars': {}
}

    # This schema file determines both which fields are pulled from
    # sailthru AND which fields are sent to BigQuery.
schema_file = 'sailthru_user_list_export_schema.json'

    # Append today's date to the table name so we keep historical data.
    bq_table_name = "sailthru_list_data.donor_list_%s" % (
        datetime.date.today().strftime('%Y%m%d'))
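    # e.g. "sailthru_list_data.donor_list_20170704" (date is illustrative).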

    # Map sailthru's human-readable CSV header names (e.g. "Profile Id")
    # to the snake_case column names used in our BigQuery schema.
normalized_headers = {
"Profile Id": "profile_id",
"Email Hash": "email_hash"
}

    # Use the schema file to determine which fields we want from sailthru.
with open(os.path.join(os.path.dirname(__file__), schema_file)) as f:
schema = json.load(f)

for field in schema:
fields['vars'][field['name']] = 1
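    # With the current schema file, 'fields' ends up as:
    #   {'vars': {'profile_id': 1, 'email_hash': 1, 'extid': 1,
    #             'kaid': 1, 'canonical_donor_id': 1}}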

response_1 = _post(
'job', job='export_list_data', list=list_name,
hash_algo='sha256', fields=fields, verbose=verbose)
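    # sailthru's export_list_data job is asynchronous: the POST returns a
    # job_id right away, and we poll the job status API below until the
    # export is marked "completed".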

job_id = response_1.get_body().get('job_id')

if job_id is None:
print (
"WARNING: For the export_list_data job with list = %s, "
"the job_id returned from Sailthru's job=export_list_data is "
"None" % list_name)
return

if verbose:
print (
"For the export_list_data job with list = %s, calling "
"sailthru's job status API for job_id = %s" % (
list_name,
job_id))
response_2 = _get('job', job_id=job_id)

while response_2.get_body().get('status') != "completed":
if verbose:
print (
"For the export_list_data job with list_name = %s, polled "
"sailthru's job status API for job_id = %s " %
(list_name, job_id))
print "Will poll again in 5 seconds."
time.sleep(5)
response_2 = _get('job', job_id=job_id)
if response_2.get_body().get('status') == "expired":
raise SailthruAPIException(response_2)
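    # Note: polling has no upper bound; the loop exits only once sailthru
    # reports "completed", and raises above if the job comes back "expired".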

filename_url = response_2.get_body().get('export_url')

    if verbose:
        print (
            "For the export_list_data job with list_name = %s, "
            "creating a jsonl file from the sailthru data at %s"
            % (list_name, filename_url))

with _CPU_LOCK:
try:
with open(temp_file, "wb") as f:
open_url = urllib.urlopen(filename_url)
with contextlib.closing(open_url) as csvdata:
# Take the csv data from the Sailthru API and
# convert it to JSON.
reader = csv.reader(csvdata, delimiter=',', quotechar='"')

                    headers = reader.next()
                    headers = [normalized_headers.get(hdr, hdr)
                               for hdr in headers]

for row_csv in reader:
row_object = {}
for idx, column_name in enumerate(headers):
cell_content = row_csv[idx].strip()
if cell_content == "":
row_object[column_name] = None
else:
row_object[column_name] = cell_content

# Write each row.
# In JSON mode, bq expects a JSON object on each line.
f.write("%s\n" % (json.dumps(row_object)))

if dry_run:
print (
"DRY RUN: if this was for real, for the export_list_data "
"job with list = %s, we would write data at path "
"'%s' to bq table '%s'"
% (list_name, temp_file, bq_table_name))
else:
if verbose:
print (
"For the export_list_data job with list = %s, "
"writing jsonl file to bigquery" % list_name)

bq_util.call_bq([
'load',
'--source_format=NEWLINE_DELIMITED_JSON',
'--replace', bq_table_name,
temp_file,
os.path.join(
os.path.dirname(__file__),
schema_file)
],
project='khanacademy.org:deductive-jet-827',
return_output=False)
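            # Note: '--replace' overwrites the destination table, so
            # re-running the load for the same day is idempotent.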
finally:
if not keep_temp:
os.unlink(temp_file)


if __name__ == "__main__":
# Create a temp directory to hold temporary files
temp_dir = tempfile.mkdtemp("sailthru_data_dir")
@@ -405,6 +546,14 @@ def _send_campaign_report(status, start_date, end_date, temp_file, verbose,
'--end_date', required=True,
help="End date of blasts (format: 'January 1 2017')")

parser_list_data = subparsers.add_parser(
'list_data',
help='export user list data to BigQuery')
    parser_list_data.add_argument(
        '--list_name', required=True,
        help='Name of the user list in sailthru to export')
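    # Example invocation (the list name is illustrative):
    #   python sailthru_to_bigquery.py list_data --list_name donors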

parser_export = subparsers.add_parser('export',
help='export all as one script')

@@ -435,6 +584,13 @@ def _send_campaign_report(status, start_date, end_date, temp_file, verbose,
verbose=args.verbose,
dry_run=args.dry_run,
keep_temp=args.keep_temp)
elif args.subparser_name == 'list_data':
temp_file = os.path.join(temp_dir, "user_list_data_export.json")
_send_list_data_to_bq(list_name=args.list_name,
temp_file=temp_file,
verbose=args.verbose,
dry_run=args.dry_run,
keep_temp=args.keep_temp)
else:
# Call the script directly to generate the all campaigns table and
# tables for blasts fired in the past 7 days.
26 changes: 26 additions & 0 deletions gae_dashboard/sailthru_user_list_export_schema.json
@@ -0,0 +1,26 @@
[{
    "name": "profile_id",
    "type": "STRING",
    "mode": "NULLABLE"
},
{
"name": "email_hash",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "extid",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "kaid",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "canonical_donor_id",
"type": "STRING",
"mode": "NULLABLE"
}
]