Skip to content

Commit

Permalink
finished the api for running flows and fixed many bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
jkmin3 committed Mar 27, 2024
1 parent 5c63887 commit 44a8a02
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 17 deletions.
50 changes: 34 additions & 16 deletions ai_ta_backend/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import time
import os
import supabase
from urllib.parse import quote


class Flows():
Expand All @@ -26,7 +27,7 @@ def get_users(self, limit: int = 50, pagination: bool = True, api_key: str = "")
all_users.append(data['data'])
cursor = data.get('nextCursor')
while cursor is not None:
url = self.url + '/api/v1/users?limit=%s&cursor=%s&includeRole=true' % (str(limit), cursor)
url = self.url + '/api/v1/users?limit=%s&cursor=%s&includeRole=true' % (str(limit), quote(cursor))
response = requests.get(url, headers=headers, timeout=8)
data = response.json()
all_users.append(data['data'])
Expand Down Expand Up @@ -57,25 +58,25 @@ def get_executions(self, limit, id=None, pagination: bool = True, api_key: str =
all_executions.append(executions['data'])
cursor = executions.get('nextCursor')
while cursor is not None:
url = f'self.url+/api/v1/executions?includeData=true&status=success&limit={str(limit)}&cursor={cursor}'
url = self.url + f'/api/v1/executions?includeData=true&status=success&limit={str(limit)}&cursor={quote(cursor)}'
response = requests.get(url, headers=headers, timeout=8)
executions = response.json()
all_executions.append(executions['data'])
cursor = executions.get('nextCursor')
if id:
for execution in all_executions:
if execution[0]['workflowId'] == id:
if execution[0]['id'] == id:
return execution

if id:
for execution in executions['data']:
if execution['workflowId'] == id:
if execution['id'] == id:
return execution
else:
return all_executions

def get_hook(self, name: str, api_key: str = ""):
work_flow = self.get_workflows(1, api_key=api_key, workflow_name=name)
work_flow = self.get_workflows(limit=100, api_key=api_key, workflow_name=name)
for node in work_flow.get('nodes'): # type: ignore
if node['name'] == 'Webhook':
return node['webhookId']
Expand Down Expand Up @@ -105,7 +106,7 @@ def get_workflows(self,
all_workflows.append(workflows['data'])
cursor = workflows.get('nextCursor')
while cursor is not None:
url = self.url + f"/api/v1/workflows?limit={limit}&cursor={cursor}"
url = self.url + f"/api/v1/workflows?limit={limit}&cursor={quote(cursor)}"
response = requests.get(url, headers=headers, timeout=8)
workflows = response.json()
all_workflows.append(workflows['data'])
Expand Down Expand Up @@ -148,9 +149,10 @@ def get_data(self, id):
def main_flow(self, name: str, api_key: str = ""):
if not api_key:
raise ValueError('api_key is required')
workflows = self.get_workflows(limit=1)
execution = self.get_executions(limit=1, api_key=api_key)
hookId = self.get_hook(name, api_key)
hook = self.url + f"/webhook-test/{hookId}"
hook = self.url + f"/webhook/{hookId}"
print("Hook!!!: ", hook)

response = self.supabase_client.table('n8n_api_keys').select("*").execute()

Expand All @@ -160,15 +162,31 @@ def main_flow(self, name: str, api_key: str = ""):

if len(ids) > 0:
id = max(ids) + 1
print("Execution found in supabase: ", id)
else:
id = workflows[0]['id'] + 1
if execution:
id = int(execution[0][0]['id']) + 1
print("Execution found through n8n: ", id)
else:
raise Exception('No executions found')
id = str(id)

self.supabase_client.table('n8n_api_keys').insert({"id": id}).execute()
self.execute_flow(hook)
executions = self.get_executions(20, id, True, api_key)
while id not in executions:
executions = self.get_executions(10, id, True, api_key)
time.sleep(1)
try:
self.execute_flow(hook, api_key)
print("Executed")
executions = self.get_executions(20, id, True, api_key)
print("Got executions", executions)
while executions is None:
executions = self.get_executions(1, id, True, api_key)
print("Executions: ", executions)
print("Can't find id in executions")
time.sleep(1)
except Exception as e:
self.supabase_client.table('n8n_api_keys').delete().eq('id', id).execute()
return {"error": str(e)}
print("Found id in executions ")
self.supabase_client.table('n8n_api_keys').delete().eq('id', id).execute()

return self.get_executions(1, id, False, api_key)
print("Deleted id")
print("Returning")
return executions
30 changes: 29 additions & 1 deletion ai_ta_backend/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,7 @@ def get_all_workflows() -> Response:
@app.route('/switch_workflow', methods=['GET'])
def switch_workflow() -> Response:
"""
Get all workflows from user.
Activate or deactivate flow for user.
"""

api_key = request.args.get('api_key', default='', type=str)
Expand All @@ -758,5 +758,33 @@ def switch_workflow() -> Response:
abort(400, description=f"Bad request: {e}")


@app.route('/run_flow', methods=['GET'])
def run_flow() -> Response:
"""
Run flow for a user and return results.
"""

api_key = request.args.get('api_key', default='', type=str)
name = request.args.get('name', default='', type=str)

print(request.args)

if api_key == '':
# proper web error "400 Bad request"
abort(400, description=f"Missing N8N API_KEY: 'api_key' must be provided. Search query: `{api_key}`")

flows = Flows()
try:
response = flows.main_flow(name, api_key)
response = jsonify(response)
response.headers.add('Access-Control-Allow-Origin', '*')
return response
except Exception as e:
if e == "Unauthorized":
abort(401, description=f"Unauthorized: 'api_key' is invalid. Search query: `{api_key}`")
else:
abort(400, description=f"Bad request: {e}")


if __name__ == '__main__':
app.run(debug=True, port=int(os.getenv("PORT", default=8000))) # nosec -- reasonable bandit error suppression

0 comments on commit 44a8a02

Please sign in to comment.