Skip to content
Merged
17 changes: 6 additions & 11 deletions app/repositories/user_weight_repository.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
from typing import List, Dict
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorCollection
from pymongo import UpdateOne

from app.models import db_w2v_mapper
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorCollection


class UserWeightRepository:
Expand All @@ -15,7 +13,6 @@ def update_user_weights(
):
operations = []
for meta_id, name in meta_info:
name = db_w2v_mapper.translate_genre(name)
operations.append(
UpdateOne(
{"user_id": user_id, "meta_info_id": meta_id},
Expand All @@ -36,8 +33,7 @@ async def update_user_weights_from_log(self, log: dict, weight: float):
operations = []

# 1. 장르
for genre in meta_info.get("genres", []):
name = db_w2v_mapper.translate_genre(genre)
for name in meta_info.get("genres", []):
operations.append(
UpdateOne(
{"user_id": user_id, "name": name},
Expand Down Expand Up @@ -86,9 +82,9 @@ async def find_by_user_id(self, user_id: int) -> List[Dict]:
results = await cursor.to_list(length=None)
return results

async def reset_weight(self, user_id: int, genre: str, weight: float):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1: reset_weight 메서드가 UserWeightRepository에서 제거되었는데,
daily_weight_resizer.py의 실패 로그 재시도 부분(124줄)에서 아직 사용되고 있습니다! 확인 부탁드립니다.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

미처 확인하지 못한 부분이네요. 반영 완료했습니다!

filter = {"user_id": user_id, "name": genre}
update = {"$set": {"weight": weight}}
async def update_user_weight(self, user_id: int, meta_info_id: str, diff: float) -> None:
    """Add ``diff`` to the stored weight of one (user, meta-info) pair.

    Issues a single ``update_one`` on ``self.collection`` using the ``$inc``
    operator, with ``upsert=True`` passed through to the driver.

    Args:
        user_id: Value matched against the ``user_id`` field.
        meta_info_id: Value matched against the ``meta_info_id`` field.
        diff: Amount added to the ``weight`` field (may be negative).
    """
    # Locals are deliberately not called `filter`, which would shadow the builtin.
    query = {"user_id": user_id, "meta_info_id": meta_info_id}
    increment = {"$inc": {"weight": diff}}
    await self.collection.update_one(query, increment, upsert=True)

async def decrease_user_weights_from_log(self, log: dict, weight: float):
Expand All @@ -97,8 +93,7 @@ async def decrease_user_weights_from_log(self, log: dict, weight: float):

operations = []

for genre in meta_info.get("genres", []):
name = db_w2v_mapper.translate_genre(genre)
for name in meta_info.get("genres", []):
operations.append(
UpdateOne(
{"user_id": user_id, "name": name},
Expand Down
68 changes: 30 additions & 38 deletions app/services/daily_weight_resizer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import asyncio
import logging
import math
import time
import numpy as np
from collections import defaultdict
import app
Expand Down Expand Up @@ -48,6 +46,7 @@ async def resize_weight(
actor_dict = defaultdict(int)
director_dict = defaultdict(int)
country_dict = defaultdict(int)
genre_name_dict = defaultdict(int) # 벡터 업데이트용
for log in logs:
action_type = ActionType[log['action']]
value = int(log['value'])
Expand All @@ -56,57 +55,59 @@ async def resize_weight(
weight = weight_strategy.convert_to_weight(action_type, value)
# 가중치 resize
resized_weight = exponential_decay_weight(weight, log['timestamp'])
diff = resized_weight - weight

meta = log.get('metaInfo', {})
genres = meta.get('genres', {})
actors = meta.get('actors', {})
directors = meta.get('director', {})
countries = meta.get('country', {})

for _, genre_name in genres.items():
genre_dict[genre_name] += resized_weight
for _, actor_name in actors.items():
actor_dict[actor_name] += resized_weight
for _, director_name in directors.items():
director_dict[director_name] += resized_weight
for _, country_name in countries.items():
country_dict[country_name] += resized_weight
for meta_info_id, genre_name in genres.items():
genre_dict[meta_info_id] += diff
genre_name_dict[genre_name] += diff
for meta_info_id, _ in actors.items():
actor_dict[meta_info_id] += diff
for meta_info_id, _ in directors.items():
director_dict[meta_info_id] += diff
for meta_info_id, _ in countries.items():
country_dict[meta_info_id] += diff


# MongoDB에 resized 가중치 저장
for genre_name, resized_weight in genre_dict.items():
for genre_id, diff in genre_dict.items():
try:
await user_weight_repo.reset_weight(user_id, genre_name, resized_weight)
await user_weight_repo.update_user_weight(user_id, genre_id, diff)
except Exception:
failed.append((user_id, genre_name, resize_weight))
failed.append((user_id, genre_id, diff))

await asyncio.sleep(5)
for actor_name, resized_weight in actor_dict.items():
for actor_id, diff in actor_dict.items():
try:
await user_weight_repo.reset_weight(user_id, actor_name, resized_weight)
await user_weight_repo.update_user_weight(user_id, actor_id, diff)
except Exception:
failed.append((user_id, actor_name, resize_weight))
failed.append((user_id, actor_id, diff))

for director_name, resized_weight in director_dict.items():
for director_id, diff in director_dict.items():
try:
await user_weight_repo.reset_weight(user_id, director_name, resized_weight)
await user_weight_repo.update_user_weight(user_id, director_id, diff)
except Exception:
failed.append((user_id, director_name, resize_weight))
failed.append((user_id, director_id, diff))

for country_name, resized_weight in country_dict.items():
for country_id, diff in country_dict.items():
try:
await user_weight_repo.reset_weight(user_id, country_name, resized_weight)
await user_weight_repo.update_user_weight(user_id, country_id, diff)
except Exception:
failed.append((user_id, country_name, resize_weight))
failed.append((user_id, country_id, diff))

# resized 가중치 기반으로 벡터 계산
vector = calc_user_vector(genre_dict)
vector = calc_user_vector(genre_name_dict)
vector_str = np.array2string(vector, separator=', ')

for attempt in range(1, MAX_RETRIES + 1):
try:
await redis.save_user_vector(user_id, vector_str)
print(f'{LOG_PREFIX} 사용자 {user_id} 벡터 Redis 저장 완료')
print(f'사용자 {user_id} 벡터 Redis 저장 완료')
break
except Exception as e:
if attempt < MAX_RETRIES:
Expand All @@ -117,35 +118,26 @@ async def resize_weight(
# 실패 로그 재시도
error_logs_cnt = 0
if (len(failed) > 0):
for user_id, genre_name, resized_weight in failed:
for user_id, meta_info_id, diff in failed:
for attempt in range(1, MAX_RETRIES + 1):
try:
await user_weight_repo.reset_weight(user_id, genre_name, resized_weight)
await user_weight_repo.update_user_weight(user_id, meta_info_id, diff)
break
except Exception as e:
if attempt < MAX_RETRIES:
await gen_warning_log(f"[{attempt}/{MAX_RETRIES}] 보상 트랜잭션 재시도", e)
else:
error_logs_cnt += 1
await gen_error_log(f"보상트랜잭션 실패, log_id: {log['_id']}", e)
await gen_error_log(f"보상 트랜잭션 실패, log_id: {log['_id']}", e)
if error_logs_cnt == 0:
print("✅ 보상 트랜잭션 재시도 성공")
else:
print(f"{LOG_PREFIX} 💥 보상 트랜잭션 재시도 {error_logs_cnt}개 실패")
print(f"💥 보상 트랜잭션 재시도 {error_logs_cnt}개 실패")
else:
print(f"{LOG_PREFIX} 시도할 보상 트랜잭션 없음")
print("시도할 보상 트랜잭션 없음")
print("✅ 가중치 resizing 완료")
return


def calc_resized_weight(timestamp: int, weight: float, *, now_ms: "int | None" = None) -> float:
    """Return ``weight`` decayed exponentially by the age of ``timestamp``.

    The result is ``weight * exp(-age_in_days)``, where the age is measured
    from ``timestamp`` to the reference time.

    Args:
        timestamp: Event time in milliseconds since the Unix epoch.
        weight: Weight to decay.
        now_ms: Reference "current" time in epoch milliseconds. When omitted,
            the wall clock (``time.time()``) is used.

    Returns:
        The decayed weight. A timestamp at or after the reference time
        yields ``weight`` unchanged.
    """
    ms_per_day = 1000 * 60 * 60 * 24
    if now_ms is None:
        now_ms = int(time.time() * 1000)

    # Clamp at zero: a timestamp ahead of the reference time would otherwise
    # produce a negative age, which amplifies the weight instead of decaying
    # it and makes math.exp raise OverflowError once it is far enough ahead.
    elapsed_days = max(now_ms - timestamp, 0) / ms_per_day
    return weight * math.exp(-elapsed_days)

def group_logs_by_user_id(logs: List[Dict]) -> Dict[int, List[Dict]]:
grouped = defaultdict(list)
for log in logs:
Expand Down