From 44a8a025f74ada25a258371828e132842e17c61a Mon Sep 17 00:00:00 2001 From: jkmin3 Date: Wed, 27 Mar 2024 14:34:17 -0500 Subject: [PATCH] finished the api for running flows and fixed many bugs --- ai_ta_backend/flows.py | 50 ++++++++++++++++++++++++++++-------------- ai_ta_backend/main.py | 30 ++++++++++++++++++++++++- 2 files changed, 63 insertions(+), 17 deletions(-) diff --git a/ai_ta_backend/flows.py b/ai_ta_backend/flows.py index 78031d45..c3e96502 100644 --- a/ai_ta_backend/flows.py +++ b/ai_ta_backend/flows.py @@ -2,6 +2,7 @@ import time import os import supabase +from urllib.parse import quote class Flows(): @@ -26,7 +27,7 @@ def get_users(self, limit: int = 50, pagination: bool = True, api_key: str = "") all_users.append(data['data']) cursor = data.get('nextCursor') while cursor is not None: - url = self.url + '/api/v1/users?limit=%s&cursor=%s&includeRole=true' % (str(limit), cursor) + url = self.url + '/api/v1/users?limit=%s&cursor=%s&includeRole=true' % (str(limit), quote(cursor)) response = requests.get(url, headers=headers, timeout=8) data = response.json() all_users.append(data['data']) @@ -57,25 +58,25 @@ def get_executions(self, limit, id=None, pagination: bool = True, api_key: str = all_executions.append(executions['data']) cursor = executions.get('nextCursor') while cursor is not None: - url = f'self.url+/api/v1/executions?includeData=true&status=success&limit={str(limit)}&cursor={cursor}' + url = self.url + f'/api/v1/executions?includeData=true&status=success&limit={str(limit)}&cursor={quote(cursor)}' response = requests.get(url, headers=headers, timeout=8) executions = response.json() all_executions.append(executions['data']) cursor = executions.get('nextCursor') if id: for execution in all_executions: - if execution[0]['workflowId'] == id: + if execution[0]['id'] == id: return execution if id: for execution in executions['data']: - if execution['workflowId'] == id: + if execution['id'] == id: return execution else: return all_executions def get_hook(self, name: str, api_key: str = ""): - work_flow = self.get_workflows(1, api_key=api_key, workflow_name=name) + work_flow = self.get_workflows(limit=100, api_key=api_key, workflow_name=name) for node in work_flow.get('nodes'): # type: ignore if node['name'] == 'Webhook': return node['webhookId'] @@ -105,7 +106,7 @@ def get_workflows(self, all_workflows.append(workflows['data']) cursor = workflows.get('nextCursor') while cursor is not None: - url = self.url + f"/api/v1/workflows?limit={limit}&cursor={cursor}" + url = self.url + f"/api/v1/workflows?limit={limit}&cursor={quote(cursor)}" response = requests.get(url, headers=headers, timeout=8) workflows = response.json() all_workflows.append(workflows['data']) @@ -148,9 +149,10 @@ def get_data(self, id): def main_flow(self, name: str, api_key: str = ""): if not api_key: raise ValueError('api_key is required') - workflows = self.get_workflows(limit=1) + execution = self.get_executions(limit=1, api_key=api_key) hookId = self.get_hook(name, api_key) - hook = self.url + f"/webhook-test/{hookId}" + hook = self.url + f"/webhook/{hookId}" + print("Hook!!!: ", hook) response = self.supabase_client.table('n8n_api_keys').select("*").execute() @@ -160,15 +162,31 @@ def main_flow(self, name: str, api_key: str = ""): if len(ids) > 0: id = max(ids) + 1 + print("Execution found in supabase: ", id) else: - id = workflows[0]['id'] + 1 + if execution: + id = int(execution[0][0]['id']) + 1 + print("Execution found through n8n: ", id) + else: + raise Exception('No executions found') + id = str(id) self.supabase_client.table('n8n_api_keys').insert({"id": id}).execute() - self.execute_flow(hook) - executions = self.get_executions(20, id, True, api_key) - while id not in executions: - executions = self.get_executions(10, id, True, api_key) - time.sleep(1) + try: + self.execute_flow(hook, api_key) + print("Executed") + executions = self.get_executions(20, id, True, api_key) + print("Got executions", executions) + while executions is None: + executions = self.get_executions(1, id, True, api_key) + print("Executions: ", executions) + print("Can't find id in executions") + time.sleep(1) + except Exception as e: + self.supabase_client.table('n8n_api_keys').delete().eq('id', id).execute() + return {"error": str(e)} + print("Found id in executions ") self.supabase_client.table('n8n_api_keys').delete().eq('id', id).execute() - - return self.get_executions(1, id, False, api_key) + print("Deleted id") + print("Returning") + return executions diff --git a/ai_ta_backend/main.py b/ai_ta_backend/main.py index 50261a2e..7bcf0b20 100644 --- a/ai_ta_backend/main.py +++ b/ai_ta_backend/main.py @@ -731,7 +731,7 @@ def get_all_workflows() -> Response: @app.route('/switch_workflow', methods=['GET']) def switch_workflow() -> Response: """ - Get all workflows from user. + Activate or deactivate flow for user. """ api_key = request.args.get('api_key', default='', type=str) @@ -758,5 +758,33 @@ def switch_workflow() -> Response: abort(400, description=f"Bad request: {e}") +@app.route('/run_flow', methods=['GET']) +def run_flow() -> Response: + """ + Run flow for a user and return results. + """ + + api_key = request.args.get('api_key', default='', type=str) + name = request.args.get('name', default='', type=str) + + print(request.args) + + if api_key == '': + # proper web error "400 Bad request" + abort(400, description=f"Missing N8N API_KEY: 'api_key' must be provided. Search query: `{api_key}`") + + flows = Flows() + try: + response = flows.main_flow(name, api_key) + response = jsonify(response) + response.headers.add('Access-Control-Allow-Origin', '*') + return response + except Exception as e: + if e == "Unauthorized": + abort(401, description=f"Unauthorized: 'api_key' is invalid. Search query: `{api_key}`") + else: + abort(400, description=f"Bad request: {e}") + + if __name__ == '__main__': app.run(debug=True, port=int(os.getenv("PORT", default=8000))) # nosec -- reasonable bandit error suppression