diff --git a/app/main.py b/app/main.py index e0e5f4f..bf6d1bb 100644 --- a/app/main.py +++ b/app/main.py @@ -1,6 +1,5 @@ import asyncio from contextlib import asynccontextmanager -from functools import partial from logging import getLogger from app.logger import setup_logging @@ -10,14 +9,10 @@ from motor.motor_asyncio import AsyncIOMotorClient from app.models.word2vec_model import Word2VecModel -from app.repositories.action_log_repository import ActionLogRepository -from app.repositories.postgresql_repository import get_genres_by_content_id -from app.repositories.user_weight_repository import UserWeightRepository -from app.router import recommend, content, scheduler_router, user, embedding +from app.router import recommend, content, user, embedding from app.services.consumer import start_consumer from app.services.redis import init_redis, close_redis from app.router import test_log -from app.services.scheduler_service import resize_weight from app.settings import settings scheduler = BackgroundScheduler(timezone='Asia/Seoul') @@ -37,8 +32,6 @@ async def load_w2v(app: FastAPI): # MongoDB 연결 mongo_client = AsyncIOMotorClient(MONGO_URI) app.state.mongo_client = mongo_client - action_log_repo = ActionLogRepository(mongo_client) - user_weight_repo = UserWeightRepository(mongo_client) # PostgreSQL 연결 pg_pool = await asyncpg.create_pool( @@ -51,23 +44,6 @@ async def load_w2v(app: FastAPI): app.state.pg_pool = pg_pool print("✅ PostgreSQL 연결 완료") - # resize_weight를 위한 스케줄링 함수 정의 - async def schedule_resize_weight(): - await resize_weight( - action_log_repo, - user_weight_repo, - partial(get_genres_by_content_id, pg_pool), - ) - - loop = asyncio.get_running_loop() - - def schedule_resize_weight_wrapper(): - asyncio.run_coroutine_threadsafe(schedule_resize_weight(), loop) - - scheduler.add_job(schedule_resize_weight_wrapper, "cron", hour=3, minute=0) - scheduler.start() - print("✅ APScheduler 설정 완료") - @asynccontextmanager async def lifespan(app: FastAPI): @@ -97,7 +73,6 @@ async def lifespan(app: FastAPI): app.include_router(content.router) app.include_router(user.router) app.include_router(embedding.router) -app.include_router(scheduler_router.router) app.include_router(test_log.router) diff --git a/app/router/scheduler_router.py b/app/router/scheduler_router.py deleted file mode 100644 index 86bc8d6..0000000 --- a/app/router/scheduler_router.py +++ /dev/null @@ -1,23 +0,0 @@ -from fastapi import APIRouter, Request -from functools import partial -from app.repositories.action_log_repository import ActionLogRepository -from app.repositories.user_weight_repository import UserWeightRepository -from app.repositories.postgresql_repository import get_genres_by_content_id -from app.services.scheduler_service import resize_weight - -router = APIRouter() - -@router.post("/contents/resize-weight") -async def trigger_resize_weight(request: Request): - # FastAPI app에서 공유 객체 가져오기 - pg_pool = request.app.state.pg_pool - mongo_client = request.app.state.mongo_client - - action_log_repo = ActionLogRepository(mongo_client) - user_weight_repo = UserWeightRepository(mongo_client) - get_genres_func = partial(get_genres_by_content_id, pg_pool) - - # 실제 비동기 가중치 재계산 실행 - await resize_weight(action_log_repo, user_weight_repo, get_genres_func) - - return {"message": "ok"} diff --git a/app/services/scheduler_service.py b/app/services/scheduler_service.py deleted file mode 100644 index 52f6160..0000000 --- a/app/services/scheduler_service.py +++ /dev/null @@ -1,67 +0,0 @@ -import math -import time -import numpy as np -from typing import Awaitable, Callable, Dict, List -from collections import defaultdict -from app.models.word2vec_util import calc_user_vector -from app.services import weight_strategy -from app.enum.action_type import ActionType -from app.models import db_w2v_mapper -from app.services.redis import save_user_vector -from app.util.weight_aging import exponential_decay_weight -from app.repositories.action_log_repository import ActionLogRepository -from app.repositories.user_weight_repository import UserWeightRepository - -async def resize_weight( - action_log_repo: ActionLogRepository, - user_weight_repo: UserWeightRepository, - get_genres_by_content_id_func: Callable[[int], Awaitable[List[str]]] -): - all_logs = await action_log_repo.find_all_order_by_user_id() - grouped_logs = group_logs_by_user_id(all_logs) - - for user_id, logs in grouped_logs.items(): - genre_dict = defaultdict(int) - for log in logs: - try: - action_type = ActionType[log['action']] - except KeyError: - continue - - value = int(log['value']) - weight = weight_strategy.convert_to_weight(action_type, value) - weight = exponential_decay_weight(weight, log['timestamp']) - - content_id = log['contentId'] - genres = await get_genres_by_content_id_func(content_id) - for genre in genres: - translated = db_w2v_mapper.translate_genre(genre) - if translated: - genre_dict[translated] += weight - - for genre_name, weight in genre_dict.items(): - await user_weight_repo.reset_weight(user_id, genre_name, weight) - - vector = calc_user_vector(genre_dict) - vector_str = np.array2string(vector, separator=', ') - print(vector_str) - await save_user_vector(user_id, vector_str) - - print("✅ 가중치 재계산 완료") - - return - -def calc_resized_weight(timestamp : int, weight : float): - current_timestamp_ms = int(time.time() * 1000) # 현재 시간 (밀리초) - delta_ms = current_timestamp_ms - timestamp - delta_days = delta_ms / (1000 * 60 * 60 * 24) # 하루 단위 경과 시간 - - resized_weight = weight * math.exp(-1 * delta_days) - return resized_weight - -def group_logs_by_user_id(logs: List[Dict]) -> Dict[int, List[Dict]]: - grouped = defaultdict(list) - for log in logs: - user_id = log["userId"] - grouped[user_id].append(log) - return dict(grouped) \ No newline at end of file