modified create_doc_map() to track last uploaded ids
star-nox committed Mar 22, 2024
1 parent b4792f7 commit a3578d9
Showing 1 changed file with 59 additions and 36 deletions.
95 changes: 59 additions & 36 deletions ai_ta_backend/beam/nomic_logging.py
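What changed, in short: create_document_map() now writes the id of the last document it uploaded into the projects table after every successful Nomic upload, and log_to_document_map() reads that value back so it only fetches documents with a greater id. Below is a minimal sketch of that resume pattern, using the Supabase query style visible in the diff; the resume_point/upload_new_docs helpers and their wiring are illustrative stand-ins, not part of the commit.

import pandas as pd

def resume_point(supabase_client, course_name: str) -> int:
  # read the id recorded by the previous run (0 if nothing logged yet)
  res = supabase_client.table("projects").select("last_uploaded_doc_id") \
      .eq("course_name", course_name).execute()
  return res.data[0]['last_uploaded_doc_id'] if res.data else 0

def upload_new_docs(supabase_client, course_name: str, upload_batch):
  # upload_batch is a hypothetical stand-in for the Nomic create/append calls
  last_id = resume_point(supabase_client, course_name)
  res = supabase_client.table("documents").select("*") \
      .eq("course_name", course_name).gt("id", last_id).execute()
  if not res.data:
    return "success"  # nothing new since the last run
  df = pd.DataFrame(res.data)
  result = upload_batch(df)
  if result == "success":
    # record progress so the next run starts after this document
    supabase_client.table("projects") \
        .update({'last_uploaded_doc_id': int(df['id'].iloc[-1])}) \
        .eq("course_name", course_name).execute()
  return result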
@@ -433,9 +433,9 @@ def create_document_map(course_name: str):

   try:
     # check if map exists
-    # response = supabase_client.table("projects").select("doc_map_id").eq("course_name", course_name).execute()
-    # if response.data:
-    #   return "Map already exists for this course."
+    response = supabase_client.table("projects").select("doc_map_id").eq("course_name", course_name).execute()
+    if response.data:
+      return "Map already exists for this course."

     # fetch relevant document data from Supabase
     response = supabase_client.table("documents").select("id",
@@ -470,8 +470,7 @@ def create_document_map(course_name: str):
       curr_total_doc_count += len(response.data)
       doc_count += len(response.data)

-
-      if doc_count >= 100: # upload to Nomic in batches of 1000
+      if doc_count >= 1000: # upload to Nomic in batches of 1000

         # concat all dfs from the combined_dfs list
         final_df = pd.concat(combined_dfs, ignore_index=True)
@@ -489,14 +488,27 @@ def create_document_map(course_name: str):
           result = create_map(embeddings, metadata, project_name, index_name, topic_label_field, colorable_fields)
           # update flag
           first_batch = False
+          # log project info to supabase
+          project = AtlasProject(name=project_name, add_datums_if_exists=True)
+          project_id = project.id
+          last_id = final_df['id'].iloc[-1].split("_")[0]
+          project_info = {'course_name': course_name, 'doc_map_id': project_id, 'last_uploaded_doc_id': last_id}
+          response = supabase_client.table("projects").insert(project_info).execute()
+          print("Response from supabase: ", response)

         else:
           # append to existing map
           print("Appending data to existing map...")
           project_name = NOMIC_MAP_NAME_PREFIX + course_name
           # add project lock logic here
           result = append_to_map(embeddings, metadata, project_name)

+          if result == "success":
+            # update the last uploaded id in supabase
+            last_id = final_df['id'].iloc[-1].split("_")[0]
+            project_info = {'last_uploaded_doc_id': last_id}
+            response = supabase_client.table("projects").update(project_info).eq("course_name", course_name).execute()
+            print("Response from supabase: ", response)

         # reset variables
         combined_dfs = []
         doc_count = 0
@@ -505,35 +517,33 @@ def create_document_map(course_name: str):
       first_id = response.data[-1]['id'] + 1

     # upload last set of docs
-    final_df = pd.concat(combined_dfs, ignore_index=True)
-    embeddings, metadata = data_prep_for_doc_map(final_df)
-    project_name = NOMIC_MAP_NAME_PREFIX + course_name
-    if first_batch:
-      index_name = course_name + "_doc_index"
-      topic_label_field = "text"
-      colorable_fields = ["readable_filename", "text", "base_url", "created_at"]
-      result = create_map(embeddings, metadata, project_name, index_name, topic_label_field, colorable_fields)
-    else:
-      result = append_to_map(embeddings, metadata, project_name)
-    print("Atlas upload status: ", result)
-
-    # log info to supabase
-    project = AtlasProject(name=project_name, add_datums_if_exists=True)
-    project_id = project.id
-    project.rebuild_maps()
-    project_info = {'course_name': course_name, 'doc_map_id': project_id}
-    response = supabase_client.table("projects").insert(project_info).execute()
-    print("Response from supabase: ", response)
-    return "success"
+    if doc_count > 0:
+      final_df = pd.concat(combined_dfs, ignore_index=True)
+      embeddings, metadata = data_prep_for_doc_map(final_df)
+      project_name = NOMIC_MAP_NAME_PREFIX + course_name
+      if first_batch:
+        index_name = course_name + "_doc_index"
+        topic_label_field = "text"
+        colorable_fields = ["readable_filename", "text", "base_url", "created_at"]
+        result = create_map(embeddings, metadata, project_name, index_name, topic_label_field, colorable_fields)
+      else:
+        result = append_to_map(embeddings, metadata, project_name)
+      if result == "success":
+        # update the last uploaded id in supabase
+        last_id = final_df['id'].iloc[-1].split("_")[0]
+        project_info = {'last_uploaded_doc_id': last_id}
+        response = supabase_client.table("projects").update(project_info).eq("course_name", course_name).execute()
+        print("Response from supabase: ", response)
+      print("Atlas upload status: ", result)
+
+    # rebuild the map
+    rebuild_map(course_name, "document")

   except Exception as e:
     print(e)
     sentry_sdk.capture_exception(e)

     return "failed"



def delete_from_document_map(course_name: str, ids: list):
"""
This function is used to delete datapoints from a document map.
@@ -559,7 +569,7 @@ def delete_from_document_map(course_name: str, ids: list):
print("Deleting point from document map:", project.delete_data(ids))
with project.wait_for_project_lock():
project.rebuild_maps()
return "Successfully deleted from Nomic map"
return "success"
except Exception as e:
print(e)
sentry_sdk.capture_exception(e)
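Note the return value here changed from a descriptive sentence to the literal "success", matching append_to_map() and rebuild_map() below, so callers can branch on one sentinel instead of string-matching prose. A usage sketch (the course name and ids are illustrative only):

# branch on the normalized "success" sentinel
status = delete_from_document_map("example-course", ["123_0", "123_1"])
if status == "success":
  print("datapoints deleted and map rebuilt")
else:
  print("delete failed:", status)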
@@ -578,21 +588,33 @@ def log_to_document_map(data: dict):
   try:
     # check if map exists
     course_name = data['course_name']
-    response = SUPABASE_CLIENT.table("projects").select("doc_map_id").eq("course_name", course_name).execute()
+    response = SUPABASE_CLIENT.table("projects").select("doc_map_id, last_uploaded_doc_id").eq("course_name", course_name).execute()
     if response.data:
       project_id = response.data[0]['doc_map_id']
+      last_uploaded_doc_id = response.data[0]['last_uploaded_doc_id']
     else:
       # create a map
       map_creation_result = create_document_map(course_name)
       if map_creation_result != "success":
         return "The project has less than 20 documents and a map cannot be created."
       else:
         # fetch project id
-        response = SUPABASE_CLIENT.table("projects").select("doc_map_id").eq("course_name", course_name).execute()
+        response = SUPABASE_CLIENT.table("projects").select("doc_map_id, last_uploaded_doc_id").eq("course_name", course_name).execute()
         project_id = response.data[0]['doc_map_id']
+        last_uploaded_doc_id = response.data[0]['last_uploaded_doc_id']

     project = AtlasProject(project_id=project_id, add_datums_if_exists=True)
     #print("Inserted data: ", data)
+    # check if project is locked, if yes -> skip logging
+    if not project.is_accepting_data:
+      return "Skipping Nomic logging because project is locked."
+    else:
+      # fetch all records from supabase greater than last_uploaded_doc_id
+      response = SUPABASE_CLIENT.table("documents").select("id, created_at, s3_path, url, base_url, readable_filename, contexts").eq("course_name", course_name).gt("id", last_uploaded_doc_id).execute()
+      df = pd.DataFrame(response.data)

       embeddings = []
       metadata = []
@@ -603,11 +625,12 @@ def log_to_document_map(data: dict):
         embeddings.append(row['embedding'])
         metadata.append({
             "id": str(data['id']) + "_" + str(context_count),
+            "doc_ingested_at": data['created_at'],
+            "created_at": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
             "s3_path": data['s3_path'],
             "url": data['url'],
             "base_url": data['base_url'],
             "readable_filename": data['readable_filename'],
-            "created_at": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
+            "modified_at": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
             "text": row['text']
         })
       embeddings = np.array(embeddings)
@@ -667,7 +690,7 @@ def append_to_map(embeddings, metadata, map_name):
     project = atlas.AtlasProject(name=map_name, add_datums_if_exists=True)
     with project.wait_for_project_lock():
       project.add_embeddings(embeddings=embeddings, data=metadata)
-    return "Successfully appended to Nomic map"
+    return "success"
   except Exception as e:
     print(e)
     return f"Error in appending to map: {e}"
Expand Down Expand Up @@ -750,7 +773,7 @@ def rebuild_map(course_name:str, map_type:str):

     with project.wait_for_project_lock():
       project.rebuild_maps()
-    return "Successfully rebuilt map"
+    return "success"
   except Exception as e:
     print(e)
     sentry_sdk.capture_exception(e)
