-
Notifications
You must be signed in to change notification settings - Fork 0
/
create_dataprep_objects.py
151 lines (108 loc) · 4.99 KB
/
create_dataprep_objects.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import requests, json
from datetime import datetime
# Token used to authenticate against the Dataprep API. It can be generated
# in the Dataprep interface by the project's owner.
DATAPREP_AUTH_TOKEN = 'XXXXXXXXXXXX'

# ID of the Dataprep folder in which new flows are created. It can be read
# from the URL of a folder (for example 1234).
DATAPREP_FOLDERID = 0
def get_dataprep_urls(file_uri):
    """Create a flow, a dataset and a recipe, then return their URLs.

    Every object is named after the current local date and time. The helper
    calls run in this order: create the flow, create the dataset from
    ``file_uri``, add the dataset to the flow, then add a recipe to it.

    Args:
        file_uri: URI of the file handed to ``create_dataprep_dataset``.

    Returns:
        A dict with the keys ``recipe_url``, ``dataset_url`` and ``flow_url``.
    """
    # Day/month/two-digit year followed by hour and minute.
    stamp = datetime.now().strftime("%d/%m/%y %H:%M")

    flow_id = create_dataprep_flow(
        "Flow - " + stamp,
        "Flow description - " + stamp,
    )
    dataset_id = create_dataprep_dataset(file_uri, "Dataset - " + stamp)
    add_dataset_to_flow(flow_id, dataset_id)
    recipe_id = add_recipe_to_dataset(flow_id, dataset_id, "Recipe - " + stamp)

    return {
        'recipe_url': 'https://clouddataprep.com/data/{}/{}'.format(flow_id, recipe_id),
        'dataset_url': 'https://clouddataprep.com/datasets/{}'.format(dataset_id),
        'flow_url': 'https://clouddataprep.com/flows/{}'.format(flow_id),
    }
# --------------------------------------------------------------------------------------------
# -------------- CREATE FLOW ---------------
# --------------------------------------------------------------------------------------------
def create_dataprep_flow(flow_name, flow_description):
    """Create a Dataprep flow and return its ID.

    Sends a POST to the ``/v4/flows`` endpoint with the flow name, its
    description and the module-level ``DATAPREP_FOLDERID``, using
    ``DATAPREP_AUTH_TOKEN`` as a bearer token.

    Args:
        flow_name: Name given to the new flow.
        flow_description: Description given to the new flow.

    Returns:
        The value of the ``id`` field of the JSON response.

    Raises:
        requests.HTTPError: If the API answers with a 4xx/5xx status.
    """
    endpoint = "/v4/flows"
    parameters = {
        "name": flow_name,
        "description": flow_description,
        "folderId": DATAPREP_FOLDERID,
    }
    resp = requests.post(
        url="https://api.clouddataprep.com" + endpoint,
        headers={
            "Content-Type": "application/json",
            "Authorization": "Bearer " + DATAPREP_AUTH_TOKEN,
        },
        data=json.dumps(parameters),
    )
    # Surface the HTTP error itself instead of failing later on a missing
    # 'id' key or an unparsable error body.
    resp.raise_for_status()
    return resp.json()['id']
# --------------------------------------------------------------------------------------------
# ------------------------------ CREATE DATASET ----------------------------------------------
# --------------------------------------------------------------------------------------------
def create_dataprep_dataset(file_uri, dataset_name):
    """Create a Dataprep imported dataset and return its ID.

    Sends a POST to the ``/v4/importedDatasets`` endpoint with the file URI
    and the dataset name, using ``DATAPREP_AUTH_TOKEN`` as a bearer token.

    Args:
        file_uri: Value sent as the ``uri`` of the dataset.
        dataset_name: Name given to the new dataset.

    Returns:
        The value of the ``id`` field of the JSON response.

    Raises:
        requests.HTTPError: If the API answers with a 4xx/5xx status.
    """
    endpoint = "/v4/importedDatasets"
    parameters = {
        "uri": file_uri,
        "name": dataset_name,
    }
    resp = requests.post(
        url="https://api.clouddataprep.com" + endpoint,
        headers={
            "Content-Type": "application/json",
            "Authorization": "Bearer " + DATAPREP_AUTH_TOKEN,
        },
        data=json.dumps(parameters),
    )
    # Surface the HTTP error itself instead of failing later on a missing
    # 'id' key or an unparsable error body.
    resp.raise_for_status()
    return resp.json()['id']
# --------------------------------------------------------------------------------------------
# -------------- ADD DATASET TO FLOW ---------------
# --------------------------------------------------------------------------------------------
def add_dataset_to_flow(flow_id, dataset_id):
    """Add an existing imported dataset to an existing flow.

    Sends a POST to ``/v4/importedDatasets/<dataset_id>/addToFlow`` with the
    flow ID in the request body, using ``DATAPREP_AUTH_TOKEN`` as a bearer
    token.

    Args:
        flow_id: ID of the flow that receives the dataset.
        dataset_id: ID of the imported dataset to add.

    Returns:
        None.

    Raises:
        requests.HTTPError: If the API answers with a 4xx/5xx status.
    """
    endpoint = "/v4/importedDatasets/" + str(dataset_id) + "/addToFlow"
    parameters = {
        "flow": {"id": flow_id},
    }
    resp = requests.post(
        url="https://api.clouddataprep.com" + endpoint,
        headers={
            "Content-Type": "application/json",
            "Authorization": "Bearer " + DATAPREP_AUTH_TOKEN,
        },
        data=json.dumps(parameters),
    )
    # Nothing from the response body is used, so only the status is checked;
    # a failed request must not pass silently.
    resp.raise_for_status()
# --------------------------------------------------------------------------------------------
# -------------- ADD RECIPE TO DATASET ---------------
# --------------------------------------------------------------------------------------------
def add_recipe_to_dataset(flow_id, dataset_id, recipe_name):
    """Create a recipe for a dataset inside a flow and return its ID.

    Sends a POST to the ``/v4/wrangledDatasets`` endpoint referencing the
    imported dataset and the flow, using ``DATAPREP_AUTH_TOKEN`` as a bearer
    token.

    Args:
        flow_id: ID of the flow that holds the recipe.
        dataset_id: ID of the imported dataset the recipe is attached to.
        recipe_name: Name given to the new recipe.

    Returns:
        The value of the ``id`` field of the JSON response.

    Raises:
        requests.HTTPError: If the API answers with a 4xx/5xx status.
    """
    endpoint = "/v4/wrangledDatasets"
    parameters = {
        "importedDataset": {"id": dataset_id},
        "flow": {"id": flow_id},
        "name": recipe_name,
    }
    resp = requests.post(
        url="https://api.clouddataprep.com" + endpoint,
        headers={
            "Content-Type": "application/json",
            "Authorization": "Bearer " + DATAPREP_AUTH_TOKEN,
        },
        data=json.dumps(parameters),
    )
    # Surface the HTTP error itself instead of failing later on a missing
    # 'id' key or an unparsable error body.
    resp.raise_for_status()
    return resp.json()['id']