diff --git a/gae_dashboard/sailthru_to_bigquery.py b/gae_dashboard/sailthru_to_bigquery.py
index 8dba8a446..91c0c2e57 100755
--- a/gae_dashboard/sailthru_to_bigquery.py
+++ b/gae_dashboard/sailthru_to_bigquery.py
@@ -369,6 +369,138 @@ def _send_campaign_report(status, start_date, end_date, temp_file, verbose,
     return recent_blast_ids
 
 
+def _send_list_data_to_bq(list_name, temp_file, verbose, dry_run, keep_temp):
+    """Send user list data from Sailthru to BigQuery.
+
+    The user data fields to export are specified in
+    sailthru_user_list_export_schema.json.
+
+    Arguments:
+        list_name: name of the list in Sailthru to export.
+        temp_file: a file to store the data, to be used by 'bq load'.
+        verbose: True if you want to show debug messages, else False.
+        dry_run: True if we should skip writing to bq.
+        keep_temp: True if we should keep the temp_file that we write.
+    """
+    # 'vars' will be filled with the names of the custom user-data
+    # fields to pull from sailthru, e.g. {'email': 1}.
+    fields = {
+        'vars': {}
+    }
+
+    # The schema file determines both which fields are pulled from
+    # sailthru and which are sent to BigQuery.
+    schema_file = 'sailthru_user_list_export_schema.json'
+
+    # Append a timestamp to the table name to keep historical data.
+    bq_table_name = "sailthru_list_data.donor_list_%s" % (
+        datetime.date.today().strftime('%Y%m%d'))
+
+    # Convert sailthru's header names to the column names bq expects.
+    normalized_headers = {
+        "Profile Id": "profile_id",
+        "Email Hash": "email_hash"
+    }
+
+    with open(os.path.join(os.path.dirname(__file__), schema_file)) as f:
+        schema = json.load(f)
+
+    for field in schema:
+        fields['vars'][field['name']] = 1
+
+    response_1 = _post(
+        'job', job='export_list_data', list=list_name,
+        hash_algo='sha256', fields=fields, verbose=verbose)
+
+    job_id = response_1.get_body().get('job_id')
+
+    if job_id is None:
+        print (
+            "WARNING: For the export_list_data job with list = %s, "
+            "the job_id returned from Sailthru's job=export_list_data "
+            "is None" % list_name)
+        return
+
+    if verbose:
+        print (
+            "For the export_list_data job with list = %s, calling "
+            "sailthru's job status API for job_id = %s" % (
+                list_name, job_id))
+    response_2 = _get('job', job_id=job_id)
+
+    while response_2.get_body().get('status') != "completed":
+        if verbose:
+            print (
+                "For the export_list_data job with list_name = %s, "
+                "polled sailthru's job status API for job_id = %s." %
+                (list_name, job_id))
+            print "Will poll again in 5 seconds."
+        time.sleep(5)
+        response_2 = _get('job', job_id=job_id)
+        if response_2.get_body().get('status') == "expired":
+            raise SailthruAPIException(response_2)
+
+    filename_url = response_2.get_body().get('export_url')
+
+    if verbose:
+        print (
+            "For the export_list_data job with list_name = %s, creating "
+            "a jsonl file from the sailthru data at %s" % (
+                list_name, filename_url))
+
+    with _CPU_LOCK:
+        try:
+            with open(temp_file, "wb") as f:
+                open_url = urllib.urlopen(filename_url)
+                with contextlib.closing(open_url) as csvdata:
+                    # Take the csv data from the Sailthru API and
+                    # convert it to JSON.
+                    reader = csv.reader(csvdata, delimiter=',',
+                                        quotechar='"')
+
+                    headers = reader.next()
+                    headers = [normalized_headers.get(hdr, hdr)
+                               for hdr in headers]
+
+                    for row_csv in reader:
+                        row_object = {}
+                        for idx, column_name in enumerate(headers):
+                            cell_content = row_csv[idx].strip()
+                            if cell_content == "":
+                                row_object[column_name] = None
+                            else:
+                                row_object[column_name] = cell_content
+
+                        # Write each row.  In JSON mode, bq expects one
+                        # JSON object per line.
+                        f.write("%s\n" % json.dumps(row_object))
+
+            if dry_run:
+                print (
+                    "DRY RUN: if this was for real, for the "
+                    "export_list_data job with list = %s, we would "
+                    "write data at path '%s' to bq table '%s'"
+                    % (list_name, temp_file, bq_table_name))
+            else:
+                if verbose:
+                    print (
+                        "For the export_list_data job with list = %s, "
+                        "writing jsonl file to bigquery" % list_name)
+
+                bq_util.call_bq(
+                    ['load',
+                     '--source_format=NEWLINE_DELIMITED_JSON',
+                     '--replace', bq_table_name,
+                     temp_file,
+                     os.path.join(os.path.dirname(__file__),
+                                  schema_file)],
+                    project='khanacademy.org:deductive-jet-827',
+                    return_output=False)
+        finally:
+            if not keep_temp:
+                os.unlink(temp_file)
+
+
 if __name__ == "__main__":
     # Create a temp directory to hold temporary files
     temp_dir = tempfile.mkdtemp("sailthru_data_dir")
@@ -405,6 +537,13 @@ def _send_campaign_report(status, start_date, end_date, temp_file, verbose,
         '--end_date', required=True,
         help="End date of blasts (format: 'January 1 2017')")
 
+    parser_list_data = subparsers.add_parser(
+        'list_data',
+        help='export user list data to BigQuery')
+    parser_list_data.add_argument(
+        '--list_name', required=True,
+        help='Name of the user list in Sailthru to export')
+
     parser_export = subparsers.add_parser('export',
                                           help='export all as one script')
 
@@ -435,6 +574,13 @@ def _send_campaign_report(status, start_date, end_date, temp_file, verbose,
                               verbose=args.verbose,
                               dry_run=args.dry_run,
                               keep_temp=args.keep_temp)
+    elif args.subparser_name == 'list_data':
+        temp_file = os.path.join(temp_dir, "user_list_data_export.json")
+        _send_list_data_to_bq(list_name=args.list_name,
+                              temp_file=temp_file,
+                              verbose=args.verbose,
+                              dry_run=args.dry_run,
+                              keep_temp=args.keep_temp)
     else:
         # Call the script directly to generate the all campaigns table and
         # tables for blasts fired in the past 7 days.
diff --git a/gae_dashboard/sailthru_user_list_export_schema.json b/gae_dashboard/sailthru_user_list_export_schema.json
new file mode 100644
index 000000000..ffe46c6ba
--- /dev/null
+++ b/gae_dashboard/sailthru_user_list_export_schema.json
@@ -0,0 +1,26 @@
+[{
+    "name": "profile_id",
+    "type": "STRING",
+    "mode": "NULLABLE"
+},
+{
+    "name": "email_hash",
+    "type": "STRING",
+    "mode": "NULLABLE"
+},
+{
+    "name": "extid",
+    "type": "STRING",
+    "mode": "NULLABLE"
+},
+{
+    "name": "kaid",
+    "type": "STRING",
+    "mode": "NULLABLE"
+},
+{
+    "name": "canonical_donor_id",
+    "type": "STRING",
+    "mode": "NULLABLE"
+}
+]
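
Usage sketch: the new subcommand follows the CLI pattern of the existing
subcommands. The 'list_data' name and '--list_name' flag come from the diff
above; '--verbose', '--dry_run', and '--keep_temp' are assumed to be existing
top-level flags (inferred from the args.* attributes the dispatch code reads),
and the list name here is hypothetical:

    ./sailthru_to_bigquery.py list_data --list_name donor_list --dry_run
    ./sailthru_to_bigquery.py list_data --list_name donor_list --verbose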