1+ import sys
2+ import os
3+ import asyncio
4+ import pandas as pd
5+ import random
6+ from datetime import datetime , timedelta , timezone
7+
8+ # 프로젝트 루트 경로를 sys.path에 추가
9+ PROJECT_ROOT_PATH = os .path .dirname (os .path .dirname (os .path .dirname (os .path .abspath (__file__ ))))
10+ sys .path .append (os .path .join (PROJECT_ROOT_PATH , 'ELK' ))
11+
12+ from app .services .elasticsearch_service import ElasticsearchService
13+
14+ # --- 설정 ---
15+ # 이 스크립트는 Docker 외부에서 실행되므로, 명시된 IP로 접속합니다.
16+ ES_HOST = "15.164.50.188"
17+ USER_DATA_PATH = os .path .join (PROJECT_ROOT_PATH , "data" , "user_db.json" )
18+ NUM_SEARCHES_PER_USER = 2
19+ NUM_CLICKS_PER_SEARCH = (3 , 4 )
20+
21+ # 현실적인 검색어 목록
22+ SEARCH_QUERIES = ["강남" , "홍대" , "맛집" , "카페" , "성수" , "잠실" , "이태원" , "한남동" ]
23+
24+ class DummyDataGenerator :
25+ def __init__ (self , es_host : str , user_data_path : str ):
26+ print ("Elasticsearch 서비스에 연결 중..." )
27+ self .es_service = ElasticsearchService (host = es_host )
28+ if not self .es_service .is_connected ():
29+ print ("Elasticsearch 연결 실패. 스크립트를 종료합니다." )
30+ sys .exit (1 )
31+
32+ print (f"사용자 데이터 로드 중: { user_data_path } " )
33+ try :
34+ # JSON 파일을 DataFrame으로 읽습니다.
35+ self .users_df = pd .read_json (user_data_path )
36+ except FileNotFoundError :
37+ print (f"오류: 사용자 데이터 파일({ user_data_path } )을 찾을 수 없습니다." )
38+ print ("프로젝트 루트에서 'dvc pull data/user_db.json.dvc'를 실행했는지 확인하세요." )
39+ sys .exit (1 )
40+ except Exception as e :
41+ print (f"오류: JSON 파일 파싱 중 문제 발생 - { e } " )
42+ sys .exit (1 )
43+
44+ self .search_results_pool = {}
45+
46+ async def _prepare_search_pool (self ):
47+ """미리 정의된 검색어에 대한 실제 장소 목록을 가져와 풀을 만듭니다."""
48+ print ("현실적인 데이터 생성을 위해 검색어 풀을 준비합니다..." )
49+ for query in SEARCH_QUERIES :
50+ print (f" '{ query } '에 대한 장소 검색 중..." )
51+ # max_results를 기본값(23)으로 사용
52+ places = self .es_service .search_places (query = query )
53+ if places :
54+ self .search_results_pool [query ] = [p ['uuid' ] for p in places ]
55+
56+ if not self .search_results_pool :
57+ print ("오류: 검색 결과 풀을 생성할 수 없습니다. place_data 인덱스에 데이터가 있는지 확인하세요." )
58+ sys .exit (1 )
59+ print ("검색어 풀 준비 완료." )
60+
61+ async def generate (self ):
62+ """전체 더미 데이터 생성 프로세스를 실행합니다."""
63+ await self ._prepare_search_pool ()
64+
65+ user_ids = self .users_df ['id' ].unique ()
66+ total_users = len (user_ids )
67+ print (f"\n 총 { total_users } 명의 사용자에 대해 더미 데이터 생성을 시작합니다." )
68+
69+ for i , user_id in enumerate (user_ids ):
70+ print (f"--- [{ i + 1 } /{ total_users } ] 사용자 '{ user_id } ' 작업 시작 ---" )
71+
72+ # 각 사용자의 시간은 과거의 특정 시점부터 시작
73+ current_time = datetime .now (timezone .utc ) - timedelta (days = random .randint (1 , 30 ))
74+
75+ for j in range (NUM_SEARCHES_PER_USER ):
76+ # 1. 검색 로그 생성
77+ search_query = random .choice (list (self .search_results_pool .keys ()))
78+ place_ids = self .search_results_pool [search_query ]
79+
80+ # 검색 시간은 현재 시간
81+ search_time = current_time
82+ search_log = {
83+ "userId" : str (user_id ),
84+ "query" : search_query ,
85+ "placeIds" : place_ids ,
86+ "timestamp" : search_time
87+ }
88+ success , _ = self .es_service .insert_search_log (search_log )
89+ if success :
90+ print (f" ({ j + 1 } ) 검색 로그 저장: '{ search_query } ' (결과 { len (place_ids )} 개)" )
91+ else :
92+ print (f" ({ j + 1 } ) 검색 로그 저장 실패" )
93+
94+ # 2. 클릭 로그 생성 (현재 검색과 다음 검색 사이)
95+
96+ # 다음 검색을 위한 시간 간격 미리 설정
97+ time_to_next_search = timedelta (minutes = random .randint (5 , 120 ))
98+
99+ num_clicks = random .randint (* NUM_CLICKS_PER_SEARCH )
100+ # 검색 결과 수보다 많이 클릭할 수 없음
101+ num_clicks = min (num_clicks , len (place_ids ))
102+
103+ clicked_places = random .sample (place_ids , num_clicks )
104+
105+ print (f" -> { num_clicks } 개의 클릭 로그 생성 시뮬레이션..." )
106+ for k , place_id in enumerate (clicked_places ):
107+ # 클릭 시간: 검색 시간 이후, 다음 검색 시간 이전
108+ # 클릭 사이의 시간 간격을 랜덤하게 부여
109+ time_after_search = timedelta (seconds = random .randint (10 , 200 ))
110+ click_time = search_time + time_after_search
111+
112+ # 현재 시간을 클릭 시간 이후로 업데이트
113+ current_time = click_time
114+
115+ click_log = {
116+ "userId" : str (user_id ),
117+ "placeId" : place_id ,
118+ "timestamp" : click_time
119+ }
120+ success , _ = self .es_service .insert_click_log (click_log )
121+
122+ # 3. 다음 검색을 위해 시간 점프
123+ current_time += time_to_next_search
124+
125+ print (f"--- 사용자 '{ user_id } ' 작업 완료 ---\n " )
126+ print ("모든 더미 데이터 생성이 완료되었습니다." )
127+
128+ async def main ():
129+ generator = DummyDataGenerator (es_host = ES_HOST , user_data_path = USER_DATA_PATH )
130+ await generator .generate ()
131+
132+ if __name__ == "__main__" :
133+ asyncio .run (main ())
0 commit comments