diff --git a/ai_ta_backend/beam/nomic_logging.py b/ai_ta_backend/beam/nomic_logging.py index fab28c97..0d38cf4b 100644 --- a/ai_ta_backend/beam/nomic_logging.py +++ b/ai_ta_backend/beam/nomic_logging.py @@ -433,9 +433,9 @@ def create_document_map(course_name: str): try: # check if map exists - # response = supabase_client.table("projects").select("doc_map_id").eq("course_name", course_name).execute() - # if response.data: - # return "Map already exists for this course." + response = supabase_client.table("projects").select("doc_map_id").eq("course_name", course_name).execute() + if response.data: + return "Map already exists for this course." # fetch relevant document data from Supabase response = supabase_client.table("documents").select("id", @@ -470,8 +470,7 @@ def create_document_map(course_name: str): curr_total_doc_count += len(response.data) doc_count += len(response.data) - - if doc_count >= 100: # upload to Nomic in batches of 1000 + if doc_count >= 1000: # upload to Nomic in batches of 1000 # concat all dfs from the combined_dfs list final_df = pd.concat(combined_dfs, ignore_index=True) @@ -489,6 +488,13 @@ def create_document_map(course_name: str): result = create_map(embeddings, metadata, project_name, index_name, topic_label_field, colorable_fields) # update flag first_batch = False + # log project info to supabas + project = AtlasProject(name=project_name, add_datums_if_exists=True) + project_id = project.id + last_id = final_df['id'].iloc[-1].split("_")[0] + project_info = {'course_name': course_name, 'doc_map_id': project_id, 'last_uploaded_doc_id': last_id} + response = supabase_client.table("projects").insert(project_info).execute() + print("Response from supabase: ", response) else: # append to existing map @@ -496,7 +502,13 @@ def create_document_map(course_name: str): project_name = NOMIC_MAP_NAME_PREFIX + course_name # add project lock logic here result = append_to_map(embeddings, metadata, project_name) - + if result == "success": + # update the last uploaded id in supabase + last_id = final_df['id'].iloc[-1].split("_")[0] + project_info = {'last_uploaded_doc_id': last_id} + response = supabase_client.table("projects").update(project_info).eq("course_name", course_name).execute() + print("Response from supabase: ", response) + # reset variables combined_dfs = [] doc_count = 0 @@ -505,35 +517,33 @@ def create_document_map(course_name: str): first_id = response.data[-1]['id'] + 1 # upload last set of docs - final_df = pd.concat(combined_dfs, ignore_index=True) - embeddings, metadata = data_prep_for_doc_map(final_df) - project_name = NOMIC_MAP_NAME_PREFIX + course_name - if first_batch: - index_name = course_name + "_doc_index" - topic_label_field = "text" - colorable_fields = ["readable_filename", "text", "base_url", "created_at"] - result = create_map(embeddings, metadata, project_name, index_name, topic_label_field, colorable_fields) - else: - result = append_to_map(embeddings, metadata, project_name) - print("Atlas upload status: ", result) - - # log info to supabase - project = AtlasProject(name=project_name, add_datums_if_exists=True) - project_id = project.id - project.rebuild_maps() - project_info = {'course_name': course_name, 'doc_map_id': project_id} - response = supabase_client.table("projects").insert(project_info).execute() - print("Response from supabase: ", response) - return "success" + if doc_count > 0: + final_df = pd.concat(combined_dfs, ignore_index=True) + embeddings, metadata = data_prep_for_doc_map(final_df) + project_name = NOMIC_MAP_NAME_PREFIX + course_name + if first_batch: + index_name = course_name + "_doc_index" + topic_label_field = "text" + colorable_fields = ["readable_filename", "text", "base_url", "created_at"] + result = create_map(embeddings, metadata, project_name, index_name, topic_label_field, colorable_fields) + else: + result = append_to_map(embeddings, metadata, project_name) + if result == "success": + # update the last uploaded id in supabase + last_id = final_df['id'].iloc[-1].split("_")[0] + project_info = {'last_uploaded_doc_id': last_id} + response = supabase_client.table("projects").update(project_info).eq("course_name", course_name).execute() + print("Response from supabase: ", response) + print("Atlas upload status: ", result) + + # rebuild the map + rebuild_map(course_name, "document") except Exception as e: print(e) sentry_sdk.capture_exception(e) - return "failed" - - def delete_from_document_map(course_name: str, ids: list): """ This function is used to delete datapoints from a document map. @@ -559,7 +569,7 @@ def delete_from_document_map(course_name: str, ids: list): print("Deleting point from document map:", project.delete_data(ids)) with project.wait_for_project_lock(): project.rebuild_maps() - return "Successfully deleted from Nomic map" + return "success" except Exception as e: print(e) sentry_sdk.capture_exception(e) @@ -578,9 +588,10 @@ def log_to_document_map(data: dict): try: # check if map exists course_name = data['course_name'] - response = SUPABASE_CLIENT.table("projects").select("doc_map_id").eq("course_name", course_name).execute() + response = SUPABASE_CLIENT.table("projects").select("doc_map_id, last_uploaded_doc_id").eq("course_name", course_name).execute() if response.data: project_id = response.data[0]['doc_map_id'] + last_uploaded_doc_id = response.data[0]['last_uploaded_doc_id'] else: # create a map map_creation_result = create_document_map(course_name) @@ -588,11 +599,22 @@ def log_to_document_map(data: dict): return "The project has less than 20 documents and a map cannot be created." else: # fetch project id - response = SUPABASE_CLIENT.table("projects").select("doc_map_id").eq("course_name", course_name).execute() + response = SUPABASE_CLIENT.table("projects").select("doc_map_id, last_uploaded_doc_id").eq("course_name", course_name).execute() project_id = response.data[0]['doc_map_id'] + last_uploaded_doc_id = response.data[0]['last_uploaded_doc_id'] project = AtlasProject(project_id=project_id, add_datums_if_exists=True) #print("Inserted data: ", data) + # check if project is locked, if yes -> skip logging + if not project.is_accepting_data: + return "Skipping Nomic logging because project is locked." + else: + # fetch all records from supabase greater than last_uploaded_doc_id + response = SUPABASE_CLIENT.table("documents").select("id, created_at, s3_path, url, base_url, readable_filename, contexts").eq("course_name", course_name).gt("id", last_uploaded_doc_id).execute() + df = pd.DataFrame(response.data) + + + embeddings = [] metadata = [] @@ -603,11 +625,12 @@ def log_to_document_map(data: dict): embeddings.append(row['embedding']) metadata.append({ "id": str(data['id']) + "_" + str(context_count), - "doc_ingested_at": data['created_at'], + "created_at": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "s3_path": data['s3_path'], "url": data['url'], + "base_url": data['base_url'], "readable_filename": data['readable_filename'], - "created_at": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "modified_at": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "text": row['text'] }) embeddings = np.array(embeddings) @@ -667,7 +690,7 @@ def append_to_map(embeddings, metadata, map_name): project = atlas.AtlasProject(name=map_name, add_datums_if_exists=True) with project.wait_for_project_lock(): project.add_embeddings(embeddings=embeddings, data=metadata) - return "Successfully appended to Nomic map" + return "success" except Exception as e: print(e) return "Error in appending to map: {e}" @@ -750,7 +773,7 @@ def rebuild_map(course_name:str, map_type:str): with project.wait_for_project_lock(): project.rebuild_maps() - return "Successfully rebuilt map" + return "success" except Exception as e: print(e) sentry_sdk.capture_exception(e)