diff --git a/.beamignore b/.beamignore
new file mode 100644
index 00000000..f787b8ee
--- /dev/null
+++ b/.beamignore
@@ -0,0 +1,7 @@
+.venv
+venv
+.idea
+.vscode
+.git
+*.pyc
+__pycache__
diff --git a/ai_ta_backend/beam/ingest.py b/ai_ta_backend/beam/ingest.py
index 11477b25..0993b286 100644
--- a/ai_ta_backend/beam/ingest.py
+++ b/ai_ta_backend/beam/ingest.py
@@ -41,7 +41,8 @@ from langchain.schema import Document
 from langchain.text_splitter import RecursiveCharacterTextSplitter
 from langchain.vectorstores import Qdrant
-from nomic_logging import delete_from_document_map, log_to_document_map
+
+from nomic_logging import delete_from_document_map, log_to_document_map, rebuild_map
 from OpenaiEmbeddings import OpenAIAPIProcessor
 from PIL import Image
 from posthog import Posthog
@@ -156,6 +157,7 @@ def loader():
           loader=loader,
           autoscaler=autoscaler)
 def ingest(**inputs: Dict[str, Any]):
+
   qdrant_client, vectorstore, s3_client, supabase_client, posthog = inputs["context"]

   course_name: List[str] | str = inputs.get('course_name', '')
@@ -171,6 +173,8 @@ def ingest(**inputs: Dict[str, Any]):

   ingester = Ingest(qdrant_client, vectorstore, s3_client, supabase_client, posthog)

+
+
   def run_ingest(course_name, s3_paths, base_url, url, readable_filename, content):
     if content:
       return ingester.ingest_single_web_text(course_name, base_url, url, content, readable_filename)
@@ -194,7 +198,10 @@ def run_ingest(course_name, s3_paths, base_url, url, readable_filename, content)
       # s3_paths = success_fail_dict['failure_ingest'] # retry only failed paths.... what if this is a URL instead?
       success_fail_dict = run_ingest(course_name, s3_paths, base_url, url, readable_filename, content)
       time.sleep(13 * retry_num)  # max is 65
-
+
+  # rebuild nomic document map after all ingests are done
+  rebuild_status = rebuild_map(course_name, map_type='document')
+
   print(f"Final success_fail_dict: {success_fail_dict}")
   return json.dumps(success_fail_dict)
diff --git a/ai_ta_backend/beam/nomic_logging.py b/ai_ta_backend/beam/nomic_logging.py
index 18591a05..ce8235a2 100644
--- a/ai_ta_backend/beam/nomic_logging.py
+++ b/ai_ta_backend/beam/nomic_logging.py
@@ -215,6 +215,7 @@ def log_convo_to_nomic(course_name: str, conversation) -> str:
     raise Exception({"exception": str(e)})


+
 def get_nomic_map(course_name: str, type: str):
   """
   Returns the variables necessary to construct an iframe of the Nomic map given a course name.
@@ -241,8 +242,7 @@ def get_nomic_map(course_name: str, type: str):
   except Exception as e:
     # Error: ValueError: You must specify a unique_id_field when creating a new project.
     if str(e) == 'You must specify a unique_id_field when creating a new project.':  # type: ignore
-      print("Nomic map does not exist yet, probably because you have less than 20 queries/documents on your project: ",
-            e)
+      print("Nomic map does not exist yet, probably because you have less than 20 queries/documents on your project: ", e)
     else:
       print("ERROR in get_nomic_map():", e)
       sentry_sdk.capture_exception(e)
@@ -405,8 +405,8 @@ def create_nomic_map(course_name: str, log_data: list):
     return "failed"


-## -------------------------------- DOCUMENT MAP FUNCTIONS --------------------------------- ##
+## -------------------------------- DOCUMENT MAP FUNCTIONS --------------------------------- ##

 def create_document_map(course_name: str):
   """
@@ -444,13 +444,13 @@ def create_document_map(course_name: str):
                                                          desc=False).execute()
     if not response.count:
       return "No documents found for this course."
-
+
     total_doc_count = response.count
     print("Total number of documents in Supabase: ", total_doc_count)

     # minimum 20 docs needed to create map
-    if total_doc_count > 19:
+    if total_doc_count > 19:
       first_id = response.data[0]['id']
       combined_dfs = []
       curr_total_doc_count = 0
@@ -469,6 +469,7 @@ def create_document_map(course_name: str):
       curr_total_doc_count += len(response.data)
       doc_count += len(response.data)

+
       if doc_count >= 1000:  # upload to Nomic every 1000 docs

         # concat all dfs from the combined_dfs list
@@ -495,6 +496,7 @@ def create_document_map(course_name: str):
           # add project lock logic here
           result = append_to_map(embeddings, metadata, project_name)

+
         # reset variables
         combined_dfs = []
         doc_count = 0
@@ -502,6 +504,7 @@ def create_document_map(course_name: str):
       # set first_id for next iteration
       first_id = response.data[-1]['id'] + 1

+
     # upload last set of docs
     final_df = pd.concat(combined_dfs, ignore_index=True)
     embeddings, metadata = data_prep_for_doc_map(final_df)
@@ -528,6 +531,7 @@ def create_document_map(course_name: str):

   except Exception as e:
     print(e)
     sentry_sdk.capture_exception(e)
+
     return "failed"
@@ -615,13 +619,6 @@ def log_to_document_map(data: dict):
       project_name = "Document Map for " + course_name
     result = append_to_map(embeddings, metadata, project_name)

-    # check if project is accepting new datums
-    if project.is_accepting_data:
-      with project.wait_for_project_lock():
-        project.rebuild_maps()
-
-    # with project.wait_for_project_lock():
-    #   project.rebuild_maps()
     return result

   except Exception as e:
@@ -642,7 +639,6 @@ def create_map(embeddings, metadata, map_name, index_name, topic_label_field, co
     colorable_fields: list of str
   """
   nomic.login(os.getenv('NOMIC_API_KEY'))
-
   try:
     project = atlas.map_embeddings(embeddings=embeddings,
                                    data=metadata,
@@ -658,7 +654,6 @@ def create_map(embeddings, metadata, map_name, index_name, topic_label_field, co
     print(e)
     return "Error in creating map: {e}"

-
 def append_to_map(embeddings, metadata, map_name):
   """
   Generic function to append new data to an existing Nomic map.
@@ -667,6 +662,7 @@ def append_to_map(embeddings, metadata, map_name):
     metadata: pd.DataFrame of Nomic upload metadata
     map_name: str
   """
+
   nomic.login(os.getenv('NOMIC_API_KEY'))
   try:
     project = atlas.AtlasProject(name=map_name, add_datums_if_exists=True)
@@ -691,10 +687,10 @@ def data_prep_for_doc_map(df: pd.DataFrame):
   metadata = []
   embeddings = []
+
   texts = []

   for index, row in df.iterrows():
-
     current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
     if row['url'] == None:
       row['url'] = ""
@@ -725,9 +721,7 @@ def data_prep_for_doc_map(df: pd.DataFrame):
     # check dimension if embeddings_np is (n, 1536)
     if len(embeddings_np.shape) < 2:
       print("Creating new embeddings...")
-      # embeddings_model = OpenAIEmbeddings(openai_api_type=OPENAI_API_TYPE,
-      #                                     openai_api_base=os.getenv('AZURE_OPENAI_BASE'),
-      #                                     openai_api_key=os.getenv('AZURE_OPENAI_KEY'))  # type: ignore
+
       embeddings_model = OpenAIEmbeddings(openai_api_type="openai",
                                           openai_api_base="https://api.openai.com/v1/",
                                           openai_api_key=os.getenv('VLADS_OPENAI_KEY'))  # type: ignore
@@ -738,6 +732,32 @@ def data_prep_for_doc_map(df: pd.DataFrame):

   return embeddings, metadata


+def rebuild_map(course_name:str, map_type:str):
+  """
+  This function rebuilds a given map in Nomic.
+  """
+  print("in rebuild_map()")
+  nomic.login(os.getenv('NOMIC_API_KEY'))
+  if map_type.lower() == 'document':
+    NOMIC_MAP_NAME_PREFIX = 'Document Map for '
+  else:
+    NOMIC_MAP_NAME_PREFIX = 'Conversation Map for '
+
+  try:
+    # fetch project from Nomic
+    project_name = NOMIC_MAP_NAME_PREFIX + course_name
+    project = AtlasProject(name=project_name, add_datums_if_exists=True)
+
+    with project.wait_for_project_lock():
+      project.rebuild_maps()
+    return "Successfully rebuilt map"
+  except Exception as e:
+    print(e)
+    sentry_sdk.capture_exception(e)
+    return "Error in rebuilding map: {e}"
+
+
 if __name__ == '__main__':
   pass
+