updated logging function to include missed records
star-nox committed Mar 22, 2024
1 parent a3578d9 commit 7d83c75
Showing 1 changed file with 87 additions and 54 deletions.
141 changes: 87 additions & 54 deletions ai_ta_backend/beam/nomic_logging.py
@@ -453,14 +453,15 @@ def create_document_map(course_name: str):
return "Cannot create a map because there are less than 20 documents in the course."

first_id = response.data[0]['id']

combined_dfs = []
curr_total_doc_count = 0
doc_count = 0
first_batch = True

# iteratively query in batches of 25
while curr_total_doc_count < total_doc_count:

response = supabase_client.table("documents").select(
"id, created_at, s3_path, url, base_url, readable_filename, contexts").eq("course_name", course_name).gte(
'id', first_id).order('id', desc=False).limit(25).execute()
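The loop above pages through Supabase with an `id` cursor rather than offset pagination. A minimal sketch of that pattern, assuming a supabase-py client (`fetch_batches` and `batch_size` are illustrative names, not part of this file):

# Sketch of the id-cursor paging used in create_document_map(); assumes a
# supabase-py client, with table/column names mirroring the diff.
def fetch_batches(supabase_client, course_name: str, first_id: int,
                  total_doc_count: int, batch_size: int = 25):
  fetched = 0
  while fetched < total_doc_count:
    response = supabase_client.table("documents").select(
        "id, created_at, s3_path, url, base_url, readable_filename, contexts"
    ).eq("course_name", course_name).gte("id", first_id).order(
        "id", desc=False).limit(batch_size).execute()
    if not response.data:
      break  # no rows left; guards against a stale total_doc_count
    yield response.data
    fetched += len(response.data)
    first_id = response.data[-1]["id"] + 1  # gte cursor, so step past the last id

Because the cursor comes from the data itself, a run interrupted between batches can resume without re-reading earlier rows.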
@@ -486,15 +487,18 @@ def create_document_map(course_name: str):
topic_label_field = "text"
colorable_fields = ["readable_filename", "text", "base_url", "created_at"]
result = create_map(embeddings, metadata, project_name, index_name, topic_label_field, colorable_fields)
# update flag
first_batch = False
# log project info to supabase
project = AtlasProject(name=project_name, add_datums_if_exists=True)
project_id = project.id
last_id = final_df['id'].iloc[-1].split("_")[0]
project_info = {'course_name': course_name, 'doc_map_id': project_id, 'last_uploaded_doc_id': last_id}
response = supabase_client.table("projects").insert(project_info).execute()
print("Response from supabase: ", response)

if result == "success":
# update flag
first_batch = False
# log project info to supabase
project = AtlasProject(name=project_name, add_datums_if_exists=True)
project_id = project.id
print("Last id: ", final_df['id'].iloc[-1])
last_id = int(final_df['id'].iloc[-1])
project_info = {'course_name': course_name, 'doc_map_id': project_id, 'last_uploaded_doc_id': last_id}
update_response = supabase_client.table("projects").insert(project_info).execute()
print("Response from supabase: ", update_response)

else:
# append to existing map
@@ -504,14 +508,16 @@ def create_document_map(course_name: str):
result = append_to_map(embeddings, metadata, project_name)
if result == "success":
# update the last uploaded id in supabase
last_id = final_df['id'].iloc[-1].split("_")[0]
project_info = {'last_uploaded_doc_id': last_id}
response = supabase_client.table("projects").update(project_info).eq("course_name", course_name).execute()
print("Response from supabase: ", response)
last_id = int(final_df['id'].iloc[-1])
info = {'last_uploaded_doc_id': last_id}
print("info:", info)
update_response = supabase_client.table("projects").update(info).eq("course_name", course_name).execute()
print("Response from supabase: ", update_response)

# reset variables
combined_dfs = []
doc_count = 0
print("Records uploaded: ", curr_total_doc_count)

# set first_id for next iteration
first_id = response.data[-1]['id'] + 1
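Each successful append_to_map() is followed by writing last_uploaded_doc_id back to the projects table; that checkpoint is what later lets log_to_document_map() pick up records that were missed. A condensed sketch of the flush-and-checkpoint step (flush_and_checkpoint is a hypothetical name; data_prep_for_doc_map and append_to_map are this module's own helpers):

import pandas as pd

# Flush the accumulated batches to Nomic, then checkpoint progress in
# Supabase so a later run can resume from last_uploaded_doc_id.
def flush_and_checkpoint(supabase_client, combined_dfs, course_name, project_name):
  final_df = pd.concat(combined_dfs, ignore_index=True)
  embeddings, metadata = data_prep_for_doc_map(final_df)  # module helper
  result = append_to_map(embeddings, metadata, project_name)  # module helper
  if result == "success":
    last_id = int(final_df['id'].iloc[-1])
    supabase_client.table("projects").update(
        {'last_uploaded_doc_id': last_id}).eq("course_name", course_name).execute()
  return result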
@@ -528,12 +534,15 @@ def create_document_map(course_name: str):
result = create_map(embeddings, metadata, project_name, index_name, topic_label_field, colorable_fields)
else:
result = append_to_map(embeddings, metadata, project_name)

# update the last uploaded id in supabase
if result == "success":
# update the last uploaded id in supabase
last_id = final_df['id'].iloc[-1].split("_")[0]
last_id = int(final_df['id'].iloc[-1])
project_info = {'last_uploaded_doc_id': last_id}
response = supabase_client.table("projects").update(project_info).eq("course_name", course_name).execute()
print("Response from supabase: ", response)
print("project_info: ", project_info)
update_response = supabase_client.table("projects").update(project_info).eq("course_name", course_name).execute()
print("Response from supabase: ", update_response)
print("Atlas upload status: ", result)

# rebuild the map
@@ -576,7 +585,7 @@ def delete_from_document_map(course_name: str, ids: list):
return "Error in deleting from document map: {e}"


def log_to_document_map(data: dict):
def log_to_document_map(course_name: str):
"""
This is a function which appends new documents to an existing document map. It's called
at the end of split_and_upload() after inserting data to Supabase.
@@ -586,8 +595,7 @@ def log_to_document_map(data: dict):
print("in add_to_document_map()")

try:
# check if map exists
course_name = data['course_name']
# check if map exists
response = SUPABASE_CLIENT.table("projects").select("doc_map_id, last_uploaded_doc_id").eq("course_name", course_name).execute()
if response.data:
project_id = response.data[0]['doc_map_id']
@@ -604,51 +612,76 @@ def log_to_document_map(data: dict):
last_uploaded_doc_id = response.data[0]['last_uploaded_doc_id']

project = AtlasProject(project_id=project_id, add_datums_if_exists=True)
#print("Inserted data: ", data)
project_name = "Document Map for " + course_name
# check if project is locked, if yes -> skip logging
if not project.is_accepting_data:
return "Skipping Nomic logging because project is locked."
else:

# fetch count of records greater than last_uploaded_doc_id
print("last uploaded doc id: ", last_uploaded_doc_id)
response = SUPABASE_CLIENT.table("documents").select("id", count="exact").eq("course_name", course_name).gt("id", last_uploaded_doc_id).execute()
print("Number of new documents: ", response.count)

total_doc_count = response.count
current_doc_count = 0
doc_count = 0
combined_dfs = []

while current_doc_count < total_doc_count:
# fetch all records from supabase greater than last_uploaded_doc_id
response = SUPABASE_CLIENT.table("documents").select("id, created_at, s3_path, url, base_url, readable_filename, contexts").eq("course_name", course_name).gt("id", last_uploaded_doc_id).execute()
response = SUPABASE_CLIENT.table("documents").select("id, created_at, s3_path, url, base_url, readable_filename, contexts").eq("course_name", course_name).gt("id", last_uploaded_doc_id).order('id', desc=False).limit(25).execute()
df = pd.DataFrame(response.data)


combined_dfs.append(df) # list of dfs

current_doc_count += len(response.data)
doc_count += len(response.data)

embeddings = []
metadata = []
context_count = 0
# prep data for nomic upload
for row in data['contexts']:
context_count += 1
embeddings.append(row['embedding'])
metadata.append({
"id": str(data['id']) + "_" + str(context_count),
"created_at": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"s3_path": data['s3_path'],
"url": data['url'],
"base_url": data['base_url'],
"readable_filename": data['readable_filename'],
"modified_at": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"text": row['text']
})
embeddings = np.array(embeddings)
metadata = pd.DataFrame(metadata)
print("Shape of embeddings: ", embeddings.shape)

# append to existing map
project_name = "Document Map for " + course_name
result = append_to_map(embeddings, metadata, project_name)
if doc_count >= 1000: # upload to Nomic in batches of 1000
# concat all dfs from the combined_dfs list
final_df = pd.concat(combined_dfs, ignore_index=True)
# prep data for nomic upload
embeddings, metadata = data_prep_for_doc_map(final_df)

return result
# append to existing map
print("Appending data to existing map...")

result = append_to_map(embeddings, metadata, project_name)
if result == "success":
# update the last uploaded id in supabase
last_id = int(final_df['id'].iloc[-1])
info = {'last_uploaded_doc_id': last_id}
print("info:", info)
update_response = SUPABASE_CLIENT.table("projects").update(info).eq("course_name", course_name).execute()
print("Response from supabase: ", update_response)

# reset variables
combined_dfs = []
doc_count = 0
print("Records uploaded: ", curr_total_doc_count)

# advance the cursor so the next query starts after this batch
last_uploaded_doc_id = response.data[-1]['id']

# upload last set of docs
if doc_count > 0:
final_df = pd.concat(combined_dfs, ignore_index=True)
embeddings, metadata = data_prep_for_doc_map(final_df)
result = append_to_map(embeddings, metadata, project_name)

# update the last uploaded id in supabase
if result == "success":
# update the last uploaded id in supabase
last_id = int(final_df['id'].iloc[-1])
project_info = {'last_uploaded_doc_id': last_id}
print("project_info: ", project_info)
update_response = SUPABASE_CLIENT.table("projects").update(project_info).eq("course_name", course_name).execute()
print("Response from supabase: ", update_response)

return "success"
except Exception as e:
print(e)
sentry_sdk.capture_exception(e)
return "Error in appending to map: {e}"


return "failed"


def create_map(embeddings, metadata, map_name, index_name, topic_label_field, colorable_fields):
"""
Generic function to create a Nomic map from given parameters.
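With the signature change from log_to_document_map(data: dict) to log_to_document_map(course_name: str), the caller in split_and_upload() no longer passes the inserted row; the function re-queries Supabase for anything newer than last_uploaded_doc_id. A hypothetical call site (course name illustrative):

# After this commit, callers only supply the course name.
status = log_to_document_map("ece120")
print(status)  # "success", "failed", or a skip/error message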
