File tree Expand file tree Collapse file tree 1 file changed +34
-0
lines changed
Expand file tree Collapse file tree 1 file changed +34
-0
lines changed Original file line number Diff line number Diff line change 1+ import re
2+
3+ @click .command ()
4+ @click .option ('--target_db' , default = 'curated' )
5+ @click .option ('--target_table' , default = 'client_communication_preferences_journal' )
6+ @click .option ('--as_of' , required = True )
7+ def main (target_db , target_table , as_of ):
8+ # Validate the as_of parameter to ensure it matches the expected format (YYYYMMDD)
9+ if not re .match (r'^\d{8}$' , as_of ):
10+ raise ValueError ("Invalid as_of format. Expected YYYYMMDD." )
11+
12+ qry = f"""
13+ WITH blueshift_active_email_client_agg AS (
14+ SELECT client_id,
15+ MAX(last_opened_at) AS last_opened_at,
16+ MIN(first_opened_at) AS first_opened_at
17+ FROM blueshift.campaign_activity_kpis
18+ WHERE DATE(last_opened_at) <= TO_DATE('{ as_of } ', 'yyyyMMdd')
19+ OR last_opened_at IS NULL
20+ OR DATE(first_opened_at) <= TO_DATE('{ as_of } ', 'yyyyMMdd')
21+ GROUP BY 1
22+ )
23+ ...
24+ """
25+
26+ df = sc .sql (qry ).withColumn ('start_date' , f .col ('start_date' ).cast ('timestamp' ))
27+
28+ sc .save (
29+ df = df ,
30+ database = target_db ,
31+ table = target_table ,
32+ journal_write = True ,
33+ journal_write_as_of = as_of ,
34+ )
You can’t perform that action at this time.
0 commit comments