diff --git a/app/repositories/user_weight_repository.py b/app/repositories/user_weight_repository.py index 3898683..dbea855 100644 --- a/app/repositories/user_weight_repository.py +++ b/app/repositories/user_weight_repository.py @@ -1,8 +1,6 @@ from typing import List, Dict -from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorCollection from pymongo import UpdateOne - -from app.models import db_w2v_mapper +from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorCollection class UserWeightRepository: @@ -15,7 +13,6 @@ def update_user_weights( ): operations = [] for meta_id, name in meta_info: - name = db_w2v_mapper.translate_genre(name) operations.append( UpdateOne( {"user_id": user_id, "meta_info_id": meta_id}, @@ -36,8 +33,7 @@ async def update_user_weights_from_log(self, log: dict, weight: float): operations = [] # 1. 장르 - for genre in meta_info.get("genres", []): - name = db_w2v_mapper.translate_genre(genre) + for name in meta_info.get("genres", []): operations.append( UpdateOne( {"user_id": user_id, "name": name}, @@ -86,9 +82,9 @@ async def find_by_user_id(self, user_id: int) -> List[Dict]: results = await cursor.to_list(length=None) return results - async def reset_weight(self, user_id: int, genre: str, weight: float): - filter = {"user_id": user_id, "name": genre} - update = {"$set": {"weight": weight}} + async def update_user_weight(self, user_id: int, meta_info_id: str, diff: float): + filter = {"user_id": user_id, "meta_info_id": meta_info_id} + update = {"$inc": {"weight": diff}} await self.collection.update_one(filter, update, upsert=True) async def decrease_user_weights_from_log(self, log: dict, weight: float): @@ -97,8 +93,7 @@ async def decrease_user_weights_from_log(self, log: dict, weight: float): operations = [] - for genre in meta_info.get("genres", []): - name = db_w2v_mapper.translate_genre(genre) + for name in meta_info.get("genres", []): operations.append( UpdateOne( {"user_id": user_id, "name": name}, diff --git a/app/services/daily_weight_resizer.py b/app/services/daily_weight_resizer.py index 74862c1..87a56e8 100644 --- a/app/services/daily_weight_resizer.py +++ b/app/services/daily_weight_resizer.py @@ -1,7 +1,5 @@ import asyncio import logging -import math -import time import numpy as np from collections import defaultdict import app @@ -48,6 +46,7 @@ async def resize_weight( actor_dict = defaultdict(int) director_dict = defaultdict(int) country_dict = defaultdict(int) + genre_name_dict = defaultdict(int) # 벡터 업데이트용 for log in logs: action_type = ActionType[log['action']] value = int(log['value']) @@ -56,6 +55,7 @@ async def resize_weight( weight = weight_strategy.convert_to_weight(action_type, value) # 가중치 resize resized_weight = exponential_decay_weight(weight, log['timestamp']) + diff = resized_weight - weight meta = log.get('metaInfo', {}) genres = meta.get('genres', {}) @@ -63,50 +63,51 @@ async def resize_weight( directors = meta.get('director', {}) countries = meta.get('country', {}) - for _, genre_name in genres.items(): - genre_dict[genre_name] += resized_weight - for _, actor_name in actors.items(): - actor_dict[actor_name] += resized_weight - for _, director_name in directors.items(): - director_dict[director_name] += resized_weight - for _, country_name in countries.items(): - country_dict[country_name] += resized_weight + for meta_info_id, genre_name in genres.items(): + genre_dict[meta_info_id] += diff + genre_name_dict[genre_name] += diff + for meta_info_id, _ in actors.items(): + actor_dict[meta_info_id] += diff + for meta_info_id, _ in directors.items(): + director_dict[meta_info_id] += diff + for meta_info_id, _ in countries.items(): + country_dict[meta_info_id] += diff # MongoDB에 resized 가중치 저장 - for genre_name, resized_weight in genre_dict.items(): + for genre_id, diff in genre_dict.items(): try: - await user_weight_repo.reset_weight(user_id, genre_name, resized_weight) + await user_weight_repo.update_user_weight(user_id, genre_id, diff) except Exception: - failed.append((user_id, genre_name, resize_weight)) + failed.append((user_id, genre_id, diff)) await asyncio.sleep(5) - for actor_name, resized_weight in actor_dict.items(): + for actor_id, diff in actor_dict.items(): try: - await user_weight_repo.reset_weight(user_id, actor_name, resized_weight) + await user_weight_repo.update_user_weight(user_id, actor_id, diff) except Exception: - failed.append((user_id, actor_name, resize_weight)) + failed.append((user_id, actor_id, diff)) - for director_name, resized_weight in director_dict.items(): + for director_id, diff in director_dict.items(): try: - await user_weight_repo.reset_weight(user_id, director_name, resized_weight) + await user_weight_repo.update_user_weight(user_id, director_id, diff) except Exception: - failed.append((user_id, director_name, resize_weight)) + failed.append((user_id, director_id, diff)) - for country_name, resized_weight in country_dict.items(): + for country_id, diff in country_dict.items(): try: - await user_weight_repo.reset_weight(user_id, country_name, resized_weight) + await user_weight_repo.update_user_weight(user_id, country_id, diff) except Exception: - failed.append((user_id, country_name, resize_weight)) + failed.append((user_id, country_id, diff)) # resized 가중치 기반으로 벡터 계산 - vector = calc_user_vector(genre_dict) + vector = calc_user_vector(genre_name_dict) vector_str = np.array2string(vector, separator=', ') for attempt in range(1, MAX_RETRIES + 1): try: await redis.save_user_vector(user_id, vector_str) - print(f'{LOG_PREFIX} 사용자 {user_id} 벡터 Redis 저장 완료') + print(f'사용자 {user_id} 벡터 Redis 저장 완료') break except Exception as e: if attempt < MAX_RETRIES: @@ -117,35 +118,26 @@ async def resize_weight( # 실패 로그 재시도 error_logs_cnt = 0 if (len(failed) > 0): - for user_id, genre_name, resized_weight in failed: + for user_id, meta_info_id, diff in failed: for attempt in range(1, MAX_RETRIES + 1): try: - await user_weight_repo.reset_weight(user_id, genre_name, resized_weight) + await user_weight_repo.update_user_weight(user_id, meta_info_id, diff) break except Exception as e: if attempt < MAX_RETRIES: await gen_warning_log(f"[{attempt}/{MAX_RETRIES}] 보상 트랜잭션 재시도", e) else: error_logs_cnt += 1 - await gen_error_log(f"보상트랜잭션 실패, log_id: {log['_id']}", e) + await gen_error_log(f"보상 트랜잭션 실패, log_id: {log['_id']}", e) if error_logs_cnt == 0: print("✅ 보상 트랜잭션 재시도 성공") else: - print(f"{LOG_PREFIX} 💥 보상 트랜잭션 재시도 {error_logs_cnt}개 실패") + print(f"💥 보상 트랜잭션 재시도 {error_logs_cnt}개 실패") else: - print(f"{LOG_PREFIX} 시도할 보상 트랜잭션 없음") + print("시도할 보상 트랜잭션 없음") print("✅ 가중치 resizing 완료") return - -def calc_resized_weight(timestamp : int, weight : float): - current_timestamp_ms = int(time.time() * 1000) - delta_ms = current_timestamp_ms - timestamp - delta_days = delta_ms / (1000 * 60 * 60 * 24) - - resized_weight = weight * math.exp(-1 * delta_days) - return resized_weight - def group_logs_by_user_id(logs: List[Dict]) -> Dict[int, List[Dict]]: grouped = defaultdict(list) for log in logs: