Skip to content

Commit 2b1eb76

Browse files
committed
add segment sync solution
1 parent 4658ab0 commit 2b1eb76

File tree

4 files changed

+176
-0
lines changed

4 files changed

+176
-0
lines changed

scenarios/segment_sync/README.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# Segment sync (replication)
2+
3+
----
4+
## Overview
5+
6+
This project provides a solution for replicating batch segments to real-time segments and vice versa.
7+
8+
----
9+
## Implementation
10+
1. Copy and paste the code into a custom script in Treasure Workflows.
11+
12+
----
13+
## Considerations
14+
15+
Segments whose rules use "Time Within Past" or "Segment Include/Exclude" are not converted; they are skipped.
16+
17+
----
18+
## Questions
19+
20+
Please feel free to reach out to [email protected] with any questions you have about using this code.
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
mode: 0 # 0 - batch to rt, 1 - rt to batch
2+
aud_api_ep: https://api-cdp.treasuredata.com
3+
get_seg_list_path: /audiences/{audienceId}/folders/{folderId}/segments
4+
post_new_seg_path: /audiences/{audienceId}/segments
5+
audience: xxx
6+
batch_folder: yyy
7+
rt_folder: zzz
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
import os
2+
import requests
3+
4+
def main(**kwargs):
    """Replicate segments between the batch folder and the real-time folder.

    Lists the segments of both folders through the Audience API, then POSTs a
    copy of every source-side segment whose name is not already present on the
    target side. Segments whose rule contains a ``TimeWithinPast`` operator or
    a segment ``Reference`` are skipped. Progress messages and before/after
    segment counts are printed.

    Keyword Args:
        aud_api_ep: Base URL of the Audience API.
        get_seg_list_path: Path template containing the ``{audienceId}`` and
            ``{folderId}`` placeholders.
        post_new_seg_path: Path template containing ``{audienceId}``.
        audience: Audience id substituted into the path templates and sent
            as ``audienceId`` in the POST payload.
        batch_folder: Id of the folder listed as the batch side.
        rt_folder: Id of the folder listed as the real-time side.
        mode: ``0`` copies batch -> real-time (payload ``kind`` 1),
            ``1`` copies real-time -> batch (payload ``kind`` 0).

    The API key is read from the ``TD_API_KEY`` environment variable.

    Raises:
        SystemExit: With status 1 when the configuration is incomplete or
            invalid, or when either segment list cannot be fetched.
    """
    # Init
    td_api_key = os.getenv("TD_API_KEY")

    # Validate the configuration up front so a bad setup fails with a clear
    # message before any API call is made.
    required = (
        "aud_api_ep",
        "get_seg_list_path",
        "post_new_seg_path",
        "audience",
        "batch_folder",
        "rt_folder",
        "mode",
    )
    missing = [key for key in required if kwargs.get(key) is None]
    if not td_api_key:
        missing.append("TD_API_KEY (environment variable)")
    if missing:
        print("Configuration error. Missing: " + ", ".join(missing))
        raise SystemExit(1)

    # Accept 0/1 as well as "0"/"1" in case the value arrives as text.
    raw_mode = kwargs["mode"]
    mode = int(raw_mode) if str(raw_mode).strip() in ("0", "1") else None
    if mode is None:
        print(f"Configuration error. Invalid mode: {raw_mode!r} (expected 0 or 1)")
        raise SystemExit(1)

    aud_api_ep = kwargs["aud_api_ep"]
    audience = str(kwargs["audience"])
    batch_folder = str(kwargs["batch_folder"])
    rt_folder = str(kwargs["rt_folder"])
    seg_list_path = kwargs["get_seg_list_path"].replace("{audienceId}", audience)
    get_b_seg_list_url = aud_api_ep + seg_list_path.replace("{folderId}", batch_folder)
    get_rt_seg_list_url = aud_api_ep + seg_list_path.replace("{folderId}", rt_folder)
    post_new_seg_url = aud_api_ep + kwargs["post_new_seg_path"].replace("{audienceId}", audience)
    auth_header = {"Authorization": f"TD1 {td_api_key}"}

    # Get segments data via Audience API
    def get_segs():
        """Return (batch segments, real-time segments); exit if either fetch fails."""
        batch_segs = requests.request("GET", get_b_seg_list_url, headers=auth_header)
        rt_segs = requests.request("GET", get_rt_seg_list_url, headers=auth_header)

        # Check each response's own status code and report each failure with
        # the response it belongs to.
        failed = False
        for label, resp in (("batch", batch_segs), ("real-time", rt_segs)):
            if resp.status_code != 200:
                print(
                    f"Failed to fetch {label} segments. Status code: "
                    f"{resp.status_code} - {resp.reason} - {resp.text}"
                )
                failed = True
        if failed:
            # Non-zero status so the calling workflow sees the failure.
            raise SystemExit(1)

        return batch_segs.json(), rt_segs.json()

    # Build segment lookups
    def index_by_name(segs):
        """Map segment name -> segment; the keys double as the name set."""
        return {seg["name"]: seg for seg in segs}

    # Post new segment via Audience API
    def post_seg(seg_name, headers, payload):
        """POST one segment and print whether it was accepted."""
        new_seg = requests.request("POST", post_new_seg_url, headers=headers, json=payload)
        if new_seg.status_code == 200:
            print(f"Segment synced OK: '{seg_name}'")
        else:
            print(
                f"Failed to post segment: '{seg_name}' - Status code: "
                f"{new_seg.status_code} - {new_seg.reason} - {new_seg.text}"
            )

    # Segments that have time within past operator or segment reference
    def detect_unconvertible(sn, rule):
        """Return True if the rule uses a TimeWithinPast operator or a segment reference."""
        twp = False
        seg_ref = False
        for group in rule.get("conditions", []):
            if "conditions" not in group:
                continue
            for cond in group["conditions"]:
                # Searching the repr also catches the marker when it is
                # nested deeper inside the condition.
                cond_repr = str(cond)
                if "'type': 'TimeWithinPast'" in cond_repr:
                    print(f"TWP operator found in '{sn}'.")
                    twp = True
                if "'type': 'Reference'" in cond_repr:
                    print(f"Segment reference found in '{sn}'.")
                    seg_ref = True
        return twp or seg_ref

    # Compare segment lists and post the difference
    def replicate_diff(batch_by_name, rt_by_name):
        """POST every source segment missing from the target side."""
        if mode == 0:
            source, target, folder, kind = batch_by_name, rt_by_name, rt_folder, 1
        else:
            source, target, folder, kind = rt_by_name, batch_by_name, batch_folder, 0

        headers = {**auth_header, "Content-Type": "application/json"}
        for sn, seg in source.items():
            # Segments that already exist by segment name
            if sn in target:
                print(f"Segment '{sn}' already exists.")
                continue
            if detect_unconvertible(sn, seg["rule"]):
                continue
            # A segment may have no description; treat that as empty text
            # instead of failing on string concatenation.
            description = seg.get("description") or ""
            payload = {
                "audienceId": audience,
                "name": sn,
                "kind": kind,
                "description": f"{description} --- [Created from: {seg['id']} ]",
                "segmentFolderId": folder,
                "rule": seg["rule"],
            }
            post_seg(sn, headers, payload)

    batch_segs_list, rt_segs_list = get_segs()
    replicate_diff(index_by_name(batch_segs_list), index_by_name(rt_segs_list))

    # Stats
    print(f"Batch segment count (before): {len(batch_segs_list)}")
    print(f"Real-time segment count (before): {len(rt_segs_list)}")
    batch_segs_list, rt_segs_list = get_segs()
    print(f"Batch segment count (after): {len(batch_segs_list)}")
    print(f"Real-time segment count (after): {len(rt_segs_list)}")

    print("Fin")
136+
# Main
137+
# Script entry point. main() is called here without keyword arguments, so
# every value it reads through kwargs.get() is None on this path.
if __name__ == "__main__":
    main()
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
timezone: Asia/Tokyo
2+
3+
_export:
4+
!include : config/params.yaml
5+
6+
+replicate:
7+
py>: scripts.seg_rep.main
8+
_env:
9+
TD_API_KEY: ${secret:td.apikey}
10+
docker:
11+
image: "digdag/digdag-python:3.10"

0 commit comments

Comments
 (0)