Skip to content

Commit 60aef3a

Browse files
committed
add responsys export
1 parent e25c692 commit 60aef3a

File tree

8 files changed

+275
-0
lines changed

8 files changed

+275
-0
lines changed
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# Responsys Activation Actions
2+
3+
----
4+
## Overview
5+
6+
This project provides a solution to exporting profiles to Oracle Responsys.
7+
8+
----
9+
## Implementation
10+
1. Copy and paste this code into Treasure Workflows.
11+
12+
----
13+
## Considerations
14+
15+
N/A
16+
17+
----
18+
## Questions
19+
20+
Please feel free to reach out to [email protected] with any questions you have about using this code.
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
sla:
2+
duration: 00:30:00
3+
fail: true
4+
+long_alert:
5+
echo>: "#### Long Running Workflow !!!###"
6+
7+
_error:
8+
mail>:
9+
data: Workflow Error occurred. Please check or run the command below. ${error.message}
10+
subject: Workflow (${session_id}) Error
11+
to: [<email>]
12+
bcc: [<email>]
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
SELECT
2+
riid,
3+
'${td.last_results.campaign}' AS campaign_name,
4+
'${td.exec}' AS ma,
5+
'${session_local_time}' AS session_time_compact,
6+
'sent' as status,
7+
'${segment_name}' AS segment_name,
8+
'${segment_id}' AS audience_id
9+
FROM
10+
${log.database}.tmp_${activation_id}_${session_id}
11+
GROUP BY
12+
1,2
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
SELECT
2+
${log.campaign_col} as campaign,
3+
COUNT(DISTINCT riid) AS cnt
4+
FROM
5+
${activation_actions_table}
6+
GROUP BY
7+
1
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
SELECT
2+
riid,
3+
campaign
4+
FROM
5+
${activation_actions_db}.${activation_actions_table}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
SELECT
2+
'${td.exec}' AS ma,
3+
'${td.last_results.campaign}' AS campaign_name,
4+
${td.last_results.cnt} AS activated_profiles,
5+
${session_unixtime} AS sesseion_time,
6+
'${session_local_time}' AS session_time_compact,
7+
'${td.wf}' AS workflow_name,
8+
'${segment_name}' AS segment_name,
9+
'${segment_id}' AS audience_id,
10+
'${activation_id}' AS syndication_id,
11+
'${activation_actions_db}' AS activation_source_db,
12+
'${activation_actions_table}' AS activation_source_tbl,
13+
'${session_id}'AS session_id,
14+
'${project_id}' AS project_id,
15+
'Activation Actions' as activation_type
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
### About this workflow ###
2+
# (Note) This workflow only works with Activation Actions
3+
#
4+
# Plese refer to the document on Activation Actions below:
5+
# https://docs.treasuredata.com/articles/pd/activation-actions
6+
#
7+
timezone: Asia/Singapore
8+
9+
# Global variables
10+
_export:
11+
td:
12+
engine: presto
13+
ep: https://api.treasuredata.com/
14+
wf: responsys_activation_actions
15+
exec: Oracle Responsys
16+
api:
17+
pw: ${secret:api.pw}
18+
prod: DEV # PROD or DEV
19+
log:
20+
database: activation_log
21+
table: log_responsys
22+
activation_log: log_summary
23+
id: riid
24+
campaign_col: campaign
25+
backup:
26+
flag: false
27+
days: 3 # call another workflow
28+
29+
## SLA & Error Notification ##
30+
31+
!include : config/handling.dig
32+
33+
## Data Check ##
34+
+initial_data_check:
35+
+data_counts:
36+
td>: queries/aggregation.sql
37+
database: ${activation_actions_db}
38+
store_last_results: true
39+
+count_logging:
40+
td>: queries/logging.sql
41+
database: ${log.database}
42+
insert_into: ${log.database}.${log.activation_log}
43+
44+
## Preparation & Logging ##
45+
46+
+data_preparation:
47+
_export:
48+
tmp_table: tmp_${activation_id}_${session_id}
49+
+create_tmp_table_for_activation:
50+
td>: queries/audience.sql
51+
create_table: ${tmp_table}
52+
database: ${log.database}
53+
54+
## Data sync with Responsys
55+
# Oracle Responsys Documentation:
56+
# https://docs.oracle.com/en/cloud/saas/marketing/responsys-rest-api/index.html
57+
# Some limitations may apply. Please see the document for more information.
58+
59+
+send_data_to_responsys:
60+
py>: scripts.send_data.main
61+
_env:
62+
PW: ${api.pw}
63+
PROD: ${api.prod}
64+
CAMP: ${td.last_results.campaign}
65+
ACT_DB: cdp_audience_${audience_id}
66+
ACT_TBL: cdp_syndication_${activation_id}
67+
EP: ${td.ep}
68+
ENGINE: ${td.engine}
69+
API_KEY: ${secret:td.apikey}
70+
WRITE_KEY: ${secret:td.writekey}
71+
docker:
72+
image: "digdag/digdag-python:3.10"
73+
74+
## Data Clean up ##
75+
76+
+data_clean_up:
77+
+activated_audience_log:
78+
td>: queries/activation_log.sql
79+
insert_into: ${log.table}
80+
database: ${log.database}
81+
+activated_to_other_destination:
82+
if>: ${typeof result_connection_settings !== 'undefined'}
83+
_do:
84+
td>: queries/audience.sql
85+
database: ${log.database}
86+
result_connection: ${result_connection_name}
87+
result_settings: ${result_connection_settings}
88+
_retry: 5
89+
+cleanup:
90+
td_ddl>:
91+
drop_tables: ["tmp_${activation_id}_${session_id}"]
92+
database: ${log.database}
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
import os
2+
import requests as req
3+
import json
4+
import pytd
5+
import pandas as pd
6+
7+
def main():
8+
client = pytd.Client(
9+
apikey=os.getenv("API_KEY"),
10+
endpoint=os.getenv("EP"),
11+
database=os.getenv("ACT_DB"),
12+
default_engine=os.getenv("ENGINE")
13+
)
14+
15+
res = client.query('SELECT array_agg(DISTINCT riid) AS riids FROM ' + os.getenv("ACT_TBL"))
16+
df = pd.DataFrame(**res)
17+
riids = list(df.at[0, 'riids'])
18+
19+
apiUsername = '<apiusername>'
20+
apiPassword = os.getenv("PW")
21+
campaignName = os.getenv("CAMP")
22+
23+
# Get token
24+
tokenRes = req.post(
25+
'https://<clientid>-api.responsys.ocs.oraclecloud.com/rest/api/v1.3/auth/token',
26+
data={
27+
'user_name': apiUsername,
28+
'password': apiPassword,
29+
'auth_type': 'password'
30+
},
31+
)
32+
token = tokenRes.json()['authToken']
33+
endpoint = tokenRes.json()['endPoint']
34+
35+
# Send batch data
36+
send_batches(campaignName, riids, endpoint, token)
37+
38+
def send_batches(campaignName, riids, endpoint, token):
39+
prod = os.environ.get('PROD', 'DEV').upper()
40+
batch_size = 200
41+
for i in range(0, len(riids), batch_size):
42+
recipients_batch = riids[i:i+batch_size]
43+
# Test mode
44+
if prod == 'DEV':
45+
print("---------------")
46+
print(i)
47+
print(recipients_batch)
48+
send_td(recipients_batch, campaignName)
49+
# Production Mode
50+
elif prod == 'PROD':
51+
# Activation to Oracle Responsys
52+
send(campaignName, recipients_batch, endpoint, token)
53+
# Other Mode
54+
else:
55+
print("Mode should be DEV or PROD")
56+
57+
def send(campaignName, riids_batch, endpoint, token):
58+
recipientsList = [{
59+
'recipient': {
60+
'recipientId': riid,
61+
}
62+
} for riid in riids_batch]
63+
64+
print("-------------------")
65+
print(json.dumps({'recipientData': recipientsList}))
66+
67+
print(endpoint + '/rest/api/v1.3/campaigns/' + campaignName + '/email')
68+
69+
response = req.post(
70+
f'{endpoint}/rest/api/v1.3/campaigns/{campaignName}/email',
71+
data=json.dumps({'recipientData': recipientsList}),
72+
headers={
73+
'Authorization': token,
74+
'content-type': 'application/json',
75+
}
76+
)
77+
78+
status_code = response.status_code
79+
res_text = response.text
80+
send_td(recipients_batch, campaignName, status_code=status_code, res_text=res_text)
81+
82+
if response.status_code != 200:
83+
print(f"Error: {response.status_code}")
84+
print(response.json())
85+
else:
86+
print("Batch sent successfully!")
87+
print(response.json())
88+
89+
90+
def send_td(recipients_batch, campaignName, **kwargs):
91+
td_log_database = 'activation_log'
92+
td_log_table = 'log_api'
93+
td_ingestion_endpoint = f'https://records.in.treasuredata.com/{td_log_database}/{td_log_table}'
94+
95+
payload = {
96+
"ma": "Responsys",
97+
"campaign": campaignName,
98+
"status": kwargs.get('status_code', 200),
99+
"response": kwargs.get('res_text', 'sample test response message'),
100+
"recipients_batch": recipients_batch,
101+
"recipients_batch_size": len(recipients_batch),
102+
"activation_source": "Activation Actions"
103+
}
104+
headers = {
105+
"Content-Type": "application/vnd.treasuredata.v1.single+json",
106+
"Authorization": f"TD1 {os.environ['WRITE_KEY']}",
107+
}
108+
td_response = req.post(td_ingestion_endpoint, json=payload, headers=headers)
109+
print("Response from TD:", td_response.text)
110+
111+
if __name__ == "__main__":
112+
main()

0 commit comments

Comments
 (0)