diff --git a/ai_ta_backend/beam/nomic_logging.py b/ai_ta_backend/beam/nomic_logging.py index 0d38cf4b..9ba01bdf 100644 --- a/ai_ta_backend/beam/nomic_logging.py +++ b/ai_ta_backend/beam/nomic_logging.py @@ -453,6 +453,7 @@ def create_document_map(course_name: str): return "Cannot create a map because there are less than 20 documents in the course." first_id = response.data[0]['id'] + combined_dfs = [] curr_total_doc_count = 0 doc_count = 0 @@ -460,7 +461,7 @@ def create_document_map(course_name: str): # iteratively query in batches of 25 while curr_total_doc_count < total_doc_count: - + response = supabase_client.table("documents").select( "id, created_at, s3_path, url, base_url, readable_filename, contexts").eq("course_name", course_name).gte( 'id', first_id).order('id', desc=False).limit(25).execute() @@ -486,15 +487,18 @@ def create_document_map(course_name: str): topic_label_field = "text" colorable_fields = ["readable_filename", "text", "base_url", "created_at"] result = create_map(embeddings, metadata, project_name, index_name, topic_label_field, colorable_fields) - # update flag - first_batch = False - # log project info to supabas - project = AtlasProject(name=project_name, add_datums_if_exists=True) - project_id = project.id - last_id = final_df['id'].iloc[-1].split("_")[0] - project_info = {'course_name': course_name, 'doc_map_id': project_id, 'last_uploaded_doc_id': last_id} - response = supabase_client.table("projects").insert(project_info).execute() - print("Response from supabase: ", response) + + if result == "success": + # update flag + first_batch = False + # log project info to supabase + project = AtlasProject(name=project_name, add_datums_if_exists=True) + project_id = project.id + print("Last id: ", final_df['id'].iloc[-1]) + last_id = int(final_df['id'].iloc[-1]) + project_info = {'course_name': course_name, 'doc_map_id': project_id, 'last_uploaded_doc_id': last_id} + update_response = supabase_client.table("projects").insert(project_info).execute() + print("Response from supabase: ", update_response) else: # append to existing map @@ -504,14 +508,16 @@ def create_document_map(course_name: str): result = append_to_map(embeddings, metadata, project_name) if result == "success": # update the last uploaded id in supabase - last_id = final_df['id'].iloc[-1].split("_")[0] - project_info = {'last_uploaded_doc_id': last_id} - response = supabase_client.table("projects").update(project_info).eq("course_name", course_name).execute() - print("Response from supabase: ", response) + last_id = int(final_df['id'].iloc[-1]) + info = {'last_uploaded_doc_id': last_id} + print("info:", info) + update_response = supabase_client.table("projects").update(info).eq("course_name", course_name).execute() + print("Response from supabase: ", update_response) # reset variables combined_dfs = [] doc_count = 0 + print("Records uploaded: ", curr_total_doc_count) # set first_id for next iteration first_id = response.data[-1]['id'] + 1 @@ -528,12 +534,15 @@ def create_document_map(course_name: str): result = create_map(embeddings, metadata, project_name, index_name, topic_label_field, colorable_fields) else: result = append_to_map(embeddings, metadata, project_name) + + # update the last uploaded id in supabase if result == "success": # update the last uploaded id in supabase - last_id = final_df['id'].iloc[-1].split("_")[0] + last_id = int(final_df['id'].iloc[-1]) project_info = {'last_uploaded_doc_id': last_id} - response = supabase_client.table("projects").update(project_info).eq("course_name", course_name).execute() - print("Response from supabase: ", response) + print("project_info: ", project_info) + update_response = supabase_client.table("projects").update(project_info).eq("course_name", course_name).execute() + print("Response from supabase: ", update_response) print("Atlas upload status: ", result) # rebuild the map @@ -576,7 +585,7 @@ def delete_from_document_map(course_name: str, ids: list): return "Error in deleting from document map: {e}" -def log_to_document_map(data: dict): +def log_to_document_map(course_name: str): """ This is a function which appends new documents to an existing document map. It's called at the end of split_and_upload() after inserting data to Supabase. @@ -586,8 +595,7 @@ def log_to_document_map(data: dict): print("in add_to_document_map()") try: - # check if map exists - course_name = data['course_name'] + # check if map exists response = SUPABASE_CLIENT.table("projects").select("doc_map_id, last_uploaded_doc_id").eq("course_name", course_name).execute() if response.data: project_id = response.data[0]['doc_map_id'] @@ -604,51 +612,76 @@ def log_to_document_map(data: dict): last_uploaded_doc_id = response.data[0]['last_uploaded_doc_id'] project = AtlasProject(project_id=project_id, add_datums_if_exists=True) - #print("Inserted data: ", data) + project_name = "Document Map for " + course_name # check if project is locked, if yes -> skip logging if not project.is_accepting_data: return "Skipping Nomic logging because project is locked." - else: + + # fetch count of records greater than last_uploaded_doc_id + print("last uploaded doc id: ", last_uploaded_doc_id) + response = SUPABASE_CLIENT.table("documents").select("id", count="exact").eq("course_name", course_name).gt("id", last_uploaded_doc_id).execute() + print("Number of new documents: ", response.count) + + total_doc_count = response.count + current_doc_count = 0 + combined_dfs = [] + + while current_doc_count < total_doc_count: # fetch all records from supabase greater than last_uploaded_doc_id - response = SUPABASE_CLIENT.table("documents").select("id, created_at, s3_path, url, base_url, readable_filename, contexts").eq("course_name", course_name).gt("id", last_uploaded_doc_id).execute() + response = SUPABASE_CLIENT.table("documents").select("id, created_at, s3_path, url, base_url, readable_filename, contexts").eq("course_name", course_name).gt("id", last_uploaded_doc_id).limit(25).execute() df = pd.DataFrame(response.data) - - + combined_dfs.append(df) # list of dfs + current_doc_count += len(response.data) + doc_count += len(response.data) - embeddings = [] - metadata = [] - context_count = 0 - # prep data for nomic upload - for row in data['contexts']: - context_count += 1 - embeddings.append(row['embedding']) - metadata.append({ - "id": str(data['id']) + "_" + str(context_count), - "created_at": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - "s3_path": data['s3_path'], - "url": data['url'], - "base_url": data['base_url'], - "readable_filename": data['readable_filename'], - "modified_at": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - "text": row['text'] - }) - embeddings = np.array(embeddings) - metadata = pd.DataFrame(metadata) - print("Shape of embeddings: ", embeddings.shape) - - # append to existing map - project_name = "Document Map for " + course_name - result = append_to_map(embeddings, metadata, project_name) + if doc_count >= 1000: # upload to Nomic in batches of 1000 + # concat all dfs from the combined_dfs list + final_df = pd.concat(combined_dfs, ignore_index=True) + # prep data for nomic upload + embeddings, metadata = data_prep_for_doc_map(final_df) - return result + # append to existing map + print("Appending data to existing map...") + + result = append_to_map(embeddings, metadata, project_name) + if result == "success": + # update the last uploaded id in supabase + last_id = int(final_df['id'].iloc[-1]) + info = {'last_uploaded_doc_id': last_id} + print("info:", info) + update_response = SUPABASE_CLIENT.table("projects").update(info).eq("course_name", course_name).execute() + print("Response from supabase: ", update_response) + + # reset variables + combined_dfs = [] + doc_count = 0 + print("Records uploaded: ", curr_total_doc_count) + + # set first_id for next iteration + first_id = response.data[-1]['id'] + 1 + + # upload last set of docs + if doc_count > 0: + final_df = pd.concat(combined_dfs, ignore_index=True) + embeddings, metadata = data_prep_for_doc_map(final_df) + result = append_to_map(embeddings, metadata, project_name) + # update the last uploaded id in supabase + if result == "success": + # update the last uploaded id in supabase + last_id = int(final_df['id'].iloc[-1]) + project_info = {'last_uploaded_doc_id': last_id} + print("project_info: ", project_info) + update_response = SUPABASE_CLIENT.table("projects").update(project_info).eq("course_name", course_name).execute() + print("Response from supabase: ", update_response) + + return "success" except Exception as e: print(e) - sentry_sdk.capture_exception(e) - return "Error in appending to map: {e}" - - + return "failed" + + def create_map(embeddings, metadata, map_name, index_name, topic_label_field, colorable_fields): """ Generic function to create a Nomic map from given parameters.