Skip to content

Commit

Permalink
fixed errors with executing flows and bettered the return value for e…
Browse files Browse the repository at this point in the history
…xecutions
  • Loading branch information
jkmin3 committed Apr 3, 2024
1 parent e4b5617 commit 24b7910
Showing 1 changed file with 21 additions and 15 deletions.
36 changes: 21 additions & 15 deletions ai_ta_backend/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,9 @@ def execute_flow(self, hook: str, data=None) -> None:
print("Executing flow")
if not data:
data = {'field-0': ''}
url = hook
response = requests.post(url, files=data, timeout=60)
body = response.json()
response = requests.post(hook, files=data, timeout=60)
if not response.ok:
raise Exception(f"Error: {response.status_code} \n Message: {body.get('message')}")
raise Exception(f"Error: {response.status_code}")
pass

def get_executions(self, limit, id=None, pagination: bool = True, api_key: str = ""):
Expand All @@ -68,8 +66,9 @@ def get_executions(self, limit, id=None, pagination: bool = True, api_key: str =
cursor = executions.get('nextCursor')
if id:
for execution in all_executions:
print("Execution: ", execution)
if execution[0]['id'] == id:
return execution
return execution[0]

if id:
for execution in executions['data']:
Expand Down Expand Up @@ -172,9 +171,8 @@ def switch_workflow(self, id, api_key: str = "", activate: 'str' = 'True'):
def get_data(self, id):
self.get_executions(20, id)

# TODO: NEED to have keyword args for workflows like Pest Detection.
# TODO: make the supabase rpc call to make the transaction
# What if some data takes longer to parse, so the transactional supabase call is wrong.
# todo: Another problem with supabase calling for id is if someone manually runs a workflow.
def main_flow(self, name: str, api_key: str = "", data: str = ""):
if not api_key:
raise ValueError('api_key is required')
Expand All @@ -196,10 +194,11 @@ def main_flow(self, name: str, api_key: str = "", data: str = ""):
id = max(ids) + 1
print("Execution found in supabase: ", id)
else:
execution = self.get_executions(limit=1, api_key=api_key)
print("Getting executions")
execution = self.get_executions(limit=1, api_key=api_key, pagination=False)
print("Got executions")
if execution:
id = int(execution[0][0]['id']) + 1
id = int(execution[0]['id']) + 1
print("Execution found through n8n: ", id)
else:
raise Exception('No executions found')
Expand All @@ -208,26 +207,32 @@ def main_flow(self, name: str, api_key: str = "", data: str = ""):
try:
start_time = time.monotonic()
print("Inserting")
self.supabase_client.table('n8n_workflows').insert({"latest_workflow_id": id, "is_locked": True}).execute()
insert_response = self.supabase_client.table('n8n_workflows').insert({
"latest_workflow_id": id,
"is_locked": True
}).execute()
print("Insert response: ", insert_response)
print("inserted")
self.execute_flow(hook, new_data)
execute_response = self.execute_flow(hook, new_data)
print("Execute response: ", execute_response)
print("Executed")
print(f"⏰ Runtime to execute_flow(): {(time.monotonic() - start_time):.4f} seconds")
except Exception as e:
# TODO: Decrease number by one, is locked false
# self.supabase_client.table('n8n_workflows').update({"latest_workflow_id": str(int(id) - 1), "is_locked": False}).eq('latest_workflow_id', id).execute()
self.supabase_client.table('n8n_workflows').delete().eq('latest_workflow_id', id).execute()
return {"error": str(e)}
# self.supabase_client.table('n8n_workflows').delete().eq('latest_workflow_id', id).execute()
return {"error!!": str(e)}
finally:
# TODO: Remove lock from Supabase table.
print("id: ", id)
print("Removing lock from: ", id)
self.supabase_client.table('n8n_workflows').update({"is_locked": False}).eq('latest_workflow_id', id).execute()

try:
print("Checking executions")
executions = self.get_executions(20, id, True, api_key)
print("Got executions", executions)
while executions is None:
executions = self.get_executions(1, id, True, api_key)
executions = self.get_executions(20, id, True, api_key)
print("Executions: ", executions)
print("Can't find id in executions")
time.sleep(1)
Expand All @@ -238,4 +243,5 @@ def main_flow(self, name: str, api_key: str = "", data: str = ""):
except Exception as e:
self.supabase_client.table('n8n_workflows').delete().eq('latest_workflow_id', id).execute()
return {"error": str(e)}
print("Returning execution")
return executions

0 comments on commit 24b7910

Please sign in to comment.