From d2fc97f68b22aa2266b276c3320987b0bda21cd5 Mon Sep 17 00:00:00 2001 From: rohanmarwaha Date: Thu, 25 Apr 2024 20:22:23 -0500 Subject: [PATCH 1/4] Add document group bulk insertion endpoint --- ai_ta_backend/database/sql.py | 21 ++++++++ ai_ta_backend/main.py | 8 +++ ai_ta_backend/service/retrieval_service.py | 60 ++++++++++++++++++++++ 3 files changed, 89 insertions(+) diff --git a/ai_ta_backend/database/sql.py b/ai_ta_backend/database/sql.py index caf0ac51..37a800fc 100644 --- a/ai_ta_backend/database/sql.py +++ b/ai_ta_backend/database/sql.py @@ -1,4 +1,5 @@ import os +from typing import Dict, List import supabase from injector import inject @@ -110,3 +111,23 @@ def updateProjects(self, course_name: str, data: dict): def getConversation(self, course_name: str, key: str, value: str): return self.supabase_client.table("llm-convo-monitor").select("*").eq(key, value).eq("course_name", course_name).execute() + def fetchDocumentsByURLs(self, urls: List[str], course_name: str): + """ + Fetch documents that have base_url matching any of the URLs in the provided list. + """ + return self.supabase_client.table("documents").select("id, readable_filename, base_url").in_("base_url", urls).eq("course_name", course_name).execute() + + def insertDocumentGroupsBulk(self, document_groups: List[Dict]): + # Assuming the Supabase client's insert method supports returning inserted records + inserted_records = self.supabase_client.table("doc_groups").insert(document_groups).execute() + # Extract and return the IDs of the inserted document groups + inserted_ids = [record['id'] for record in inserted_records.data] + return inserted_ids + + def updateDocumentsDocGroupsBulk(self, document_ids: List[int], doc_group_id: int): + # Prepare updates + updates = [{"document_id": doc_id, "doc_group_id": doc_group_id} for doc_id in document_ids] + # Perform bulk update + self.supabase_client.table("documents_doc_groups").upsert(updates).execute() + + diff --git a/ai_ta_backend/main.py b/ai_ta_backend/main.py index 5b52e7d3..785ad721 100644 --- a/ai_ta_backend/main.py +++ b/ai_ta_backend/main.py @@ -379,6 +379,14 @@ def getTopContextsWithMQR(service: RetrievalService, posthog_service: PosthogSer response.headers.add('Access-Control-Allow-Origin', '*') return response +@app.route('/insert_document_groups', methods=['POST']) +def insert_document_groups(service: RetrievalService) -> Response: + data = request.get_json() + csv_path: str = data.get('csv_path', '') + course_name: str = data.get('course_name', '') + doc_group_count, docs_doc_group_count = service.insertDocumentGroups(course_name, csv_path) + + return jsonify({"message": "Document groups and documents inserted successfully.", "doc_group_count": doc_group_count, "docs_doc_group_count": docs_doc_group_count}) def configure(binder: Binder) -> None: binder.bind(RetrievalService, to=RetrievalService, scope=RequestScope) diff --git a/ai_ta_backend/service/retrieval_service.py b/ai_ta_backend/service/retrieval_service.py index c53bcefb..104f6c50 100644 --- a/ai_ta_backend/service/retrieval_service.py +++ b/ai_ta_backend/service/retrieval_service.py @@ -5,6 +5,7 @@ from typing import Dict, List, Union import openai +import csv from injector import inject from langchain.chat_models import AzureChatOpenAI from langchain.embeddings.openai import OpenAIEmbeddings @@ -466,3 +467,62 @@ def format_for_json(self, found_docs: List[Document]) -> List[Dict]: ] return contexts + + def insertDocumentGroups(self, course_name, csv_path): + """ + Inserts document groups and documents 
into the database based on the CSV mapping. + Additionally, saves doc group name along with the entire document info in the output csv file. + """ + urls = [] + doc_group_count = 0 + docs_doc_groups_count = 0 + # Fetch existing documents by URLs from CSV + with open(csv_path, newline='') as csvfile: + for row in csv.DictReader(csvfile): + urls = eval(row['start_urls']) + print(f"fetching existing documents for course_name: {course_name} and URLs: {urls}") + existing_documents = self.sqlDb.fetchDocumentsByURLs(urls, course_name) + # Prepare data for bulk operations + data = parse_csv_and_prepare_data(course_name, existing_documents.data, row) + + document_group = { + 'name': row['university'], + 'course_name': course_name, + } + documents_to_update = existing_documents.data + + # Insert document groups in bulk and get inserted IDs + # inserted_doc_groups = self.sqlDb.insertDocumentGroupsBulk(document_groups) + print("document groups for insertion: ", document_group) + print("documents for update: ", len(documents_to_update)) + doc_group_count += 1 # Corrected to increment by 1 instead of len(document_group) which was incorrect + docs_doc_groups_count += len(documents_to_update) + with open(csv_path + 'output.csv', newline='', mode='a') as csvfile: + writer = csv.writer(csvfile) + for doc in documents_to_update: + # Include document group name in the output CSV file + writer.writerow([document_group['name']] + list(doc.values())) + # # Map documents to new document groups and update documents_doc_groups in bulk + # for doc_group, inserted_id in zip(document_groups, inserted_doc_groups): + # doc_ids = [doc['id'] for doc in documents_to_update] + # self.sqlDb.updateDocumentsDocGroupsBulk(doc_ids, inserted_id) + + return doc_group_count, docs_doc_groups_count + +def parse_csv_and_prepare_data(course_name: str, existing_documents: List[Dict], row) -> Dict[str, List[Dict]]: + """ + Parses the CSV file and prepares the data for document groups and documents insertion. + Adjusted to match documents based on start_urls and prepare updates for doc_groups and documents_doc_groups. 
+ """ + document_groups = [] + documents_to_update = [] + + start_urls = eval(row['start_urls']) # Convert string representation of list to actual list + matched_documents = [doc for doc in existing_documents if doc['base_url'] in start_urls] + if matched_documents: + document_groups.append({ + 'name': row['university'], + 'course_name': course_name, + }) + documents_to_update.extend(matched_documents) + return {'document_groups': document_groups, 'documents_to_update': documents_to_update} From fd37eeb3a1bc5f6727d5e0a5cb76845aeaeaf665 Mon Sep 17 00:00:00 2001 From: rohanmarwaha Date: Fri, 26 Apr 2024 11:20:38 -0500 Subject: [PATCH 2/4] Refactor document group insertion and document update logic --- ai_ta_backend/database/sql.py | 11 +-- ai_ta_backend/service/retrieval_service.py | 85 +++++++++++----------- 2 files changed, 47 insertions(+), 49 deletions(-) diff --git a/ai_ta_backend/database/sql.py b/ai_ta_backend/database/sql.py index 37a800fc..092c8f4e 100644 --- a/ai_ta_backend/database/sql.py +++ b/ai_ta_backend/database/sql.py @@ -111,17 +111,18 @@ def updateProjects(self, course_name: str, data: dict): def getConversation(self, course_name: str, key: str, value: str): return self.supabase_client.table("llm-convo-monitor").select("*").eq(key, value).eq("course_name", course_name).execute() - def fetchDocumentsByURLs(self, urls: List[str], course_name: str): + def fetchDocumentsByURLs(self, urls: List[str], course_name: str, page: int = 1, items_per_page: int = 1500): """ Fetch documents that have base_url matching any of the URLs in the provided list. """ - return self.supabase_client.table("documents").select("id, readable_filename, base_url").in_("base_url", urls).eq("course_name", course_name).execute() + return self.supabase_client.table("documents").select("id, readable_filename, base_url").in_("base_url", urls).eq("course_name", course_name).range((page - 1) * items_per_page, page * items_per_page - 1).execute() - def insertDocumentGroupsBulk(self, document_groups: List[Dict]): + def insertDocumentGroupsBulk(self, document_group): # Assuming the Supabase client's insert method supports returning inserted records - inserted_records = self.supabase_client.table("doc_groups").insert(document_groups).execute() + inserted_records = self.supabase_client.table("doc_groups").insert(document_group).execute() + print(f"Inserted records: {inserted_records}") # Extract and return the IDs of the inserted document groups - inserted_ids = [record['id'] for record in inserted_records.data] + inserted_ids = inserted_records.data[0]['id'] return inserted_ids def updateDocumentsDocGroupsBulk(self, document_ids: List[int], doc_group_id: int): diff --git a/ai_ta_backend/service/retrieval_service.py b/ai_ta_backend/service/retrieval_service.py index 104f6c50..725a8dad 100644 --- a/ai_ta_backend/service/retrieval_service.py +++ b/ai_ta_backend/service/retrieval_service.py @@ -475,54 +475,51 @@ def insertDocumentGroups(self, course_name, csv_path): """ urls = [] doc_group_count = 0 + # Count of documents in all document groups docs_doc_groups_count = 0 # Fetch existing documents by URLs from CSV with open(csv_path, newline='') as csvfile: for row in csv.DictReader(csvfile): urls = eval(row['start_urls']) - print(f"fetching existing documents for course_name: {course_name} and URLs: {urls}") - existing_documents = self.sqlDb.fetchDocumentsByURLs(urls, course_name) - # Prepare data for bulk operations - data = parse_csv_and_prepare_data(course_name, existing_documents.data, row) - - document_group = { - 
'name': row['university'], - 'course_name': course_name, - } - documents_to_update = existing_documents.data - - # Insert document groups in bulk and get inserted IDs - # inserted_doc_groups = self.sqlDb.insertDocumentGroupsBulk(document_groups) - print("document groups for insertion: ", document_group) - print("documents for update: ", len(documents_to_update)) - doc_group_count += 1 # Corrected to increment by 1 instead of len(document_group) which was incorrect - docs_doc_groups_count += len(documents_to_update) - with open(csv_path + 'output.csv', newline='', mode='a') as csvfile: - writer = csv.writer(csvfile) - for doc in documents_to_update: - # Include document group name in the output CSV file - writer.writerow([document_group['name']] + list(doc.values())) - # # Map documents to new document groups and update documents_doc_groups in bulk - # for doc_group, inserted_id in zip(document_groups, inserted_doc_groups): - # doc_ids = [doc['id'] for doc in documents_to_update] - # self.sqlDb.updateDocumentsDocGroupsBulk(doc_ids, inserted_id) + doc_group_name = row['university'] + # Count of documents in the document group + docs_doc_group_count = 0 + # print(f"fetching existing documents for course_name: {course_name} and URLs: {urls}") + page = 1 + while True: + print(f"fetching page: {page} for doc_group: {doc_group_name}, with urls: {urls}") + existing_documents_response = self.sqlDb.fetchDocumentsByURLs(urls, course_name, page) + existing_documents = existing_documents_response.data + if not existing_documents: + break # Exit loop if no more documents are returned + + # Process documents in batches here + document_group = {'name': doc_group_name, 'course_name': course_name} + documents_to_update = existing_documents + + print("Document groups for insertion: ", document_group) + print("Documents for update: ", len(documents_to_update)) + + docs_doc_groups_count += len(documents_to_update) + docs_doc_group_count += len(documents_to_update) + + doc_group_id = self.sqlDb.insertDocumentGroupsBulk(document_group) + + # Bulk update + self.sqlDb.updateDocumentsDocGroupsBulk(documents_to_update, doc_group_id) + + page += 1 # Move to the next page + + # Output to CSV is handled outside the pagination loop + with open('updated_documents.csv', newline='', mode='a') as output_csvfile: + writer = csv.writer(output_csvfile) + for doc in documents_to_update: + writer.writerow([document_group['name']] + list(doc.values())) + + with open('doc_groups.csv', newline='', mode='a') as output_csvfile: + writer = csv.writer(output_csvfile) + writer.writerow([doc_group_name, docs_doc_group_count]) + + doc_group_count += 1 return doc_group_count, docs_doc_groups_count - -def parse_csv_and_prepare_data(course_name: str, existing_documents: List[Dict], row) -> Dict[str, List[Dict]]: - """ - Parses the CSV file and prepares the data for document groups and documents insertion. - Adjusted to match documents based on start_urls and prepare updates for doc_groups and documents_doc_groups. 
- """ - document_groups = [] - documents_to_update = [] - - start_urls = eval(row['start_urls']) # Convert string representation of list to actual list - matched_documents = [doc for doc in existing_documents if doc['base_url'] in start_urls] - if matched_documents: - document_groups.append({ - 'name': row['university'], - 'course_name': course_name, - }) - documents_to_update.extend(matched_documents) - return {'document_groups': document_groups, 'documents_to_update': documents_to_update} From 9e675f3a2c21ef80ae76194c87088fa1d3dcd688 Mon Sep 17 00:00:00 2001 From: rohanmarwaha Date: Fri, 26 Apr 2024 12:40:05 -0500 Subject: [PATCH 3/4] Add exception handling and logging --- ai_ta_backend/database/sql.py | 6 +- ai_ta_backend/service/retrieval_service.py | 109 +++++++++++++-------- 2 files changed, 70 insertions(+), 45 deletions(-) diff --git a/ai_ta_backend/database/sql.py b/ai_ta_backend/database/sql.py index 092c8f4e..27337711 100644 --- a/ai_ta_backend/database/sql.py +++ b/ai_ta_backend/database/sql.py @@ -119,8 +119,8 @@ def fetchDocumentsByURLs(self, urls: List[str], course_name: str, page: int = 1, def insertDocumentGroupsBulk(self, document_group): # Assuming the Supabase client's insert method supports returning inserted records - inserted_records = self.supabase_client.table("doc_groups").insert(document_group).execute() - print(f"Inserted records: {inserted_records}") + inserted_records = self.supabase_client.table("doc_groups").upsert(document_group, on_conflict="name, course_name", ignore_duplicates=True).execute() + print(f"Inserted records: {inserted_records.data}") # Extract and return the IDs of the inserted document groups inserted_ids = inserted_records.data[0]['id'] return inserted_ids @@ -129,6 +129,6 @@ def updateDocumentsDocGroupsBulk(self, document_ids: List[int], doc_group_id: in # Prepare updates updates = [{"document_id": doc_id, "doc_group_id": doc_group_id} for doc_id in document_ids] # Perform bulk update - self.supabase_client.table("documents_doc_groups").upsert(updates).execute() + self.supabase_client.table("documents_doc_groups").upsert(updates,on_conflict="document_id, doc_group_id", ignore_duplicates=True).execute() diff --git a/ai_ta_backend/service/retrieval_service.py b/ai_ta_backend/service/retrieval_service.py index 725a8dad..e227568c 100644 --- a/ai_ta_backend/service/retrieval_service.py +++ b/ai_ta_backend/service/retrieval_service.py @@ -478,48 +478,73 @@ def insertDocumentGroups(self, course_name, csv_path): # Count of documents in all document groups docs_doc_groups_count = 0 # Fetch existing documents by URLs from CSV - with open(csv_path, newline='') as csvfile: - for row in csv.DictReader(csvfile): - urls = eval(row['start_urls']) - doc_group_name = row['university'] - # Count of documents in the document group - docs_doc_group_count = 0 - # print(f"fetching existing documents for course_name: {course_name} and URLs: {urls}") - page = 1 - while True: - print(f"fetching page: {page} for doc_group: {doc_group_name}, with urls: {urls}") - existing_documents_response = self.sqlDb.fetchDocumentsByURLs(urls, course_name, page) - existing_documents = existing_documents_response.data - if not existing_documents: - break # Exit loop if no more documents are returned - - # Process documents in batches here + try: + with open(csv_path, newline='') as csvfile: + for row in csv.DictReader(csvfile): + urls = eval(row['start_urls']) + doc_group_name = row['university'] + # Count of documents in the document group + docs_doc_group_count = 0 + # 
print(f"fetching existing documents for course_name: {course_name} and URLs: {urls}") + page = 1 + # Create document group document_group = {'name': doc_group_name, 'course_name': course_name} - documents_to_update = existing_documents - - print("Document groups for insertion: ", document_group) - print("Documents for update: ", len(documents_to_update)) - - docs_doc_groups_count += len(documents_to_update) - docs_doc_group_count += len(documents_to_update) - - doc_group_id = self.sqlDb.insertDocumentGroupsBulk(document_group) - - # Bulk update - self.sqlDb.updateDocumentsDocGroupsBulk(documents_to_update, doc_group_id) - - page += 1 # Move to the next page - - # Output to CSV is handled outside the pagination loop - with open('updated_documents.csv', newline='', mode='a') as output_csvfile: - writer = csv.writer(output_csvfile) - for doc in documents_to_update: - writer.writerow([document_group['name']] + list(doc.values())) - - with open('doc_groups.csv', newline='', mode='a') as output_csvfile: - writer = csv.writer(output_csvfile) - writer.writerow([doc_group_name, docs_doc_group_count]) - - doc_group_count += 1 + # Ingest document group + try: + doc_group_id = self.sqlDb.insertDocumentGroupsBulk(document_group) + except Exception as e: + print(f"Failed to insert document group {doc_group_name} due to: {str(e)}") + continue + while True: + print(f"fetching page: {page} for doc_group: {doc_group_name}, with urls: {urls}") + try: + existing_documents_response = self.sqlDb.fetchDocumentsByURLs(urls, course_name, page) + existing_documents = existing_documents_response.data + if not existing_documents: + break # Exit loop if no more documents are returned + except Exception as e: + print(f"Failed to fetch documents for page {page} and doc_group {doc_group_name} due to: {str(e)}") + break + + # Process documents in batches here + + documents_to_update = [doc["id"] for doc in existing_documents] + + print("Document groups for insertion: ", document_group) + print("Documents for update: ", documents_to_update) + + docs_doc_groups_count += len(documents_to_update) + docs_doc_group_count += len(documents_to_update) + + try: + # Bulk update + self.sqlDb.updateDocumentsDocGroupsBulk(documents_to_update, doc_group_id) + # print("skipping ingest for testing") + except Exception as e: + print(f"Failed to insert/update document groups for {doc_group_name} and {page} due to: {str(e)}") + + page += 1 # Move to the next page + + # Output to CSV is handled outside the pagination loop + try: + with open('final_output/updated_documents.csv', newline='', mode='a') as output_csvfile: + writer = csv.writer(output_csvfile) + for doc in existing_documents: + writer.writerow([document_group['name']] + list(doc.values())) + except Exception as e: + print(f"Failed to write updated documents to CSV for {doc_group_name} due to: {str(e.with_traceback)}") + + try: + with open('final_output/doc_groups.csv', newline='', mode='a') as output_csvfile: + writer = csv.writer(output_csvfile) + writer.writerow([doc_group_name, docs_doc_group_count]) + except Exception as e: + print(f"Failed to write document group counts to CSV for {doc_group_name} due to: {str(e.with_traceback)}") + + doc_group_count += 1 + except FileNotFoundError as e: + print(f"CSV file not found: {str(e)}") + except Exception as e: + print(f"An error occurred while processing the CSV file: {str(e.with_traceback)}") return doc_group_count, docs_doc_groups_count From 34f3c08a4f78bbc5a51335ecbc80b3f25c899ea4 Mon Sep 17 00:00:00 2001 From: rohanmarwaha Date: 
Tue, 14 May 2024 10:43:00 -0500 Subject: [PATCH 4/4] Update document group insertion and retrieval for Vector DB --- ai_ta_backend/database/sql.py | 7 +- ai_ta_backend/database/vector.py | 36 +++++ ai_ta_backend/main.py | 4 +- ai_ta_backend/service/retrieval_service.py | 145 +++++++++++---------- 4 files changed, 117 insertions(+), 75 deletions(-) diff --git a/ai_ta_backend/database/sql.py b/ai_ta_backend/database/sql.py index 27337711..5ce03fae 100644 --- a/ai_ta_backend/database/sql.py +++ b/ai_ta_backend/database/sql.py @@ -115,12 +115,12 @@ def fetchDocumentsByURLs(self, urls: List[str], course_name: str, page: int = 1, """ Fetch documents that have base_url matching any of the URLs in the provided list. """ - return self.supabase_client.table("documents").select("id, readable_filename, base_url").in_("base_url", urls).eq("course_name", course_name).range((page - 1) * items_per_page, page * items_per_page - 1).execute() + return self.supabase_client.table("documents").select("id, readable_filename, url, s3_path, base_url, doc_groups(name)").in_("base_url", urls).eq("course_name", course_name).range((page - 1) * items_per_page, page * items_per_page - 1).execute() def insertDocumentGroupsBulk(self, document_group): # Assuming the Supabase client's insert method supports returning inserted records - inserted_records = self.supabase_client.table("doc_groups").upsert(document_group, on_conflict="name, course_name", ignore_duplicates=True).execute() - print(f"Inserted records: {inserted_records.data}") + inserted_records = self.supabase_client.table("doc_groups").upsert(document_group, on_conflict="name, course_name", ignore_duplicates=False).execute() + print(f"Inserted document groups: {inserted_records.data}") # Extract and return the IDs of the inserted document groups inserted_ids = inserted_records.data[0]['id'] return inserted_ids @@ -130,5 +130,6 @@ def updateDocumentsDocGroupsBulk(self, document_ids: List[int], doc_group_id: in updates = [{"document_id": doc_id, "doc_group_id": doc_group_id} for doc_id in document_ids] # Perform bulk update self.supabase_client.table("documents_doc_groups").upsert(updates,on_conflict="document_id, doc_group_id", ignore_duplicates=True).execute() + diff --git a/ai_ta_backend/database/vector.py b/ai_ta_backend/database/vector.py index 9da53234..66b350b6 100644 --- a/ai_ta_backend/database/vector.py +++ b/ai_ta_backend/database/vector.py @@ -5,6 +5,7 @@ from langchain.embeddings.openai import OpenAIEmbeddings from langchain.vectorstores import Qdrant from qdrant_client import QdrantClient, models +from qdrant_client.conversions.common_types import WriteOrdering OPENAI_API_TYPE = "azure" # "openai" or "azure" @@ -91,3 +92,38 @@ def delete_data(self, collection_name: str, key: str, value: str): ), ]), ) + + def add_document_groups_to_documents(self, course_name: str, documents: List[dict], doc_group_name: str): + """ + Add document groups to documents in the vector database. 
+ """ + print(f"Adding document groups: {doc_group_name} to documents in the vector database for course: {course_name} and {len(documents)} documents.") + update_operations = [] + for document in documents: + # print(f"Adding document groups to document: {document} ") + key = "url" if "url" in document else "s3_path" + value = models.MatchValue(value=document[key]) + searchFilter = models.Filter( + must=[ + models.FieldCondition(key="course_name", match=models.MatchValue(value=course_name)), + models.FieldCondition(key=key, + match=value)]) + + payload = { + "doc_groups": [group["name"] for group in document["doc_groups"]] + [doc_group_name], + } + + # print(f"Updating to Payload: {payload}") + + update_operations.append(models.SetPayloadOperation( + set_payload=models.SetPayload( + payload=payload, + filter=searchFilter + ), + )) + + print(f"update_operations for qdrant: {len(update_operations)}") + result = self.qdrant_client.batch_update_points( + collection_name=os.environ['QDRANT_COLLECTION_NAME'], + update_operations=update_operations, wait=False) + return result \ No newline at end of file diff --git a/ai_ta_backend/main.py b/ai_ta_backend/main.py index 785ad721..784213bf 100644 --- a/ai_ta_backend/main.py +++ b/ai_ta_backend/main.py @@ -384,9 +384,9 @@ def insert_document_groups(service: RetrievalService) -> Response: data = request.get_json() csv_path: str = data.get('csv_path', '') course_name: str = data.get('course_name', '') - doc_group_count, docs_doc_group_count = service.insertDocumentGroups(course_name, csv_path) + doc_group_count, docs_doc_group_count_sql, docs_doc_group_count_vdb = service.insertDocumentGroups(course_name, csv_path) - return jsonify({"message": "Document groups and documents inserted successfully.", "doc_group_count": doc_group_count, "docs_doc_group_count": docs_doc_group_count}) + return jsonify({"message": "Document groups and documents inserted successfully.", "doc_group_count": doc_group_count, "docs_doc_group_count_sql": docs_doc_group_count_sql, "docs_doc_group_count_vdb": docs_doc_group_count_vdb}) def configure(binder: Binder) -> None: binder.bind(RetrievalService, to=RetrievalService, scope=RequestScope) diff --git a/ai_ta_backend/service/retrieval_service.py b/ai_ta_backend/service/retrieval_service.py index e227568c..2eb81c56 100644 --- a/ai_ta_backend/service/retrieval_service.py +++ b/ai_ta_backend/service/retrieval_service.py @@ -473,78 +473,83 @@ def insertDocumentGroups(self, course_name, csv_path): Inserts document groups and documents into the database based on the CSV mapping. Additionally, saves doc group name along with the entire document info in the output csv file. 
""" - urls = [] - doc_group_count = 0 - # Count of documents in all document groups - docs_doc_groups_count = 0 - # Fetch existing documents by URLs from CSV + doc_group_count, docs_doc_groups_count_sql, docs_doc_groups_count_vdb = 0, 0, 0 try: - with open(csv_path, newline='') as csvfile: - for row in csv.DictReader(csvfile): - urls = eval(row['start_urls']) - doc_group_name = row['university'] - # Count of documents in the document group - docs_doc_group_count = 0 - # print(f"fetching existing documents for course_name: {course_name} and URLs: {urls}") - page = 1 - # Create document group - document_group = {'name': doc_group_name, 'course_name': course_name} - # Ingest document group - try: - doc_group_id = self.sqlDb.insertDocumentGroupsBulk(document_group) - except Exception as e: - print(f"Failed to insert document group {doc_group_name} due to: {str(e)}") - continue - while True: - print(f"fetching page: {page} for doc_group: {doc_group_name}, with urls: {urls}") - try: - existing_documents_response = self.sqlDb.fetchDocumentsByURLs(urls, course_name, page) - existing_documents = existing_documents_response.data - if not existing_documents: - break # Exit loop if no more documents are returned - except Exception as e: - print(f"Failed to fetch documents for page {page} and doc_group {doc_group_name} due to: {str(e)}") - break - - # Process documents in batches here - - documents_to_update = [doc["id"] for doc in existing_documents] - - print("Document groups for insertion: ", document_group) - print("Documents for update: ", documents_to_update) - - docs_doc_groups_count += len(documents_to_update) - docs_doc_group_count += len(documents_to_update) - - try: - # Bulk update - self.sqlDb.updateDocumentsDocGroupsBulk(documents_to_update, doc_group_id) - # print("skipping ingest for testing") - except Exception as e: - print(f"Failed to insert/update document groups for {doc_group_name} and {page} due to: {str(e)}") - - page += 1 # Move to the next page - - # Output to CSV is handled outside the pagination loop - try: - with open('final_output/updated_documents.csv', newline='', mode='a') as output_csvfile: - writer = csv.writer(output_csvfile) - for doc in existing_documents: - writer.writerow([document_group['name']] + list(doc.values())) - except Exception as e: - print(f"Failed to write updated documents to CSV for {doc_group_name} due to: {str(e.with_traceback)}") - - try: - with open('final_output/doc_groups.csv', newline='', mode='a') as output_csvfile: - writer = csv.writer(output_csvfile) - writer.writerow([doc_group_name, docs_doc_group_count]) - except Exception as e: - print(f"Failed to write document group counts to CSV for {doc_group_name} due to: {str(e.with_traceback)}") - - doc_group_count += 1 + with open(csv_path, newline='') as csvfile: + for row in csv.DictReader(csvfile): + urls = eval(row['start_urls']) + doc_group_name = row['university'] + document_group = {'name': doc_group_name, 'course_name': course_name} + document_group["id"] = self._ingest_document_group(document_group) + if document_group["id"]: + doc_group_count += 1 + sql_count, vdb_count = self._process_documents_in_group(urls, course_name, document_group) + docs_doc_groups_count_sql += sql_count + docs_doc_groups_count_vdb += vdb_count except FileNotFoundError as e: print(f"CSV file not found: {str(e)}") except Exception as e: - print(f"An error occurred while processing the CSV file: {str(e.with_traceback)}") + print(f"An error occurred while processing the CSV file: {str(e)}") - return 
doc_group_count, docs_doc_groups_count + return doc_group_count, docs_doc_groups_count_sql, docs_doc_groups_count_vdb + + def _ingest_document_group(self, document_group): + print(f"Inserting document group: {document_group['name']}") + try: + doc_group_id = self.sqlDb.insertDocumentGroupsBulk(document_group) + return doc_group_id + except Exception as e: + print(f"Failed to insert document group {document_group['name']} due to: {str(e)}") + return None + + def _process_documents_in_group(self, urls, course_name, document_group): + page, docs_doc_group_count_sql, docs_doc_group_count_vdb = 1, 0, 0 + while True: + print(f"fetching page: {page} for doc_group: {document_group['name']}, with urls: {urls}") + existing_documents, updated_points = self._fetch_and_update_documents(urls, course_name, page, document_group) + if not existing_documents: + break + docs_doc_group_count_sql += len(existing_documents) + docs_doc_group_count_vdb += len(updated_points if updated_points else []) + page += 1 + return docs_doc_group_count_sql, docs_doc_group_count_vdb + + def _fetch_and_update_documents(self, urls, course_name, page, document_group): + try: + existing_documents_response = self.sqlDb.fetchDocumentsByURLs(urls, course_name, page) + existing_documents = existing_documents_response.data + print(f"Existing documents for page {page} and doc_group {document_group['name']}: {len(existing_documents)}") + if not existing_documents: + return None, None + documents_to_update = [doc["id"] for doc in existing_documents] + + print(f"Updating documents for page {page} and doc_group {document_group['name']} in SQL") + self.sqlDb.updateDocumentsDocGroupsBulk(documents_to_update, document_group["id"]) + + print(f"Updating documents for page {page} and doc_group {document_group['name']} in VDB") + updated_points = self.vdb.add_document_groups_to_documents(course_name, existing_documents, document_group['name']) + + print(f"Writing updated documents to CSV for doc_group {document_group['name']} and page {page}") + + self._write_to_csv('result/updated_documents_sql.csv', existing_documents, document_group) + self._write_to_csv('result/updated_documents_vdb.csv', updated_points, document_group, is_vdb=True) + return existing_documents, updated_points + except Exception as e: + print(f"Failed to fetch/update documents for page {page} and doc_group {document_group['name']} due to: {str(e)}") + return None, None + + def _write_to_csv(self, file_path, data, document_group, is_vdb=False): + try: + print(f"Writing updated documents to CSV for {document_group['name']}, is_vdb: {is_vdb}") + with open(file_path, newline='', mode='a') as output_csvfile: + writer = csv.writer(output_csvfile) + if is_vdb: + writer.writerow(['doc_group_name', 'operation results']) + for doc in data: + writer.writerow([document_group['name']] + list(doc)) + else: + writer.writerow(['doc_group_name', 'id', 'readable_filename', 'url', 's3_path', 'base_url']) + for doc in data: + writer.writerow([document_group['name']] + list(doc.values())) + except Exception as e: + print(f"Failed to write updated documents to CSV for {document_group['name']} due to: {str(e)}")
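
Reviewer note: below is a minimal, hypothetical smoke test for the /insert_document_groups endpoint added in this series. It is a sketch only: it assumes the Flask app from ai_ta_backend/main.py is running locally (the host and port are placeholders), that the mapping CSV contains the `university` and `start_urls` columns read by insertDocumentGroups, and the course name and file paths are illustrative. The response keys shown reflect the final state after PATCH 4/4.

    import csv
    import requests

    # Build a tiny mapping CSV: `university` becomes the doc group name and
    # `start_urls` holds a Python-style list literal of base URLs (the service
    # parses it with eval()). Values here are illustrative only.
    rows = [{"university": "Example University", "start_urls": "['https://example.edu']"}]
    with open("/tmp/doc_group_mapping.csv", "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["university", "start_urls"])
        writer.writeheader()
        writer.writerows(rows)

    # Placeholder base URL; point this at wherever the backend is actually deployed.
    resp = requests.post(
        "http://localhost:5000/insert_document_groups",
        json={"csv_path": "/tmp/doc_group_mapping.csv", "course_name": "example-course"},
    )
    print(resp.json())
    # Expected keys after PATCH 4/4: message, doc_group_count,
    # docs_doc_group_count_sql, docs_doc_group_count_vdb

Note that _fetch_and_update_documents appends its audit output to result/updated_documents_sql.csv and result/updated_documents_vdb.csv, so a result/ directory needs to exist relative to the server's working directory for those files to be written; otherwise the surrounding try/except only logs the failure.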