From b4792f7d52f4d549d2e6ea779afda6db16e49fa1 Mon Sep 17 00:00:00 2001
From: star-nox
Date: Thu, 21 Mar 2024 18:15:32 -0500
Subject: [PATCH] updated create_doc_map()

---
 ai_ta_backend/beam/nomic_logging.py | 150 ++++++++++++++--------------
 1 file changed, 75 insertions(+), 75 deletions(-)

diff --git a/ai_ta_backend/beam/nomic_logging.py b/ai_ta_backend/beam/nomic_logging.py
index ce8235a2..fab28c97 100644
--- a/ai_ta_backend/beam/nomic_logging.py
+++ b/ai_ta_backend/beam/nomic_logging.py
@@ -433,9 +433,9 @@ def create_document_map(course_name: str):
 
   try:
     # check if map exists
-    response = supabase_client.table("projects").select("doc_map_id").eq("course_name", course_name).execute()
-    if response.data:
-      return "Map already exists for this course."
+    # response = supabase_client.table("projects").select("doc_map_id").eq("course_name", course_name).execute()
+    # if response.data:
+    #   return "Map already exists for this course."
 
     # fetch relevant document data from Supabase
     response = supabase_client.table("documents").select("id",
@@ -449,85 +449,83 @@ def create_document_map(course_name: str):
     print("Total number of documents in Supabase: ", total_doc_count)
 
     # minimum 20 docs needed to create map
+    if total_doc_count < 20:
+      return "Cannot create a map because there are less than 20 documents in the course."
 
-    if total_doc_count > 19:
-      first_id = response.data[0]['id']
-      combined_dfs = []
-      curr_total_doc_count = 0
-      doc_count = 0
-      first_batch = True
+    first_id = response.data[0]['id']
+    combined_dfs = []
+    curr_total_doc_count = 0
+    doc_count = 0
+    first_batch = True
 
-      # iteratively query in batches of 25
-      while curr_total_doc_count < total_doc_count:
+    # iteratively query in batches of 25
+    while curr_total_doc_count < total_doc_count:
 
-        response = supabase_client.table("documents").select(
-            "id, created_at, s3_path, url, readable_filename, contexts").eq("course_name", course_name).gte(
+      response = supabase_client.table("documents").select(
+          "id, created_at, s3_path, url, base_url, readable_filename, contexts").eq("course_name", course_name).gte(
               'id', first_id).order('id', desc=False).limit(25).execute()
-        df = pd.DataFrame(response.data)
-        combined_dfs.append(df)  # list of dfs
-
-        curr_total_doc_count += len(response.data)
-        doc_count += len(response.data)
-
+      df = pd.DataFrame(response.data)
+      combined_dfs.append(df)  # list of dfs
+
-        if doc_count >= 1000:  # upload to Nomic every 1000 docs
+      curr_total_doc_count += len(response.data)
+      doc_count += len(response.data)
+
-          # concat all dfs from the combined_dfs list
-          final_df = pd.concat(combined_dfs, ignore_index=True)
-          # prep data for nomic upload
-          embeddings, metadata = data_prep_for_doc_map(final_df)
-
-          if first_batch:
-            # create a new map
-            print("Creating new map...")
-            project_name = NOMIC_MAP_NAME_PREFIX + course_name
-            index_name = course_name + "_doc_index"
-            topic_label_field = "text"
-            colorable_fields = ["readable_filename", "text"]
-            result = create_map(embeddings, metadata, project_name, index_name, topic_label_field, colorable_fields)
-            # update flag
-            first_batch = False
-
-          else:
-            # append to existing map
-            print("Appending data to existing map...")
-            project_name = NOMIC_MAP_NAME_PREFIX + course_name
-            # add project lock logic here
-            result = append_to_map(embeddings, metadata, project_name)
+      if doc_count >= 100:  # upload to Nomic in batches of 100
+        # concat all dfs from the combined_dfs list
+        final_df = pd.concat(combined_dfs, ignore_index=True)
 
-          # reset variables
-          combined_dfs = []
-          doc_count = 0
+        # prep data for nomic upload
+        embeddings, metadata = data_prep_for_doc_map(final_df)
 
-          # set first_id for next iteration
-          first_id = response.data[-1]['id'] + 1
+        if first_batch:
+          # create a new map
+          print("Creating new map...")
+          project_name = NOMIC_MAP_NAME_PREFIX + course_name
+          index_name = course_name + "_doc_index"
+          topic_label_field = "text"
+          colorable_fields = ["readable_filename", "text", "base_url", "created_at"]
+          result = create_map(embeddings, metadata, project_name, index_name, topic_label_field, colorable_fields)
+          # update flag
+          first_batch = False
 
-
-      # upload last set of docs
-      final_df = pd.concat(combined_dfs, ignore_index=True)
-      embeddings, metadata = data_prep_for_doc_map(final_df)
-      project_name = NOMIC_MAP_NAME_PREFIX + course_name
-      if first_batch:
-        index_name = course_name + "_doc_index"
-        topic_label_field = "text"
-        colorable_fields = ["readable_filename", "text"]
-        result = create_map(embeddings, metadata, project_name, index_name, topic_label_field, colorable_fields)
-      else:
-        result = append_to_map(embeddings, metadata, project_name)
-      print("Atlas upload status: ", result)
-
-      # log info to supabase
-      project = AtlasProject(name=project_name, add_datums_if_exists=True)
-      project_id = project.id
-      project.rebuild_maps()
-      project_info = {'course_name': course_name, 'doc_map_id': project_id}
-      response = supabase_client.table("projects").insert(project_info).execute()
-      print("Response from supabase: ", response)
-      return "success"
+        else:
+          # append to existing map
+          print("Appending data to existing map...")
+          project_name = NOMIC_MAP_NAME_PREFIX + course_name
+          # add project lock logic here
+          result = append_to_map(embeddings, metadata, project_name)
+
+        # reset variables
+        combined_dfs = []
+        doc_count = 0
+
+        # set first_id for next iteration
+        first_id = response.data[-1]['id'] + 1
+
+    # upload last set of docs
+    final_df = pd.concat(combined_dfs, ignore_index=True)
+    embeddings, metadata = data_prep_for_doc_map(final_df)
+    project_name = NOMIC_MAP_NAME_PREFIX + course_name
+    if first_batch:
+      index_name = course_name + "_doc_index"
+      topic_label_field = "text"
+      colorable_fields = ["readable_filename", "text", "base_url", "created_at"]
+      result = create_map(embeddings, metadata, project_name, index_name, topic_label_field, colorable_fields)
     else:
-      return "Cannot create a map because there are less than 20 documents in the course."
+      result = append_to_map(embeddings, metadata, project_name)
+    print("Atlas upload status: ", result)
+
+    # log info to supabase
+    project = AtlasProject(name=project_name, add_datums_if_exists=True)
+    project_id = project.id
+    project.rebuild_maps()
+    project_info = {'course_name': course_name, 'doc_map_id': project_id}
+    response = supabase_client.table("projects").insert(project_info).execute()
+    print("Response from supabase: ", response)
+    return "success"
+
   except Exception as e:
     print(e)
     sentry_sdk.capture_exception(e)
@@ -535,6 +533,7 @@ def create_document_map(course_name: str):
     return "failed"
 
 
+
 def delete_from_document_map(course_name: str, ids: list):
   """
   This function is used to delete datapoints from a document map.
@@ -644,11 +643,11 @@ def create_map(embeddings, metadata, map_name, index_name, topic_label_field, co
                                    data=metadata,
                                    id_field="id",
                                    build_topic_model=True,
-                                   name=map_name,
                                    topic_label_field=topic_label_field,
+                                   name=map_name,
                                    colorable_fields=colorable_fields,
                                    add_datums_if_exists=True)
-    project.create_index(index_name, build_topic_model=True)
+    project.create_index(name=index_name, build_topic_model=True)
     return "success"
   except Exception as e:
     print(e)
@@ -673,7 +672,6 @@ def append_to_map(embeddings, metadata, map_name):
     print(e)
     return "Error in appending to map: {e}"
 
-
 def data_prep_for_doc_map(df: pd.DataFrame):
   """
   This function prepares embeddings and metadata for nomic upload in document map creation.
@@ -692,6 +690,7 @@ def data_prep_for_doc_map(df: pd.DataFrame):
 
   for index, row in df.iterrows():
     current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+    created_at = datetime.datetime.strptime(row['created_at'], "%Y-%m-%dT%H:%M:%S.%f%z").strftime("%Y-%m-%d %H:%M:%S")
    if row['url'] == None:
       row['url'] = ""
     # iterate through all contexts and create separate entries for each
@@ -703,11 +702,12 @@ def data_prep_for_doc_map(df: pd.DataFrame):
 
       meta_row = {
          "id": str(row['id']) + "_" + str(context_count),
-          "doc_ingested_at": row['created_at'],
+          "created_at": created_at,
          "s3_path": row['s3_path'],
          "url": row['url'],
+          "base_url": row['base_url'],
          "readable_filename": row['readable_filename'],
-          "created_at": current_time,
+          "modified_at": current_time,
          "text": text_row
      }