From 24b79102e6fc72daf43cad2f4ec20fd98d354f2c Mon Sep 17 00:00:00 2001 From: jkmin3 Date: Wed, 3 Apr 2024 14:58:58 -0500 Subject: [PATCH] fixed errors with executing flows and bettered the return value for executions --- ai_ta_backend/flows.py | 36 +++++++++++++++++++++--------------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/ai_ta_backend/flows.py b/ai_ta_backend/flows.py index 70380ccc..e6299ea6 100644 --- a/ai_ta_backend/flows.py +++ b/ai_ta_backend/flows.py @@ -40,11 +40,9 @@ def execute_flow(self, hook: str, data=None) -> None: print("Executing flow") if not data: data = {'field-0': ''} - url = hook - response = requests.post(url, files=data, timeout=60) - body = response.json() + response = requests.post(hook, files=data, timeout=60) if not response.ok: - raise Exception(f"Error: {response.status_code} \n Message: {body.get('message')}") + raise Exception(f"Error: {response.status_code}") pass def get_executions(self, limit, id=None, pagination: bool = True, api_key: str = ""): @@ -68,8 +66,9 @@ def get_executions(self, limit, id=None, pagination: bool = True, api_key: str = cursor = executions.get('nextCursor') if id: for execution in all_executions: + print("Execution: ", execution) if execution[0]['id'] == id: - return execution + return execution[0] if id: for execution in executions['data']: @@ -172,9 +171,8 @@ def switch_workflow(self, id, api_key: str = "", activate: 'str' = 'True'): def get_data(self, id): self.get_executions(20, id) - # TODO: NEED to have keyword args for workflows like Pest Detection. # TODO: make the supabase rpc call to make the transaction - # What if some data takes longer to parse, so the transactional supabase call is wrong. + # todo: Another problem with supabase calling for id is if someone manually runs a workflow. def main_flow(self, name: str, api_key: str = "", data: str = ""): if not api_key: raise ValueError('api_key is required') @@ -196,10 +194,11 @@ def main_flow(self, name: str, api_key: str = "", data: str = ""): id = max(ids) + 1 print("Execution found in supabase: ", id) else: - execution = self.get_executions(limit=1, api_key=api_key) + print("Getting executions") + execution = self.get_executions(limit=1, api_key=api_key, pagination=False) print("Got executions") if execution: - id = int(execution[0][0]['id']) + 1 + id = int(execution[0]['id']) + 1 print("Execution found through n8n: ", id) else: raise Exception('No executions found') @@ -208,26 +207,32 @@ def main_flow(self, name: str, api_key: str = "", data: str = ""): try: start_time = time.monotonic() print("Inserting") - self.supabase_client.table('n8n_workflows').insert({"latest_workflow_id": id, "is_locked": True}).execute() + insert_response = self.supabase_client.table('n8n_workflows').insert({ + "latest_workflow_id": id, + "is_locked": True + }).execute() + print("Insert response: ", insert_response) print("inserted") - self.execute_flow(hook, new_data) + execute_response = self.execute_flow(hook, new_data) + print("Execute response: ", execute_response) print("Executed") print(f"⏰ Runtime to execute_flow(): {(time.monotonic() - start_time):.4f} seconds") except Exception as e: # TODO: Decrease number by one, is locked false # self.supabase_client.table('n8n_workflows').update({"latest_workflow_id": str(int(id) - 1), "is_locked": False}).eq('latest_workflow_id', id).execute() - self.supabase_client.table('n8n_workflows').delete().eq('latest_workflow_id', id).execute() - return {"error": str(e)} + # self.supabase_client.table('n8n_workflows').delete().eq('latest_workflow_id', id).execute() + return {"error!!": str(e)} finally: # TODO: Remove lock from Supabase table. - print("id: ", id) + print("Removing lock from: ", id) self.supabase_client.table('n8n_workflows').update({"is_locked": False}).eq('latest_workflow_id', id).execute() try: + print("Checking executions") executions = self.get_executions(20, id, True, api_key) print("Got executions", executions) while executions is None: - executions = self.get_executions(1, id, True, api_key) + executions = self.get_executions(20, id, True, api_key) print("Executions: ", executions) print("Can't find id in executions") time.sleep(1) @@ -238,4 +243,5 @@ def main_flow(self, name: str, api_key: str = "", data: str = ""): except Exception as e: self.supabase_client.table('n8n_workflows').delete().eq('latest_workflow_id', id).execute() return {"error": str(e)} + print("Returning execution") return executions