diff --git a/ai_ta_backend/flows.py b/ai_ta_backend/flows.py index c43f1f92..ee0e2d09 100644 --- a/ai_ta_backend/flows.py +++ b/ai_ta_backend/flows.py @@ -147,7 +147,7 @@ def format_data(self, inputted, api_key: str, workflow_name): new_data = {} for k, v in inputted.items(): new_data[data[k]] = v - + print("Done with formatting") return new_data # TODO: activate and disactivate workflows @@ -182,25 +182,25 @@ def main_flow(self, name: str, api_key: str = "", data: str = ""): if not api_key: raise ValueError('api_key is required') print("Starting") - execution = self.get_executions(limit=1, api_key=api_key) - print("Got executions") hookId = self.get_hook(name, api_key) hook = self.url + f"/form/{hookId}" print("Hook!!!: ", hook) new_data = self.format_data(data, api_key, name) - response = self.supabase_client.table('n8n_api_keys').select("*").execute() + response = self.supabase_client.table('n8n_workflows').select("latest_workflow_id").execute() print("Got response") ids = [] for row in dict(response)['data']: - ids.append(row['id']) + ids.append(row['latest_workflow_id']) if len(ids) > 0: id = max(ids) + 1 print("Execution found in supabase: ", id) else: + execution = self.get_executions(limit=1, api_key=api_key) + print("Got executions") if execution: id = int(execution[0][0]['id']) + 1 print("Execution found through n8n: ", id) @@ -208,18 +208,17 @@ def main_flow(self, name: str, api_key: str = "", data: str = ""): raise Exception('No executions found') id = str(id) - # start job try: start_time = time.monotonic() - self.supabase_client.table('n8n_api_keys').insert({"in_progress_workflow_id": id}).execute() + self.supabase_client.table('n8n_workflows').insert({"latest_workflow_id": id, "is_locked": True}).execute() self.execute_flow(hook, new_data) print("Executed") print(f"⏰ Runtime to execute_flow(): {(time.monotonic() - start_time):.4f} seconds") except: - pass + self.supabase_client.table('n8n_workflows').delete().eq('latest_workflow_id', id).execute() finally: # TODO: Remove lock from Supabase table. - pass + self.supabase_client.table('n8n_workflows').update({"is_locked": False}).eq('latest_workflow_id', id).execute() try: executions = self.get_executions(20, id, True, api_key) @@ -230,11 +229,10 @@ def main_flow(self, name: str, api_key: str = "", data: str = ""): print("Can't find id in executions") time.sleep(1) print("Found id in executions ") - self.supabase_client.table('n8n_api_keys').delete().eq('in_progress_workflow_id', id).execute() + self.supabase_client.table('n8n_workflows').delete().eq('latest_workflow_id', id).execute() print("Deleted id") print("Returning") except Exception as e: - self.supabase_client.table('n8n_api_keys').delete().eq('in_progress_workflow_id', id).execute() + self.supabase_client.table('n8n_workflows').delete().eq('latest_workflow_id', id).execute() return {"error": str(e)} - return executions diff --git a/ai_ta_backend/nomic_logging.py b/ai_ta_backend/nomic_logging.py index 43946bee..3636d388 100644 --- a/ai_ta_backend/nomic_logging.py +++ b/ai_ta_backend/nomic_logging.py @@ -14,9 +14,12 @@ OPENAI_API_TYPE = "azure" -LOCK_EXCEPTIONS = ['Project is locked for state access! Please wait until the project is unlocked to access embeddings.', - 'Project is locked for state access! Please wait until the project is unlocked to access data.', - 'Project is currently indexing and cannot ingest new datums. Try again later.'] +LOCK_EXCEPTIONS = [ + 'Project is locked for state access! Please wait until the project is unlocked to access embeddings.', + 'Project is locked for state access! Please wait until the project is unlocked to access data.', + 'Project is currently indexing and cannot ingest new datums. Try again later.' +] + def giveup_hdlr(e): """ @@ -36,12 +39,16 @@ def giveup_hdlr(e): sentry_sdk.capture_exception(e) return True + def backoff_hdlr(details): """ Function to handle backup conditions in backoff decorator. Currently just prints the details of the backoff. """ - print("\nBacking off {wait:0.1f} seconds after {tries} tries, calling function {target} with args {args} and kwargs {kwargs}".format(**details)) + print( + "\nBacking off {wait:0.1f} seconds after {tries} tries, calling function {target} with args {args} and kwargs {kwargs}" + .format(**details)) + def backoff_strategy(): """ @@ -50,7 +57,13 @@ def backoff_strategy(): """ return backoff.expo(base=10, factor=1.5) -@backoff.on_exception(backoff_strategy, Exception, max_tries=5, raise_on_giveup=False, giveup=giveup_hdlr, on_backoff=backoff_hdlr) + +@backoff.on_exception(backoff_strategy, + Exception, + max_tries=5, + raise_on_giveup=False, + giveup=giveup_hdlr, + on_backoff=backoff_hdlr) def log_convo_to_nomic(course_name: str, conversation) -> str: nomic.login(os.getenv('NOMIC_API_KEY')) # login during start of flask app NOMIC_MAP_NAME_PREFIX = 'Conversation Map for ' @@ -193,8 +206,8 @@ def log_convo_to_nomic(course_name: str, conversation) -> str: else: # raising exception again to trigger backoff and passing parameters to use in create_nomic_map() raise Exception({"exception": str(e)}) - - + + def get_nomic_map(course_name: str): """ Returns the variables necessary to construct an iframe of the Nomic map given a course name. @@ -377,7 +390,7 @@ def create_nomic_map(course_name: str, log_data: list): else: print("ERROR in create_nomic_map():", e) sentry_sdk.capture_exception(e) - + return "failed"